kirsch_bounded_kfifo_queue.hpp
//
// Copyright (c) 2018-2020 Manuel Pöter.
// Licensed under the MIT License. See LICENSE file in the project root for full license information.
//

#ifndef XENIUM_KIRSCH_BOUNDED_KFIFO_QUEUE_HPP
#define XENIUM_KIRSCH_BOUNDED_KFIFO_QUEUE_HPP

#include <xenium/marked_ptr.hpp>
#include <xenium/parameter.hpp>
#include <xenium/policy.hpp>
#include <xenium/utils.hpp>

#include <xenium/detail/pointer_queue_traits.hpp>

#include <algorithm>
#include <atomic>
#include <cassert>     // assert
#include <cstdint>
#include <memory>      // std::unique_ptr
#include <stdexcept>   // std::invalid_argument
#include <type_traits> // std::conditional_t, std::remove_pointer_t
#ifdef _MSC_VER
#pragma warning(push)
#pragma warning(disable: 26495) // uninitialized member variable
#endif

namespace xenium {

/**
 * @brief A bounded lock-free multi-producer/multi-consumer k-FIFO queue.
 */
template <class T, class... Policies>
class kirsch_bounded_kfifo_queue {
private:
  using traits = detail::pointer_queue_traits_t<T, Policies...>;
  using raw_value_type = typename traits::raw_type;

public:
  using value_type = T;
  static constexpr unsigned padding_bytes =
    parameter::value_param_t<unsigned, policy::padding_bytes, sizeof(raw_value_type), Policies...>::value;

  kirsch_bounded_kfifo_queue(uint64_t k, uint64_t num_segments);
  ~kirsch_bounded_kfifo_queue();

  kirsch_bounded_kfifo_queue(const kirsch_bounded_kfifo_queue&) = delete;
  kirsch_bounded_kfifo_queue(kirsch_bounded_kfifo_queue&&) = delete;

  kirsch_bounded_kfifo_queue& operator=(const kirsch_bounded_kfifo_queue&) = delete;
  kirsch_bounded_kfifo_queue& operator=(kirsch_bounded_kfifo_queue&&) = delete;

  /**
   * @brief Tries to push a new element to the queue.
   * Progress guarantees: lock-free.
   */
  bool try_push(value_type value);

  /**
   * @brief Tries to pop an element from the queue.
   * Progress guarantees: lock-free.
   */
  bool try_pop(value_type& result);

private:
  using marked_value = xenium::marked_ptr<std::remove_pointer_t<raw_value_type>, 16>;

  struct padded_entry {
    std::atomic<marked_value> value;
    // we use max here to avoid arrays of size zero which are not allowed by Visual C++
    char padding[std::max(padding_bytes, 1u)];
  };

  struct unpadded_entry {
    std::atomic<marked_value> value;
  };

  using entry = std::conditional_t<padding_bytes == 0, unpadded_entry, padded_entry>;

public:
  /**
   * @brief Provides the effective size of a single queue entry (including padding).
   */
  static constexpr std::size_t entry_size = sizeof(entry);

private:
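  // marked_idx packs the queue index into the lower 16 bits and an
  // ABA-avoidance mark into the upper 48 bits, so that index and mark can be
  // updated together with a single 64-bit CAS;
  // e.g. marked_idx(3, 2) stores 3 | (2 << 16), so get() == 3 and mark() == 2.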
  struct marked_idx {
    marked_idx() = default;
    marked_idx(uint64_t val, uint64_t mark) noexcept { val_ = val | (mark << bits); }

    uint64_t get() const noexcept { return val_ & val_mask; }
    uint64_t mark() const noexcept { return val_ >> bits; }
    bool operator==(const marked_idx& other) const noexcept { return this->val_ == other.val_; }
    bool operator!=(const marked_idx& other) const noexcept { return this->val_ != other.val_; }

  private:
    static constexpr unsigned bits = 16;
    static constexpr uint64_t val_mask = (static_cast<uint64_t>(1) << bits) - 1;
    uint64_t val_ = 0;
  };

  template <bool Empty>
  bool find_index(uint64_t start_index, uint64_t& index, marked_value& old);
  bool queue_full(const marked_idx& head_old, const marked_idx& tail_old) const;
  bool segment_empty(const marked_idx& head_old) const;
  bool not_in_valid_region(uint64_t tail_old, uint64_t tail_current, uint64_t head_current) const;
  bool in_valid_region(uint64_t tail_old, uint64_t tail_current, uint64_t head_current) const;
  bool committed(const marked_idx& tail_old, marked_value new_value, uint64_t index);

  std::uint64_t queue_size_;
  std::size_t k_;
  // all operations on head/tail are synchronized via the value operations and
  // can therefore use memory_order_relaxed.
  std::atomic<marked_idx> head_;
  std::atomic<marked_idx> tail_;
  std::unique_ptr<entry[]> queue_;
};

template <class T, class... Policies>
kirsch_bounded_kfifo_queue<T, Policies...>::kirsch_bounded_kfifo_queue(uint64_t k, uint64_t num_segments) :
  queue_size_(k * num_segments),
  k_(k),
  head_(),
  tail_(),
  queue_(new entry[k * num_segments]())
{}

template <class T, class... Policies>
kirsch_bounded_kfifo_queue<T, Policies...>::~kirsch_bounded_kfifo_queue() {
  for (std::uint64_t i = 0; i < queue_size_; ++i)
    traits::delete_value(queue_[i].value.load(std::memory_order_relaxed).get());
}

template <class T, class... Policies>
bool kirsch_bounded_kfifo_queue<T, Policies...>::try_push(value_type value) {
  if (value == nullptr)
    throw std::invalid_argument("value cannot be nullptr");

  raw_value_type raw_value = traits::get_raw(value);
  for (;;) {
    marked_idx tail_old = tail_.load(std::memory_order_relaxed);
    marked_idx head_old = head_.load(std::memory_order_relaxed);

    uint64_t idx;
    marked_value old_value;
    // look for an empty slot in the tail segment
    bool found_idx = find_index<true>(tail_old.get(), idx, old_value);
    if (tail_old != tail_.load(std::memory_order_relaxed))
      continue; // tail has moved - retry

    if (found_idx) {
      assert(old_value.get() == nullptr);
      const marked_value new_value(raw_value, old_value.mark() + 1);
      // (1) - this release-CAS synchronizes-with the acquire-loads (3, 4)
      if (queue_[idx].value.compare_exchange_strong(
            old_value, new_value, std::memory_order_release, std::memory_order_relaxed) &&
          committed(tail_old, new_value, idx)) {
        traits::release(value);
        return true;
      }
    } else {
      if (queue_full(head_old, tail_old)) {
        if (segment_empty(head_old)) {
          // increment head by k
          marked_idx new_head((head_old.get() + k_) % queue_size_, head_old.mark() + 1);
          head_.compare_exchange_strong(head_old, new_head, std::memory_order_relaxed);
        } else if (head_old == head_.load(std::memory_order_relaxed)) {
          // queue is full
          return false;
        }
      }
      // increment tail by k
      marked_idx new_tail((tail_old.get() + k_) % queue_size_, tail_old.mark() + 1);
      tail_.compare_exchange_strong(tail_old, new_tail, std::memory_order_relaxed);
    }
  }
}

template <class T, class... Policies>
bool kirsch_bounded_kfifo_queue<T, Policies...>::try_pop(value_type& result) {
  for (;;) {
    marked_idx head_old = head_.load(std::memory_order_relaxed);
    marked_idx tail_old = tail_.load(std::memory_order_relaxed);

    uint64_t idx;
    marked_value old_value;
    // look for a populated slot in the head segment
    bool found_idx = find_index<false>(head_old.get(), idx, old_value);
    if (head_old != head_.load(std::memory_order_relaxed))
      continue; // head has moved - retry

    if (found_idx) {
      assert(old_value.get() != nullptr);
      if (head_old.get() == tail_old.get()) {
        // head and tail point to the same segment - move tail to the next segment
        marked_idx new_tail((tail_old.get() + k_) % queue_size_, tail_old.mark() + 1);
        tail_.compare_exchange_strong(tail_old, new_tail, std::memory_order_relaxed);
      }
      marked_value new_value(nullptr, old_value.mark() + 1);
      // (2) - this release-CAS synchronizes-with the acquire-loads (3, 4)
      if (queue_[idx].value.compare_exchange_strong(
            old_value, new_value, std::memory_order_release, std::memory_order_relaxed)) {
        traits::store(result, old_value.get());
        return true;
      }
    } else {
      if (head_old.get() == tail_old.get() && tail_old == tail_.load(std::memory_order_relaxed))
        return false; // queue is empty

      // the head segment is empty - move head to the next segment
      marked_idx new_head((head_old.get() + k_) % queue_size_, head_old.mark() + 1);
      head_.compare_exchange_strong(head_old, new_head, std::memory_order_relaxed);
    }
  }
}

template <class T, class... Policies>
template <bool Empty>
bool kirsch_bounded_kfifo_queue<T, Policies...>::find_index(
  uint64_t start_index, uint64_t& value_index, marked_value& old)
{
  // start probing at a random offset within the segment to spread concurrent
  // threads across different slots and reduce CAS contention
  const uint64_t random_index = utils::random() % k_;
  for (size_t i = 0; i < k_; i++) {
    // TODO - this can be simplified if queue_size is a multiple of k!
    uint64_t index = (start_index + ((random_index + i) % k_)) % queue_size_;
    // (3) - this acquire-load synchronizes-with the release-CAS (1, 2)
    old = queue_[index].value.load(std::memory_order_acquire);
    if ((Empty && old.get() == nullptr) || (!Empty && old.get() != nullptr)) {
      value_index = index;
      return true;
    }
  }
  return false;
}

template <class T, class... Policies>
bool kirsch_bounded_kfifo_queue<T, Policies...>::committed(
  const marked_idx& tail_old, marked_value value, uint64_t index)
{
  if (queue_[index].value.load(std::memory_order_relaxed) != value)
    return true; // the value has already been popped again, so the push is committed

  marked_idx tail_current = tail_.load(std::memory_order_relaxed);
  marked_idx head_current = head_.load(std::memory_order_relaxed);
  if (in_valid_region(tail_old.get(), tail_current.get(), head_current.get())) {
    // the slot is still inside the logical queue - the push is committed
    return true;
  } else if (not_in_valid_region(tail_old.get(), tail_current.get(), head_current.get())) {
    // the slot has left the logical queue - try to take the value back out; if
    // that fails, some other thread has already popped it and the push counts
    marked_value new_value(nullptr, value.mark() + 1);
    if (!queue_[index].value.compare_exchange_strong(value, new_value, std::memory_order_relaxed))
      return true;
  } else {
    // ambiguous case - try to pin head via its mark; if that succeeds the slot
    // cannot have been passed, otherwise try to take the value back out
    marked_idx new_head(head_current.get(), head_current.mark() + 1);
    if (head_.compare_exchange_strong(head_current, new_head, std::memory_order_relaxed))
      return true;

    marked_value new_value(nullptr, value.mark() + 1);
    if (!queue_[index].value.compare_exchange_strong(value, new_value, std::memory_order_relaxed))
      return true;
  }
  return false;
}

template <class T, class... Policies>
bool kirsch_bounded_kfifo_queue<T, Policies...>::queue_full(
  const marked_idx& head_old, const marked_idx& tail_old) const
{
  return ((tail_old.get() + k_) % queue_size_) == head_old.get() &&
         head_old == head_.load(std::memory_order_relaxed);
}

template <class T, class... Policies>
bool kirsch_bounded_kfifo_queue<T, Policies...>::segment_empty(const marked_idx& head_old) const {
  const uint64_t start = head_old.get();
  for (size_t i = 0; i < k_; i++) {
    // TODO - this can be simplified if queue_size is a multiple of k!
    // (4) - this acquire-load synchronizes-with the release-CAS (1, 2)
    if (queue_[(start + i) % queue_size_].value.load(std::memory_order_acquire).get() != nullptr)
      return false;
  }
  return true;
}

template <class T, class... Policies>
bool kirsch_bounded_kfifo_queue<T, Policies...>::in_valid_region(
  uint64_t tail_old, uint64_t tail_current, uint64_t head_current) const
{
  // checks whether tail_old lies in the interval (head_current, tail_current]
  // of the circular index space
  bool wrap_around = tail_current < head_current;
  if (!wrap_around)
    return head_current < tail_old && tail_old <= tail_current;
  return head_current < tail_old || tail_old <= tail_current;
}

template <class T, class... Policies>
bool kirsch_bounded_kfifo_queue<T, Policies...>::not_in_valid_region(
  uint64_t tail_old, uint64_t tail_current, uint64_t head_current) const
{
  bool wrap_around = tail_current < head_current;
  if (!wrap_around)
    return tail_old < tail_current || head_current < tail_old;
  return tail_old < tail_current && head_current < tail_old;
}
} // namespace xenium

#ifdef _MSC_VER
#pragma warning(pop)
#endif

#endif // XENIUM_KIRSCH_BOUNDED_KFIFO_QUEUE_HPP
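
A minimal usage sketch (not part of the original header), assuming the queue is
instantiated with a raw pointer type as supported by detail::pointer_queue_traits;
the capacity is k * num_segments, try_push rejects nullptr, and both operations
are lock-free:

#include <xenium/kirsch_bounded_kfifo_queue.hpp>

#include <cstdio>

int main() {
  // illustrative sketch: k = 4 slots per segment, 10 segments -> capacity of 40 entries
  xenium::kirsch_bounded_kfifo_queue<int*> queue(4, 10);

  int value = 42;
  if (queue.try_push(&value)) {   // returns false only when the queue is full
    int* result = nullptr;
    if (queue.try_pop(result))    // returns false only when the queue is empty
      std::printf("popped %d\n", *result);
  }
}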