View Javadoc
1   /*
2    * Copyright 2017 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  package io.netty.channel;
16  
17  import io.netty.buffer.ByteBuf;
18  import io.netty.buffer.ByteBufAllocator;
19  import io.netty.buffer.CompositeByteBuf;
20  import io.netty.util.internal.UnstableApi;
21  import io.netty.util.internal.logging.InternalLogger;
22  import io.netty.util.internal.logging.InternalLoggerFactory;
23  
24  import java.util.ArrayDeque;
25  
26  import static io.netty.util.ReferenceCountUtil.safeRelease;
27  import static io.netty.util.internal.ObjectUtil.checkNotNull;
28  import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
29  import static io.netty.util.internal.PlatformDependent.throwException;
30  
31  @UnstableApi
32  public abstract class AbstractCoalescingBufferQueue {
33      private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractCoalescingBufferQueue.class);
34      private final ArrayDeque<Object> bufAndListenerPairs;
35      private final PendingBytesTracker tracker;
36      private int readableBytes;
37  
38      /**
39       * Create a new instance.
40       *
41       * @param channel the {@link Channel} which will have the {@link Channel#isWritable()} reflect the amount of queued
42       *                buffers or {@code null} if there is no writability state updated.
43       * @param initSize the initial size of the underlying queue.
44       */
45      protected AbstractCoalescingBufferQueue(Channel channel, int initSize) {
46          bufAndListenerPairs = new ArrayDeque<Object>(initSize);
47          tracker = channel == null ? null : PendingBytesTracker.newTracker(channel);
48      }
49  
50      /**
51       * Add a buffer to the front of the queue and associate a promise with it that should be completed when
52       * all the buffer's bytes have been consumed from the queue and written.
53       * @param buf to add to the head of the queue
54       * @param promise to complete when all the bytes have been consumed and written, can be void.
55       */
56      public final void addFirst(ByteBuf buf, ChannelPromise promise) {
57          addFirst(buf, toChannelFutureListener(promise));
58      }
59  
60      private void addFirst(ByteBuf buf, ChannelFutureListener listener) {
61          // Touch the message to make it easier to debug buffer leaks.
62          buf.touch();
63  
64          if (listener != null) {
65              bufAndListenerPairs.addFirst(listener);
66          }
67          bufAndListenerPairs.addFirst(buf);
68          incrementReadableBytes(buf.readableBytes());
69      }
70  
71      /**
72       * Add a buffer to the end of the queue.
73       */
74      public final void add(ByteBuf buf) {
75          add(buf, (ChannelFutureListener) null);
76      }
77  
78      /**
79       * Add a buffer to the end of the queue and associate a promise with it that should be completed when
80       * all the buffer's bytes have been consumed from the queue and written.
81       * @param buf to add to the tail of the queue
82       * @param promise to complete when all the bytes have been consumed and written, can be void.
83       */
84      public final void add(ByteBuf buf, ChannelPromise promise) {
85          // buffers are added before promises so that we naturally 'consume' the entire buffer during removal
86          // before we complete it's promise.
87          add(buf, toChannelFutureListener(promise));
88      }
89  
90      /**
91       * Add a buffer to the end of the queue and associate a listener with it that should be completed when
92       * all the buffers  bytes have been consumed from the queue and written.
93       * @param buf to add to the tail of the queue
94       * @param listener to notify when all the bytes have been consumed and written, can be {@code null}.
95       */
96      public final void add(ByteBuf buf, ChannelFutureListener listener) {
97          // Touch the message to make it easier to debug buffer leaks.
98          buf.touch();
99  
100         // buffers are added before promises so that we naturally 'consume' the entire buffer during removal
101         // before we complete it's promise.
102         bufAndListenerPairs.add(buf);
103         if (listener != null) {
104             bufAndListenerPairs.add(listener);
105         }
106         incrementReadableBytes(buf.readableBytes());
107     }
108 
109     /**
110      * Remove the first {@link ByteBuf} from the queue.
111      * @param aggregatePromise used to aggregate the promises and listeners for the returned buffer.
112      * @return the first {@link ByteBuf} from the queue.
113      */
114     public final ByteBuf removeFirst(ChannelPromise aggregatePromise) {
115         Object entry = bufAndListenerPairs.poll();
116         if (entry == null) {
117             return null;
118         }
119         assert entry instanceof ByteBuf;
120         ByteBuf result = (ByteBuf) entry;
121 
122         decrementReadableBytes(result.readableBytes());
123 
124         entry = bufAndListenerPairs.peek();
125         if (entry instanceof ChannelFutureListener) {
126             aggregatePromise.addListener((ChannelFutureListener) entry);
127             bufAndListenerPairs.poll();
128         }
129         return result;
130     }
131 
132     /**
133      * Remove a {@link ByteBuf} from the queue with the specified number of bytes. Any added buffer who's bytes are
134      * fully consumed during removal will have it's promise completed when the passed aggregate {@link ChannelPromise}
135      * completes.
136      *
137      * @param alloc The allocator used if a new {@link ByteBuf} is generated during the aggregation process.
138      * @param bytes the maximum number of readable bytes in the returned {@link ByteBuf}, if {@code bytes} is greater
139      *              than {@link #readableBytes} then a buffer of length {@link #readableBytes} is returned.
140      * @param aggregatePromise used to aggregate the promises and listeners for the constituent buffers.
141      * @return a {@link ByteBuf} composed of the enqueued buffers.
142      */
143     public final ByteBuf remove(ByteBufAllocator alloc, int bytes, ChannelPromise aggregatePromise) {
144         checkPositiveOrZero(bytes, "bytes");
145         checkNotNull(aggregatePromise, "aggregatePromise");
146 
147         // Use isEmpty rather than readableBytes==0 as we may have a promise associated with an empty buffer.
148         if (bufAndListenerPairs.isEmpty()) {
149             assert readableBytes == 0;
150             return removeEmptyValue();
151         }
152         bytes = Math.min(bytes, readableBytes);
153 
154         ByteBuf toReturn = null;
155         ByteBuf entryBuffer = null;
156         int originalBytes = bytes;
157         try {
158             for (;;) {
159                 Object entry = bufAndListenerPairs.poll();
160                 if (entry == null) {
161                     break;
162                 }
163                 // fast-path vs abstract type
164                 if (entry instanceof ByteBuf) {
165                     entryBuffer = (ByteBuf) entry;
166                     int bufferBytes = entryBuffer.readableBytes();
167 
168                     if (bufferBytes > bytes) {
169                         // Add the buffer back to the queue as we can't consume all of it.
170                         bufAndListenerPairs.addFirst(entryBuffer);
171                         if (bytes > 0) {
172                             // Take a slice of what we can consume and retain it.
173                             entryBuffer = entryBuffer.readRetainedSlice(bytes);
174                             // we end here, so if this is the only buffer to return, skip composing
175                             toReturn = toReturn == null ? entryBuffer
176                                     : compose(alloc, toReturn, entryBuffer);
177                             bytes = 0;
178                         }
179                         break;
180                     }
181 
182                     bytes -= bufferBytes;
183                     if (toReturn == null) {
184                         // if there are no more bytes in the queue after this, there's no reason to compose
185                         toReturn = bufferBytes == readableBytes
186                                 ? entryBuffer
187                                 : composeFirst(alloc, entryBuffer);
188                     } else {
189                         toReturn = compose(alloc, toReturn, entryBuffer);
190                     }
191                     entryBuffer = null;
192                 } else if (entry instanceof DelegatingChannelPromiseNotifier) {
193                     aggregatePromise.addListener((DelegatingChannelPromiseNotifier) entry);
194                 } else if (entry instanceof ChannelFutureListener) {
195                     aggregatePromise.addListener((ChannelFutureListener) entry);
196                 }
197             }
198         } catch (Throwable cause) {
199             safeRelease(entryBuffer);
200             safeRelease(toReturn);
201             aggregatePromise.setFailure(cause);
202             throwException(cause);
203         }
204         decrementReadableBytes(originalBytes - bytes);
205         return toReturn;
206     }
207 
208     /**
209      * The number of readable bytes.
210      */
211     public final int readableBytes() {
212         return readableBytes;
213     }
214 
215     /**
216      * Are there pending buffers in the queue.
217      */
218     public final boolean isEmpty() {
219         return bufAndListenerPairs.isEmpty();
220     }
221 
222     /**
223      *  Release all buffers in the queue and complete all listeners and promises.
224      */
225     public final void releaseAndFailAll(ChannelOutboundInvoker invoker, Throwable cause) {
226         releaseAndCompleteAll(invoker.newFailedFuture(cause));
227     }
228 
229     /**
230      * Copy all pending entries in this queue into the destination queue.
231      * @param dest to copy pending buffers to.
232      */
233     public final void copyTo(AbstractCoalescingBufferQueue dest) {
234         dest.bufAndListenerPairs.addAll(bufAndListenerPairs);
235         dest.incrementReadableBytes(readableBytes);
236     }
237 
238     /**
239      * Writes all remaining elements in this queue.
240      * @param ctx The context to write all elements to.
241      */
242     public final void writeAndRemoveAll(ChannelHandlerContext ctx) {
243         Throwable pending = null;
244         ByteBuf previousBuf = null;
245         for (;;) {
246             Object entry = bufAndListenerPairs.poll();
247             try {
248                 if (entry == null) {
249                     if (previousBuf != null) {
250                         decrementReadableBytes(previousBuf.readableBytes());
251                         ctx.write(previousBuf, ctx.voidPromise());
252                     }
253                     break;
254                 }
255 
256                 if (entry instanceof ByteBuf) {
257                     if (previousBuf != null) {
258                         decrementReadableBytes(previousBuf.readableBytes());
259                         ctx.write(previousBuf, ctx.voidPromise());
260                     }
261                     previousBuf = (ByteBuf) entry;
262                 } else if (entry instanceof ChannelPromise) {
263                     decrementReadableBytes(previousBuf.readableBytes());
264                     ctx.write(previousBuf, (ChannelPromise) entry);
265                     previousBuf = null;
266                 } else {
267                     decrementReadableBytes(previousBuf.readableBytes());
268                     ctx.write(previousBuf).addListener((ChannelFutureListener) entry);
269                     previousBuf = null;
270                 }
271             } catch (Throwable t) {
272                 if (pending == null) {
273                     pending = t;
274                 } else {
275                     logger.info("Throwable being suppressed because Throwable {} is already pending", pending, t);
276                 }
277             }
278         }
279         if (pending != null) {
280             throw new IllegalStateException(pending);
281         }
282     }
283 
284     @Override
285     public String toString() {
286         return "bytes: " + readableBytes + " buffers: " + (size() >> 1);
287     }
288 
289     /**
290      * Calculate the result of {@code current + next}.
291      */
292     protected abstract ByteBuf compose(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf next);
293 
294     /**
295      * Compose {@code cumulation} and {@code next} into a new {@link CompositeByteBuf}.
296      */
297     protected final ByteBuf composeIntoComposite(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf next) {
298         // Create a composite buffer to accumulate this pair and potentially all the buffers
299         // in the queue. Using +2 as we have already dequeued current and next.
300         CompositeByteBuf composite = alloc.compositeBuffer(size() + 2);
301         try {
302             composite.addComponent(true, cumulation);
303             composite.addComponent(true, next);
304         } catch (Throwable cause) {
305             composite.release();
306             safeRelease(next);
307             throwException(cause);
308         }
309         return composite;
310     }
311 
312     /**
313      * Compose {@code cumulation} and {@code next} into a new {@link ByteBufAllocator#ioBuffer()}.
314      * @param alloc The allocator to use to allocate the new buffer.
315      * @param cumulation The current cumulation.
316      * @param next The next buffer.
317      * @return The result of {@code cumulation + next}.
318      */
319     protected final ByteBuf copyAndCompose(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf next) {
320         ByteBuf newCumulation = alloc.ioBuffer(cumulation.readableBytes() + next.readableBytes());
321         try {
322             newCumulation.writeBytes(cumulation).writeBytes(next);
323         } catch (Throwable cause) {
324             newCumulation.release();
325             safeRelease(next);
326             throwException(cause);
327         }
328         cumulation.release();
329         next.release();
330         return newCumulation;
331     }
332 
333     /**
334      * Calculate the first {@link ByteBuf} which will be used in subsequent calls to
335      * {@link #compose(ByteBufAllocator, ByteBuf, ByteBuf)}.
336      */
337     protected ByteBuf composeFirst(ByteBufAllocator allocator, ByteBuf first) {
338         return first;
339     }
340 
341     /**
342      * The value to return when {@link #remove(ByteBufAllocator, int, ChannelPromise)} is called but the queue is empty.
343      * @return the {@link ByteBuf} which represents an empty queue.
344      */
345     protected abstract ByteBuf removeEmptyValue();
346 
347     /**
348      * Get the number of elements in this queue added via one of the {@link #add(ByteBuf)} methods.
349      * @return the number of elements in this queue.
350      */
351     protected final int size() {
352         return bufAndListenerPairs.size();
353     }
354 
355     private void releaseAndCompleteAll(ChannelFuture future) {
356         Throwable pending = null;
357         for (;;) {
358             Object entry = bufAndListenerPairs.poll();
359             if (entry == null) {
360                 break;
361             }
362             try {
363                 if (entry instanceof ByteBuf) {
364                     ByteBuf buffer = (ByteBuf) entry;
365                     decrementReadableBytes(buffer.readableBytes());
366                     safeRelease(buffer);
367                 } else {
368                     ((ChannelFutureListener) entry).operationComplete(future);
369                 }
370             } catch (Throwable t) {
371                 if (pending == null) {
372                     pending = t;
373                 } else {
374                     logger.info("Throwable being suppressed because Throwable {} is already pending", pending, t);
375                 }
376             }
377         }
378         if (pending != null) {
379             throw new IllegalStateException(pending);
380         }
381     }
382 
383     private void incrementReadableBytes(int increment) {
384         int nextReadableBytes = readableBytes + increment;
385         if (nextReadableBytes < readableBytes) {
386             throw new IllegalStateException("buffer queue length overflow: " + readableBytes + " + " + increment);
387         }
388         readableBytes = nextReadableBytes;
389         if (tracker != null) {
390             tracker.incrementPendingOutboundBytes(increment);
391         }
392     }
393 
394     private void decrementReadableBytes(int decrement) {
395         readableBytes -= decrement;
396         assert readableBytes >= 0;
397         if (tracker != null) {
398             tracker.decrementPendingOutboundBytes(decrement);
399         }
400     }
401 
402     private static ChannelFutureListener toChannelFutureListener(ChannelPromise promise) {
403         return promise.isVoid() ? null : new DelegatingChannelPromiseNotifier(promise);
404     }
405 }