Need to wait for asynchronous api callback before I return from method in Java

27,456

You could use CountDownLatch like this:

public class LogonHandler implements Handler {
    private final CountDownLatch loginLatch = new CountDownLatch (1);

    private boolean callbackResults;

    public void serverResponseCallback(boolean result) {
        callbackResults = result;
        loginLatch.countDown ();
    }

    public boolean tryLogon(Credentials creds) throws InterruptedException {
        SomeServer server = new SomeServer(address);
        server.tryLogon (creds.getName (), creds.getPass ());
        loginLatch.await ();
        return callbackResults;
    }
}

If you want to limit waiting time by, for example, 5 seconds, then instead of loginLatch.await () use the following:

if (loginLatch.await (5L, TimeUnit.SECONDS))
    return callbackResults;
else
    return false; // Timeout exceeded
Share:
27,456

Related videos on Youtube

shaz
Author by

shaz

Updated on September 26, 2020

Comments

  • shaz
    shaz over 3 years
      import java.util.concurrent.CountDownLatch;
    
      import quickfix.Initiator;
    
    
      public class UserSession {
        private final CountDownLatch latch = new CountDownLatch(1);
    
    public String await() {
            try {
                System.out.println("waiting...");
                if (latch.await(5, TimeUnit.SECONDS))
                    System.out.println("released!");
                else
                    System.out.println("timed out");
                return secret;
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                System.out.println(e.getMessage());
                e.printStackTrace();
            }
            return null;
        }
    
        public void countdown(String s) {
            System.out.println("In countdown: "+s+ ". Latch count: "+latch.getCount());
            secret = s;
            latch.countDown();
            System.out.println("Latch count: "+latch.getCount());
        }
      }
    
    
      public class LogonHandler extends AbstractHandler {
    
        public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) 
            throws IOException, ServletException
            {
                Map<String,String[]> query = request.getParameterMap();
    
                if (query.containsKey("method")) {
                    if (query.get("method")[0].compareTo(method) == 0) {
                        baseRequest.setHandled(true);
                        response.getWriter().println(logon(query));
                    }
                }
                else
                    baseRequest.setHandled(false);
            }
    
        private String logon(Map<String,String[]> query) {
            if (query.containsKey("username") && query.containsKey("password") &&           query.containsKey("sendercompid")) {
    
                app.mapUser(query.get("sendercompid")[0], new   UserSession(query.get("username")[0], query.get("password")[0]));
    
                SessionID session = new SessionID(new BeginString("FIX.4.4"), new SenderCompID(query.get("sendercompid")[0]), new TargetCompID("PARFX"));
    
                try {
                    ThreadedSocketInitiator tsi = new ThreadedSocketInitiator(app, app.getFileStoreFactory(), settings, app.getLogFactory(), app.getMessageFactory());
                    UserSession userSession = new UserSession(query.get("username")[0], query.get("password")[0]);
                    userSession.setInitiator(tsi);
    
                    tsi.start();
                    return userSession.await();
                } catch (ConfigError e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                    return e.toString();
                }
            }
            return "fail";
        }
      }
    
    
    public class QuickfixjApplication implements Application {
        private Map<String,UserSession> users = new HashMap<String,UserSession>();
    
        public void mapUser(String s, UserSession u) {
            users.put(s, u);
        }
    
        public void toAdmin(Message message, SessionID sessionId) {
    
            try {
                if (message.getHeader().getField(new StringField(MsgType.FIELD)).valueEquals(Logon.MSGTYPE)) {
                    UserSession user = users.get(sessionId.getSenderCompID());
                    message.setField(new Username(user.getUsername()));
                    message.setField(new Password(user.getPassword()));
                }
            } catch (FieldNotFound e) {
                e.printStackTrace();
            }
        }
    
        public void fromAdmin(Message message, SessionID sessionId)
            throws FieldNotFound, IncorrectDataFormat, IncorrectTagValue, RejectLogon {
    
            if (message.getHeader().getField(new StringField(MsgType.FIELD)).valueEquals(Logon.MSGTYPE)) {
                System.out.println(message.toString());
                UserSession user = users.get(sessionId.getSenderCompID());
                user.countdown(message.toString());
            }
        }
    }
    

    Ok, I've tried to only include the minimum amount of code here. There are three interesting classes, UserSession is the internal glue between the Jetty handler and the QuickFix/j application.

    The LogonHandler receives an HTTP logon request and tries to log a user onto a QuickFix/j application session.

    QuickFix/j is sending a logon message to a FIX server, this logon request / response is asynchronous. The HTTP logon request is of course synchronous. So we have to wait for the reply from the FIX server before we return from the HTTP request. I do this using CountDownLatch and this UserSession object.

    When I create the QuickFix/j session object I also create a UserSession object and add it to a map (that happens in the LogonHandler logon method).

    There are two callbacks in the QuickFix/j application object, toAdmin() and fromAdmin(). In fromAdmin() I check if the message is a logon response and if it is I call a method of UserSession to countdown the latch. In debugging the code I see that the fromAdmin() method is hit, the UserSession object is found in the map and the countdown() method is called and the latch.getCount() goes from 1 to 0, but the latch.await() method in UserSession await() never returns. It always times out.

  • shaz
    shaz about 11 years
    Thanks Mikhail, that's what I tried. The loginLatch.count() goes to 0 but the await() never releases.
  • Mikhail Vladimirov
    Mikhail Vladimirov about 11 years
    @shaz CountDownLatch should work. Probably error is in some other place of your code. Could you post your code with CountDownLatch here?
  • Mikhail Vladimirov
    Mikhail Vladimirov about 11 years
    @shaz Your code looks fine. Once you said your latch goes to zero, while await() does not unblock, it is possible that you call countdown on one instance of UserSession object, while you call await() on another instance, i.e. on cloned instance?
  • shaz
    shaz about 11 years
    So you mean the fact that I'm putting it in a HashMap?
  • shaz
    shaz about 11 years
    waiting...java.util.concurrent.CountDownLatch@98350a[Count = 1] Feb 18, 2013 11:45:14 AM quickfix.mina.SessionConnector startSessionTimer INFO: SessionTimer started Feb 18, 2013 11:45:14 AM quickfix.mina.initiator.InitiatorIoHandler sessionCreated INFO: MINA session created for FIX.4.4:test9044->XXX: local=/172.16.34.167:60198, class org.apache.mina.transport.socket.nio.SocketSessionImpl, remote=/172.17.22.52:8020 in onLogon method... In countdown: FIX.4.4:test9044->XXX. Latch count: 1 java.util.concurrent.CountDownLatch@bb2bc3[Count = 1] Latch count: 0 timed out
  • shaz
    shaz about 11 years
    It does look like I'm dealing with different instances.
  • shaz
    shaz about 11 years
    Thanks very much Mikhail, that was the error. I was adding a different instance of UserSession to my map.
  • Nadeem Shaikh
    Nadeem Shaikh almost 4 years
    await() function even blocking the callback response thereby freezing the app