Untitled
unknown
java
2 years ago
3.9 kB
5
Indexable
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
/**
 * Command-line tool that publishes a numbered series of messages to an MQTT broker.
 *
 * <p>Usage: {@code MQTTPublisher <broker-host> <port> <topic> <message> <num-messages>}
 *
 * <p>Each payload is the base message followed by a space and the zero-based message
 * index, UTF-8 encoded and published at QoS 2 over a {@code tcp://} connection.
 */
public class MQTTPublisher {
    /** Number of positional arguments {@link #main(String[])} requires. */
    private static final int REQUIRED_ARGS = 5;

    /**
     * Connects to the broker, publishes the requested number of messages and disconnects.
     *
     * <p>MQTT failures are reported on stderr; the client is disconnected and closed on
     * every path, including when connecting or publishing fails part-way through.
     *
     * @param args broker host, port, topic, base message text, number of messages
     * @throws NumberFormatException if the port or the message count is not an integer
     */
    public static void main(String[] args) {
        if (args.length < REQUIRED_ARGS) {
            System.err.println(
                    "Usage: MQTTPublisher <broker-host> <port> <topic> <message> <num-messages>");
            System.exit(1);
        }

        String broker = args[0];
        int port = Integer.parseInt(args[1]);
        String topic = args[2];
        String message = args[3];
        int numMessages = Integer.parseInt(args[4]);

        // NOTE(review): the client id is fixed, so two instances run against the same broker
        // will share an id - confirm that is acceptable for the intended use.
        String clientId = "MQTTPublisher";
        MemoryPersistence persistence = new MemoryPersistence();

        MqttClient mqttClient = null;
        try {
            mqttClient = new MqttClient("tcp://" + broker + ":" + port, clientId, persistence);

            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setCleanSession(true);
            mqttClient.connect(connOpts);

            for (int i = 0; i < numMessages; i++) {
                String content = message + " " + i;
                // Encode explicitly as UTF-8 so the payload bytes do not depend on the
                // platform default charset of the machine running the publisher.
                MqttMessage mqttMessage =
                        new MqttMessage(content.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                mqttMessage.setQos(2);
                mqttClient.publish(topic, mqttMessage);
                System.out.println("Message published: " + content);
            }
        } catch (MqttException e) {
            System.err.println("MQTT operation failed (reason code " + e.getReasonCode() + ")");
            e.printStackTrace();
        } finally {
            shutdown(mqttClient);
        }
    }

    /**
     * Disconnects the client if it is still connected, then releases its resources.
     *
     * <p>Errors during shutdown are reported on stderr rather than rethrown so that they
     * cannot mask an earlier failure.
     *
     * @param client the client to shut down; may be {@code null} if construction failed
     */
    private static void shutdown(MqttClient client) {
        if (client == null) {
            return;
        }
        try {
            if (client.isConnected()) {
                client.disconnect();
                System.out.println("Publisher disconnected.");
            }
        } catch (MqttException e) {
            System.err.println("Failed to disconnect MQTT client: " + e.getMessage());
        } finally {
            try {
                client.close();
            } catch (MqttException e) {
                System.err.println("Failed to close MQTT client: " + e.getMessage());
            }
        }
    }
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
/**
 * Command-line tool that publishes a numbered series of messages to a RabbitMQ queue.
 *
 * <p>Usage: {@code RabbitMQPublisher <broker-host> <message> <num-messages>}
 *
 * <p>Each payload is the base message followed by a space and the zero-based message
 * index, UTF-8 encoded and published with the queue name as routing key.
 */
public class RabbitMQPublisher {
    /** Name of the queue that is declared and used as the routing key. */
    private static final String QUEUE_NAME = "hello";

    /** Number of positional arguments {@link #main(String[])} requires. */
    private static final int REQUIRED_ARGS = 3;

    /**
     * Opens a connection and channel, declares the queue and publishes the messages.
     *
     * <p>The connection and channel are closed automatically when publishing finishes
     * or fails.
     *
     * @param args broker host, base message text, number of messages
     * @throws NumberFormatException if the message count is not an integer
     * @throws Exception if connecting, declaring the queue or publishing fails
     */
    public static void main(String[] args) throws Exception {
        if (args.length < REQUIRED_ARGS) {
            System.err.println("Usage: RabbitMQPublisher <broker-host> <message> <num-messages>");
            System.exit(1);
        }

        String broker = args[0];
        String message = args[1];
        int numMessages = Integer.parseInt(args[2]);

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(broker);

        try (Connection connection = factory.newConnection();
                Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            for (int i = 0; i < numMessages; i++) {
                String content = message + " " + i;
                // Encode explicitly as UTF-8 so the payload bytes do not depend on the
                // platform default charset of the machine running the publisher.
                byte[] payload = content.getBytes(java.nio.charset.StandardCharsets.UTF_8);
                channel.basicPublish("", QUEUE_NAME, null, payload);
                System.out.println("Message published: " + content);
            }
        }
    }
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
/**
 * Command-line tool that publishes a numbered series of messages to a Kafka topic.
 *
 * <p>Usage: {@code KafkaPublisher <bootstrap-servers> <topic> <message> <num-messages>}
 *
 * <p>Each record value is the base message followed by a space and the zero-based
 * message index; records are sent without a key using string serializers.
 */
public class KafkaPublisher {
    /** Number of positional arguments {@link #main(String[])} requires. */
    private static final int REQUIRED_ARGS = 4;

    /**
     * Builds a producer from fixed settings and sends the requested number of records.
     *
     * <p>Sends are asynchronous, so the outcome of each record is reported from the send
     * callback: a success line on stdout once the send completes without error, or a
     * failure line on stderr. Because retries are set to 0, a failed record is not resent.
     * Leaving the try-with-resources block closes the producer.
     *
     * @param args bootstrap servers, topic, base message text, number of messages
     * @throws NumberFormatException if the message count is not an integer
     */
    public static void main(String[] args) {
        if (args.length < REQUIRED_ARGS) {
            System.err.println(
                    "Usage: KafkaPublisher <bootstrap-servers> <topic> <message> <num-messages>");
            System.exit(1);
        }

        String broker = args[0];
        String topic = args[1];
        String message = args[2];
        int numMessages = Integer.parseInt(args[3]);

        Properties props = new Properties();
        props.put("bootstrap.servers", broker);
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < numMessages; i++) {
                String content = message + " " + i;
                ProducerRecord<String, String> outgoing = new ProducerRecord<>(topic, content);
                // send() only queues the record; report the real outcome from the callback
                // instead of announcing success before the broker has acknowledged it.
                producer.send(outgoing, (metadata, exception) -> {
                    if (exception != null) {
                        System.err.println(
                                "Failed to publish message: " + content + " (" + exception + ")");
                    } else {
                        System.out.println("Message published: " + content);
                    }
                });
            }
        }
    }
}
Editor is loading...
Leave a Comment