View Javadoc
1   /*
2    * Copyright 2015 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  package io.netty5.channel;
16  
17  import io.netty5.buffer.api.Buffer;
18  import io.netty5.buffer.api.BufferAllocator;
19  import io.netty5.buffer.api.CompositeBuffer;
20  import io.netty5.util.concurrent.Promise;
21  
22  import static java.util.Objects.requireNonNull;
23  
24  /**
25   * A FIFO queue of bytes where producers add bytes by repeatedly adding {@link Buffer} and consumers take bytes in
26   * arbitrary lengths. This allows producers to add lots of small buffers and the consumer to take all the bytes out in a
27   * single buffer. Conversely, the producer may add larger buffers and the consumer could take the bytes in many small
28   * buffers.
29   *
30   * <p>Bytes are added and removed with promises. If the last byte of a buffer added with a promise is removed then
31   * that promise will complete when the promise passed to {@link #remove} completes.
32   *
33   * <p>This functionality is useful for aggregating or partitioning writes into fixed size buffers for framing protocols
34   * such as HTTP2.
35   */
36  public final class CoalescingBufferQueue extends AbstractCoalescingBufferQueue {
37      private final Channel channel;
38  
39      public CoalescingBufferQueue(Channel channel) {
40          this(channel, 4);
41      }
42  
43      public CoalescingBufferQueue(Channel channel, int initSize) {
44          super(initSize);
45          this.channel = requireNonNull(channel, "channel");
46      }
47  
48      /**
49       * Remove a {@link Buffer} from the queue with the specified number of bytes. Any added buffer whose bytes are fully
50       * consumed during removal will have it's promise completed when the passed aggregate {@link Promise} completes.
51       *
52       * @param bytes the maximum number of readable bytes in the returned {@link Buffer}, if {@code bytes} is greater
53       *              than {@link #readableBytes()} then a buffer of length {@link #readableBytes()} is returned.
54       * @param aggregatePromise used to aggregate the promises and listeners for the constituent buffers.
55       * @return a {@link Buffer} composed of the enqueued buffers.
56       */
57      public Buffer remove(int bytes, Promise<Void> aggregatePromise) {
58          return remove(channel.bufferAllocator(), bytes, aggregatePromise);
59      }
60  
61      /**
62       *  Release all buffers in the queue and complete all listeners and promises.
63       */
64      public void releaseAndFailAll(Throwable cause) {
65          releaseAndFailAll(channel, cause);
66      }
67  
68      @Override
69      protected Buffer compose(BufferAllocator alloc, Buffer cumulation, Buffer next) {
70          if (cumulation instanceof CompositeBuffer) {
71              CompositeBuffer composite = (CompositeBuffer) cumulation;
72              composite.extendWith(next.send());
73              return composite;
74          }
75          return composeIntoComposite(alloc, cumulation, next);
76      }
77  
78      @Override
79      protected Buffer removeEmptyValue() {
80          return BufferAllocator.offHeapUnpooled().allocate(0);
81      }
82  }