1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.handler.timeout;
17
18 import io.netty5.bootstrap.ServerBootstrap;
19 import io.netty5.channel.Channel;
20 import io.netty5.channel.ChannelHandler;
21 import io.netty5.channel.ChannelHandlerContext;
22 import io.netty5.channel.ChannelInitializer;
23 import io.netty5.util.concurrent.Future;
24 import io.netty5.util.concurrent.FutureListener;
25
26 import java.util.concurrent.TimeUnit;
27
28 import static java.util.Objects.requireNonNull;
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96 public class IdleStateHandler implements ChannelHandler {
97 private static final long MIN_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(1);
98
99
100 private final FutureListener<Void> writeListener = future -> {
101 lastWriteTime = ticksInNanos();
102 firstWriterIdleEvent = firstAllIdleEvent = true;
103 };
104
105 private final long readerIdleTimeNanos;
106 private final long writerIdleTimeNanos;
107 private final long allIdleTimeNanos;
108
109 private Future<?> readerIdleTimeout;
110 private long lastReadTime;
111 private boolean firstReaderIdleEvent = true;
112
113 private Future<?> writerIdleTimeout;
114 private long lastWriteTime;
115 private boolean firstWriterIdleEvent = true;
116
117 private Future<?> allIdleTimeout;
118 private boolean firstAllIdleEvent = true;
119
120 private byte state;
121 private boolean reading;
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139 public IdleStateHandler(
140 int readerIdleTimeSeconds,
141 int writerIdleTimeSeconds,
142 int allIdleTimeSeconds) {
143
144 this(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds,
145 TimeUnit.SECONDS);
146 }
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166 public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime,
167 TimeUnit unit) {
168 requireNonNull(unit, "unit");
169
170 if (readerIdleTime <= 0) {
171 readerIdleTimeNanos = 0;
172 } else {
173 readerIdleTimeNanos = Math.max(unit.toNanos(readerIdleTime), MIN_TIMEOUT_NANOS);
174 }
175 if (writerIdleTime <= 0) {
176 writerIdleTimeNanos = 0;
177 } else {
178 writerIdleTimeNanos = Math.max(unit.toNanos(writerIdleTime), MIN_TIMEOUT_NANOS);
179 }
180 if (allIdleTime <= 0) {
181 allIdleTimeNanos = 0;
182 } else {
183 allIdleTimeNanos = Math.max(unit.toNanos(allIdleTime), MIN_TIMEOUT_NANOS);
184 }
185 }
186
187
188
189
190
191 public long getReaderIdleTimeInMillis() {
192 return TimeUnit.NANOSECONDS.toMillis(readerIdleTimeNanos);
193 }
194
195
196
197
198
199 public long getWriterIdleTimeInMillis() {
200 return TimeUnit.NANOSECONDS.toMillis(writerIdleTimeNanos);
201 }
202
203
204
205
206
207 public long getAllIdleTimeInMillis() {
208 return TimeUnit.NANOSECONDS.toMillis(allIdleTimeNanos);
209 }
210
211 @Override
212 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
213 if (ctx.channel().isActive() && ctx.channel().isRegistered()) {
214
215
216 initialize(ctx);
217 } else {
218
219
220 }
221 }
222
223 @Override
224 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
225 destroy();
226 }
227
228 @Override
229 public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
230
231 if (ctx.channel().isActive()) {
232 initialize(ctx);
233 }
234 ctx.fireChannelRegistered();
235 }
236
237 @Override
238 public void channelActive(ChannelHandlerContext ctx) throws Exception {
239
240
241
242 initialize(ctx);
243 ctx.fireChannelActive();
244 }
245
246 @Override
247 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
248 destroy();
249 ctx.fireChannelInactive();
250 }
251
252 @Override
253 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
254 if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
255 reading = true;
256 firstReaderIdleEvent = firstAllIdleEvent = true;
257 }
258 ctx.fireChannelRead(msg);
259 }
260
261 @Override
262 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
263 if ((readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) && reading) {
264 lastReadTime = ticksInNanos();
265 reading = false;
266 }
267 ctx.fireChannelReadComplete();
268 }
269
270 @Override
271 public Future<Void> write(ChannelHandlerContext ctx, Object msg) {
272 Future<Void> future = ctx.write(msg);
273
274 if (writerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
275 future.addListener(writeListener);
276 }
277 return future;
278 }
279
280 private void initialize(ChannelHandlerContext ctx) {
281
282
283 switch (state) {
284 case 1:
285 case 2:
286 return;
287 default:
288 break;
289 }
290
291 state = 1;
292
293 lastReadTime = lastWriteTime = ticksInNanos();
294 if (readerIdleTimeNanos > 0) {
295 readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
296 readerIdleTimeNanos, TimeUnit.NANOSECONDS);
297 }
298 if (writerIdleTimeNanos > 0) {
299 writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
300 writerIdleTimeNanos, TimeUnit.NANOSECONDS);
301 }
302 if (allIdleTimeNanos > 0) {
303 allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
304 allIdleTimeNanos, TimeUnit.NANOSECONDS);
305 }
306 }
307
308
309
310
311 long ticksInNanos() {
312 return System.nanoTime();
313 }
314
315
316
317
318 Future<?> schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) {
319 return ctx.executor().schedule(task, delay, unit);
320 }
321
322 private void destroy() {
323 state = 2;
324
325 if (readerIdleTimeout != null) {
326 readerIdleTimeout.cancel();
327 readerIdleTimeout = null;
328 }
329 if (writerIdleTimeout != null) {
330 writerIdleTimeout.cancel();
331 writerIdleTimeout = null;
332 }
333 if (allIdleTimeout != null) {
334 allIdleTimeout.cancel();
335 allIdleTimeout = null;
336 }
337 }
338
339
340
341
342
343 protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
344 ctx.fireChannelInboundEvent(evt);
345 }
346
347
348
349
350 protected IdleStateEvent newIdleStateEvent(IdleState state, boolean first) {
351 switch (state) {
352 case ALL_IDLE:
353 return first ? IdleStateEvent.FIRST_ALL_IDLE_STATE_EVENT : IdleStateEvent.ALL_IDLE_STATE_EVENT;
354 case READER_IDLE:
355 return first ? IdleStateEvent.FIRST_READER_IDLE_STATE_EVENT : IdleStateEvent.READER_IDLE_STATE_EVENT;
356 case WRITER_IDLE:
357 return first ? IdleStateEvent.FIRST_WRITER_IDLE_STATE_EVENT : IdleStateEvent.WRITER_IDLE_STATE_EVENT;
358 default:
359 throw new IllegalArgumentException("Unhandled: state=" + state + ", first=" + first);
360 }
361 }
362
363 private abstract static class AbstractIdleTask implements Runnable {
364
365 private final ChannelHandlerContext ctx;
366
367 AbstractIdleTask(ChannelHandlerContext ctx) {
368 this.ctx = ctx;
369 }
370
371 @Override
372 public void run() {
373 if (!ctx.channel().isOpen()) {
374 return;
375 }
376
377 run(ctx);
378 }
379
380 protected abstract void run(ChannelHandlerContext ctx);
381 }
382
383 private final class ReaderIdleTimeoutTask extends AbstractIdleTask {
384
385 ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
386 super(ctx);
387 }
388
389 @Override
390 protected void run(ChannelHandlerContext ctx) {
391 long nextDelay = readerIdleTimeNanos;
392 if (!reading) {
393 nextDelay -= ticksInNanos() - lastReadTime;
394 }
395
396 if (nextDelay <= 0) {
397
398 readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
399
400 boolean first = firstReaderIdleEvent;
401 firstReaderIdleEvent = false;
402
403 try {
404 IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
405 channelIdle(ctx, event);
406 } catch (Throwable t) {
407 ctx.fireChannelExceptionCaught(t);
408 }
409 } else {
410
411 readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
412 }
413 }
414 }
415
416 private final class WriterIdleTimeoutTask extends AbstractIdleTask {
417
418 WriterIdleTimeoutTask(ChannelHandlerContext ctx) {
419 super(ctx);
420 }
421
422 @Override
423 protected void run(ChannelHandlerContext ctx) {
424
425 long lastWriteTime = IdleStateHandler.this.lastWriteTime;
426 long nextDelay = writerIdleTimeNanos - (ticksInNanos() - lastWriteTime);
427 if (nextDelay <= 0) {
428
429 writerIdleTimeout = schedule(ctx, this, writerIdleTimeNanos, TimeUnit.NANOSECONDS);
430
431 boolean first = firstWriterIdleEvent;
432 firstWriterIdleEvent = false;
433
434 try {
435 IdleStateEvent event = newIdleStateEvent(IdleState.WRITER_IDLE, first);
436 channelIdle(ctx, event);
437 } catch (Throwable t) {
438 ctx.fireChannelExceptionCaught(t);
439 }
440 } else {
441
442 writerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
443 }
444 }
445 }
446
447 private final class AllIdleTimeoutTask extends AbstractIdleTask {
448
449 AllIdleTimeoutTask(ChannelHandlerContext ctx) {
450 super(ctx);
451 }
452
453 @Override
454 protected void run(ChannelHandlerContext ctx) {
455
456 long nextDelay = allIdleTimeNanos;
457 if (!reading) {
458 nextDelay -= ticksInNanos() - Math.max(lastReadTime, lastWriteTime);
459 }
460 if (nextDelay <= 0) {
461
462
463 allIdleTimeout = schedule(ctx, this, allIdleTimeNanos, TimeUnit.NANOSECONDS);
464
465 boolean first = firstAllIdleEvent;
466 firstAllIdleEvent = false;
467
468 try {
469 IdleStateEvent event = newIdleStateEvent(IdleState.ALL_IDLE, first);
470 channelIdle(ctx, event);
471 } catch (Throwable t) {
472 ctx.fireChannelExceptionCaught(t);
473 }
474 } else {
475
476
477 allIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
478 }
479 }
480 }
481 }