Asynchronous HTTP client with Netty

You are using a ChannelFutureListener to drive every operation on the channel (which is a bad idea), and the listener is invoked as soon as each of those operations completes. For a write, that means once the request has been sent, not once a response has arrived.

The problem is that, after the message is sent, the channel is disconnected immediately, so the handler cannot receive the response, which arrives later.

        ........
    case Sending:
        this.state = State.Disconnecting;
        future.getChannel().disconnect().addListener(this);
        break;
        ........
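To make the timing concrete, here is a minimal, dependency-free sketch of that sequence. It is a model rather than Netty code: `FakeChannel`, `deliverResponse` and `run` are hypothetical names, and a `CompletableFuture` stands in for the write future.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Dependency-free model (not Netty code): all names here are hypothetical.
class WriteVersusResponse {

    /** Stand-in for a channel: tracks whether it is open and what the handler received. */
    static final class FakeChannel {
        boolean open = true;
        final List<String> received = new ArrayList<>();

        // Models the transport delivering the server's reply some time after the write.
        void deliverResponse(String body) {
            if (open) {
                received.add(body); // a handler's messageReceived would fire here
            }
            // On a closed channel the reply is simply lost.
        }
    }

    /**
     * Runs one request. When disconnectOnWriteComplete is true, the write
     * listener disconnects as soon as the request has been written, which is
     * what the "case Sending" branch does.
     */
    static List<String> run(boolean disconnectOnWriteComplete) {
        FakeChannel channel = new FakeChannel();
        CompletableFuture<Void> writeFuture = new CompletableFuture<>();

        // Listener on the write future: fires when the request is written,
        // not when a reply arrives.
        writeFuture.thenRun(() -> {
            if (disconnectOnWriteComplete) {
                channel.open = false;
            }
        });

        writeFuture.complete(null);        // 1. request handed to the socket
        channel.deliverResponse("200 OK"); // 2. reply shows up afterwards
        return channel.received;
    }

    public static void main(String[] args) {
        System.out.println("disconnect in write listener: " + run(true));
        System.out.println("wait for the response:        " + run(false));
    }
}
```

In this model the first run receives nothing, because the channel is already closed by the time the reply is delivered, while the second run receives the reply.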

You should not block the channel future thread at all. The best approach is to override SimpleChannelUpstreamHandler's

    channelConnected(..) {} 
    messageReceived(..) {} 
    channelDisconnected(..) {} 

methods and react to those events. You can keep the state in that handler too.
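As a rough illustration of keeping the state in the handler, the sketch below models the event-driven flow in plain Java. It does not use Netty: `RequestLifecycle` is a hypothetical class whose method names only mirror the callbacks listed above (plus `channelClosed`), and the events are fired by hand where the I/O layer would normally fire them.

```java
import java.util.ArrayList;
import java.util.List;

// Dependency-free model of an event-driven handler. These are not Netty
// types; the method names only mirror the callbacks listed above.
class RequestLifecycle {

    enum State { Connecting, Sending, Disconnecting, Closing, Finished }

    private State state = State.Connecting;
    private final List<String> log = new ArrayList<>();
    private String response;

    // Records the transition and updates the current state.
    private void moveTo(State next) {
        log.add(state + " -> " + next);
        state = next;
    }

    // Connection established: a real handler would write the request here.
    void channelConnected() { moveTo(State.Sending); }

    // Response arrived: only now is it safe to start disconnecting.
    void messageReceived(String body) {
        response = body;
        moveTo(State.Disconnecting);
    }

    void channelDisconnected() { moveTo(State.Closing); }

    void channelClosed() { moveTo(State.Finished); }

    State state() { return state; }
    String response() { return response; }
    List<String> log() { return log; }

    public static void main(String[] args) {
        RequestLifecycle request = new RequestLifecycle();
        // The I/O layer would fire these events; here they are fired by hand.
        request.channelConnected();
        request.messageReceived("hello");
        request.channelDisconnected();
        request.channelClosed();
        request.log().forEach(System.out::println);
    }
}
```

Each transition is a reaction to something that has already happened on the channel, so no step can run ahead of the response.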

Author: Nitzan Tomer

Updated on July 20, 2022

Comments

  • Nitzan Tomer
    Nitzan Tomer almost 2 years

    I'm new to Netty and still struggling to find my way. I'm looking to create an HTTP client that works asynchronously. The Netty HTTP examples only show how to wait for I/O operations, not how to use addListener, so I've been trying to figure this out for the last few days.

    I'm trying to create a request class that will handle all of the different states of a request: connecting, sending the data, handling the response, and then closing the connection. To do that, my class extends SimpleChannelUpstreamHandler and implements ChannelFutureListener. I use a ChannelPipelineFactory which adds this instance of the class (as a SimpleChannelUpstreamHandler) to the pipeline as a handler.

    The connection is created like this:

    this.state = State.Connecting;
    this.clientBootstrap.connect(this.address).addListener(this);
    

    Then the operationComplete method:

    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        State oldState = this.state;
    
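        // Check cancellation first: a cancelled future is also unsuccessful,
        // so testing isSuccess() first would make this branch unreachable.
        if (future.isCancelled()) {
            this.status = Status.Canceled;
            future.getChannel().disconnect().addListener(this);
        }
        else if (!future.isSuccess()) {
            this.status = Status.Failed;
            future.getChannel().disconnect().addListener(this);
        }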
        else switch (this.state) {
            case Connecting:
                this.state = State.Sending;
                Channel channel = future.getChannel();
                channel.write(this.createRequest()).addListener(this);
                break;
    
            case Sending:
                this.state = State.Disconnecting;
                future.getChannel().disconnect().addListener(this);
                break;
    
            case Disconnecting:
                this.state = State.Closing;
                future.getChannel().close().addListener(this);
                break;
    
            case Closing:
                this.state = State.Finished;
                break;
        }
        System.out.println("request operationComplete start state: " + oldState + ", end state: " + this.state + ", status: " + this.status);
    }
    
    private HttpRequest createRequest() {
        String url = this.url.toString();
    
        HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url);
        request.setHeader(HttpHeaders.Names.HOST, this.url.getHost());
        request.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
        request.setHeader(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP);
    
        return request;
    }
    

    The class also overrides the messageReceived method:

    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        System.out.println("messageReceived");
        HttpResponse response = (HttpResponse) e.getMessage();
    
        ChannelBuffer content = response.getContent();
        if (content.readable()) {
            System.out.println("CONTENT: " + content.toString(CharsetUtil.UTF_8));
        }
    }
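One thing to keep in mind with the handler above: the request advertises `Accept-Encoding: gzip`, so the server may send a compressed body, and decoding those bytes directly as UTF-8 would print garbage. Netty 3 provides an `HttpContentDecompressor` handler for this; whether the pipeline here includes one is not shown, so treat that as an assumption. The following plain-JDK sketch (hypothetical class `GzipBody`, no Netty involved) shows the difference between the raw and the decompressed bytes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Plain-JDK illustration; GzipBody is a hypothetical helper, not part of Netty.
class GzipBody {

    // Compresses text the way a server might when it honours Accept-Encoding: gzip.
    static byte[] gzip(String text) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(text.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    // Decompresses a gzip body and decodes the result as UTF-8.
    static String gunzip(byte[] compressed) {
        try (GZIPInputStream gz = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            return new String(gz.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String page = "<html>hello</html>";
        byte[] wire = gzip(page);

        // Decoding the compressed bytes directly does not give the text back.
        String naive = new String(wire, StandardCharsets.UTF_8);
        System.out.println("naive decode matches:  " + naive.equals(page));
        System.out.println("gunzip decode matches: " + gunzip(wire).equals(page));
    }
}
```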
    

    The problem is that I get this output:

    request operationComplete start state: Connecting, end state: Sending, status: Unknown
    request operationComplete start state: Sending, end state: Disconnecting, status: Unknown
    request operationComplete start state: Closing, end state: Finished, status: Unknown
    request operationComplete start state: Disconnecting, end state: Finished, status: Unknown
    

    As you can see, messageReceived is not being executed for some reason, even though the pipeline factory adds the instance of this class to the pipeline.
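A side note on the last two lines of that output, where "Closing" is logged before "Disconnecting": one plausible reading is that the close future is already complete when the listener is attached, so `operationComplete` is re-entered before the outer call reaches its own print statement. The sketch below reproduces that ordering in plain Java. It is a model, not Netty code: `ReentrantListener` is hypothetical, and an already-completed `CompletableFuture` stands in for the close future.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Dependency-free model of the listener above. Not Netty code: the close
// future is modelled by a CompletableFuture that is already complete.
class ReentrantListener {

    enum State { Disconnecting, Closing, Finished }

    State state = State.Disconnecting;
    final List<String> lines = new ArrayList<>();

    void operationComplete() {
        State oldState = state;
        switch (state) {
            case Disconnecting:
                state = State.Closing;
                // The future is already done, so the callback runs right here,
                // before this invocation reaches its own log statement.
                CompletableFuture.completedFuture(null).thenRun(this::operationComplete);
                break;
            case Closing:
                state = State.Finished;
                break;
            default:
                break;
        }
        lines.add("start state: " + oldState + ", end state: " + state);
    }

    public static void main(String[] args) {
        ReentrantListener listener = new ReentrantListener();
        listener.operationComplete();
        listener.lines.forEach(System.out::println);
    }
}
```

The nested call logs first, and the outer call then reports the state as it stands after the nested call has finished, which matches the order and the "end state: Finished" on both lines.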

    Any ideas what I'm missing here? Thanks.


    Edit

    I finally managed to get this working thanks to the help of @JestanNirojan. In case anyone is interested, here is the solution:

    public class ClientRequest extends SimpleChannelUpstreamHandler {
    
        ....
    
        public void connect() {
            this.state = State.Connecting;
            System.out.println(this.state);
            this.clientBootstrap.connect(this.address);
        }
    
        @Override
        public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
            this.state = State.Sending;
            System.out.println(this.state);
            ctx.getChannel().write(this.createRequest());
        }
    
        @Override
        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
            HttpResponse response = (HttpResponse) e.getMessage();
    
            ChannelBuffer content = response.getContent();
            if (content.readable()) {
                System.out.println("CONTENT: " + content.toString(CharsetUtil.UTF_8));
            }
    
            this.state = State.Disconnecting;
            System.out.println(this.state);
        }
    
        @Override
        public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
            this.state = State.Closing;
            System.out.println(this.state);
        }
    
        @Override
        public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
            this.state = State.Finished;
            System.out.println(this.state);
        }
    
        private HttpRequest createRequest() {
            String url = this.url.toString();
    
            HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url);
            request.setHeader(HttpHeaders.Names.HOST, this.url.getHost());
            request.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
            request.setHeader(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP);
    
            return request;
        }
    }
    
  • Nitzan Tomer
    Nitzan Tomer about 12 years
    Oh. That was simple. Thanks a lot for the info, I wish Netty had better documentation on this.