View Javadoc
1   /*
2    * Copyright 2025 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.util.concurrent;
17  
18  import io.netty.util.internal.MathUtil;
19  import io.netty.util.internal.ObjectUtil;
20  import io.netty.util.internal.UnstableApi;
21  import io.netty.util.IntSupplier;
22  import io.netty.util.IntConsumer;
23  
24  import java.util.concurrent.atomic.AtomicIntegerArray;
25  import java.util.concurrent.atomic.AtomicLongFieldUpdater;
26  
27  /**
28   * This implementation is based on MpscAtomicUnpaddedArrayQueue from JCTools.
29   */
30  @UnstableApi
31  public final class MpscAtomicIntegerArrayQueue extends AtomicIntegerArray implements MpscIntQueue {
32      private static final long serialVersionUID = 8740338425124821455L;
33      private static final AtomicLongFieldUpdater<MpscAtomicIntegerArrayQueue> PRODUCER_INDEX =
34              AtomicLongFieldUpdater.newUpdater(MpscAtomicIntegerArrayQueue.class, "producerIndex");
35      private static final AtomicLongFieldUpdater<MpscAtomicIntegerArrayQueue> PRODUCER_LIMIT =
36              AtomicLongFieldUpdater.newUpdater(MpscAtomicIntegerArrayQueue.class, "producerLimit");
37      private static final AtomicLongFieldUpdater<MpscAtomicIntegerArrayQueue> CONSUMER_INDEX =
38              AtomicLongFieldUpdater.newUpdater(MpscAtomicIntegerArrayQueue.class, "consumerIndex");
39      private final int mask;
40      private final int emptyValue;
41      private volatile long producerIndex;
42      private volatile long producerLimit;
43      private volatile long consumerIndex;
44  
45      /**
46       * Create a new queue instance of the given size.
47       * <p>
48       * Note: the size of the queue may be rounded up to nearest power-of-2.
49       *
50       * @param capacity The required fixed size of the queue.
51       * @param emptyValue The special value that the queue should use to signal the "empty" case.
52       * This value will be returned from {@link #poll()} when the queue is empty,
53       * and giving this value to {@link #offer(int)} will cause an exception to be thrown.
54       */
55      public MpscAtomicIntegerArrayQueue(int capacity, int emptyValue) {
56          super(MathUtil.safeFindNextPositivePowerOfTwo(capacity));
57          if (emptyValue != 0) {
58              this.emptyValue = emptyValue;
59              int end = capacity - 1;
60              for (int i = 0; i < end; i++) {
61                  lazySet(i, emptyValue);
62              }
63              getAndSet(end, emptyValue); // 'getAndSet' acts as a full barrier, giving us initialization safety.
64          } else {
65              this.emptyValue = 0;
66          }
67          mask = length() - 1;
68      }
69  
70      @Override
71      public boolean offer(int value) {
72          if (value == emptyValue) {
73              throw new IllegalArgumentException("Cannot offer the \"empty\" value: " + emptyValue);
74          }
75          // use a cached view on consumer index (potentially updated in loop)
76          final int mask = this.mask;
77          long producerLimit = this.producerLimit;
78          long pIndex;
79          do {
80              pIndex = producerIndex;
81              if (pIndex >= producerLimit) {
82                  final long cIndex = consumerIndex;
83                  producerLimit = cIndex + mask + 1;
84                  if (pIndex >= producerLimit) {
85                      // FULL :(
86                      return false;
87                  } else {
88                      // update producer limit to the next index that we must recheck the consumer index
89                      // this is racy, but the race is benign
90                      PRODUCER_LIMIT.lazySet(this, producerLimit);
91                  }
92              }
93          } while (!PRODUCER_INDEX.compareAndSet(this, pIndex, pIndex + 1));
94          /*
95           * NOTE: the new producer index value is made visible BEFORE the element in the array. If we relied on
96           * the index visibility to poll() we would need to handle the case where the element is not visible.
97           */
98          // Won CAS, move on to storing
99          final int offset = (int) (pIndex & mask);
100         lazySet(offset, value);
101         // AWESOME :)
102         return true;
103     }
104 
105     @Override
106     public int poll() {
107         final long cIndex = consumerIndex;
108         final int offset = (int) (cIndex & mask);
109         // If we can't see the next available element we can't poll
110         int value = get(offset);
111         if (emptyValue == value) {
112             /*
113              * NOTE: Queue may not actually be empty in the case of a producer (P1) being interrupted after
114              * winning the CAS on offer but before storing the element in the queue. Other producers may go on
115              * to fill up the queue after this element.
116              */
117             if (cIndex != producerIndex) {
118                 do {
119                     value = get(offset);
120                 } while (emptyValue == value);
121             } else {
122                 return emptyValue;
123             }
124         }
125         lazySet(offset, emptyValue);
126         CONSUMER_INDEX.lazySet(this, cIndex + 1);
127         return value;
128     }
129 
130     @Override
131     public int drain(int limit, IntConsumer consumer) {
132         ObjectUtil.checkNotNull(consumer, "consumer");
133         ObjectUtil.checkPositiveOrZero(limit, "limit");
134         if (limit == 0) {
135             return 0;
136         }
137         final int mask = this.mask;
138         final long cIndex = consumerIndex; // Note: could be weakened to plain-load.
139         for (int i = 0; i < limit; i++) {
140             final long index = cIndex + i;
141             final int offset = (int) (index & mask);
142             final int value = get(offset);
143             if (emptyValue == value) {
144                 return i;
145             }
146             lazySet(offset, emptyValue); // Note: could be weakened to plain-store.
147             // ordered store -> atomic and ordered for size()
148             CONSUMER_INDEX.lazySet(this, index + 1);
149             try {
150                 consumer.accept(value);
151             } catch (Exception e) {
152                 throw new RuntimeException(e);
153             }
154         }
155         return limit;
156     }
157 
158     @Override
159     public int fill(int limit, IntSupplier supplier) {
160         ObjectUtil.checkNotNull(supplier, "supplier");
161         ObjectUtil.checkPositiveOrZero(limit, "limit");
162         if (limit == 0) {
163             return 0;
164         }
165         final int mask = this.mask;
166         final long capacity = mask + 1;
167         long producerLimit = this.producerLimit;
168         long pIndex;
169         int actualLimit;
170         do {
171             pIndex = producerIndex;
172             long available = producerLimit - pIndex;
173             if (available <= 0) {
174                 final long cIndex = consumerIndex;
175                 producerLimit = cIndex + capacity;
176                 available = producerLimit - pIndex;
177                 if (available <= 0) {
178                     // FULL :(
179                     return 0;
180                 } else {
181                     // update producer limit to the next index that we must recheck the consumer index
182                     PRODUCER_LIMIT.lazySet(this, producerLimit);
183                 }
184             }
185             actualLimit = Math.min((int) available, limit);
186         } while (!PRODUCER_INDEX.compareAndSet(this, pIndex, pIndex + actualLimit));
187         // right, now we claimed a few slots and can fill them with goodness
188         for (int i = 0; i < actualLimit; i++) {
189             // Won CAS, move on to storing
190             final int offset = (int) (pIndex + i & mask);
191             int value;
192             try {
193                 value = supplier.get();
194             } catch (Exception e) {
195                 throw new RuntimeException(e);
196             }
197             lazySet(offset, value);
198         }
199         return actualLimit;
200     }
201 
202     @Override
203     public boolean isEmpty() {
204         // Load consumer index before producer index, so our check is conservative.
205         long cIndex = consumerIndex;
206         long pIndex = producerIndex;
207         return cIndex >= pIndex;
208     }
209 
210     @Override
211     public int size() {
212         // Loop until we get a consistent read of both the consumer and producer indices.
213         long after = consumerIndex;
214         long size;
215         for (;;) {
216             long before = after;
217             long pIndex = producerIndex;
218             after = consumerIndex;
219             if (before == after) {
220                 size = pIndex - after;
221                 break;
222             }
223         }
224         return size < 0 ? 0 : size > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) size;
225     }
226 }