zeromq: reset REQ/REP socket state

Solution 1

The good news is that, as of ZMQ 3.0 and later (the modern era), you can set a timeout on a socket. As others have noted elsewhere, you must do this after you have created the socket, but before you connect it:

zmq_req_socket.setsockopt( zmq.RCVTIMEO, 500 ) # milliseconds

Then, when you actually try to receive the reply (after you have sent a message to the REP socket), you can catch the error that is raised if the timeout is exceeded:

 try:
   zmq_req_socket.send( message, 0 )
   reply = zmq_req_socket.recv()  # raises zmq.Again once RCVTIMEO expires
   send_failed = False

 except zmq.Again:
   logging.warning( "Image send failed." )
   send_failed = True

However! When this happens, as observed elsewhere, your socket will be in a funny state, because it will still be expecting the response. At this point, I cannot find anything that works reliably other than just restarting the socket. Note that if you disconnect() the socket and then connect() it again, it will still be in this bad state. Thus you need to close it and create a new one:

def reset_my_socket():
  global zmq_req_socket  # rebind the module-level socket, not a local name
  zmq_req_socket.close()
  zmq_req_socket = zmq_context.socket( zmq.REQ )
  zmq_req_socket.setsockopt( zmq.RCVTIMEO, 500 ) # milliseconds
  zmq_req_socket.connect( zmq_endpoint )

You will also notice that because I close()d the socket, the receive timeout option was "lost", so it is important to set it again on the new socket.
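Put together, the timeout and the reset can be sketched as follows. This is only an illustration of the steps above, not a definitive implementation: the helper names make_req_socket and request_with_reset are hypothetical, and setting zmq.LINGER to 0 is an added assumption so that closing the socket does not wait on the undelivered request.

```python
import zmq


def make_req_socket(context, endpoint, timeout_ms=500):
    """Create a REQ socket with a receive timeout (hypothetical helper)."""
    sock = context.socket(zmq.REQ)
    # Options go on after the socket is created but before connect().
    sock.setsockopt(zmq.RCVTIMEO, timeout_ms)  # milliseconds
    sock.setsockopt(zmq.LINGER, 0)  # assumption: drop unsent data on close()
    sock.connect(endpoint)
    return sock


def request_with_reset(context, sock, endpoint, payload, timeout_ms=500):
    """Send one request and wait for the reply.

    Returns (reply, socket). On timeout the reply is None and the returned
    socket is a freshly created one, because the old REQ socket would still
    be waiting for the reply that never came.
    """
    sock.send(payload)
    try:
        return sock.recv(), sock
    except zmq.Again:
        sock.close()
        return None, make_req_socket(context, endpoint, timeout_ms)
```

The caller keeps whichever socket comes back, for example `reply, sock = request_with_reset(ctx, sock, endpoint, b"ping")`, and treats a reply of None as a failed request.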

I hope this helps. And I hope that this does not turn out to be the best answer to this question. :)

Solution 2

As the accepted answer seems so terribly sad to me, I did some research and found that everything we need was actually in the documentation.

Calling .setsockopt() with the correct parameters can help you reset your socket's state machine without brutally destroying it and rebuilding another one on top of its dead body.

(yeah I like the image).

ZMQ_REQ_CORRELATE: match replies with requests
The default behaviour of REQ sockets is to rely on the ordering of messages to match requests and responses and that is usually sufficient. When this option is set to 1, the REQ socket will prefix outgoing messages with an extra frame containing a request id. That means the full message is (request id, 0, user frames…). The REQ socket will discard all incoming messages that don't begin with these two frames.
Option value type int
Option value unit 0, 1
Default value 0
Applicable socket types ZMQ_REQ

ZMQ_REQ_RELAXED: relax strict alternation between request and reply
By default, a REQ socket does not allow initiating a new request with zmq_send(3) until the reply to the previous one has been received. When set to 1, sending another message is allowed and has the effect of disconnecting the underlying connection to the peer from which the reply was expected, triggering a reconnection attempt on transports that support it. The request-reply state machine is reset and a new request is sent to the next available peer.
If set to 1, also enable ZMQ_REQ_CORRELATE to ensure correct matching of requests and replies. Otherwise a late reply to an aborted request can be reported as the reply to the superseding request.
Option value type int
Option value unit 0, 1
Default value 0
Applicable socket types ZMQ_REQ

The complete documentation is here
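In pyzmq the two options quoted above are available as zmq.REQ_RELAXED and zmq.REQ_CORRELATE (they need libzmq 4.0 or later). A minimal sketch of how they might be combined with a receive timeout follows; the helper names make_relaxed_req and request, the retry count, and the LINGER setting are assumptions made for illustration.

```python
import zmq


def make_relaxed_req(context, endpoint, timeout_ms=500):
    """Create a REQ socket that may send again after a missed reply."""
    sock = context.socket(zmq.REQ)
    sock.setsockopt(zmq.REQ_RELAXED, 1)    # allow send() while a reply is pending
    sock.setsockopt(zmq.REQ_CORRELATE, 1)  # discard replies to superseded requests
    sock.setsockopt(zmq.RCVTIMEO, timeout_ms)  # milliseconds
    sock.setsockopt(zmq.LINGER, 0)  # assumption: drop unsent data on close()
    sock.connect(endpoint)
    return sock


def request(sock, payload, retries=3):
    """Send payload and return the reply, or None if every attempt times out."""
    for _ in range(retries):
        # With REQ_RELAXED set, this send() is legal even if the previous
        # request was never answered; the state machine is reset.
        sock.send(payload)
        try:
            return sock.recv()
        except zmq.Again:
            continue
    return None
```

Note that this only addresses the REQ side. A REP socket that received a request and never sent its reply has its own state to deal with.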

Solution 3

There is one solution to this and that is adding timeouts to all calls. Since ZeroMQ by itself does not really provide simple timeout functionality I recommend using a subclass of the ZeroMQ socket that adds a timeout parameter to all important calls.

So, instead of calling s.recv() you would call s.recv(timeout=5.0), and if a response does not come back within that 5-second window it will return None and stop blocking. I made a futile attempt at this when I ran into this problem.
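The idea can be approximated without a subclass. The sketch below uses a plain helper function, hypothetically named recv_with_timeout, built on pyzmq's Socket.poll(); it is a stand-in for the s.recv(timeout=5.0) call described above, not the subclass itself.

```python
import zmq


def recv_with_timeout(sock, timeout=5.0):
    """Return the next message, or None if nothing arrives within `timeout` seconds."""
    # Socket.poll() takes milliseconds and returns 0 when nothing is readable.
    if sock.poll(int(timeout * 1000), zmq.POLLIN):
        return sock.recv()
    return None
```

A timeout on its own only stops the call from blocking. A REQ socket that has sent a request and received nothing is still waiting for that reply afterwards.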

Solution 4

I'm actually looking into this at the moment, because I am retrofitting a legacy system.

I constantly come across code that "needs" to know about the state of the connection. However, I want to move to the message-passing paradigm that the library promotes.

I found the following function : zmq_socket_monitor

What it does is monitor the socket passed to it and generate events that are then passed to an "inproc" endpoint - at that point you can add handling code to actually do something.

There is also an example (actually test code) here : github

I have not got any specific code to give at the moment (maybe at the end of the week), but my intention is to respond to the connect and disconnect events so that I can perform any resetting of logic that is required.

Hope this helps, and despite quoting 4.2 docs, I am using 4.0.4 which seems to have the functionality as well.

Note: I notice you talk about Python above, but the question is tagged C++, so that's where my answer is coming from...
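For readers following the Python examples in the other answers, pyzmq wraps zmq_socket_monitor as Socket.get_monitor_socket(), and zmq.utils.monitor.recv_monitor_message() decodes the events. The sketch below shows one way to watch for connect and disconnect events; the helper names monitored_req and wait_for_event are hypothetical, and what to do when an event arrives is left to the caller.

```python
import zmq
from zmq.utils.monitor import recv_monitor_message


def monitored_req(context, endpoint):
    """Create a REQ socket plus a PAIR socket that receives its events."""
    sock = context.socket(zmq.REQ)
    # Attach the monitor before connect() so the first CONNECTED event is seen.
    monitor = sock.get_monitor_socket(zmq.EVENT_CONNECTED | zmq.EVENT_DISCONNECTED)
    sock.connect(endpoint)
    return sock, monitor


def wait_for_event(monitor, wanted, timeout_ms=1000):
    """Read events until `wanted` shows up; return it, or None on timeout.

    The timeout applies to each wait, so the total can be longer if
    other events arrive first.
    """
    while monitor.poll(timeout_ms):
        event = recv_monitor_message(monitor)  # dict: event, value, endpoint
        if event["event"] == wanted:
            return event
    return None
```

A disconnect handler could then, for instance, call wait_for_event(monitor, zmq.EVENT_DISCONNECTED) in a background thread and trigger whatever reset logic the application needs.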

Author: frans

Updated on June 05, 2022

Comments

  • frans
    frans almost 2 years

    When you use the simple ZeroMQ REQ/REP pattern you depend on a fixed send()->recv() / recv()->send() sequence. As this article describes, you get into trouble when a participant disconnects in the middle of a request, because then you can't just start over with receiving the next request from another connection; the state machine would force you to send a reply to the disconnected one.

    Has a more elegant way to solve this emerged since the mentioned article was written?

    Is reconnecting the only way to solve this (apart from not using REQ/REP and using another pattern instead)?

  • frans
    frans over 9 years
    I currently have no ZeroMQ 4 here to test it with Python, but I believe that the approach you recommend will solve the problem of waiting forever but not the problem with the state machine. You will then be able to call recv() again but not send(). You would then get an exception like zmq.error.ZMQError: Operation cannot be accomplished in current state. Tell me if I'm wrong (I will try it anyway, but I have to install a recent version of pyzmq).
  • user3162307
    user3162307 about 9 years
    My experience is the same as you describe, frans. If the REQ socket has made a send() call but never receives a response, then it is stuck waiting for a response from somewhere. So I can confirm your intuition empirically.
  • orodbhen
    orodbhen about 8 years
    This still leaves the REP socket in a bad state, because it attempted to send a reply that was never received.
  • orodbhen
    orodbhen about 8 years
    As I commented above, the REP socket will still be stuck in a bad state, even if you reset the REQ socket. It's stuck in reply mode, instead of receive mode.