1 /*
2 * Copyright 2025 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * https://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package io.netty.util.concurrent;
17
18 import io.netty.util.internal.MathUtil;
19 import io.netty.util.internal.ObjectUtil;
20
21 import java.util.Objects;
22 import java.util.concurrent.atomic.AtomicIntegerArray;
23 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
24 import java.util.function.IntBinaryOperator;
25 import java.util.function.IntConsumer;
26 import java.util.function.IntSupplier;
27
28 /**
29 * A multi-producer (concurrent and thread-safe {@code offer} and {@code fill}),
30 * single-consumer (single-threaded {@code poll} and {@code drain}) queue of primitive integers.
31 */
32 public interface MpscIntQueue {
33 /**
34 * Create a new queue instance of the given size.
35 * <p>
36 * Note: the size of the queue may be rounded up to nearest power-of-2.
37 *
38 * @param size The required fixed size of the queue.
39 * @param emptyValue The special value that the queue should use to signal the "empty" case.
40 * This value will be returned from {@link #poll()} when the queue is empty,
41 * and giving this value to {@link #offer(int)} will cause an exception to be thrown.
42 * @return The queue instance.
43 */
44 static MpscIntQueue create(int size, int emptyValue) {
45 return new MpscAtomicIntegerArrayQueue(size, emptyValue);
46 }
47
48 /**
49 * Offer the given value to the queue. This will throw an exception if the given value is the "empty" value.
50 * @param value The value to add to the queue.
51 * @return {@code true} if the value was added to the queue,
52 * or {@code false} if the value could not be added because the queue is full.
53 */
54 boolean offer(int value);
55
56 /**
57 * Remove and return the next value from the queue, or return the "empty" value if the queue is empty.
58 * @return The next value or the "empty" value.
59 */
60 int poll();
61
62 /**
63 * Remove up to the given limit of elements from the queue, and pass them to the consumer in order.
64 * @param limit The maximum number of elements to dequeue.
65 * @param consumer The consumer to pass the removed elements to.
66 * @return The actual number of elements removed.
67 */
68 int drain(int limit, IntConsumer consumer);
69
70 /**
71 * Add up to the given limit of elements to this queue, from the given supplier.
72 * @param limit The maximum number of elements to enqueue.
73 * @param supplier The supplier to obtain the elements from.
74 * @return The actual number of elements added.
75 */
76 int fill(int limit, IntSupplier supplier);
77
78 /**
79 * Peek at all available elements and compute a reduction.
80 * The elements are not removed, and the iteration is weakly consistent.
81 * @param limit The maximum number of elements to process.
82 * @param initial The initial value to the reduction operation.
83 * @param op The reduction operation, taking a prior result and an element, and producing a new result.
84 * @return The last result of the reduction operation.
85 */
86 default int weakPeekReduce(int limit, int initial, IntBinaryOperator op) {
87 // There's no safe way to implement this method in terms of the other operations.
88 // Take the "weak" definition to the extreme and just return the initial value.
89 return initial;
90 }
91
92 /**
93 * Query if the queue is empty or not.
94 * <p>
95 * This method is inherently racy and the result may be out of date by the time the method returns.
96 * @return {@code true} if the queue was observed to be empty, otherwise {@code false.
97 */
98 boolean isEmpty();
99
100 /**
101 * Query the number of elements currently in the queue.
102 * <p>
103 * This method is inherently racy and the result may be out of date by the time the method returns.
104 * @return An estimate of the number of elements observed in the queue.
105 */
106 int size();
107
108 /**
109 * This implementation is based on MpscAtomicUnpaddedArrayQueue from JCTools.
110 */
111 final class MpscAtomicIntegerArrayQueue extends AtomicIntegerArray implements MpscIntQueue {
112 private static final long serialVersionUID = 8740338425124821455L;
113 private static final AtomicLongFieldUpdater<MpscAtomicIntegerArrayQueue> PRODUCER_INDEX =
114 AtomicLongFieldUpdater.newUpdater(MpscAtomicIntegerArrayQueue.class, "producerIndex");
115 private static final AtomicLongFieldUpdater<MpscAtomicIntegerArrayQueue> PRODUCER_LIMIT =
116 AtomicLongFieldUpdater.newUpdater(MpscAtomicIntegerArrayQueue.class, "producerLimit");
117 private static final AtomicLongFieldUpdater<MpscAtomicIntegerArrayQueue> CONSUMER_INDEX =
118 AtomicLongFieldUpdater.newUpdater(MpscAtomicIntegerArrayQueue.class, "consumerIndex");
119 private final int mask;
120 private final int emptyValue;
121 private volatile long producerIndex;
122 private volatile long producerLimit;
123 private volatile long consumerIndex;
124
125 public MpscAtomicIntegerArrayQueue(int capacity, int emptyValue) {
126 super(MathUtil.safeFindNextPositivePowerOfTwo(capacity));
127 if (emptyValue != 0) {
128 this.emptyValue = emptyValue;
129 int end = length() - 1;
130 for (int i = 0; i < end; i++) {
131 lazySet(i, emptyValue);
132 }
133 getAndSet(end, emptyValue); // 'getAndSet' acts as a full barrier, giving us initialization safety.
134 } else {
135 this.emptyValue = 0;
136 }
137 mask = length() - 1;
138 }
139
140 @Override
141 public boolean offer(int value) {
142 if (value == emptyValue) {
143 throw new IllegalArgumentException("Cannot offer the \"empty\" value: " + emptyValue);
144 }
145 // use a cached view on consumer index (potentially updated in loop)
146 final int mask = this.mask;
147 long producerLimit = this.producerLimit;
148 long pIndex;
149 do {
150 pIndex = producerIndex;
151 if (pIndex >= producerLimit) {
152 final long cIndex = consumerIndex;
153 producerLimit = cIndex + mask + 1;
154 if (pIndex >= producerLimit) {
155 // FULL :(
156 return false;
157 } else {
158 // update producer limit to the next index that we must recheck the consumer index
159 // this is racy, but the race is benign
160 PRODUCER_LIMIT.lazySet(this, producerLimit);
161 }
162 }
163 } while (!PRODUCER_INDEX.compareAndSet(this, pIndex, pIndex + 1));
164 /*
165 * NOTE: the new producer index value is made visible BEFORE the element in the array. If we relied on
166 * the index visibility to poll() we would need to handle the case where the element is not visible.
167 */
168 // Won CAS, move on to storing
169 final int offset = (int) (pIndex & mask);
170 lazySet(offset, value);
171 // AWESOME :)
172 return true;
173 }
174
175 @Override
176 public int poll() {
177 final long cIndex = consumerIndex;
178 final int offset = (int) (cIndex & mask);
179 // If we can't see the next available element we can't poll
180 int value = get(offset);
181 if (emptyValue == value) {
182 /*
183 * NOTE: Queue may not actually be empty in the case of a producer (P1) being interrupted after
184 * winning the CAS on offer but before storing the element in the queue. Other producers may go on
185 * to fill up the queue after this element.
186 */
187 if (cIndex != producerIndex) {
188 do {
189 value = get(offset);
190 } while (emptyValue == value);
191 } else {
192 return emptyValue;
193 }
194 }
195 lazySet(offset, emptyValue);
196 CONSUMER_INDEX.lazySet(this, cIndex + 1);
197 return value;
198 }
199
200 @Override
201 public int drain(int limit, IntConsumer consumer) {
202 Objects.requireNonNull(consumer, "consumer");
203 ObjectUtil.checkPositiveOrZero(limit, "limit");
204 if (limit == 0) {
205 return 0;
206 }
207 final int mask = this.mask;
208 final long cIndex = consumerIndex; // Note: could be weakened to plain-load.
209 for (int i = 0; i < limit; i++) {
210 final long index = cIndex + i;
211 final int offset = (int) (index & mask);
212 final int value = get(offset);
213 if (emptyValue == value) {
214 return i;
215 }
216 lazySet(offset, emptyValue); // Note: could be weakened to plain-store.
217 // ordered store -> atomic and ordered for size()
218 CONSUMER_INDEX.lazySet(this, index + 1);
219 consumer.accept(value);
220 }
221 return limit;
222 }
223
224 @Override
225 public int fill(int limit, IntSupplier supplier) {
226 Objects.requireNonNull(supplier, "supplier");
227 ObjectUtil.checkPositiveOrZero(limit, "limit");
228 if (limit == 0) {
229 return 0;
230 }
231 final int mask = this.mask;
232 final long capacity = mask + 1;
233 long producerLimit = this.producerLimit;
234 long pIndex;
235 int actualLimit;
236 do {
237 pIndex = producerIndex;
238 long available = producerLimit - pIndex;
239 if (available <= 0) {
240 final long cIndex = consumerIndex;
241 producerLimit = cIndex + capacity;
242 available = producerLimit - pIndex;
243 if (available <= 0) {
244 // FULL :(
245 return 0;
246 } else {
247 // update producer limit to the next index that we must recheck the consumer index
248 PRODUCER_LIMIT.lazySet(this, producerLimit);
249 }
250 }
251 actualLimit = Math.min((int) available, limit);
252 } while (!PRODUCER_INDEX.compareAndSet(this, pIndex, pIndex + actualLimit));
253 // right, now we claimed a few slots and can fill them with goodness
254 for (int i = 0; i < actualLimit; i++) {
255 // Won CAS, move on to storing
256 final int offset = (int) (pIndex + i & mask);
257 lazySet(offset, supplier.getAsInt());
258 }
259 return actualLimit;
260 }
261
262 @Override
263 public int weakPeekReduce(int limit, int initial, IntBinaryOperator op) {
264 Objects.requireNonNull(op, "op");
265 ObjectUtil.checkPositiveOrZero(limit, "limit");
266 if (limit == 0) {
267 return 0;
268 }
269 int result = initial;
270
271 final int mask = this.mask;
272 final long cIndex = consumerIndex; // Note: could be weakened to plain-load.
273 for (int i = 0; i < limit; i++) {
274 final long index = cIndex + i;
275 final int offset = (int) (index & mask);
276 final int value = get(offset);
277 if (emptyValue == value) {
278 return result;
279 }
280 // Do not remove the element or advance the consumer index.
281 result = op.applyAsInt(result, value);
282 }
283 return result;
284 }
285
286 @Override
287 public boolean isEmpty() {
288 // Load consumer index before producer index, so our check is conservative.
289 long cIndex = consumerIndex;
290 long pIndex = producerIndex;
291 return cIndex >= pIndex;
292 }
293
294 @Override
295 public int size() {
296 // Loop until we get a consistent read of both the consumer and producer indices.
297 long after = consumerIndex;
298 long size;
299 for (;;) {
300 long before = after;
301 long pIndex = producerIndex;
302 after = consumerIndex;
303 if (before == after) {
304 size = pIndex - after;
305 break;
306 }
307 }
308 return size < 0 ? 0 : size > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) size;
309 }
310 }
311 }