Asynchronously writing to a file in C++ on Unix

Solution 1

Before going into asynchronous writing, if you are using IOStreams you might want to try to avoid flushing the stream accidentally, e.g., by not using std::endl but rather using '\n' instead. Since writing to IOStreams is buffered this can improve performance quite a bit.
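The difference between std::endl and '\n' can be made visible with a small experiment. The sketch below is only an illustration: counting_buf and count_flushes are hypothetical names, not part of the answer. It counts how often the stream asks its buffer to flush, which the stream does by calling sync().

```cpp
#include <cassert>
#include <ostream>
#include <sstream>

// Hypothetical helper: stores text like std::stringbuf, but counts how often
// the stream asks it to flush (each flush arrives as a call to sync()).
class counting_buf : public std::stringbuf {
public:
    int flushes = 0;

protected:
    int sync() override {
        ++flushes;
        return std::stringbuf::sync();
    }
};

// Writes `lines` lines and returns how many flushes the buffer saw.
// With use_endl the line terminator is std::endl, otherwise it is '\n'.
inline int count_flushes(int lines, bool use_endl) {
    assert(lines >= 0);
    counting_buf buf;
    std::ostream out(&buf);
    for (int i = 0; i < lines; ++i) {
        if (use_endl) out << "line " << i << std::endl;
        else          out << "line " << i << '\n';
    }
    return buf.flushes;
}
```

Calling count_flushes(100, true) reports one flush per line, while count_flushes(100, false) reports none. With a file stream each of those flushes typically ends in a system call, which is where the cost comes from.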

If that's not sufficient, the next question is how the data is written. If there is a lot of formatting going on, there is a chance that the actual formatting takes most of the time. You might be able to push the formatting off into a separate thread but that's quite different from merely passing off writing a couple of bytes to another thread: you'd need to pass on a suitable data structure holding the data to be formatted. What is suitable depends on what you are actually writing, though.
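As a rough sketch of pushing the formatting into another thread: the producer hands over raw records and the worker turns them into text. The record type sample, the class formatting_writer, and the helper format_in_background are made up for this illustration; what a suitable record looks like depends on the data actually being written.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <ostream>
#include <queue>
#include <sstream>
#include <string>
#include <thread>
#include <vector>

// Hypothetical record type: the raw data handed to the writer thread.
struct sample {
    int    step;
    double value;
};

class formatting_writer {
    std::ostream&           out_;
    std::mutex              mutex_;
    std::condition_variable cond_;
    std::queue<sample>      queue_;
    bool                    done_ = false;
    std::thread             thread_;  // declared last: starts after the rest exists

    void run() {
        for (;;) {
            sample s{};
            {
                std::unique_lock<std::mutex> lock(mutex_);
                cond_.wait(lock, [this] { return !queue_.empty() || done_; });
                if (queue_.empty()) return;  // done and fully drained
                s = queue_.front();
                queue_.pop();
            }
            // The potentially expensive formatting happens outside the lock.
            out_ << "step=" << s.step << " value=" << s.value << '\n';
        }
    }

public:
    explicit formatting_writer(std::ostream& out)
        : out_(out), thread_(&formatting_writer::run, this) {}

    ~formatting_writer() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            done_ = true;
        }
        cond_.notify_one();
        thread_.join();
    }

    // Producer side: cheap, it only copies the raw record into the queue.
    void push(sample s) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            assert(!done_);
            queue_.push(s);
        }
        cond_.notify_one();
    }
};

// Formats all samples on the worker thread and returns the resulting text.
inline std::string format_in_background(const std::vector<sample>& samples) {
    std::ostringstream sink;
    {
        formatting_writer writer(sink);
        for (const sample& s : samples) writer.push(s);
    }  // the destructor drains the queue and joins the thread
    return sink.str();
}
```

Only the worker thread touches the output stream while the writer is alive, so the stream itself needs no locking in this sketch.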

Finally, if writing the buffers to a file is really the bottleneck and you want to stick with the standard C++ library, it may be reasonable to have a writer thread which listens on a queue filled with buffers from a suitable stream buffer and writes them to an std::ofstream. The producer interface would be an std::ostream which sends off buffers, probably of fixed size, to the queue on which the other thread listens, either when the buffer is full or when the stream is flushed (for which I'd use std::flush explicitly). Below is a quick implementation of that idea using only standard library facilities:

#include <condition_variable>
#include <fstream>
#include <mutex>
#include <queue>
#include <streambuf>
#include <string>
#include <thread>
#include <vector>

struct async_buf
    : std::streambuf
{
    std::ofstream                 out;
    std::mutex                    mutex;
    std::condition_variable       condition;
    std::queue<std::vector<char>> queue;
    std::vector<char>             buffer;
    bool                          done;
    std::thread                   thread;

    void worker() {
        bool local_done(false);
        std::vector<char> buf;
        while (!local_done) {
            {
                std::unique_lock<std::mutex> guard(this->mutex);
                this->condition.wait(guard,
                                     [this](){ return !this->queue.empty()
                                                   || this->done; });
                if (!this->queue.empty()) {
                    buf.swap(queue.front());
                    queue.pop();
                }
                local_done = this->queue.empty() && this->done;
            }
            if (!buf.empty()) {
                out.write(buf.data(), std::streamsize(buf.size()));
                buf.clear();
            }
        }
        out.flush();
    }

public:
    async_buf(std::string const& name)
        : out(name)
        , buffer(128)
        , done(false)
        , thread(&async_buf::worker, this) {
        this->setp(this->buffer.data(),
                   this->buffer.data() + this->buffer.size() - 1);
    }
    ~async_buf() {
        this->sync();  // queue a partially filled buffer so it is not dropped
        {
            std::unique_lock<std::mutex> guard(this->mutex);
            this->done = true;
        }
        this->condition.notify_one();
        this->thread.join();
    }
    int overflow(int c) {
        if (c != std::char_traits<char>::eof()) {
            *this->pptr() = std::char_traits<char>::to_char_type(c);
            this->pbump(1);
        }
        return this->sync() != -1
            ? std::char_traits<char>::not_eof(c): std::char_traits<char>::eof();
    }
    int sync() {
        if (this->pbase() != this->pptr()) {
            this->buffer.resize(std::size_t(this->pptr() - this->pbase()));
            {
                std::unique_lock<std::mutex> guard(this->mutex);
                this->queue.push(std::move(this->buffer));
            }
            this->condition.notify_one();
            this->buffer = std::vector<char>(128);
            this->setp(this->buffer.data(),
                       this->buffer.data() + this->buffer.size() - 1);
        }
        return 0;
    }

};

int main()
{
    async_buf    sbuf("async.out");
    std::ostream astream(&sbuf);
    std::ifstream in("async_stream.cpp");
    for (std::string line; std::getline(in, line); ) {
        astream << line << '\n' << std::flush;
    }
}

Solution 2

Search the web for "double buffering."

In general, one thread will write to one or more buffers. Another thread reads from the buffers, "chasing" the writing thread.

This may not make your program more efficient. Efficiency with files is achieved by writing in huge blocks so that the drive doesn't get a chance to spin down. One write of many bytes is more efficient than many writes of a few bytes.

This could be achieved by having the writing thread write only once the buffer content has exceeded some threshold, such as 1 KB.
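A minimal single-threaded sketch of the threshold idea, assuming records arrive as strings: data is collected in memory and handed to the underlying stream only once enough has accumulated. The names threshold_writer and count_writes are hypothetical and the threshold is merely an example value; in a two-thread setup the same check would decide when a buffer is passed to the writing thread.

```cpp
#include <cassert>
#include <cstddef>
#include <ostream>
#include <sstream>
#include <string>

// Hypothetical helper: batches small appends into fewer, larger writes.
class threshold_writer {
    std::ostream& out_;
    std::string   pending_;
    std::size_t   threshold_;
    std::size_t   writes_ = 0;

public:
    threshold_writer(std::ostream& out, std::size_t threshold)
        : out_(out), threshold_(threshold) {
        assert(threshold > 0);
        pending_.reserve(threshold);
    }

    ~threshold_writer() { flush(); }  // do not lose the final partial batch

    void append(const std::string& data) {
        pending_ += data;
        if (pending_.size() >= threshold_) flush();
    }

    void flush() {
        if (pending_.empty()) return;
        out_.write(pending_.data(),
                   static_cast<std::streamsize>(pending_.size()));
        pending_.clear();
        ++writes_;
    }

    std::size_t writes() const { return writes_; }
};

// Appends `records` records of `record_size` bytes each and returns how many
// write() calls reached the underlying stream, including the final flush.
inline std::size_t count_writes(std::size_t records, std::size_t record_size,
                                std::size_t threshold) {
    std::ostringstream sink;
    threshold_writer writer(sink, threshold);
    for (std::size_t i = 0; i < records; ++i)
        writer.append(std::string(record_size, 'x'));
    writer.flush();
    return writer.writes();
}
```

With 100 records of 16 bytes and a threshold of 1024 bytes, count_writes returns 2: one write when 1024 bytes have accumulated and one for the remaining 576 bytes. A threshold of 1 byte results in 100 separate writes.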

Also research the topic of "spooling" or "print spooling".

You'll need to use C++11 since previous versions don't have threading support in the standard library. I don't know why you limit yourself, since Boost has some good stuff in it.

Author: Andrew Spott

Broadly Skilled Data Geek.

Updated on June 04, 2022

Comments

  • Andrew Spott
    Andrew Spott almost 2 years

    I have some long loop that I need to write some data to a file on every iteration. The problem is that writing to a file can be slow, so I would like to reduce the time this takes by doing the writing asynchronously.

    Does anyone know a good way to do this? Should I be creating a thread that consumes whatever is put into its buffer by writing it out (in this case, a single producer, single consumer)?

    I am interested mostly in solutions that don't involve anything but the standard library (C++11).

  • Andrew Spott
    Andrew Spott over 10 years
    Is std::ofstream::write (for writing binary data) buffered?
  • Dietmar Kühl
    Dietmar Kühl over 10 years
    @AndrewSpott: with the file stream's default setup it is buffered. You can disable buffering for file streams by calling stream.rdbuf()->pubsetbuf(0, 0).
  • zangw
    zangw over 8 years
    If I want to flush the buffer when the buffer is full, should I invoke flush manually or anything else?
  • zangw
    zangw over 8 years
    When I just remove std::flush from the line that writes data into astream, it seems that only 128 bytes are written to the file when the buffer is full?
  • Dietmar Kühl
    Dietmar Kühl over 8 years
    @zangw: When the buffer is full, it should flush automatically: overflow() is called when a character is written for which there is no more space in the buffer [based on what the stream knows: there is one more character space to stick the argument to overflow() in]. If you want to send data without the buffer being full, you'll need to flush (when the stream is destroyed it'll flush). The implementation above chops the data up into units of 128 bytes. The constant can be changed, of course (I haven't profiled the code to see which size makes most sense).
  • zangw
    zangw over 8 years
    @DietmarKühl I tried enlarging the buffer to 8192 bytes, but when I write more than 8192 bytes, only 8192 bytes are written to the file. I want to flush the buffer when it is full and then reuse it for the next bytes to be written. Is that possible?
  • Dietmar Kühl
    Dietmar Kühl over 8 years
    @zangw: well, sure. The design of the class above writes a buffer and hands it off to another thread when it is full, using a new buffer to write to. There could be a queue of available buffers (prefer to use these and create a new one if none is available) but that's not implemented. You can also grow the buffer in overflow() and only send it when flushed explicitly (i.e., when sync() is called).
  • zangw
    zangw over 8 years
    @DietmarKühl, could I use just one buffer rather than a queue of buffers? When the buffer is full, call sync() and reuse the buffer for the next write operation. Here is a link to my code change; could you please help me check it? I am not familiar with some of the streambuf APIs and am trying to study them, so please forgive any mistakes.
  • Dietmar Kühl
    Dietmar Kühl over 8 years
    @zangw: First off, the code at this link looks remarkably like my code in the answer above. However, it does not carry the appropriate Creative Commons license (see, e.g., this meta article) or any attribution. Both are required. You'd need to talk to an attorney to determine the implications of copyright violations.
  • Dietmar Kühl
    Dietmar Kühl over 8 years
    @zangw: next you seem to confuse this site with some source of free labor which it is not. If you want someone to review your code, you'd need to put it, e.g., on codereview, possibly pointing at it from a comment over here to draw the attention of people interested in this question to your code review. If you have concrete questions how something works you can possibly ask over here.
  • Dietmar Kühl
    Dietmar Kühl over 8 years
    @zangw: With respect to the concrete question asked above: you should create a question, not ask in a comment. The short answer is: whether you can use one buffer has little to do with stream buffers but rather with how you synchronize the access to the buffer between the two threads. If you make sure that the threads don't touch the same bytes in the one buffer things would be OK. If you end up writing a byte in one thread which is accessed unsynchronized in another thread, you have undefined behavior.
  • Mast
    Mast over 8 years
    @DietmarKühl Code Review would require actual code in the question. I don't see any here.
  • Dietmar Kühl
    Dietmar Kühl over 8 years
    @Mast: The code I'm referring to was posted here on GitHub (see the link in zangw's earlier comment). That's also the code infringing the copyright granted for my answer above.
  • zangw
    zangw over 8 years
    @DietmarKühl, I am very sorry for infringing the copyright of your answer, and I have deleted that code from GitHub. Thank you very much for this link; I will follow this rule in the future.
  • Andreas W. Wylach
    Andreas W. Wylach about 7 years
    @DietmarKühl To actually flush the m_out stream, e.g., to pass through the std::flush used on astream, I needed to add a flush() call right after the write in the thread loop. I am not sure if this is really intended, but if async_buf is used for a logger stream that multiple threads write to, a flush is helpful to get real-time output. I see async_buf as a good base for a thread-safe stream. I've been playing with your code a little (using it in a logger class) as I think this idea is neat.
  • Mooing Duck
    Mooing Duck over 4 years
    When I tried this, my files kept ending too short. It appears there's a bug where when destroyed, the currently filling buffer is not added to the queue in time, leading to the last buffer being silently dropped. Should the destructor call sync() before setting done?
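
The remark in the comments about disabling buffering on a file stream can be tried out with a short sketch. Two details are worth noting: setbuf() is a protected member of std::filebuf, so the call goes through the public pubsetbuf(), and the standard only specifies the effect when the call is made before any I/O on the stream (some implementations honor it only before the file is opened, which is why the sketch calls it first). The function name read_back_while_open is hypothetical.

```cpp
#include <cassert>
#include <fstream>
#include <iterator>
#include <string>

// Writes `text` to `path` through an unbuffered std::ofstream and returns
// what a second, independent reader sees while the writer is still open.
inline std::string read_back_while_open(const std::string& path,
                                        const std::string& text) {
    assert(!path.empty());
    std::ofstream out;
    out.rdbuf()->pubsetbuf(nullptr, 0);  // request unbuffered mode before open()
    out.open(path, std::ios::trunc);
    out << text;                         // no flush and no close yet

    std::ifstream in(path);
    return std::string(std::istreambuf_iterator<char>(in),
                       std::istreambuf_iterator<char>());
}
```

With buffering disabled the text is expected to reach the file immediately, so the reader should already see it. With the default buffered setup a short text would normally stay in the stream's buffer until a flush or close.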