View Javadoc
1   /*
2    * Copyright 2021 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  package io.netty5.channel;
16  
17  import io.netty5.buffer.api.Buffer;
18  import io.netty5.buffer.api.BufferAllocator;
19  import io.netty5.buffer.api.CompositeBuffer;
20  import io.netty5.util.concurrent.Future;
21  import io.netty5.util.concurrent.FutureListener;
22  import io.netty5.util.concurrent.Promise;
23  import io.netty5.util.internal.SilentDispose;
24  import io.netty5.util.internal.UnstableApi;
25  import io.netty5.util.internal.logging.InternalLogger;
26  import io.netty5.util.internal.logging.InternalLoggerFactory;
27  
28  import java.util.ArrayDeque;
29  import java.util.List;
30  
31  import static io.netty5.util.internal.ObjectUtil.checkPositiveOrZero;
32  import static java.util.Objects.requireNonNull;
33  
/**
 * Base class for a FIFO queue of {@link Buffer}s in which every buffer may be followed by one
 * {@link FutureListener} that was registered together with it.
 * <p>
 * The queue keeps a running total of the readable bytes of all enqueued buffers. When buffers leave the queue
 * their listeners are attached to a caller supplied aggregate {@link Promise} (see
 * {@link #remove(BufferAllocator, int, Promise)}) or to the future of the write (see
 * {@link #writeAndRemoveAll(ChannelHandlerContext)}).
 * <p>
 * Subclasses decide how dequeued buffers are merged by implementing
 * {@link #compose(BufferAllocator, Buffer, Buffer)} and which value represents an empty queue by implementing
 * {@link #removeEmptyValue()}.
 */
@SuppressWarnings("unchecked")
@UnstableApi
public abstract class AbstractCoalescingBufferQueue {
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(
            AbstractCoalescingBufferQueue.class);
    // Mixed queue: Buffer entries, each optionally followed by the FutureListener that was added with it.
    private final ArrayDeque<Object> bufAndListenerPairs;
    // Running sum of readableBytes() of the buffers currently accounted for in the queue.
    private int readableBytes;
41  
42      /**
43       * Create a new instance.
44       *
45       * @param initSize the initial size of the underlying queue.
46       */
47      protected AbstractCoalescingBufferQueue(int initSize) {
48          bufAndListenerPairs = new ArrayDeque<>(initSize);
49      }
50  
51      /**
52       * Add a buffer to the front of the queue and associate a promise with it that should be completed when
53       * all the buffer's bytes have been consumed from the queue and written.
54       * @param buf to add to the head of the queue
55       * @param promise to complete when all the bytes have been consumed and written, can be void.
56       */
57      public final void addFirst(Buffer buf, Promise<Void> promise) {
58          addFirst(buf, f -> f.cascadeTo(promise));
59      }
60  
61      private void addFirst(Buffer buf, FutureListener<Void> listener) {
62          if (listener != null) {
63              bufAndListenerPairs.addFirst(listener);
64          }
65          bufAndListenerPairs.addFirst(buf);
66          incrementReadableBytes(buf.readableBytes());
67      }
68  
69      /**
70       * Add a buffer to the end of the queue.
71       */
72      public final void add(Buffer buf) {
73          add(buf, (FutureListener<Void>) null);
74      }
75  
76      /**
77       * Add a buffer to the end of the queue and associate a promise with it that should be completed when
78       * all the buffer's bytes have been consumed from the queue and written.
79       * @param buf to add to the tail of the queue
80       * @param promise to complete when all the bytes have been consumed and written, can be void.
81       */
82      public final void add(Buffer buf, Promise<Void> promise) {
83          // buffers are added before promises so that we naturally 'consume' the entire buffer during removal
84          // before we complete it's promise.
85          add(buf, f -> f.cascadeTo(promise));
86      }
87  
88      /**
89       * Add a buffer to the end of the queue and associate a listener with it that should be completed when
90       * all the buffers  bytes have been consumed from the queue and written.
91       * @param buf to add to the tail of the queue
92       * @param listener to notify when all the bytes have been consumed and written, can be {@code null}.
93       */
94      public final void add(Buffer buf, FutureListener<Void> listener) {
95          // buffers are added before promises so that we naturally 'consume' the entire buffer during removal
96          // before we complete it's promise.
97          bufAndListenerPairs.add(buf);
98          if (listener != null) {
99              bufAndListenerPairs.add(listener);
100         }
101         incrementReadableBytes(buf.readableBytes());
102     }
103 
104     /**
105      * Remove the first {@link Buffer} from the queue.
106      * @param aggregatePromise used to aggregate the promises and listeners for the returned buffer.
107      * @return the first {@link Buffer} from the queue.
108      */
109     public final Buffer removeFirst(Promise<Void> aggregatePromise) {
110         Object entry = bufAndListenerPairs.poll();
111         if (entry == null) {
112             return null;
113         }
114         assert entry instanceof Buffer;
115         Buffer result = (Buffer) entry;
116 
117         decrementReadableBytes(result.readableBytes());
118 
119         entry = bufAndListenerPairs.peek();
120         if (entry instanceof FutureListener) {
121             aggregatePromise.asFuture().addListener((FutureListener<Void>) entry);
122             bufAndListenerPairs.poll();
123         }
124         return result;
125     }
126 
127     /**
128      * Remove a {@link Buffer} from the queue with the specified number of bytes. Any added buffer whose bytes are
129      * fully consumed during removal will have their promise completed when the passed aggregate {@link Promise}
130      * completes.
131      *
132      * @param alloc The allocator used if a new {@link Buffer} is generated during the aggregation process.
133      * @param bytes the maximum number of readable bytes in the returned {@link Buffer}, if {@code bytes} is greater
134      *              than {@link #readableBytes} then a buffer of length {@link #readableBytes} is returned.
135      * @param aggregatePromise used to aggregate the promises and listeners for the constituent buffers.
136      * @return a {@link Buffer} composed of the enqueued buffers.
137      */
138     public final Buffer remove(BufferAllocator alloc, int bytes, Promise<Void> aggregatePromise) {
139         checkPositiveOrZero(bytes, "bytes");
140         requireNonNull(aggregatePromise, "aggregatePromise");
141 
142         // Use isEmpty rather than readableBytes==0 as we may have a promise associated with an empty buffer.
143         if (bufAndListenerPairs.isEmpty()) {
144             assert readableBytes == 0;
145             return removeEmptyValue();
146         }
147         bytes = Math.min(bytes, readableBytes);
148 
149         Buffer toReturn = null;
150         Buffer entryBuffer = null;
151         int originalBytes = bytes;
152         try {
153             for (;;) {
154                 Object entry = bufAndListenerPairs.poll();
155                 if (entry == null) {
156                     break;
157                 }
158                 if (entry instanceof FutureListener) {
159                     aggregatePromise.asFuture().addListener((FutureListener<Void>) entry);
160                     continue;
161                 }
162                 entryBuffer = (Buffer) entry;
163                 if (entryBuffer.readableBytes() > bytes) {
164                     // Add the buffer back to the queue as we can't consume all of it.
165                     bufAndListenerPairs.addFirst(entryBuffer);
166                     if (bytes > 0) {
167                         // Take a slice of what we can consume and retain it.
168                         entryBuffer = entryBuffer.readSplit(bytes);
169                         toReturn = toReturn == null ? composeFirst(alloc, entryBuffer)
170                                                     : compose(alloc, toReturn, entryBuffer);
171                         bytes = 0;
172                     }
173                     break;
174                 }
175                 bytes -= entryBuffer.readableBytes();
176                 toReturn = toReturn == null ? composeFirst(alloc, entryBuffer)
177                                             : compose(alloc, toReturn, entryBuffer);
178                 entryBuffer = null;
179             }
180         } catch (Throwable cause) {
181             SilentDispose.dispose(entryBuffer, logger);
182             SilentDispose.dispose(toReturn, logger);
183             aggregatePromise.setFailure(cause);
184             throw cause;
185         }
186         decrementReadableBytes(originalBytes - bytes);
187         return toReturn;
188     }
189 
190     /**
191      * The number of readable bytes.
192      */
193     public final int readableBytes() {
194         return readableBytes;
195     }
196 
197     /**
198      * Are there pending buffers in the queue.
199      */
200     public final boolean isEmpty() {
201         return bufAndListenerPairs.isEmpty();
202     }
203 
204     /**
205      *  Release all buffers in the queue and complete all listeners and promises.
206      */
207     public final void releaseAndFailAll(ChannelOutboundInvoker invoker, Throwable cause) {
208         releaseAndCompleteAll(invoker.newFailedFuture(cause));
209     }
210 
211     /**
212      * Copy all pending entries in this queue into the destination queue.
213      * @param dest to copy pending buffers to.
214      */
215     public final void copyTo(AbstractCoalescingBufferQueue dest) {
216         dest.bufAndListenerPairs.addAll(bufAndListenerPairs);
217         dest.incrementReadableBytes(readableBytes);
218     }
219 
220     /**
221      * Writes all remaining elements in this queue.
222      * @param ctx The context to write all elements to.
223      */
224     public final void writeAndRemoveAll(ChannelHandlerContext ctx) {
225         Throwable pending = null;
226         Buffer previousBuf = null;
227         for (;;) {
228             Object entry = bufAndListenerPairs.poll();
229             try {
230                 if (entry == null) {
231                     if (previousBuf != null) {
232                         decrementReadableBytes(previousBuf.readableBytes());
233                         // If the write fails we want to at least propagate the exception through the ChannelPipeline
234                         // as otherwise the user will not be made aware of the failure at all.
235                         ctx.write(previousBuf)
236                            .addListener(ctx.channel(), ChannelFutureListeners.FIRE_EXCEPTION_ON_FAILURE);
237                     }
238                     break;
239                 }
240 
241                 if (entry instanceof Buffer) {
242                     if (previousBuf != null) {
243                         decrementReadableBytes(previousBuf.readableBytes());
244                         // If the write fails we want to at least propagate the exception through the ChannelPipeline
245                         // as otherwise the user will not be made aware of the failure at all.
246                         ctx.write(previousBuf)
247                            .addListener(ctx.channel(), ChannelFutureListeners.FIRE_EXCEPTION_ON_FAILURE);
248                     }
249                     previousBuf = (Buffer) entry;
250                 } else if (entry instanceof Promise) {
251                     decrementReadableBytes(previousBuf.readableBytes());
252                     ctx.write(previousBuf).cascadeTo((Promise<? super Void>) entry);
253                     previousBuf = null;
254                 } else {
255                     decrementReadableBytes(previousBuf.readableBytes());
256                     ctx.write(previousBuf).addListener((FutureListener<Void>) entry);
257                     previousBuf = null;
258                 }
259             } catch (Throwable t) {
260                 if (pending == null) {
261                     pending = t;
262                 } else {
263                     logger.info("Throwable being suppressed because Throwable {} is already pending", pending, t);
264                 }
265             }
266         }
267         if (pending != null) {
268             throw new IllegalStateException(pending);
269         }
270     }
271 
272     @Override
273     public String toString() {
274         return "bytes: " + readableBytes + " buffers: " + (size() >> 1);
275     }
276 
    /**
     * Calculate the result of {@code cumulation + next}.
     * <p>
     * Called by {@link #remove(BufferAllocator, int, Promise)} for every buffer after the first one that is
     * merged into the returned buffer. The helpers offered by this class
     * ({@link #composeIntoComposite(BufferAllocator, Buffer, Buffer)} and
     * {@link #copyAndCompose(BufferAllocator, Buffer, Buffer, int)}) both consume {@code cumulation} and
     * {@code next}.
     *
     * @param alloc the allocator passed to {@link #remove(BufferAllocator, int, Promise)}.
     * @param cumulation the buffer accumulated so far.
     * @param next the buffer to append to {@code cumulation}.
     * @return the buffer that holds the content of {@code cumulation} followed by {@code next}.
     */
    protected abstract Buffer compose(BufferAllocator alloc, Buffer cumulation, Buffer next);
281 
282     /**
283      * Compose {@code cumulation} and {@code next} into a new {@link CompositeBuffer}.
284      */
285     protected final Buffer composeIntoComposite(BufferAllocator alloc, Buffer cumulation, Buffer next) {
286         // Create a composite buffer to accumulate this pair and potentially all the buffers
287         // in the queue. Using +2 as we have already dequeued current and next.
288         return alloc.compose(List.of(cumulation.send(), next.send()));
289     }
290 
291     /**
292      * Compose {@code cumulation} and {@code next} into a new {@link Buffer} suitable for IO.
293      * @param alloc The allocator to use to allocate the new buffer.
294      * @param cumulation The current cumulation.
295      * @param next The next buffer.
296      * @param minIncrement The minimum buffer size - the resulting buffer will grow by at least this much.
297      * @return The result of {@code cumulation + next}.
298      */
299     protected final Buffer copyAndCompose(BufferAllocator alloc, Buffer cumulation, Buffer next, int minIncrement) {
300         try (cumulation; next) {
301             int sum = cumulation.readableBytes() + Math.max(minIncrement, next.readableBytes());
302             return alloc.allocate(sum).writeBytes(cumulation).writeBytes(next);
303         }
304     }
305 
306     /**
307      * Calculate the first {@link Buffer} which will be used in subsequent calls to
308      * {@link #compose(BufferAllocator, Buffer, Buffer)}.
309      */
310     protected Buffer composeFirst(BufferAllocator allocator, Buffer first) {
311         return first;
312     }
313 
    /**
     * The value to return when {@link #remove(BufferAllocator, int, Promise)} is called but the queue is empty.
     * <p>
     * The result is returned to the caller of {@code remove} as is.
     *
     * @return the {@link Buffer} which represents an empty queue.
     */
    protected abstract Buffer removeEmptyValue();
319 
320     /**
321      * Get the number of elements in this queue added via one of the {@link #add(Buffer)} methods.
322      * @return the number of elements in this queue.
323      */
324     protected final int size() {
325         return bufAndListenerPairs.size();
326     }
327 
328     private void releaseAndCompleteAll(Future<Void> future) {
329         Throwable pending = null;
330         for (;;) {
331             Object entry = bufAndListenerPairs.poll();
332             if (entry == null) {
333                 break;
334             }
335             try {
336                 if (entry instanceof Buffer) {
337                     Buffer buffer = (Buffer) entry;
338                     decrementReadableBytes(buffer.readableBytes());
339                     SilentDispose.dispose(buffer, logger);
340                 } else {
341                     ((FutureListener<Void>) entry).operationComplete(future);
342                 }
343             } catch (Throwable t) {
344                 if (pending == null) {
345                     pending = t;
346                 } else {
347                     logger.info("Throwable being suppressed because Throwable {} is already pending", pending, t);
348                 }
349             }
350         }
351         if (pending != null) {
352             throw new IllegalStateException(pending);
353         }
354     }
355 
356     private void incrementReadableBytes(int increment) {
357         int nextReadableBytes = readableBytes + increment;
358         if (nextReadableBytes < readableBytes) {
359             throw new IllegalStateException("buffer queue length overflow: " + readableBytes + " + " + increment);
360         }
361         readableBytes = nextReadableBytes;
362     }
363 
364     private void decrementReadableBytes(int decrement) {
365         readableBytes -= decrement;
366         assert readableBytes >= 0;
367     }
368 }