1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.timeout;
17
18 import io.netty.bootstrap.ServerBootstrap;
19 import io.netty.channel.Channel;
20 import io.netty.channel.Channel.Unsafe;
21 import io.netty.channel.ChannelDuplexHandler;
22 import io.netty.channel.ChannelFutureListener;
23 import io.netty.channel.ChannelHandlerContext;
24 import io.netty.channel.ChannelInitializer;
25 import io.netty.channel.ChannelOutboundBuffer;
26 import io.netty.channel.ChannelPromise;
27 import io.netty.util.concurrent.Future;
28 import io.netty.util.concurrent.Ticker;
29 import io.netty.util.internal.ObjectUtil;
30
31 import java.util.concurrent.TimeUnit;
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99 public class IdleStateHandler extends ChannelDuplexHandler {
100 private static final long MIN_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(1);
101
102 private final boolean observeOutput;
103 private final long readerIdleTimeNanos;
104 private final long writerIdleTimeNanos;
105 private final long allIdleTimeNanos;
106
107 private Ticker ticker = Ticker.systemTicker();
108
109 private Future<?> readerIdleTimeout;
110 private long lastReadTime;
111 private boolean firstReaderIdleEvent = true;
112
113 private Future<?> writerIdleTimeout;
114 private long lastWriteTime;
115 private boolean firstWriterIdleEvent = true;
116
117 private Future<?> allIdleTimeout;
118 private boolean firstAllIdleEvent = true;
119
120 private byte state;
121 private static final byte ST_INITIALIZED = 1;
122 private static final byte ST_DESTROYED = 2;
123
124 private boolean reading;
125
126 private long lastChangeCheckTimeStamp;
127 private int lastMessageHashCode;
128 private long lastPendingWriteBytes;
129 private long lastFlushProgress;
130
131 private final ChannelFutureListener writeListener = future -> {
132 lastWriteTime = ticker.nanoTime();
133 firstWriterIdleEvent = firstAllIdleEvent = true;
134 };
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152 public IdleStateHandler(
153 int readerIdleTimeSeconds,
154 int writerIdleTimeSeconds,
155 int allIdleTimeSeconds) {
156
157 this(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds,
158 TimeUnit.SECONDS);
159 }
160
161
162
163
164 public IdleStateHandler(
165 long readerIdleTime, long writerIdleTime, long allIdleTime,
166 TimeUnit unit) {
167 this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
168 }
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192 public IdleStateHandler(boolean observeOutput,
193 long readerIdleTime, long writerIdleTime, long allIdleTime,
194 TimeUnit unit) {
195 ObjectUtil.checkNotNull(unit, "unit");
196
197 this.observeOutput = observeOutput;
198
199 if (readerIdleTime <= 0) {
200 readerIdleTimeNanos = 0;
201 } else {
202 readerIdleTimeNanos = Math.max(unit.toNanos(readerIdleTime), MIN_TIMEOUT_NANOS);
203 }
204 if (writerIdleTime <= 0) {
205 writerIdleTimeNanos = 0;
206 } else {
207 writerIdleTimeNanos = Math.max(unit.toNanos(writerIdleTime), MIN_TIMEOUT_NANOS);
208 }
209 if (allIdleTime <= 0) {
210 allIdleTimeNanos = 0;
211 } else {
212 allIdleTimeNanos = Math.max(unit.toNanos(allIdleTime), MIN_TIMEOUT_NANOS);
213 }
214 }
215
216
217
218
219
220 public long getReaderIdleTimeInMillis() {
221 return TimeUnit.NANOSECONDS.toMillis(readerIdleTimeNanos);
222 }
223
224
225
226
227
228 public long getWriterIdleTimeInMillis() {
229 return TimeUnit.NANOSECONDS.toMillis(writerIdleTimeNanos);
230 }
231
232
233
234
235
236 public long getAllIdleTimeInMillis() {
237 return TimeUnit.NANOSECONDS.toMillis(allIdleTimeNanos);
238 }
239
240 @Override
241 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
242 this.ticker = ctx.executor().ticker();
243 if (ctx.channel().isActive() && ctx.channel().isRegistered()) {
244
245
246 initialize(ctx);
247 } else {
248
249
250 }
251 }
252
253 @Override
254 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
255 destroy();
256 }
257
258 @Override
259 public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
260
261 if (ctx.channel().isActive()) {
262 initialize(ctx);
263 }
264 super.channelRegistered(ctx);
265 }
266
267 @Override
268 public void channelActive(ChannelHandlerContext ctx) throws Exception {
269
270
271
272 initialize(ctx);
273 super.channelActive(ctx);
274 }
275
276 @Override
277 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
278 destroy();
279 super.channelInactive(ctx);
280 }
281
282 @Override
283 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
284 if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
285 reading = true;
286 firstReaderIdleEvent = firstAllIdleEvent = true;
287 }
288 ctx.fireChannelRead(msg);
289 }
290
291 @Override
292 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
293 if ((readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) && reading) {
294 lastReadTime = ticker.nanoTime();
295 reading = false;
296 }
297 ctx.fireChannelReadComplete();
298 }
299
300 @Override
301 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
302
303 if (writerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
304 ctx.write(msg, promise.unvoid()).addListener(writeListener);
305 } else {
306 ctx.write(msg, promise);
307 }
308 }
309
310
311
312
313 public void resetReadTimeout() {
314 if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
315 lastReadTime = ticker.nanoTime();
316 reading = false;
317 }
318 }
319
320
321
322
323 public void resetWriteTimeout() {
324 if (writerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
325 lastWriteTime = ticker.nanoTime();
326 }
327 }
328
329 private void initialize(ChannelHandlerContext ctx) {
330
331
332 switch (state) {
333 case 1:
334 case 2:
335 return;
336 default:
337 break;
338 }
339
340 state = ST_INITIALIZED;
341 initOutputChanged(ctx);
342
343 lastReadTime = lastWriteTime = ticker.nanoTime();
344 if (readerIdleTimeNanos > 0) {
345 readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
346 readerIdleTimeNanos, TimeUnit.NANOSECONDS);
347 }
348 if (writerIdleTimeNanos > 0) {
349 writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
350 writerIdleTimeNanos, TimeUnit.NANOSECONDS);
351 }
352 if (allIdleTimeNanos > 0) {
353 allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
354 allIdleTimeNanos, TimeUnit.NANOSECONDS);
355 }
356 }
357
358
359
360
361 Future<?> schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) {
362 return ctx.executor().schedule(task, delay, unit);
363 }
364
365 private void destroy() {
366 state = ST_DESTROYED;
367
368 if (readerIdleTimeout != null) {
369 readerIdleTimeout.cancel(false);
370 readerIdleTimeout = null;
371 }
372 if (writerIdleTimeout != null) {
373 writerIdleTimeout.cancel(false);
374 writerIdleTimeout = null;
375 }
376 if (allIdleTimeout != null) {
377 allIdleTimeout.cancel(false);
378 allIdleTimeout = null;
379 }
380 }
381
382
383
384
385
386 protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
387 ctx.fireUserEventTriggered(evt);
388 }
389
390
391
392
393 protected IdleStateEvent newIdleStateEvent(IdleState state, boolean first) {
394 switch (state) {
395 case ALL_IDLE:
396 return first ? IdleStateEvent.FIRST_ALL_IDLE_STATE_EVENT : IdleStateEvent.ALL_IDLE_STATE_EVENT;
397 case READER_IDLE:
398 return first ? IdleStateEvent.FIRST_READER_IDLE_STATE_EVENT : IdleStateEvent.READER_IDLE_STATE_EVENT;
399 case WRITER_IDLE:
400 return first ? IdleStateEvent.FIRST_WRITER_IDLE_STATE_EVENT : IdleStateEvent.WRITER_IDLE_STATE_EVENT;
401 default:
402 throw new IllegalArgumentException("Unhandled: state=" + state + ", first=" + first);
403 }
404 }
405
406
407
408
409 private void initOutputChanged(ChannelHandlerContext ctx) {
410 if (observeOutput) {
411 Channel channel = ctx.channel();
412 Unsafe unsafe = channel.unsafe();
413 ChannelOutboundBuffer buf = unsafe.outboundBuffer();
414
415 if (buf != null) {
416 lastMessageHashCode = System.identityHashCode(buf.current());
417 lastPendingWriteBytes = buf.totalPendingWriteBytes();
418 lastFlushProgress = buf.currentProgress();
419 }
420 }
421 }
422
423
424
425
426
427
428
429
430 private boolean hasOutputChanged(ChannelHandlerContext ctx, boolean first) {
431 if (observeOutput) {
432
433
434
435
436
437
438 if (lastChangeCheckTimeStamp != lastWriteTime) {
439 lastChangeCheckTimeStamp = lastWriteTime;
440
441
442 if (!first) {
443 return true;
444 }
445 }
446
447 Channel channel = ctx.channel();
448 Unsafe unsafe = channel.unsafe();
449 ChannelOutboundBuffer buf = unsafe.outboundBuffer();
450
451 if (buf != null) {
452 int messageHashCode = System.identityHashCode(buf.current());
453 long pendingWriteBytes = buf.totalPendingWriteBytes();
454
455 if (messageHashCode != lastMessageHashCode || pendingWriteBytes != lastPendingWriteBytes) {
456 lastMessageHashCode = messageHashCode;
457 lastPendingWriteBytes = pendingWriteBytes;
458
459 if (!first) {
460 return true;
461 }
462 }
463
464 long flushProgress = buf.currentProgress();
465 if (flushProgress != lastFlushProgress) {
466 lastFlushProgress = flushProgress;
467 return !first;
468 }
469 }
470 }
471
472 return false;
473 }
474
475 private abstract static class AbstractIdleTask implements Runnable {
476
477 private final ChannelHandlerContext ctx;
478
479 AbstractIdleTask(ChannelHandlerContext ctx) {
480 this.ctx = ctx;
481 }
482
483 @Override
484 public void run() {
485 if (!ctx.channel().isOpen()) {
486 return;
487 }
488
489 run(ctx);
490 }
491
492 protected abstract void run(ChannelHandlerContext ctx);
493 }
494
495 private final class ReaderIdleTimeoutTask extends AbstractIdleTask {
496
497 ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
498 super(ctx);
499 }
500
501 @Override
502 protected void run(ChannelHandlerContext ctx) {
503 long nextDelay = readerIdleTimeNanos;
504 if (!reading) {
505 nextDelay -= ticker.nanoTime() - lastReadTime;
506 }
507
508 if (nextDelay <= 0) {
509
510 readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
511
512 boolean first = firstReaderIdleEvent;
513 firstReaderIdleEvent = false;
514
515 try {
516 IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
517 channelIdle(ctx, event);
518 } catch (Throwable t) {
519 ctx.fireExceptionCaught(t);
520 }
521 } else {
522
523 readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
524 }
525 }
526 }
527
528 private final class WriterIdleTimeoutTask extends AbstractIdleTask {
529
530 WriterIdleTimeoutTask(ChannelHandlerContext ctx) {
531 super(ctx);
532 }
533
534 @Override
535 protected void run(ChannelHandlerContext ctx) {
536
537 long lastWriteTime = IdleStateHandler.this.lastWriteTime;
538 long nextDelay = writerIdleTimeNanos - (ticker.nanoTime() - lastWriteTime);
539 if (nextDelay <= 0) {
540
541 writerIdleTimeout = schedule(ctx, this, writerIdleTimeNanos, TimeUnit.NANOSECONDS);
542
543 boolean first = firstWriterIdleEvent;
544 firstWriterIdleEvent = false;
545
546 try {
547 if (hasOutputChanged(ctx, first)) {
548 return;
549 }
550
551 IdleStateEvent event = newIdleStateEvent(IdleState.WRITER_IDLE, first);
552 channelIdle(ctx, event);
553 } catch (Throwable t) {
554 ctx.fireExceptionCaught(t);
555 }
556 } else {
557
558 writerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
559 }
560 }
561 }
562
563 private final class AllIdleTimeoutTask extends AbstractIdleTask {
564
565 AllIdleTimeoutTask(ChannelHandlerContext ctx) {
566 super(ctx);
567 }
568
569 @Override
570 protected void run(ChannelHandlerContext ctx) {
571
572 long nextDelay = allIdleTimeNanos;
573 if (!reading) {
574 nextDelay -= ticker.nanoTime() - Math.max(lastReadTime, lastWriteTime);
575 }
576 if (nextDelay <= 0) {
577
578
579 allIdleTimeout = schedule(ctx, this, allIdleTimeNanos, TimeUnit.NANOSECONDS);
580
581 boolean first = firstAllIdleEvent;
582 firstAllIdleEvent = false;
583
584 try {
585 if (hasOutputChanged(ctx, first)) {
586 return;
587 }
588
589 IdleStateEvent event = newIdleStateEvent(IdleState.ALL_IDLE, first);
590 channelIdle(ctx, event);
591 } catch (Throwable t) {
592 ctx.fireExceptionCaught(t);
593 }
594 } else {
595
596
597 allIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
598 }
599 }
600 }
601 }