1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.handler.flow;
17
18 import io.netty5.channel.ChannelOption;
19 import io.netty5.handler.codec.ByteToMessageDecoder;
20 import io.netty5.handler.codec.MessageToByteEncoder;
21 import io.netty5.util.Resource;
22 import io.netty5.channel.ChannelHandler;
23 import io.netty5.channel.ChannelHandlerContext;
24 import io.netty5.util.internal.ObjectPool;
25 import io.netty5.util.internal.ObjectPool.Handle;
26 import io.netty5.util.internal.logging.InternalLogger;
27 import io.netty5.util.internal.logging.InternalLoggerFactory;
28
29 import java.util.ArrayDeque;
30 import java.util.Queue;
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66 public class FlowControlHandler implements ChannelHandler {
67 private static final InternalLogger logger = InternalLoggerFactory.getInstance(FlowControlHandler.class);
68
69 private final boolean releaseMessages;
70
71 private RecyclableArrayDeque queue;
72
73 private boolean shouldConsume;
74
75 public FlowControlHandler() {
76 this(true);
77 }
78
79 public FlowControlHandler(boolean releaseMessages) {
80 this.releaseMessages = releaseMessages;
81 }
82
83
84
85
86
87 boolean isQueueEmpty() {
88 return queue == null || queue.isEmpty();
89 }
90
91
92
93
94 private void destroy() {
95 if (queue != null) {
96
97 if (!queue.isEmpty()) {
98 logger.trace("Non-empty queue: {}", queue);
99
100 if (releaseMessages) {
101 Object msg;
102 while ((msg = queue.poll()) != null) {
103 try {
104 Resource.dispose(msg);
105 } catch (Exception e) {
106 logger.trace("Exception while disposing of message in flow control", e);
107 }
108 }
109 }
110 }
111
112 queue.recycle();
113 queue = null;
114 }
115 }
116
117 @Override
118 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
119 if (!isQueueEmpty()) {
120 dequeue(ctx, queue.size());
121 }
122 destroy();
123 }
124
125 @Override
126 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
127 destroy();
128 ctx.fireChannelInactive();
129 }
130
131 @Override
132 public void read(ChannelHandlerContext ctx) {
133 if (dequeue(ctx, 1) == 0) {
134
135
136
137 shouldConsume = true;
138 ctx.read();
139 }
140 }
141
142 @Override
143 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
144 if (queue == null) {
145 queue = RecyclableArrayDeque.newInstance();
146 }
147
148 queue.offer(msg);
149
150
151
152
153 int minConsume = shouldConsume ? 1 : 0;
154 shouldConsume = false;
155
156 dequeue(ctx, minConsume);
157 }
158
159 @Override
160 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
161 if (isQueueEmpty()) {
162 ctx.fireChannelReadComplete();
163 } else {
164
165
166
167 }
168 }
169
170
171
172
173
174
175
176
177
178
179
180
181
182 private int dequeue(ChannelHandlerContext ctx, int minConsume) {
183 int consumed = 0;
184
185
186
187 while (queue != null && (consumed < minConsume || ctx.channel().getOption(ChannelOption.AUTO_READ))) {
188 Object msg = queue.poll();
189 if (msg == null) {
190 break;
191 }
192
193 ++consumed;
194 ctx.fireChannelRead(msg);
195 }
196
197
198
199
200 if (queue != null && queue.isEmpty()) {
201 queue.recycle();
202 queue = null;
203
204 if (consumed > 0) {
205 ctx.fireChannelReadComplete();
206 }
207 }
208
209 return consumed;
210 }
211
212
213
214
215 private static final class RecyclableArrayDeque extends ArrayDeque<Object> {
216
217 private static final long serialVersionUID = 0L;
218
219
220
221
222 private static final int DEFAULT_NUM_ELEMENTS = 2;
223
224 private static final ObjectPool<RecyclableArrayDeque> RECYCLER = ObjectPool.newPool(
225 handle -> new RecyclableArrayDeque(DEFAULT_NUM_ELEMENTS, handle));
226
227 public static RecyclableArrayDeque newInstance() {
228 return RECYCLER.get();
229 }
230
231 private final Handle<RecyclableArrayDeque> handle;
232
233 private RecyclableArrayDeque(int numElements, Handle<RecyclableArrayDeque> handle) {
234 super(numElements);
235 this.handle = handle;
236 }
237
238 public void recycle() {
239 clear();
240 handle.recycle(this);
241 }
242 }
243 }