@Component
@RequiredArgsConstructor
public class OrderEvent {
public Integer orderId;
public String customerName;
public static int eventsCounter;
@Autowired
public static DataSource dataSource;
@Autowired
public static KafkaConsumer<OrderEvent> kafkaConsumer;
public static void run() {
while (true) {
OrderEvent e = kafkaConsumer.consume();
new Thread(new Runnable() {
public void run() {
processEvent(e);
}
}).run();
}
}
public static void processEvent(OrderEvent e) {
try {
Connection c = dataSource.getConnection();
String sql =
String.format("SELECT * FROM customers WHERE customer_name = '%s' AND blacklisted IS FALSE", e.customerName);
ResultSet rs = c.createStatement().executeQuery(sql);
if (rs.next()) {
eventsCounter++;
String sqlIns =
String.format("INSERT INTO orders VALUES (%d, '%s')", e.orderId, e.customerName);
c.createStatement().executeUpdate(sqlIns);
}
} catch (Throwable t) { }
}
}