MQTT subscribe client doesn't receive messages after connection

10,885

Messages published before the subscriber connects and subscribes will only be delivered under the following 2 situations

  1. When the messages was published as retained. This means the last message on that topic will be delivered to a new subscriber at the point of subscription. This will only deliver the last message.

  2. If the client had been previously connected and subscribed, then been disconnected. A message is then published and the client connects again with cleansession = false. (and when the subscription is at QOS1/2)

This may help: http://www.thingsprime.com/?p=2897

Share:
10,885
Admin
Author by

Admin

Updated on June 16, 2022

Comments

  • Admin
    Admin about 2 years

    I am trying to learn MQTT and have been playing around with it. I've written a client for publishing and a client for subscribing (see below).

    If I run the subscribe client and then run the publish client (while subscribe is running), then everything works fine. My subscribe client receives the messages published to the topic correctly.

    However, if I run the publish client first (ie. I publish a message to a topic) and then I run the subscribe client, I receive no messages.

    In other words, if I connect with the sub client first and then publish messages with the pub client while sub client is connected, everything works fine. However, if I publish a message first, and then connect with my sub client, I receive no messages. My understanding is that I should receive the messages that are present on the topic once I connect with a client and subscribe to the topic.

    I found what seems a similar issue: Cannot receive already published messages to subscribed topic on mqtt paho, although that case seems a little different. I've tried changing different QoS setting or cleanSession flag, but that didn't resolve the issue.

    Any help would be appreciated!

    Publish Client:

    public class MQTT_Client_Pub implements MqttCallback{
    
    MqttClient client;
    
    public static void main(String[] args) {
    
        new MQTT_Client_Pub().mqttPub();
    }
    
    public void mqttPub(){
        try {
            this.setConnection();
    
            // Connect
            client.connect();
    
            // Create new message
            MqttMessage message = new MqttMessage();
            message.setPayload("A single test message from b112358".getBytes());
            message.setQos(0);
    
            // Publish message to a topic
            System.out.println("Publishing a message.");
            client.publish("pahodemo/test/b112358", message);
    
            // Disconnect
            client.disconnect();
    
          } catch (MqttException e) {
            e.printStackTrace();
          } catch (Exception e){
            e.printStackTrace();
          }
    }
    
    public void setConnection(){
        // Client
        try{
            client = new MqttClient("tcp://iot.eclipse.org:1883", "mqtt_test_b112358_pub");
        } catch (MqttException e) {
            e.printStackTrace();
        }
    
        // Connection Options
        MqttConnectOptions options = new MqttConnectOptions();
    
        // Set the will
        options.setWill("pahodemo/clienterrors", "CRASHED - CONNECTION NOT CLOSED CLEANLY".getBytes(),2,true);
    
        // Set Callback
        client.setCallback(this);
    }
    
    public void deliveryComplete(IMqttDeliveryToken token) {
        System.out.println("Message delivered to the broker.");
    }
    
    public void messageArrived(String topic, MqttMessage message) throws Exception {}
    
    public void connectionLost(Throwable cause) {}
    

    }

    Subscribe Client:

    public class MQTT_Client_Sub implements MqttCallback{
    
    MqttClient client;
    
    public static void main(String[] args) {
    
        new MQTT_Client_Sub().mqttSub();
    
    }
    
    public void mqttSub(){
        try {
            // Set connection
            this.setConnection();
    
            // Connect
            client.connect();
    
            // Subscribe
    
            client.subscribe("pahodemo/test/b112358", 0);
            // Disconnect
            // client.disconnect();
    
          } catch (MqttException e) {
            e.printStackTrace();
          }
    }
    
    public void setConnection(){
        try {
            // Client
            client = new MqttClient("tcp://iot.eclipse.org:1883", "mqtt_test_b112358_sub");
        } catch (MqttException e) {
            e.printStackTrace();
        }
    
        // Connection Options
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(false);
    
        // Set the will
        options.setWill("pahodemo/clienterrors", "CRASHED - CONNECTION NOT CLOSED CLEANLY".getBytes(),2,true);
    
        client.setCallback(this);
    }
    
    public void deliveryComplete(IMqttDeliveryToken token) {}
    
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        System.out.println("Message Arrived: " + message.getPayload() + " on tipic: " + topic.getBytes());
    }
    
    public void connectionLost(Throwable cause) {}
    

    }

  • knolleary
    knolleary over 9 years
    Just to add, for the second scenario, when the client initialled connects and subscribes, it must also have cleansession set to false then.
  • Admin
    Admin over 9 years
    Thanks for the feedback. I am still a bit unclear. For case 1. Let's say I am connecting with a publisher client and publish a message to a topic with qos = 0. Then I connect with a subscriber client and subscribe to the topic for the very first time with cleanSession set to true, should I receive the message that was published by the first client? If I understand it correctly, the answer should be yes, but I am not receiving the published message in my sub client when I run the test.
  • Admin
    Admin over 9 years
    Additionally, regarding case 2. I've also tried similar scenario as described. I've connected with a subscriber client. Then I would terminate my sub client. I would then publish a message with qos 2 to a topic with my pub client. Afterwords, I would connect with my sub client with a cleanSession set to false and subscribe to the topic using qos 2. If I understand point 2, I should receive the published message in this case. However, when I test it out, I get no message. Is my understanding off?
  • hardillb
    hardillb over 9 years
    For case 1 did you publish the message with the retained bit set? for case 2 the code in the linked example walks you through the steps.
  • Admin
    Admin over 9 years
    I was not aware of the retained bit. Somehow missed it during my readings. That fixed it! Thank you!