1 /*
2 * Copyright 2016 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License, version
5 * 2.0 (the "License"); you may not use this file except in compliance with the
6 * License. You may obtain a copy of the License at:
7 *
8 * https://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations under
14 * the License.
15 */
16 package io.netty.handler.flow;
17
18 import java.util.ArrayDeque;
19 import java.util.Queue;
20
21 import io.netty.channel.ChannelConfig;
22 import io.netty.channel.ChannelDuplexHandler;
23 import io.netty.channel.ChannelHandler;
24 import io.netty.channel.ChannelHandlerContext;
25 import io.netty.handler.codec.ByteToMessageDecoder;
26 import io.netty.handler.codec.MessageToByteEncoder;
27 import io.netty.util.Recycler;
28 import io.netty.util.ReferenceCountUtil;
29 import io.netty.util.internal.ObjectPool.Handle;
30 import io.netty.util.internal.logging.InternalLogger;
31 import io.netty.util.internal.logging.InternalLoggerFactory;
32
33 /**
34 * The {@link FlowControlHandler} ensures that only one message per {@code read()} is sent downstream.
35 *
36 * Classes such as {@link ByteToMessageDecoder} or {@link MessageToByteEncoder} are free to emit as
37 * many events as they like for any given input. A channel's auto reading configuration doesn't usually
38 * apply in these scenarios. This is causing problems in downstream {@link ChannelHandler}s that would
39 * like to hold subsequent events while they're processing one event. It's a common problem with the
40 * {@code HttpObjectDecoder} that will very often fire an {@code HttpRequest} that is immediately followed
41 * by a {@code LastHttpContent} event.
42 *
43 * <pre>{@code
44 * ChannelPipeline pipeline = ...;
45 *
46 * pipeline.addLast(new HttpServerCodec());
47 * pipeline.addLast(new FlowControlHandler());
48 *
49 * pipeline.addLast(new MyExampleHandler());
50 *
51 * class MyExampleHandler extends ChannelInboundHandlerAdapter {
52 * @Override
53 * public void channelRead(ChannelHandlerContext ctx, Object msg) {
54 * if (msg instanceof HttpRequest) {
55 * ctx.channel().config().setAutoRead(false);
56 *
57 * // The FlowControlHandler will hold any subsequent events that
58 * // were emitted by HttpObjectDecoder until auto reading is turned
59 * // back on or Channel#read() is being called.
60 * }
61 * }
62 * }
63 * }</pre>
64 *
65 * @see ChannelConfig#setAutoRead(boolean)
66 */
67 public class FlowControlHandler extends ChannelDuplexHandler {
68 private static final InternalLogger logger = InternalLoggerFactory.getInstance(FlowControlHandler.class);
69
70 private final boolean releaseMessages;
71
72 private RecyclableArrayDeque queue;
73
74 private ChannelConfig config;
75
76 private boolean shouldConsume;
77
78 public FlowControlHandler() {
79 this(true);
80 }
81
82 public FlowControlHandler(boolean releaseMessages) {
83 this.releaseMessages = releaseMessages;
84 }
85
86 /**
87 * Determine if the underlying {@link Queue} is empty. This method exists for
88 * testing, debugging and inspection purposes and it is not Thread safe!
89 */
90 boolean isQueueEmpty() {
91 return queue == null || queue.isEmpty();
92 }
93
94 /**
95 * Releases all messages and destroys the {@link Queue}.
96 */
97 private void destroy() {
98 if (queue != null) {
99
100 if (!queue.isEmpty()) {
101 logger.trace("Non-empty queue: {}", queue);
102
103 if (releaseMessages) {
104 Object msg;
105 while ((msg = queue.poll()) != null) {
106 ReferenceCountUtil.safeRelease(msg);
107 }
108 }
109 }
110
111 queue.recycle();
112 queue = null;
113 }
114 }
115
116 @Override
117 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
118 config = ctx.channel().config();
119 }
120
121 @Override
122 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
123 super.handlerRemoved(ctx);
124 if (!isQueueEmpty()) {
125 dequeue(ctx, queue.size());
126 }
127 destroy();
128 }
129
130 @Override
131 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
132 destroy();
133 ctx.fireChannelInactive();
134 }
135
136 @Override
137 public void read(ChannelHandlerContext ctx) throws Exception {
138 if (dequeue(ctx, 1) == 0) {
139 // It seems no messages were consumed. We need to read() some
140 // messages from upstream and once one arrives it need to be
141 // relayed to downstream to keep the flow going.
142 shouldConsume = true;
143 ctx.read();
144 } else if (config.isAutoRead()) {
145 ctx.read();
146 }
147 }
148
149 @Override
150 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
151 if (queue == null) {
152 queue = RecyclableArrayDeque.newInstance();
153 }
154
155 queue.offer(msg);
156
157 // We just received one message. Do we need to relay it regardless
158 // of the auto reading configuration? The answer is yes if this
159 // method was called as a result of a prior read() call.
160 int minConsume = shouldConsume ? 1 : 0;
161 shouldConsume = false;
162
163 dequeue(ctx, minConsume);
164 }
165
166 @Override
167 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
168 if (isQueueEmpty()) {
169 ctx.fireChannelReadComplete();
170 } else {
171 // Don't relay completion events from upstream as they
172 // make no sense in this context. See dequeue() where
173 // a new set of completion events is being produced.
174 }
175 }
176
177 /**
178 * Dequeues one or many (or none) messages depending on the channel's auto
179 * reading state and returns the number of messages that were consumed from
180 * the internal queue.
181 *
182 * The {@code minConsume} argument is used to force {@code dequeue()} into
183 * consuming that number of messages regardless of the channel's auto
184 * reading configuration.
185 *
186 * @see #read(ChannelHandlerContext)
187 * @see #channelRead(ChannelHandlerContext, Object)
188 */
189 private int dequeue(ChannelHandlerContext ctx, int minConsume) {
190 int consumed = 0;
191
192 // fireChannelRead(...) may call ctx.read() and so this method may reentrance. Because of this we need to
193 // check if queue was set to null in the meantime and if so break the loop.
194 while (queue != null && (consumed < minConsume || config.isAutoRead())) {
195 Object msg = queue.poll();
196 if (msg == null) {
197 break;
198 }
199
200 ++consumed;
201 ctx.fireChannelRead(msg);
202 }
203
204 // We're firing a completion event every time one (or more)
205 // messages were consumed and the queue ended up being drained
206 // to an empty state.
207 if (queue != null && queue.isEmpty()) {
208 queue.recycle();
209 queue = null;
210
211 if (consumed > 0) {
212 ctx.fireChannelReadComplete();
213 }
214 }
215
216 return consumed;
217 }
218
219 /**
220 * A recyclable {@link ArrayDeque}.
221 */
222 private static final class RecyclableArrayDeque extends ArrayDeque<Object> {
223
224 private static final long serialVersionUID = 0L;
225
226 /**
227 * A value of {@code 2} should be a good choice for most scenarios.
228 */
229 private static final int DEFAULT_NUM_ELEMENTS = 2;
230
231 private static final Recycler<RecyclableArrayDeque> RECYCLER =
232 new Recycler<RecyclableArrayDeque>() {
233 @Override
234 protected RecyclableArrayDeque newObject(Handle<RecyclableArrayDeque> handle) {
235 return new RecyclableArrayDeque(DEFAULT_NUM_ELEMENTS, handle);
236 }
237 };
238
239 public static RecyclableArrayDeque newInstance() {
240 return RECYCLER.get();
241 }
242
243 private final Handle<RecyclableArrayDeque> handle;
244
245 private RecyclableArrayDeque(int numElements, Handle<RecyclableArrayDeque> handle) {
246 super(numElements);
247 this.handle = handle;
248 }
249
250 public void recycle() {
251 clear();
252 handle.recycle(this);
253 }
254 }
255 }