View Javadoc
1   /*
2    * Copyright 2017 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  package io.netty.channel;
16  
17  import io.netty.buffer.ByteBuf;
18  import io.netty.buffer.ByteBufAllocator;
19  import io.netty.buffer.CompositeByteBuf;
20  import io.netty.util.internal.UnstableApi;
21  import io.netty.util.internal.logging.InternalLogger;
22  import io.netty.util.internal.logging.InternalLoggerFactory;
23  
24  import java.util.ArrayDeque;
25  
26  import static io.netty.util.ReferenceCountUtil.safeRelease;
27  import static io.netty.util.internal.ObjectUtil.checkNotNull;
28  import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
29  import static io.netty.util.internal.PlatformDependent.throwException;
30  
31  @UnstableApi
32  public abstract class AbstractCoalescingBufferQueue {
33      private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractCoalescingBufferQueue.class);
34      private final ArrayDeque<Object> bufAndListenerPairs;
35      private final PendingBytesTracker tracker;
36      private int readableBytes;
37  
38      /**
39       * Create a new instance.
40       *
41       * @param channel the {@link Channel} which will have the {@link Channel#isWritable()} reflect the amount of queued
42       *                buffers or {@code null} if there is no writability state updated.
43       * @param initSize the initial size of the underlying queue.
44       */
45      protected AbstractCoalescingBufferQueue(Channel channel, int initSize) {
46          bufAndListenerPairs = new ArrayDeque<Object>(initSize);
47          tracker = channel == null ? null : PendingBytesTracker.newTracker(channel);
48      }
49  
50      /**
51       * Add a buffer to the front of the queue and associate a promise with it that should be completed when
52       * all the buffer's bytes have been consumed from the queue and written.
53       * @param buf to add to the head of the queue
54       * @param promise to complete when all the bytes have been consumed and written, can be void.
55       */
56      public final void addFirst(ByteBuf buf, ChannelPromise promise) {
57          addFirst(buf, toChannelFutureListener(promise));
58      }
59  
60      private void addFirst(ByteBuf buf, ChannelFutureListener listener) {
61          if (listener != null) {
62              bufAndListenerPairs.addFirst(listener);
63          }
64          bufAndListenerPairs.addFirst(buf);
65          incrementReadableBytes(buf.readableBytes());
66      }
67  
68      /**
69       * Add a buffer to the end of the queue.
70       */
71      public final void add(ByteBuf buf) {
72          add(buf, (ChannelFutureListener) null);
73      }
74  
75      /**
76       * Add a buffer to the end of the queue and associate a promise with it that should be completed when
77       * all the buffer's bytes have been consumed from the queue and written.
78       * @param buf to add to the tail of the queue
79       * @param promise to complete when all the bytes have been consumed and written, can be void.
80       */
81      public final void add(ByteBuf buf, ChannelPromise promise) {
82          // buffers are added before promises so that we naturally 'consume' the entire buffer during removal
83          // before we complete it's promise.
84          add(buf, toChannelFutureListener(promise));
85      }
86  
87      /**
88       * Add a buffer to the end of the queue and associate a listener with it that should be completed when
89       * all the buffers  bytes have been consumed from the queue and written.
90       * @param buf to add to the tail of the queue
91       * @param listener to notify when all the bytes have been consumed and written, can be {@code null}.
92       */
93      public final void add(ByteBuf buf, ChannelFutureListener listener) {
94          // buffers are added before promises so that we naturally 'consume' the entire buffer during removal
95          // before we complete it's promise.
96          bufAndListenerPairs.add(buf);
97          if (listener != null) {
98              bufAndListenerPairs.add(listener);
99          }
100         incrementReadableBytes(buf.readableBytes());
101     }
102 
103     /**
104      * Remove the first {@link ByteBuf} from the queue.
105      * @param aggregatePromise used to aggregate the promises and listeners for the returned buffer.
106      * @return the first {@link ByteBuf} from the queue.
107      */
108     public final ByteBuf removeFirst(ChannelPromise aggregatePromise) {
109         Object entry = bufAndListenerPairs.poll();
110         if (entry == null) {
111             return null;
112         }
113         assert entry instanceof ByteBuf;
114         ByteBuf result = (ByteBuf) entry;
115 
116         decrementReadableBytes(result.readableBytes());
117 
118         entry = bufAndListenerPairs.peek();
119         if (entry instanceof ChannelFutureListener) {
120             aggregatePromise.addListener((ChannelFutureListener) entry);
121             bufAndListenerPairs.poll();
122         }
123         return result;
124     }
125 
126     /**
127      * Remove a {@link ByteBuf} from the queue with the specified number of bytes. Any added buffer who's bytes are
128      * fully consumed during removal will have it's promise completed when the passed aggregate {@link ChannelPromise}
129      * completes.
130      *
131      * @param alloc The allocator used if a new {@link ByteBuf} is generated during the aggregation process.
132      * @param bytes the maximum number of readable bytes in the returned {@link ByteBuf}, if {@code bytes} is greater
133      *              than {@link #readableBytes} then a buffer of length {@link #readableBytes} is returned.
134      * @param aggregatePromise used to aggregate the promises and listeners for the constituent buffers.
135      * @return a {@link ByteBuf} composed of the enqueued buffers.
136      */
137     public final ByteBuf remove(ByteBufAllocator alloc, int bytes, ChannelPromise aggregatePromise) {
138         checkPositiveOrZero(bytes, "bytes");
139         checkNotNull(aggregatePromise, "aggregatePromise");
140 
141         // Use isEmpty rather than readableBytes==0 as we may have a promise associated with an empty buffer.
142         if (bufAndListenerPairs.isEmpty()) {
143             assert readableBytes == 0;
144             return removeEmptyValue();
145         }
146         bytes = Math.min(bytes, readableBytes);
147 
148         ByteBuf toReturn = null;
149         ByteBuf entryBuffer = null;
150         int originalBytes = bytes;
151         try {
152             for (;;) {
153                 Object entry = bufAndListenerPairs.poll();
154                 if (entry == null) {
155                     break;
156                 }
157                 if (entry instanceof ChannelFutureListener) {
158                     aggregatePromise.addListener((ChannelFutureListener) entry);
159                     continue;
160                 }
161                 entryBuffer = (ByteBuf) entry;
162                 if (entryBuffer.readableBytes() > bytes) {
163                     // Add the buffer back to the queue as we can't consume all of it.
164                     bufAndListenerPairs.addFirst(entryBuffer);
165                     if (bytes > 0) {
166                         // Take a slice of what we can consume and retain it.
167                         entryBuffer = entryBuffer.readRetainedSlice(bytes);
168                         toReturn = toReturn == null ? composeFirst(alloc, entryBuffer)
169                                                     : compose(alloc, toReturn, entryBuffer);
170                         bytes = 0;
171                     }
172                     break;
173                 } else {
174                     bytes -= entryBuffer.readableBytes();
175                     toReturn = toReturn == null ? composeFirst(alloc, entryBuffer)
176                                                 : compose(alloc, toReturn, entryBuffer);
177                 }
178                 entryBuffer = null;
179             }
180         } catch (Throwable cause) {
181             safeRelease(entryBuffer);
182             safeRelease(toReturn);
183             aggregatePromise.setFailure(cause);
184             throwException(cause);
185         }
186         decrementReadableBytes(originalBytes - bytes);
187         return toReturn;
188     }
189 
190     /**
191      * The number of readable bytes.
192      */
193     public final int readableBytes() {
194         return readableBytes;
195     }
196 
197     /**
198      * Are there pending buffers in the queue.
199      */
200     public final boolean isEmpty() {
201         return bufAndListenerPairs.isEmpty();
202     }
203 
204     /**
205      *  Release all buffers in the queue and complete all listeners and promises.
206      */
207     public final void releaseAndFailAll(ChannelOutboundInvoker invoker, Throwable cause) {
208         releaseAndCompleteAll(invoker.newFailedFuture(cause));
209     }
210 
211     /**
212      * Copy all pending entries in this queue into the destination queue.
213      * @param dest to copy pending buffers to.
214      */
215     public final void copyTo(AbstractCoalescingBufferQueue dest) {
216         dest.bufAndListenerPairs.addAll(bufAndListenerPairs);
217         dest.incrementReadableBytes(readableBytes);
218     }
219 
220     /**
221      * Writes all remaining elements in this queue.
222      * @param ctx The context to write all elements to.
223      */
224     public final void writeAndRemoveAll(ChannelHandlerContext ctx) {
225         Throwable pending = null;
226         ByteBuf previousBuf = null;
227         for (;;) {
228             Object entry = bufAndListenerPairs.poll();
229             try {
230                 if (entry == null) {
231                     if (previousBuf != null) {
232                         decrementReadableBytes(previousBuf.readableBytes());
233                         ctx.write(previousBuf, ctx.voidPromise());
234                     }
235                     break;
236                 }
237 
238                 if (entry instanceof ByteBuf) {
239                     if (previousBuf != null) {
240                         decrementReadableBytes(previousBuf.readableBytes());
241                         ctx.write(previousBuf, ctx.voidPromise());
242                     }
243                     previousBuf = (ByteBuf) entry;
244                 } else if (entry instanceof ChannelPromise) {
245                     decrementReadableBytes(previousBuf.readableBytes());
246                     ctx.write(previousBuf, (ChannelPromise) entry);
247                     previousBuf = null;
248                 } else {
249                     decrementReadableBytes(previousBuf.readableBytes());
250                     ctx.write(previousBuf).addListener((ChannelFutureListener) entry);
251                     previousBuf = null;
252                 }
253             } catch (Throwable t) {
254                 if (pending == null) {
255                     pending = t;
256                 } else {
257                     logger.info("Throwable being suppressed because Throwable {} is already pending", pending, t);
258                 }
259             }
260         }
261         if (pending != null) {
262             throw new IllegalStateException(pending);
263         }
264     }
265 
266     @Override
267     public String toString() {
268         return "bytes: " + readableBytes + " buffers: " + (size() >> 1);
269     }
270 
271     /**
272      * Calculate the result of {@code current + next}.
273      */
274     protected abstract ByteBuf compose(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf next);
275 
276     /**
277      * Compose {@code cumulation} and {@code next} into a new {@link CompositeByteBuf}.
278      */
279     protected final ByteBuf composeIntoComposite(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf next) {
280         // Create a composite buffer to accumulate this pair and potentially all the buffers
281         // in the queue. Using +2 as we have already dequeued current and next.
282         CompositeByteBuf composite = alloc.compositeBuffer(size() + 2);
283         try {
284             composite.addComponent(true, cumulation);
285             composite.addComponent(true, next);
286         } catch (Throwable cause) {
287             composite.release();
288             safeRelease(next);
289             throwException(cause);
290         }
291         return composite;
292     }
293 
294     /**
295      * Compose {@code cumulation} and {@code next} into a new {@link ByteBufAllocator#ioBuffer()}.
296      * @param alloc The allocator to use to allocate the new buffer.
297      * @param cumulation The current cumulation.
298      * @param next The next buffer.
299      * @return The result of {@code cumulation + next}.
300      */
301     protected final ByteBuf copyAndCompose(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf next) {
302         ByteBuf newCumulation = alloc.ioBuffer(cumulation.readableBytes() + next.readableBytes());
303         try {
304             newCumulation.writeBytes(cumulation).writeBytes(next);
305         } catch (Throwable cause) {
306             newCumulation.release();
307             safeRelease(next);
308             throwException(cause);
309         }
310         cumulation.release();
311         next.release();
312         return newCumulation;
313     }
314 
315     /**
316      * Calculate the first {@link ByteBuf} which will be used in subsequent calls to
317      * {@link #compose(ByteBufAllocator, ByteBuf, ByteBuf)}.
318      */
319     protected ByteBuf composeFirst(ByteBufAllocator allocator, ByteBuf first) {
320         return first;
321     }
322 
323     /**
324      * The value to return when {@link #remove(ByteBufAllocator, int, ChannelPromise)} is called but the queue is empty.
325      * @return the {@link ByteBuf} which represents an empty queue.
326      */
327     protected abstract ByteBuf removeEmptyValue();
328 
329     /**
330      * Get the number of elements in this queue added via one of the {@link #add(ByteBuf)} methods.
331      * @return the number of elements in this queue.
332      */
333     protected final int size() {
334         return bufAndListenerPairs.size();
335     }
336 
337     private void releaseAndCompleteAll(ChannelFuture future) {
338         Throwable pending = null;
339         for (;;) {
340             Object entry = bufAndListenerPairs.poll();
341             if (entry == null) {
342                 break;
343             }
344             try {
345                 if (entry instanceof ByteBuf) {
346                     ByteBuf buffer = (ByteBuf) entry;
347                     decrementReadableBytes(buffer.readableBytes());
348                     safeRelease(buffer);
349                 } else {
350                     ((ChannelFutureListener) entry).operationComplete(future);
351                 }
352             } catch (Throwable t) {
353                 if (pending == null) {
354                     pending = t;
355                 } else {
356                     logger.info("Throwable being suppressed because Throwable {} is already pending", pending, t);
357                 }
358             }
359         }
360         if (pending != null) {
361             throw new IllegalStateException(pending);
362         }
363     }
364 
365     private void incrementReadableBytes(int increment) {
366         int nextReadableBytes = readableBytes + increment;
367         if (nextReadableBytes < readableBytes) {
368             throw new IllegalStateException("buffer queue length overflow: " + readableBytes + " + " + increment);
369         }
370         readableBytes = nextReadableBytes;
371         if (tracker != null) {
372             tracker.incrementPendingOutboundBytes(increment);
373         }
374     }
375 
376     private void decrementReadableBytes(int decrement) {
377         readableBytes -= decrement;
378         assert readableBytes >= 0;
379         if (tracker != null) {
380             tracker.decrementPendingOutboundBytes(decrement);
381         }
382     }
383 
384     private static ChannelFutureListener toChannelFutureListener(ChannelPromise promise) {
385         return promise.isVoid() ? null : new DelegatingChannelPromiseNotifier(promise);
386     }
387 }