Background
A single-producer single-consumer queue is a FIFO buffer that acts as a message passing mechanism between two threads (the producer and the consumer). This can be especially useful when a thread produces (or receives) data which should then be passed onto another thread for processing.
An example use of such a queue is asynchronous logging: if we have a latency-sensitive section of code and we wish to log something, we can pass it to a background thread which deals with string formatting and I/O. That way, the only cost incurred on the critical path is pushing the contents of the log onto the queue.
Naturally, in a multi-threaded environment, we may decide to have multiple producers and multiple consumers passing data around. However, this requires a more careful implementation to avoid data races, and it often relies on locks, which can lower performance (for example, through contention and context switches). Implementations that avoid mutexes exist (built on atomic RMW operations), but many of them are not strictly lock-free, and they are not trivial to implement.
On the other hand, if we have exactly 2 threads (one writer and one reader) sharing the queue, it is possible to implement a wait-free SPSC queue using only atomic loads and stores (no RMW loops!).
A wait-free algorithm guarantees that every thread completes each of its operations in a bounded number of steps, regardless of contention or of what the other threads are doing.
This is largely why SPSC queues are commonly found in high-throughput multi-threaded systems: they are very fast and fairly easy to implement.
V1: A simple wait-free SPSC queue
We will implement the queue as a ring buffer using std::vector as the underlying container. We also need to keep track of the head and tail indices, which are updated atomically. The queue will have a bounded capacity, which is specified on construction.
template <typename T>
class SpscQueue {
private:
    std::vector<T> buffer_;
    std::atomic<size_t> head_;
    std::atomic<size_t> tail_;

public:
    SpscQueue(size_t cap) : buffer_(cap + 1), head_(0), tail_(0) {
        assert(cap + 1 > 0); // prevent overflow
    }
    ...
};
Notice that we actually allocate capacity plus one slots in the vector. One slot is always left unused so that we can tell an empty queue from a full one. Our empty/full conditions are then:
- Is empty? head_ == tail_
- Is full? head_ == (tail_ + 1) % buffer_.size()
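To make the two conditions concrete, here is a small sketch of the index arithmetic on its own, outside the queue. The helper names (ring_empty, ring_full, ring_size) are hypothetical and only serve the illustration; `slots` stands for the number of entries in the buffer, i.e. capacity plus one.

```cpp
#include <cassert>
#include <cstddef>

// `head` is the next index to read, `tail` the next index to write.
constexpr bool ring_empty(std::size_t head, std::size_t tail) {
    return head == tail;
}

constexpr bool ring_full(std::size_t head, std::size_t tail, std::size_t slots) {
    return head == (tail + 1) % slots;
}

// Number of items currently stored in the ring.
constexpr std::size_t ring_size(std::size_t head, std::size_t tail, std::size_t slots) {
    return (tail + slots - head) % slots;
}

// Worked example: capacity 3 means 4 slots, and one slot always stays unused.
inline void ring_examples() {
    assert(ring_empty(0, 0));            // fresh queue
    assert(ring_full(0, 3, 4));          // three items written, none read
    assert(ring_full(2, 1, 4));          // still full after wrapping around
    assert(ring_size(2, 1, 4) == 3);     // the wrapped queue holds three items
}
```

Without the spare slot, a full queue and an empty queue would both have head equal to tail, and we would need an extra shared counter or flag to tell them apart.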
Given this information, we can implement our enqueue and dequeue methods. The code is given first and then explained:
bool enqueue(const T& item) {
    const size_t t = tail_.load(std::memory_order_relaxed);
    const size_t next_t = (t + 1) % buffer_.size();
    // Full if advancing the tail would make it collide with the head.
    if (next_t == head_.load(std::memory_order_acquire)) {
        return false;
    }
    buffer_[t] = item;
    // Publish the item: the release store makes the write above visible.
    tail_.store(next_t, std::memory_order_release);
    return true;
}

bool dequeue(T& item) {
    const size_t h = head_.load(std::memory_order_relaxed);
    // Empty if the head has caught up with the tail.
    if (h == tail_.load(std::memory_order_acquire)) {
        return false;
    }
    item = buffer_[h];
    const size_t next_h = (h + 1) % buffer_.size();
    // Hand the slot back to the producer.
    head_.store(next_h, std::memory_order_release);
    return true;
}
As you can see, there are no atomic RMW loops or blocking operations: each method finishes in a bounded number of steps, so both are wait-free (and therefore also lock-free).
Let's start with enqueue(). We first check that the queue is not full by comparing the next possible tail index with the current head index. We use std::memory_order_acquire to synchronise with possible updates to the head_ field by the consumer thread calling dequeue().
We then store the item into the queue at the current tail and update the tail_ field atomically to point to the next location. Because we want to synchronise with the dequeue() method, we use std::memory_order_release so that our local changes are visible to the other thread. This also happens to be our linearisation point, i.e. the point at which the enqueue() method appears to take effect.
The reasoning for the dequeue() method is symmetric, but instead we check that the queue is not empty and update the head_ field atomically.
Initially, I tried implementing the queue using only sequentially consistent atomic operations; however, the performance was much worse (nearly 4x lower throughput). I recommend you benchmark an SC-only version of the queue to see how much slower it can be (due to memory barriers).
We can now benchmark this unoptimised implementation with a simple program consisting of two threads passing 100 million integers from one to the other, with a queue size of 100k. We measure how long it takes for the consumer thread to read all of the items.
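As a rough idea of what such a benchmark could look like, here is a minimal sketch that assembles the V1 snippets above into one class and times a transfer between two threads. It is not the benchmark used for the numbers in this post: measure_throughput and its parameters are hypothetical names, the consumer simply runs on the calling thread, and thread pinning is left out.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <thread>
#include <vector>

// V1 queue, assembled from the snippets above.
template <typename T>
class SpscQueue {
public:
    explicit SpscQueue(std::size_t cap) : buffer_(cap + 1), head_(0), tail_(0) {
        assert(cap + 1 > 0);  // cap == SIZE_MAX would wrap around to zero slots
    }

    bool enqueue(const T& item) {
        const std::size_t t = tail_.load(std::memory_order_relaxed);
        const std::size_t next_t = (t + 1) % buffer_.size();
        if (next_t == head_.load(std::memory_order_acquire)) return false;  // full
        buffer_[t] = item;
        tail_.store(next_t, std::memory_order_release);
        return true;
    }

    bool dequeue(T& item) {
        const std::size_t h = head_.load(std::memory_order_relaxed);
        if (h == tail_.load(std::memory_order_acquire)) return false;  // empty
        item = buffer_[h];
        head_.store((h + 1) % buffer_.size(), std::memory_order_release);
        return true;
    }

private:
    std::vector<T> buffer_;
    std::atomic<std::size_t> head_;
    std::atomic<std::size_t> tail_;
};

// Hypothetical harness: pushes `count` integers through a queue of the given
// capacity and returns the consumer-side throughput in elements per second.
// `checksum_out`, if provided, receives the sum of all values read.
inline double measure_throughput(std::size_t count, std::size_t capacity,
                                 std::uint64_t* checksum_out = nullptr) {
    SpscQueue<std::uint64_t> queue(capacity);

    std::thread producer([&] {
        for (std::uint64_t i = 0; i < count; ++i) {
            while (!queue.enqueue(i)) { /* spin while the queue is full */ }
        }
    });

    std::uint64_t checksum = 0;
    const auto start = std::chrono::steady_clock::now();
    for (std::size_t received = 0; received < count; ++received) {
        std::uint64_t value = 0;
        while (!queue.dequeue(value)) { /* spin while the queue is empty */ }
        checksum += value;
    }
    const auto stop = std::chrono::steady_clock::now();
    producer.join();

    if (checksum_out != nullptr) *checksum_out = checksum;
    const std::chrono::duration<double> elapsed = stop - start;
    return static_cast<double>(count) / elapsed.count();
}
```

Calling measure_throughput(100'000'000, 100'000) would correspond to the setup described above. The checksum is only a sanity check that every item arrived exactly once.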
We also pin the consumer and producer threads to different physical cores in our benchmark program. We will see why this matters in the next section.
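For reference, one way to pin a thread on Linux is through the glibc affinity calls. The sketch below assumes Linux with glibc (where g++ defines _GNU_SOURCE by default); the helper names allowed_cpus and pin_current_thread are hypothetical and are not necessarily what the benchmark uses.

```cpp
#include <pthread.h>
#include <sched.h>

#include <cassert>
#include <vector>

// Returns the logical CPU ids the calling thread is currently allowed to run on.
inline std::vector<int> allowed_cpus() {
    cpu_set_t set;
    CPU_ZERO(&set);
    std::vector<int> cpus;
    if (sched_getaffinity(0, sizeof(set), &set) == 0) {
        for (int cpu = 0; cpu < CPU_SETSIZE; ++cpu) {
            if (CPU_ISSET(cpu, &set)) cpus.push_back(cpu);
        }
    }
    return cpus;
}

// Pins the calling thread to a single logical CPU. Returns true on success.
inline bool pin_current_thread(int cpu) {
    cpu_set_t set;
    CPU_ZERO(&set);
    CPU_SET(cpu, &set);
    return pthread_setaffinity_np(pthread_self(), sizeof(set), &set) == 0;
}

// Example: pin the calling thread to the first CPU it may use.
inline bool pin_to_first_allowed_cpu() {
    const std::vector<int> cpus = allowed_cpus();
    assert(!cpus.empty());
    return pin_current_thread(cpus.front());
}
```

Each benchmark thread would call pin_current_thread() with its own CPU id as its first action. Note that these ids are logical CPUs: two ids can be hyper-thread siblings on the same physical core, so it is worth checking the topology (for example with lscpu) before choosing them.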
We compile the benchmark with -O3 and -march=native, and we obtain the following results (run over 10 iterations):
Mean: 46,247,967 elems/s
Median: 46,477,813 elems/s
Min: 44,271,890 elems/s
Max: 46,672,184 elems/s
We are processing around 46 million elements per second, which is quite good, but we can do even better.
V2: Eliminating false sharing
Going back to our class definition, we see that our head and tail fields are defined contiguously in memory. This can lead to false sharing: when thread 1 modifies one of the fields, it invalidates thread 2's copy of the cache line, because both fields lie on the same line. Thread 2 then has to re-fetch the whole line (from the other core's cache or from further down the memory hierarchy) to read the other field, even though thread 1 did not change it.
We can easily fix this by adding enough padding between the two fields such that they end up falling on different cache lines. On most modern processors, an L1 cache line is typically 64 bytes, so we can align both fields at 64 bytes each. We can do this as follows in C++:
template <typename T>
class SpscQueue {
private:
    std::vector<T> buffer_;
    alignas(64) std::atomic<size_t> head_;
    alignas(64) std::atomic<size_t> tail_;
    ...
};
Our queue should no longer suffer from false sharing! Our new results show a slight improvement:
Mean: 49,357,127 elems/s
Median: 49,245,343 elems/s
Min: 47,540,700 elems/s
Max: 50,012,641 elems/s
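If you want to convince yourself that the padding does what we expect, the layout can be checked at compile time and at run time. The sketch below uses a stand-in struct (PaddedIndices, a hypothetical name) with the same two fields as the queue, and it assumes a 64-byte cache line, which is common but not universal.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>

// Assumed cache-line size; adjust for your target hardware.
constexpr std::size_t kCacheLine = 64;

// Stand-in for the two index fields of the queue.
struct PaddedIndices {
    alignas(kCacheLine) std::atomic<std::size_t> head{0};
    alignas(kCacheLine) std::atomic<std::size_t> tail{0};
};

static_assert(alignof(PaddedIndices) == kCacheLine,
              "the struct starts on a cache-line boundary");
static_assert(sizeof(PaddedIndices) == 2 * kCacheLine,
              "each index occupies a full line of its own");

// True if the two addresses fall on different cache lines.
inline bool on_distinct_lines(const void* a, const void* b) {
    return reinterpret_cast<std::uintptr_t>(a) / kCacheLine !=
           reinterpret_cast<std::uintptr_t>(b) / kCacheLine;
}

inline void check_layout() {
    PaddedIndices idx;
    assert(on_distinct_lines(&idx.head, &idx.tail));
}
```

C++17 also defines std::hardware_destructive_interference_size in the <new> header as a portable hint for this value, on toolchains that provide it.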
V3: A cache-optimised queue
The final, and most substantial, optimisation aims to improve cache usage and reduce cache misses (inspired by this).
Consider the case when the consumer calls dequeue() to read an item:
1. The head_ needs to be updated, so its cache line is loaded into the consumer's L1 cache in an exclusive state.
2. The tail_ needs to be read to check that the queue is not empty, so its cache line is loaded into the L1 cache in a shared state.
Now, assume the producer calls enqueue() to push a new item:
3. The head_ needs to be read (to check that the queue is not full), so the consumer's cache line containing head_ transitions into a shared state.
4. This causes cache coherency traffic, as the consumer needs to bring the head_ cache line back into an exclusive state.
A symmetric situation occurs the other way round, meaning that in the worst case, there will be one cache line transition from a shared state to an exclusive state for every read and write. In the MESI cache coherency protocol, these transitions are considered cache misses and produce bus traffic.
To reduce bus traffic, the producer and consumer threads will each keep their own cached copy of the other side's index (the producer caches the head, the consumer caches the tail), which avoids having to load head_ or tail_ every time we check whether the queue is empty or full.
Essentially, when the consumer first observes that N items are available to read, it caches this information and the N-1 subsequent reads won’t need to read the tail_. Similarly, when the producer first observes that N slots are available for writing, it caches this information and the N-1 subsequent writes won’t need to read the head_.
Our cache-friendly implementation looks like this now:
bool enqueue(const T& item) {
    const size_t t = tail_.load(std::memory_order_relaxed);
    const size_t next_t = (t + 1) % buffer_.size();

    // Use the cached head first instead of loading the actual head from memory.
    // If they are equal, then we know that the queue may be full, so only then load
    // the actual value of head to check if it is currently full.
    if (next_t == head_cached_) {
        head_cached_ = head_.load(std::memory_order_acquire);
        if (next_t == head_cached_) {
            return false;
        }
    }

    buffer_[t] = item;
    tail_.store(next_t, std::memory_order_release);
    return true;
}

bool dequeue(T& item) {
    const size_t h = head_.load(std::memory_order_relaxed);

    // Use the cached tail first instead of loading the actual tail from memory.
    // If they are equal, then we know that the queue may be empty, so only then load
    // the actual value of tail to check if it is currently empty.
    if (h == tail_cached_) {
        tail_cached_ = tail_.load(std::memory_order_acquire);
        if (h == tail_cached_) {
            return false;
        }
    }

    item = buffer_[h];
    const size_t next_h = (h + 1) % buffer_.size();
    head_.store(next_h, std::memory_order_release);
    return true;
}
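The methods above rely on two extra fields, head_cached_ and tail_cached_, whose declarations are not shown. Here is one possible way to put the whole V3 class together. The field layout is an assumption: each cached index is a plain size_t (it is only ever touched by one thread) and, like head_ and tail_, gets its own 64-byte-aligned slot.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <vector>

template <typename T>
class SpscQueue {
public:
    explicit SpscQueue(std::size_t cap) : buffer_(cap + 1) {
        assert(cap + 1 > 0);  // cap == SIZE_MAX would wrap around to zero slots
    }

    // Called by the producer thread only.
    bool enqueue(const T& item) {
        const std::size_t t = tail_.load(std::memory_order_relaxed);
        const std::size_t next_t = (t + 1) % buffer_.size();
        if (next_t == head_cached_) {
            // Possibly full: refresh the cached head from the shared index.
            head_cached_ = head_.load(std::memory_order_acquire);
            if (next_t == head_cached_) return false;
        }
        buffer_[t] = item;
        tail_.store(next_t, std::memory_order_release);
        return true;
    }

    // Called by the consumer thread only.
    bool dequeue(T& item) {
        const std::size_t h = head_.load(std::memory_order_relaxed);
        if (h == tail_cached_) {
            // Possibly empty: refresh the cached tail from the shared index.
            tail_cached_ = tail_.load(std::memory_order_acquire);
            if (h == tail_cached_) return false;
        }
        item = buffer_[h];
        head_.store((h + 1) % buffer_.size(), std::memory_order_release);
        return true;
    }

private:
    std::vector<T> buffer_;
    // Assumed layout: every index sits on its own 64-byte line.
    alignas(64) std::atomic<std::size_t> head_{0};  // written by the consumer
    alignas(64) std::size_t tail_cached_ = 0;       // consumer-private copy of tail_
    alignas(64) std::atomic<std::size_t> tail_{0};  // written by the producer
    alignas(64) std::size_t head_cached_ = 0;       // producer-private copy of head_
};
```

Both cached values start at zero, which matches the initial head and tail. A stale cached index is always conservative: the producer may think the queue is fuller than it really is, and the consumer may think it is emptier, so the only cost of staleness is one extra load of the shared index.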
Compiling and running our benchmark again with our new queue implementation, we obtain:
Mean: 101,138,856 elems/s
Median: 101,566,018 elems/s
Min: 97,563,021 elems/s
Max: 102,558,883 elems/s
We have more than doubled our initial throughput, reaching 100 million items per second. A fantastic improvement! Obviously, the results shown throughout this post are dependent on your hardware, but we have clearly optimised the queue, starting from a simple implementation and finishing with a cache-efficient queue.
The code for the queue and the benchmark can be found here.
Closing thoughts
We started from a naive SPSC queue using sequentially consistent operations, then introduced weaker memory orderings and improved cache efficiency by understanding the hardware, ending up with a high-throughput, low-latency message-passing queue.
I find this process of benchmarking and tuning very interesting, especially when seeing that your initial theories drawn up on paper can be translated into big performance boosts in practice.
There are further micro-optimisations you can try and profile, such as ensuring the buffer size is a power of two (in order to avoid performing integer division), using huge pages to reduce TLB misses during virtual address translation, etc.
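As an illustration of the power-of-two idea, the modulo in the index update can be replaced by a bit mask when the number of slots is a power of two. The helpers below are a hypothetical sketch and are not part of the queue presented in this post.

```cpp
#include <cassert>
#include <cstddef>

// True if n is a power of two (1, 2, 4, 8, ...).
constexpr bool is_power_of_two(std::size_t n) {
    return n != 0 && (n & (n - 1)) == 0;
}

// Smallest power of two that is >= n. Assumes n >= 1 and that the result
// fits in std::size_t.
inline std::size_t round_up_to_power_of_two(std::size_t n) {
    std::size_t p = 1;
    while (p < n) p <<= 1;
    return p;
}

// Advances a ring index by one; `slots` must be a power of two.
inline std::size_t next_index(std::size_t i, std::size_t slots) {
    assert(is_power_of_two(slots));
    return (i + 1) & (slots - 1);  // same result as (i + 1) % slots
}
```

Since the buffer holds capacity plus one slots, it is the slot count that has to be a power of two, which leaves a usable capacity of slots minus one. Whether the mask makes a measurable difference is something to profile rather than assume.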