Untitled

mail@pastecode.io avatar
unknown
plain_text
a year ago
1.2 kB
2
Indexable
/**
 * Order event payload that also hosts the static consumer loop persisting incoming orders.
 *
 * <p>An instance carries the data of one order ({@link #orderId}, {@link #customerName}).
 * The static side of the class pulls events from {@link #kafkaConsumer} and, for each event
 * whose customer exists in {@code customers} and is not blacklisted, inserts a row into
 * {@code orders} through {@link #dataSource}.
 *
 * <p>Thread-safety: {@link #processEvent(OrderEvent)} may be called from several threads at
 * once; each call uses its own JDBC connection and the counter is updated under a lock.
 *
 * <p>NOTE(review): the public fields and the static collaborators are kept only so existing
 * callers keep compiling. Splitting this into an immutable event type and a separate
 * consumer bean with constructor injection would be the cleaner long-term shape.
 */
@Component
@RequiredArgsConstructor
public class OrderEvent {
    private static final java.util.logging.Logger LOG =
        java.util.logging.Logger.getLogger(OrderEvent.class.getName());

    /**
     * Upper bound on concurrently processed events.
     * NOTE(review): tune against the connection-pool size of the DataSource.
     */
    private static final int WORKER_THREADS = Math.max(1, Runtime.getRuntime().availableProcessors());

    /** Guards increments of {@link #eventsCounter}; a plain {@code ++} is not atomic. */
    private static final Object COUNTER_LOCK = new Object();

    /** Identifier of the order; may be {@code null}, in which case SQL NULL is inserted. */
    public Integer orderId;

    /** Name of the customer placing the order; looked up in {@code customers.customer_name}. */
    public String customerName;

    /** Number of events that passed the customer check. Volatile so readers see fresh values. */
    public static volatile int eventsCounter;

    /** JDBC source used by {@link #processEvent(OrderEvent)}; populated via {@link #setDataSource}. */
    public static DataSource dataSource;

    /** Source of incoming events for {@link #run()}; populated via {@link #setKafkaConsumer}. */
    public static KafkaConsumer<OrderEvent> kafkaConsumer;

    /**
     * Injection point for the data source.
     *
     * <p>Spring does not inject {@code static} fields, so the dependency is received through
     * this instance setter and then published to the static field the worker code reads.
     *
     * @param dataSource the data source to use for all order processing
     */
    @Autowired
    public void setDataSource(DataSource dataSource) {
        OrderEvent.dataSource = dataSource;
    }

    /**
     * Injection point for the event consumer (instance setter for the same reason as
     * {@link #setDataSource}).
     *
     * @param kafkaConsumer the consumer {@link #run()} pulls events from
     */
    @Autowired
    public void setKafkaConsumer(KafkaConsumer<OrderEvent> kafkaConsumer) {
        OrderEvent.kafkaConsumer = kafkaConsumer;
    }

    /**
     * Consumes events until the calling thread is interrupted, handing each event to a
     * bounded worker pool so that a slow database call does not block consumption.
     *
     * <p>Any exception thrown by the consumer ends the loop and propagates to the caller,
     * as before. Already submitted events are still processed after the loop ends.
     */
    public static void run() {
        java.util.concurrent.ExecutorService workers =
            java.util.concurrent.Executors.newFixedThreadPool(WORKER_THREADS);
        try {
            while (!Thread.currentThread().isInterrupted()) {
                final OrderEvent event = kafkaConsumer.consume();
                // The original called Thread.run(), which executed the task synchronously on
                // the consumer thread; submitting to the pool makes it actually concurrent.
                workers.execute(() -> processEvent(event));
            }
        } finally {
            // Orderly shutdown: queued events finish, idle worker threads are released.
            workers.shutdown();
        }
    }

    /**
     * Persists the order carried by {@code e} if its customer exists and is not blacklisted.
     *
     * <p>Best-effort: database and runtime failures are logged, not rethrown, so a single bad
     * event cannot kill a worker. A {@code null} event is ignored with a warning. A
     * {@code null} customer name matches no customer row, so nothing is inserted.
     *
     * @param e the event to process; may be {@code null}
     */
    public static void processEvent(OrderEvent e) {
        if (e == null) {
            LOG.warning("Ignoring null order event");
            return;
        }
        try (Connection c = dataSource.getConnection()) {
            if (!isEligibleCustomer(c, e.customerName)) {
                return;
            }
            // Counted before the insert, matching the original ordering.
            synchronized (COUNTER_LOCK) {
                eventsCounter++;
            }
            insertOrder(c, e);
        } catch (java.sql.SQLException | RuntimeException ex) {
            LOG.log(java.util.logging.Level.SEVERE,
                "Failed to process order event with orderId=" + e.orderId, ex);
        }
    }

    /** Returns whether a non-blacklisted customer with the given name exists. */
    private static boolean isEligibleCustomer(Connection c, String customerName)
            throws java.sql.SQLException {
        // Bind parameter instead of string formatting: the name comes from the event payload.
        String sql = "SELECT 1 FROM customers WHERE customer_name = ? AND blacklisted IS FALSE";
        try (java.sql.PreparedStatement ps = c.prepareStatement(sql)) {
            ps.setString(1, customerName);
            try (ResultSet rs = ps.executeQuery()) {
                return rs.next();
            }
        }
    }

    /** Inserts one row into {@code orders} for the given event. */
    private static void insertOrder(Connection c, OrderEvent e) throws java.sql.SQLException {
        String sql = "INSERT INTO orders VALUES (?, ?)";
        try (java.sql.PreparedStatement ps = c.prepareStatement(sql)) {
            if (e.orderId == null) {
                ps.setNull(1, java.sql.Types.INTEGER);
            } else {
                ps.setInt(1, e.orderId);
            }
            ps.setString(2, e.customerName);
            ps.executeUpdate();
        }
    }
}