Untitled
unknown
plain_text
a year ago
2.4 kB
8
Indexable
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import java.util.Properties;
// Assuming 'Result' and 'FormatException' are custom classes
// Replace with actual imports if they are from external libraries
import your.package.Result;
import your.package.FormatException;
public class KafkaToFlinkPrint {
public static void main(String[] args) throws Exception {
// 1. Create Flink Execution Environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. Kafka Consumer Configuration
Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "your_kafka_broker_address:port");
kafkaProps.setProperty("group.id", "your_consumer_group_id");
// Add other Kafka properties as needed (e.g., deserializer)
// 3. Create Kafka Consumer
FlinkKafkaConsumer<Result<FormatException, String>> kafkaConsumer =
new FlinkKafkaConsumer<>("your_kafka_topic",
new YourKafkaDeserializationSchema(), // Custom deserializer
kafkaProps);
// 4. Read from Kafka and Create DataStream
DataStream<Result<FormatException, String>> input = env.addSource(kafkaConsumer);
// 5. Process and Print (Handle FormatException)
input.flatMap(new FlatMapFunction<Result<FormatException, String>, String>() {
@Override
public void flatMap(Result<FormatException, String> result, Collector<String> out) {
if (result.isSuccess()) {
out.collect(result.getSuccessValue()); // Print the String
} else {
FormatException exception = result.getFailureValue();
// Handle the exception (log, send to another stream, etc.)
System.err.println("FormatException: " + exception.getMessage());
}
}
}).print(); // Print to console
// 6. Execute the Flink Pipeline
env.execute("Kafka to Flink Print");
}
}Editor is loading...
Leave a Comment