C - How to use both aio_read() and aio_write()

11,281

Solution 1

If you wish to have separate AIO queues for reads and writes, so that a write issued later can execute before a read issued earlier, then you can use dup() to create a duplicate of the socket, and use one to issue reads and the other to issue writes.

However, I second the recommendations to avoid AIO entirely and simply use an epoll()-driven event loop with non-blocking sockets. This technique has been shown to scale to high numbers of clients - if you are getting high CPU usage, profile it and find out where that's happening, because the chances are that it's not your event loop that's the culprit.

Solution 2

First of all, consider dumping aio. There are lots of other ways to do asynchronous I/O that are not as braindead (yes, aio is breaindead). Lots of alternatives; if you're on linux you can use libaio (io_submit and friends). aio(7) mentions this.

Back to your question.
I haven't used aio in a long time but here's what I remember. aio_read and aio_write both put requests (aiocb) on some queue. They return immediately even if the requests will complete some time later. It's entirely possible to queue multiple requests without caring what happened to the earlier ones. So, in a nutshell: stop cancelling read requests and keep adding them.

/* populate read_aiocb */
rc = aio_read(&read_aiocb);

/* time passes ... */
/* populate write_aiocb */
rc = aio_write(&write_aiocb)

Later you're free to wait using aio_suspend, poll using aio_error, wait for signals etc.

I see you mention epoll in your comment. You should definitely go for libaio.

Solution 3

Unless I'm not mistaken, POSIX AIO (that is, aio_read(), aio_write() and so on) is guaranteed to work only on seekable file descriptors. From the aio_read() manpage:

   The  data  is  read starting at the absolute file offset aiocbp->aio_offset, regardless of the
   current file position.  After this request, the value of the current file position is unspeci‐
   fied.

For devices which do not have an associated file position such as network sockets, AFAICS, POSIX AIO is undefined. Perhaps it happens to work on your current setup, but that seems more by accident than by design.

Also, on Linux, POSIX AIO is implemented in glibc with the help of userspace threads.

That is, where possible use non-blocking IO and epoll(). However, epoll() does not work for seekable file descriptors such as regular files (same goes for the classical select()/poll() as well); in that case POSIX AIO is an alternative to rolling your own thread pool.

Solution 4

There should be no reason to stop or cancel an aio read or write request just because you need to make another read or write. If that were the case, that would defeat the whole point of asynchronous reading and writing since it's main purpose is to allow you to setup a reading or writing operation, and then move on. Since multiple requests can be queued, it would be much better to setup a couple of asynchronous reader/writer pools where you can grab a set of pre-initialized aiocb structures from an "available" pool that have been setup for asynchronous operations whenever you need them, and then return them to another "finished" pool when they're done and you can access the buffers they point to. While they're in the middle of an asynchronous read or write, they would be in a "busy" pool and wouldn't be touched. That way you won't have to keep creating aiocb structures on the heap dynamically every time you need to make a read or write operation, although that's okay to-do ... it's just not very efficient if you never plan on going over a certain limit, or plan to have only a certain number of "in-flight" requests.

BTW, keep in mind with a couple different in-flight asynchronous requests that your asychronous read/write handler can actually be interrupted by another read/write event. So you really don't want to be doing a whole-lot with your handler. In the above scenario I described, your handler would basically move the aiocb struct that triggered the signal handler from one of the pools to the next in the listed "available"->"busy"->"finished" stages. Your main code, after reading from the buffer pointed to by the aiocb structures in the "finished" pool would then move the structure back to the "available" pool.

Share:
11,281
Slaus
Author by

Slaus

Updated on June 14, 2022

Comments

  • Slaus
    Slaus almost 2 years

    I implement game server where I need to both read and write. So I accept incoming connection and start reading from it using aio_read() but when I need to send something, I stop reading using aio_cancel() and then use aio_write(). Within write's callback I resume reading. So, I do read all the time but when I need to send something - I pause reading.

    It works for ~20% of time - in other case call to aio_cancel() fails with "Operation now in progress" - and I cannot cancel it (even within permanent while cycle). So, my added write operation never happens.

    How to use these functions well? What did I missed?

    EDIT: Used under Linux 2.6.35. Ubuntu 10 - 32 bit.

    Example code:

    void handle_read(union sigval sigev_value) { /* handle data or disconnection */ }
    void handle_write(union sigval sigev_value) { /* free writing buffer memory */ }
    void start()
    {
        const int acceptorSocket = socket(AF_INET, SOCK_STREAM, 0);
        struct sockaddr_in addr;
    memset(&addr, 0, sizeof(struct sockaddr_in));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = INADDR_ANY;
    addr.sin_port = htons(port);
        bind(acceptorSocket, (struct sockaddr*)&addr, sizeof(struct sockaddr_in));
    
        listen(acceptorSocket, SOMAXCONN);
    
        struct sockaddr_in address;
    socklen_t addressLen = sizeof(struct sockaddr_in);
    
        for(;;)
        {
             const int incomingSocket = accept(acceptorSocket, (struct sockaddr*)&address, &addressLen);
             if(incomingSocket == -1)
             { /* handle error ... */}
             else
             {
                  //say socket to append outcoming messages at writing:
                  const int currentFlags = fcntl(incomingSocket, F_GETFL, 0);
                  if(currentFlags < 0) { /* handle error ... */ }
                  if(fcntl(incomingSocket, F_SETFL, currentFlags | O_APPEND) == -1) { /* handle another error ... */ }
    
                  //start reading:
                  struct aiocb* readingAiocb = new struct aiocb;
                  memset(readingAiocb, 0, sizeof(struct aiocb));
                  readingAiocb->aio_nbytes = MY_SOME_BUFFER_SIZE;
                  readingAiocb->aio_fildes = socketDesc;
                  readingAiocb->aio_buf = mySomeReadBuffer;
                  readingAiocb->aio_sigevent.sigev_notify = SIGEV_THREAD;
                  readingAiocb->aio_sigevent.sigev_value.sival_ptr = (void*)mySomeData;
                  readingAiocb->aio_sigevent.sigev_notify_function = handle_read;
                  if(aio_read(readingAiocb) != 0) { /* handle error ... */ }
              }
        }
    }
    
    //called at any time from server side:
    send(void* data, const size_t dataLength)
    {
        //... some thread-safety precautions not needed here ...
    
        const int cancellingResult = aio_cancel(socketDesc, readingAiocb);
        if(cancellingResult != AIO_CANCELED)
        {
            //this one happens ~80% of the time - embracing previous call to permanent while cycle does not help:
            if(cancellingResult == AIO_NOTCANCELED)
            {
                puts(strerror(aio_return(readingAiocb))); // "Operation now in progress"
                /* don't know what to do... */
            }
        }
        //otherwise it's okay to send:
        else
        {
            aio_write(...);
        }
    }
    
  • Slaus
    Slaus almost 13 years
    Thank you. What is about "read-write-read-write" chain? Will I be able to, e.g., write twice without reading between them? Currently I face real queue - I can't write without waiting for read_handler (I DO aio_write(), but it does nothing until aio_read() will do it's job - read at least one byte from client). I will look at libaio.
  • Slaus
    Slaus almost 13 years
    lse.sourceforge.net/io/aio.html - is this about libaio? If so, it has "AIO read and write on sockets" clause within "What Does Not Work?" menu. Did I missed something?
  • cnicutar
    cnicutar almost 13 years
    @Slav see the link to io_submit in the answer.
  • MK.
    MK. almost 13 years
    "The current Linux POSIX AIO implementation is provided in userspace by glibc." WTF? How does one come to an idea of implementing something like this in userland? I mean, isn't it clear that everyone is better off w/o this implemented than with something that spawns userland threads?
  • janneb
    janneb almost 13 years
    @MK: POSIX AIO is not an alternative to non-blocking IO and select()/poll()/epoll(); it's an alternative to rolling your own thread pool when doing IO on regular files. See my answer.
  • Slaus
    Slaus almost 13 years
    I used dup() and it worked. Thank you. So, now I have both implementations - will test them on live project and compare (network part is implemented as standalone module which can be switched at compile-time).
  • jcoffland
    jcoffland almost 4 years
    @Slav is right. libaio is synchronous for sockets.