Untitled
user_7290026
plain_text
2 years ago
2.6 kB
9
Indexable
package ru.sbrf.ucpcloud.shard.state;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.sbrf.ucpcloud.shard.config.PodNameProviderConfig;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
/**
 * Thin wrapper around a {@link KafkaConsumer} bound to a single topic that exposes only the
 * most recent value returned by each poll.
 *
 * <p>The effective consumer group is the supplied group id with the pod name appended
 * (see the constructor), while log messages report the base group id as passed in.
 *
 * <p>This class adds no synchronization of its own; NOTE(review): the underlying
 * {@code KafkaConsumer} is documented as not thread-safe, so an instance should be driven from a
 * single thread — confirm against callers.
 *
 * @param <T> type of the record values produced by the configured value deserializer
 */
public class StateKafkaConsumer<T> implements AutoCloseable {

    private static final Logger logger = LoggerFactory.getLogger(StateKafkaConsumer.class);

    private final KafkaConsumer<String, T> kafkaConsumer;
    private final String topic;
    private final String clientId;
    private final String groupId;
    private final long timeoutMs;

    /**
     * Creates the underlying consumer. The consumer is not subscribed to anything until
     * {@link #subscribe()} is called.
     *
     * @param podNameProvider    source of the pod name that is appended to {@code groupId} to form
     *                           the {@code group.id} setting
     * @param topicName          topic that {@link #subscribe()} subscribes to
     * @param groupId            base consumer group id (also used in log messages)
     * @param clientId           value for the {@code client.id} setting
     * @param timeoutMs          timeout, in milliseconds, passed to every poll in {@link #fetch()}
     * @param consumerProps      base consumer configuration; it is copied, so the caller's
     *                           instance is left unmodified
     * @param typeParameterClass class whose name is used as the {@code value.deserializer}
     *                           setting (i.e. the deserializer class, not the value type itself)
     */
    public StateKafkaConsumer(PodNameProviderConfig podNameProvider, String topicName, String groupId, String clientId,
                              long timeoutMs, Properties consumerProps, Class<?> typeParameterClass) {
        this.topic = topicName;
        this.clientId = clientId;
        this.groupId = groupId;
        this.timeoutMs = timeoutMs;

        // Work on a private copy so that a Properties object shared between several consumers
        // is not silently overwritten with this instance's group/client ids and deserializers.
        Properties effectiveProps = new Properties();
        effectiveProps.putAll(consumerProps);
        effectiveProps.setProperty("group.id", groupId + podNameProvider.getPodName());
        effectiveProps.setProperty("client.id", clientId);
        effectiveProps.setProperty("key.deserializer", StringDeserializer.class.getName());
        effectiveProps.setProperty("value.deserializer", typeParameterClass.getName());

        this.kafkaConsumer = new KafkaConsumer<>(effectiveProps);
    }

    /**
     * Polls the consumer once and returns the value of the last record in the returned batch
     * (in the batch's iteration order). All earlier records of the batch are discarded.
     *
     * @return the last polled value, or {@link Optional#empty()} when the poll returned no
     *         records or the last record carries a {@code null} value
     */
    public Optional<T> fetch() {
        // NOTE(review): poll(long) is deprecated in kafka-clients 2.0+ in favour of
        // poll(Duration); kept because the client version used by this project is not visible here.
        ConsumerRecords<String, T> records = kafkaConsumer.poll(timeoutMs);
        logger.debug("Client id {} fetched count {}", clientId, records.count());
        if (records.isEmpty()) {
            return Optional.empty();
        }
        // ofNullable + map: a null record value must yield an empty result instead of the
        // NullPointerException that Optional.of(null) would throw.
        return Optional.ofNullable(lastOf(records)).map(rec -> rec.value());
    }

    /**
     * Drops the current subscription, if there is one. Does nothing when the consumer is not
     * subscribed.
     */
    public void unsubscribe() {
        if (!kafkaConsumer.subscription().isEmpty()) {
            logger.info("Unsubscribe KafkaConsumer for group={}, clientId={}, topic={}", groupId, clientId, topic);
            kafkaConsumer.unsubscribe();
        }
    }

    /**
     * Subscribes to the configured topic. Does nothing when the consumer already has a
     * subscription.
     */
    public void subscribe() {
        if (kafkaConsumer.subscription().isEmpty()) {
            logger.info("Subscribe KafkaConsumer for group={}, clientId={}, topic={}", groupId, clientId, topic);
            kafkaConsumer.subscribe(Collections.singletonList(topic));
        }
    }

    /**
     * Closes the underlying consumer. The instance must not be used afterwards.
     */
    @Override
    public void close() {
        logger.info("Close KafkaConsumer for group={}, clientId={}, topic={}", groupId, clientId, topic);
        kafkaConsumer.close();
    }

    /** Returns the last element produced by {@code items}, or {@code null} when it yields none. */
    private static <E> E lastOf(Iterable<E> items) {
        E last = null;
        for (E item : items) {
            last = item;
        }
        return last;
    }
}
Editor is loading...
Leave a Comment