Untitled

 avatar
user_7290026
plain_text
2 months ago
4.2 kB
1
Indexable
Never
package ru.sbrf.ucpcloud.shard.state;

import com.sbt.bm.ucp.util.performance.Activity;
import com.sbt.bm.ucp.util.performance.Perf;
import com.sbt.bm.ucp.util.performance.tag.ActivityType;
import com.sbt.bm.ucp.util.performance.tag.MethodTag;
import com.sbt.bm.ucp.util.performance.tag.ResultTag;
import com.ucp.ck.loader.api.Action;
import com.ucp.ck.loader.api.State;
import com.ucp.ck.loader.api.Topic;
import com.ucp.ck.loader.service.CkLoaderService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.sbrf.ucpcloud.shard.config.StateCheckerConfig;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
 * Periodically polls a {@link StateFetcher} for TPS values and launch flags of the
 * {@code Topic.MAJOR} and {@code Topic.MINOR} topics and forwards them to
 * {@link CkLoaderService}.
 *
 * <p>Which polls are active is decided by {@link StateCheckerConfig}: TPS polling is enabled by
 * {@code checkUpdateTps()}, launch-flag polling by {@code checkLaunchFlag()}. When a launch-flag
 * poll returns nothing for a topic, a liveness check stops that topic once no flag has been
 * received for longer than {@code getTimeToStopSec()} seconds.
 */
public class StateChecker {

    private static final Logger logger = LoggerFactory.getLogger(StateChecker.class);

    private final ScheduledExecutorService scheduledExecutorService;
    private final CkLoaderService ckLoaderService;
    private final StateFetcher stateFetcher;
    private final StateCheckerConfig stateCheckerConfig;
    /** Initial delay and delay between polls, in seconds; read once from the config. */
    private final long timePeriodSec;
    /**
     * Wall-clock time (ms) at which a launch flag was last received.
     *
     * <p>NOTE(review): this single timestamp is shared by both topics, so a flag received for one
     * topic also postpones the liveness timeout of the other. Confirm whether per-topic tracking
     * is intended.
     */
    private long updateDateTime;
    private final Activity checkLiveness = new Activity(
            MethodTag.of("StateChecker.checkLiveness"),
            ActivityType.PROCESS, ResultTag.SUCCESS);

    // Handles of the periodic tasks, kept so they can be cancelled on shutdown. Written in
    // schedule() and read in unsubscribe(); those lifecycle callbacks may run on different threads.
    private volatile ScheduledFuture<?> tpsTask;
    private volatile ScheduledFuture<?> launchFlagTask;


    /**
     * Creates the checker. Nothing is scheduled until {@link #schedule()} is invoked.
     *
     * @param scheduledExecutorService executor that runs the periodic polls
     * @param ckLoaderService          service that receives TPS and state changes
     * @param stateFetcher             source of TPS values and launch flags
     * @param stateCheckerConfig       feature switches and timing settings
     */
    public StateChecker(ScheduledExecutorService scheduledExecutorService,
            CkLoaderService ckLoaderService, StateFetcher stateFetcher,
            StateCheckerConfig stateCheckerConfig) {
        this.scheduledExecutorService = scheduledExecutorService;
        this.ckLoaderService = ckLoaderService;
        this.stateFetcher = stateFetcher;
        this.stateCheckerConfig = stateCheckerConfig;
        this.timePeriodSec = stateCheckerConfig.getTimePeriodSec();
        // Start the liveness clock at construction so the timeout is measured from start-up.
        this.updateDateTime = System.currentTimeMillis();
    }

    /**
     * Subscribes the consumers enabled in the config and schedules the matching periodic polls
     * with a fixed delay of {@code timePeriodSec} seconds.
     */
    @PostConstruct
    public void schedule() {
        Perf.count(new Activity(
                MethodTag.of("StateChecker.schedule"),
                ActivityType.PROCESS, ResultTag.SUCCESS));
        if (stateCheckerConfig.checkUpdateTps()) {
            stateFetcher.subscribeTpsConsumers();
            tpsTask = scheduledExecutorService.scheduleWithFixedDelay(this::updateTps, timePeriodSec,
                    timePeriodSec, TimeUnit.SECONDS);
            logger.info("Task for reading tps from kafka was scheduled with frequency = {} sec", timePeriodSec);
        }
        if (stateCheckerConfig.checkLaunchFlag()) {
            stateFetcher.subscribeLaunchFlagConsumers();
            launchFlagTask = scheduledExecutorService.scheduleWithFixedDelay(this::updateState, timePeriodSec,
                    timePeriodSec, TimeUnit.SECONDS);
            logger.info("Task for reading launch flag from kafka was scheduled with frequency = {} sec", timePeriodSec);
        }
    }

    /**
     * Cancels the periodic polls scheduled by {@link #schedule()} and then destroys the fetcher.
     */
    @PreDestroy
    public void unsubscribe() {
        // Stop future polls first so they do not keep hitting a destroyed fetcher.
        cancel(tpsTask);
        cancel(launchFlagTask);
        stateFetcher.destroy();
    }

    /** Cancels a periodic task if it was scheduled; a run already in progress is not interrupted. */
    private static void cancel(ScheduledFuture<?> task) {
        if (task != null) {
            task.cancel(false);
        }
    }

    /** Polls the TPS value of each topic and applies whatever was fetched. */
    private void updateTps() {
        runSafely("TPS update", Topic.MAJOR, () -> updateTps(Topic.MAJOR));
        runSafely("TPS update", Topic.MINOR, () -> updateTps(Topic.MINOR));
    }

    /** Applies the fetched TPS value of one topic, if any, to the loader service. */
    private void updateTps(Topic topic) {
        stateFetcher.fetchTps(topic).ifPresent(tps -> ckLoaderService.changeTps(topic, tps));
    }

    /** Polls the launch flag of each topic; falls back to the liveness check when none arrived. */
    private void updateState() {
        runSafely("Launch flag update", Topic.MAJOR, () -> updateState(Topic.MAJOR));
        runSafely("Launch flag update", Topic.MINOR, () -> updateState(Topic.MINOR));
    }

    /** Applies the fetched launch flag of one topic, or runs the liveness check if none was fetched. */
    private void updateState(Topic topic) {
        stateFetcher.fetchLaunchFlag(topic)
                .ifPresentOrElse(isRunning -> setState(isRunning, topic), () -> checkLiveness(topic));
    }

    /**
     * Runs one polling step and logs, rather than propagates, any runtime failure.
     *
     * <p>A periodic task submitted to a {@link ScheduledExecutorService} is never run again once an
     * execution throws, so an unguarded transient failure would silently stop polling for good.
     * Guarding each topic separately also keeps a failure on one topic from skipping the other.
     */
    private void runSafely(String description, Topic topic, Runnable step) {
        try {
            step.run();
        } catch (RuntimeException e) {
            logger.error("{} failed for topic {}; will retry on the next scheduled run",
                    description, topic, e);
        }
    }

    /**
     * Records that a launch flag was received and starts or stops the topic accordingly.
     *
     * @param isRunning flag text; parsed with {@link Boolean#parseBoolean(String)}, so only
     *                  "true" (case-insensitive) starts the topic, anything else stops it
     * @param topic     topic the flag applies to
     */
    private void setState(String isRunning, Topic topic) {
        updateDateTime = System.currentTimeMillis();
        ckLoaderService.changeState(Boolean.parseBoolean(isRunning) ? Action.START : Action.STOP, topic);
    }

    /**
     * Stops a topic that is not already stopped when no launch flag has been received for longer
     * than the configured {@code getTimeToStopSec()} seconds.
     */
    private void checkLiveness(Topic topic) {
        Perf.count(checkLiveness);
        if (!ckLoaderService.state(topic).equals(State.STOPPED)) {
            if (System.currentTimeMillis() - updateDateTime > stateCheckerConfig.getTimeToStopSec() * 1000L) {
                logger.info("Reading from topic {} was stopped because fetching timeout is exceeded", topic);
                ckLoaderService.changeState(Action.STOP, topic);
            }
        }
    }
}
Leave a Comment