Untitled
unknown
java
4 years ago
3.3 kB
8
Indexable
/**
 * Starts the consumers for the scheduled data-source sync queue.
 *
 * <p>Obtains a connection from {@code connectionManager}, resolves the queue name,
 * creates a fixed-size pool of daemon worker threads (two per available processor)
 * and then creates {@code consumersCount} {@link ConsumerWorker}s, each on its own
 * channel. All workers share the same thread pool.
 *
 * @param args command-line arguments; not used
 * @throws Exception if the connection, a channel or a consumer cannot be set up
 */
public static void main(String[] args) throws Exception {
    Connection connection = connectionManager.getConnection();
    String scheduledDataSourceSyncQueueName = ListenerFactory
            .getQueueName(DATA_SOURCE_SCHEDULED_SYNC_QUEUE_NAME);
    int prefetchCount = 10;
    int numThreads = Runtime.getRuntime().availableProcessors() * 2;

    final ExecutorService threadExecutor = Executors.newFixedThreadPool(numThreads, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = Executors.defaultThreadFactory().newThread(r);
            t.setDaemon(true);
            // NOTE(review): `counter` is declared outside this method; if it is a plain
            // int the increment is not atomic and thread names may repeat — confirm.
            t.setName("Thread-ivelin-" + counter++);
            return t;
        }
    });

    // Shut the pool down when the JVM exits so tasks that are already running get a
    // bounded amount of time to finish instead of being killed mid-message.
    Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
        @Override
        public void run() {
            threadExecutor.shutdown();
            try {
                if (!threadExecutor.awaitTermination(30, java.util.concurrent.TimeUnit.SECONDS)) {
                    threadExecutor.shutdownNow();
                }
            } catch (InterruptedException e) {
                threadExecutor.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }
    }));

    for (int i = 0; i < consumersCount; i++) {
        // The ConsumerWorker constructor registers the worker on its channel, so the
        // reference does not need to be kept here.
        new ConsumerWorker(prefetchCount, threadExecutor, 1,
                connection.createChannel(), scheduledDataSourceSyncQueueName);
    }
}
/**
 * Consumer that receives deliveries from one queue on one channel and hands each
 * delivery to a shared {@link ExecutorService} as a {@link MessageTask}.
 *
 * <p>The queue is consumed with automatic acknowledgement turned off; the delivery
 * tag and channel are passed to the task so the delivery can be acknowledged there.
 */
class ConsumerWorker extends DefaultConsumer {
    // Not read or written anywhere in this class.
    private String name;
    private final long sleep;
    private final Channel channel;
    private final String queue;
    // Not read or written anywhere in this class.
    private int processed;
    private final ExecutorService executorService;

    /**
     * Declares the queue, sets the prefetch limit and starts consuming.
     *
     * @param prefetch       value passed to {@code Channel.basicQos}
     * @param threadExecutor pool that runs the per-message tasks
     * @param s              sleep factor handed on to every {@link MessageTask}
     * @param c              channel this consumer is bound to
     * @param q              name of the queue to declare and consume from
     * @throws IOException if any of the channel operations fails
     */
    public ConsumerWorker(int prefetch, ExecutorService threadExecutor, long s,
            Channel c, String q) throws IOException {
        super(c);
        sleep = s;
        channel = c;
        queue = q;
        // Must be set before basicConsume: once the consumer is registered a delivery
        // can reach handleDelivery on another thread, which needs the executor.
        executorService = threadExecutor;
        channel.basicQos(prefetch);
        channel.queueDeclare(queue, true, false, true, null);
        // Keep this last so `this` is fully initialised before it is published.
        channel.basicConsume(queue, false, this);
    }

    /**
     * Decodes the message body as JSON into a {@link DataSourceIdPayload}, copies the
     * message headers onto it when properties are present, and submits a
     * {@link MessageTask} for it to the executor.
     *
     * @param consumerTag tag of this consumer; not used
     * @param envelope    delivery metadata, including the delivery tag
     * @param properties  message properties; may be {@code null}
     * @param body        raw message body
     * @throws IOException if rejecting an undecodable (empty) message fails
     */
    @Override
    public void handleDelivery(String consumerTag,
            Envelope envelope, AMQP.BasicProperties properties,
            byte[] body) throws IOException {
        // Decode explicitly as UTF-8 rather than with the platform default charset.
        String message = new String(body, java.nio.charset.StandardCharsets.UTF_8);
        DataSourceIdPayload payload = GsonFactory.getGson()
                .fromJson(message, DataSourceIdPayload.class);
        if (payload == null) {
            // Gson yields null for an empty document. Reject without requeue so the
            // message is not redelivered forever and the consumer does not fail with
            // a NullPointerException below.
            channel.basicReject(envelope.getDeliveryTag(), false);
            return;
        }
        if (properties != null) {
            payload.setHeaders(properties.getHeaders());
        }
        Runnable task = new MessageTask(this,
                envelope.getDeliveryTag(),
                channel, sleep, envelope, payload);
        executorService.submit(task);
    }
}
class MessageTask implements Runnable {
private long tag;
private long sleep;
private Channel chan;
private ConsumerWorker worker;
private Envelope envelope;
private DataSourceIdPayload payload;
public MessageTask(ConsumerWorker w, long t, Channel c, long s, Envelope envelope, DataSourceIdPayload payload) {
this.worker = w;
this.tag = t;
this.chan = c;
this.sleep = s;
this.envelope = envelope;
this.payload = payload;
}
@Override
public void run() {
//processing message
channel.basicAck(deliveryTag, false);
}
try {
Thread.sleep(sleep * 10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}Editor is loading...