ConsumerWorker
unknown
java
4 years ago
1.7 kB
3
Indexable
public class ConsumerWorker extends DefaultConsumer { private long sleep; private Channel channel; private ExecutorService executorService; final Map<String, Object> sourceListeners; /** * Creates a new <code>ConsumerWorker</code> instance. * @param prefetch * @param threadExecutor * @param sleep * @param channel * @param queue * @throws IOException */ public ConsumerWorker(final int prefetch, final ExecutorService threadExecutor, final long sleep, final Channel channel, final String queue, final Map<String, Object> sourceListeners) throws IOException { super(channel); this.sleep = sleep; this.channel = channel; this.sourceListeners = sourceListeners; this.executorService = threadExecutor; this.channel.basicQos(prefetch); this.channel.queueDeclare(queue, true, false, false, null); this.channel.basicConsume(queue, false, this); } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { final String message = new String(body); DataSourceIdPayload payload = GsonFactory.getGson().fromJson(message, DataSourceIdPayload.class); if (properties != null) { payload.setHeaders(properties.getHeaders()); } Runnable task = new ScheduledDataSourceProcessor(envelope.getDeliveryTag(), channel, sleep, payload,sourceListeners.get(SyncScheduledDataSourceListener.SUBSCRIBETOSYNCENTITYREQUESTS_NAME)); executorService.submit(task); } }
Editor is loading...