5 months ago
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.util.Properties;

// Assuming 'Result' and 'FormatException' are custom classes
// Replace with actual imports if they are from external libraries
import your.package.Result;
import your.package.FormatException;

public class KafkaToFlinkPrint {

    public static void main(String[] args) throws Exception {

        // 1. Create Flink Execution Environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Kafka Consumer Configuration
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "your_kafka_broker_address:port");
        kafkaProps.setProperty("group.id", "your_consumer_group_id");
        // Add other Kafka properties as needed (e.g., deserializer)

        // 3. Create Kafka Consumer
        FlinkKafkaConsumer<Result<FormatException, String>> kafkaConsumer = 
                new FlinkKafkaConsumer<>("your_kafka_topic", 
                                        new YourKafkaDeserializationSchema(), // Custom deserializer

        // 4. Read from Kafka and Create DataStream
        DataStream<Result<FormatException, String>> input = env.addSource(kafkaConsumer);

        // 5. Process and Print (Handle FormatException)
        input.flatMap(new FlatMapFunction<Result<FormatException, String>, String>() {
            public void flatMap(Result<FormatException, String> result, Collector<String> out) {
                if (result.isSuccess()) {
                    out.collect(result.getSuccessValue()); // Print the String
                } else {
                    FormatException exception = result.getFailureValue();
                    // Handle the exception (log, send to another stream, etc.)
                    System.err.println("FormatException: " + exception.getMessage()); 
        }).print(); // Print to console

        // 6. Execute the Flink Pipeline
        env.execute("Kafka to Flink Print");
