Untitled
unknown
plain_text
16 days ago
1.2 kB
3
Indexable
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import java.util.Collections; import java.util.Properties; public class KafkaUtils { private KafkaConsumer<String, String> consumer; public KafkaUtils(String bootstrapServers, String topic) { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.GROUP_ID_CONFIG, "karate-consumer-group"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList(topic)); } public String consumeMessage() { ConsumerRecords<String, String> records = consumer.poll(1000); for (ConsumerRecord<String, String> record : records) { return record.value(); } return null; } }
Editor is loading...
Leave a Comment