6 #ifndef XENIUM_KIRSCH_KFIFO_QUEUE_HPP
7 #define XENIUM_KIRSCH_KFIFO_QUEUE_HPP
9 #include <xenium/marked_ptr.hpp>
10 #include <xenium/parameter.hpp>
11 #include <xenium/policy.hpp>
12 #include <xenium/utils.hpp>
14 #include <xenium/detail/pointer_queue_traits.hpp>
// Template head of the kirsch_kfifo_queue class. NOTE(review): the class
// declaration line itself is missing from this extract — confirm against the
// full header.
42 template <
class T,
class... Policies>
// Trait bundle mapping T to the raw (pointer) representation stored in entries.
45 using traits = detail::pointer_queue_traits_t<T, Policies...>;
46 using raw_value_type =
typename traits::raw_type;
// Memory-reclamation scheme taken from the policy list; nil if not supplied.
49 using reclaimer = parameter::type_param_t<
policy::reclaimer, parameter::nil, Policies...>;
// Per-entry padding in bytes; defaults to sizeof(raw_value_type). A value of 0
// selects the unpadded entry type below.
50 static constexpr
unsigned padding_bytes = parameter::value_param_t<unsigned,
policy::padding_bytes,
sizeof(raw_value_type), Policies...>::value;
// A reclaimer is mandatory — fail at compile time with a clear message.
52 static_assert(parameter::is_set<reclaimer>::value,
"reclaimer policy must be specified");
// Rebind helper for deriving a queue type with adjusted policies
// (its body/alias target is not visible in this extract).
54 template <
class... NewPolicies>
// Enqueues value; throws std::invalid_argument for a nullptr value (see push
// definition below).
72 void push(value_type value);
// Attempts to dequeue into result; callers must not ignore the success flag.
80 [[nodiscard]]
bool try_pop(value_type& result);
// Entry slot plus padding — presumably the body of a padded_entry struct whose
// header line is missing here; TODO confirm.
86 std::atomic<marked_value> value;
// std::max(..., 1u) keeps the array size legal even if padding_bytes were 0
// (that configuration selects unpadded_entry anyway).
88 char padding[std::max(padding_bytes, 1u)];
// Entry without padding, used when padding_bytes == 0.
91 struct unpadded_entry {
92 std::atomic<marked_value> value;
// Select the entry layout at compile time based on the padding policy.
94 using entry = std::conditional_t<padding_bytes == 0, unpadded_entry, padded_entry>;
// Deleter handed to the reclaimer so reclaimed segments release their
// custom-allocated storage via release_segment.
105 struct segment_deleter {
106 void operator()(segment* seg)
const { release_segment(seg); }
// One segment of k entry slots; participates in safe memory reclamation via
// the reclaimer's enable_concurrent_ptr base (16 mark bits).
108 struct segment : reclaimer::template enable_concurrent_ptr<segment, 16, segment_deleter> {
109 using concurrent_ptr =
typename reclaimer::template concurrent_ptr<segment, 16>;
111 explicit segment(uint64_t k) : k(k) {}
// Loop checking that all slots are empty — presumably the destructor body;
// the enclosing function header is not visible in this extract.
113 for (
unsigned i = 0; i < k; ++i) {
114 assert(items()[i].value.load(std::memory_order_relaxed).get() ==
nullptr);
// Frees any values still held in the slots and clears them; used during
// queue destruction (see ~kirsch_kfifo_queue below).
118 void delete_remaining_items() {
119 for (
unsigned i = 0; i < k; ++i) {
120 traits::delete_value(items()[i].value.load(std::memory_order_relaxed).get());
121 items()[i].value.store(
nullptr, std::memory_order_relaxed);
// The k entries live immediately after the segment object in the same
// allocation (see alloc_segment), hence the this + 1 reinterpret_cast.
125 entry* items() noexcept {
return reinterpret_cast<entry*
>(
this + 1); }
// Set once a segment has been unlinked from the head (see advance_head);
// read by committed to decide how to roll back an insert.
127 std::atomic<bool> deleted{
false};
// Link to the next segment in the list of segments.
129 concurrent_ptr next{};
// Convenience aliases for the pointer flavors used throughout the algorithm.
132 using concurrent_ptr =
typename segment::concurrent_ptr;
133 using marked_ptr =
typename concurrent_ptr::marked_ptr;
134 using guard_ptr =
typename concurrent_ptr::guard_ptr;
// Allocates a segment with k_ trailing entry slots in a single allocation.
136 segment* alloc_segment()
const;
// Destroys a segment and frees the raw storage obtained in alloc_segment.
137 static void release_segment(segment* seg);
// Scans a segment for a slot that is empty (Empty == true) or occupied
// (Empty == false), starting at a random index; outputs the index and the
// observed old value.
139 template <
bool Empty>
140 bool find_index(marked_ptr segment, uint64_t& value_index, marked_value& old)
const noexcept;
// Head/tail advancement and insert-validation helpers of the k-FIFO algorithm.
141 void advance_head(guard_ptr& head_current, marked_ptr tail_current) noexcept;
142 void advance_tail(marked_ptr tail_old) noexcept;
143 bool committed(marked_ptr segment, marked_value value, uint64_t index) noexcept;
// Number of entry slots per segment (fixed at construction).
145 const std::size_t k_;
146 concurrent_ptr head_;
147 concurrent_ptr tail_;
150 template <
class T,
class... Policies>
// Constructs the queue with one initial segment shared by head_ and tail_.
// NOTE(review): the member-initializer list (storing k) is missing from this
// extract — confirm against the full header.
151 kirsch_kfifo_queue<T, Policies...>::kirsch_kfifo_queue(uint64_t k) :
154 const auto seg = alloc_segment();
// Relaxed stores are sufficient here: no other thread can see the queue
// before the constructor returns.
155 head_.store(seg, std::memory_order_relaxed);
156 tail_.store(seg, std::memory_order_relaxed);
159 template <
class T,
class... Policies>
// Destructor: walks the segment list from head_, deleting remaining values
// and releasing each segment. NOTE(review): the loop header and the advance
// to `next` are missing from this extract — confirm against the full header.
160 kirsch_kfifo_queue<T, Policies...>::~kirsch_kfifo_queue() {
// Relaxed is fine — destruction implies exclusive access.
161 auto seg = head_.load(std::memory_order_relaxed).get();
163 auto next = seg->next.load(std::memory_order_relaxed).get();
164 seg->delete_remaining_items();
165 release_segment(seg);
170 template <
class T,
class... Policies>
// Allocates raw storage for a segment header plus k_ entries in ONE block,
// then placement-constructs the segment and each entry. The entries are
// reached later via segment::items() (this + 1).
171 auto kirsch_kfifo_queue<T, Policies...>::alloc_segment() const -> segment* {
172 void* data = ::operator
new(
sizeof(segment) + k_ *
sizeof(entry));
173 auto result =
new(data) segment(k_);
// Placement-construct every trailing entry slot.
174 for (std::size_t i = 0; i < k_; ++i)
175 new(&result->items()[i]) entry();
179 template <
class T,
class... Policies>
// Counterpart of alloc_segment: frees the combined segment+entries block with
// raw ::operator delete. NOTE(review): an explicit destructor call before the
// delete appears to be missing from this extract — confirm against the full
// header.
180 void kirsch_kfifo_queue<T, Policies...>::release_segment(segment* seg) {
182 ::operator
delete(seg);
185 template <
class T,
class... Policies>
// push(value): rejects nullptr, then repeatedly (retry loop lines are missing
// from this extract) guards the tail segment, looks for an empty slot, and
// tries to CAS the value in; committed() validates the insert against
// concurrent head/tail movement. NOTE(review): the function signature line
// and parts of the loop are not visible here — confirm against the full
// header.
187 if (value ==
nullptr)
188 throw std::invalid_argument(
"value cannot be nullptr");
190 raw_value_type raw_value = traits::get_raw(value);
// Guard the current tail so it cannot be reclaimed while we scan it.
194 tail_old.acquire(tail_, std::memory_order_acquire);
// Look for an empty slot (Empty == true) starting at a random index.
200 bool found_idx = find_index<true>(tail_old, idx, old_value);
// If the tail moved under us, our scan result is stale — retry.
201 if (tail_old != tail_.load(std::memory_order_relaxed))
// Claim the slot; release ordering publishes the stored value. The insert
// only counts if committed() confirms the segment was still reachable.
207 if (tail_old->items()[idx].value.compare_exchange_strong(old_value, new_value,
208 std::memory_order_release, std::memory_order_relaxed) &&
209 committed(tail_old, new_value, idx)) {
210 traits::release(value);
// No empty slot found in this segment — grow/advance the tail and retry.
215 advance_tail(tail_old);
220 template <
class T,
class... Policies>
// try_pop(result): guards the head segment, searches it for an occupied slot,
// and tries to CAS the slot back to empty; on success the value is handed to
// the caller. NOTE(review): the function signature line and the retry-loop
// lines are not visible in this extract — confirm against the full header.
225 head_old.acquire(head_, std::memory_order_acquire);
226 auto h = head_old.get();
// Look for an occupied slot (Empty == false) starting at a random index.
230 bool found_idx = find_index<false>(head_old, idx, old_value);
// If the head moved under us, our scan result is stale — retry.
231 if (head_old != head_.load(std::memory_order_relaxed))
235 marked_ptr tail_old = tail_.load(std::memory_order_acquire);
// NOTE(review): this looks like a leftover debugging check for one specific
// bad pointer value — confirm whether it can be removed upstream.
237 assert(old_value.
get() != (
void*)0x100);
// Queue has a single segment — make sure tail is not left behind head.
238 if (head_old.get() == tail_old.get())
239 advance_tail(tail_old);
// Take the value out of the slot; acquire ordering pairs with the releasing
// store in push.
243 if (head_old->items()[idx].value.compare_exchange_strong(old_value, new_value,
244 std::memory_order_acquire, std::memory_order_relaxed)) {
245 traits::store(result, old_value.
get());
// Segment exhausted: if more segments exist, retire this head segment.
249 if (head_old.get() == tail_old.get() && tail_old == tail_.load(std::memory_order_relaxed))
251 advance_head(head_old, tail_old);
256 template <
class T,
class... Policies>
257 template <
bool Empty>
// Scans all k slots of the segment, starting from a random index to spread
// contention; reports the first slot that is empty (Empty == true) or
// occupied (Empty == false). NOTE(review): the lines that set the outputs and
// return are missing from this extract — confirm against the full header.
259 marked_ptr segment, uint64_t& value_index, marked_value& old)
const noexcept
261 const uint64_t k = segment->k;
262 const uint64_t random_index = utils::random() % k;
263 for (
size_t i = 0; i < k; i++) {
// Wrap around so every slot is visited exactly once.
264 uint64_t index = ((random_index + i) % k);
265 old = segment->items()[index].value.load(std::memory_order_relaxed);
266 if ((Empty && old.get() ==
nullptr) || (!Empty && old.get() !=
nullptr)) {
274 template <
class T,
class... Policies>
// Validates an insert made by push: decides whether the value placed at
// segment->items()[index] counts as enqueued, or must be rolled back because
// the segment was concurrently dequeued past / deleted. Each rollback is a
// CAS of the slot back to an empty value with an incremented mark; if that
// CAS fails, someone already popped the value, so the insert stands.
// NOTE(review): several branch bodies are missing from this extract — confirm
// against the full header.
275 bool kirsch_kfifo_queue<T, Policies...>::committed(
276 marked_ptr segment, marked_value value, uint64_t index) noexcept
// Slot no longer holds our value — it was already consumed.
278 if (value != segment->items()[index].value.load(std::memory_order_relaxed))
// Empty value with bumped mark used for the rollback CASes below.
281 const marked_value empty_value(
nullptr, value.mark() + 1);
// Segment already unlinked from the head: try to take the value back out.
283 if (segment->deleted.load(std::memory_order_relaxed) ==
true) {
285 return !segment->items()[index].value.compare_exchange_strong(value, empty_value, std::memory_order_relaxed);
// Segment is the current head: bump the head's mark to fence off a racing
// advance_head before deciding.
289 marked_ptr head_current = head_.load(std::memory_order_acquire);
290 if (segment.get() == head_current.get()) {
292 marked_ptr new_head(head_current.get(), head_current.mark() + 1);
294 if (head_.compare_exchange_strong(head_current, new_head, std::memory_order_relaxed))
300 return !segment->items()[index].value.compare_exchange_strong(value, empty_value, std::memory_order_relaxed);
// Segment is neither head nor deleted — the insert is safely committed.
303 if (segment->deleted.load(std::memory_order_relaxed) ==
false) {
309 return !segment->items()[index].value.compare_exchange_strong(value, empty_value, std::memory_order_relaxed);
313 template <
class T,
class... Policies>
// Retires the exhausted head segment: first makes sure the tail is not left
// pointing at it, marks it deleted, then swings head_ to the next segment and
// hands the old segment to the reclaimer. NOTE(review): some early-return
// lines are missing from this extract — confirm against the full header.
314 void kirsch_kfifo_queue<T, Policies...>::advance_head(guard_ptr& head_current, marked_ptr tail_current) noexcept {
316 const marked_ptr head_next_segment = head_current->next.load(std::memory_order_acquire);
// Head already moved — nothing to do.
317 if (head_current != head_.load(std::memory_order_relaxed))
// Head and tail share the segment: advance the tail first so it never trails
// behind the head.
320 if (head_current.get() == tail_current.get()) {
322 const marked_ptr tail_next_segment = tail_current->next.load(std::memory_order_acquire);
// No successor segment yet — the queue would become empty; bail out.
323 if (tail_next_segment.get() ==
nullptr)
326 if (tail_current == tail_.load(std::memory_order_relaxed)) {
327 marked_ptr new_tail(tail_next_segment.get(), tail_current.mark() + 1);
329 tail_.compare_exchange_strong(tail_current, new_tail, std::memory_order_release, std::memory_order_relaxed);
// Mark the segment deleted BEFORE unlinking so committed() can detect the
// race with a concurrent push into this segment.
333 head_current->deleted.store(
true, std::memory_order_relaxed);
335 marked_ptr expected = head_current;
336 marked_ptr new_head(head_next_segment.get(), head_current.mark() + 1);
338 if (head_.compare_exchange_strong(expected, new_head, std::memory_order_release, std::memory_order_relaxed)) {
// We unlinked it — schedule the old head segment for safe reclamation.
339 head_current.reclaim();
343 template <
class T,
class... Policies>
// Moves the tail forward: if a successor segment already exists, swing tail_
// to it; otherwise allocate a fresh segment, try to link it as next, and then
// swing tail_. A losing linker frees its unused segment. NOTE(review): the
// closing lines of this function fall outside this extract.
344 void kirsch_kfifo_queue<T, Policies...>::advance_tail(marked_ptr tail_current) noexcept {
346 marked_ptr next_segment = tail_current->next.load(std::memory_order_acquire);
// Tail already moved — nothing to do.
347 if (tail_current != tail_.load(std::memory_order_relaxed))
// Successor already linked by someone else: just help swing the tail.
350 if (next_segment.get() !=
nullptr) {
351 marked_ptr new_tail(next_segment.get(), next_segment.mark() + 1);
353 tail_.compare_exchange_strong(tail_current, new_tail, std::memory_order_release, std::memory_order_relaxed);
// No successor — allocate one and race to link it.
355 auto seg = alloc_segment();
356 const marked_ptr new_segment(seg, next_segment.mark() + 1);
// Release ordering publishes the new segment's initialized entries.
359 if (tail_current->next.compare_exchange_strong(next_segment, new_segment,
360 std::memory_order_release, std::memory_order_relaxed)) {
361 marked_ptr new_tail(seg, tail_current.mark() + 1);
363 tail_.compare_exchange_strong(tail_current, new_tail, std::memory_order_release, std::memory_order_relaxed);
// Lost the linking race — free the segment we allocated.
365 release_segment(seg);