SeqAn3 3.4.0-rc.3
The Modern C++ library for sequence analysis.
buffer_queue.hpp
// SPDX-FileCopyrightText: 2006-2024 Knut Reinert & Freie Universität Berlin
// SPDX-FileCopyrightText: 2016-2024 Knut Reinert & MPI für molekulare Genetik
// SPDX-License-Identifier: BSD-3-Clause

#pragma once

#include <algorithm>
#include <atomic>
#include <bit>
#include <cassert>
#include <cmath>
#include <concepts>
#include <exception>
#include <mutex>
#include <ranges>
#include <seqan3/std/new>
#include <shared_mutex>
#include <span>
#include <type_traits>
#include <vector>

#include <seqan3/utility/container/concept.hpp>
#include <seqan3/utility/parallel/detail/spin_delay.hpp>

namespace seqan3::contrib
{

enum class queue_op_status : uint8_t
{
    success = 0,
    empty,
    full,
    closed
};

enum struct buffer_queue_policy : uint8_t
{
    fixed,
    dynamic
};

// Ring buffer implementation:
// The underlying buffer has size (number of actual elements + 1). This is a trick to easily check if the queue
// is empty or full. Furthermore, the ring buffer uses 4 pointers: the actual push_back and pop_front positions
// as well as the pending push_back and pop_front positions. The latter indicate the positions that have been
// advanced concurrently by multiple threads from either end.
//
// head:       position to read/extract from the queue (first inserted element) => pop_front_position
// tail:       position to write/push new elements to the queue                 => push_back_position
// head_read:  the actual position after x threads concurrently popped from the queue  => pending_pop_front_position
// tail_write: the actual position after x threads concurrently pushed to the queue    => pending_push_back_position
//
// [ ? ] [ 4 ] [ 3 ] [ 8 ] [ 0 ] [ x ] [ ? ]
//   ^     ^                       ^     ^
//   |     |                       |     |
//  head headRead                tail  tailWrite
//
// valid buffer between  [headRead, tail)
// currently filled      [tail, tailWrite)
// currently removed     [head, headRead)
//
// State: empty = (head == tail)
// [ ? ] [ ? ] [ ? ] [ ? ] [ ? ] [ ? ] [ ? ]
//                     ^
//                     |
//               head == tail
// The head is at the same position as the tail.
// This means that currently no element is in the buffer.
//
// State: full = (tail + 1 == head)
// [ 2 ] [ 4 ] [ 3 ] [ ? ] [ 8 ] [ 0 ] [ 7 ]
//                     ^     ^
//                     |     |
//                   tail   head
// The tail is one position before the head.
// This means that currently no element can be added to the buffer since it is full.
// Strategies are to either wait until some elements have been popped or to expand the capacity of the
// queue by one, inserting the element at the current tail position and moving all elements starting from head one
// position to the right.

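// Worked example of the position arithmetic used below (illustrative numbers, not part
// of the original header): with init_capacity = 6 the buffer holds buffer.size() = 7
// slots and ring_buffer_capacity = std::bit_ceil(7) = 8. Positions only ever grow and
// are mapped into the buffer via to_buffer_position(position) = position & (8 - 1):
//
//     position:           0 1 2 3 4 5 6 8 9 10 ...  (7 is skipped by cyclic_increment)
//     to_buffer_position: 0 1 2 3 4 5 6 0 1  2 ...
//
// Because positions grow monotonically, full/empty can be detected by comparing position
// distances (see is_ring_buffer_exhausted) instead of comparing wrapped indices.
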
template <std::semiregular value_t,
          sequence_container buffer_t = std::vector<value_t>,
          buffer_queue_policy buffer_policy = buffer_queue_policy::dynamic>
class buffer_queue
{
public:
    using buffer_type = buffer_t;
    using value_type = typename buffer_type::value_type;
    using size_type = typename buffer_type::size_type;
    using reference = void;
    using const_reference = void;

    // Default constructor sets capacity to 1 (still empty).
    buffer_queue() : buffer_queue{0u}
    {}
    buffer_queue(buffer_queue const &) = delete;
    buffer_queue(buffer_queue &&) = delete;
    buffer_queue & operator=(buffer_queue const &) = delete;
    buffer_queue & operator=(buffer_queue &&) = delete;
    ~buffer_queue() = default;

    // You can set the initial capacity here.
    explicit buffer_queue(size_type const init_capacity)
    {
        buffer.resize(init_capacity + 1);
        ring_buffer_capacity = std::bit_ceil(buffer.size());
    }
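
    // Example (illustrative): buffer_queue{4} holds at most 4 elements; internally
    // buffer.size() == 5 (the extra slot distinguishes a full from an empty queue) and
    // ring_buffer_capacity == std::bit_ceil(5) == 8.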

    template <std::ranges::input_range range_type>
        requires std::convertible_to<std::ranges::range_value_t<range_type>, value_type>
    buffer_queue(size_type const init_capacity, range_type && r) : buffer_queue{init_capacity}
    {
        std::ranges::copy(r, std::ranges::begin(buffer));
    }

    template <typename value2_t>
        requires std::convertible_to<value2_t, value_t>
    void push(value2_t && value)
    {
        detail::spin_delay delay{};

        for (;;)
        {
            auto status = try_push(std::forward<value2_t>(value));
            if (status == queue_op_status::closed)
                throw queue_op_status::closed;
            else if (status == queue_op_status::success)
                return;

            assert(status != queue_op_status::empty);
            assert(status == queue_op_status::full);
            delay.wait(); // Pause and then try again.
        }
    } // throws if closed

    template <typename value2_t>
        requires std::convertible_to<value2_t, value_t>
    queue_op_status wait_push(value2_t && value)
    {
        detail::spin_delay delay{};

        for (;;)
        {
            auto status = try_push(std::forward<value2_t>(value));
            // Wait until the queue is not full anymore.
            if (status != queue_op_status::full)
                return status;

            assert(status != queue_op_status::empty);
            assert(status == queue_op_status::full);
            delay.wait(); // Pause and then try again.
        }
    }

    value_type value_pop() // throws if closed
    {
        detail::spin_delay delay{};

        value_type value{};
        for (;;)
        {
            if (!writer_waiting.load())
            {
                auto status = try_pop(value);

                if (status == queue_op_status::closed)
                    throw queue_op_status::closed;
                else if (status == queue_op_status::success)
                    return value;

                assert(status != queue_op_status::full);
                assert(status == queue_op_status::empty);
            }
            delay.wait(); // Pause and then try again.
        }
    }

    queue_op_status wait_pop(value_type & value)
    {
        detail::spin_delay delay{};

        queue_op_status status;
        for (;;)
        {
            if (!writer_waiting.load())
            {
                status = try_pop(value);

                if (status == queue_op_status::closed || status == queue_op_status::success)
                    break;

                assert(status != queue_op_status::full);
                assert(status == queue_op_status::empty);
            }
            delay.wait(); // Pause and then try again.
        }
        return status;
    }
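
    // Typical consumer loop built on wait_pop (illustrative sketch; `queue` and `process`
    // are hypothetical names):
    //
    //     value_type value{};
    //     while (queue.wait_pop(value) == queue_op_status::success)
    //         process(value);
    //     // Here wait_pop returned queue_op_status::closed: the queue was closed and drained.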

    template <typename value2_t>
        requires std::convertible_to<value2_t, value_t>
    queue_op_status try_push(value2_t &&);

    queue_op_status try_pop(value_t &);

    void close()
    {
        if (writer_waiting.exchange(true)) // First writer that closes the queue will continue, the rest return.
            return;

        try
        {
            std::unique_lock write_lock{mutex};
            closed_flag = true;
            writer_waiting.store(false); // Reset the lock.
        }
        catch (...)
        {
            writer_waiting.store(false); // Reset the lock.
            std::rethrow_exception(std::current_exception());
        }
    }

    bool is_closed() const noexcept
    {
        return closed_flag;
    }

    bool is_empty() const noexcept
    {
        std::unique_lock write_lock(mutex);
        return pop_front_position == push_back_position;
    }

    bool is_full() const noexcept
    {
        std::unique_lock write_lock(mutex);
        return is_ring_buffer_exhausted(pop_front_position, push_back_position);
    }

    size_type size() const noexcept
    {
        std::unique_lock write_lock(mutex);
        if (to_buffer_position(pop_front_position) <= to_buffer_position(push_back_position))
        {
            return to_buffer_position(push_back_position) - to_buffer_position(pop_front_position);
        }
        else
        {
            assert(buffer.size() > (to_buffer_position(pop_front_position) - to_buffer_position(push_back_position)));
            return buffer.size() - (to_buffer_position(pop_front_position) - to_buffer_position(push_back_position));
        }
    }
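
    // Worked example for size() (illustrative): buffer.size() == 5, pop_front_position
    // maps to buffer position 3 and push_back_position to buffer position 1; the contents
    // wrap around the buffer end, so size() == 5 - (3 - 1) == 3 elements.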

private:
    // Returns true if `to` is at least ring_buffer_capacity positions ahead of `from`,
    // i.e. the tail would catch up with the head and the ring buffer is exhausted.
    constexpr bool is_ring_buffer_exhausted(size_type const from, size_type const to) const
    {
        assert(to <= (from + ring_buffer_capacity + 1)); // The tail cannot overwrite the head.

        return to >= from + ring_buffer_capacity;
    }
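
    // Example (illustrative): with ring_buffer_capacity == 8, from == 2 and to == 10 the
    // ring buffer is exhausted (10 >= 2 + 8): `to` would wrap onto the slot `from` still
    // refers to.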

    // Maps a monotonically growing virtual position into the ring buffer; since
    // ring_buffer_capacity is a power of two, the modulo reduces to a bit mask.
    constexpr size_type to_buffer_position(size_type const position) const
    {
        return position & (ring_buffer_capacity - 1);
    }
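
    // Example (illustrative): ring_buffer_capacity == 8 masks with 7, so the virtual
    // positions 3, 11, 19, ... all map to buffer position 3.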

    // Increments a virtual position and skips the unused part of the ring buffer.
    size_type cyclic_increment(size_type position)
    {
        // Invariants:
        //   - ring_buffer_capacity is a power of 2
        //   - (position % ring_buffer_capacity) is in [0, buffer.size())
        //
        // Returns the next greater position that fulfils the invariants.
        if (to_buffer_position(++position) >= buffer.size())
            position += ring_buffer_capacity - buffer.size(); // If the position reached the unused space, skip it.
        return position;
    }
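
    // Worked example (illustrative): buffer.size() == 5, ring_buffer_capacity == 8.
    // cyclic_increment(4) first yields 5, but to_buffer_position(5) == 5 >= buffer.size(),
    // so the position is advanced by 8 - 5 = 3 to skip the unused slots; the result is 8,
    // which maps back to buffer position 0.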

    // Overflow strategy for a fixed buffer queue: the queue never grows.
    template <typename value2_t>
        requires (std::convertible_to<value2_t, value_t>) && (buffer_policy == buffer_queue_policy::fixed)
    bool overflow(value2_t &&)
    {
        return false;
    }

    // Overflow strategy for a dynamic buffer queue: grows the buffer by one (defined below).
    template <typename value2_t>
        requires (std::convertible_to<value2_t, value_t>) && (buffer_policy == buffer_queue_policy::dynamic)
    bool overflow(value2_t && value);

    // The ring buffer storage; holds one slot more than the queue's capacity.
    buffer_t buffer;
    // Protects the buffer: push/pop take it shared, resize (overflow) and close take it unique.
    alignas(std::hardware_destructive_interference_size) mutable std::shared_mutex mutex{};
    // The published position to pop from next.
    alignas(std::hardware_destructive_interference_size) std::atomic<size_type> pop_front_position{0};
    // The pop position already claimed by concurrently reading threads.
    alignas(std::hardware_destructive_interference_size) std::atomic<size_type> pending_pop_front_position{0};
    // The published position to push to next.
    alignas(std::hardware_destructive_interference_size) std::atomic<size_type> push_back_position{0};
    // The push position already claimed by concurrently writing threads.
    alignas(std::hardware_destructive_interference_size) std::atomic<size_type> pending_push_back_position{0};
    // The virtual capacity of the ring buffer; always a power of two.
    alignas(std::hardware_destructive_interference_size) size_type ring_buffer_capacity{0};
    // Set while a writer closes the queue; readers back off in the meantime.
    alignas(std::hardware_destructive_interference_size) std::atomic_bool writer_waiting{false};
    // Whether the queue has been closed.
    alignas(std::hardware_destructive_interference_size) bool closed_flag{false};
};

// Specifies a fixed size buffer queue.
template <std::semiregular value_t, sequence_container buffer_t = std::vector<value_t>>
using fixed_buffer_queue = buffer_queue<value_t, buffer_t, buffer_queue_policy::fixed>;

// Specifies a dynamic size buffer queue (growable).
template <std::semiregular value_t, sequence_container buffer_t = std::vector<value_t>>
using dynamic_buffer_queue = buffer_queue<value_t, buffer_t, buffer_queue_policy::dynamic>;
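
// Usage sketch (illustrative, not part of the original header): a producer pushes values
// and closes the queue when done; a consumer pops until the queue is closed and drained.
// `process` is a hypothetical callable, and <thread> would be needed in addition.
//
//     seqan3::contrib::dynamic_buffer_queue<int> queue{8};
//
//     std::thread producer{[&queue]()
//     {
//         for (int i = 0; i < 100; ++i)
//             queue.push(i); // A dynamic queue grows on overflow instead of blocking.
//         queue.close(); // Unblocks consumers once the queue is drained.
//     }};
//
//     std::thread consumer{[&queue]()
//     {
//         int value{};
//         while (queue.wait_pop(value) == seqan3::contrib::queue_op_status::success)
//             process(value);
//     }};
//
//     producer.join();
//     consumer.join();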

// ============================================================================
// Metafunctions
// ============================================================================

// ============================================================================
// Functions
// ============================================================================

template <std::semiregular value_t, sequence_container buffer_t, buffer_queue_policy buffer_policy>
template <typename value2_t>
    requires (std::convertible_to<value2_t, value_t>) && (buffer_policy == buffer_queue_policy::dynamic)
inline bool buffer_queue<value_t, buffer_t, buffer_policy>::overflow(value2_t && value)
{
    // Try to extend the capacity.
    std::unique_lock write_lock{mutex};

    size_type old_size = buffer.size();
    size_type ring_buffer_capacity = this->ring_buffer_capacity;
    size_type local_front = this->pop_front_position;
    size_type local_back = this->push_back_position;

    // Expects no pending pushes or pops while holding the unique lock.
    assert(local_back == this->pending_push_back_position);
    assert(local_front == this->pending_pop_front_position);

    bool value_was_appended = false;

    // Did we reach the capacity limit (another thread could have done the upgrade already)?
    // The buffer is full if tail_pos + 1 == head_pos.
    if (is_ring_buffer_exhausted(local_front, cyclic_increment(local_back)))
    {
        // In case of a full queue, write the value into the additional slot.
        // Note that the ring-buffer implementation uses one additional field which is unused except
        // when an overflow happens. This invariant is used to simply check for the full/empty state of the queue.
        if (old_size != 0)
        {
            auto it = std::ranges::begin(buffer) + to_buffer_position(local_back);
            *it = std::forward<value2_t>(value);
            local_back = local_front + ring_buffer_capacity;
            value_was_appended = true;
        }

        assert(is_ring_buffer_exhausted(local_front, local_back));

        // Get the positions of head/tail in the current buffer sequence.
        size_type front_buffer_position = to_buffer_position(local_front);
        size_type back_buffer_position = to_buffer_position(local_back);

        // Increase the capacity by one and move all elements from the current pop_front position one to the right.
        buffer.resize(old_size + 1);
        ring_buffer_capacity = std::bit_ceil(buffer.size());
        std::ranges::move_backward(std::span{buffer.data() + front_buffer_position, buffer.data() + old_size},
                                   buffer.data() + buffer.size());

        // Update the pop_front and push_back positions.
        if (old_size != 0)
        {
            this->pending_pop_front_position = this->pop_front_position = front_buffer_position + 1;
            this->pending_push_back_position = this->push_back_position = back_buffer_position + ring_buffer_capacity;
        }
        this->ring_buffer_capacity = ring_buffer_capacity;
    }
    return value_was_appended;
}
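
// Worked example of the overflow above (illustrative numbers): a full dynamic queue with
// buffer.size() == 5 (queue capacity 4, ring_buffer_capacity == 8) first writes the new
// value into the spare sentinel slot, then resizes the buffer to 6 slots
// (ring_buffer_capacity stays std::bit_ceil(6) == 8) and shifts the elements between the
// old pop_front buffer position and the old buffer end one slot to the right. Afterwards
// the queue holds 5 elements, and the full/empty checks keep working unchanged.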

// ----------------------------------------------------------------------------
// Function try_pop()
// ----------------------------------------------------------------------------

/*
 * @fn buffer_queue#try_pop
 * @brief Tries to dequeue a value from the queue without blocking.
 *
 * @signature queue_op_status try_pop(result);
 *
 * @param[out] result The dequeued value (if one was available).
 * @return queue_op_status::success if a value was dequeued,
 *         queue_op_status::empty if the queue was empty, or
 *         queue_op_status::closed if the queue was empty and has been closed.
 */
template <std::semiregular value_t, sequence_container buffer_t, buffer_queue_policy buffer_policy>
inline queue_op_status buffer_queue<value_t, buffer_t, buffer_policy>::try_pop(value_t & result)
{
    // Try to extract a value.
    std::shared_lock read_lock{mutex};

    size_type local_pending_pop_front_position{};
    size_type next_local_pop_front_position{};
    detail::spin_delay spin_delay{};

    local_pending_pop_front_position = this->pending_pop_front_position;
    // Wait for the queue to become filled.
    while (true)
    {
        size_type local_push_back_position = this->push_back_position;

        assert(local_pending_pop_front_position <= local_push_back_position);

        // Check if the queue is empty.
        if (local_pending_pop_front_position == local_push_back_position)
        {
            return is_closed() ? queue_op_status::closed : queue_op_status::empty;
        }

        // Get the next ring-buffer position to read from.
        next_local_pop_front_position = cyclic_increment(local_pending_pop_front_position);
        // Did another thread already acquire this slot?
        // If yes, try the next position. If not, break and read from the acquired position.
        if (this->pending_pop_front_position.compare_exchange_weak(local_pending_pop_front_position,
                                                                   next_local_pop_front_position))
            break;

        spin_delay.wait();
    }

    // Move the value out of the acquired read position.
    result = std::ranges::iter_move(buffer.begin() + to_buffer_position(local_pending_pop_front_position));

    // Wait for pending previous reads and synchronize pop_front_position to next_local_pop_front_position.
    {
        detail::spin_delay delay{};
        size_type acquired_slot = local_pending_pop_front_position;
        while (!this->pop_front_position.compare_exchange_weak(acquired_slot, next_local_pop_front_position))
        {
            acquired_slot = local_pending_pop_front_position;
            delay.wait(); // Adaptive delay in case of high contention.
        }
    }

    return queue_op_status::success;
}
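
// Illustration of the two-phase commit above (hypothetical interleaving): consumer
// threads A and B both read pending_pop_front_position == 5. A wins the
// compare_exchange and bumps it to 6; B retries and claims position 6. Each thread then
// moves its value out of the buffer, and the final loop publishes pop_front_position in
// claim order (5 before 6), so a slot is only released to producers after all earlier
// reads have finished.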

// ----------------------------------------------------------------------------
// Function try_push()
// ----------------------------------------------------------------------------

/*
 * @fn buffer_queue#try_push
 * @brief Tries to enqueue a value without blocking.
 *
 * @signature queue_op_status try_push(value);
 *
 * @param[in] value The value to enqueue.
 * @return queue_op_status::success if the value was enqueued (a full dynamic queue grows
 *         and then reports success), queue_op_status::closed if the queue has been
 *         closed, or queue_op_status::full if a fixed queue was full.
 */
template <std::semiregular value_t, sequence_container buffer_t, buffer_queue_policy buffer_policy>
template <typename value2_t>
    requires std::convertible_to<value2_t, value_t>
inline queue_op_status buffer_queue<value_t, buffer_t, buffer_policy>::try_push(value2_t && value)
{
    // Try to push the value.
    {
        detail::spin_delay delay{};

        std::shared_lock read_lock(mutex);

        if (is_closed())
            return queue_op_status::closed;

        // The current up-to-date position to push an element to.
        size_type local_pending_push_back_position = this->pending_push_back_position;

        while (true)
        {
            // Get the next potential position to write the value to.
            size_type next_local_push_back_position = cyclic_increment(local_pending_push_back_position);
            size_type local_pop_front_position = this->pop_front_position;

            // Check if there are enough slots to write to.
            // If not, either wait or try to overflow if this is a dynamic queue.
            if (is_ring_buffer_exhausted(local_pop_front_position, next_local_push_back_position))
                break;

            // Did another thread acquire the current pending position before this thread?
            // If yes, try again; if not, write into the acquired slot.
            if (this->pending_push_back_position.compare_exchange_weak(local_pending_push_back_position,
                                                                       next_local_push_back_position))
            {
                // This thread acquired local_pending_push_back_position and can now write the value into the
                // proper slot of the ring buffer.
                auto it = std::ranges::begin(buffer) + to_buffer_position(local_pending_push_back_position);
                *it = std::forward<value2_t>(value);

                // Wait for pending previous writes and synchronize push_back_position to
                // next_local_push_back_position.
                {
                    detail::spin_delay delay{};
                    // The slot this thread acquired to write to.
                    size_type acquired_slot = local_pending_push_back_position;
                    while (
                        !this->push_back_position.compare_exchange_weak(acquired_slot, next_local_push_back_position))
                    {
                        acquired_slot = local_pending_push_back_position;
                        delay.wait();
                    }
                }
                return queue_op_status::success;
            }

            delay.wait();
        }
    }

    // If possible, extend the capacity and return.
    if (overflow(std::forward<value2_t>(value)))
    {
        return queue_op_status::success; // Always return success, since the queue resized and cannot be full.
    }

    // We could not extend the queue, so it must be full.
    return queue_op_status::full;
}

} // namespace seqan3::contrib