View Javadoc
1   /*
2    * Copyright 2017 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  package io.netty.channel;
16  
17  import io.netty.buffer.ByteBuf;
18  import io.netty.buffer.ByteBufAllocator;
19  import io.netty.buffer.CompositeByteBuf;
20  import io.netty.util.internal.UnstableApi;
21  import io.netty.util.internal.logging.InternalLogger;
22  import io.netty.util.internal.logging.InternalLoggerFactory;
23  
24  import java.util.ArrayDeque;
25  
26  import static io.netty.util.ReferenceCountUtil.safeRelease;
27  import static io.netty.util.internal.ObjectUtil.checkNotNull;
28  import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
29  import static io.netty.util.internal.PlatformDependent.throwException;
30  
31  @UnstableApi
32  public abstract class AbstractCoalescingBufferQueue {
33      private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractCoalescingBufferQueue.class);
34      private final ArrayDeque<Object> bufAndListenerPairs;
35      private final PendingBytesTracker tracker;
36      private int readableBytes;
37  
38      /**
39       * Create a new instance.
40       *
41       * @param channel the {@link Channel} which will have the {@link Channel#isWritable()} reflect the amount of queued
42       *                buffers or {@code null} if there is no writability state updated.
43       * @param initSize the initial size of the underlying queue.
44       */
45      protected AbstractCoalescingBufferQueue(Channel channel, int initSize) {
46          bufAndListenerPairs = new ArrayDeque<Object>(initSize);
47          tracker = channel == null ? null : PendingBytesTracker.newTracker(channel);
48      }
49  
50      /**
51       * Add a buffer to the front of the queue and associate a promise with it that should be completed when
52       * all the buffer's bytes have been consumed from the queue and written.
53       * @param buf to add to the head of the queue
54       * @param promise to complete when all the bytes have been consumed and written, can be void.
55       */
56      public final void addFirst(ByteBuf buf, ChannelPromise promise) {
57          addFirst(buf, toChannelFutureListener(promise));
58      }
59  
60      private void addFirst(ByteBuf buf, ChannelFutureListener listener) {
61          // Touch the message to make it easier to debug buffer leaks.
62          buf.touch();
63  
64          if (listener != null) {
65              bufAndListenerPairs.addFirst(listener);
66          }
67          bufAndListenerPairs.addFirst(buf);
68          incrementReadableBytes(buf.readableBytes());
69      }
70  
71      /**
72       * Add a buffer to the end of the queue.
73       */
74      public final void add(ByteBuf buf) {
75          add(buf, (ChannelFutureListener) null);
76      }
77  
78      /**
79       * Add a buffer to the end of the queue and associate a promise with it that should be completed when
80       * all the buffer's bytes have been consumed from the queue and written.
81       * @param buf to add to the tail of the queue
82       * @param promise to complete when all the bytes have been consumed and written, can be void.
83       */
84      public final void add(ByteBuf buf, ChannelPromise promise) {
85          // buffers are added before promises so that we naturally 'consume' the entire buffer during removal
86          // before we complete it's promise.
87          add(buf, toChannelFutureListener(promise));
88      }
89  
90      /**
91       * Add a buffer to the end of the queue and associate a listener with it that should be completed when
92       * all the buffers  bytes have been consumed from the queue and written.
93       * @param buf to add to the tail of the queue
94       * @param listener to notify when all the bytes have been consumed and written, can be {@code null}.
95       */
96      public final void add(ByteBuf buf, ChannelFutureListener listener) {
97          // Touch the message to make it easier to debug buffer leaks.
98          buf.touch();
99  
100         // buffers are added before promises so that we naturally 'consume' the entire buffer during removal
101         // before we complete it's promise.
102         bufAndListenerPairs.add(buf);
103         if (listener != null) {
104             bufAndListenerPairs.add(listener);
105         }
106         incrementReadableBytes(buf.readableBytes());
107     }
108 
109     /**
110      * Remove the first {@link ByteBuf} from the queue.
111      * @param aggregatePromise used to aggregate the promises and listeners for the returned buffer.
112      * @return the first {@link ByteBuf} from the queue.
113      */
114     public final ByteBuf removeFirst(ChannelPromise aggregatePromise) {
115         Object entry = bufAndListenerPairs.poll();
116         if (entry == null) {
117             return null;
118         }
119         assert entry instanceof ByteBuf;
120         ByteBuf result = (ByteBuf) entry;
121 
122         decrementReadableBytes(result.readableBytes());
123 
124         entry = bufAndListenerPairs.peek();
125         if (entry instanceof ChannelFutureListener) {
126             aggregatePromise.addListener((ChannelFutureListener) entry);
127             bufAndListenerPairs.poll();
128         }
129         return result;
130     }
131 
132     /**
133      * Remove a {@link ByteBuf} from the queue with the specified number of bytes. Any added buffer who's bytes are
134      * fully consumed during removal will have it's promise completed when the passed aggregate {@link ChannelPromise}
135      * completes.
136      *
137      * @param alloc The allocator used if a new {@link ByteBuf} is generated during the aggregation process.
138      * @param bytes the maximum number of readable bytes in the returned {@link ByteBuf}, if {@code bytes} is greater
139      *              than {@link #readableBytes} then a buffer of length {@link #readableBytes} is returned.
140      * @param aggregatePromise used to aggregate the promises and listeners for the constituent buffers.
141      * @return a {@link ByteBuf} composed of the enqueued buffers.
142      */
143     public final ByteBuf remove(ByteBufAllocator alloc, int bytes, ChannelPromise aggregatePromise) {
144         checkPositiveOrZero(bytes, "bytes");
145         checkNotNull(aggregatePromise, "aggregatePromise");
146 
147         // Use isEmpty rather than readableBytes==0 as we may have a promise associated with an empty buffer.
148         if (bufAndListenerPairs.isEmpty()) {
149             assert readableBytes == 0;
150             return removeEmptyValue();
151         }
152         bytes = Math.min(bytes, readableBytes);
153 
154         ByteBuf toReturn = null;
155         ByteBuf entryBuffer = null;
156         int originalBytes = bytes;
157         Object entry = null;
158         try {
159             for (;;) {
160                 entry = bufAndListenerPairs.poll();
161                 if (entry == null) {
162                     break;
163                 }
164                 // fast-path vs abstract type
165                 if (entry instanceof ByteBuf) {
166                     entryBuffer = (ByteBuf) entry;
167                     int bufferBytes = entryBuffer.readableBytes();
168 
169                     if (bufferBytes > bytes) {
170                         // Add the buffer back to the queue as we can't consume all of it.
171                         bufAndListenerPairs.addFirst(entryBuffer);
172                         if (bytes > 0) {
173                             // Take a slice of what we can consume and retain it.
174                             entryBuffer = entryBuffer.readRetainedSlice(bytes);
175                             // we end here, so if this is the only buffer to return, skip composing
176                             toReturn = toReturn == null ? entryBuffer
177                                     : compose(alloc, toReturn, entryBuffer);
178                             bytes = 0;
179                         }
180                         break;
181                     }
182 
183                     bytes -= bufferBytes;
184                     if (toReturn == null) {
185                         // if there are no more bytes to read, there's no reason to compose
186                         toReturn = bytes == 0
187                                 ? entryBuffer
188                                 : composeFirst(alloc, entryBuffer, bufferBytes + bytes);
189                     } else {
190                         toReturn = compose(alloc, toReturn, entryBuffer);
191                     }
192                     entryBuffer = null;
193                 } else if (entry instanceof DelegatingChannelPromiseNotifier) {
194                     aggregatePromise.addListener((DelegatingChannelPromiseNotifier) entry);
195                 } else if (entry instanceof ChannelFutureListener) {
196                     aggregatePromise.addListener((ChannelFutureListener) entry);
197                 }
198             }
199         } catch (Throwable cause) {
200             // Always decrement to keep things consistent. We decrement directly here and not in a finally-block
201             // to ensure that the state is consistent even if it would be accessed via a listener that is
202             // attached to the promise that we fail below.
203             decrementReadableBytes(originalBytes - bytes);
204 
205             // Poll the next element if it's a listener that belongs to the ByteBuf.
206             entry = bufAndListenerPairs.peek();
207             if (entry instanceof ChannelFutureListener) {
208                 aggregatePromise.addListener((ChannelFutureListener) entry);
209                 bufAndListenerPairs.poll();
210             }
211 
212             safeRelease(entryBuffer);
213             safeRelease(toReturn);
214             aggregatePromise.setFailure(cause);
215             throwException(cause);
216         }
217         decrementReadableBytes(originalBytes - bytes);
218         return toReturn;
219     }
220 
221     /**
222      * The number of readable bytes.
223      */
224     public final int readableBytes() {
225         return readableBytes;
226     }
227 
228     /**
229      * Are there pending buffers in the queue.
230      */
231     public final boolean isEmpty() {
232         return bufAndListenerPairs.isEmpty();
233     }
234 
235     /**
236      *  Release all buffers in the queue and complete all listeners and promises.
237      */
238     public final void releaseAndFailAll(ChannelOutboundInvoker invoker, Throwable cause) {
239         releaseAndCompleteAll(invoker.newFailedFuture(cause));
240     }
241 
242     /**
243      * Copy all pending entries in this queue into the destination queue.
244      * @param dest to copy pending buffers to.
245      */
246     public final void copyTo(AbstractCoalescingBufferQueue dest) {
247         dest.bufAndListenerPairs.addAll(bufAndListenerPairs);
248         dest.incrementReadableBytes(readableBytes);
249     }
250 
251     /**
252      * Writes all remaining elements in this queue.
253      * @param ctx The context to write all elements to.
254      */
255     public final void writeAndRemoveAll(ChannelHandlerContext ctx) {
256         Throwable pending = null;
257         ByteBuf previousBuf = null;
258         for (;;) {
259             Object entry = bufAndListenerPairs.poll();
260             try {
261                 if (entry == null) {
262                     if (previousBuf != null) {
263                         decrementReadableBytes(previousBuf.readableBytes());
264                         ctx.write(previousBuf, ctx.voidPromise());
265                     }
266                     break;
267                 }
268 
269                 if (entry instanceof ByteBuf) {
270                     if (previousBuf != null) {
271                         decrementReadableBytes(previousBuf.readableBytes());
272                         ctx.write(previousBuf, ctx.voidPromise());
273                     }
274                     previousBuf = (ByteBuf) entry;
275                 } else if (entry instanceof ChannelPromise) {
276                     decrementReadableBytes(previousBuf.readableBytes());
277                     ctx.write(previousBuf, (ChannelPromise) entry);
278                     previousBuf = null;
279                 } else {
280                     decrementReadableBytes(previousBuf.readableBytes());
281                     ctx.write(previousBuf).addListener((ChannelFutureListener) entry);
282                     previousBuf = null;
283                 }
284             } catch (Throwable t) {
285                 if (pending == null) {
286                     pending = t;
287                 } else {
288                     logger.info("Throwable being suppressed because Throwable {} is already pending", pending, t);
289                 }
290             }
291         }
292         if (pending != null) {
293             throw new IllegalStateException(pending);
294         }
295     }
296 
297     @Override
298     public String toString() {
299         return "bytes: " + readableBytes + " buffers: " + (size() >> 1);
300     }
301 
302     /**
303      * Calculate the result of {@code current + next}.
304      */
305     protected abstract ByteBuf compose(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf next);
306 
307     /**
308      * Compose {@code cumulation} and {@code next} into a new {@link CompositeByteBuf}.
309      */
310     protected final ByteBuf composeIntoComposite(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf next) {
311         // Create a composite buffer to accumulate this pair and potentially all the buffers
312         // in the queue. Using +2 as we have already dequeued current and next.
313         CompositeByteBuf composite = alloc.compositeBuffer(size() + 2);
314         try {
315             composite.addComponent(true, cumulation);
316             composite.addComponent(true, next);
317         } catch (Throwable cause) {
318             composite.release();
319             safeRelease(next);
320             throwException(cause);
321         }
322         return composite;
323     }
324 
325     /**
326      * Compose {@code cumulation} and {@code next} into a new {@link ByteBufAllocator#ioBuffer()}.
327      * @param alloc The allocator to use to allocate the new buffer.
328      * @param cumulation The current cumulation.
329      * @param next The next buffer.
330      * @return The result of {@code cumulation + next}.
331      */
332     protected final ByteBuf copyAndCompose(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf next) {
333         ByteBuf newCumulation = alloc.ioBuffer(cumulation.readableBytes() + next.readableBytes());
334         try {
335             newCumulation.writeBytes(cumulation).writeBytes(next);
336         } catch (Throwable cause) {
337             newCumulation.release();
338             safeRelease(next);
339             throwException(cause);
340         }
341         cumulation.release();
342         next.release();
343         return newCumulation;
344     }
345 
346     /**
347      * Calculate the first {@link ByteBuf} which will be used in subsequent calls to
348      * {@link #compose(ByteBufAllocator, ByteBuf, ByteBuf)}.
349      * @param bufferSize the optimal size of the buffer needed for cumulation
350      * @return the first buffer
351      */
352     protected ByteBuf composeFirst(ByteBufAllocator allocator, ByteBuf first, int bufferSize) {
353         return composeFirst(allocator, first);
354     }
355 
356     /**
357      * Calculate the first {@link ByteBuf} which will be used in subsequent calls to
358      * {@link #compose(ByteBufAllocator, ByteBuf, ByteBuf)}.
359      * This method is deprecated and will be removed in the future. Implementing classes should
360      * override {@link #composeFirst(ByteBufAllocator, ByteBuf, int)} instead.
361      * @deprecated Use {AbstractCoalescingBufferQueue#composeFirst(ByteBufAllocator, ByteBuf, int)}
362      */
363     @Deprecated
364     protected ByteBuf composeFirst(ByteBufAllocator allocator, ByteBuf first) {
365         return first;
366     }
367 
368     /**
369      * The value to return when {@link #remove(ByteBufAllocator, int, ChannelPromise)} is called but the queue is empty.
370      * @return the {@link ByteBuf} which represents an empty queue.
371      */
372     protected abstract ByteBuf removeEmptyValue();
373 
374     /**
375      * Get the number of elements in this queue added via one of the {@link #add(ByteBuf)} methods.
376      * @return the number of elements in this queue.
377      */
378     protected final int size() {
379         return bufAndListenerPairs.size();
380     }
381 
382     private void releaseAndCompleteAll(ChannelFuture future) {
383         Throwable pending = null;
384         for (;;) {
385             Object entry = bufAndListenerPairs.poll();
386             if (entry == null) {
387                 break;
388             }
389             try {
390                 if (entry instanceof ByteBuf) {
391                     ByteBuf buffer = (ByteBuf) entry;
392                     decrementReadableBytes(buffer.readableBytes());
393                     safeRelease(buffer);
394                 } else {
395                     ((ChannelFutureListener) entry).operationComplete(future);
396                 }
397             } catch (Throwable t) {
398                 if (pending == null) {
399                     pending = t;
400                 } else {
401                     logger.info("Throwable being suppressed because Throwable {} is already pending", pending, t);
402                 }
403             }
404         }
405         if (pending != null) {
406             throw new IllegalStateException(pending);
407         }
408     }
409 
410     private void incrementReadableBytes(int increment) {
411         int nextReadableBytes = readableBytes + increment;
412         if (nextReadableBytes < readableBytes) {
413             throw new IllegalStateException("buffer queue length overflow: " + readableBytes + " + " + increment);
414         }
415         readableBytes = nextReadableBytes;
416         if (tracker != null) {
417             tracker.incrementPendingOutboundBytes(increment);
418         }
419     }
420 
421     private void decrementReadableBytes(int decrement) {
422         readableBytes -= decrement;
423         assert readableBytes >= 0;
424         if (tracker != null) {
425             tracker.decrementPendingOutboundBytes(decrement);
426         }
427     }
428 
429     private static ChannelFutureListener toChannelFutureListener(ChannelPromise promise) {
430         return promise.isVoid() ? null : new DelegatingChannelPromiseNotifier(promise);
431     }
432 }