Untitled

 avatar
unknown
java
a year ago
3.9 kB
4
Indexable
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

/**
 * Command-line tool that publishes a numbered series of text messages to an MQTT topic.
 *
 * <p>Usage: {@code MQTTPublisher <broker> <port> <topic> <message> <numMessages>}.
 * Message {@code i} (starting at 0) is sent as {@code "<message> <i>"}, encoded as UTF-8.
 */
public class MQTTPublisher {
    /** Client identifier presented to the broker. */
    private static final String CLIENT_ID = "MQTTPublisher";

    /** QoS level set on every published message. */
    private static final int QOS = 2;

    /**
     * Parses the command line, publishes the messages and exits non-zero on failure.
     *
     * @param args {@code broker, port, topic, message, numMessages}
     */
    public static void main(String[] args) {
        if (args.length < 5) {
            System.err.println("Usage: MQTTPublisher <broker> <port> <topic> <message> <numMessages>");
            System.exit(1);
            return;
        }

        String broker = args[0];
        String topic = args[2];
        String message = args[3];
        int port;
        int numMessages;
        try {
            port = Integer.parseInt(args[1]);
            numMessages = Integer.parseInt(args[4]);
        } catch (NumberFormatException e) {
            System.err.println("<port> and <numMessages> must be integers: " + e.getMessage());
            System.exit(1);
            return;
        }

        try {
            publishAll("tcp://" + broker + ":" + port, topic, message, numMessages);
        } catch (MqttException e) {
            // No logging framework is available in this standalone tool, so report on stderr
            // and signal the failure through the exit status.
            e.printStackTrace();
            System.exit(1);
        }
    }

    /**
     * Connects to the broker, publishes {@code numMessages} messages and releases the client.
     *
     * @param serverUri   broker URI, e.g. {@code tcp://host:1883}
     * @param topic       topic to publish to
     * @param message     text prefix of every message
     * @param numMessages how many messages to publish; values {@code <= 0} publish nothing
     * @throws MqttException if connecting, publishing or disconnecting fails
     */
    private static void publishAll(String serverUri, String topic, String message, int numMessages)
            throws MqttException {
        MqttClient mqttClient = new MqttClient(serverUri, CLIENT_ID, new MemoryPersistence());
        try {
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setCleanSession(true);
            mqttClient.connect(connOpts);

            for (int i = 0; i < numMessages; i++) {
                String content = message + " " + i;
                // Explicit charset: the no-arg getBytes() depends on the platform default.
                MqttMessage mqttMessage =
                        new MqttMessage(content.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                mqttMessage.setQos(QOS);
                mqttClient.publish(topic, mqttMessage);
                System.out.println("Message published: " + content);
            }

            mqttClient.disconnect();
            System.out.println("Publisher disconnected.");
        } finally {
            release(mqttClient);
        }
    }

    /**
     * Disconnects (if still connected) and closes the client without masking an earlier failure.
     *
     * @param client the client to release
     */
    private static void release(MqttClient client) {
        try {
            if (client.isConnected()) {
                client.disconnect();
            }
            client.close();
        } catch (MqttException e) {
            System.err.println("Failed to release MQTT client cleanly: " + e.getMessage());
        }
    }
}


////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

/**
 * Command-line tool that publishes a numbered series of text messages to a RabbitMQ queue
 * through the default exchange.
 *
 * <p>Usage: {@code RabbitMQPublisher <broker> <message> <numMessages>}.
 * Message {@code i} (starting at 0) is sent as {@code "<message> <i>"}, encoded as UTF-8.
 */
public class RabbitMQPublisher {
    /** Name of the queue that is declared and used as the routing key. */
    private static final String QUEUE_NAME = "hello";

    /**
     * Parses the command line and publishes the messages.
     *
     * @param args {@code broker, message, numMessages}
     * @throws Exception if the connection, channel or publish operation fails
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 3) {
            System.err.println("Usage: RabbitMQPublisher <broker> <message> <numMessages>");
            System.exit(1);
            return;
        }

        String broker = args[0];
        String message = args[1];
        int numMessages;
        try {
            numMessages = Integer.parseInt(args[2]);
        } catch (NumberFormatException e) {
            System.err.println("<numMessages> must be an integer: " + e.getMessage());
            System.exit(1);
            return;
        }

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(broker);
        // try-with-resources closes the channel first, then the connection.
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            for (int i = 0; i < numMessages; i++) {
                String content = message + " " + i;
                // Explicit charset: the no-arg getBytes() depends on the platform default.
                byte[] body = content.getBytes(java.nio.charset.StandardCharsets.UTF_8);
                channel.basicPublish("", QUEUE_NAME, null, body);
                System.out.println("Message published: " + content);
            }
        }
    }
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

/**
 * Command-line tool that publishes a numbered series of text messages to a Kafka topic.
 *
 * <p>Usage: {@code KafkaPublisher <broker> <topic> <message> <numMessages>}.
 * Message {@code i} (starting at 0) is sent as the record value {@code "<message> <i>"}
 * with no key.
 */
public class KafkaPublisher {
    /**
     * Parses the command line and publishes the messages, reporting the outcome of each send.
     *
     * @param args {@code broker, topic, message, numMessages}
     */
    public static void main(String[] args) {
        if (args.length < 4) {
            System.err.println("Usage: KafkaPublisher <broker> <topic> <message> <numMessages>");
            System.exit(1);
            return;
        }

        String broker = args[0];
        String topic = args[1];
        String message = args[2];
        int numMessages;
        try {
            numMessages = Integer.parseInt(args[3]);
        } catch (NumberFormatException e) {
            System.err.println("<numMessages> must be an integer: " + e.getMessage());
            System.exit(1);
            return;
        }

        Properties props = new Properties();
        props.put("bootstrap.servers", broker);
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Closing the producer at the end of the try block waits for outstanding sends,
        // so every callback below has run before main returns.
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < numMessages; i++) {
                String content = message + " " + i;
                // send() is asynchronous: report success or failure from the completion
                // callback instead of claiming success before the broker has answered.
                producer.send(new ProducerRecord<>(topic, content), (metadata, exception) -> {
                    if (exception != null) {
                        System.err.println("Failed to publish \"" + content + "\": " + exception);
                    } else {
                        System.out.println("Message published: " + content);
                    }
                });
            }
        }
    }
}

Editor is loading...
Leave a Comment