xenium
kirsch_kfifo_queue.hpp
1 //
2 // Copyright (c) 2018-2020 Manuel Pöter.
3 // Licensed under the MIT License. See LICENSE file in the project root for full license information.
4 //
5 
6 #ifndef XENIUM_KIRSCH_KFIFO_QUEUE_HPP
7 #define XENIUM_KIRSCH_KFIFO_QUEUE_HPP
8 
9 #include <xenium/marked_ptr.hpp>
10 #include <xenium/parameter.hpp>
11 #include <xenium/policy.hpp>
12 #include <xenium/utils.hpp>
13 
14 #include <xenium/detail/pointer_queue_traits.hpp>
15 
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <new>
#include <stdexcept>
#include <type_traits>
19 
20 namespace xenium {
42  template <class T, class... Policies>
44  private:
45  using traits = detail::pointer_queue_traits_t<T, Policies...>;
46  using raw_value_type = typename traits::raw_type;
47  public:
48  using value_type = T;
49  using reclaimer = parameter::type_param_t<policy::reclaimer, parameter::nil, Policies...>;
50  static constexpr unsigned padding_bytes = parameter::value_param_t<unsigned, policy::padding_bytes, sizeof(raw_value_type), Policies...>::value;
51 
52  static_assert(parameter::is_set<reclaimer>::value, "reclaimer policy must be specified");
53 
54  template <class... NewPolicies>
55  using with = kirsch_kfifo_queue<T, NewPolicies..., Policies...>;
56 
57  kirsch_kfifo_queue(uint64_t k);
59 
60  kirsch_kfifo_queue(const kirsch_kfifo_queue&) = delete;
62 
63  kirsch_kfifo_queue& operator= (const kirsch_kfifo_queue&) = delete;
64  kirsch_kfifo_queue& operator= (kirsch_kfifo_queue&&) = delete;
65 
72  void push(value_type value);
73 
80  [[nodiscard]] bool try_pop(value_type& result);
81 
82  private:
84 
85  struct padded_entry {
86  std::atomic<marked_value> value;
87  // we use max here to avoid arrays of size zero which are not allowed by Visual C++
88  char padding[std::max(padding_bytes, 1u)];
89  };
90 
91  struct unpadded_entry {
92  std::atomic<marked_value> value;
93  };
94  using entry = std::conditional_t<padding_bytes == 0, unpadded_entry, padded_entry>;
95 
96  public:
100  static constexpr std::size_t entry_size = sizeof(entry);
101 
102  private:
103  struct segment;
104 
105  struct segment_deleter {
106  void operator()(segment* seg) const { release_segment(seg); }
107  };
108  struct segment : reclaimer::template enable_concurrent_ptr<segment, 16, segment_deleter> {
109  using concurrent_ptr = typename reclaimer::template concurrent_ptr<segment, 16>;
110 
111  explicit segment(uint64_t k) : k(k) {}
112  ~segment() {
113  for (unsigned i = 0; i < k; ++i) {
114  assert(items()[i].value.load(std::memory_order_relaxed).get() == nullptr);
115  }
116  }
117 
118  void delete_remaining_items() {
119  for (unsigned i = 0; i < k; ++i) {
120  traits::delete_value(items()[i].value.load(std::memory_order_relaxed).get());
121  items()[i].value.store(nullptr, std::memory_order_relaxed);
122  }
123  }
124 
125  entry* items() noexcept { return reinterpret_cast<entry*>(this + 1); }
126 
127  std::atomic<bool> deleted{false};
128  const uint64_t k;
129  concurrent_ptr next{};
130  };
131 
132  using concurrent_ptr = typename segment::concurrent_ptr;
133  using marked_ptr = typename concurrent_ptr::marked_ptr;
134  using guard_ptr = typename concurrent_ptr::guard_ptr;
135 
136  segment* alloc_segment() const;
137  static void release_segment(segment* seg);
138 
139  template <bool Empty>
140  bool find_index(marked_ptr segment, uint64_t& value_index, marked_value& old) const noexcept;
141  void advance_head(guard_ptr& head_current, marked_ptr tail_current) noexcept;
142  void advance_tail(marked_ptr tail_old) noexcept;
143  bool committed(marked_ptr segment, marked_value value, uint64_t index) noexcept;
144 
145  const std::size_t k_;
146  concurrent_ptr head_;
147  concurrent_ptr tail_;
148  };
149 
150  template <class T, class... Policies>
151  kirsch_kfifo_queue<T, Policies...>::kirsch_kfifo_queue(uint64_t k) :
152  k_(k)
153  {
154  const auto seg = alloc_segment();
155  head_.store(seg, std::memory_order_relaxed);
156  tail_.store(seg, std::memory_order_relaxed);
157  }
158 
159  template <class T, class... Policies>
160  kirsch_kfifo_queue<T, Policies...>::~kirsch_kfifo_queue() {
161  auto seg = head_.load(std::memory_order_relaxed).get();
162  while (seg) {
163  auto next = seg->next.load(std::memory_order_relaxed).get();
164  seg->delete_remaining_items();
165  release_segment(seg);
166  seg = next;
167  }
168  }
169 
170  template <class T, class... Policies>
171  auto kirsch_kfifo_queue<T, Policies...>::alloc_segment() const -> segment* {
172  void* data = ::operator new(sizeof(segment) + k_ * sizeof(entry));
173  auto result = new(data) segment(k_);
174  for (std::size_t i = 0; i < k_; ++i)
175  new(&result->items()[i]) entry();
176  return result;
177  }
178 
179  template <class T, class... Policies>
180  void kirsch_kfifo_queue<T, Policies...>::release_segment(segment* seg) {
181  seg->~segment();
182  ::operator delete(seg);
183  }
184 
185  template <class T, class... Policies>
187  if (value == nullptr)
188  throw std::invalid_argument("value cannot be nullptr");
189 
190  raw_value_type raw_value = traits::get_raw(value);
191  guard_ptr tail_old;
192  for (;;) {
193  // (1) - this acquire-load synchronizes-with the release-CAS (9, 12, 14)
194  tail_old.acquire(tail_, std::memory_order_acquire);
195 
196  // TODO - local linearizability
197 
198  uint64_t idx = 0;
199  marked_value old_value;
200  bool found_idx = find_index<true>(tail_old, idx, old_value);
201  if (tail_old != tail_.load(std::memory_order_relaxed))
202  continue;
203 
204  if (found_idx) {
205  const marked_value new_value(raw_value, old_value.mark() + 1);
206  // (2) - this release-CAS synchronizes-with the acquire-CAS (5)
207  if (tail_old->items()[idx].value.compare_exchange_strong(old_value, new_value,
208  std::memory_order_release, std::memory_order_relaxed) &&
209  committed(tail_old, new_value, idx)) {
210  traits::release(value);
211  // TODO - local linearizability
212  return;
213  }
214  } else {
215  advance_tail(tail_old);
216  }
217  }
218  }
219 
220  template <class T, class... Policies>
222  guard_ptr head_old;
223  for (;;) {
224  // (3) - this acquire-load synchronizes-with the release-CAS (10)
225  head_old.acquire(head_, std::memory_order_acquire);
226  auto h = head_old.get();
227  (void)h;
228  uint64_t idx = 0;
229  marked_value old_value;
230  bool found_idx = find_index<false>(head_old, idx, old_value);
231  if (head_old != head_.load(std::memory_order_relaxed))
232  continue;
233 
234  // (4) - this acquire-load synchronizes-with the release-CAS (9, 12, 14)
235  marked_ptr tail_old = tail_.load(std::memory_order_acquire);
236  if (found_idx) {
237  assert(old_value.get() != (void*)0x100);
238  if (head_old.get() == tail_old.get())
239  advance_tail(tail_old);
240 
241  const marked_value new_value(nullptr, old_value.mark() + 1);
242  // (5) - this acquire-CAS synchronizes-with the release-CAS (2)
243  if (head_old->items()[idx].value.compare_exchange_strong(old_value, new_value,
244  std::memory_order_acquire, std::memory_order_relaxed)) {
245  traits::store(result, old_value.get());
246  return true;
247  }
248  } else {
249  if (head_old.get() == tail_old.get() && tail_old == tail_.load(std::memory_order_relaxed))
250  return false; // queue is empty
251  advance_head(head_old, tail_old);
252  }
253  }
254  }
255 
256  template <class T, class... Policies>
257  template <bool Empty>
259  marked_ptr segment, uint64_t& value_index, marked_value& old) const noexcept
260  {
261  const uint64_t k = segment->k;
262  const uint64_t random_index = utils::random() % k;
263  for (size_t i = 0; i < k; i++) {
264  uint64_t index = ((random_index + i) % k);
265  old = segment->items()[index].value.load(std::memory_order_relaxed);
266  if ((Empty && old.get() == nullptr) || (!Empty && old.get() != nullptr)) {
267  value_index = index;
268  return true;
269  }
270  }
271  return false;
272  }
273 
274  template <class T, class... Policies>
275  bool kirsch_kfifo_queue<T, Policies...>::committed(
276  marked_ptr segment, marked_value value, uint64_t index) noexcept
277  {
278  if (value != segment->items()[index].value.load(std::memory_order_relaxed))
279  return true;
280 
281  const marked_value empty_value(nullptr, value.mark() + 1);
282 
283  if (segment->deleted.load(std::memory_order_relaxed) == true) {
284  // Insert tail segment has been removed, but we are fine if element still has been removed.
285  return !segment->items()[index].value.compare_exchange_strong(value, empty_value, std::memory_order_relaxed);
286  }
287 
288  // (6) - this acquire-load synchronizes-with the release-CAS (10)
289  marked_ptr head_current = head_.load(std::memory_order_acquire);
290  if (segment.get() == head_current.get()) {
291  // Insert tail segment is now head.
292  marked_ptr new_head(head_current.get(), head_current.mark() + 1);
293  // This relaxed-CAS is part of a release sequence headed by (10)
294  if (head_.compare_exchange_strong(head_current, new_head, std::memory_order_relaxed))
295  // We are fine if we can update head and thus fail any concurrent
296  // advance_head attempts.
297  return true;
298 
299  // We are fine if element still has been removed.
300  return !segment->items()[index].value.compare_exchange_strong(value, empty_value, std::memory_order_relaxed);
301  }
302 
303  if (segment->deleted.load(std::memory_order_relaxed) == false) {
304  // Insert tail segment still not deleted.
305  return true;
306  } else {
307  // Head and tail moved beyond this segment. Try to remove the item.
308  // We are fine if element still has been removed.
309  return !segment->items()[index].value.compare_exchange_strong(value, empty_value, std::memory_order_relaxed);
310  }
311  }
312 
313  template <class T, class... Policies>
314  void kirsch_kfifo_queue<T, Policies...>::advance_head(guard_ptr& head_current, marked_ptr tail_current) noexcept {
315  // (7) - this acquire-load synchronizes-with the release-CAS (13)
316  const marked_ptr head_next_segment = head_current->next.load(std::memory_order_acquire);
317  if (head_current != head_.load(std::memory_order_relaxed))
318  return;
319 
320  if (head_current.get() == tail_current.get()) {
321  // (8) - this acquire-load synchronizes-with the release-CAS (13)
322  const marked_ptr tail_next_segment = tail_current->next.load(std::memory_order_acquire);
323  if (tail_next_segment.get() == nullptr)
324  return;
325 
326  if (tail_current == tail_.load(std::memory_order_relaxed)) {
327  marked_ptr new_tail(tail_next_segment.get(), tail_current.mark() + 1);
328  // (9) - this release-CAS synchronizes-with the acquire-load (1, 4)
329  tail_.compare_exchange_strong(tail_current, new_tail, std::memory_order_release, std::memory_order_relaxed);
330  }
331  }
332 
333  head_current->deleted.store(true, std::memory_order_relaxed);
334 
335  marked_ptr expected = head_current;
336  marked_ptr new_head(head_next_segment.get(), head_current.mark() + 1);
337  // (10) - this release-CAS synchronizes-with the acquire-load (3, 6)
338  if (head_.compare_exchange_strong(expected, new_head, std::memory_order_release, std::memory_order_relaxed)) {
339  head_current.reclaim();
340  }
341  }
342 
343  template <class T, class... Policies>
344  void kirsch_kfifo_queue<T, Policies...>::advance_tail(marked_ptr tail_current) noexcept {
345  // (11) - this acquire-load synchronizes-with the release-CAS (13)
346  marked_ptr next_segment = tail_current->next.load(std::memory_order_acquire);
347  if (tail_current != tail_.load(std::memory_order_relaxed))
348  return;
349 
350  if (next_segment.get() != nullptr) {
351  marked_ptr new_tail(next_segment.get(), next_segment.mark() + 1);
352  // (12) - this release-CAS synchronizes-with the acquire-load (1, 4)
353  tail_.compare_exchange_strong(tail_current, new_tail, std::memory_order_release, std::memory_order_relaxed);
354  } else {
355  auto seg = alloc_segment();
356  const marked_ptr new_segment(seg, next_segment.mark() + 1);
357  // TODO - insert own value to simplify push?
358  // (13) - this release-CAS synchronizes-with the acquire-load (7, 8, 11)
359  if (tail_current->next.compare_exchange_strong(next_segment, new_segment,
360  std::memory_order_release, std::memory_order_relaxed)) {
361  marked_ptr new_tail(seg, tail_current.mark() + 1);
362  // (14) - this release-CAS synchronizes-with the acquire-load (1, 4)
363  tail_.compare_exchange_strong(tail_current, new_tail, std::memory_order_release, std::memory_order_relaxed);
364  } else {
365  release_segment(seg);
366  }
367  }
368  }
369 }
370 #endif
xenium::marked_ptr
A pointer with an embedded mark/tag value.
Definition: marked_ptr.hpp:41
xenium::kirsch_kfifo_queue
An unbounded lock-free multi-producer/multi-consumer k-FIFO queue.
Definition: kirsch_kfifo_queue.hpp:43
xenium::marked_ptr::mark
uintptr_t mark() const noexcept
Get the mark value.
Definition: marked_ptr.hpp:70
xenium::kirsch_kfifo_queue::push
void push(value_type value)
Definition: kirsch_kfifo_queue.hpp:186
xenium::policy::padding_bytes
Policy to configure the number of padding bytes to add to each entry in kirsch_kfifo_queue and kirsch...
Definition: policy.hpp:117
xenium::policy::reclaimer
Policy to configure the reclamation scheme to be used.
Definition: policy.hpp:25
xenium::kirsch_kfifo_queue::entry_size
static constexpr std::size_t entry_size
Provides the effective size of a single queue entry (including padding).
Definition: kirsch_kfifo_queue.hpp:100
xenium::marked_ptr::get
T * get() const noexcept
Get underlying pointer (with mark bits stripped off).
Definition: marked_ptr.hpp:77
xenium::kirsch_kfifo_queue::try_pop
bool try_pop(value_type &result)
Definition: kirsch_kfifo_queue.hpp:221