1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.util.concurrent;
17
18 import io.netty.util.internal.MathUtil;
19 import io.netty.util.internal.ObjectUtil;
20 import io.netty.util.internal.UnstableApi;
21 import io.netty.util.IntSupplier;
22 import io.netty.util.IntConsumer;
23
24 import java.util.concurrent.atomic.AtomicIntegerArray;
25 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
26
27
28
29
30 @UnstableApi
31 public final class MpscAtomicIntegerArrayQueue extends AtomicIntegerArray implements MpscIntQueue {
32 private static final long serialVersionUID = 8740338425124821455L;
33 private static final AtomicLongFieldUpdater<MpscAtomicIntegerArrayQueue> PRODUCER_INDEX =
34 AtomicLongFieldUpdater.newUpdater(MpscAtomicIntegerArrayQueue.class, "producerIndex");
35 private static final AtomicLongFieldUpdater<MpscAtomicIntegerArrayQueue> PRODUCER_LIMIT =
36 AtomicLongFieldUpdater.newUpdater(MpscAtomicIntegerArrayQueue.class, "producerLimit");
37 private static final AtomicLongFieldUpdater<MpscAtomicIntegerArrayQueue> CONSUMER_INDEX =
38 AtomicLongFieldUpdater.newUpdater(MpscAtomicIntegerArrayQueue.class, "consumerIndex");
39 private final int mask;
40 private final int emptyValue;
41 private volatile long producerIndex;
42 private volatile long producerLimit;
43 private volatile long consumerIndex;
44
45
46
47
48
49
50
51
52
53
54
55 public MpscAtomicIntegerArrayQueue(int capacity, int emptyValue) {
56 super(MathUtil.safeFindNextPositivePowerOfTwo(capacity));
57 if (emptyValue != 0) {
58 this.emptyValue = emptyValue;
59 int end = capacity - 1;
60 for (int i = 0; i < end; i++) {
61 lazySet(i, emptyValue);
62 }
63 getAndSet(end, emptyValue);
64 } else {
65 this.emptyValue = 0;
66 }
67 mask = length() - 1;
68 }
69
70 @Override
71 public boolean offer(int value) {
72 if (value == emptyValue) {
73 throw new IllegalArgumentException("Cannot offer the \"empty\" value: " + emptyValue);
74 }
75
76 final int mask = this.mask;
77 long producerLimit = this.producerLimit;
78 long pIndex;
79 do {
80 pIndex = producerIndex;
81 if (pIndex >= producerLimit) {
82 final long cIndex = consumerIndex;
83 producerLimit = cIndex + mask + 1;
84 if (pIndex >= producerLimit) {
85
86 return false;
87 } else {
88
89
90 PRODUCER_LIMIT.lazySet(this, producerLimit);
91 }
92 }
93 } while (!PRODUCER_INDEX.compareAndSet(this, pIndex, pIndex + 1));
94
95
96
97
98
99 final int offset = (int) (pIndex & mask);
100 lazySet(offset, value);
101
102 return true;
103 }
104
105 @Override
106 public int poll() {
107 final long cIndex = consumerIndex;
108 final int offset = (int) (cIndex & mask);
109
110 int value = get(offset);
111 if (emptyValue == value) {
112
113
114
115
116
117 if (cIndex != producerIndex) {
118 do {
119 value = get(offset);
120 } while (emptyValue == value);
121 } else {
122 return emptyValue;
123 }
124 }
125 lazySet(offset, emptyValue);
126 CONSUMER_INDEX.lazySet(this, cIndex + 1);
127 return value;
128 }
129
130 @Override
131 public int drain(int limit, IntConsumer consumer) {
132 ObjectUtil.checkNotNull(consumer, "consumer");
133 ObjectUtil.checkPositiveOrZero(limit, "limit");
134 if (limit == 0) {
135 return 0;
136 }
137 final int mask = this.mask;
138 final long cIndex = consumerIndex;
139 for (int i = 0; i < limit; i++) {
140 final long index = cIndex + i;
141 final int offset = (int) (index & mask);
142 final int value = get(offset);
143 if (emptyValue == value) {
144 return i;
145 }
146 lazySet(offset, emptyValue);
147
148 CONSUMER_INDEX.lazySet(this, index + 1);
149 try {
150 consumer.accept(value);
151 } catch (Exception e) {
152 throw new RuntimeException(e);
153 }
154 }
155 return limit;
156 }
157
158 @Override
159 public int fill(int limit, IntSupplier supplier) {
160 ObjectUtil.checkNotNull(supplier, "supplier");
161 ObjectUtil.checkPositiveOrZero(limit, "limit");
162 if (limit == 0) {
163 return 0;
164 }
165 final int mask = this.mask;
166 final long capacity = mask + 1;
167 long producerLimit = this.producerLimit;
168 long pIndex;
169 int actualLimit;
170 do {
171 pIndex = producerIndex;
172 long available = producerLimit - pIndex;
173 if (available <= 0) {
174 final long cIndex = consumerIndex;
175 producerLimit = cIndex + capacity;
176 available = producerLimit - pIndex;
177 if (available <= 0) {
178
179 return 0;
180 } else {
181
182 PRODUCER_LIMIT.lazySet(this, producerLimit);
183 }
184 }
185 actualLimit = Math.min((int) available, limit);
186 } while (!PRODUCER_INDEX.compareAndSet(this, pIndex, pIndex + actualLimit));
187
188 for (int i = 0; i < actualLimit; i++) {
189
190 final int offset = (int) (pIndex + i & mask);
191 int value;
192 try {
193 value = supplier.get();
194 } catch (Exception e) {
195 throw new RuntimeException(e);
196 }
197 lazySet(offset, value);
198 }
199 return actualLimit;
200 }
201
202 @Override
203 public boolean isEmpty() {
204
205 long cIndex = consumerIndex;
206 long pIndex = producerIndex;
207 return cIndex >= pIndex;
208 }
209
210 @Override
211 public int size() {
212
213 long after = consumerIndex;
214 long size;
215 for (;;) {
216 long before = after;
217 long pIndex = producerIndex;
218 after = consumerIndex;
219 if (before == after) {
220 size = pIndex - after;
221 break;
222 }
223 }
224 return size < 0 ? 0 : size > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) size;
225 }
226 }