1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.timeout;
17
18 import io.netty.bootstrap.ServerBootstrap;
19 import io.netty.channel.Channel;
20 import io.netty.channel.Channel.Unsafe;
21 import io.netty.channel.ChannelDuplexHandler;
22 import io.netty.channel.ChannelFuture;
23 import io.netty.channel.ChannelFutureListener;
24 import io.netty.channel.ChannelHandlerContext;
25 import io.netty.channel.ChannelInitializer;
26 import io.netty.channel.ChannelOutboundBuffer;
27 import io.netty.channel.ChannelPromise;
28 import io.netty.util.concurrent.Future;
29 import io.netty.util.internal.ObjectUtil;
30
31 import java.util.concurrent.TimeUnit;
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99 public class IdleStateHandler extends ChannelDuplexHandler {
100 private static final long MIN_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(1);
101
102
103 private final ChannelFutureListener writeListener = new ChannelFutureListener() {
104 @Override
105 public void operationComplete(ChannelFuture future) throws Exception {
106 lastWriteTime = ticksInNanos();
107 firstWriterIdleEvent = firstAllIdleEvent = true;
108 }
109 };
110
111 private final boolean observeOutput;
112 private final long readerIdleTimeNanos;
113 private final long writerIdleTimeNanos;
114 private final long allIdleTimeNanos;
115
116 private Future<?> readerIdleTimeout;
117 private long lastReadTime;
118 private boolean firstReaderIdleEvent = true;
119
120 private Future<?> writerIdleTimeout;
121 private long lastWriteTime;
122 private boolean firstWriterIdleEvent = true;
123
124 private Future<?> allIdleTimeout;
125 private boolean firstAllIdleEvent = true;
126
127 private byte state;
128 private static final byte ST_INITIALIZED = 1;
129 private static final byte ST_DESTROYED = 2;
130
131 private boolean reading;
132
133 private long lastChangeCheckTimeStamp;
134 private int lastMessageHashCode;
135 private long lastPendingWriteBytes;
136 private long lastFlushProgress;
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154 public IdleStateHandler(
155 int readerIdleTimeSeconds,
156 int writerIdleTimeSeconds,
157 int allIdleTimeSeconds) {
158
159 this(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds,
160 TimeUnit.SECONDS);
161 }
162
163
164
165
166 public IdleStateHandler(
167 long readerIdleTime, long writerIdleTime, long allIdleTime,
168 TimeUnit unit) {
169 this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
170 }
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194 public IdleStateHandler(boolean observeOutput,
195 long readerIdleTime, long writerIdleTime, long allIdleTime,
196 TimeUnit unit) {
197 ObjectUtil.checkNotNull(unit, "unit");
198
199 this.observeOutput = observeOutput;
200
201 if (readerIdleTime <= 0) {
202 readerIdleTimeNanos = 0;
203 } else {
204 readerIdleTimeNanos = Math.max(unit.toNanos(readerIdleTime), MIN_TIMEOUT_NANOS);
205 }
206 if (writerIdleTime <= 0) {
207 writerIdleTimeNanos = 0;
208 } else {
209 writerIdleTimeNanos = Math.max(unit.toNanos(writerIdleTime), MIN_TIMEOUT_NANOS);
210 }
211 if (allIdleTime <= 0) {
212 allIdleTimeNanos = 0;
213 } else {
214 allIdleTimeNanos = Math.max(unit.toNanos(allIdleTime), MIN_TIMEOUT_NANOS);
215 }
216 }
217
218
219
220
221
222 public long getReaderIdleTimeInMillis() {
223 return TimeUnit.NANOSECONDS.toMillis(readerIdleTimeNanos);
224 }
225
226
227
228
229
230 public long getWriterIdleTimeInMillis() {
231 return TimeUnit.NANOSECONDS.toMillis(writerIdleTimeNanos);
232 }
233
234
235
236
237
238 public long getAllIdleTimeInMillis() {
239 return TimeUnit.NANOSECONDS.toMillis(allIdleTimeNanos);
240 }
241
242 @Override
243 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
244 if (ctx.channel().isActive() && ctx.channel().isRegistered()) {
245
246
247 initialize(ctx);
248 } else {
249
250
251 }
252 }
253
254 @Override
255 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
256 destroy();
257 }
258
259 @Override
260 public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
261
262 if (ctx.channel().isActive()) {
263 initialize(ctx);
264 }
265 super.channelRegistered(ctx);
266 }
267
268 @Override
269 public void channelActive(ChannelHandlerContext ctx) throws Exception {
270
271
272
273 initialize(ctx);
274 super.channelActive(ctx);
275 }
276
277 @Override
278 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
279 destroy();
280 super.channelInactive(ctx);
281 }
282
283 @Override
284 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
285 if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
286 reading = true;
287 firstReaderIdleEvent = firstAllIdleEvent = true;
288 }
289 ctx.fireChannelRead(msg);
290 }
291
292 @Override
293 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
294 if ((readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) && reading) {
295 lastReadTime = ticksInNanos();
296 reading = false;
297 }
298 ctx.fireChannelReadComplete();
299 }
300
301 @Override
302 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
303
304 if (writerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
305 ctx.write(msg, promise.unvoid()).addListener(writeListener);
306 } else {
307 ctx.write(msg, promise);
308 }
309 }
310
311
312
313
314 public void resetReadTimeout() {
315 if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
316 lastReadTime = ticksInNanos();
317 reading = false;
318 }
319 }
320
321
322
323
324 public void resetWriteTimeout() {
325 if (writerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
326 lastWriteTime = ticksInNanos();
327 }
328 }
329
330 private void initialize(ChannelHandlerContext ctx) {
331
332
333 switch (state) {
334 case 1:
335 case 2:
336 return;
337 default:
338 break;
339 }
340
341 state = ST_INITIALIZED;
342 initOutputChanged(ctx);
343
344 lastReadTime = lastWriteTime = ticksInNanos();
345 if (readerIdleTimeNanos > 0) {
346 readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
347 readerIdleTimeNanos, TimeUnit.NANOSECONDS);
348 }
349 if (writerIdleTimeNanos > 0) {
350 writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
351 writerIdleTimeNanos, TimeUnit.NANOSECONDS);
352 }
353 if (allIdleTimeNanos > 0) {
354 allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
355 allIdleTimeNanos, TimeUnit.NANOSECONDS);
356 }
357 }
358
359
360
361
362 long ticksInNanos() {
363 return System.nanoTime();
364 }
365
366
367
368
369 Future<?> schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) {
370 return ctx.executor().schedule(task, delay, unit);
371 }
372
373 private void destroy() {
374 state = ST_DESTROYED;
375
376 if (readerIdleTimeout != null) {
377 readerIdleTimeout.cancel(false);
378 readerIdleTimeout = null;
379 }
380 if (writerIdleTimeout != null) {
381 writerIdleTimeout.cancel(false);
382 writerIdleTimeout = null;
383 }
384 if (allIdleTimeout != null) {
385 allIdleTimeout.cancel(false);
386 allIdleTimeout = null;
387 }
388 }
389
390
391
392
393
394 protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
395 ctx.fireUserEventTriggered(evt);
396 }
397
398
399
400
401 protected IdleStateEvent newIdleStateEvent(IdleState state, boolean first) {
402 switch (state) {
403 case ALL_IDLE:
404 return first ? IdleStateEvent.FIRST_ALL_IDLE_STATE_EVENT : IdleStateEvent.ALL_IDLE_STATE_EVENT;
405 case READER_IDLE:
406 return first ? IdleStateEvent.FIRST_READER_IDLE_STATE_EVENT : IdleStateEvent.READER_IDLE_STATE_EVENT;
407 case WRITER_IDLE:
408 return first ? IdleStateEvent.FIRST_WRITER_IDLE_STATE_EVENT : IdleStateEvent.WRITER_IDLE_STATE_EVENT;
409 default:
410 throw new IllegalArgumentException("Unhandled: state=" + state + ", first=" + first);
411 }
412 }
413
414
415
416
417 private void initOutputChanged(ChannelHandlerContext ctx) {
418 if (observeOutput) {
419 Channel channel = ctx.channel();
420 Unsafe unsafe = channel.unsafe();
421 ChannelOutboundBuffer buf = unsafe.outboundBuffer();
422
423 if (buf != null) {
424 lastMessageHashCode = System.identityHashCode(buf.current());
425 lastPendingWriteBytes = buf.totalPendingWriteBytes();
426 lastFlushProgress = buf.currentProgress();
427 }
428 }
429 }
430
431
432
433
434
435
436
437
438 private boolean hasOutputChanged(ChannelHandlerContext ctx, boolean first) {
439 if (observeOutput) {
440
441
442
443
444
445
446 if (lastChangeCheckTimeStamp != lastWriteTime) {
447 lastChangeCheckTimeStamp = lastWriteTime;
448
449
450 if (!first) {
451 return true;
452 }
453 }
454
455 Channel channel = ctx.channel();
456 Unsafe unsafe = channel.unsafe();
457 ChannelOutboundBuffer buf = unsafe.outboundBuffer();
458
459 if (buf != null) {
460 int messageHashCode = System.identityHashCode(buf.current());
461 long pendingWriteBytes = buf.totalPendingWriteBytes();
462
463 if (messageHashCode != lastMessageHashCode || pendingWriteBytes != lastPendingWriteBytes) {
464 lastMessageHashCode = messageHashCode;
465 lastPendingWriteBytes = pendingWriteBytes;
466
467 if (!first) {
468 return true;
469 }
470 }
471
472 long flushProgress = buf.currentProgress();
473 if (flushProgress != lastFlushProgress) {
474 lastFlushProgress = flushProgress;
475 return !first;
476 }
477 }
478 }
479
480 return false;
481 }
482
483 private abstract static class AbstractIdleTask implements Runnable {
484
485 private final ChannelHandlerContext ctx;
486
487 AbstractIdleTask(ChannelHandlerContext ctx) {
488 this.ctx = ctx;
489 }
490
491 @Override
492 public void run() {
493 if (!ctx.channel().isOpen()) {
494 return;
495 }
496
497 run(ctx);
498 }
499
500 protected abstract void run(ChannelHandlerContext ctx);
501 }
502
503 private final class ReaderIdleTimeoutTask extends AbstractIdleTask {
504
505 ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
506 super(ctx);
507 }
508
509 @Override
510 protected void run(ChannelHandlerContext ctx) {
511 long nextDelay = readerIdleTimeNanos;
512 if (!reading) {
513 nextDelay -= ticksInNanos() - lastReadTime;
514 }
515
516 if (nextDelay <= 0) {
517
518 readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
519
520 boolean first = firstReaderIdleEvent;
521 firstReaderIdleEvent = false;
522
523 try {
524 IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
525 channelIdle(ctx, event);
526 } catch (Throwable t) {
527 ctx.fireExceptionCaught(t);
528 }
529 } else {
530
531 readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
532 }
533 }
534 }
535
536 private final class WriterIdleTimeoutTask extends AbstractIdleTask {
537
538 WriterIdleTimeoutTask(ChannelHandlerContext ctx) {
539 super(ctx);
540 }
541
542 @Override
543 protected void run(ChannelHandlerContext ctx) {
544
545 long lastWriteTime = IdleStateHandler.this.lastWriteTime;
546 long nextDelay = writerIdleTimeNanos - (ticksInNanos() - lastWriteTime);
547 if (nextDelay <= 0) {
548
549 writerIdleTimeout = schedule(ctx, this, writerIdleTimeNanos, TimeUnit.NANOSECONDS);
550
551 boolean first = firstWriterIdleEvent;
552 firstWriterIdleEvent = false;
553
554 try {
555 if (hasOutputChanged(ctx, first)) {
556 return;
557 }
558
559 IdleStateEvent event = newIdleStateEvent(IdleState.WRITER_IDLE, first);
560 channelIdle(ctx, event);
561 } catch (Throwable t) {
562 ctx.fireExceptionCaught(t);
563 }
564 } else {
565
566 writerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
567 }
568 }
569 }
570
571 private final class AllIdleTimeoutTask extends AbstractIdleTask {
572
573 AllIdleTimeoutTask(ChannelHandlerContext ctx) {
574 super(ctx);
575 }
576
577 @Override
578 protected void run(ChannelHandlerContext ctx) {
579
580 long nextDelay = allIdleTimeNanos;
581 if (!reading) {
582 nextDelay -= ticksInNanos() - Math.max(lastReadTime, lastWriteTime);
583 }
584 if (nextDelay <= 0) {
585
586
587 allIdleTimeout = schedule(ctx, this, allIdleTimeNanos, TimeUnit.NANOSECONDS);
588
589 boolean first = firstAllIdleEvent;
590 firstAllIdleEvent = false;
591
592 try {
593 if (hasOutputChanged(ctx, first)) {
594 return;
595 }
596
597 IdleStateEvent event = newIdleStateEvent(IdleState.ALL_IDLE, first);
598 channelIdle(ctx, event);
599 } catch (Throwable t) {
600 ctx.fireExceptionCaught(t);
601 }
602 } else {
603
604
605 allIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
606 }
607 }
608 }
609 }