// KafkaToFlinkPrint.java — example Flink job: read Result<FormatException, String> from Kafka and print successes.
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.util.Collector; import java.util.Properties; // Assuming 'Result' and 'FormatException' are custom classes // Replace with actual imports if they are from external libraries import your.package.Result; import your.package.FormatException; public class KafkaToFlinkPrint { public static void main(String[] args) throws Exception { // 1. Create Flink Execution Environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 2. Kafka Consumer Configuration Properties kafkaProps = new Properties(); kafkaProps.setProperty("bootstrap.servers", "your_kafka_broker_address:port"); kafkaProps.setProperty("group.id", "your_consumer_group_id"); // Add other Kafka properties as needed (e.g., deserializer) // 3. Create Kafka Consumer FlinkKafkaConsumer<Result<FormatException, String>> kafkaConsumer = new FlinkKafkaConsumer<>("your_kafka_topic", new YourKafkaDeserializationSchema(), // Custom deserializer kafkaProps); // 4. Read from Kafka and Create DataStream DataStream<Result<FormatException, String>> input = env.addSource(kafkaConsumer); // 5. Process and Print (Handle FormatException) input.flatMap(new FlatMapFunction<Result<FormatException, String>, String>() { @Override public void flatMap(Result<FormatException, String> result, Collector<String> out) { if (result.isSuccess()) { out.collect(result.getSuccessValue()); // Print the String } else { FormatException exception = result.getFailureValue(); // Handle the exception (log, send to another stream, etc.) System.err.println("FormatException: " + exception.getMessage()); } } }).print(); // Print to console // 6. Execute the Flink Pipeline env.execute("Kafka to Flink Print"); } }