Untitled
user_7290026
plain_text
2 years ago
2.6 kB
5
Indexable
package ru.sbrf.ucpcloud.shard.state;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.sbrf.ucpcloud.shard.config.PodNameProviderConfig;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Properties;

/**
 * Thin wrapper around a {@link KafkaConsumer} with {@code String} keys that reads a single topic
 * and exposes only the value of the most recently polled record.
 *
 * <p>The consumer is created in the constructor but is not subscribed until {@link #subscribe()}
 * is called.
 *
 * <p>NOTE(review): this class never closes the underlying {@link KafkaConsumer}; confirm that the
 * owner of this object does not need an explicit shutdown hook.
 *
 * @param <T> type of the record values produced by the configured value deserializer
 */
public class StateKafkaConsumer<T> {

    private static final Logger logger = LoggerFactory.getLogger(StateKafkaConsumer.class);

    private final KafkaConsumer<String, T> kafkaConsumer;
    private final String topic;
    private final String clientId;
    /** Base group id as passed by the caller; the effective {@code group.id} also has the pod name appended. */
    private final String groupId;
    private final long timeoutMs;

    /**
     * Creates the underlying Kafka consumer.
     *
     * <p>The effective consumer configuration is {@code consumerProps} with these entries
     * overridden: {@code group.id} = {@code groupId + podNameProvider.getPodName()},
     * {@code client.id} = {@code clientId}, {@code key.deserializer} = {@link StringDeserializer},
     * {@code value.deserializer} = {@code typeParameterClass.getName()}.
     *
     * @param podNameProvider    supplies the pod name that is appended to the group id
     * @param topicName          topic that {@link #subscribe()} subscribes to
     * @param groupId            base consumer group id (also used in log messages)
     * @param clientId           Kafka client id (also used in log messages)
     * @param timeoutMs          timeout, in milliseconds, passed to each poll in {@link #fetch()}
     * @param consumerProps      base consumer properties; not modified by this constructor
     * @param typeParameterClass class whose name is set as {@code value.deserializer}
     */
    public StateKafkaConsumer(PodNameProviderConfig podNameProvider,
                              String topicName,
                              String groupId,
                              String clientId,
                              long timeoutMs,
                              Properties consumerProps,
                              Class<?> typeParameterClass) {
        this.clientId = clientId;
        this.groupId = groupId;
        this.topic = topicName;
        this.timeoutMs = timeoutMs;

        // Work on a private copy so the caller's Properties instance is not mutated as a side effect.
        Properties effectiveProps = new Properties();
        effectiveProps.putAll(consumerProps);
        effectiveProps.setProperty("group.id", groupId + podNameProvider.getPodName());
        effectiveProps.setProperty("client.id", clientId);
        effectiveProps.setProperty("key.deserializer", StringDeserializer.class.getName());
        effectiveProps.setProperty("value.deserializer", typeParameterClass.getName());

        this.kafkaConsumer = new KafkaConsumer<>(effectiveProps);
    }

    /**
     * Polls the consumer once and returns the value of the last record in the polled batch.
     *
     * @return the last polled value, or {@link Optional#empty()} when the poll returned no
     *         records or the last record carries a {@code null} value
     */
    public Optional<T> fetch() {
        // NOTE(review): newer kafka-clients versions deprecate poll(long) in favour of
        // poll(Duration); kept as-is because the client version in use is not visible here.
        ConsumerRecords<String, T> records = this.kafkaConsumer.poll(timeoutMs);
        logger.debug("Client id {} fetched count {}", clientId, records.count());
        if (records.isEmpty()) {
            return Optional.empty();
        }

        // Only the last value is needed, so track it directly instead of collecting all values.
        T last = null;
        for (ConsumerRecord<String, T> rec : records) {
            last = rec.value();
        }
        // ofNullable: a null value in the last record must not blow up with a NullPointerException.
        return Optional.ofNullable(last);
    }

    /** Unsubscribes the consumer from its topic; does nothing when there is no active subscription. */
    public void unsubscribe() {
        if (!kafkaConsumer.subscription().isEmpty()) {
            logger.info("Unsubscribe KafkaConsumer for group={}, clientId={}, topic={}", groupId, clientId, topic);
            kafkaConsumer.unsubscribe();
        }
    }

    /** Subscribes the consumer to the configured topic; does nothing when a subscription already exists. */
    public void subscribe() {
        if (kafkaConsumer.subscription().isEmpty()) {
            logger.info("Subscribe KafkaConsumer for group={}, clientId={}, topic={}", groupId, clientId, topic);
            kafkaConsumer.subscribe(Collections.singletonList(topic));
        }
    }
}
Editor is loading...
Leave a Comment