1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.timeout;
17
18 import io.netty.bootstrap.ServerBootstrap;
19 import io.netty.channel.Channel;
20 import io.netty.channel.Channel.Unsafe;
21 import io.netty.channel.ChannelDuplexHandler;
22 import io.netty.channel.ChannelFuture;
23 import io.netty.channel.ChannelFutureListener;
24 import io.netty.channel.ChannelHandlerContext;
25 import io.netty.channel.ChannelInitializer;
26 import io.netty.channel.ChannelOutboundBuffer;
27 import io.netty.channel.ChannelPromise;
28
29 import java.util.concurrent.ScheduledFuture;
30 import java.util.concurrent.TimeUnit;
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98 public class IdleStateHandler extends ChannelDuplexHandler {
99 private static final long MIN_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(1);
100
101
102 private final ChannelFutureListener writeListener = new ChannelFutureListener() {
103 @Override
104 public void operationComplete(ChannelFuture future) throws Exception {
105 lastWriteTime = ticksInNanos();
106 firstWriterIdleEvent = firstAllIdleEvent = true;
107 }
108 };
109
110 private final boolean observeOutput;
111 private final long readerIdleTimeNanos;
112 private final long writerIdleTimeNanos;
113 private final long allIdleTimeNanos;
114
115 private ScheduledFuture<?> readerIdleTimeout;
116 private long lastReadTime;
117 private boolean firstReaderIdleEvent = true;
118
119 private ScheduledFuture<?> writerIdleTimeout;
120 private long lastWriteTime;
121 private boolean firstWriterIdleEvent = true;
122
123 private ScheduledFuture<?> allIdleTimeout;
124 private boolean firstAllIdleEvent = true;
125
126 private byte state;
127 private boolean reading;
128
129 private long lastChangeCheckTimeStamp;
130 private int lastMessageHashCode;
131 private long lastPendingWriteBytes;
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149 public IdleStateHandler(
150 int readerIdleTimeSeconds,
151 int writerIdleTimeSeconds,
152 int allIdleTimeSeconds) {
153
154 this(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds,
155 TimeUnit.SECONDS);
156 }
157
158
159
160
161 public IdleStateHandler(
162 long readerIdleTime, long writerIdleTime, long allIdleTime,
163 TimeUnit unit) {
164 this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
165 }
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189 public IdleStateHandler(boolean observeOutput,
190 long readerIdleTime, long writerIdleTime, long allIdleTime,
191 TimeUnit unit) {
192 if (unit == null) {
193 throw new NullPointerException("unit");
194 }
195
196 this.observeOutput = observeOutput;
197
198 if (readerIdleTime <= 0) {
199 readerIdleTimeNanos = 0;
200 } else {
201 readerIdleTimeNanos = Math.max(unit.toNanos(readerIdleTime), MIN_TIMEOUT_NANOS);
202 }
203 if (writerIdleTime <= 0) {
204 writerIdleTimeNanos = 0;
205 } else {
206 writerIdleTimeNanos = Math.max(unit.toNanos(writerIdleTime), MIN_TIMEOUT_NANOS);
207 }
208 if (allIdleTime <= 0) {
209 allIdleTimeNanos = 0;
210 } else {
211 allIdleTimeNanos = Math.max(unit.toNanos(allIdleTime), MIN_TIMEOUT_NANOS);
212 }
213 }
214
215
216
217
218
219 public long getReaderIdleTimeInMillis() {
220 return TimeUnit.NANOSECONDS.toMillis(readerIdleTimeNanos);
221 }
222
223
224
225
226
227 public long getWriterIdleTimeInMillis() {
228 return TimeUnit.NANOSECONDS.toMillis(writerIdleTimeNanos);
229 }
230
231
232
233
234
235 public long getAllIdleTimeInMillis() {
236 return TimeUnit.NANOSECONDS.toMillis(allIdleTimeNanos);
237 }
238
239 @Override
240 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
241 if (ctx.channel().isActive() && ctx.channel().isRegistered()) {
242
243
244 initialize(ctx);
245 } else {
246
247
248 }
249 }
250
251 @Override
252 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
253 destroy();
254 }
255
256 @Override
257 public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
258
259 if (ctx.channel().isActive()) {
260 initialize(ctx);
261 }
262 super.channelRegistered(ctx);
263 }
264
265 @Override
266 public void channelActive(ChannelHandlerContext ctx) throws Exception {
267
268
269
270 initialize(ctx);
271 super.channelActive(ctx);
272 }
273
274 @Override
275 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
276 destroy();
277 super.channelInactive(ctx);
278 }
279
280 @Override
281 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
282 if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
283 reading = true;
284 firstReaderIdleEvent = firstAllIdleEvent = true;
285 }
286 ctx.fireChannelRead(msg);
287 }
288
289 @Override
290 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
291 if ((readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) && reading) {
292 lastReadTime = ticksInNanos();
293 reading = false;
294 }
295 ctx.fireChannelReadComplete();
296 }
297
298 @Override
299 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
300
301 if (writerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
302 ctx.write(msg, promise).addListener(writeListener);
303 } else {
304 ctx.write(msg, promise);
305 }
306 }
307
308 private void initialize(ChannelHandlerContext ctx) {
309
310
311 switch (state) {
312 case 1:
313 case 2:
314 return;
315 }
316
317 state = 1;
318 initOutputChanged(ctx);
319
320 lastReadTime = lastWriteTime = ticksInNanos();
321 if (readerIdleTimeNanos > 0) {
322 readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
323 readerIdleTimeNanos, TimeUnit.NANOSECONDS);
324 }
325 if (writerIdleTimeNanos > 0) {
326 writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
327 writerIdleTimeNanos, TimeUnit.NANOSECONDS);
328 }
329 if (allIdleTimeNanos > 0) {
330 allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
331 allIdleTimeNanos, TimeUnit.NANOSECONDS);
332 }
333 }
334
335
336
337
338 long ticksInNanos() {
339 return System.nanoTime();
340 }
341
342
343
344
345 ScheduledFuture<?> schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) {
346 return ctx.executor().schedule(task, delay, unit);
347 }
348
349 private void destroy() {
350 state = 2;
351
352 if (readerIdleTimeout != null) {
353 readerIdleTimeout.cancel(false);
354 readerIdleTimeout = null;
355 }
356 if (writerIdleTimeout != null) {
357 writerIdleTimeout.cancel(false);
358 writerIdleTimeout = null;
359 }
360 if (allIdleTimeout != null) {
361 allIdleTimeout.cancel(false);
362 allIdleTimeout = null;
363 }
364 }
365
366
367
368
369
370 protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
371 ctx.fireUserEventTriggered(evt);
372 }
373
374
375
376
377 protected IdleStateEvent newIdleStateEvent(IdleState state, boolean first) {
378 switch (state) {
379 case ALL_IDLE:
380 return first ? IdleStateEvent.FIRST_ALL_IDLE_STATE_EVENT : IdleStateEvent.ALL_IDLE_STATE_EVENT;
381 case READER_IDLE:
382 return first ? IdleStateEvent.FIRST_READER_IDLE_STATE_EVENT : IdleStateEvent.READER_IDLE_STATE_EVENT;
383 case WRITER_IDLE:
384 return first ? IdleStateEvent.FIRST_WRITER_IDLE_STATE_EVENT : IdleStateEvent.WRITER_IDLE_STATE_EVENT;
385 default:
386 throw new IllegalArgumentException("Unhandled: state=" + state + ", first=" + first);
387 }
388 }
389
390
391
392
393 private void initOutputChanged(ChannelHandlerContext ctx) {
394 if (observeOutput) {
395 Channel channel = ctx.channel();
396 Unsafe unsafe = channel.unsafe();
397 ChannelOutboundBuffer buf = unsafe.outboundBuffer();
398
399 if (buf != null) {
400 lastMessageHashCode = System.identityHashCode(buf.current());
401 lastPendingWriteBytes = buf.totalPendingWriteBytes();
402 }
403 }
404 }
405
406
407
408
409
410
411
412
413 private boolean hasOutputChanged(ChannelHandlerContext ctx, boolean first) {
414 if (observeOutput) {
415
416
417
418
419
420
421 if (lastChangeCheckTimeStamp != lastWriteTime) {
422 lastChangeCheckTimeStamp = lastWriteTime;
423
424
425 if (!first) {
426 return true;
427 }
428 }
429
430 Channel channel = ctx.channel();
431 Unsafe unsafe = channel.unsafe();
432 ChannelOutboundBuffer buf = unsafe.outboundBuffer();
433
434 if (buf != null) {
435 int messageHashCode = System.identityHashCode(buf.current());
436 long pendingWriteBytes = buf.totalPendingWriteBytes();
437
438 if (messageHashCode != lastMessageHashCode || pendingWriteBytes != lastPendingWriteBytes) {
439 lastMessageHashCode = messageHashCode;
440 lastPendingWriteBytes = pendingWriteBytes;
441
442 if (!first) {
443 return true;
444 }
445 }
446 }
447 }
448
449 return false;
450 }
451
452 private abstract static class AbstractIdleTask implements Runnable {
453
454 private final ChannelHandlerContext ctx;
455
456 AbstractIdleTask(ChannelHandlerContext ctx) {
457 this.ctx = ctx;
458 }
459
460 @Override
461 public void run() {
462 if (!ctx.channel().isOpen()) {
463 return;
464 }
465
466 run(ctx);
467 }
468
469 protected abstract void run(ChannelHandlerContext ctx);
470 }
471
472 private final class ReaderIdleTimeoutTask extends AbstractIdleTask {
473
474 ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
475 super(ctx);
476 }
477
478 @Override
479 protected void run(ChannelHandlerContext ctx) {
480 long nextDelay = readerIdleTimeNanos;
481 if (!reading) {
482 nextDelay -= ticksInNanos() - lastReadTime;
483 }
484
485 if (nextDelay <= 0) {
486
487 readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
488
489 boolean first = firstReaderIdleEvent;
490 firstReaderIdleEvent = false;
491
492 try {
493 IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
494 channelIdle(ctx, event);
495 } catch (Throwable t) {
496 ctx.fireExceptionCaught(t);
497 }
498 } else {
499
500 readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
501 }
502 }
503 }
504
505 private final class WriterIdleTimeoutTask extends AbstractIdleTask {
506
507 WriterIdleTimeoutTask(ChannelHandlerContext ctx) {
508 super(ctx);
509 }
510
511 @Override
512 protected void run(ChannelHandlerContext ctx) {
513
514 long lastWriteTime = IdleStateHandler.this.lastWriteTime;
515 long nextDelay = writerIdleTimeNanos - (ticksInNanos() - lastWriteTime);
516 if (nextDelay <= 0) {
517
518 writerIdleTimeout = schedule(ctx, this, writerIdleTimeNanos, TimeUnit.NANOSECONDS);
519
520 boolean first = firstWriterIdleEvent;
521 firstWriterIdleEvent = false;
522
523 try {
524 if (hasOutputChanged(ctx, first)) {
525 return;
526 }
527
528 IdleStateEvent event = newIdleStateEvent(IdleState.WRITER_IDLE, first);
529 channelIdle(ctx, event);
530 } catch (Throwable t) {
531 ctx.fireExceptionCaught(t);
532 }
533 } else {
534
535 writerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
536 }
537 }
538 }
539
540 private final class AllIdleTimeoutTask extends AbstractIdleTask {
541
542 AllIdleTimeoutTask(ChannelHandlerContext ctx) {
543 super(ctx);
544 }
545
546 @Override
547 protected void run(ChannelHandlerContext ctx) {
548
549 long nextDelay = allIdleTimeNanos;
550 if (!reading) {
551 nextDelay -= ticksInNanos() - Math.max(lastReadTime, lastWriteTime);
552 }
553 if (nextDelay <= 0) {
554
555
556 allIdleTimeout = schedule(ctx, this, allIdleTimeNanos, TimeUnit.NANOSECONDS);
557
558 boolean first = firstAllIdleEvent;
559 firstAllIdleEvent = false;
560
561 try {
562 if (hasOutputChanged(ctx, first)) {
563 return;
564 }
565
566 IdleStateEvent event = newIdleStateEvent(IdleState.ALL_IDLE, first);
567 channelIdle(ctx, event);
568 } catch (Throwable t) {
569 ctx.fireExceptionCaught(t);
570 }
571 } else {
572
573
574 allIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
575 }
576 }
577 }
578 }