Untitled
unknown
java
a year ago
3.9 kB
4
Indexable
import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; public class MQTTPublisher { public static void main(String[] args) { String broker = args[0]; int port = Integer.parseInt(args[1]); String topic = args[2]; String message = args[3]; int numMessages = Integer.parseInt(args[4]); String clientId = "MQTTPublisher"; MemoryPersistence persistence = new MemoryPersistence(); try { MqttClient mqttClient = new MqttClient("tcp://" + broker + ":" + port, clientId, persistence); MqttConnectOptions connOpts = new MqttConnectOptions(); connOpts.setCleanSession(true); mqttClient.connect(connOpts); for (int i = 0; i < numMessages; i++) { String content = message + " " + i; MqttMessage mqttMessage = new MqttMessage(content.getBytes()); mqttMessage.setQos(2); mqttClient.publish(topic, mqttMessage); System.out.println("Message published: " + content); } mqttClient.disconnect(); System.out.println("Publisher disconnected."); } catch (MqttException e) { e.printStackTrace(); } } } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; public class RabbitMQPublisher { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception { String broker = args[0]; String message = args[1]; int numMessages = Integer.parseInt(args[2]); ConnectionFactory factory = new ConnectionFactory(); factory.setHost(broker); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.queueDeclare(QUEUE_NAME, false, false, false, null); for (int i = 0; i < numMessages; i++) { String content = message + " " + i; channel.basicPublish("", QUEUE_NAME, null, content.getBytes()); System.out.println("Message published: " + content); } } } } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// import org.apache.kafka.clients.producer.*; import java.util.Properties; public class KafkaPublisher { public static void main(String[] args) { String broker = args[0]; String topic = args[1]; String message = args[2]; int numMessages = Integer.parseInt(args[3]); Properties props = new Properties(); props.put("bootstrap.servers", broker); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); try (Producer<String, String> producer = new KafkaProducer<>(props)) { for (int i = 0; i < numMessages; i++) { String content = message + " " + i; producer.send(new ProducerRecord<>(topic, content)); System.out.println("Message published: " + content); } } } }
Editor is loading...
Leave a Comment