MQTT subscribe client doesn't receive messages after connection
Messages published before the subscriber connects and subscribes are only delivered in the following two situations:
1. When the message was published as retained. The broker then delivers the last retained message on that topic to a new subscriber at the point of subscription. Only that last message is delivered.
2. When the client had previously connected and subscribed, and was then disconnected. If a message is published while it is offline and the client connects again with cleansession = false, the message is delivered, provided the subscription is at QoS 1 or 2.
This may help: http://www.thingsprime.com/?p=2897
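To illustrate the first situation, here is a minimal sketch of the retained-message rule as a toy in-memory model. It is not a real broker and does not use the Paho API; the class ToyRetainedBroker and its methods are hypothetical names made up for this illustration. In Paho itself, the retained flag is set on the message with MqttMessage.setRetained(true) before publishing.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model of retained-message delivery; not a real broker and not the Paho API.
class ToyRetainedBroker {
    // Last retained payload per topic.
    private final Map<String, String> retained = new HashMap<>();
    // Inboxes of currently subscribed clients, per topic.
    private final Map<String, List<List<String>>> subscribers = new HashMap<>();

    // Deliver to current subscribers; remember the payload only if the retained flag is set.
    void publish(String topic, String payload, boolean retain) {
        for (List<String> inbox : subscribers.getOrDefault(topic, new ArrayList<>())) {
            inbox.add(payload);
        }
        if (retain) {
            retained.put(topic, payload);
        }
    }

    // Register a new subscriber and hand it the retained message, if there is one.
    List<String> subscribe(String topic) {
        List<String> inbox = new ArrayList<>();
        subscribers.computeIfAbsent(topic, t -> new ArrayList<>()).add(inbox);
        if (retained.containsKey(topic)) {
            inbox.add(retained.get(topic));
        }
        return inbox;
    }

    public static void main(String[] args) {
        ToyRetainedBroker broker = new ToyRetainedBroker();

        // Published without the retained flag: a later subscriber gets nothing.
        broker.publish("pahodemo/test/b112358", "not retained", false);
        System.out.println(broker.subscribe("pahodemo/test/b112358"));

        // Published as retained: a later subscriber gets only the last message.
        broker.publish("pahodemo/test/b112358", "first", true);
        broker.publish("pahodemo/test/b112358", "second", true);
        System.out.println(broker.subscribe("pahodemo/test/b112358"));
    }
}
```

Running main prints [] and then [second]: the non-retained message never reaches a late subscriber, and of the two retained messages only the last one is kept.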
Admin
Updated on June 16, 2022
Comments
-
Admin about 2 years
I am trying to learn MQTT and have been playing around with it. I've written a client for publishing and a client for subscribing (see below).
If I run the subscribe client and then run the publish client (while subscribe is running), then everything works fine. My subscribe client receives the messages published to the topic correctly.
However, if I run the publish client first (i.e. I publish a message to a topic) and then run the subscribe client, I receive no messages.
In other words, if I connect with the sub client first and then publish messages with the pub client while sub client is connected, everything works fine. However, if I publish a message first, and then connect with my sub client, I receive no messages. My understanding is that I should receive the messages that are present on the topic once I connect with a client and subscribe to the topic.
I found what seems to be a similar issue, "Cannot receive already published messages to subscribed topic on mqtt paho", although that case seems a little different. I've tried changing the QoS settings and the cleanSession flag, but that didn't resolve the issue.
Any help would be appreciated!
Publish Client:
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

public class MQTT_Client_Pub implements MqttCallback {

    MqttClient client;
    MqttConnectOptions options;

    public static void main(String[] args) {
        new MQTT_Client_Pub().mqttPub();
    }

    public void mqttPub() {
        try {
            this.setConnection();

            // Connect, passing the options so that the will is actually registered
            client.connect(options);

            // Create new message
            MqttMessage message = new MqttMessage();
            message.setPayload("A single test message from b112358".getBytes());
            message.setQos(0);

            // Publish message to a topic
            System.out.println("Publishing a message.");
            client.publish("pahodemo/test/b112358", message);

            // Disconnect
            client.disconnect();
        } catch (MqttException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void setConnection() {
        // Client
        try {
            client = new MqttClient("tcp://iot.eclipse.org:1883", "mqtt_test_b112358_pub");
        } catch (MqttException e) {
            e.printStackTrace();
        }

        // Connection options
        options = new MqttConnectOptions();

        // Set the will
        options.setWill("pahodemo/clienterrors", "CRASHED - CONNECTION NOT CLOSED CLEANLY".getBytes(), 2, true);

        // Set callback
        client.setCallback(this);
    }

    public void deliveryComplete(IMqttDeliveryToken token) {
        System.out.println("Message delivered to the broker.");
    }

    public void messageArrived(String topic, MqttMessage message) throws Exception {}

    public void connectionLost(Throwable cause) {}
}
Subscribe Client:
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

public class MQTT_Client_Sub implements MqttCallback {

    MqttClient client;
    MqttConnectOptions options;

    public static void main(String[] args) {
        new MQTT_Client_Sub().mqttSub();
    }

    public void mqttSub() {
        try {
            // Set connection
            this.setConnection();

            // Connect, passing the options so that cleanSession and the will take effect
            client.connect(options);

            // Subscribe
            client.subscribe("pahodemo/test/b112358", 0);

            // Disconnect
            // client.disconnect();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    public void setConnection() {
        try {
            // Client
            client = new MqttClient("tcp://iot.eclipse.org:1883", "mqtt_test_b112358_sub");
        } catch (MqttException e) {
            e.printStackTrace();
        }

        // Connection options
        options = new MqttConnectOptions();
        options.setCleanSession(false);

        // Set the will
        options.setWill("pahodemo/clienterrors", "CRASHED - CONNECTION NOT CLOSED CLEANLY".getBytes(), 2, true);

        client.setCallback(this);
    }

    public void deliveryComplete(IMqttDeliveryToken token) {}

    public void messageArrived(String topic, MqttMessage message) throws Exception {
        // Decode the payload bytes; printing the byte[] directly would only show its reference
        System.out.println("Message Arrived: " + new String(message.getPayload()) + " on topic: " + topic);
    }

    public void connectionLost(Throwable cause) {}
}
-
knolleary over 9 years
Just to add, for the second scenario: when the client initially connects and subscribes, it must also have cleansession set to false.
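The session rule for the second scenario can be sketched with a toy in-memory model. This is not a real broker and not the Paho API; ToySessionBroker and its methods are hypothetical names, and the model assumes the broker queues only messages whose effective QoS (the lower of the publish QoS and the subscription QoS) is at least 1. With Paho, the flag is set with MqttConnectOptions.setCleanSession(false), and the options object has to be passed to connect(options) to take effect.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical toy model of session persistence; not a real broker and not the Paho API.
class ToySessionBroker {
    // State kept per client ID for as long as its session exists.
    private static class Session {
        boolean persistent;
        final Map<String, Integer> subscriptions = new HashMap<>(); // topic -> subscription QoS
        final List<String> queued = new ArrayList<>();              // messages held while offline
    }

    private final Map<String, Session> sessions = new HashMap<>();
    private final Set<String> online = new HashSet<>();

    // Connects a client and returns the messages queued for it while it was offline.
    List<String> connect(String clientId, boolean cleanSession) {
        Session s = sessions.get(clientId);
        if (cleanSession || s == null) {
            s = new Session(); // a clean session starts without subscriptions or backlog
            sessions.put(clientId, s);
        }
        s.persistent = !cleanSession;
        online.add(clientId);
        List<String> backlog = new ArrayList<>(s.queued);
        s.queued.clear();
        return backlog;
    }

    // A session created with cleanSession = true ends with the connection.
    void disconnect(String clientId) {
        online.remove(clientId);
        Session s = sessions.get(clientId);
        if (s != null && !s.persistent) {
            sessions.remove(clientId);
        }
    }

    // Assumes the client is currently connected.
    void subscribe(String clientId, String topic, int qos) {
        sessions.get(clientId).subscriptions.put(topic, qos);
    }

    // Queues the message for offline subscribers; live delivery is not modelled.
    void publish(String topic, String payload, int qos) {
        for (Map.Entry<String, Session> e : sessions.entrySet()) {
            Integer subQos = e.getValue().subscriptions.get(topic);
            if (subQos == null || online.contains(e.getKey())) {
                continue;
            }
            if (Math.min(qos, subQos) >= 1) {
                e.getValue().queued.add(payload);
            }
        }
    }

    public static void main(String[] args) {
        ToySessionBroker broker = new ToySessionBroker();

        // cleanSession = false on the first connect AND on the reconnect.
        broker.connect("sub", false);
        broker.subscribe("sub", "pahodemo/test/b112358", 1);
        broker.disconnect("sub");
        broker.publish("pahodemo/test/b112358", "while offline", 1);
        System.out.println(broker.connect("sub", false));

        // cleanSession = true on the first connect: the subscription is gone after disconnect.
        broker.connect("sub2", true);
        broker.subscribe("sub2", "pahodemo/test/b112358", 1);
        broker.disconnect("sub2");
        broker.publish("pahodemo/test/b112358", "lost", 1);
        System.out.println(broker.connect("sub2", false));
    }
}
```

Running main prints [while offline] and then []: the message is only held for the client whose initial connection also used cleanSession = false.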
-
Admin over 9 years
Thanks for the feedback. I am still a bit unclear. For case 1: let's say I connect with a publisher client and publish a message to a topic with QoS 0. Then I connect with a subscriber client and subscribe to the topic for the very first time with cleanSession set to true. Should I receive the message that was published by the first client? If I understand it correctly, the answer should be yes, but I am not receiving the published message in my sub client when I run the test.
-
Admin over 9 years
Additionally, regarding case 2, I've also tried a scenario similar to the one described. I connected with a subscriber client and then terminated it. I then published a message with QoS 2 to a topic with my pub client. Afterwards, I connected with my sub client with cleanSession set to false and subscribed to the topic using QoS 2. If I understand point 2, I should receive the published message in this case. However, when I test it out, I get no message. Is my understanding off?
-
hardillb over 9 years
For case 1, did you publish the message with the retained bit set? For case 2, the code in the linked example walks you through the steps.
-
Admin over 9 years
I was not aware of the retained bit; I somehow missed it during my reading. That fixed it! Thank you!