View Javadoc
1   /*
2    * Copyright 2025 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.util.concurrent;
17  
18  import io.netty.util.internal.MathUtil;
19  import io.netty.util.internal.ObjectUtil;
20  
21  import java.util.Objects;
22  import java.util.concurrent.atomic.AtomicIntegerArray;
23  import java.util.concurrent.atomic.AtomicLongFieldUpdater;
24  import java.util.function.IntConsumer;
25  import java.util.function.IntSupplier;
26  
27  /**
28   * A multi-producer (concurrent and thread-safe {@code offer} and {@code fill}),
29   * single-consumer (single-threaded {@code poll} and {@code drain}) queue of primitive integers.
30   */
31  public interface MpscIntQueue {
32      /**
33       * Create a new queue instance of the given size.
34       * <p>
35       * Note: the size of the queue may be rounded up to nearest power-of-2.
36       *
37       * @param size The required fixed size of the queue.
38       * @param emptyValue The special value that the queue should use to signal the "empty" case.
39       * This value will be returned from {@link #poll()} when the queue is empty,
40       * and giving this value to {@link #offer(int)} will cause an exception to be thrown.
41       * @return The queue instance.
42       */
43      static MpscIntQueue create(int size, int emptyValue) {
44          return new MpscAtomicIntegerArrayQueue(size, emptyValue);
45      }
46  
47      /**
48       * Offer the given value to the queue. This will throw an exception if the given value is the "empty" value.
49       * @param value The value to add to the queue.
50       * @return {@code true} if the value was added to the queue,
51       * or {@code false} if the value could not be added because the queue is full.
52       */
53      boolean offer(int value);
54  
55      /**
56       * Remove and return the next value from the queue, or return the "empty" value if the queue is empty.
57       * @return The next value or the "empty" value.
58       */
59      int poll();
60  
61      /**
62       * Remove up to the given limit of elements from the queue, and pass them to the consumer in order.
63       * @param limit The maximum number of elements to dequeue.
64       * @param consumer The consumer to pass the removed elements to.
65       * @return The actual number of elements removed.
66       */
67      int drain(int limit, IntConsumer consumer);
68  
69      /**
70       * Add up to the given limit of elements to this queue, from the given supplier.
71       * @param limit The maximum number of elements to enqueue.
72       * @param supplier The supplier to obtain the elements from.
73       * @return The actual number of elements added.
74       */
75      int fill(int limit, IntSupplier supplier);
76  
77      /**
78       * Query if the queue is empty or not.
79       * <p>
80       * This method is inherently racy and the result may be out of date by the time the method returns.
81       * @return {@code true} if the queue was observed to be empty, otherwise {@code false.
82       */
83      boolean isEmpty();
84  
85      /**
86       * Query the number of elements currently in the queue.
87       * <p>
88       * This method is inherently racy and the result may be out of date by the time the method returns.
89       * @return An estimate of the number of elements observed in the queue.
90       */
91      int size();
92  
93      /**
94       * This implementation is based on MpscAtomicUnpaddedArrayQueue from JCTools.
95       */
96      final class MpscAtomicIntegerArrayQueue extends AtomicIntegerArray implements MpscIntQueue {
97          private static final long serialVersionUID = 8740338425124821455L;
98          private static final AtomicLongFieldUpdater<MpscAtomicIntegerArrayQueue> PRODUCER_INDEX =
99                  AtomicLongFieldUpdater.newUpdater(MpscAtomicIntegerArrayQueue.class, "producerIndex");
100         private static final AtomicLongFieldUpdater<MpscAtomicIntegerArrayQueue> PRODUCER_LIMIT =
101                 AtomicLongFieldUpdater.newUpdater(MpscAtomicIntegerArrayQueue.class, "producerLimit");
102         private static final AtomicLongFieldUpdater<MpscAtomicIntegerArrayQueue> CONSUMER_INDEX =
103                 AtomicLongFieldUpdater.newUpdater(MpscAtomicIntegerArrayQueue.class, "consumerIndex");
104         private final int mask;
105         private final int emptyValue;
106         private volatile long producerIndex;
107         private volatile long producerLimit;
108         private volatile long consumerIndex;
109 
110         public MpscAtomicIntegerArrayQueue(int capacity, int emptyValue) {
111             super(MathUtil.safeFindNextPositivePowerOfTwo(capacity));
112             if (emptyValue != 0) {
113                 this.emptyValue = emptyValue;
114                 int end = capacity - 1;
115                 for (int i = 0; i < end; i++) {
116                     lazySet(i, emptyValue);
117                 }
118                 getAndSet(end, emptyValue); // 'getAndSet' acts as a full barrier, giving us initialization safety.
119             } else {
120                 this.emptyValue = 0;
121             }
122             mask = length() - 1;
123         }
124 
125         @Override
126         public boolean offer(int value) {
127             if (value == emptyValue) {
128                 throw new IllegalArgumentException("Cannot offer the \"empty\" value: " + emptyValue);
129             }
130             // use a cached view on consumer index (potentially updated in loop)
131             final int mask = this.mask;
132             long producerLimit = this.producerLimit;
133             long pIndex;
134             do {
135                 pIndex = producerIndex;
136                 if (pIndex >= producerLimit) {
137                     final long cIndex = consumerIndex;
138                     producerLimit = cIndex + mask + 1;
139                     if (pIndex >= producerLimit) {
140                         // FULL :(
141                         return false;
142                     } else {
143                         // update producer limit to the next index that we must recheck the consumer index
144                         // this is racy, but the race is benign
145                         PRODUCER_LIMIT.lazySet(this, producerLimit);
146                     }
147                 }
148             } while (!PRODUCER_INDEX.compareAndSet(this, pIndex, pIndex + 1));
149             /*
150              * NOTE: the new producer index value is made visible BEFORE the element in the array. If we relied on
151              * the index visibility to poll() we would need to handle the case where the element is not visible.
152              */
153             // Won CAS, move on to storing
154             final int offset = (int) (pIndex & mask);
155             lazySet(offset, value);
156             // AWESOME :)
157             return true;
158         }
159 
160         @Override
161         public int poll() {
162             final long cIndex = consumerIndex;
163             final int offset = (int) (cIndex & mask);
164             // If we can't see the next available element we can't poll
165             int value = get(offset);
166             if (emptyValue == value) {
167                 /*
168                  * NOTE: Queue may not actually be empty in the case of a producer (P1) being interrupted after
169                  * winning the CAS on offer but before storing the element in the queue. Other producers may go on
170                  * to fill up the queue after this element.
171                  */
172                 if (cIndex != producerIndex) {
173                     do {
174                         value = get(offset);
175                     } while (emptyValue == value);
176                 } else {
177                     return emptyValue;
178                 }
179             }
180             lazySet(offset, emptyValue);
181             CONSUMER_INDEX.lazySet(this, cIndex + 1);
182             return value;
183         }
184 
185         @Override
186         public int drain(int limit, IntConsumer consumer) {
187             Objects.requireNonNull(consumer, "consumer");
188             ObjectUtil.checkPositiveOrZero(limit, "limit");
189             if (limit == 0) {
190                 return 0;
191             }
192             final int mask = this.mask;
193             final long cIndex = consumerIndex; // Note: could be weakened to plain-load.
194             for (int i = 0; i < limit; i++) {
195                 final long index = cIndex + i;
196                 final int offset = (int) (index & mask);
197                 final int value = get(offset);
198                 if (emptyValue == value) {
199                     return i;
200                 }
201                 lazySet(offset, emptyValue); // Note: could be weakened to plain-store.
202                 // ordered store -> atomic and ordered for size()
203                 CONSUMER_INDEX.lazySet(this, index + 1);
204                 consumer.accept(value);
205             }
206             return limit;
207         }
208 
209         @Override
210         public int fill(int limit, IntSupplier supplier) {
211             Objects.requireNonNull(supplier, "supplier");
212             ObjectUtil.checkPositiveOrZero(limit, "limit");
213             if (limit == 0) {
214                 return 0;
215             }
216             final int mask = this.mask;
217             final long capacity = mask + 1;
218             long producerLimit = this.producerLimit;
219             long pIndex;
220             int actualLimit;
221             do {
222                 pIndex = producerIndex;
223                 long available = producerLimit - pIndex;
224                 if (available <= 0) {
225                     final long cIndex = consumerIndex;
226                     producerLimit = cIndex + capacity;
227                     available = producerLimit - pIndex;
228                     if (available <= 0) {
229                         // FULL :(
230                         return 0;
231                     } else {
232                         // update producer limit to the next index that we must recheck the consumer index
233                         PRODUCER_LIMIT.lazySet(this, producerLimit);
234                     }
235                 }
236                 actualLimit = Math.min((int) available, limit);
237             } while (!PRODUCER_INDEX.compareAndSet(this, pIndex, pIndex + actualLimit));
238             // right, now we claimed a few slots and can fill them with goodness
239             for (int i = 0; i < actualLimit; i++) {
240                 // Won CAS, move on to storing
241                 final int offset = (int) (pIndex + i & mask);
242                 lazySet(offset, supplier.getAsInt());
243             }
244             return actualLimit;
245         }
246 
247         @Override
248         public boolean isEmpty() {
249             // Load consumer index before producer index, so our check is conservative.
250             long cIndex = consumerIndex;
251             long pIndex = producerIndex;
252             return cIndex >= pIndex;
253         }
254 
255         @Override
256         public int size() {
257             // Loop until we get a consistent read of both the consumer and producer indices.
258             long after = consumerIndex;
259             long size;
260             for (;;) {
261                 long before = after;
262                 long pIndex = producerIndex;
263                 after = consumerIndex;
264                 if (before == after) {
265                     size = pIndex - after;
266                     break;
267                 }
268             }
269             return size < 0 ? 0 : size > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) size;
270         }
271     }
272 }