Multithreading within Vert.x

Solution 1

I don't think creating your own executor is the way to go in Vert.x.

A better way is to use the event bus properly instead of an Executor. Have a worker respond to the event on the bus, do the processing, and signal the bus when it has completed.

Creating your own threads defeats the purpose of going with Vert.x.
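The message flow described above (send, process on a worker, signal completion) can be modeled in plain Java. This is only a sketch of the pattern, not the Vert.x API: `BusModel`, `Message`, and `send` are hypothetical names, the queue stands in for one event-bus address, and the daemon thread stands in for a worker that Vert.x itself would own and manage.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

final class BusModel {

    /** A message: the payload plus a future that the worker completes as its reply. */
    static final class Message {
        final byte[] body;
        final CompletableFuture<String> reply = new CompletableFuture<>();

        Message(byte[] body) {
            this.body = body;
        }
    }

    // Stands in for a single event-bus address (hypothetical, not a Vert.x type).
    private final BlockingQueue<Message> address = new LinkedBlockingQueue<>();

    /** Starts the stand-in worker, which takes messages, processes them, and replies. */
    BusModel() {
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    Message message = address.take();
                    // Slow or blocking processing would happen here, off the caller's thread.
                    message.reply.complete("processed " + message.body.length + " bytes");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.setDaemon(true);
        worker.start();
    }

    /** What the data handler would do: enqueue the message and return immediately. */
    CompletableFuture<String> send(byte[] data) {
        Message message = new Message(data);
        address.add(message);
        return message.reply;
    }

    public static void main(String[] args) {
        BusModel bus = new BusModel();
        // join() is used only so the demo can print the reply; a handler would not block.
        System.out.println(bus.send("hello".getBytes()).join());
    }
}
```

The point of the model is that the sender never waits: it hands the data over and is free to handle the next event, while the completion signal arrives later as a reply.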

Solution 2

The most flexible way is to create an ExecutorService and process requests with it. This gives fine-grained control over the workers' threading model (a fixed or variable number of threads, which work should run serially on a single thread, and so on).

A modified sample might look like this:

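import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.vertx.java.core.Handler;
import org.vertx.java.core.buffer.Buffer;
import org.vertx.java.core.net.NetServer;
import org.vertx.java.core.net.NetSocket;
import org.vertx.java.platform.Verticle;

public class TCPListener extends Verticle {

    // Pool that runs the follow-up work off the event loop
    private final ExecutorService executor = Executors.newFixedThreadPool(10);

    public void start() {

        NetServer server = vertx.createNetServer();

        server.connectHandler(new Handler<NetSocket>() {
            public void handle(final NetSocket sock) { // <-- Note 'final' here
                container.logger().info("A client has connected");
                sock.dataHandler(new Handler<Buffer>() {
                    public void handle(final Buffer buffer) { // <-- Note 'final' here

                        // Trigger another component here. This should be done in a separate thread.
                        // The handler returns immediately; there is no need to wait for the component's response.
                        executor.submit(new Runnable() {

                            public void run() {
                                // It's okay to read buffer data here
                                // and use sock.write() if necessary
                                container.logger().info("I received " + buffer.length() + " bytes of data");
                                container.logger().info("I received " + new String(buffer.getBytes()));
                            }
                        }); // closes the submit(...) call
                    }
                });
            }
        }).listen(1234, "host");
    }

    public void stop() {
        // Release the pool's threads when the verticle is undeployed
        executor.shutdown();
    }
}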
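To make the threading-model choice concrete, here is a minimal plain-Java sketch that leaves Vert.x out entirely. `ExecutorModels` and `runTasks` are hypothetical names introduced for illustration. It submits the same tasks to whichever executor it is given, so the only difference between the two runs is the executor itself.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class ExecutorModels {

    /** Submits `count` tasks to the executor and returns their ids in the order they ran. */
    static List<Integer> runTasks(ExecutorService executor, int count) {
        final List<Integer> completed = Collections.synchronizedList(new ArrayList<Integer>());
        for (int i = 0; i < count; i++) {
            final int id = i;
            // submit() returns immediately; the caller does not wait for the task.
            executor.submit(new Runnable() {
                public void run() {
                    completed.add(id);
                }
            });
        }
        // Stop accepting new work, then wait for the queued tasks to finish.
        executor.shutdown();
        try {
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return completed;
    }

    public static void main(String[] args) {
        // Single thread: tasks run one at a time, in submission order.
        System.out.println(runTasks(Executors.newSingleThreadExecutor(), 5));
        // Fixed pool: tasks may run concurrently, so only the count is predictable.
        System.out.println(runTasks(Executors.newFixedThreadPool(4), 5).size());
    }
}
```

A single-thread executor is the usual choice when the data arriving on one connection must be processed in order; a fixed pool trades that ordering for throughput.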
Author: Raveesh Sharma

Interested in: Technologies: Java, J2EE, Hibernate, Spring Boot, Spark, Zeppelin, ZooKeeper. Research areas: speech recognition, Mechanical Turk, IoT.

Updated on June 23, 2022

Comments

  • Raveesh Sharma, almost 2 years

    I am a newbie to Vert.x. I was trying out the Vert.x "NetServer" capability (http://vertx.io/core_manual_java.html#writing-tcp-servers-and-clients), and it works like a charm.

    However, I also read that "A verticle instance is strictly single threaded.

    If you create a simple TCP server and deploy a single instance of it then all the handlers for that server are always executed on the same event loop (thread)."

    In my implementation, I want to receive the TCP stream of bytes and then trigger another component. But this should not be a blocking call within the "start" method of the Verticle. So, is it good practice to create an executor within the start method, or does Vert.x handle such cases automatically?

    Here is a snippet:

    public class TCPListener extends Verticle {
    
        public void start(){
    
            NetServer server = vertx.createNetServer();
    
            server.connectHandler(new Handler<NetSocket>() {
                public void handle(NetSocket sock) {
                    container.logger().info("A client has connected");
                    sock.dataHandler(new Handler<Buffer>() {
                        public void handle(Buffer buffer) {
                            container.logger().info("I received " + buffer.length() + " bytes of data");
    
                            container.logger().info("I received " + new String(buffer.getBytes()));
                            // Trigger another component here. This should be done in a separate thread.
                            // The handler should return immediately; there is no need to wait for the component's response.
                        }
                    });
                }
            }).listen(1234, "host");
        }
    }
    

    What mechanism should I use to make this a non-blocking call?