View Javadoc
1   /*
2    * Copyright 2025 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.util.concurrent;
17  
18  import io.netty.util.internal.MathUtil;
19  import io.netty.util.internal.ObjectUtil;
20  
21  import java.util.Objects;
22  import java.util.concurrent.atomic.AtomicIntegerArray;
23  import java.util.concurrent.atomic.AtomicLongFieldUpdater;
24  import java.util.function.IntBinaryOperator;
25  import java.util.function.IntConsumer;
26  import java.util.function.IntSupplier;
27  
28  /**
29   * A multi-producer (concurrent and thread-safe {@code offer} and {@code fill}),
30   * single-consumer (single-threaded {@code poll} and {@code drain}) queue of primitive integers.
31   */
32  public interface MpscIntQueue {
33      /**
34       * Create a new queue instance of the given size.
35       * <p>
36       * Note: the size of the queue may be rounded up to nearest power-of-2.
37       *
38       * @param size The required fixed size of the queue.
39       * @param emptyValue The special value that the queue should use to signal the "empty" case.
40       * This value will be returned from {@link #poll()} when the queue is empty,
41       * and giving this value to {@link #offer(int)} will cause an exception to be thrown.
42       * @return The queue instance.
43       */
44      static MpscIntQueue create(int size, int emptyValue) {
45          return new MpscAtomicIntegerArrayQueue(size, emptyValue);
46      }
47  
48      /**
49       * Offer the given value to the queue. This will throw an exception if the given value is the "empty" value.
50       * @param value The value to add to the queue.
51       * @return {@code true} if the value was added to the queue,
52       * or {@code false} if the value could not be added because the queue is full.
53       */
54      boolean offer(int value);
55  
56      /**
57       * Remove and return the next value from the queue, or return the "empty" value if the queue is empty.
58       * @return The next value or the "empty" value.
59       */
60      int poll();
61  
62      /**
63       * Remove up to the given limit of elements from the queue, and pass them to the consumer in order.
64       * @param limit The maximum number of elements to dequeue.
65       * @param consumer The consumer to pass the removed elements to.
66       * @return The actual number of elements removed.
67       */
68      int drain(int limit, IntConsumer consumer);
69  
70      /**
71       * Add up to the given limit of elements to this queue, from the given supplier.
72       * @param limit The maximum number of elements to enqueue.
73       * @param supplier The supplier to obtain the elements from.
74       * @return The actual number of elements added.
75       */
76      int fill(int limit, IntSupplier supplier);
77  
78      /**
79       * Peek at all available elements and compute a reduction.
80       * The elements are not removed, and the iteration is weakly consistent.
81       * @param limit The maximum number of elements to process.
82       * @param initial The initial value to the reduction operation.
83       * @param op The reduction operation, taking a prior result and an element, and producing a new result.
84       * @return The last result of the reduction operation.
85       */
86      default int weakPeekReduce(int limit, int initial, IntBinaryOperator op) {
87          // There's no safe way to implement this method in terms of the other operations.
88          // Take the "weak" definition to the extreme and just return the initial value.
89          return initial;
90      }
91  
92      /**
93       * Query if the queue is empty or not.
94       * <p>
95       * This method is inherently racy and the result may be out of date by the time the method returns.
96       * @return {@code true} if the queue was observed to be empty, otherwise {@code false.
97       */
98      boolean isEmpty();
99  
100     /**
101      * Query the number of elements currently in the queue.
102      * <p>
103      * This method is inherently racy and the result may be out of date by the time the method returns.
104      * @return An estimate of the number of elements observed in the queue.
105      */
106     int size();
107 
108     /**
109      * This implementation is based on MpscAtomicUnpaddedArrayQueue from JCTools.
110      */
111     final class MpscAtomicIntegerArrayQueue extends AtomicIntegerArray implements MpscIntQueue {
112         private static final long serialVersionUID = 8740338425124821455L;
113         private static final AtomicLongFieldUpdater<MpscAtomicIntegerArrayQueue> PRODUCER_INDEX =
114                 AtomicLongFieldUpdater.newUpdater(MpscAtomicIntegerArrayQueue.class, "producerIndex");
115         private static final AtomicLongFieldUpdater<MpscAtomicIntegerArrayQueue> PRODUCER_LIMIT =
116                 AtomicLongFieldUpdater.newUpdater(MpscAtomicIntegerArrayQueue.class, "producerLimit");
117         private static final AtomicLongFieldUpdater<MpscAtomicIntegerArrayQueue> CONSUMER_INDEX =
118                 AtomicLongFieldUpdater.newUpdater(MpscAtomicIntegerArrayQueue.class, "consumerIndex");
119         private final int mask;
120         private final int emptyValue;
121         private volatile long producerIndex;
122         private volatile long producerLimit;
123         private volatile long consumerIndex;
124 
125         public MpscAtomicIntegerArrayQueue(int capacity, int emptyValue) {
126             super(MathUtil.safeFindNextPositivePowerOfTwo(capacity));
127             if (emptyValue != 0) {
128                 this.emptyValue = emptyValue;
129                 int end = length() - 1;
130                 for (int i = 0; i < end; i++) {
131                     lazySet(i, emptyValue);
132                 }
133                 getAndSet(end, emptyValue); // 'getAndSet' acts as a full barrier, giving us initialization safety.
134             } else {
135                 this.emptyValue = 0;
136             }
137             mask = length() - 1;
138         }
139 
140         @Override
141         public boolean offer(int value) {
142             if (value == emptyValue) {
143                 throw new IllegalArgumentException("Cannot offer the \"empty\" value: " + emptyValue);
144             }
145             // use a cached view on consumer index (potentially updated in loop)
146             final int mask = this.mask;
147             long producerLimit = this.producerLimit;
148             long pIndex;
149             do {
150                 pIndex = producerIndex;
151                 if (pIndex >= producerLimit) {
152                     final long cIndex = consumerIndex;
153                     producerLimit = cIndex + mask + 1;
154                     if (pIndex >= producerLimit) {
155                         // FULL :(
156                         return false;
157                     } else {
158                         // update producer limit to the next index that we must recheck the consumer index
159                         // this is racy, but the race is benign
160                         PRODUCER_LIMIT.lazySet(this, producerLimit);
161                     }
162                 }
163             } while (!PRODUCER_INDEX.compareAndSet(this, pIndex, pIndex + 1));
164             /*
165              * NOTE: the new producer index value is made visible BEFORE the element in the array. If we relied on
166              * the index visibility to poll() we would need to handle the case where the element is not visible.
167              */
168             // Won CAS, move on to storing
169             final int offset = (int) (pIndex & mask);
170             lazySet(offset, value);
171             // AWESOME :)
172             return true;
173         }
174 
175         @Override
176         public int poll() {
177             final long cIndex = consumerIndex;
178             final int offset = (int) (cIndex & mask);
179             // If we can't see the next available element we can't poll
180             int value = get(offset);
181             if (emptyValue == value) {
182                 /*
183                  * NOTE: Queue may not actually be empty in the case of a producer (P1) being interrupted after
184                  * winning the CAS on offer but before storing the element in the queue. Other producers may go on
185                  * to fill up the queue after this element.
186                  */
187                 if (cIndex != producerIndex) {
188                     do {
189                         value = get(offset);
190                     } while (emptyValue == value);
191                 } else {
192                     return emptyValue;
193                 }
194             }
195             lazySet(offset, emptyValue);
196             CONSUMER_INDEX.lazySet(this, cIndex + 1);
197             return value;
198         }
199 
200         @Override
201         public int drain(int limit, IntConsumer consumer) {
202             Objects.requireNonNull(consumer, "consumer");
203             ObjectUtil.checkPositiveOrZero(limit, "limit");
204             if (limit == 0) {
205                 return 0;
206             }
207             final int mask = this.mask;
208             final long cIndex = consumerIndex; // Note: could be weakened to plain-load.
209             for (int i = 0; i < limit; i++) {
210                 final long index = cIndex + i;
211                 final int offset = (int) (index & mask);
212                 final int value = get(offset);
213                 if (emptyValue == value) {
214                     return i;
215                 }
216                 lazySet(offset, emptyValue); // Note: could be weakened to plain-store.
217                 // ordered store -> atomic and ordered for size()
218                 CONSUMER_INDEX.lazySet(this, index + 1);
219                 consumer.accept(value);
220             }
221             return limit;
222         }
223 
224         @Override
225         public int fill(int limit, IntSupplier supplier) {
226             Objects.requireNonNull(supplier, "supplier");
227             ObjectUtil.checkPositiveOrZero(limit, "limit");
228             if (limit == 0) {
229                 return 0;
230             }
231             final int mask = this.mask;
232             final long capacity = mask + 1;
233             long producerLimit = this.producerLimit;
234             long pIndex;
235             int actualLimit;
236             do {
237                 pIndex = producerIndex;
238                 long available = producerLimit - pIndex;
239                 if (available <= 0) {
240                     final long cIndex = consumerIndex;
241                     producerLimit = cIndex + capacity;
242                     available = producerLimit - pIndex;
243                     if (available <= 0) {
244                         // FULL :(
245                         return 0;
246                     } else {
247                         // update producer limit to the next index that we must recheck the consumer index
248                         PRODUCER_LIMIT.lazySet(this, producerLimit);
249                     }
250                 }
251                 actualLimit = Math.min((int) available, limit);
252             } while (!PRODUCER_INDEX.compareAndSet(this, pIndex, pIndex + actualLimit));
253             // right, now we claimed a few slots and can fill them with goodness
254             for (int i = 0; i < actualLimit; i++) {
255                 // Won CAS, move on to storing
256                 final int offset = (int) (pIndex + i & mask);
257                 lazySet(offset, supplier.getAsInt());
258             }
259             return actualLimit;
260         }
261 
262         @Override
263         public int weakPeekReduce(int limit, int initial, IntBinaryOperator op) {
264             Objects.requireNonNull(op, "op");
265             ObjectUtil.checkPositiveOrZero(limit, "limit");
266             if (limit == 0) {
267                 return 0;
268             }
269             int result = initial;
270 
271             final int mask = this.mask;
272             final long cIndex = consumerIndex; // Note: could be weakened to plain-load.
273             for (int i = 0; i < limit; i++) {
274                 final long index = cIndex + i;
275                 final int offset = (int) (index & mask);
276                 final int value = get(offset);
277                 if (emptyValue == value) {
278                     return result;
279                 }
280                 // Do not remove the element or advance the consumer index.
281                 result = op.applyAsInt(result, value);
282             }
283             return result;
284         }
285 
286         @Override
287         public boolean isEmpty() {
288             // Load consumer index before producer index, so our check is conservative.
289             long cIndex = consumerIndex;
290             long pIndex = producerIndex;
291             return cIndex >= pIndex;
292         }
293 
294         @Override
295         public int size() {
296             // Loop until we get a consistent read of both the consumer and producer indices.
297             long after = consumerIndex;
298             long size;
299             for (;;) {
300                 long before = after;
301                 long pIndex = producerIndex;
302                 after = consumerIndex;
303                 if (before == after) {
304                     size = pIndex - after;
305                     break;
306                 }
307             }
308             return size < 0 ? 0 : size > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) size;
309         }
310     }
311 }