1 /*
2 * Copyright 2016 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License, version
5 * 2.0 (the "License"); you may not use this file except in compliance with the
6 * License. You may obtain a copy of the License at:
7 *
8 * https://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations under
14 * the License.
15 */
16 package io.netty.handler.flow;
17
18 import java.util.ArrayDeque;
19 import java.util.Queue;
20
21 import io.netty.channel.ChannelConfig;
22 import io.netty.channel.ChannelDuplexHandler;
23 import io.netty.channel.ChannelHandler;
24 import io.netty.channel.ChannelHandlerContext;
25 import io.netty.handler.codec.ByteToMessageDecoder;
26 import io.netty.handler.codec.MessageToByteEncoder;
27 import io.netty.util.ReferenceCountUtil;
28 import io.netty.util.internal.ObjectPool;
29 import io.netty.util.internal.ObjectPool.Handle;
30 import io.netty.util.internal.ObjectPool.ObjectCreator;
31 import io.netty.util.internal.logging.InternalLogger;
32 import io.netty.util.internal.logging.InternalLoggerFactory;
33
34 /**
35 * The {@link FlowControlHandler} ensures that only one message per {@code read()} is sent downstream.
36 *
37 * Classes such as {@link ByteToMessageDecoder} or {@link MessageToByteEncoder} are free to emit as
38 * many events as they like for any given input. A channel's auto reading configuration doesn't usually
39 * apply in these scenarios. This is causing problems in downstream {@link ChannelHandler}s that would
40 * like to hold subsequent events while they're processing one event. It's a common problem with the
41 * {@code HttpObjectDecoder} that will very often fire an {@code HttpRequest} that is immediately followed
42 * by a {@code LastHttpContent} event.
43 *
44 * <pre>{@code
45 * ChannelPipeline pipeline = ...;
46 *
47 * pipeline.addLast(new HttpServerCodec());
48 * pipeline.addLast(new FlowControlHandler());
49 *
50 * pipeline.addLast(new MyExampleHandler());
51 *
52 * class MyExampleHandler extends ChannelInboundHandlerAdapter {
53 * @Override
54 * public void channelRead(ChannelHandlerContext ctx, Object msg) {
55 * if (msg instanceof HttpRequest) {
56 * ctx.channel().config().setAutoRead(false);
57 *
58 * // The FlowControlHandler will hold any subsequent events that
59 * // were emitted by HttpObjectDecoder until auto reading is turned
60 * // back on or Channel#read() is being called.
61 * }
62 * }
63 * }
64 * }</pre>
65 *
66 * @see ChannelConfig#setAutoRead(boolean)
67 */
68 public class FlowControlHandler extends ChannelDuplexHandler {
69 private static final InternalLogger logger = InternalLoggerFactory.getInstance(FlowControlHandler.class);
70
71 private final boolean releaseMessages;
72
73 private RecyclableArrayDeque queue;
74
75 private ChannelConfig config;
76
77 private boolean shouldConsume;
78
79 public FlowControlHandler() {
80 this(true);
81 }
82
83 public FlowControlHandler(boolean releaseMessages) {
84 this.releaseMessages = releaseMessages;
85 }
86
87 /**
88 * Determine if the underlying {@link Queue} is empty. This method exists for
89 * testing, debugging and inspection purposes and it is not Thread safe!
90 */
91 boolean isQueueEmpty() {
92 return queue == null || queue.isEmpty();
93 }
94
95 /**
96 * Releases all messages and destroys the {@link Queue}.
97 */
98 private void destroy() {
99 if (queue != null) {
100
101 if (!queue.isEmpty()) {
102 logger.trace("Non-empty queue: {}", queue);
103
104 if (releaseMessages) {
105 Object msg;
106 while ((msg = queue.poll()) != null) {
107 ReferenceCountUtil.safeRelease(msg);
108 }
109 }
110 }
111
112 queue.recycle();
113 queue = null;
114 }
115 }
116
117 @Override
118 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
119 config = ctx.channel().config();
120 }
121
122 @Override
123 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
124 super.handlerRemoved(ctx);
125 if (!isQueueEmpty()) {
126 dequeue(ctx, queue.size());
127 }
128 destroy();
129 }
130
131 @Override
132 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
133 destroy();
134 ctx.fireChannelInactive();
135 }
136
137 @Override
138 public void read(ChannelHandlerContext ctx) throws Exception {
139 if (dequeue(ctx, 1) == 0) {
140 // It seems no messages were consumed. We need to read() some
141 // messages from upstream and once one arrives it need to be
142 // relayed to downstream to keep the flow going.
143 shouldConsume = true;
144 ctx.read();
145 }
146 }
147
148 @Override
149 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
150 if (queue == null) {
151 queue = RecyclableArrayDeque.newInstance();
152 }
153
154 queue.offer(msg);
155
156 // We just received one message. Do we need to relay it regardless
157 // of the auto reading configuration? The answer is yes if this
158 // method was called as a result of a prior read() call.
159 int minConsume = shouldConsume ? 1 : 0;
160 shouldConsume = false;
161
162 dequeue(ctx, minConsume);
163 }
164
165 @Override
166 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
167 if (isQueueEmpty()) {
168 ctx.fireChannelReadComplete();
169 } else {
170 // Don't relay completion events from upstream as they
171 // make no sense in this context. See dequeue() where
172 // a new set of completion events is being produced.
173 }
174 }
175
176 /**
177 * Dequeues one or many (or none) messages depending on the channel's auto
178 * reading state and returns the number of messages that were consumed from
179 * the internal queue.
180 *
181 * The {@code minConsume} argument is used to force {@code dequeue()} into
182 * consuming that number of messages regardless of the channel's auto
183 * reading configuration.
184 *
185 * @see #read(ChannelHandlerContext)
186 * @see #channelRead(ChannelHandlerContext, Object)
187 */
188 private int dequeue(ChannelHandlerContext ctx, int minConsume) {
189 int consumed = 0;
190
191 // fireChannelRead(...) may call ctx.read() and so this method may reentrance. Because of this we need to
192 // check if queue was set to null in the meantime and if so break the loop.
193 while (queue != null && (consumed < minConsume || config.isAutoRead())) {
194 Object msg = queue.poll();
195 if (msg == null) {
196 break;
197 }
198
199 ++consumed;
200 ctx.fireChannelRead(msg);
201 }
202
203 // We're firing a completion event every time one (or more)
204 // messages were consumed and the queue ended up being drained
205 // to an empty state.
206 if (queue != null && queue.isEmpty()) {
207 queue.recycle();
208 queue = null;
209
210 if (consumed > 0) {
211 ctx.fireChannelReadComplete();
212 }
213 }
214
215 return consumed;
216 }
217
218 /**
219 * A recyclable {@link ArrayDeque}.
220 */
221 private static final class RecyclableArrayDeque extends ArrayDeque<Object> {
222
223 private static final long serialVersionUID = 0L;
224
225 /**
226 * A value of {@code 2} should be a good choice for most scenarios.
227 */
228 private static final int DEFAULT_NUM_ELEMENTS = 2;
229
230 private static final ObjectPool<RecyclableArrayDeque> RECYCLER = ObjectPool.newPool(
231 new ObjectCreator<RecyclableArrayDeque>() {
232 @Override
233 public RecyclableArrayDeque newObject(Handle<RecyclableArrayDeque> handle) {
234 return new RecyclableArrayDeque(DEFAULT_NUM_ELEMENTS, handle);
235 }
236 });
237
238 public static RecyclableArrayDeque newInstance() {
239 return RECYCLER.get();
240 }
241
242 private final Handle<RecyclableArrayDeque> handle;
243
244 private RecyclableArrayDeque(int numElements, Handle<RecyclableArrayDeque> handle) {
245 super(numElements);
246 this.handle = handle;
247 }
248
249 public void recycle() {
250 clear();
251 handle.recycle(this);
252 }
253 }
254 }