Thread interrupt not ending blocking call on input stream read

16,136

Solution 1

The RXTX SerialInputStream (what is returned by the serial.getInputStream() call) supports a timeout scheme that ended up solving all my problems. Adding the following before creating the new SerialReader object causes the reads to no longer block indefinitely:

serial.enableReceiveTimeout(1000);

Within the SerialReader object, I had to change a few things around to read directly from the InputStream instead of creating the ReadableByteChannel, but now, I can stop and restart the reader without issue.

Solution 2

You can't make a stream that doesn't support interruptible I/O into an InterruptibleChannel simply by wrapping it (and, anyway, ReadableByteChannel doesn't extend InterruptibleChannel).

You have to look at the contract of the underlying InputStream. What does SerialPort.getInputStream() say about the interruptibility of its result? If it doesn't say anything, you should assume that it ignores interrupts.

For any I/O that doesn't explicitly support interruptibility, the only option is generally closing the stream from another thread. This may immediately raise an IOException (though it might not be an AsynchronousCloseException) in the thread blocked on a call to the stream.

However, even this is extremely dependent on the implementation of the InputStream—and the underlying OS can be a factor too.


Note the source code comment on the ReadableByteChannelImpl class returned by newChannel():

  private static class ReadableByteChannelImpl
    extends AbstractInterruptibleChannel       // Not really interruptible
    implements ReadableByteChannel
  {
    InputStream in;
    ⋮

Solution 3

i am using the code below to shutdown rxtx. i run tests that start them up and shut them down and the seems to work ok. my reader looks like:

private void addPartsToQueue(final InputStream inputStream) {
    byte[] buffer = new byte[1024];
    int len = -1;
    boolean first = true;
    // the read can throw
    try {
        while ((len = inputStream.read(buffer)) > -1) {
            if (len > 0) {
                if (first) {
                    first = false;
                    t0 = System.currentTimeMillis();
                } else
                    t1 = System.currentTimeMillis();
                final String part = new String(new String(buffer, 0, len));
                queue.add(part);
                //System.out.println(part + " " + (t1 - t0));
            }
            try {
                Thread.sleep(sleep);
            } catch (InterruptedException e) {
                //System.out.println(Thread.currentThread().getName() + " interrupted " + e);
                break;
            }
        }
    } catch (IOException e) {
        System.err.println(Thread.currentThread().getName() + " " + e);
        //if(interruSystem.err.println(e);
        e.printStackTrace();
    }
    //System.out.println(Thread.currentThread().getName() + " is ending.");
}

thanks

public void shutdown(final Device device) {
    shutdown(serialReaderThread);
    shutdown(messageAssemblerThread);
    serialPort.close();
    if (device != null)
        device.setSerialPort(null);
}

public static void shutdown(final Thread thread) {
    if (thread != null) {
        //System.out.println("before intterupt() on thread " + thread.getName() + ", it's state is " + thread.getState());
        thread.interrupt();
        //System.out.println("after intterupt() on thread " + thread.getName() + ", it's state is " + thread.getState());
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            System.out.println(Thread.currentThread().getName() + " was interrupted trying to sleep after interrupting" + thread.getName() + " " + e);
        }
        //System.out.println("before join() on thread " + thread.getName() + ", it's state is " + thread.getState());
        try {
            thread.join();
        } catch (InterruptedException e) {
            System.out.println(Thread.currentThread().getName() + " join interruped");
        }
        //System.out.println(Thread.currentThread().getName() + " after join() on thread " + thread.getName() + ", it's state is" + thread.getState());
    }
Share:
16,136
JDS
Author by

JDS

Updated on July 19, 2022

Comments

  • JDS
    JDS almost 2 years

    I'm using RXTX to read data from a serial port. The reading is done within a thread spawned in the following manner:

    CommPortIdentifier portIdentifier = CommPortIdentifier.getPortIdentifier(port);
    CommPort comm = portIdentifier.open("Whatever", 2000);
    SerialPort serial = (SerialPort)comm;
    ...settings
    Thread t = new Thread(new SerialReader(serial.getInputStream()));
    t.start();
    

    The SerialReader class implements Runnable and just loops indefinitely, reading from the port and constructing the data into useful packages before sending it off to other applications. However, I've reduced it down to the following simplicity:

    public void run() {
      ReadableByteChannel byteChan = Channels.newChannel(in); //in = InputStream passed to SerialReader
      ByteBuffer buffer = ByteBuffer.allocate(100);
      while (true) {
        try {
          byteChan.read(buffer);
        } catch (Exception e) {
          System.out.println(e);
        }
      }
    }
    

    When a user clicks a stop button, the following functionality fires that should in theory close the input stream and break out of the blocking byteChan.read(buffer) call. The code is as follows:

    public void stop() {
      t.interrupt();
      serial.close();
    }
    

    However, when I run this code, I never get a ClosedByInterruptException, which SHOULD fire once the input stream closes. Furthermore, the execution blocks on the call to serial.close() -- because the underlying input stream is still blocking on the read call. I've tried replacing the interrupt call with byteChan.close(), which should then cause an AsynchronousCloseException, however, I'm getting the same results.

    Any help on what I'm missing would be greatly appreciated.

  • JDS
    JDS over 13 years
    In my example, Channels.newChannel(<input stream>) returns an object of type ReadableByteChannelImpl, which implements ReadableByteChannel (but more importantly extends AbstractInterruptibleChannel which implements InterruptibleChannel).
  • JDS
    JDS over 13 years
    Oops...enter submits the comment. Anyways, doing an instanceof check for byteChan against InterruptibleChannel returns true. Also, since it wasn't clear, the call to stop() is done in the thread that spawns the reading loop thread.
  • brady
    brady over 13 years
    @JDS - … and yet, it doesn't work, does it? Please see my update. The channel is not interruptible, and you may have no viable option for bailing out of an RXTX read.