1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.util.concurrent;
17
18 import io.netty.util.internal.MathUtil;
19 import io.netty.util.internal.ObjectUtil;
20
21 import java.util.Objects;
22 import java.util.concurrent.atomic.AtomicIntegerArray;
23 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
24 import java.util.function.IntConsumer;
25 import java.util.function.IntSupplier;
26
27
28
29
30
31 public interface MpscIntQueue {
32
33
34
35
36
37
38
39
40
41
42
43 static MpscIntQueue create(int size, int emptyValue) {
44 return new MpscAtomicIntegerArrayQueue(size, emptyValue);
45 }
46
47
48
49
50
51
52
53 boolean offer(int value);
54
55
56
57
58
59 int poll();
60
61
62
63
64
65
66
67 int drain(int limit, IntConsumer consumer);
68
69
70
71
72
73
74
75 int fill(int limit, IntSupplier supplier);
76
77
78
79
80
81
82
83 boolean isEmpty();
84
85
86
87
88
89
90
91 int size();
92
93
94
95
96 final class MpscAtomicIntegerArrayQueue extends AtomicIntegerArray implements MpscIntQueue {
97 private static final long serialVersionUID = 8740338425124821455L;
98 private static final AtomicLongFieldUpdater<MpscAtomicIntegerArrayQueue> PRODUCER_INDEX =
99 AtomicLongFieldUpdater.newUpdater(MpscAtomicIntegerArrayQueue.class, "producerIndex");
100 private static final AtomicLongFieldUpdater<MpscAtomicIntegerArrayQueue> PRODUCER_LIMIT =
101 AtomicLongFieldUpdater.newUpdater(MpscAtomicIntegerArrayQueue.class, "producerLimit");
102 private static final AtomicLongFieldUpdater<MpscAtomicIntegerArrayQueue> CONSUMER_INDEX =
103 AtomicLongFieldUpdater.newUpdater(MpscAtomicIntegerArrayQueue.class, "consumerIndex");
104 private final int mask;
105 private final int emptyValue;
106 private volatile long producerIndex;
107 private volatile long producerLimit;
108 private volatile long consumerIndex;
109
110 public MpscAtomicIntegerArrayQueue(int capacity, int emptyValue) {
111 super(MathUtil.safeFindNextPositivePowerOfTwo(capacity));
112 if (emptyValue != 0) {
113 this.emptyValue = emptyValue;
114 int end = capacity - 1;
115 for (int i = 0; i < end; i++) {
116 lazySet(i, emptyValue);
117 }
118 getAndSet(end, emptyValue);
119 } else {
120 this.emptyValue = 0;
121 }
122 mask = length() - 1;
123 }
124
125 @Override
126 public boolean offer(int value) {
127 if (value == emptyValue) {
128 throw new IllegalArgumentException("Cannot offer the \"empty\" value: " + emptyValue);
129 }
130
131 final int mask = this.mask;
132 long producerLimit = this.producerLimit;
133 long pIndex;
134 do {
135 pIndex = producerIndex;
136 if (pIndex >= producerLimit) {
137 final long cIndex = consumerIndex;
138 producerLimit = cIndex + mask + 1;
139 if (pIndex >= producerLimit) {
140
141 return false;
142 } else {
143
144
145 PRODUCER_LIMIT.lazySet(this, producerLimit);
146 }
147 }
148 } while (!PRODUCER_INDEX.compareAndSet(this, pIndex, pIndex + 1));
149
150
151
152
153
154 final int offset = (int) (pIndex & mask);
155 lazySet(offset, value);
156
157 return true;
158 }
159
160 @Override
161 public int poll() {
162 final long cIndex = consumerIndex;
163 final int offset = (int) (cIndex & mask);
164
165 int value = get(offset);
166 if (emptyValue == value) {
167
168
169
170
171
172 if (cIndex != producerIndex) {
173 do {
174 value = get(offset);
175 } while (emptyValue == value);
176 } else {
177 return emptyValue;
178 }
179 }
180 lazySet(offset, emptyValue);
181 CONSUMER_INDEX.lazySet(this, cIndex + 1);
182 return value;
183 }
184
185 @Override
186 public int drain(int limit, IntConsumer consumer) {
187 Objects.requireNonNull(consumer, "consumer");
188 ObjectUtil.checkPositiveOrZero(limit, "limit");
189 if (limit == 0) {
190 return 0;
191 }
192 final int mask = this.mask;
193 final long cIndex = consumerIndex;
194 for (int i = 0; i < limit; i++) {
195 final long index = cIndex + i;
196 final int offset = (int) (index & mask);
197 final int value = get(offset);
198 if (emptyValue == value) {
199 return i;
200 }
201 lazySet(offset, emptyValue);
202
203 CONSUMER_INDEX.lazySet(this, index + 1);
204 consumer.accept(value);
205 }
206 return limit;
207 }
208
209 @Override
210 public int fill(int limit, IntSupplier supplier) {
211 Objects.requireNonNull(supplier, "supplier");
212 ObjectUtil.checkPositiveOrZero(limit, "limit");
213 if (limit == 0) {
214 return 0;
215 }
216 final int mask = this.mask;
217 final long capacity = mask + 1;
218 long producerLimit = this.producerLimit;
219 long pIndex;
220 int actualLimit;
221 do {
222 pIndex = producerIndex;
223 long available = producerLimit - pIndex;
224 if (available <= 0) {
225 final long cIndex = consumerIndex;
226 producerLimit = cIndex + capacity;
227 available = producerLimit - pIndex;
228 if (available <= 0) {
229
230 return 0;
231 } else {
232
233 PRODUCER_LIMIT.lazySet(this, producerLimit);
234 }
235 }
236 actualLimit = Math.min((int) available, limit);
237 } while (!PRODUCER_INDEX.compareAndSet(this, pIndex, pIndex + actualLimit));
238
239 for (int i = 0; i < actualLimit; i++) {
240
241 final int offset = (int) (pIndex + i & mask);
242 lazySet(offset, supplier.getAsInt());
243 }
244 return actualLimit;
245 }
246
247 @Override
248 public boolean isEmpty() {
249
250 long cIndex = consumerIndex;
251 long pIndex = producerIndex;
252 return cIndex >= pIndex;
253 }
254
255 @Override
256 public int size() {
257
258 long after = consumerIndex;
259 long size;
260 for (;;) {
261 long before = after;
262 long pIndex = producerIndex;
263 after = consumerIndex;
264 if (before == after) {
265 size = pIndex - after;
266 break;
267 }
268 }
269 return size < 0 ? 0 : size > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) size;
270 }
271 }
272 }