Python serial port listener


It is possible to use the select module to select on a serial connection:

import serial
import select
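timeout = 10  # seconds to wait in select
# serial_name and baud_rate are placeholders for your port and speed
conn = serial.Serial(serial_name, baud_rate, timeout=0)  # timeout=0: non-blocking read
read, _, _ = select.select([conn], [], [], timeout)
if read:  # the list is empty if select timed out
    read_data = conn.read(0x100)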
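select.select returns empty lists when its timeout expires, so it helps to check the result before reading. The following is a minimal sketch of that check, under some assumptions: `poll_once` is a hypothetical helper, and an `os.pipe()` stands in for the serial connection so the snippet runs on a POSIX system without pySerial or hardware.

```python
import os
import select


def poll_once(fd, timeout, max_bytes=0x100):
    """Wait up to `timeout` seconds for fd; return the bytes read, or None."""
    ready, _, _ = select.select([fd], [], [], timeout)
    if not ready:
        return None  # select timed out, nothing to read
    return os.read(fd, max_bytes)


# A pipe stands in for the serial port (assumption for illustration only).
read_fd, write_fd = os.pipe()

# Nothing has been written yet, so select times out.
nothing_yet = poll_once(read_fd, timeout=0.1)

# Once data is available, select reports the descriptor as readable.
os.write(write_fd, b"HELLO FROM MAIN THREAD")
first = poll_once(read_fd, timeout=0.1)

os.close(read_fd)
os.close(write_fd)
```

Returning None on a timeout lets the caller tell "no data yet" apart from an actual read.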

Below is a full example that creates a pseudo-tty in the main thread. The main thread writes data to the pty at 5-second intervals. A child thread uses select.select to wait for readable data, and then uses the normal serial.Serial.read method to read data from the pty:

#!/usr/bin/env python3
import os
import pty
import select
import serial
import threading
import time
def out(msg):
    print(msg, flush=True)
# child thread
def serial_select():
    time.sleep(1)
    out("CHILD THREAD: connecting to serial {}".format(serial_name))
    conn = serial.Serial(serial_name, timeout=0)
    # Deprecated since pySerial 3.2: the port is already opened in this mode
    conn.nonblocking()
    out("CHILD THREAD: selecting on serial connection")
    avail_read, avail_write, avail_error = select.select([conn],[],[], 7)
    out("CHILD THREAD: selected!")
    output = conn.read(0x100)
    out("CHILD THREAD: output was {!r}".format(output))
    out("CHILD THREAD: normal read serial connection, set timeout to 7")
    conn.timeout = 7
    # len("GOODBYE FROM MAIN THREAD") == 24
    # serial.Serial.read will attempt to read NUM bytes for entire
    # duration of the timeout. It will only return once either NUM
    # bytes have been read, OR the timeout has been reached
    output = conn.read(len("GOODBYE FROM MAIN THREAD"))
    out("CHILD THREAD: read data! output was {!r}".format(output))
master, slave = pty.openpty()
serial_name = os.ttyname(slave)
child_thread = threading.Thread(target=serial_select)
child_thread.start()
out("MAIN THREAD: sleeping for 5")
time.sleep(5)
out("MAIN THREAD: writing to serial")
os.write(master, b"HELLO FROM MAIN THREAD")  # os.write needs bytes in Python 3
out("MAIN THREAD: sleeping for 5")
time.sleep(5)
out("MAIN THREAD: writing to serial")
os.write(master, b"GOODBYE FROM MAIN THREAD")  # os.write needs bytes in Python 3
child_thread.join()

PySerial internally uses select in its read method (on POSIX systems), with the timeout provided. However, read keeps trying until NUM bytes in total have been read or the timeout has been reached. This is why the example explicitly reads len("GOODBYE FROM MAIN THREAD") (24 bytes) from the pty: the conn.read call returns as soon as those bytes arrive instead of waiting for the entire timeout.
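The "NUM bytes or timeout" behaviour can be imitated on a plain file descriptor. This is a minimal sketch and not pySerial's actual implementation: `read_exact` is a hypothetical helper, and an `os.pipe()` stands in for the pty so the snippet runs on a POSIX system without pySerial.

```python
import os
import select
import time


def read_exact(fd, num, timeout):
    """Read up to `num` bytes from fd, giving up after `timeout` seconds.

    Hypothetical helper: returns as soon as `num` bytes have arrived,
    or returns whatever was collected when the timeout expires.
    """
    deadline = time.monotonic() + timeout
    buf = bytearray()
    while len(buf) < num:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break  # overall timeout reached, return the partial data
        ready, _, _ = select.select([fd], [], [], remaining)
        if not ready:
            break  # select itself timed out
        chunk = os.read(fd, num - len(buf))
        if not chunk:
            break  # the writer closed its end
        buf.extend(chunk)
    return bytes(buf)


# A pipe stands in for the serial port (assumption for illustration only).
read_fd, write_fd = os.pipe()
message = b"GOODBYE FROM MAIN THREAD"
os.write(write_fd, message)

# All 24 bytes are already available, so this returns without waiting.
full = read_exact(read_fd, len(message), timeout=7)

# Only 5 of the 10 requested bytes ever arrive, so this waits out the timeout.
os.write(write_fd, b"HELLO")
partial = read_exact(read_fd, 10, timeout=0.2)

os.close(read_fd)
os.close(write_fd)
```

Requesting exactly the number of bytes you expect is what lets the first call return early. The second call shows the cost of asking for more than will arrive.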

TL;DR It is possible to use serial.Serial connections with select.select, which is what, I believe, you are looking for.


user3817250
Author by

user3817250

Updated on December 21, 2020

Comments

  • user3817250
    user3817250 about 2 years

    I've begun writing some code using PySerial to send and receive data to a serial device.

    Up until now I've only been working on initiating a transaction from a terminal and receiving a response from the serial device.

    pseudo:

    main:
        loop:
            message = get_message()
            send_to_serial(message)
            time_delay(1 second)
            read_response()
    

    Now I'd like to implement a listener on that port in the case that the serial device is the one initiating the communication. Also to remove the time_delay which could end up way too long, or not long enough for the serial device to respond.

    state = idle
    register_interrupt(serial_device, read_response())
    main:
        loop:
            message = get_message()
            state = sending
            send_to_serial(message)
    

    So far I haven't found any PySerial code that uses an interrupt to communicate.

    I just have no idea how to set up an interrupt of any sort in python, how would

  • xzoert
    xzoert almost 6 years
    In my experience, it is good to set a very small timeout on the serial.Serial instance, say 1 ms. Depending on the transmitting device, data may arrive a little slower, and with timeout=0 you can end up reading almost byte by byte, looping and selecting a lot, with a noticeable increase in CPU usage.
  • JDM
    JDM about 4 years
    When trying this solution on my environment (Python 2.7.15, with the current version of PySerial as of Dec. 2018), the abbreviated example throws the exception io.UnsupportedOperation: fileno on the line with the select statement. It works normally however when I simply use the read() method with a timeout of 1 second. I'm not sure what the problem is, but it seems to me that you can just do a child thread with an infinite loop and a .read() with a brief timeout -- and no select is needed.
  • JDM
    JDM about 4 years
    PS - I tested this approach (child thread, infinite loop with just a read(), and a brief timeout) in my own application, and it worked perfectly.
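The polling approach described in the last two comments (a child thread, an infinite loop, and a read() with a brief timeout) could look roughly like the sketch below. The names `listen`, `FakePort`, and `on_data` are hypothetical. `FakePort` only imitates a `read(size)` that blocks for up to a short timeout, so the sketch runs without pySerial or hardware; with a real device one would presumably pass a serial.Serial instance opened with a short timeout instead.

```python
import queue
import threading
import time


class FakePort:
    """Stand-in for a serial port: read() blocks for up to `timeout` seconds."""

    def __init__(self, timeout=0.05):
        self.timeout = timeout
        self._incoming = queue.Queue()

    def feed(self, data):
        # Simulates the remote device sending data.
        self._incoming.put(data)

    def read(self, size=1):
        try:
            data = self._incoming.get(timeout=self.timeout)
        except queue.Empty:
            return b""  # timed out with nothing received
        return data[:size]  # sketch only: bytes beyond `size` are discarded


def listen(port, on_data, stop_event, chunk_size=256):
    """Call on_data(bytes) whenever the port delivers something."""
    while not stop_event.is_set():
        data = port.read(chunk_size)  # returns b"" when the timeout expires
        if data:
            on_data(data)


received = []
stop_event = threading.Event()
port = FakePort(timeout=0.05)
listener = threading.Thread(
    target=listen, args=(port, received.append, stop_event), daemon=True
)
listener.start()

port.feed(b"HELLO FROM DEVICE")
port.feed(b"GOODBYE FROM DEVICE")

# Wait (at most 2 seconds) until both messages were handled, then stop.
deadline = time.monotonic() + 2
while len(received) < 2 and time.monotonic() < deadline:
    time.sleep(0.01)
stop_event.set()
listener.join()
```

The brief read timeout serves two purposes here: it keeps the loop from spinning, and it bounds how long the thread takes to notice `stop_event`.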
