#ifndef XENIUM_KIRSCH_BOUNDED_KFIFO_QUEUE_HPP
#define XENIUM_KIRSCH_BOUNDED_KFIFO_QUEUE_HPP
#include <xenium/marked_ptr.hpp>
#include <xenium/parameter.hpp>
#include <xenium/policy.hpp>
#include <xenium/utils.hpp>

#include <xenium/detail/pointer_queue_traits.hpp>

#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstdint>
#include <memory>
#include <stdexcept>
#include <type_traits>
#ifdef _MSC_VER
#pragma warning(push)
#pragma warning(disable : 26495) // uninitialized member variable
#endif

namespace xenium {
/**
 * @brief A bounded, lock-free multi-producer/multi-consumer queue based on the
 * k-FIFO queue proposed by Kirsch et al.
 */
template <class T, class... Policies>
class kirsch_bounded_kfifo_queue {
private:
  using traits = detail::pointer_queue_traits_t<T, Policies...>;
  using raw_value_type = typename traits::raw_type;

public:
  using value_type = T;

  static constexpr unsigned padding_bytes =
    parameter::value_param_t<unsigned, policy::padding_bytes, sizeof(raw_value_type), Policies...>::value;

  kirsch_bounded_kfifo_queue(uint64_t k, uint64_t num_segments);
  ~kirsch_bounded_kfifo_queue();

  kirsch_bounded_kfifo_queue(const kirsch_bounded_kfifo_queue&) = delete;
  kirsch_bounded_kfifo_queue(kirsch_bounded_kfifo_queue&&) = delete;
  kirsch_bounded_kfifo_queue& operator=(const kirsch_bounded_kfifo_queue&) = delete;
  kirsch_bounded_kfifo_queue& operator=(kirsch_bounded_kfifo_queue&&) = delete;

  bool try_push(value_type value);
  bool try_pop(value_type& result);
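  // Usage sketch (added; illustrative only). It assumes T is a raw pointer
  // type, as handled by the pointer_queue_traits referenced above:
  //
  //   xenium::kirsch_bounded_kfifo_queue<int*> queue(4, 10); // k = 4, 10 segments
  //   queue.try_push(new int(42));
  //   int* v = nullptr;
  //   if (queue.try_pop(v)) {
  //     // use *v, then dispose of it
  //     delete v;
  //   }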
private:
  using marked_value = xenium::marked_ptr<std::remove_pointer_t<raw_value_type>, 16>;

  struct padded_entry {
    std::atomic<marked_value> value;
    char padding[std::max(padding_bytes, 1u)]; // max avoids zero-sized arrays
  };

  struct unpadded_entry {
    std::atomic<marked_value> value;
  };
  using entry = std::conditional_t<padding_bytes == 0, unpadded_entry, padded_entry>;
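  // Note (added): the padding spreads entries across cache lines so that
  // threads operating on neighboring slots do not false-share; a policy such
  // as the padding_bytes policy referenced above (e.g. with a value of 64)
  // can pad each entry to a full cache line, while a value of 0 selects the
  // compact unpadded layout.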
  struct marked_idx {
    marked_idx() = default;
    marked_idx(uint64_t val, uint64_t mark) noexcept { val_ = val | (mark << bits); }

    uint64_t get() const noexcept { return val_ & val_mask; }
    uint64_t mark() const noexcept { return val_ >> bits; }
    bool operator==(const marked_idx& other) const noexcept { return this->val_ == other.val_; }
    bool operator!=(const marked_idx& other) const noexcept { return this->val_ != other.val_; }

  private:
    static constexpr unsigned bits = 16;
    static constexpr uint64_t val_mask = (static_cast<uint64_t>(1) << bits) - 1;

    uint64_t val_ = 0;
  };
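  // Example (added): with bits == 16, marked_idx(3, 2) stores 3 | (2 << 16)
  // == 0x20003, so get() == 3 and mark() == 2. The mark acts as a version
  // counter that lets the CAS operations on head_/tail_ detect ABA situations.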
  template <bool Empty>
  bool find_index(uint64_t start_index, uint64_t& index, marked_value& old);
  bool queue_full(const marked_idx& head_old, const marked_idx& tail_old) const;
  bool segment_empty(const marked_idx& head_old) const;
  bool not_in_valid_region(uint64_t tail_old, uint64_t tail_current, uint64_t head_current) const;
  bool in_valid_region(uint64_t tail_old, uint64_t tail_current, uint64_t head_current) const;
  bool committed(const marked_idx& tail_old, marked_value new_value, uint64_t index);
  std::uint64_t queue_size_;
  std::uint64_t k_;
  // head_/tail_ are synchronized via the entry values -> relaxed order suffices
  std::atomic<marked_idx> head_;
  std::atomic<marked_idx> tail_;
  std::unique_ptr<entry[]> queue_;
};
template <class T, class... Policies>
kirsch_bounded_kfifo_queue<T, Policies...>::kirsch_bounded_kfifo_queue(uint64_t k, uint64_t num_segments) :
  queue_size_(k * num_segments),
  k_(k),
  head_(marked_idx{}),
  tail_(marked_idx{}),
  queue_(new entry[k * num_segments]())
{}
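// Note (added): the trailing () in `new entry[k * num_segments]()`
// value-initializes the array, so every slot starts out as a null
// marked_value with mark 0.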
template <class T, class... Policies>
kirsch_bounded_kfifo_queue<T, Policies...>::~kirsch_bounded_kfifo_queue() {
  // drop any values that are still in the queue
  for (unsigned i = 0; i < queue_size_; ++i)
    traits::delete_value(queue_[i].value.load(std::memory_order_relaxed).get());
}
template <class T, class... Policies>
bool kirsch_bounded_kfifo_queue<T, Policies...>::try_push(value_type value) {
  if (value == nullptr)
    throw std::invalid_argument("value can not be nullptr");

  raw_value_type raw_value = traits::get_raw(value);
  for (;;) {
    marked_idx tail_old = tail_.load(std::memory_order_relaxed);
    marked_idx head_old = head_.load(std::memory_order_relaxed);

    uint64_t idx;
    marked_value old_value;
    bool found_idx = find_index<true>(tail_old.get(), idx, old_value);
    if (tail_old != tail_.load(std::memory_order_relaxed))
      continue; // the tail has moved - retry

    if (found_idx) {
      assert(old_value.get() == nullptr);
      const marked_value new_value(raw_value, old_value.mark() + 1);
      // this release-CAS publishes the value; it synchronizes-with the
      // acquire-loads in find_index and segment_empty
      if (queue_[idx].value.compare_exchange_strong(
            old_value, new_value, std::memory_order_release, std::memory_order_relaxed) &&
          committed(tail_old, new_value, idx)) {
        traits::release(value);
        return true;
      }
    } else {
      // no free slot in the tail segment
      if (queue_full(head_old, tail_old)) {
        if (segment_empty(head_old)) {
          // the head segment holds no values -> advance head by k
          marked_idx new_head((head_old.get() + k_) % queue_size_, head_old.mark() + 1);
          head_.compare_exchange_strong(head_old, new_head, std::memory_order_relaxed);
        } else if (head_old == head_.load(std::memory_order_relaxed)) {
          return false; // the queue is full
        }
      }
      // advance tail to the next segment
      marked_idx new_tail((tail_old.get() + k_) % queue_size_, tail_old.mark() + 1);
      tail_.compare_exchange_strong(tail_old, new_tail, std::memory_order_relaxed);
    }
  }
}
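// Note (added): if the release-CAS in try_push succeeds but committed()
// returns false, committed() has already rolled the slot back to null, so
// the loop simply retries the insertion elsewhere.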
template <class T, class... Policies>
bool kirsch_bounded_kfifo_queue<T, Policies...>::try_pop(value_type& result) {
  for (;;) {
    marked_idx head_old = head_.load(std::memory_order_relaxed);
    marked_idx tail_old = tail_.load(std::memory_order_relaxed);

    uint64_t idx;
    marked_value old_value;
    bool found_idx = find_index<false>(head_old.get(), idx, old_value);
    if (head_old != head_.load(std::memory_order_relaxed))
      continue; // the head has moved - retry

    if (found_idx) {
      assert(old_value.get() != nullptr);
      if (head_old.get() == tail_old.get()) {
        // head and tail point to the same segment -> advance tail by k
        marked_idx new_tail((tail_old.get() + k_) % queue_size_, tail_old.mark() + 1);
        tail_.compare_exchange_strong(tail_old, new_tail, std::memory_order_relaxed);
      }
      marked_value new_value(nullptr, old_value.mark() + 1);
      if (queue_[idx].value.compare_exchange_strong(
            old_value, new_value, std::memory_order_release, std::memory_order_relaxed)) {
        traits::store(result, old_value.get());
        return true;
      }
    } else {
      if (head_old.get() == tail_old.get() && tail_old == tail_.load(std::memory_order_relaxed))
        return false; // the queue is empty
      // the head segment is empty -> advance head by k
      marked_idx new_head((head_old.get() + k_) % queue_size_, head_old.mark() + 1);
      head_.compare_exchange_strong(head_old, new_head, std::memory_order_relaxed);
    }
  }
}
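// Note (added): this is a k-FIFO queue - try_pop may return any of the (up
// to k) values stored in the head segment, so elements can overtake each
// other by at most k - 1 positions; FIFO order holds between segments.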
template <class T, class... Policies>
template <bool Empty>
bool kirsch_bounded_kfifo_queue<T, Policies...>::find_index(
  uint64_t start_index, uint64_t& value_index, marked_value& old)
{
  // scan the k entries of the segment, starting at a random offset to
  // reduce contention between threads
  const uint64_t random_index = utils::random() % k_;
  for (size_t i = 0; i < k_; i++) {
    uint64_t index = (start_index + ((random_index + i) % k_)) % queue_size_;
    // this acquire-load synchronizes-with the release-CAS operations in
    // try_push and try_pop
    old = queue_[index].value.load(std::memory_order_acquire);
    if ((Empty && old.get() == nullptr) || (!Empty && old.get() != nullptr)) {
      value_index = index;
      return true;
    }
  }
  return false;
}
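// Example (added): with k_ == 4 and random_index == 2, the loop probes the
// segment slots at offsets 2, 3, 0, 1 (relative to start_index), so
// concurrent threads tend to start their scans at different slots.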
template <class T, class... Policies>
bool kirsch_bounded_kfifo_queue<T, Policies...>::committed(
  const marked_idx& tail_old, marked_value value, uint64_t index)
{
  if (queue_[index].value.load(std::memory_order_relaxed) != value)
    return true; // the value has already been taken by a concurrent pop

  marked_idx tail_current = tail_.load(std::memory_order_relaxed);
  marked_idx head_current = head_.load(std::memory_order_relaxed);
  if (in_valid_region(tail_old.get(), tail_current.get(), head_current.get())) {
    return true;
  } else if (not_in_valid_region(tail_old.get(), tail_current.get(), head_current.get())) {
    // the slot is no longer part of the queue -> try to undo the insertion;
    // if that fails, some pop has already taken the value
    marked_value new_value(nullptr, value.mark() + 1);
    if (!queue_[index].value.compare_exchange_strong(value, new_value, std::memory_order_relaxed))
      return true;
  } else {
    // the slot is in the head segment -> try to pin head so it cannot pass us
    marked_idx new_head(head_current.get(), head_current.mark() + 1);
    if (head_.compare_exchange_strong(head_current, new_head, std::memory_order_relaxed))
      return true;

    marked_value new_value(nullptr, value.mark() + 1);
    if (!queue_[index].value.compare_exchange_strong(value, new_value, std::memory_order_relaxed))
      return true;
  }
  return false;
}
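// Note (added): committed() handles the race where head/tail move past the
// segment between find_index and the insertion CAS in try_push: if the slot
// has left the valid region, the value is removed again (unless a concurrent
// pop already took it, in which case the insertion counts as committed).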
template <class T, class... Policies>
bool kirsch_bounded_kfifo_queue<T, Policies...>::queue_full(
  const marked_idx& head_old, const marked_idx& tail_old) const
{
  // the queue is full if the next tail segment would run into the head segment
  return ((tail_old.get() + k_) % queue_size_) == head_old.get() &&
         (head_old == head_.load(std::memory_order_relaxed));
}
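// Example (added): with queue_size_ == 16 and k_ == 4, head_old.get() == 0
// and tail_old.get() == 12 give (12 + 4) % 16 == 0, i.e. the next tail
// segment would collide with the head segment -> the queue is full.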
template <class T, class... Policies>
bool kirsch_bounded_kfifo_queue<T, Policies...>::segment_empty(const marked_idx& head_old) const {
  const uint64_t start = head_old.get();
  for (size_t i = 0; i < k_; i++) {
    // this acquire-load synchronizes-with the release-CAS operations in
    // try_push and try_pop
    if (queue_[(start + i) % queue_size_].value.load(std::memory_order_acquire).get() != nullptr)
      return false;
  }
  return true;
}
template <class T, class... Policies>
bool kirsch_bounded_kfifo_queue<T, Policies...>::in_valid_region(
  uint64_t tail_old, uint64_t tail_current, uint64_t head_current) const
{
  bool wrap_around = tail_current < head_current;
  if (!wrap_around)
    return head_current < tail_old && tail_old <= tail_current;
  return head_current < tail_old || tail_old <= tail_current;
}
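// Example (added): without wrap-around (head_current 4, tail_current 12),
// tail_old == 8 is valid since 4 < 8 && 8 <= 12; with wrap-around
// (head_current 12, tail_current 4), tail_old == 14 is valid since 12 < 14.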
template <class T, class... Policies>
bool kirsch_bounded_kfifo_queue<T, Policies...>::not_in_valid_region(
  uint64_t tail_old, uint64_t tail_current, uint64_t head_current) const
{
  bool wrap_around = tail_current < head_current;
  if (!wrap_around)
    return tail_old < tail_current || head_current < tail_old;
  return tail_old < tail_current && head_current < tail_old;
}
} // namespace xenium

#ifdef _MSC_VER
#pragma warning(pop)
#endif

#endif