Untitled
unknown
plain_text
a year ago
1.2 kB
2
Indexable
Never
/**
 * An order event (order id + customer name) together with the static machinery
 * that consumes such events and persists them.
 *
 * <p>{@link #run()} pulls events from {@link #kafkaConsumer} and hands each one
 * to a bounded worker pool; {@link #processEvent(OrderEvent)} inserts the order
 * into {@code orders} when the customer exists in {@code customers} and is not
 * blacklisted.
 *
 * <p>Thread-safety: {@link #processEvent(OrderEvent)} may run concurrently on
 * several worker threads. {@link #eventsCounter} is incremented under a private
 * lock and is {@code volatile} so direct readers observe up-to-date values.
 */
@Component
@RequiredArgsConstructor
public class OrderEvent {

    private static final java.util.logging.Logger LOG =
            java.util.logging.Logger.getLogger(OrderEvent.class.getName());

    /** Bounded pool so a burst of events cannot spawn an unbounded number of threads. */
    private static final java.util.concurrent.ExecutorService WORKERS =
            java.util.concurrent.Executors.newFixedThreadPool(
                    Math.max(1, Runtime.getRuntime().availableProcessors()));

    /** Guards the read-modify-write of {@link #eventsCounter}. */
    private static final Object COUNTER_LOCK = new Object();

    private static final String SELECT_ALLOWED_CUSTOMER =
            "SELECT * FROM customers WHERE customer_name = ? AND blacklisted IS FALSE";

    private static final String INSERT_ORDER = "INSERT INTO orders VALUES (?, ?)";

    public Integer orderId;
    public String customerName;

    /** Number of events whose customer passed the blacklist check. */
    public static volatile int eventsCounter;

    // Spring does not inject into static fields, so the @Autowired annotations
    // live on the instance setters below, which populate these statics when the
    // component is created.
    public static DataSource dataSource;
    public static KafkaConsumer<OrderEvent> kafkaConsumer;

    /**
     * Injection point for the data source; stores it in the static {@link #dataSource}.
     *
     * @param injected the data source used by {@link #processEvent(OrderEvent)}
     */
    @Autowired
    public void setDataSource(DataSource injected) {
        dataSource = injected;
    }

    /**
     * Injection point for the consumer; stores it in the static {@link #kafkaConsumer}.
     *
     * @param injected the consumer polled by {@link #run()}
     */
    @Autowired
    public void setKafkaConsumer(KafkaConsumer<OrderEvent> injected) {
        kafkaConsumer = injected;
    }

    /**
     * Consumes events until the calling thread is interrupted, submitting each
     * one to the worker pool for processing.
     */
    public static void run() {
        while (!Thread.currentThread().isInterrupted()) {
            final OrderEvent event = kafkaConsumer.consume();
            // execute() actually runs the task on a pool thread; the previous
            // new Thread(...).run() executed it synchronously on this thread.
            WORKERS.execute(() -> processEvent(event));
        }
    }

    /**
     * Persists the order carried by {@code e} if its customer is present in
     * {@code customers} and not blacklisted. Failures are logged, never thrown,
     * so a single bad event cannot stop the consumer loop.
     *
     * @param e the event to process; {@code null} events and events without a
     *          customer name are skipped
     */
    public static void processEvent(OrderEvent e) {
        if (e == null || e.customerName == null) {
            LOG.warning("Skipping order event without a customer name");
            return;
        }
        // try-with-resources returns the connection to the pool on every path.
        try (Connection c = dataSource.getConnection()) {
            if (!isAllowedCustomer(c, e.customerName)) {
                return;
            }
            synchronized (COUNTER_LOCK) {
                eventsCounter++;
            }
            insertOrder(c, e);
        } catch (java.sql.SQLException | RuntimeException ex) {
            LOG.log(java.util.logging.Level.SEVERE,
                    "Failed to process order event " + e.orderId, ex);
        }
    }

    /** Returns whether a non-blacklisted customer row with the given name exists. */
    private static boolean isAllowedCustomer(Connection c, String name)
            throws java.sql.SQLException {
        try (java.sql.PreparedStatement select = c.prepareStatement(SELECT_ALLOWED_CUSTOMER)) {
            select.setString(1, name);
            try (ResultSet rs = select.executeQuery()) {
                return rs.next();
            }
        }
    }

    /** Inserts the event's order id and customer name into {@code orders}. */
    private static void insertOrder(Connection c, OrderEvent e) throws java.sql.SQLException {
        try (java.sql.PreparedStatement insert = c.prepareStatement(INSERT_ORDER)) {
            // setObject with an explicit SQL type stores NULL for a null order id.
            insert.setObject(1, e.orderId, java.sql.Types.INTEGER);
            insert.setString(2, e.customerName);
            insert.executeUpdate();
        }
    }
}