Shared-memory IPC synchronization (lock-free)
Boost Interprocess has support for Shared Memory.
Boost Lockfree has a Single-Producer Single-Consumer queue type (spsc_queue). This is basically what you refer to as a circular buffer.
Here's a demonstration that passes IPC messages (in this case, of type string) using this queue, in a lock-free fashion.
Defining the types
First, let's define our types:
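#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/allocators/allocator.hpp>
#include <boost/interprocess/containers/string.hpp>
#include <boost/lockfree/spsc_queue.hpp>

namespace bip = boost::interprocess;
namespace shm
{
    // allocator that hands out memory from the shared segment
    template <typename T>
        using alloc = bip::allocator<T, bip::managed_shared_memory::segment_manager>;
    using char_alloc    = alloc<char>;
    using shared_string = bip::basic_string<char, std::char_traits<char>, char_alloc >;
    using string_alloc  = alloc<shared_string>;
    using ring_buffer = boost::lockfree::spsc_queue<
        shared_string,
        boost::lockfree::capacity<200>
        // alternatively, pass
        // boost::lockfree::allocator<string_alloc>
    >;
}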
For simplicity I chose to demo the compile-time-size spsc_queue implementation (the capacity<> option), arbitrarily requesting a capacity of 200 elements.
The shared_string typedef defines a string that transparently allocates from the shared memory segment, so its contents are also "magically" shared with the other process.
The consumer side
This is the simplest, so:
int main()
{
    // create segment and corresponding allocator
    bip::managed_shared_memory segment(bip::open_or_create, "MySharedMemory", 65536);
    shm::char_alloc char_alloc(segment.get_segment_manager());
    shm::ring_buffer *queue = segment.find_or_construct<shm::ring_buffer>("queue")();
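This opens the shared memory segment (creating it if necessary) and locates the shared queue, constructing it if it does not exist yet. NOTE: In real life, this initialization should be synchronized between the processes.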
Now for the actual demonstration:
    while (true)
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
        shm::shared_string v(char_alloc);
        if (queue->pop(v))
            std::cout << "Processed: '" << v << "'\n";
    }
}
The consumer just loops forever, polling the queue for pending jobs and processing at most one every ~10ms.
The Producer side
The producer side is very similar:
int main()
{
    bip::managed_shared_memory segment(bip::open_or_create, "MySharedMemory", 65536);
    shm::char_alloc char_alloc(segment.get_segment_manager());
    shm::ring_buffer *queue = segment.find_or_construct<shm::ring_buffer>("queue")();
Again, add proper synchronization to the initialization phase. Also, you would probably put the producer in charge of freeing the shared memory segment in due time. In this demonstration, I just "let it hang". This is nice for testing, see below.
So, what does the producer do?
    for (const char* s : { "hello world", "the answer is 42", "where is your towel" })
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(250));
        queue->push({s, char_alloc});
    }
}
Right, the producer produces precisely 3 messages in ~750ms and then exits.
Note that, consequently, if we run the following (assuming a POSIX shell with job control):
./producer& ./producer& ./producer&
wait
./consumer&
This will print 3x3 messages "immediately", while leaving the consumer running. Doing
./producer& ./producer& ./producer&
again after this will show the messages "trickle in" in real time (in bursts of 3 at ~250ms intervals), because the consumer is still running in the background.
See the full code online in this gist: https://gist.github.com/sehe/9376856
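On the memory-visibility part of the question: below is a minimal sketch of the flag-per-slot scheme from the question's pseudocode, written with C++11 atomics. It is not Boost's implementation, and the names Slot, Ring, produce, consume and run_demo are hypothetical. The release store on the flag plays the role of the "barrier, then set flag" step, and the acquire load on the other side pairs with it. Two threads stand in for the two processes here; for real IPC the ring would have to live in the shared segment, and I'm assuming the atomics are lock-free on the target platform (hence the static_assert).

```cpp
#include <array>
#include <atomic>
#include <cstddef>
#include <thread>
#include <vector>

// Assumption about the target platform: the flag type must be lock-free
// for this pattern to be plausible across process boundaries.
static_assert(ATOMIC_BOOL_LOCK_FREE == 2, "std::atomic<bool> must be lock-free");

constexpr std::size_t M = 8;  // number of slots in the ring (hypothetical)

// Hypothetical slot: a payload plus the full/empty flag from the question.
struct Slot {
    int data = 0;                   // stands in for the large data chunk
    std::atomic<bool> full{false};  // false = empty, true = full
};

using Ring = std::array<Slot, M>;

// Producer: write the payload first, then publish it with a release store.
void produce(Ring& ring, std::size_t count) {
    std::size_t pos = 0;
    for (std::size_t i = 0; i < count;) {
        Slot& slot = ring[pos];
        if (slot.full.load(std::memory_order_acquire)) {  // not yet consumed
            std::this_thread::yield();
            continue;
        }
        slot.data = static_cast<int>(i);                   // writeData
        slot.full.store(true, std::memory_order_release);  // setBufferFull
        pos = (pos + 1) % M;
        ++i;
    }
}

// Consumer: the acquire load pairs with the producer's release store, so the
// payload written before the flag was set is visible once the flag reads true.
std::vector<int> consume(Ring& ring, std::size_t count) {
    std::vector<int> out;
    std::size_t pos = 0;
    while (out.size() < count) {
        Slot& slot = ring[pos];
        if (!slot.full.load(std::memory_order_acquire)) {  // nothing to read yet
            std::this_thread::yield();
            continue;
        }
        out.push_back(slot.data);                           // readData
        slot.full.store(false, std::memory_order_release);  // setBufferEmpty
        pos = (pos + 1) % M;
    }
    return out;
}

// Runs one producer and one consumer thread over the same ring.
std::vector<int> run_demo(std::size_t count) {
    Ring ring;
    std::vector<int> received;
    std::thread consumer([&] { received = consume(ring, count); });
    produce(ring, count);
    consumer.join();
    return received;
}
```

Calling run_demo(1000) should hand back the values in the order they were written. The point of the sketch is only the pairing: every plain write to data happens before the release store on full, and every plain read happens after the matching acquire load.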
Ben
Updated on June 05, 2022
Comments
-
Ben almost 2 years
Consider the following scenario:
Requirements:
- Intel x64 Server (multiple CPU-sockets => NUMA)
- Ubuntu 12, GCC 4.6
- Two processes sharing large amounts of data over (named) shared-memory
- Classical producer-consumer scenario
- Memory is arranged in a circular buffer (with M elements)
Program sequence (pseudo code):
Process A (Producer):
int bufferPos = 0;
while( true )
{
    if( isBufferEmpty( bufferPos ) )
    {
        writeData( bufferPos );
        setBufferFull( bufferPos );
        bufferPos = ( bufferPos + 1 ) % M;
    }
}
Process B (Consumer):
int bufferPos = 0;
while( true )
{
    if( isBufferFull( bufferPos ) )
    {
        readData( bufferPos );
        setBufferEmpty( bufferPos );
        bufferPos = ( bufferPos + 1 ) % M;
    }
}
Now the age-old question: How to synchronize them effectively!?
- Protect every read/write access with mutexes
- Introduce a "grace period", to allow writes to complete: Read data in buffer N, when buffer(N+3) has been marked as full (dangerous, but seems to work...)
- ?!?
Ideally I would like something along the lines of a memory-barrier, that guarantees that all previous reads/writes are visible across all CPUs, along the lines of:
writeData( i );
MemoryBarrier();
//All data written and visible, set flag
setBufferFull( i );
This way, I would only have to monitor the buffer flags and then could read the large data chunks safely.
Generally I'm looking for something along the lines of acquire/release fences as described by Preshing here:
http://preshing.com/20130922/acquire-and-release-fences/
(If I understand it correctly, the C++11 atomics only work for threads of a single process and not across multiple processes.)
However, GCC's own memory barriers (__sync_synchronize, in combination with the compiler barrier asm volatile( "" ::: "memory" ) to be sure) don't seem to work as expected: writes only become visible after the barrier, when I expected them to be completed by then.
Any help would be appreciated...
BTW: Under windows this just works fine using volatile variables (a Microsoft specific behaviour)...
-
sehe about 10 years
Just noticed gcc 4.6 mentioned. It's possible that this version doesn't have uniform initializers. In that case, just explicitly call the constructor for shared_string when pushing onto the queue.
Ben about 10 years
Thank you for your detailed answer. It is always fascinating to see how even complex problems come down to 100 lines of code in boost ;-) However, my question still remains: is there a construct of some kind that enforces memory visibility (something similar to smp_mb(), which is only available in the kernel)? Or, in other words, how do mutexes enforce memory visibility?
-
sehe about 10 years
@Ben It's 54 lines of code (not counting makefile). I've just pushed a c++03 update to the gist. The Lockfree library uses atomics in the underlying implementation, so I trust it to employ the right barriers. I would be surprised if the fact that memory pages are shared had any impact on the visibility semantics. Here are some notes about Interprocess support in the Boost Lockfree documentation.
-
sehe about 10 years
If you're looking for a shared mutex (named mutex on Win32), see Mutex or Synchronization mechanisms overview in the Boost Interprocess docs.
-
Ben about 10 years
Ok, thank you again for your answer. I did take a look into the boost interprocess and atomics sources and only encountered "familiar" barrier implementations. So I still don't understand why the memory barrier (__sync_synchronize) does not work as expected in my implementation. However, as this goes beyond my original question, I will mark your (excellent) answer as accepted :-)