Untitled
user_7290026
plain_text
a year ago
4.2 kB
4
Indexable
package ru.sbrf.ucpcloud.shard.state; import com.sbt.bm.ucp.util.performance.Activity; import com.sbt.bm.ucp.util.performance.Perf; import com.sbt.bm.ucp.util.performance.tag.ActivityType; import com.sbt.bm.ucp.util.performance.tag.MethodTag; import com.sbt.bm.ucp.util.performance.tag.ResultTag; import com.ucp.ck.loader.api.Action; import com.ucp.ck.loader.api.State; import com.ucp.ck.loader.api.Topic; import com.ucp.ck.loader.service.CkLoaderService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import ru.sbrf.ucpcloud.shard.config.StateCheckerConfig; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; public class StateChecker { private static final Logger logger = LoggerFactory.getLogger(StateChecker.class); private final ScheduledExecutorService scheduledExecutorService; private final CkLoaderService ckLoaderService; private final StateFetcher stateFetcher; private final StateCheckerConfig stateCheckerConfig; private final long timePeriodSec; private long updateDateTime; private final Activity checkLiveness = new Activity( MethodTag.of("StateChecker.checkLiveness"), ActivityType.PROCESS, ResultTag.SUCCESS); public StateChecker(ScheduledExecutorService scheduledExecutorService, CkLoaderService ckLoaderService, StateFetcher stateFetcher, StateCheckerConfig stateCheckerConfig) { this.scheduledExecutorService = scheduledExecutorService; this.ckLoaderService = ckLoaderService; this.stateFetcher = stateFetcher; this.stateCheckerConfig = stateCheckerConfig; this.timePeriodSec = stateCheckerConfig.getTimePeriodSec(); this.updateDateTime = System.currentTimeMillis(); } @PostConstruct public void schedule() { Perf.count(new Activity( MethodTag.of("StateChecker.schedule"), ActivityType.PROCESS, ResultTag.SUCCESS)); if (stateCheckerConfig.checkUpdateTps()) { stateFetcher.subscribeTpsConsumers(); this.scheduledExecutorService.scheduleWithFixedDelay(this::updateTps, timePeriodSec, timePeriodSec, TimeUnit.SECONDS); logger.info("Task for reading tps from kafka was scheduled with frequency = {} sec", timePeriodSec); } if (stateCheckerConfig.checkLaunchFlag()) { stateFetcher.subscribeLaunchFlagConsumers(); this.scheduledExecutorService.scheduleWithFixedDelay(this::updateState, timePeriodSec, timePeriodSec, TimeUnit.SECONDS); logger.info("Task for reading launch flag from kafka was scheduled with frequency = {} sec", timePeriodSec); } } @PreDestroy public void unsubscribe() { stateFetcher.destroy(); } private void updateTps() { stateFetcher.fetchTps(Topic.MAJOR).ifPresent(tps -> ckLoaderService.changeTps(Topic.MAJOR, tps)); stateFetcher.fetchTps(Topic.MINOR).ifPresent(tps -> ckLoaderService.changeTps(Topic.MINOR, tps)); } private void updateState() { stateFetcher.fetchLaunchFlag(Topic.MAJOR).ifPresentOrElse(isRunning -> setState(isRunning, Topic.MAJOR), () -> checkLiveness(Topic.MAJOR)); stateFetcher.fetchLaunchFlag(Topic.MINOR).ifPresentOrElse(isRunning -> setState(isRunning, Topic.MINOR), () -> checkLiveness(Topic.MINOR)); } private void setState(String isRunning, Topic topic) { updateDateTime = System.currentTimeMillis(); ckLoaderService.changeState(Boolean.parseBoolean(isRunning) ? Action.START : Action.STOP, topic); } private void checkLiveness(Topic topic) { Perf.count(checkLiveness); if (!ckLoaderService.state(topic).equals(State.STOPPED)) { if (System.currentTimeMillis() - updateDateTime > stateCheckerConfig.getTimeToStopSec() * 1000L) { logger.info("Reading from topic {} was stopped because fetching timeout is exceeded", topic); ckLoaderService.changeState(Action.STOP, topic); } } } }
Editor is loading...
Leave a Comment