1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.timeout;
17
18 import io.netty.bootstrap.ServerBootstrap;
19 import io.netty.channel.Channel;
20 import io.netty.channel.Channel.Unsafe;
21 import io.netty.channel.ChannelDuplexHandler;
22 import io.netty.channel.ChannelFuture;
23 import io.netty.channel.ChannelFutureListener;
24 import io.netty.channel.ChannelHandlerContext;
25 import io.netty.channel.ChannelInitializer;
26 import io.netty.channel.ChannelOutboundBuffer;
27 import io.netty.channel.ChannelPromise;
28 import io.netty.util.concurrent.Future;
29 import io.netty.util.concurrent.Ticker;
30 import io.netty.util.internal.ObjectUtil;
31
32 import java.util.concurrent.TimeUnit;
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100 public class IdleStateHandler extends ChannelDuplexHandler {
101 private static final long MIN_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(1);
102
103
104 private final ChannelFutureListener writeListener = new ChannelFutureListener() {
105 @Override
106 public void operationComplete(ChannelFuture future) throws Exception {
107 lastWriteTime = ticker.nanoTime();
108 firstWriterIdleEvent = firstAllIdleEvent = true;
109 }
110 };
111
112 private final boolean observeOutput;
113 private final long readerIdleTimeNanos;
114 private final long writerIdleTimeNanos;
115 private final long allIdleTimeNanos;
116
117 private Ticker ticker = Ticker.systemTicker();
118
119 private Future<?> readerIdleTimeout;
120 private long lastReadTime;
121 private boolean firstReaderIdleEvent = true;
122
123 private Future<?> writerIdleTimeout;
124 private long lastWriteTime;
125 private boolean firstWriterIdleEvent = true;
126
127 private Future<?> allIdleTimeout;
128 private boolean firstAllIdleEvent = true;
129
130 private byte state;
131 private static final byte ST_INITIALIZED = 1;
132 private static final byte ST_DESTROYED = 2;
133
134 private boolean reading;
135
136 private long lastChangeCheckTimeStamp;
137 private int lastMessageHashCode;
138 private long lastPendingWriteBytes;
139 private long lastFlushProgress;
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157 public IdleStateHandler(
158 int readerIdleTimeSeconds,
159 int writerIdleTimeSeconds,
160 int allIdleTimeSeconds) {
161
162 this(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds,
163 TimeUnit.SECONDS);
164 }
165
166
167
168
169 public IdleStateHandler(
170 long readerIdleTime, long writerIdleTime, long allIdleTime,
171 TimeUnit unit) {
172 this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
173 }
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197 public IdleStateHandler(boolean observeOutput,
198 long readerIdleTime, long writerIdleTime, long allIdleTime,
199 TimeUnit unit) {
200 ObjectUtil.checkNotNull(unit, "unit");
201
202 this.observeOutput = observeOutput;
203
204 if (readerIdleTime <= 0) {
205 readerIdleTimeNanos = 0;
206 } else {
207 readerIdleTimeNanos = Math.max(unit.toNanos(readerIdleTime), MIN_TIMEOUT_NANOS);
208 }
209 if (writerIdleTime <= 0) {
210 writerIdleTimeNanos = 0;
211 } else {
212 writerIdleTimeNanos = Math.max(unit.toNanos(writerIdleTime), MIN_TIMEOUT_NANOS);
213 }
214 if (allIdleTime <= 0) {
215 allIdleTimeNanos = 0;
216 } else {
217 allIdleTimeNanos = Math.max(unit.toNanos(allIdleTime), MIN_TIMEOUT_NANOS);
218 }
219 }
220
221
222
223
224
225 public long getReaderIdleTimeInMillis() {
226 return TimeUnit.NANOSECONDS.toMillis(readerIdleTimeNanos);
227 }
228
229
230
231
232
233 public long getWriterIdleTimeInMillis() {
234 return TimeUnit.NANOSECONDS.toMillis(writerIdleTimeNanos);
235 }
236
237
238
239
240
241 public long getAllIdleTimeInMillis() {
242 return TimeUnit.NANOSECONDS.toMillis(allIdleTimeNanos);
243 }
244
245 @Override
246 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
247 this.ticker = ctx.executor().ticker();
248 if (ctx.channel().isActive() && ctx.channel().isRegistered()) {
249
250
251 initialize(ctx);
252 } else {
253
254
255 }
256 }
257
258 @Override
259 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
260 destroy();
261 }
262
263 @Override
264 public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
265
266 if (ctx.channel().isActive()) {
267 initialize(ctx);
268 }
269 super.channelRegistered(ctx);
270 }
271
272 @Override
273 public void channelActive(ChannelHandlerContext ctx) throws Exception {
274
275
276
277 initialize(ctx);
278 super.channelActive(ctx);
279 }
280
281 @Override
282 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
283 destroy();
284 super.channelInactive(ctx);
285 }
286
287 @Override
288 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
289 if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
290 reading = true;
291 firstReaderIdleEvent = firstAllIdleEvent = true;
292 }
293 ctx.fireChannelRead(msg);
294 }
295
296 @Override
297 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
298 if ((readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) && reading) {
299 lastReadTime = ticker.nanoTime();
300 reading = false;
301 }
302 ctx.fireChannelReadComplete();
303 }
304
305 @Override
306 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
307
308 if (writerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
309 ctx.write(msg, promise.unvoid()).addListener(writeListener);
310 } else {
311 ctx.write(msg, promise);
312 }
313 }
314
315
316
317
318 public void resetReadTimeout() {
319 if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
320 lastReadTime = ticker.nanoTime();
321 reading = false;
322 }
323 }
324
325
326
327
328 public void resetWriteTimeout() {
329 if (writerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
330 lastWriteTime = ticker.nanoTime();
331 }
332 }
333
334 private void initialize(ChannelHandlerContext ctx) {
335
336
337 switch (state) {
338 case 1:
339 case 2:
340 return;
341 default:
342 break;
343 }
344
345 state = ST_INITIALIZED;
346 initOutputChanged(ctx);
347
348 lastReadTime = lastWriteTime = ticker.nanoTime();
349 if (readerIdleTimeNanos > 0) {
350 readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
351 readerIdleTimeNanos, TimeUnit.NANOSECONDS);
352 }
353 if (writerIdleTimeNanos > 0) {
354 writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
355 writerIdleTimeNanos, TimeUnit.NANOSECONDS);
356 }
357 if (allIdleTimeNanos > 0) {
358 allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
359 allIdleTimeNanos, TimeUnit.NANOSECONDS);
360 }
361 }
362
363
364
365
366 Future<?> schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) {
367 return ctx.executor().schedule(task, delay, unit);
368 }
369
370 private void destroy() {
371 state = ST_DESTROYED;
372
373 if (readerIdleTimeout != null) {
374 readerIdleTimeout.cancel(false);
375 readerIdleTimeout = null;
376 }
377 if (writerIdleTimeout != null) {
378 writerIdleTimeout.cancel(false);
379 writerIdleTimeout = null;
380 }
381 if (allIdleTimeout != null) {
382 allIdleTimeout.cancel(false);
383 allIdleTimeout = null;
384 }
385 }
386
387
388
389
390
391 protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
392 ctx.fireUserEventTriggered(evt);
393 }
394
395
396
397
398 protected IdleStateEvent newIdleStateEvent(IdleState state, boolean first) {
399 switch (state) {
400 case ALL_IDLE:
401 return first ? IdleStateEvent.FIRST_ALL_IDLE_STATE_EVENT : IdleStateEvent.ALL_IDLE_STATE_EVENT;
402 case READER_IDLE:
403 return first ? IdleStateEvent.FIRST_READER_IDLE_STATE_EVENT : IdleStateEvent.READER_IDLE_STATE_EVENT;
404 case WRITER_IDLE:
405 return first ? IdleStateEvent.FIRST_WRITER_IDLE_STATE_EVENT : IdleStateEvent.WRITER_IDLE_STATE_EVENT;
406 default:
407 throw new IllegalArgumentException("Unhandled: state=" + state + ", first=" + first);
408 }
409 }
410
411
412
413
414 private void initOutputChanged(ChannelHandlerContext ctx) {
415 if (observeOutput) {
416 Channel channel = ctx.channel();
417 Unsafe unsafe = channel.unsafe();
418 ChannelOutboundBuffer buf = unsafe.outboundBuffer();
419
420 if (buf != null) {
421 lastMessageHashCode = System.identityHashCode(buf.current());
422 lastPendingWriteBytes = buf.totalPendingWriteBytes();
423 lastFlushProgress = buf.currentProgress();
424 }
425 }
426 }
427
428
429
430
431
432
433
434
435 private boolean hasOutputChanged(ChannelHandlerContext ctx, boolean first) {
436 if (observeOutput) {
437
438
439
440
441
442
443 if (lastChangeCheckTimeStamp != lastWriteTime) {
444 lastChangeCheckTimeStamp = lastWriteTime;
445
446
447 if (!first) {
448 return true;
449 }
450 }
451
452 Channel channel = ctx.channel();
453 Unsafe unsafe = channel.unsafe();
454 ChannelOutboundBuffer buf = unsafe.outboundBuffer();
455
456 if (buf != null) {
457 int messageHashCode = System.identityHashCode(buf.current());
458 long pendingWriteBytes = buf.totalPendingWriteBytes();
459
460 if (messageHashCode != lastMessageHashCode || pendingWriteBytes != lastPendingWriteBytes) {
461 lastMessageHashCode = messageHashCode;
462 lastPendingWriteBytes = pendingWriteBytes;
463
464 if (!first) {
465 return true;
466 }
467 }
468
469 long flushProgress = buf.currentProgress();
470 if (flushProgress != lastFlushProgress) {
471 lastFlushProgress = flushProgress;
472 return !first;
473 }
474 }
475 }
476
477 return false;
478 }
479
480 private abstract static class AbstractIdleTask implements Runnable {
481
482 private final ChannelHandlerContext ctx;
483
484 AbstractIdleTask(ChannelHandlerContext ctx) {
485 this.ctx = ctx;
486 }
487
488 @Override
489 public void run() {
490 if (!ctx.channel().isOpen()) {
491 return;
492 }
493
494 run(ctx);
495 }
496
497 protected abstract void run(ChannelHandlerContext ctx);
498 }
499
500 private final class ReaderIdleTimeoutTask extends AbstractIdleTask {
501
502 ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
503 super(ctx);
504 }
505
506 @Override
507 protected void run(ChannelHandlerContext ctx) {
508 long nextDelay = readerIdleTimeNanos;
509 if (!reading) {
510 nextDelay -= ticker.nanoTime() - lastReadTime;
511 }
512
513 if (nextDelay <= 0) {
514
515 readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
516
517 boolean first = firstReaderIdleEvent;
518 firstReaderIdleEvent = false;
519
520 try {
521 IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
522 channelIdle(ctx, event);
523 } catch (Throwable t) {
524 ctx.fireExceptionCaught(t);
525 }
526 } else {
527
528 readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
529 }
530 }
531 }
532
533 private final class WriterIdleTimeoutTask extends AbstractIdleTask {
534
535 WriterIdleTimeoutTask(ChannelHandlerContext ctx) {
536 super(ctx);
537 }
538
539 @Override
540 protected void run(ChannelHandlerContext ctx) {
541
542 long lastWriteTime = IdleStateHandler.this.lastWriteTime;
543 long nextDelay = writerIdleTimeNanos - (ticker.nanoTime() - lastWriteTime);
544 if (nextDelay <= 0) {
545
546 writerIdleTimeout = schedule(ctx, this, writerIdleTimeNanos, TimeUnit.NANOSECONDS);
547
548 boolean first = firstWriterIdleEvent;
549 firstWriterIdleEvent = false;
550
551 try {
552 if (hasOutputChanged(ctx, first)) {
553 return;
554 }
555
556 IdleStateEvent event = newIdleStateEvent(IdleState.WRITER_IDLE, first);
557 channelIdle(ctx, event);
558 } catch (Throwable t) {
559 ctx.fireExceptionCaught(t);
560 }
561 } else {
562
563 writerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
564 }
565 }
566 }
567
568 private final class AllIdleTimeoutTask extends AbstractIdleTask {
569
570 AllIdleTimeoutTask(ChannelHandlerContext ctx) {
571 super(ctx);
572 }
573
574 @Override
575 protected void run(ChannelHandlerContext ctx) {
576
577 long nextDelay = allIdleTimeNanos;
578 if (!reading) {
579 nextDelay -= ticker.nanoTime() - Math.max(lastReadTime, lastWriteTime);
580 }
581 if (nextDelay <= 0) {
582
583
584 allIdleTimeout = schedule(ctx, this, allIdleTimeNanos, TimeUnit.NANOSECONDS);
585
586 boolean first = firstAllIdleEvent;
587 firstAllIdleEvent = false;
588
589 try {
590 if (hasOutputChanged(ctx, first)) {
591 return;
592 }
593
594 IdleStateEvent event = newIdleStateEvent(IdleState.ALL_IDLE, first);
595 channelIdle(ctx, event);
596 } catch (Throwable t) {
597 ctx.fireExceptionCaught(t);
598 }
599 } else {
600
601
602 allIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
603 }
604 }
605 }
606 }