'An established connection was aborted by the software in your host machine' when using Java NIO sockets


When you get end of stream (read() returns -1) you aren't closing the channel, so you're leaking channels. You need to close the channel, and note that doing so will automatically cancel the key. Just cancelling the key by itself is not sufficient. The same applies when you cancel the key because clientNumber == null or because you get an IOException.
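As a rough illustration of that cleanup, here is a minimal sketch. The class name `ChannelCleanup` and the helpers `readOrClose` and `closeQuietly` are hypothetical names invented for this example and are not part of the question's code. It closes the channel on end of stream and on IOException, relying on the fact that closing a channel also cancels its keys.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;

// Hypothetical helper class, for illustration only.
final class ChannelCleanup {

    /**
     * Reads from the key's channel into the buffer.
     * Returns the number of bytes read, or -1 if the peer closed the
     * connection or the read failed. In both of those cases the channel
     * is closed, which also cancels the key.
     */
    static int readOrClose(SelectionKey key, ByteBuffer buffer) {
        SocketChannel channel = (SocketChannel) key.channel();
        buffer.clear();
        try {
            int numRead = channel.read(buffer);
            if (numRead == -1) {
                // End of stream: close the channel, do not just cancel the key.
                closeQuietly(channel);
            }
            return numRead;
        } catch (IOException e) {
            // Connection reset or aborted: the channel is unusable, so close it.
            closeQuietly(channel);
            return -1;
        }
    }

    /** Closes the channel, ignoring any error raised by close() itself. */
    static void closeQuietly(SocketChannel channel) {
        try {
            channel.close();
        } catch (IOException ignored) {
            // Nothing useful can be done if close() fails.
        }
    }
}
```

In the question's read() method, the two `key.cancel()` calls would become a close of `socketChannel`. The `clientNumber == null` branch and the IOException handler in write() would need the same treatment, along with removing the client from the bookkeeping maps.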

NB you shouldn't register OP_WRITE unless you have just received zero from a write() call, and you should deregister it whenever you don't get that zero.
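The OP_WRITE rule can be sketched as follows. This is only an illustration under stated assumptions: the class `WriteInterest` and the method `writeAndUpdateInterest` are hypothetical names, and the caller is assumed to keep the partially written buffer somewhere (for example as the key's attachment) so the remainder can be sent when the selector reports the channel writable.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;

// Hypothetical helper class, for illustration only.
final class WriteInterest {

    /**
     * Writes as much of the pending buffer as the socket will accept.
     * If write() returns zero, the send buffer is full, so OP_WRITE is
     * added to the key's interest set and false is returned. If everything
     * was written, OP_WRITE is removed and true is returned.
     */
    static boolean writeAndUpdateInterest(SelectionKey key, ByteBuffer pending)
            throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        while (pending.hasRemaining()) {
            int written = channel.write(pending);
            if (written == 0) {
                // Send buffer full: ask the selector to tell us when to retry.
                key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
                return false;
            }
        }
        // All data sent: stop selecting for writability.
        key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
        return true;
    }
}
```

The sketch uses `key.interestOps(...)` on the existing key rather than calling `register()` again, so any other interest bits such as OP_READ are preserved. A socket is almost always writable, so leaving OP_WRITE registered permanently makes `select()` return immediately on every pass.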

Author by

Husein Behboudi Rad

Updated on July 05, 2022

Comments

  • Husein Behboudi Rad
    Husein Behboudi Rad almost 2 years

    I developed a Java server using Java NIO sockets. This is the code of my application:

    public class EchoServer {
    
    static final org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(Main.class);
    
    private static final int BUFFER_SIZE = 1024;
    
    private final static int DEFAULT_PORT = 4664;
    
    private InetAddress hostAddress = null;
    
    private int port;
    
    private String ipAddress = "my ip";
    private Selector selector;
    
    // The buffer into which we'll read data when it's available
    private ByteBuffer readBuffer = ByteBuffer.allocate(BUFFER_SIZE);
    
    int timestamp = 1;
    
    HashMap<Integer, String> connectedClients = new HashMap<Integer, String>();
    HashMap<String, Integer> clientIds= new HashMap<String,Integer>();
    HashMap<String, String> messageToClients = new HashMap<String, String>();
    
    
    
    
    
    public EchoServer() {
        this(DEFAULT_PORT);
    
    }
    
    public EchoServer(int port)  {
        try{
            this.port = port;
            hostAddress = InetAddress.getByName(ipAddress);
            selector = initSelector();
            loop();
        }catch(Exception ex){
            logger.error("Exception Accoured:",ex);
        }
    }
    
    private Selector initSelector()  {
        try{
            Selector socketSelector = SelectorProvider.provider().openSelector();
            ServerSocketChannel serverChannel = ServerSocketChannel.open();
            serverChannel.configureBlocking(false);
    
            InetSocketAddress isa = new InetSocketAddress(hostAddress, port);
            serverChannel.socket().bind(isa);
            serverChannel.register(socketSelector, SelectionKey.OP_ACCEPT);
    
            return socketSelector;
        }catch(Exception ex){
            logger.error("Exception Accoured:",ex);
            return null;
        }
    }
    
    private void loop() {
        while (true) {
            try {
    
                // Do defined operations for clients
                // ------------------------------
                selector.select();
                Iterator<SelectionKey> selectedKeys = selector.selectedKeys()
                        .iterator();
    
                while (selectedKeys.hasNext()) {
                    SelectionKey key = selectedKeys.next();
                    selectedKeys.remove();
    
                    if (!key.isValid()) {
                        logger.warn(key.hashCode() + "- is invalid");
                        continue;
                    }
                    // Check what event is available and deal with it
                    if (key.isAcceptable()) {
                        accept(key);
                    } else if (key.isReadable()) {
                        read(key);
                    } else if (key.isWritable()) {
                        write(key);
                    }
                }
    
                // Fetch List from server
                // -----------------------------------------
                try {
                    ResultSet resultset = DataBase.getInstance()
                            .getQueryResult();
                    boolean flag = false;
                    while (resultset.next()) {
                        String mobileNumber = resultset.getString("MobileNo");
    
                        String message = resultset.getInt("IsMessage") + ","
                                + resultset.getInt("IsDeliver") + ","
                                + resultset.getInt("IsGroup") + ","
                                + resultset.getInt("IsSeen");
                        messageToClients.put(mobileNumber, message);
                    }
    
                } catch (Exception ex) {
                    //ex.printStackTrace();
                    logger.error("Exception Accoured:",ex);
                }
    
                // Wait for 1 second
                // -----------------------------------------------
                Thread.sleep(1000);
                timestamp++;
    
            } catch (Exception e) {
                e.printStackTrace();
                System.exit(1);
            }
    
        }
    }
    
    private void accept(SelectionKey key)  {
    
        try{
            // Initialize the connection ------------------------------------------
            ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key
                    .channel();
            SocketChannel socketChannel = serverSocketChannel.accept();
            socketChannel.configureBlocking(false);
            socketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
            socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
            logger.info("New client accepted");
    
            // Fire read for reading phone number --------------------------------
            socketChannel.register(selector, SelectionKey.OP_READ);
        }catch(Exception ex){
            logger.error("Exception Accoured:",ex);
        }
    }
    
    private void read(SelectionKey key)  {
    
        try{
            // Initialize Socket -----------------------------------------------------
            SocketChannel socketChannel = (SocketChannel) key.channel();
    
    
            // Reading Client Number -------------------------------------------------
    
            readBuffer.clear();
    
            int numRead;
            try {
                numRead = socketChannel.read(readBuffer);
            } catch (IOException e) {
                logger.error("Forceful shutdown");
                key.cancel();
                return;
            }
    
            // read was not successful
            if (numRead == -1) {
                logger.error("Graceful shutdown");
                key.cancel();
                return;
            }
    
            // read was successful and now we can write it to String
            readBuffer.flip();
            byte[] bytes = new byte[readBuffer.limit()];
            readBuffer.get(bytes);
    
            String number = new String(bytes);
    
            number = number.replace("\r\n", "");
            number = number.trim();
    
            // Update Connect Clients Status -----------------------------------------
            Integer clientId=clientIds.get(number);
            if ( clientId == null) {
                connectedClients.put(key.hashCode(), number);
                clientIds.put(number, key.hashCode());
                logger.error(number + "- (" + key.hashCode() + ") has Connected");
            }else{
                connectedClients.remove(clientId);
                connectedClients.put(key.hashCode(), number);
                clientIds.put(number, key.hashCode());
                logger.error(number + "- (" + key.hashCode() + ") REconnected");
            }
    
            //System.err.println("All clients number are:" + connectedClients.size());
    
            logger.error("All clients number are:" + connectedClients.size());
    
            // Fire Write Operations -------------------------------------------------
            socketChannel.register(selector, SelectionKey.OP_WRITE);
    
        }catch(Exception ex){
            //ex.printStackTrace();
            logger.error("Exception Accoured:",ex);
        }
    }
    
    private void write(SelectionKey key)  {
        try {
    
            //Check channel still alive ----------------------------------------------
    
            String clientNumber = connectedClients.get(key.hashCode());
    
            if(clientNumber == null){
                key.cancel();
                return;
            }
    
            // Get Channel -----------------------------------------------------------
            SocketChannel socketChannel = (SocketChannel) key.channel();
    
            // Send Message if client number have new message ------------------------
    
            if (messageToClients.get(clientNumber) != null) {
                logger.info(clientNumber + "-" + key.hashCode()
                                + "- Sent write message");
                String timeStamp = String.valueOf(timestamp);
                String message = messageToClients.get(clientNumber);
                ByteBuffer dummyResponse = ByteBuffer.wrap((message + "\r\n").getBytes("UTF-8"));
                socketChannel.write(dummyResponse);
                messageToClients.remove(clientNumber);
            }
    
            // Fire new write state --------------------------------------------------
            socketChannel.register(selector, SelectionKey.OP_WRITE);
    
        } catch (IOException iox) {
            logger.error("Exception Accoured:",iox);
            String number = connectedClients.get(key.hashCode());
            clientIds.remove(number);
            connectedClients.remove(key.hashCode());
            key.cancel();
        } catch(Exception ex){
            logger.error("Exception Accoured:",ex);
        }
    
    }
    

    }

    When I test with 2-3 clients it works fine, but when I test with about 100-300 clients I receive the exception below several times. It happens in the write() method, at the line socketChannel.write(dummyResponse):

    java.io.IOException: An established connection was aborted by the software in your host machine
        at sun.nio.ch.SocketDispatcher.write0(Native Method)
        at sun.nio.ch.SocketDispatcher.write(Unknown Source)
        at sun.nio.ch.IOUtil.writeFromNativeBuffer(Unknown Source)
        at sun.nio.ch.IOUtil.write(Unknown Source)
        at sun.nio.ch.SocketChannelImpl.write(Unknown Source)
        at net.behboodi.testserver.EchoServer.write(EchoServer.java:274)
        at net.behboodi.testserver.EchoServer.loop(EchoServer.java:106)
        at net.behboodi.testserver.EchoServer.<init>(EchoServer.java:56)
        at net.behboodi.testserver.EchoServer.<init>(EchoServer.java:47)
        at net.behboodi.testserver.Main.main(Main.java:44)
    

    After that, I cannot receive messages from the server.

  • slashms
    slashms about 7 years
    Why should you deregister whenever you don't get that zero?
  • user207421
    user207421 almost 7 years
    @slashms Because the write has succeeded, so the socket send buffer is no longer full, so you are no longer beholden to the selector to tell you when you may write. You can do it any time.