Untitled

 avatar
user_7290026
plain_text
2 years ago
2.6 kB
5
Indexable
package ru.sbrf.ucpcloud.shard.state;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.sbrf.ucpcloud.shard.config.PodNameProviderConfig;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Properties;

/**
 * Thin wrapper around a {@link KafkaConsumer} bound to a single topic that exposes
 * only the most recently polled value.
 *
 * <p>The wrapper adds no synchronization of its own. NOTE(review): the Kafka client
 * documents {@code KafkaConsumer} as not safe for multi-threaded access, so instances
 * of this class should presumably be confined to one thread — confirm against callers.
 *
 * @param <T> type of the record values produced by the configured value deserializer
 */
public class StateKafkaConsumer<T> implements AutoCloseable {
    private final KafkaConsumer<String, T> kafkaConsumer;
    private final String topic;
    private final String clientId;
    private final String groupId;
    private final long timeoutMs;

    private static final Logger logger = LoggerFactory.getLogger(StateKafkaConsumer.class);

    /**
     * Creates the underlying consumer. The consumer is not subscribed to anything
     * until {@link #subscribe()} is called.
     *
     * @param podNameProvider     supplies the pod name that is appended to {@code groupId}
     *                            to form the effective {@code group.id}
     * @param topicName           topic used by {@link #subscribe()}
     * @param groupId             base consumer group id; also used in log messages
     *                            (without the pod name suffix)
     * @param clientId            value for the {@code client.id} consumer property
     * @param timeoutMs           poll timeout in milliseconds used by {@link #fetch()}
     * @param consumerProps       base consumer configuration; its entries are copied and
     *                            the passed instance is left unmodified
     * @param typeParameterClass  class whose name is set as {@code value.deserializer};
     *                            despite the parameter name it must therefore be a
     *                            deserializer class, not the value type itself
     */
    public StateKafkaConsumer(PodNameProviderConfig podNameProvider, String topicName, String groupId, String clientId, long timeoutMs,
            Properties consumerProps, Class<?> typeParameterClass) {
        this.clientId = clientId;
        this.groupId = groupId;
        this.topic = topicName;
        this.timeoutMs = timeoutMs;

        // Work on a private copy so the overrides below do not leak into a
        // Properties object the caller may reuse for other clients.
        Properties effectiveProps = new Properties();
        effectiveProps.putAll(consumerProps);
        effectiveProps.setProperty("group.id", groupId + podNameProvider.getPodName());
        effectiveProps.setProperty("client.id", clientId);
        effectiveProps.setProperty("key.deserializer", StringDeserializer.class.getName());
        effectiveProps.setProperty("value.deserializer", typeParameterClass.getName());
        this.kafkaConsumer = new KafkaConsumer<>(effectiveProps);
    }

    /**
     * Polls the consumer once and returns the value of the last record in the
     * iteration order of the returned batch.
     *
     * <p>NOTE(review): when several partitions are assigned, "last in iteration order"
     * is not necessarily the newest record by timestamp — confirm this is acceptable.
     *
     * @return the last polled value, or an empty {@code Optional} when the poll returned
     *         no records or the last record carries a {@code null} value
     */
    public Optional<T> fetch() {
        ConsumerRecords<String, T> records = this.kafkaConsumer.poll(timeoutMs);
        logger.debug("Client id {} fetched count {}", clientId, records.count());
        if (records.isEmpty()) {
            return Optional.empty();
        }
        List<T> values = new ArrayList<>(records.count());
        records.forEach(rec -> values.add(rec.value()));
        // ofNullable: a record value may be null, which Optional.of would reject with an NPE.
        return Optional.ofNullable(values.get(values.size() - 1));
    }

    /**
     * Unsubscribes the consumer if it currently has a subscription; otherwise does nothing.
     */
    public void unsubscribe() {
        if (!kafkaConsumer.subscription().isEmpty()) {
            logger.info("Unsubscribe KafkaConsumer for group={}, clientId={}, topic={}", groupId, clientId, topic);
            kafkaConsumer.unsubscribe();
        }
    }

    /**
     * Subscribes the consumer to the configured topic if it has no subscription yet;
     * otherwise does nothing.
     */
    public void subscribe() {
        if (kafkaConsumer.subscription().isEmpty()) {
            logger.info("Subscribe KafkaConsumer for group={}, clientId={}, topic={}", groupId, clientId, topic);
            kafkaConsumer.subscribe(Collections.singletonList(topic));
        }
    }

    /**
     * Closes the underlying {@link KafkaConsumer}. The instance must not be used afterwards.
     */
    @Override
    public void close() {
        logger.info("Close KafkaConsumer for group={}, clientId={}, topic={}", groupId, clientId, topic);
        kafkaConsumer.close();
    }
}
Editor is loading...
Leave a Comment