1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package io.netty.channel;
16
17 import io.netty.buffer.ByteBuf;
18 import io.netty.buffer.ByteBufAllocator;
19 import io.netty.buffer.CompositeByteBuf;
20 import io.netty.util.internal.UnstableApi;
21 import io.netty.util.internal.logging.InternalLogger;
22 import io.netty.util.internal.logging.InternalLoggerFactory;
23
24 import java.util.ArrayDeque;
25
26 import static io.netty.util.ReferenceCountUtil.safeRelease;
27 import static io.netty.util.internal.ObjectUtil.checkNotNull;
28 import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
29 import static io.netty.util.internal.PlatformDependent.throwException;
30
31 @UnstableApi
32 public abstract class AbstractCoalescingBufferQueue {
33 private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractCoalescingBufferQueue.class);
34 private final ArrayDeque<Object> bufAndListenerPairs;
35 private final PendingBytesTracker tracker;
36 private int readableBytes;
37
38
39
40
41
42
43
44
45 protected AbstractCoalescingBufferQueue(Channel channel, int initSize) {
46 bufAndListenerPairs = new ArrayDeque<Object>(initSize);
47 tracker = channel == null ? null : PendingBytesTracker.newTracker(channel);
48 }
49
50
51
52
53
54
55
56 public final void addFirst(ByteBuf buf, ChannelPromise promise) {
57 addFirst(buf, toChannelFutureListener(promise));
58 }
59
60 private void addFirst(ByteBuf buf, ChannelFutureListener listener) {
61 if (listener != null) {
62 bufAndListenerPairs.addFirst(listener);
63 }
64 bufAndListenerPairs.addFirst(buf);
65 incrementReadableBytes(buf.readableBytes());
66 }
67
68
69
70
71 public final void add(ByteBuf buf) {
72 add(buf, (ChannelFutureListener) null);
73 }
74
75
76
77
78
79
80
81 public final void add(ByteBuf buf, ChannelPromise promise) {
82
83
84 add(buf, toChannelFutureListener(promise));
85 }
86
87
88
89
90
91
92
93 public final void add(ByteBuf buf, ChannelFutureListener listener) {
94
95
96 bufAndListenerPairs.add(buf);
97 if (listener != null) {
98 bufAndListenerPairs.add(listener);
99 }
100 incrementReadableBytes(buf.readableBytes());
101 }
102
103
104
105
106
107
108 public final ByteBuf removeFirst(ChannelPromise aggregatePromise) {
109 Object entry = bufAndListenerPairs.poll();
110 if (entry == null) {
111 return null;
112 }
113 assert entry instanceof ByteBuf;
114 ByteBuf result = (ByteBuf) entry;
115
116 decrementReadableBytes(result.readableBytes());
117
118 entry = bufAndListenerPairs.peek();
119 if (entry instanceof ChannelFutureListener) {
120 aggregatePromise.addListener((ChannelFutureListener) entry);
121 bufAndListenerPairs.poll();
122 }
123 return result;
124 }
125
126
127
128
129
130
131
132
133
134
135
136
137 public final ByteBuf remove(ByteBufAllocator alloc, int bytes, ChannelPromise aggregatePromise) {
138 checkPositiveOrZero(bytes, "bytes");
139 checkNotNull(aggregatePromise, "aggregatePromise");
140
141
142 if (bufAndListenerPairs.isEmpty()) {
143 assert readableBytes == 0;
144 return removeEmptyValue();
145 }
146 bytes = Math.min(bytes, readableBytes);
147
148 ByteBuf toReturn = null;
149 ByteBuf entryBuffer = null;
150 int originalBytes = bytes;
151 try {
152 for (;;) {
153 Object entry = bufAndListenerPairs.poll();
154 if (entry == null) {
155 break;
156 }
157 if (entry instanceof ChannelFutureListener) {
158 aggregatePromise.addListener((ChannelFutureListener) entry);
159 continue;
160 }
161 entryBuffer = (ByteBuf) entry;
162 if (entryBuffer.readableBytes() > bytes) {
163
164 bufAndListenerPairs.addFirst(entryBuffer);
165 if (bytes > 0) {
166
167 entryBuffer = entryBuffer.readRetainedSlice(bytes);
168 toReturn = toReturn == null ? composeFirst(alloc, entryBuffer)
169 : compose(alloc, toReturn, entryBuffer);
170 bytes = 0;
171 }
172 break;
173 } else {
174 bytes -= entryBuffer.readableBytes();
175 toReturn = toReturn == null ? composeFirst(alloc, entryBuffer)
176 : compose(alloc, toReturn, entryBuffer);
177 }
178 entryBuffer = null;
179 }
180 } catch (Throwable cause) {
181 safeRelease(entryBuffer);
182 safeRelease(toReturn);
183 aggregatePromise.setFailure(cause);
184 throwException(cause);
185 }
186 decrementReadableBytes(originalBytes - bytes);
187 return toReturn;
188 }
189
190
191
192
193 public final int readableBytes() {
194 return readableBytes;
195 }
196
197
198
199
200 public final boolean isEmpty() {
201 return bufAndListenerPairs.isEmpty();
202 }
203
204
205
206
207 public final void releaseAndFailAll(ChannelOutboundInvoker invoker, Throwable cause) {
208 releaseAndCompleteAll(invoker.newFailedFuture(cause));
209 }
210
211
212
213
214
215 public final void copyTo(AbstractCoalescingBufferQueue dest) {
216 dest.bufAndListenerPairs.addAll(bufAndListenerPairs);
217 dest.incrementReadableBytes(readableBytes);
218 }
219
220
221
222
223
224 public final void writeAndRemoveAll(ChannelHandlerContext ctx) {
225 Throwable pending = null;
226 ByteBuf previousBuf = null;
227 for (;;) {
228 Object entry = bufAndListenerPairs.poll();
229 try {
230 if (entry == null) {
231 if (previousBuf != null) {
232 decrementReadableBytes(previousBuf.readableBytes());
233 ctx.write(previousBuf, ctx.voidPromise());
234 }
235 break;
236 }
237
238 if (entry instanceof ByteBuf) {
239 if (previousBuf != null) {
240 decrementReadableBytes(previousBuf.readableBytes());
241 ctx.write(previousBuf, ctx.voidPromise());
242 }
243 previousBuf = (ByteBuf) entry;
244 } else if (entry instanceof ChannelPromise) {
245 decrementReadableBytes(previousBuf.readableBytes());
246 ctx.write(previousBuf, (ChannelPromise) entry);
247 previousBuf = null;
248 } else {
249 decrementReadableBytes(previousBuf.readableBytes());
250 ctx.write(previousBuf).addListener((ChannelFutureListener) entry);
251 previousBuf = null;
252 }
253 } catch (Throwable t) {
254 if (pending == null) {
255 pending = t;
256 } else {
257 logger.info("Throwable being suppressed because Throwable {} is already pending", pending, t);
258 }
259 }
260 }
261 if (pending != null) {
262 throw new IllegalStateException(pending);
263 }
264 }
265
266 @Override
267 public String toString() {
268 return "bytes: " + readableBytes + " buffers: " + (size() >> 1);
269 }
270
271
272
273
274 protected abstract ByteBuf compose(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf next);
275
276
277
278
279 protected final ByteBuf composeIntoComposite(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf next) {
280
281
282 CompositeByteBuf composite = alloc.compositeBuffer(size() + 2);
283 try {
284 composite.addComponent(true, cumulation);
285 composite.addComponent(true, next);
286 } catch (Throwable cause) {
287 composite.release();
288 safeRelease(next);
289 throwException(cause);
290 }
291 return composite;
292 }
293
294
295
296
297
298
299
300
301 protected final ByteBuf copyAndCompose(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf next) {
302 ByteBuf newCumulation = alloc.ioBuffer(cumulation.readableBytes() + next.readableBytes());
303 try {
304 newCumulation.writeBytes(cumulation).writeBytes(next);
305 } catch (Throwable cause) {
306 newCumulation.release();
307 safeRelease(next);
308 throwException(cause);
309 }
310 cumulation.release();
311 next.release();
312 return newCumulation;
313 }
314
315
316
317
318
319 protected ByteBuf composeFirst(ByteBufAllocator allocator, ByteBuf first) {
320 return first;
321 }
322
323
324
325
326
327 protected abstract ByteBuf removeEmptyValue();
328
329
330
331
332
333 protected final int size() {
334 return bufAndListenerPairs.size();
335 }
336
337 private void releaseAndCompleteAll(ChannelFuture future) {
338 Throwable pending = null;
339 for (;;) {
340 Object entry = bufAndListenerPairs.poll();
341 if (entry == null) {
342 break;
343 }
344 try {
345 if (entry instanceof ByteBuf) {
346 ByteBuf buffer = (ByteBuf) entry;
347 decrementReadableBytes(buffer.readableBytes());
348 safeRelease(buffer);
349 } else {
350 ((ChannelFutureListener) entry).operationComplete(future);
351 }
352 } catch (Throwable t) {
353 if (pending == null) {
354 pending = t;
355 } else {
356 logger.info("Throwable being suppressed because Throwable {} is already pending", pending, t);
357 }
358 }
359 }
360 if (pending != null) {
361 throw new IllegalStateException(pending);
362 }
363 }
364
365 private void incrementReadableBytes(int increment) {
366 int nextReadableBytes = readableBytes + increment;
367 if (nextReadableBytes < readableBytes) {
368 throw new IllegalStateException("buffer queue length overflow: " + readableBytes + " + " + increment);
369 }
370 readableBytes = nextReadableBytes;
371 if (tracker != null) {
372 tracker.incrementPendingOutboundBytes(increment);
373 }
374 }
375
376 private void decrementReadableBytes(int decrement) {
377 readableBytes -= decrement;
378 assert readableBytes >= 0;
379 if (tracker != null) {
380 tracker.decrementPendingOutboundBytes(decrement);
381 }
382 }
383
384 private static ChannelFutureListener toChannelFutureListener(ChannelPromise promise) {
385 return promise.isVoid() ? null : new DelegatingChannelPromiseNotifier(promise);
386 }
387 }