Untitled
unknown
java
4 years ago
3.3 kB
4
Indexable
public static void main(String[] args) throws Exception { Connection connection = connectionManager.getConnection(); String scheduledDataSourceSyncQueueName = ListenerFactory .getQueueName(DATA_SOURCE_SCHEDULED_SYNC_QUEUE_NAME); int prefetchCount = 10; int DEFAULT_NUM_THREADS = Runtime.getRuntime().availableProcessors() * 2; ExecutorService threadExecutor = Executors.newFixedThreadPool(DEFAULT_NUM_THREADS, new ThreadFactory() { public Thread newThread(Runnable r) { Thread t = Executors.defaultThreadFactory().newThread(r); t.setDaemon(true); t.setName("Thread-ivelin-" + counter++); return t; } }); for (int i = 0; i < consumersCount; i++) { ConsumerWorker consumerWorker = new ConsumerWorker(prefetchCount, threadExecutor, 1, connection.createChannel(), scheduledDataSourceSyncQueueName); } //TODO shutdown threadpool } class ConsumerWorker extends DefaultConsumer { private String name; private long sleep; private Channel channel; private String queue; private int processed; private ExecutorService executorService; private String message; private Envelope envelope; public ConsumerWorker(int prefetch, ExecutorService threadExecutor, long s, Channel c, String q) throws IOException { super(c); sleep = s; channel = c; queue = q; channel.basicQos(prefetch); channel.queueDeclare(queue, true, false,true, null); channel.basicConsume(queue, false, this); executorService = threadExecutor; } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { message = new String(body); envelope = envelope; DataSourceIdPayload payload = GsonFactory.getGson() .fromJson(message, DataSourceIdPayload.class); if (properties != null) { payload.setHeaders(properties.getHeaders()); } Runnable task = new MessageTask(this, envelope.getDeliveryTag(), channel, sleep, envelope, payload); executorService.submit(task); } } class MessageTask implements Runnable { private long tag; private long sleep; private Channel chan; private ConsumerWorker worker; private Envelope envelope; private DataSourceIdPayload payload; public MessageTask(ConsumerWorker w, long t, Channel c, long s, Envelope envelope, DataSourceIdPayload payload) { this.worker = w; this.tag = t; this.chan = c; this.sleep = s; this.envelope = envelope; this.payload = payload; } @Override public void run() { //processing message channel.basicAck(deliveryTag, false); } try { Thread.sleep(sleep * 10); } catch (InterruptedException e) { throw new RuntimeException(e); } } }
Editor is loading...