1 /*
2 * Copyright 2016 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License, version
5 * 2.0 (the "License"); you may not use this file except in compliance with the
6 * License. You may obtain a copy of the License at:
7 *
8 * https://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations under
14 * the License.
15 */
16 package io.netty5.handler.flow;
17
18 import io.netty5.channel.ChannelOption;
19 import io.netty5.handler.codec.ByteToMessageDecoder;
20 import io.netty5.handler.codec.MessageToByteEncoder;
21 import io.netty5.util.Resource;
22 import io.netty5.channel.ChannelHandler;
23 import io.netty5.channel.ChannelHandlerContext;
24 import io.netty5.util.internal.ObjectPool;
25 import io.netty5.util.internal.ObjectPool.Handle;
26 import io.netty5.util.internal.logging.InternalLogger;
27 import io.netty5.util.internal.logging.InternalLoggerFactory;
28
29 import java.util.ArrayDeque;
30 import java.util.Queue;
31
/**
 * The {@link FlowControlHandler} ensures that only one message per {@code read()} is sent downstream.
 *
 * <p>Classes such as {@link ByteToMessageDecoder} or {@link MessageToByteEncoder} are free to emit as
 * many events as they like for any given input. A channel's auto reading configuration doesn't usually
 * apply in these scenarios. This causes problems in downstream {@link ChannelHandler}s that would
 * like to hold subsequent events while they're processing one event. It's a common problem with the
 * {@code HttpObjectDecoder}, which will very often fire an {@code HttpRequest} that is immediately followed
 * by a {@code LastHttpContent} event.
 *
 * <pre>{@code
 * ChannelPipeline pipeline = ...;
 *
 * pipeline.addLast(new HttpServerCodec());
 * pipeline.addLast(new FlowControlHandler());
 *
 * pipeline.addLast(new MyExampleHandler());
 *
 * class MyExampleHandler extends ChannelInboundHandlerAdapter {
 *   @Override
 *   public void channelRead(ChannelHandlerContext ctx, Object msg) {
 *     if (msg instanceof HttpRequest) {
 *       ctx.channel().setChannelOption(ChannelOption.AUTO_READ, false);
 *
 *       // The FlowControlHandler will hold any subsequent events that
 *       // were emitted by HttpObjectDecoder until auto reading is turned
 *       // back on or Channel#read() is being called.
 *     }
 *   }
 * }
 * }</pre>
 *
 * @see ChannelOption#AUTO_READ
 */
66 public class FlowControlHandler implements ChannelHandler {
67 private static final InternalLogger logger = InternalLoggerFactory.getInstance(FlowControlHandler.class);
68
69 private final boolean releaseMessages;
70
71 private RecyclableArrayDeque queue;
72
73 private boolean shouldConsume;
74
75 public FlowControlHandler() {
76 this(true);
77 }
78
79 public FlowControlHandler(boolean releaseMessages) {
80 this.releaseMessages = releaseMessages;
81 }
82
83 /**
84 * Determine if the underlying {@link Queue} is empty. This method exists for
85 * testing, debugging and inspection purposes and it is not Thread safe!
86 */
87 boolean isQueueEmpty() {
88 return queue == null || queue.isEmpty();
89 }
90
91 /**
92 * Releases all messages and destroys the {@link Queue}.
93 */
94 private void destroy() {
95 if (queue != null) {
96
97 if (!queue.isEmpty()) {
98 logger.trace("Non-empty queue: {}", queue);
99
100 if (releaseMessages) {
101 Object msg;
102 while ((msg = queue.poll()) != null) {
103 try {
104 Resource.dispose(msg);
105 } catch (Exception e) {
106 logger.trace("Exception while disposing of message in flow control", e);
107 }
108 }
109 }
110 }
111
112 queue.recycle();
113 queue = null;
114 }
115 }
116
117 @Override
118 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
119 if (!isQueueEmpty()) {
120 dequeue(ctx, queue.size());
121 }
122 destroy();
123 }
124
125 @Override
126 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
127 destroy();
128 ctx.fireChannelInactive();
129 }
130
131 @Override
132 public void read(ChannelHandlerContext ctx) {
133 if (dequeue(ctx, 1) == 0) {
134 // It seems no messages were consumed. We need to read() some
135 // messages from upstream and once one arrives it need to be
136 // relayed to downstream to keep the flow going.
137 shouldConsume = true;
138 ctx.read();
139 }
140 }
141
142 @Override
143 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
144 if (queue == null) {
145 queue = RecyclableArrayDeque.newInstance();
146 }
147
148 queue.offer(msg);
149
150 // We just received one message. Do we need to relay it regardless
151 // of the auto reading configuration? The answer is yes if this
152 // method was called as a result of a prior read() call.
153 int minConsume = shouldConsume ? 1 : 0;
154 shouldConsume = false;
155
156 dequeue(ctx, minConsume);
157 }
158
159 @Override
160 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
161 if (isQueueEmpty()) {
162 ctx.fireChannelReadComplete();
163 } else {
164 // Don't relay completion events from upstream as they
165 // make no sense in this context. See dequeue() where
166 // a new set of completion events is being produced.
167 }
168 }
169
170 /**
171 * Dequeues one or many (or none) messages depending on the channel's auto
172 * reading state and returns the number of messages that were consumed from
173 * the internal queue.
174 *
175 * The {@code minConsume} argument is used to force {@code dequeue()} into
176 * consuming that number of messages regardless of the channel's auto
177 * reading configuration.
178 *
179 * @see #read(ChannelHandlerContext)
180 * @see #channelRead(ChannelHandlerContext, Object)
181 */
182 private int dequeue(ChannelHandlerContext ctx, int minConsume) {
183 int consumed = 0;
184
185 // fireChannelRead(...) may call ctx.read() and so this method may reentrance. Because of this we need to
186 // check if queue was set to null in the meantime and if so break the loop.
187 while (queue != null && (consumed < minConsume || ctx.channel().getOption(ChannelOption.AUTO_READ))) {
188 Object msg = queue.poll();
189 if (msg == null) {
190 break;
191 }
192
193 ++consumed;
194 ctx.fireChannelRead(msg);
195 }
196
197 // We're firing a completion event every time one (or more)
198 // messages were consumed and the queue ended up being drained
199 // to an empty state.
200 if (queue != null && queue.isEmpty()) {
201 queue.recycle();
202 queue = null;
203
204 if (consumed > 0) {
205 ctx.fireChannelReadComplete();
206 }
207 }
208
209 return consumed;
210 }
211
212 /**
213 * A recyclable {@link ArrayDeque}.
214 */
215 private static final class RecyclableArrayDeque extends ArrayDeque<Object> {
216
217 private static final long serialVersionUID = 0L;
218
219 /**
220 * A value of {@code 2} should be a good choice for most scenarios.
221 */
222 private static final int DEFAULT_NUM_ELEMENTS = 2;
223
224 private static final ObjectPool<RecyclableArrayDeque> RECYCLER = ObjectPool.newPool(
225 handle -> new RecyclableArrayDeque(DEFAULT_NUM_ELEMENTS, handle));
226
227 public static RecyclableArrayDeque newInstance() {
228 return RECYCLER.get();
229 }
230
231 private final Handle<RecyclableArrayDeque> handle;
232
233 private RecyclableArrayDeque(int numElements, Handle<RecyclableArrayDeque> handle) {
234 super(numElements);
235 this.handle = handle;
236 }
237
238 public void recycle() {
239 clear();
240 handle.recycle(this);
241 }
242 }
243 }