View Javadoc
1   /*
2    * Copyright 2017 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  package io.netty.channel;
16  
17  import io.netty.buffer.ByteBuf;
18  import io.netty.buffer.ByteBufAllocator;
19  import io.netty.buffer.CompositeByteBuf;
20  import io.netty.util.internal.UnstableApi;
21  import io.netty.util.internal.logging.InternalLogger;
22  import io.netty.util.internal.logging.InternalLoggerFactory;
23  
24  import java.util.ArrayDeque;
25  
26  import static io.netty.util.ReferenceCountUtil.safeRelease;
27  import static io.netty.util.internal.ObjectUtil.checkNotNull;
28  import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
29  import static io.netty.util.internal.PlatformDependent.throwException;
30  
31  @UnstableApi
32  public abstract class AbstractCoalescingBufferQueue {
33      private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractCoalescingBufferQueue.class);
34      private final ArrayDeque<Object> bufAndListenerPairs;
35      private final PendingBytesTracker tracker;
36      private int readableBytes;
37  
38      /**
39       * Create a new instance.
40       *
41       * @param channel the {@link Channel} which will have the {@link Channel#isWritable()} reflect the amount of queued
42       *                buffers or {@code null} if there is no writability state updated.
43       * @param initSize theinitial size of the underlying queue.
44       */
45      protected AbstractCoalescingBufferQueue(Channel channel, int initSize) {
46          bufAndListenerPairs = new ArrayDeque<Object>(initSize);
47          tracker = channel == null ? null : PendingBytesTracker.newTracker(channel);
48      }
49  
50      /**
51       * Add a buffer to the front of the queue and associate a promise with it that should be completed when
52       * all the buffer's bytes have been consumed from the queue and written.
53       * @param buf to add to the head of the queue
54       * @param promise to complete when all the bytes have been consumed and written, can be void.
55       */
56      public final void addFirst(ByteBuf buf, ChannelPromise promise) {
57          addFirst(buf, toChannelFutureListener(promise));
58      }
59  
60      private void addFirst(ByteBuf buf, ChannelFutureListener listener) {
61          if (listener != null) {
62              bufAndListenerPairs.addFirst(listener);
63          }
64          bufAndListenerPairs.addFirst(buf);
65          incrementReadableBytes(buf.readableBytes());
66      }
67  
68      /**
69       * Add a buffer to the end of the queue.
70       */
71      public final void add(ByteBuf buf) {
72          add(buf, (ChannelFutureListener) null);
73      }
74  
75      /**
76       * Add a buffer to the end of the queue and associate a promise with it that should be completed when
77       * all the buffer's bytes have been consumed from the queue and written.
78       * @param buf to add to the tail of the queue
79       * @param promise to complete when all the bytes have been consumed and written, can be void.
80       */
81      public final void add(ByteBuf buf, ChannelPromise promise) {
82          // buffers are added before promises so that we naturally 'consume' the entire buffer during removal
83          // before we complete it's promise.
84          add(buf, toChannelFutureListener(promise));
85      }
86  
87      /**
88       * Add a buffer to the end of the queue and associate a listener with it that should be completed when
89       * all the buffers  bytes have been consumed from the queue and written.
90       * @param buf to add to the tail of the queue
91       * @param listener to notify when all the bytes have been consumed and written, can be {@code null}.
92       */
93      public final void add(ByteBuf buf, ChannelFutureListener listener) {
94          // buffers are added before promises so that we naturally 'consume' the entire buffer during removal
95          // before we complete it's promise.
96          bufAndListenerPairs.add(buf);
97          if (listener != null) {
98              bufAndListenerPairs.add(listener);
99          }
100         incrementReadableBytes(buf.readableBytes());
101     }
102 
103     /**
104      * Remove the first {@link ByteBuf} from the queue.
105      * @param aggregatePromise used to aggregate the promises and listeners for the returned buffer.
106      * @return the first {@link ByteBuf} from the queue.
107      */
108     public final ByteBuf removeFirst(ChannelPromise aggregatePromise) {
109         Object entry = bufAndListenerPairs.poll();
110         if (entry == null) {
111             return null;
112         }
113         assert entry instanceof ByteBuf;
114         ByteBuf result = (ByteBuf) entry;
115 
116         decrementReadableBytes(result.readableBytes());
117 
118         entry = bufAndListenerPairs.peek();
119         if (entry instanceof ChannelFutureListener) {
120             aggregatePromise.addListener((ChannelFutureListener) entry);
121             bufAndListenerPairs.poll();
122         }
123         return result;
124     }
125 
126     /**
127      * Remove a {@link ByteBuf} from the queue with the specified number of bytes. Any added buffer who's bytes are
128      * fully consumed during removal will have it's promise completed when the passed aggregate {@link ChannelPromise}
129      * completes.
130      *
131      * @param alloc The allocator used if a new {@link ByteBuf} is generated during the aggregation process.
132      * @param bytes the maximum number of readable bytes in the returned {@link ByteBuf}, if {@code bytes} is greater
133      *              than {@link #readableBytes} then a buffer of length {@link #readableBytes} is returned.
134      * @param aggregatePromise used to aggregate the promises and listeners for the constituent buffers.
135      * @return a {@link ByteBuf} composed of the enqueued buffers.
136      */
137     public final ByteBuf remove(ByteBufAllocator alloc, int bytes, ChannelPromise aggregatePromise) {
138         checkPositiveOrZero(bytes, "bytes");
139         checkNotNull(aggregatePromise, "aggregatePromise");
140 
141         // Use isEmpty rather than readableBytes==0 as we may have a promise associated with an empty buffer.
142         if (bufAndListenerPairs.isEmpty()) {
143             return removeEmptyValue();
144         }
145         bytes = Math.min(bytes, readableBytes);
146 
147         ByteBuf toReturn = null;
148         ByteBuf entryBuffer = null;
149         int originalBytes = bytes;
150         try {
151             for (;;) {
152                 Object entry = bufAndListenerPairs.poll();
153                 if (entry == null) {
154                     break;
155                 }
156                 if (entry instanceof ChannelFutureListener) {
157                     aggregatePromise.addListener((ChannelFutureListener) entry);
158                     continue;
159                 }
160                 entryBuffer = (ByteBuf) entry;
161                 if (entryBuffer.readableBytes() > bytes) {
162                     // Add the buffer back to the queue as we can't consume all of it.
163                     bufAndListenerPairs.addFirst(entryBuffer);
164                     if (bytes > 0) {
165                         // Take a slice of what we can consume and retain it.
166                         entryBuffer = entryBuffer.readRetainedSlice(bytes);
167                         toReturn = toReturn == null ? composeFirst(alloc, entryBuffer)
168                                                     : compose(alloc, toReturn, entryBuffer);
169                         bytes = 0;
170                     }
171                     break;
172                 } else {
173                     bytes -= entryBuffer.readableBytes();
174                     toReturn = toReturn == null ? composeFirst(alloc, entryBuffer)
175                                                 : compose(alloc, toReturn, entryBuffer);
176                 }
177                 entryBuffer = null;
178             }
179         } catch (Throwable cause) {
180             safeRelease(entryBuffer);
181             safeRelease(toReturn);
182             aggregatePromise.setFailure(cause);
183             throwException(cause);
184         }
185         decrementReadableBytes(originalBytes - bytes);
186         return toReturn;
187     }
188 
189     /**
190      * The number of readable bytes.
191      */
192     public final int readableBytes() {
193         return readableBytes;
194     }
195 
196     /**
197      * Are there pending buffers in the queue.
198      */
199     public final boolean isEmpty() {
200         return bufAndListenerPairs.isEmpty();
201     }
202 
203     /**
204      *  Release all buffers in the queue and complete all listeners and promises.
205      */
206     public final void releaseAndFailAll(ChannelOutboundInvoker invoker, Throwable cause) {
207         releaseAndCompleteAll(invoker.newFailedFuture(cause));
208     }
209 
210     /**
211      * Copy all pending entries in this queue into the destination queue.
212      * @param dest to copy pending buffers to.
213      */
214     public final void copyTo(AbstractCoalescingBufferQueue dest) {
215         dest.bufAndListenerPairs.addAll(bufAndListenerPairs);
216         dest.incrementReadableBytes(readableBytes);
217     }
218 
219     /**
220      * Writes all remaining elements in this queue.
221      * @param ctx The context to write all elements to.
222      */
223     public final void writeAndRemoveAll(ChannelHandlerContext ctx) {
224         decrementReadableBytes(readableBytes);
225         Throwable pending = null;
226         ByteBuf previousBuf = null;
227         for (;;) {
228             Object entry = bufAndListenerPairs.poll();
229             try {
230                 if (entry == null) {
231                     if (previousBuf != null) {
232                         ctx.write(previousBuf, ctx.voidPromise());
233                     }
234                     break;
235                 }
236 
237                 if (entry instanceof ByteBuf) {
238                     if (previousBuf != null) {
239                         ctx.write(previousBuf, ctx.voidPromise());
240                     }
241                     previousBuf = (ByteBuf) entry;
242                 } else if (entry instanceof ChannelPromise) {
243                     ctx.write(previousBuf, (ChannelPromise) entry);
244                     previousBuf = null;
245                 } else {
246                     ctx.write(previousBuf).addListener((ChannelFutureListener) entry);
247                     previousBuf = null;
248                 }
249             } catch (Throwable t) {
250                 if (pending == null) {
251                     pending = t;
252                 } else {
253                     logger.info("Throwable being suppressed because Throwable {} is already pending", pending, t);
254                 }
255             }
256         }
257         if (pending != null) {
258             throw new IllegalStateException(pending);
259         }
260     }
261 
262     /**
263      * Calculate the result of {@code current + next}.
264      */
265     protected abstract ByteBuf compose(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf next);
266 
267     /**
268      * Compose {@code cumulation} and {@code next} into a new {@link CompositeByteBuf}.
269      */
270     protected final ByteBuf composeIntoComposite(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf next) {
271         // Create a composite buffer to accumulate this pair and potentially all the buffers
272         // in the queue. Using +2 as we have already dequeued current and next.
273         CompositeByteBuf composite = alloc.compositeBuffer(size() + 2);
274         try {
275             composite.addComponent(true, cumulation);
276             composite.addComponent(true, next);
277         } catch (Throwable cause) {
278             composite.release();
279             safeRelease(next);
280             throwException(cause);
281         }
282         return composite;
283     }
284 
285     /**
286      * Compose {@code cumulation} and {@code next} into a new {@link ByteBufAllocator#ioBuffer()}.
287      * @param alloc The allocator to use to allocate the new buffer.
288      * @param cumulation The current cumulation.
289      * @param next The next buffer.
290      * @return The result of {@code cumulation + next}.
291      */
292     protected final ByteBuf copyAndCompose(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf next) {
293         ByteBuf newCumulation = alloc.ioBuffer(cumulation.readableBytes() + next.readableBytes());
294         try {
295             newCumulation.writeBytes(cumulation).writeBytes(next);
296         } catch (Throwable cause) {
297             newCumulation.release();
298             safeRelease(next);
299             throwException(cause);
300         }
301         cumulation.release();
302         next.release();
303         return newCumulation;
304     }
305 
306     /**
307      * Calculate the first {@link ByteBuf} which will be used in subsequent calls to
308      * {@link #compose(ByteBufAllocator, ByteBuf, ByteBuf)}.
309      */
310     protected ByteBuf composeFirst(ByteBufAllocator allocator, ByteBuf first) {
311         return first;
312     }
313 
314     /**
315      * The value to return when {@link #remove(ByteBufAllocator, int, ChannelPromise)} is called but the queue is empty.
316      * @return the {@link ByteBuf} which represents an empty queue.
317      */
318     protected abstract ByteBuf removeEmptyValue();
319 
320     /**
321      * Get the number of elements in this queue added via one of the {@link #add(ByteBuf)} methods.
322      * @return the number of elements in this queue.
323      */
324     protected final int size() {
325         return bufAndListenerPairs.size();
326     }
327 
328     private void releaseAndCompleteAll(ChannelFuture future) {
329         decrementReadableBytes(readableBytes);
330         Throwable pending = null;
331         for (;;) {
332             Object entry = bufAndListenerPairs.poll();
333             if (entry == null) {
334                 break;
335             }
336             try {
337                 if (entry instanceof ByteBuf) {
338                     safeRelease(entry);
339                 } else {
340                     ((ChannelFutureListener) entry).operationComplete(future);
341                 }
342             } catch (Throwable t) {
343                 if (pending == null) {
344                     pending = t;
345                 } else {
346                     logger.info("Throwable being suppressed because Throwable {} is already pending", pending, t);
347                 }
348             }
349         }
350         if (pending != null) {
351             throw new IllegalStateException(pending);
352         }
353     }
354 
355     private void incrementReadableBytes(int increment) {
356         int nextReadableBytes = readableBytes + increment;
357         if (nextReadableBytes < readableBytes) {
358             throw new IllegalStateException("buffer queue length overflow: " + readableBytes + " + " + increment);
359         }
360         readableBytes = nextReadableBytes;
361         if (tracker != null) {
362             tracker.incrementPendingOutboundBytes(increment);
363         }
364     }
365 
366     private void decrementReadableBytes(int decrement) {
367         readableBytes -= decrement;
368         assert readableBytes >= 0;
369         if (tracker != null) {
370             tracker.decrementPendingOutboundBytes(decrement);
371         }
372     }
373 
374     private static ChannelFutureListener toChannelFutureListener(ChannelPromise promise) {
375         return promise.isVoid() ? null : new DelegatingChannelPromiseNotifier(promise);
376     }
377 }