1 /*
2 * Copyright 2015 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5 * "License"); you may not use this file except in compliance with the License. You may obtain a
6 * copy of the License at:
7 *
8 * https://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software distributed under the License
11 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12 * or implied. See the License for the specific language governing permissions and limitations under
13 * the License.
14 */
15 package io.netty5.channel;
16
17 import io.netty5.buffer.api.Buffer;
18 import io.netty5.buffer.api.BufferAllocator;
19 import io.netty5.buffer.api.CompositeBuffer;
20 import io.netty5.util.concurrent.Promise;
21
22 import static java.util.Objects.requireNonNull;
23
24 /**
25 * A FIFO queue of bytes where producers add bytes by repeatedly adding {@link Buffer} and consumers take bytes in
26 * arbitrary lengths. This allows producers to add lots of small buffers and the consumer to take all the bytes out in a
27 * single buffer. Conversely, the producer may add larger buffers and the consumer could take the bytes in many small
28 * buffers.
29 *
30 * <p>Bytes are added and removed with promises. If the last byte of a buffer added with a promise is removed then
31 * that promise will complete when the promise passed to {@link #remove} completes.
32 *
33 * <p>This functionality is useful for aggregating or partitioning writes into fixed size buffers for framing protocols
34 * such as HTTP2.
35 */
36 public final class CoalescingBufferQueue extends AbstractCoalescingBufferQueue {
37 private final Channel channel;
38
39 public CoalescingBufferQueue(Channel channel) {
40 this(channel, 4);
41 }
42
43 public CoalescingBufferQueue(Channel channel, int initSize) {
44 super(initSize);
45 this.channel = requireNonNull(channel, "channel");
46 }
47
48 /**
49 * Remove a {@link Buffer} from the queue with the specified number of bytes. Any added buffer whose bytes are fully
50 * consumed during removal will have it's promise completed when the passed aggregate {@link Promise} completes.
51 *
52 * @param bytes the maximum number of readable bytes in the returned {@link Buffer}, if {@code bytes} is greater
53 * than {@link #readableBytes()} then a buffer of length {@link #readableBytes()} is returned.
54 * @param aggregatePromise used to aggregate the promises and listeners for the constituent buffers.
55 * @return a {@link Buffer} composed of the enqueued buffers.
56 */
57 public Buffer remove(int bytes, Promise<Void> aggregatePromise) {
58 return remove(channel.bufferAllocator(), bytes, aggregatePromise);
59 }
60
61 /**
62 * Release all buffers in the queue and complete all listeners and promises.
63 */
64 public void releaseAndFailAll(Throwable cause) {
65 releaseAndFailAll(channel, cause);
66 }
67
68 @Override
69 protected Buffer compose(BufferAllocator alloc, Buffer cumulation, Buffer next) {
70 if (cumulation instanceof CompositeBuffer) {
71 CompositeBuffer composite = (CompositeBuffer) cumulation;
72 composite.extendWith(next.send());
73 return composite;
74 }
75 return composeIntoComposite(alloc, cumulation, next);
76 }
77
78 @Override
79 protected Buffer removeEmptyValue() {
80 return BufferAllocator.offHeapUnpooled().allocate(0);
81 }
82 }