Untitled

 avatar
unknown
java
4 years ago
3.3 kB
4
Indexable
/**
 * Starts {@code consumersCount} {@link ConsumerWorker}s on the scheduled data-source
 * sync queue. All workers share one connection (one channel each) and one fixed-size
 * thread pool on which the delivered messages are processed.
 *
 * <p>The pool threads are daemon threads, so they never keep the JVM alive on their
 * own. A shutdown hook drains the pool on JVM exit so that tasks already handed to
 * the pool get a chance to finish instead of being killed mid-flight.
 *
 * @param args command-line arguments (unused)
 * @throws Exception if the connection, a channel or a consumer cannot be set up
 */
public static void main(String[] args) throws Exception {

        final Connection connection = connectionManager.getConnection();
        String scheduledDataSourceSyncQueueName = ListenerFactory
                .getQueueName(DATA_SOURCE_SCHEDULED_SYNC_QUEUE_NAME);
        int prefetchCount = 10;
        // Two threads per available processor.
        int numThreads = Runtime.getRuntime().availableProcessors() * 2;
        final ExecutorService threadExecutor = Executors.newFixedThreadPool(numThreads, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = Executors.defaultThreadFactory().newThread(r);
                t.setDaemon(true);
                // NOTE(review): 'counter' is declared outside this method; the increment is
                // not atomic, so names may repeat if the factory is invoked concurrently.
                t.setName("Thread-ivelin-" + counter++);
                return t;
            }
        });

        // Resolve the former "shutdown threadpool" TODO: the pool must stay alive for as
        // long as consumers deliver messages, so it is shut down when the JVM exits rather
        // than at the end of main(). Registered before any consumer starts.
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                threadExecutor.shutdown();
                try {
                    if (!threadExecutor.awaitTermination(30, java.util.concurrent.TimeUnit.SECONDS)) {
                        threadExecutor.shutdownNow();
                    }
                } catch (InterruptedException e) {
                    threadExecutor.shutdownNow();
                    Thread.currentThread().interrupt();
                }
            }
        }));

        for (int i = 0; i < consumersCount; i++) {
            // The constructor registers the worker as a consumer on its own channel,
            // so the reference does not need to be kept here.
            new ConsumerWorker(prefetchCount, threadExecutor, 1,
                    connection.createChannel(), scheduledDataSourceSyncQueueName);
        }
}
 
 /**
  * RabbitMQ consumer that turns every delivery into a {@link MessageTask} and hands it
  * to a shared executor. Messages are consumed with manual acknowledgement; the ack is
  * sent by the task, not by this class.
  */
 class ConsumerWorker extends DefaultConsumer {

    private final long sleep;
    private final Channel channel;
    private final String queue;
    private final ExecutorService executorService;

    /**
     * Declares the queue and registers this worker as a consumer on it.
     *
     * @param prefetch       limit passed to {@code basicQos} for the channel
     * @param threadExecutor executor that runs the per-message tasks
     * @param s              sleep factor forwarded to each {@link MessageTask}
     * @param c              channel this worker consumes on
     * @param q              name of the queue to consume from
     * @throws IOException if the channel setup or the consumer registration fails
     */
    public ConsumerWorker(int prefetch, ExecutorService threadExecutor, long s,
            Channel c, String q) throws IOException {
        super(c);

        // Assign every field BEFORE basicConsume: once the consumer is registered,
        // handleDelivery may be invoked from another thread and must not observe a
        // null executor.
        sleep = s;
        channel = c;
        queue = q;
        executorService = threadExecutor;

        channel.basicQos(prefetch);
        // Per the RabbitMQ Channel API the flags are: durable, exclusive, autoDelete.
        channel.queueDeclare(queue, true, false, true, null);
        // autoAck = false: deliveries are acknowledged explicitly by MessageTask.
        channel.basicConsume(queue, false, this);
    }


    /**
     * Parses the message body as JSON into a {@link DataSourceIdPayload}, attaches the
     * message headers when properties are present, and submits a {@link MessageTask}
     * for the delivery to the executor.
     *
     * @param consumerTag consumer tag of this subscription (unused)
     * @param envelope    delivery metadata; supplies the delivery tag
     * @param properties  message properties, may be {@code null}
     * @param body        raw message body, decoded as UTF-8 JSON
     * @throws IOException declared by the overridden method
     */
    @Override
    public void handleDelivery(String consumerTag,
            Envelope envelope, AMQP.BasicProperties properties,
            byte[] body) throws IOException {

        // Per-delivery state stays local; decode explicitly as UTF-8 rather than with
        // the platform default charset.
        String message = new String(body, java.nio.charset.StandardCharsets.UTF_8);
        // NOTE(review): the parser may yield null for an empty body, in which case the
        // setHeaders call below would fail - confirm how such messages should be handled.
        DataSourceIdPayload payload = GsonFactory.getGson()
                .fromJson(message, DataSourceIdPayload.class);
        if (properties != null) {
            payload.setHeaders(properties.getHeaders());
        }

        Runnable task = new MessageTask(this,
                envelope.getDeliveryTag(),
                channel, sleep, envelope, payload);
        executorService.submit(task);
    }
}

class MessageTask implements Runnable {
    private long tag;
    private long sleep;
    private Channel chan;
    private ConsumerWorker worker;
    private Envelope envelope;
    private DataSourceIdPayload payload;

    public MessageTask(ConsumerWorker w, long t, Channel c, long s, Envelope envelope, DataSourceIdPayload payload) {
        this.worker = w;
        this.tag = t;
        this.chan = c;
        this.sleep = s;
        this.envelope = envelope;
        this.payload = payload;
    }

    @Override
    public void run() {
            //processing message
            channel.basicAck(deliveryTag, false);
        }

        try {
            Thread.sleep(sleep * 10);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }

    }

}
Editor is loading...