When using the Eclipse Paho client library to develop Java MQTT applications, you can implement message publishing and receiving concurrently. This typically requires two main steps: setting up an MQTT client and creating a callback to handle received messages and publish new messages as needed. Below are the specific steps and code examples for this process.
Step 1: Set up the MQTT Client
First, establish an MQTT client connected to the MQTT server. You can achieve this using the MqttClient class.
javaimport org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; public class MqttPublishReceive { public static void main(String[] args) { String broker = "tcp://mqtt.example.com:1883"; String clientId = "JavaClient"; MemoryPersistence persistence = new MemoryPersistence(); try { MqttClient mqttClient = new MqttClient(broker, clientId, persistence); MqttConnectOptions connOpts = new MqttConnectOptions(); connOpts.setCleanSession(true); mqttClient.connect(connOpts); System.out.println("Connected to broker: " + broker); // Set callback later } catch(MqttException me) { System.out.println("reason " + me.getReasonCode()); System.out.println("msg " + me.getMessage()); System.out.println("loc " + me.getLocalizedMessage()); System.out.println("cause " + me.getCause()); System.out.println("excep " + me); me.printStackTrace(); } } }
Step 2: Set up the Message Callback
After the client successfully connects, set up a callback function that is invoked upon receiving a message. Within this callback, process the received message and publish new messages as required.
javaimport org.eclipse.paho.client.mqttv3.MqttCallback; public class MqttCallbackExample implements MqttCallback { MqttClient client; public MqttCallbackExample(MqttClient client) { this.client = client; } @Override public void connectionLost(Throwable cause) { System.out.println("Connection lost!"); } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { System.out.println("Received a message: " + new String(message.getPayload())); // Process the received message and publish new messages if needed String responseTopic = "response/topic"; String responseContent = "Processed message: " + new String(message.getPayload()); publishMessage(responseTopic, responseContent); } @Override public void deliveryComplete(IMqttDeliveryToken token) { System.out.println("Delivery Complete!"); } private void publishMessage(String topic, String content) throws MqttException { MqttMessage message = new MqttMessage(content.getBytes()); message.setQos(2); client.publish(topic, message); System.out.println("Message published to topic: " + topic); } }
Then, in the main program, register this callback:
javamqttClient.setCallback(new MqttCallbackExample(mqttClient)); mqttClient.subscribe("input/topic", 1);
Example Conclusion
With this setup, your Java MQTT client can publish new messages concurrently with receiving messages, based on the content of the received messages. This is especially beneficial for systems requiring real-time responses to external data, such as IoT applications.