1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package io.netty5.channel;
16
17 import io.netty5.buffer.api.Buffer;
18 import io.netty5.buffer.api.BufferAllocator;
19 import io.netty5.buffer.api.CompositeBuffer;
20 import io.netty5.util.concurrent.Future;
21 import io.netty5.util.concurrent.FutureListener;
22 import io.netty5.util.concurrent.Promise;
23 import io.netty5.util.internal.SilentDispose;
24 import io.netty5.util.internal.UnstableApi;
25 import io.netty5.util.internal.logging.InternalLogger;
26 import io.netty5.util.internal.logging.InternalLoggerFactory;
27
28 import java.util.ArrayDeque;
29 import java.util.List;
30
31 import static io.netty5.util.internal.ObjectUtil.checkPositiveOrZero;
32 import static java.util.Objects.requireNonNull;
33
34 @SuppressWarnings("unchecked")
35 @UnstableApi
36 public abstract class AbstractCoalescingBufferQueue {
37 private static final InternalLogger logger = InternalLoggerFactory.getInstance(
38 AbstractCoalescingBufferQueue.class);
39 private final ArrayDeque<Object> bufAndListenerPairs;
40 private int readableBytes;
41
42
43
44
45
46
47 protected AbstractCoalescingBufferQueue(int initSize) {
48 bufAndListenerPairs = new ArrayDeque<>(initSize);
49 }
50
51
52
53
54
55
56
57 public final void addFirst(Buffer buf, Promise<Void> promise) {
58 addFirst(buf, f -> f.cascadeTo(promise));
59 }
60
61 private void addFirst(Buffer buf, FutureListener<Void> listener) {
62 if (listener != null) {
63 bufAndListenerPairs.addFirst(listener);
64 }
65 bufAndListenerPairs.addFirst(buf);
66 incrementReadableBytes(buf.readableBytes());
67 }
68
69
70
71
72 public final void add(Buffer buf) {
73 add(buf, (FutureListener<Void>) null);
74 }
75
76
77
78
79
80
81
82 public final void add(Buffer buf, Promise<Void> promise) {
83
84
85 add(buf, f -> f.cascadeTo(promise));
86 }
87
88
89
90
91
92
93
94 public final void add(Buffer buf, FutureListener<Void> listener) {
95
96
97 bufAndListenerPairs.add(buf);
98 if (listener != null) {
99 bufAndListenerPairs.add(listener);
100 }
101 incrementReadableBytes(buf.readableBytes());
102 }
103
104
105
106
107
108
109 public final Buffer removeFirst(Promise<Void> aggregatePromise) {
110 Object entry = bufAndListenerPairs.poll();
111 if (entry == null) {
112 return null;
113 }
114 assert entry instanceof Buffer;
115 Buffer result = (Buffer) entry;
116
117 decrementReadableBytes(result.readableBytes());
118
119 entry = bufAndListenerPairs.peek();
120 if (entry instanceof FutureListener) {
121 aggregatePromise.asFuture().addListener((FutureListener<Void>) entry);
122 bufAndListenerPairs.poll();
123 }
124 return result;
125 }
126
127
128
129
130
131
132
133
134
135
136
137
138 public final Buffer remove(BufferAllocator alloc, int bytes, Promise<Void> aggregatePromise) {
139 checkPositiveOrZero(bytes, "bytes");
140 requireNonNull(aggregatePromise, "aggregatePromise");
141
142
143 if (bufAndListenerPairs.isEmpty()) {
144 assert readableBytes == 0;
145 return removeEmptyValue();
146 }
147 bytes = Math.min(bytes, readableBytes);
148
149 Buffer toReturn = null;
150 Buffer entryBuffer = null;
151 int originalBytes = bytes;
152 try {
153 for (;;) {
154 Object entry = bufAndListenerPairs.poll();
155 if (entry == null) {
156 break;
157 }
158 if (entry instanceof FutureListener) {
159 aggregatePromise.asFuture().addListener((FutureListener<Void>) entry);
160 continue;
161 }
162 entryBuffer = (Buffer) entry;
163 if (entryBuffer.readableBytes() > bytes) {
164
165 bufAndListenerPairs.addFirst(entryBuffer);
166 if (bytes > 0) {
167
168 entryBuffer = entryBuffer.readSplit(bytes);
169 toReturn = toReturn == null ? composeFirst(alloc, entryBuffer)
170 : compose(alloc, toReturn, entryBuffer);
171 bytes = 0;
172 }
173 break;
174 }
175 bytes -= entryBuffer.readableBytes();
176 toReturn = toReturn == null ? composeFirst(alloc, entryBuffer)
177 : compose(alloc, toReturn, entryBuffer);
178 entryBuffer = null;
179 }
180 } catch (Throwable cause) {
181 SilentDispose.dispose(entryBuffer, logger);
182 SilentDispose.dispose(toReturn, logger);
183 aggregatePromise.setFailure(cause);
184 throw cause;
185 }
186 decrementReadableBytes(originalBytes - bytes);
187 return toReturn;
188 }
189
190
191
192
193 public final int readableBytes() {
194 return readableBytes;
195 }
196
197
198
199
200 public final boolean isEmpty() {
201 return bufAndListenerPairs.isEmpty();
202 }
203
204
205
206
207 public final void releaseAndFailAll(ChannelOutboundInvoker invoker, Throwable cause) {
208 releaseAndCompleteAll(invoker.newFailedFuture(cause));
209 }
210
211
212
213
214
215 public final void copyTo(AbstractCoalescingBufferQueue dest) {
216 dest.bufAndListenerPairs.addAll(bufAndListenerPairs);
217 dest.incrementReadableBytes(readableBytes);
218 }
219
220
221
222
223
224 public final void writeAndRemoveAll(ChannelHandlerContext ctx) {
225 Throwable pending = null;
226 Buffer previousBuf = null;
227 for (;;) {
228 Object entry = bufAndListenerPairs.poll();
229 try {
230 if (entry == null) {
231 if (previousBuf != null) {
232 decrementReadableBytes(previousBuf.readableBytes());
233
234
235 ctx.write(previousBuf)
236 .addListener(ctx.channel(), ChannelFutureListeners.FIRE_EXCEPTION_ON_FAILURE);
237 }
238 break;
239 }
240
241 if (entry instanceof Buffer) {
242 if (previousBuf != null) {
243 decrementReadableBytes(previousBuf.readableBytes());
244
245
246 ctx.write(previousBuf)
247 .addListener(ctx.channel(), ChannelFutureListeners.FIRE_EXCEPTION_ON_FAILURE);
248 }
249 previousBuf = (Buffer) entry;
250 } else if (entry instanceof Promise) {
251 decrementReadableBytes(previousBuf.readableBytes());
252 ctx.write(previousBuf).cascadeTo((Promise<? super Void>) entry);
253 previousBuf = null;
254 } else {
255 decrementReadableBytes(previousBuf.readableBytes());
256 ctx.write(previousBuf).addListener((FutureListener<Void>) entry);
257 previousBuf = null;
258 }
259 } catch (Throwable t) {
260 if (pending == null) {
261 pending = t;
262 } else {
263 logger.info("Throwable being suppressed because Throwable {} is already pending", pending, t);
264 }
265 }
266 }
267 if (pending != null) {
268 throw new IllegalStateException(pending);
269 }
270 }
271
272 @Override
273 public String toString() {
274 return "bytes: " + readableBytes + " buffers: " + (size() >> 1);
275 }
276
277
278
279
280 protected abstract Buffer compose(BufferAllocator alloc, Buffer cumulation, Buffer next);
281
282
283
284
285 protected final Buffer composeIntoComposite(BufferAllocator alloc, Buffer cumulation, Buffer next) {
286
287
288 return alloc.compose(List.of(cumulation.send(), next.send()));
289 }
290
291
292
293
294
295
296
297
298
299 protected final Buffer copyAndCompose(BufferAllocator alloc, Buffer cumulation, Buffer next, int minIncrement) {
300 try (cumulation; next) {
301 int sum = cumulation.readableBytes() + Math.max(minIncrement, next.readableBytes());
302 return alloc.allocate(sum).writeBytes(cumulation).writeBytes(next);
303 }
304 }
305
306
307
308
309
310 protected Buffer composeFirst(BufferAllocator allocator, Buffer first) {
311 return first;
312 }
313
314
315
316
317
318 protected abstract Buffer removeEmptyValue();
319
320
321
322
323
324 protected final int size() {
325 return bufAndListenerPairs.size();
326 }
327
328 private void releaseAndCompleteAll(Future<Void> future) {
329 Throwable pending = null;
330 for (;;) {
331 Object entry = bufAndListenerPairs.poll();
332 if (entry == null) {
333 break;
334 }
335 try {
336 if (entry instanceof Buffer) {
337 Buffer buffer = (Buffer) entry;
338 decrementReadableBytes(buffer.readableBytes());
339 SilentDispose.dispose(buffer, logger);
340 } else {
341 ((FutureListener<Void>) entry).operationComplete(future);
342 }
343 } catch (Throwable t) {
344 if (pending == null) {
345 pending = t;
346 } else {
347 logger.info("Throwable being suppressed because Throwable {} is already pending", pending, t);
348 }
349 }
350 }
351 if (pending != null) {
352 throw new IllegalStateException(pending);
353 }
354 }
355
356 private void incrementReadableBytes(int increment) {
357 int nextReadableBytes = readableBytes + increment;
358 if (nextReadableBytes < readableBytes) {
359 throw new IllegalStateException("buffer queue length overflow: " + readableBytes + " + " + increment);
360 }
361 readableBytes = nextReadableBytes;
362 }
363
364 private void decrementReadableBytes(int decrement) {
365 readableBytes -= decrement;
366 assert readableBytes >= 0;
367 }
368 }