1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package io.netty.channel;
16
17 import io.netty.buffer.ByteBuf;
18 import io.netty.buffer.ByteBufAllocator;
19 import io.netty.buffer.CompositeByteBuf;
20 import io.netty.util.internal.UnstableApi;
21 import io.netty.util.internal.logging.InternalLogger;
22 import io.netty.util.internal.logging.InternalLoggerFactory;
23
24 import java.util.ArrayDeque;
25
26 import static io.netty.util.ReferenceCountUtil.safeRelease;
27 import static io.netty.util.internal.ObjectUtil.checkNotNull;
28 import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
29 import static io.netty.util.internal.PlatformDependent.throwException;
30
31 @UnstableApi
32 public abstract class AbstractCoalescingBufferQueue {
33 private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractCoalescingBufferQueue.class);
34 private final ArrayDeque<Object> bufAndListenerPairs;
35 private final PendingBytesTracker tracker;
36 private int readableBytes;
37
38
39
40
41
42
43
44
45 protected AbstractCoalescingBufferQueue(Channel channel, int initSize) {
46 bufAndListenerPairs = new ArrayDeque<Object>(initSize);
47 tracker = channel == null ? null : PendingBytesTracker.newTracker(channel);
48 }
49
50
51
52
53
54
55
56 public final void addFirst(ByteBuf buf, ChannelPromise promise) {
57 addFirst(buf, toChannelFutureListener(promise));
58 }
59
60 private void addFirst(ByteBuf buf, ChannelFutureListener listener) {
61
62 buf.touch();
63
64 if (listener != null) {
65 bufAndListenerPairs.addFirst(listener);
66 }
67 bufAndListenerPairs.addFirst(buf);
68 incrementReadableBytes(buf.readableBytes());
69 }
70
71
72
73
74 public final void add(ByteBuf buf) {
75 add(buf, (ChannelFutureListener) null);
76 }
77
78
79
80
81
82
83
84 public final void add(ByteBuf buf, ChannelPromise promise) {
85
86
87 add(buf, toChannelFutureListener(promise));
88 }
89
90
91
92
93
94
95
96 public final void add(ByteBuf buf, ChannelFutureListener listener) {
97
98 buf.touch();
99
100
101
102 bufAndListenerPairs.add(buf);
103 if (listener != null) {
104 bufAndListenerPairs.add(listener);
105 }
106 incrementReadableBytes(buf.readableBytes());
107 }
108
109
110
111
112
113
114 public final ByteBuf removeFirst(ChannelPromise aggregatePromise) {
115 Object entry = bufAndListenerPairs.poll();
116 if (entry == null) {
117 return null;
118 }
119 assert entry instanceof ByteBuf;
120 ByteBuf result = (ByteBuf) entry;
121
122 decrementReadableBytes(result.readableBytes());
123
124 entry = bufAndListenerPairs.peek();
125 if (entry instanceof ChannelFutureListener) {
126 aggregatePromise.addListener((ChannelFutureListener) entry);
127 bufAndListenerPairs.poll();
128 }
129 return result;
130 }
131
132
133
134
135
136
137
138
139
140
141
142
143 public final ByteBuf remove(ByteBufAllocator alloc, int bytes, ChannelPromise aggregatePromise) {
144 checkPositiveOrZero(bytes, "bytes");
145 checkNotNull(aggregatePromise, "aggregatePromise");
146
147
148 if (bufAndListenerPairs.isEmpty()) {
149 assert readableBytes == 0;
150 return removeEmptyValue();
151 }
152 bytes = Math.min(bytes, readableBytes);
153
154 ByteBuf toReturn = null;
155 ByteBuf entryBuffer = null;
156 int originalBytes = bytes;
157 Object entry = null;
158 try {
159 for (;;) {
160 entry = bufAndListenerPairs.poll();
161 if (entry == null) {
162 break;
163 }
164
165 if (entry instanceof ByteBuf) {
166 entryBuffer = (ByteBuf) entry;
167 int bufferBytes = entryBuffer.readableBytes();
168
169 if (bufferBytes > bytes) {
170
171 bufAndListenerPairs.addFirst(entryBuffer);
172 if (bytes > 0) {
173
174 entryBuffer = entryBuffer.readRetainedSlice(bytes);
175
176 toReturn = toReturn == null ? entryBuffer
177 : compose(alloc, toReturn, entryBuffer);
178 bytes = 0;
179 }
180 break;
181 }
182
183 bytes -= bufferBytes;
184 if (toReturn == null) {
185
186 toReturn = bytes == 0
187 ? entryBuffer
188 : composeFirst(alloc, entryBuffer, bufferBytes + bytes);
189 } else {
190 toReturn = compose(alloc, toReturn, entryBuffer);
191 }
192 entryBuffer = null;
193 } else if (entry instanceof DelegatingChannelPromiseNotifier) {
194 aggregatePromise.addListener((DelegatingChannelPromiseNotifier) entry);
195 } else if (entry instanceof ChannelFutureListener) {
196 aggregatePromise.addListener((ChannelFutureListener) entry);
197 }
198 }
199 } catch (Throwable cause) {
200
201
202
203 decrementReadableBytes(originalBytes - bytes);
204
205
206 entry = bufAndListenerPairs.peek();
207 if (entry instanceof ChannelFutureListener) {
208 aggregatePromise.addListener((ChannelFutureListener) entry);
209 bufAndListenerPairs.poll();
210 }
211
212 safeRelease(entryBuffer);
213 safeRelease(toReturn);
214 aggregatePromise.setFailure(cause);
215 throwException(cause);
216 }
217 decrementReadableBytes(originalBytes - bytes);
218 return toReturn;
219 }
220
221
222
223
224 public final int readableBytes() {
225 return readableBytes;
226 }
227
228
229
230
231 public final boolean isEmpty() {
232 return bufAndListenerPairs.isEmpty();
233 }
234
235
236
237
238 public final void releaseAndFailAll(ChannelOutboundInvoker invoker, Throwable cause) {
239 releaseAndCompleteAll(invoker.newFailedFuture(cause));
240 }
241
242
243
244
245
246 public final void copyTo(AbstractCoalescingBufferQueue dest) {
247 dest.bufAndListenerPairs.addAll(bufAndListenerPairs);
248 dest.incrementReadableBytes(readableBytes);
249 }
250
251
252
253
254
255 public final void writeAndRemoveAll(ChannelHandlerContext ctx) {
256 Throwable pending = null;
257 ByteBuf previousBuf = null;
258 for (;;) {
259 Object entry = bufAndListenerPairs.poll();
260 try {
261 if (entry == null) {
262 if (previousBuf != null) {
263 decrementReadableBytes(previousBuf.readableBytes());
264 ctx.write(previousBuf, ctx.voidPromise());
265 }
266 break;
267 }
268
269 if (entry instanceof ByteBuf) {
270 if (previousBuf != null) {
271 decrementReadableBytes(previousBuf.readableBytes());
272 ctx.write(previousBuf, ctx.voidPromise());
273 }
274 previousBuf = (ByteBuf) entry;
275 } else if (entry instanceof ChannelPromise) {
276 decrementReadableBytes(previousBuf.readableBytes());
277 ctx.write(previousBuf, (ChannelPromise) entry);
278 previousBuf = null;
279 } else {
280 decrementReadableBytes(previousBuf.readableBytes());
281 ctx.write(previousBuf).addListener((ChannelFutureListener) entry);
282 previousBuf = null;
283 }
284 } catch (Throwable t) {
285 if (pending == null) {
286 pending = t;
287 } else {
288 logger.info("Throwable being suppressed because Throwable {} is already pending", pending, t);
289 }
290 }
291 }
292 if (pending != null) {
293 throw new IllegalStateException(pending);
294 }
295 }
296
297 @Override
298 public String toString() {
299 return "bytes: " + readableBytes + " buffers: " + (size() >> 1);
300 }
301
302
303
304
305 protected abstract ByteBuf compose(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf next);
306
307
308
309
310 protected final ByteBuf composeIntoComposite(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf next) {
311
312
313 CompositeByteBuf composite = alloc.compositeBuffer(size() + 2);
314 try {
315 composite.addComponent(true, cumulation);
316 composite.addComponent(true, next);
317 } catch (Throwable cause) {
318 composite.release();
319 safeRelease(next);
320 throwException(cause);
321 }
322 return composite;
323 }
324
325
326
327
328
329
330
331
332 protected final ByteBuf copyAndCompose(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf next) {
333 ByteBuf newCumulation = alloc.ioBuffer(cumulation.readableBytes() + next.readableBytes());
334 try {
335 newCumulation.writeBytes(cumulation).writeBytes(next);
336 } catch (Throwable cause) {
337 newCumulation.release();
338 safeRelease(next);
339 throwException(cause);
340 }
341 cumulation.release();
342 next.release();
343 return newCumulation;
344 }
345
346
347
348
349
350
351
352 protected ByteBuf composeFirst(ByteBufAllocator allocator, ByteBuf first, int bufferSize) {
353 return composeFirst(allocator, first);
354 }
355
356
357
358
359
360
361
362
363 @Deprecated
364 protected ByteBuf composeFirst(ByteBufAllocator allocator, ByteBuf first) {
365 return first;
366 }
367
368
369
370
371
372 protected abstract ByteBuf removeEmptyValue();
373
374
375
376
377
378 protected final int size() {
379 return bufAndListenerPairs.size();
380 }
381
382 private void releaseAndCompleteAll(ChannelFuture future) {
383 Throwable pending = null;
384 for (;;) {
385 Object entry = bufAndListenerPairs.poll();
386 if (entry == null) {
387 break;
388 }
389 try {
390 if (entry instanceof ByteBuf) {
391 ByteBuf buffer = (ByteBuf) entry;
392 decrementReadableBytes(buffer.readableBytes());
393 safeRelease(buffer);
394 } else {
395 ((ChannelFutureListener) entry).operationComplete(future);
396 }
397 } catch (Throwable t) {
398 if (pending == null) {
399 pending = t;
400 } else {
401 logger.info("Throwable being suppressed because Throwable {} is already pending", pending, t);
402 }
403 }
404 }
405 if (pending != null) {
406 throw new IllegalStateException(pending);
407 }
408 }
409
410 private void incrementReadableBytes(int increment) {
411 int nextReadableBytes = readableBytes + increment;
412 if (nextReadableBytes < readableBytes) {
413 throw new IllegalStateException("buffer queue length overflow: " + readableBytes + " + " + increment);
414 }
415 readableBytes = nextReadableBytes;
416 if (tracker != null) {
417 tracker.incrementPendingOutboundBytes(increment);
418 }
419 }
420
421 private void decrementReadableBytes(int decrement) {
422 readableBytes -= decrement;
423 assert readableBytes >= 0;
424 if (tracker != null) {
425 tracker.decrementPendingOutboundBytes(decrement);
426 }
427 }
428
429 private static ChannelFutureListener toChannelFutureListener(ChannelPromise promise) {
430 return promise.isVoid() ? null : new DelegatingChannelPromiseNotifier(promise);
431 }
432 }