Untitled
user_7290026
plain_text
2 years ago
4.2 kB
5
Indexable
package ru.sbrf.ucpcloud.shard.state;
import com.sbt.bm.ucp.util.performance.Activity;
import com.sbt.bm.ucp.util.performance.Perf;
import com.sbt.bm.ucp.util.performance.tag.ActivityType;
import com.sbt.bm.ucp.util.performance.tag.MethodTag;
import com.sbt.bm.ucp.util.performance.tag.ResultTag;
import com.ucp.ck.loader.api.Action;
import com.ucp.ck.loader.api.State;
import com.ucp.ck.loader.api.Topic;
import com.ucp.ck.loader.service.CkLoaderService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.sbrf.ucpcloud.shard.config.StateCheckerConfig;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
 * Periodically polls a {@link StateFetcher} for TPS values and launch flags of the
 * {@link Topic#MAJOR} and {@link Topic#MINOR} topics and applies them to the
 * {@link CkLoaderService}.
 *
 * <p>Which of the two periodic tasks are started is decided by {@link StateCheckerConfig}.
 * When no launch flag is fetched for a topic, a liveness check stops that topic once more
 * than {@code getTimeToStopSec()} seconds have passed since the last received flag.
 */
public class StateChecker {
    private static final Logger logger = LoggerFactory.getLogger(StateChecker.class);

    private final ScheduledExecutorService scheduledExecutorService;
    private final CkLoaderService ckLoaderService;
    private final StateFetcher stateFetcher;
    private final StateCheckerConfig stateCheckerConfig;
    /** Initial delay and delay between runs of the periodic tasks, in seconds. */
    private final long timePeriodSec;
    /**
     * Wall-clock time (ms) of the most recent launch flag received for any topic.
     * NOTE(review): one timestamp is shared by MAJOR and MINOR, so a flag received for one
     * topic also postpones the liveness timeout of the other - confirm this is intended.
     */
    private long updateDateTime;
    /** Reusable performance counter descriptor for liveness checks. */
    private final Activity checkLiveness = new Activity(
            MethodTag.of("StateChecker.checkLiveness"),
            ActivityType.PROCESS, ResultTag.SUCCESS);

    /**
     * Creates the checker; nothing is scheduled until {@link #schedule()} is invoked.
     *
     * @param scheduledExecutorService executor that runs the periodic tasks
     * @param ckLoaderService          service whose TPS and state are updated
     * @param stateFetcher             source of TPS values and launch flags
     * @param stateCheckerConfig       configuration (period, timeout, feature switches)
     */
    public StateChecker(ScheduledExecutorService scheduledExecutorService,
                        CkLoaderService ckLoaderService, StateFetcher stateFetcher,
                        StateCheckerConfig stateCheckerConfig) {
        this.scheduledExecutorService = scheduledExecutorService;
        this.ckLoaderService = ckLoaderService;
        this.stateFetcher = stateFetcher;
        this.stateCheckerConfig = stateCheckerConfig;
        this.timePeriodSec = stateCheckerConfig.getTimePeriodSec();
        // Start the liveness countdown at construction time.
        this.updateDateTime = System.currentTimeMillis();
    }

    /**
     * Subscribes the fetcher and schedules the periodic tasks enabled in the configuration.
     * Each task runs with a fixed delay of {@code timePeriodSec} seconds.
     */
    @PostConstruct
    public void schedule() {
        Perf.count(new Activity(
                MethodTag.of("StateChecker.schedule"),
                ActivityType.PROCESS, ResultTag.SUCCESS));
        if (stateCheckerConfig.checkUpdateTps()) {
            stateFetcher.subscribeTpsConsumers();
            this.scheduledExecutorService.scheduleWithFixedDelay(
                    () -> runSafely("updateTps", this::updateTps),
                    timePeriodSec, timePeriodSec, TimeUnit.SECONDS);
            logger.info("Task for reading tps from kafka was scheduled with frequency = {} sec", timePeriodSec);
        }
        if (stateCheckerConfig.checkLaunchFlag()) {
            stateFetcher.subscribeLaunchFlagConsumers();
            this.scheduledExecutorService.scheduleWithFixedDelay(
                    () -> runSafely("updateState", this::updateState),
                    timePeriodSec, timePeriodSec, TimeUnit.SECONDS);
            logger.info("Task for reading launch flag from kafka was scheduled with frequency = {} sec", timePeriodSec);
        }
    }

    /** Releases the fetcher's resources by delegating to {@code stateFetcher.destroy()}. */
    @PreDestroy
    public void unsubscribe() {
        stateFetcher.destroy();
    }

    /**
     * Runs one iteration of a periodic task without letting a runtime exception escape.
     *
     * <p>{@code scheduleWithFixedDelay} suppresses all subsequent executions once a run
     * throws, so a single transient failure would otherwise stop polling for good.
     *
     * @param taskName name used in the error log
     * @param task     the iteration to execute
     */
    private void runSafely(String taskName, Runnable task) {
        try {
            task.run();
        } catch (RuntimeException e) {
            logger.error("Periodic task {} failed, it will be retried on the next run", taskName, e);
        }
    }

    /** Applies the fetched TPS value, if any, to each topic. */
    private void updateTps() {
        stateFetcher.fetchTps(Topic.MAJOR).ifPresent(tps -> ckLoaderService.changeTps(Topic.MAJOR, tps));
        stateFetcher.fetchTps(Topic.MINOR).ifPresent(tps -> ckLoaderService.changeTps(Topic.MINOR, tps));
    }

    /** Applies the fetched launch flag to each topic, or runs the liveness check when none arrived. */
    private void updateState() {
        stateFetcher.fetchLaunchFlag(Topic.MAJOR)
                .ifPresentOrElse(isRunning -> setState(isRunning, Topic.MAJOR), () -> checkLiveness(Topic.MAJOR));
        stateFetcher.fetchLaunchFlag(Topic.MINOR)
                .ifPresentOrElse(isRunning -> setState(isRunning, Topic.MINOR), () -> checkLiveness(Topic.MINOR));
    }

    /**
     * Records that a flag was received and starts or stops the topic accordingly.
     *
     * @param isRunning flag text; parsed with {@link Boolean#parseBoolean(String)}, so anything
     *                  other than "true" (case-insensitive) results in {@code Action.STOP}
     * @param topic     topic the flag belongs to
     */
    private void setState(String isRunning, Topic topic) {
        updateDateTime = System.currentTimeMillis();
        ckLoaderService.changeState(Boolean.parseBoolean(isRunning) ? Action.START : Action.STOP, topic);
    }

    /**
     * Stops a topic that is not already stopped when no launch flag has been received for
     * longer than the configured timeout.
     *
     * @param topic topic to check
     */
    private void checkLiveness(Topic topic) {
        Perf.count(checkLiveness);
        if (ckLoaderService.state(topic).equals(State.STOPPED)) {
            return;
        }
        long silenceMillis = System.currentTimeMillis() - updateDateTime;
        if (silenceMillis > stateCheckerConfig.getTimeToStopSec() * 1000L) {
            logger.info("Reading from topic {} was stopped because fetching timeout is exceeded", topic);
            ckLoaderService.changeState(Action.STOP, topic);
        }
    }
}
Editor is loading...
Leave a Comment