1 /*
2 * Copyright 2025 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * https://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package io.netty.util.concurrent;
17
18 import io.netty.util.internal.MathUtil;
19 import io.netty.util.internal.ObjectUtil;
20
21 import java.util.Objects;
22 import java.util.concurrent.atomic.AtomicIntegerArray;
23 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
24 import java.util.function.IntConsumer;
25 import java.util.function.IntSupplier;
26
27 /**
28 * A multi-producer (concurrent and thread-safe {@code offer} and {@code fill}),
29 * single-consumer (single-threaded {@code poll} and {@code drain}) queue of primitive integers.
30 */
31 public interface MpscIntQueue {
32 /**
33 * Create a new queue instance of the given size.
34 * <p>
35 * Note: the size of the queue may be rounded up to nearest power-of-2.
36 *
37 * @param size The required fixed size of the queue.
38 * @param emptyValue The special value that the queue should use to signal the "empty" case.
39 * This value will be returned from {@link #poll()} when the queue is empty,
40 * and giving this value to {@link #offer(int)} will cause an exception to be thrown.
41 * @return The queue instance.
42 */
43 static MpscIntQueue create(int size, int emptyValue) {
44 return new MpscAtomicIntegerArrayQueue(size, emptyValue);
45 }
46
47 /**
48 * Offer the given value to the queue. This will throw an exception if the given value is the "empty" value.
49 * @param value The value to add to the queue.
50 * @return {@code true} if the value was added to the queue,
51 * or {@code false} if the value could not be added because the queue is full.
52 */
53 boolean offer(int value);
54
55 /**
56 * Remove and return the next value from the queue, or return the "empty" value if the queue is empty.
57 * @return The next value or the "empty" value.
58 */
59 int poll();
60
61 /**
62 * Remove up to the given limit of elements from the queue, and pass them to the consumer in order.
63 * @param limit The maximum number of elements to dequeue.
64 * @param consumer The consumer to pass the removed elements to.
65 * @return The actual number of elements removed.
66 */
67 int drain(int limit, IntConsumer consumer);
68
69 /**
70 * Add up to the given limit of elements to this queue, from the given supplier.
71 * @param limit The maximum number of elements to enqueue.
72 * @param supplier The supplier to obtain the elements from.
73 * @return The actual number of elements added.
74 */
75 int fill(int limit, IntSupplier supplier);
76
77 /**
78 * Query if the queue is empty or not.
79 * <p>
80 * This method is inherently racy and the result may be out of date by the time the method returns.
81 * @return {@code true} if the queue was observed to be empty, otherwise {@code false.
82 */
83 boolean isEmpty();
84
85 /**
86 * Query the number of elements currently in the queue.
87 * <p>
88 * This method is inherently racy and the result may be out of date by the time the method returns.
89 * @return An estimate of the number of elements observed in the queue.
90 */
91 int size();
92
93 /**
94 * This implementation is based on MpscAtomicUnpaddedArrayQueue from JCTools.
95 */
96 final class MpscAtomicIntegerArrayQueue extends AtomicIntegerArray implements MpscIntQueue {
97 private static final long serialVersionUID = 8740338425124821455L;
98 private static final AtomicLongFieldUpdater<MpscAtomicIntegerArrayQueue> PRODUCER_INDEX =
99 AtomicLongFieldUpdater.newUpdater(MpscAtomicIntegerArrayQueue.class, "producerIndex");
100 private static final AtomicLongFieldUpdater<MpscAtomicIntegerArrayQueue> PRODUCER_LIMIT =
101 AtomicLongFieldUpdater.newUpdater(MpscAtomicIntegerArrayQueue.class, "producerLimit");
102 private static final AtomicLongFieldUpdater<MpscAtomicIntegerArrayQueue> CONSUMER_INDEX =
103 AtomicLongFieldUpdater.newUpdater(MpscAtomicIntegerArrayQueue.class, "consumerIndex");
104 private final int mask;
105 private final int emptyValue;
106 private volatile long producerIndex;
107 private volatile long producerLimit;
108 private volatile long consumerIndex;
109
110 public MpscAtomicIntegerArrayQueue(int capacity, int emptyValue) {
111 super(MathUtil.safeFindNextPositivePowerOfTwo(capacity));
112 if (emptyValue != 0) {
113 this.emptyValue = emptyValue;
114 int end = capacity - 1;
115 for (int i = 0; i < end; i++) {
116 lazySet(i, emptyValue);
117 }
118 getAndSet(end, emptyValue); // 'getAndSet' acts as a full barrier, giving us initialization safety.
119 } else {
120 this.emptyValue = 0;
121 }
122 mask = length() - 1;
123 }
124
125 @Override
126 public boolean offer(int value) {
127 if (value == emptyValue) {
128 throw new IllegalArgumentException("Cannot offer the \"empty\" value: " + emptyValue);
129 }
130 // use a cached view on consumer index (potentially updated in loop)
131 final int mask = this.mask;
132 long producerLimit = this.producerLimit;
133 long pIndex;
134 do {
135 pIndex = producerIndex;
136 if (pIndex >= producerLimit) {
137 final long cIndex = consumerIndex;
138 producerLimit = cIndex + mask + 1;
139 if (pIndex >= producerLimit) {
140 // FULL :(
141 return false;
142 } else {
143 // update producer limit to the next index that we must recheck the consumer index
144 // this is racy, but the race is benign
145 PRODUCER_LIMIT.lazySet(this, producerLimit);
146 }
147 }
148 } while (!PRODUCER_INDEX.compareAndSet(this, pIndex, pIndex + 1));
149 /*
150 * NOTE: the new producer index value is made visible BEFORE the element in the array. If we relied on
151 * the index visibility to poll() we would need to handle the case where the element is not visible.
152 */
153 // Won CAS, move on to storing
154 final int offset = (int) (pIndex & mask);
155 lazySet(offset, value);
156 // AWESOME :)
157 return true;
158 }
159
160 @Override
161 public int poll() {
162 final long cIndex = consumerIndex;
163 final int offset = (int) (cIndex & mask);
164 // If we can't see the next available element we can't poll
165 int value = get(offset);
166 if (emptyValue == value) {
167 /*
168 * NOTE: Queue may not actually be empty in the case of a producer (P1) being interrupted after
169 * winning the CAS on offer but before storing the element in the queue. Other producers may go on
170 * to fill up the queue after this element.
171 */
172 if (cIndex != producerIndex) {
173 do {
174 value = get(offset);
175 } while (emptyValue == value);
176 } else {
177 return emptyValue;
178 }
179 }
180 lazySet(offset, emptyValue);
181 CONSUMER_INDEX.lazySet(this, cIndex + 1);
182 return value;
183 }
184
185 @Override
186 public int drain(int limit, IntConsumer consumer) {
187 Objects.requireNonNull(consumer, "consumer");
188 ObjectUtil.checkPositiveOrZero(limit, "limit");
189 if (limit == 0) {
190 return 0;
191 }
192 final int mask = this.mask;
193 final long cIndex = consumerIndex; // Note: could be weakened to plain-load.
194 for (int i = 0; i < limit; i++) {
195 final long index = cIndex + i;
196 final int offset = (int) (index & mask);
197 final int value = get(offset);
198 if (emptyValue == value) {
199 return i;
200 }
201 lazySet(offset, emptyValue); // Note: could be weakened to plain-store.
202 // ordered store -> atomic and ordered for size()
203 CONSUMER_INDEX.lazySet(this, index + 1);
204 consumer.accept(value);
205 }
206 return limit;
207 }
208
209 @Override
210 public int fill(int limit, IntSupplier supplier) {
211 Objects.requireNonNull(supplier, "supplier");
212 ObjectUtil.checkPositiveOrZero(limit, "limit");
213 if (limit == 0) {
214 return 0;
215 }
216 final int mask = this.mask;
217 final long capacity = mask + 1;
218 long producerLimit = this.producerLimit;
219 long pIndex;
220 int actualLimit;
221 do {
222 pIndex = producerIndex;
223 long available = producerLimit - pIndex;
224 if (available <= 0) {
225 final long cIndex = consumerIndex;
226 producerLimit = cIndex + capacity;
227 available = producerLimit - pIndex;
228 if (available <= 0) {
229 // FULL :(
230 return 0;
231 } else {
232 // update producer limit to the next index that we must recheck the consumer index
233 PRODUCER_LIMIT.lazySet(this, producerLimit);
234 }
235 }
236 actualLimit = Math.min((int) available, limit);
237 } while (!PRODUCER_INDEX.compareAndSet(this, pIndex, pIndex + actualLimit));
238 // right, now we claimed a few slots and can fill them with goodness
239 for (int i = 0; i < actualLimit; i++) {
240 // Won CAS, move on to storing
241 final int offset = (int) (pIndex + i & mask);
242 lazySet(offset, supplier.getAsInt());
243 }
244 return actualLimit;
245 }
246
247 @Override
248 public boolean isEmpty() {
249 // Load consumer index before producer index, so our check is conservative.
250 long cIndex = consumerIndex;
251 long pIndex = producerIndex;
252 return cIndex >= pIndex;
253 }
254
255 @Override
256 public int size() {
257 // Loop until we get a consistent read of both the consumer and producer indices.
258 long after = consumerIndex;
259 long size;
260 for (;;) {
261 long before = after;
262 long pIndex = producerIndex;
263 after = consumerIndex;
264 if (before == after) {
265 size = pIndex - after;
266 break;
267 }
268 }
269 return size < 0 ? 0 : size > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) size;
270 }
271 }
272 }