ConsumerWorker

 avatar
unknown
java
4 years ago
1.7 kB
3
Indexable
/**
 * RabbitMQ consumer that turns each delivery into a
 * {@link ScheduledDataSourceProcessor} task and hands it to an executor.
 *
 * <p>The consumer registers itself with manual acknowledgement
 * ({@code autoAck = false}); the delivery tag and channel are passed to the
 * processor task. NOTE(review): presumably the task acks/nacks the delivery
 * itself - confirm in {@code ScheduledDataSourceProcessor}.
 */
public class ConsumerWorker extends DefaultConsumer {
    private final long sleep;
    private final Channel channel;
    private final ExecutorService executorService;
    final Map<String, Object> sourceListeners;

    /**
     * Creates a new <code>ConsumerWorker</code> instance, declares the queue
     * and starts consuming from it.
     *
     * @param prefetch        value passed to {@code Channel.basicQos} (limit
     *                        on unacknowledged deliveries)
     * @param threadExecutor  executor that runs the per-message processor tasks
     * @param sleep           value forwarded unchanged to every
     *                        {@link ScheduledDataSourceProcessor}
     * @param channel         channel used for QoS, queue declaration and
     *                        consumption; also handed to each processor task
     * @param queue           name of the queue to declare (durable,
     *                        non-exclusive, non-auto-delete) and consume from
     * @param sourceListeners listener registry; the entry stored under
     *                        {@code SyncScheduledDataSourceListener.SUBSCRIBETOSYNCENTITYREQUESTS_NAME}
     *                        is passed to each processor task
     * @throws IOException if setting QoS, declaring the queue or registering
     *                     the consumer fails
     */
    public ConsumerWorker(final int prefetch,
            final ExecutorService threadExecutor, final long sleep,
            final Channel channel, final String queue,
            final Map<String, Object> sourceListeners) throws IOException {
        super(channel);

        // Assign all state before basicConsume: deliveries may start arriving
        // as soon as the consumer is registered.
        this.sleep = sleep;
        this.channel = channel;
        this.sourceListeners = sourceListeners;
        this.executorService = threadExecutor;

        this.channel.basicQos(prefetch);
        this.channel.queueDeclare(queue, true, false, false, null);
        // autoAck = false: deliveries stay unacknowledged until settled explicitly.
        this.channel.basicConsume(queue, false, this);
    }

    /**
     * Parses the message body as a JSON {@link DataSourceIdPayload}, attaches
     * the AMQP headers (when properties are present) and submits a
     * {@link ScheduledDataSourceProcessor} for it to the executor.
     *
     * <p>A body that Gson parses to {@code null} (empty or literal
     * {@code null} JSON) is rejected without requeue instead of failing with
     * a {@link NullPointerException}.
     *
     * @param consumerTag consumer tag of this subscription (unused)
     * @param envelope    delivery metadata; supplies the delivery tag
     * @param properties  AMQP message properties, may be {@code null}
     * @param body        raw message bytes, decoded as UTF-8 JSON
     * @throws IOException if rejecting an unparsable delivery fails
     */
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
            AMQP.BasicProperties properties, byte[] body) throws IOException {
        // Decode explicitly as UTF-8 rather than with the platform default
        // charset, so the result does not depend on host configuration.
        // Assumes publishers encode the JSON as UTF-8 - TODO confirm.
        final String message = new String(body, java.nio.charset.StandardCharsets.UTF_8);
        final DataSourceIdPayload payload = GsonFactory.getGson().fromJson(message,
                DataSourceIdPayload.class);

        if (payload == null) {
            // Gson yields null for empty/"null" input. Requeueing would only
            // redeliver the same poison message, so reject it for good.
            channel.basicReject(envelope.getDeliveryTag(), false);
            return;
        }

        if (properties != null) {
            payload.setHeaders(properties.getHeaders());
        }

        final Object listener = sourceListeners.get(
                SyncScheduledDataSourceListener.SUBSCRIBETOSYNCENTITYREQUESTS_NAME);
        final Runnable task = new ScheduledDataSourceProcessor(envelope.getDeliveryTag(),
                channel, sleep, payload, listener);
        executorService.submit(task);
    }

}
Editor is loading...