ConsumerWorker
unknown
java
4 years ago
1.7 kB
5
Indexable
public class ConsumerWorker extends DefaultConsumer {
private long sleep;
private Channel channel;
private ExecutorService executorService;
final Map<String, Object> sourceListeners;
/**
* Creates a new <code>ConsumerWorker</code> instance.
* @param prefetch
* @param threadExecutor
* @param sleep
* @param channel
* @param queue
* @throws IOException
*/
public ConsumerWorker(final int prefetch,
final ExecutorService threadExecutor, final long sleep,
final Channel channel, final String queue,
final Map<String, Object> sourceListeners) throws IOException {
super(channel);
this.sleep = sleep;
this.channel = channel;
this.sourceListeners = sourceListeners;
this.executorService = threadExecutor;
this.channel.basicQos(prefetch);
this.channel.queueDeclare(queue, true, false, false, null);
this.channel.basicConsume(queue, false, this);
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
final String message = new String(body);
DataSourceIdPayload payload = GsonFactory.getGson().fromJson(message,
DataSourceIdPayload.class);
if (properties != null) {
payload.setHeaders(properties.getHeaders());
}
Runnable task = new ScheduledDataSourceProcessor(envelope.getDeliveryTag(), channel,
sleep, payload,sourceListeners.get(SyncScheduledDataSourceListener.SUBSCRIBETOSYNCENTITYREQUESTS_NAME));
executorService.submit(task);
}
}Editor is loading...