View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.timeout;
17  
18  import io.netty.bootstrap.ServerBootstrap;
19  import io.netty.channel.Channel;
20  import io.netty.channel.Channel.Unsafe;
21  import io.netty.channel.ChannelDuplexHandler;
22  import io.netty.channel.ChannelFuture;
23  import io.netty.channel.ChannelFutureListener;
24  import io.netty.channel.ChannelHandlerContext;
25  import io.netty.channel.ChannelInitializer;
26  import io.netty.channel.ChannelOutboundBuffer;
27  import io.netty.channel.ChannelPromise;
28  import io.netty.util.concurrent.Future;
29  import io.netty.util.concurrent.Ticker;
30  import io.netty.util.internal.ObjectUtil;
31  
32  import java.util.concurrent.TimeUnit;
33  
34  /**
35   * Triggers an {@link IdleStateEvent} when a {@link Channel} has not performed
36   * read, write, or both operation for a while.
37   *
38   * <h3>Supported idle states</h3>
39   * <table border="1">
40   * <tr>
41   * <th>Property</th><th>Meaning</th>
42   * </tr>
43   * <tr>
44   * <td>{@code readerIdleTime}</td>
45   * <td>an {@link IdleStateEvent} whose state is {@link IdleState#READER_IDLE}
46   *     will be triggered when no read was performed for the specified period of
47   *     time.  Specify {@code 0} to disable.</td>
48   * </tr>
49   * <tr>
50   * <td>{@code writerIdleTime}</td>
51   * <td>an {@link IdleStateEvent} whose state is {@link IdleState#WRITER_IDLE}
52   *     will be triggered when no write was performed for the specified period of
53   *     time.  Specify {@code 0} to disable.</td>
54   * </tr>
55   * <tr>
56   * <td>{@code allIdleTime}</td>
57   * <td>an {@link IdleStateEvent} whose state is {@link IdleState#ALL_IDLE}
58   *     will be triggered when neither read nor write was performed for the
59   *     specified period of time.  Specify {@code 0} to disable.</td>
60   * </tr>
61   * </table>
62   *
63   * <pre>
64   * // An example that sends a ping message when there is no outbound traffic
65   * // for 30 seconds.  The connection is closed when there is no inbound traffic
66   * // for 60 seconds.
67   *
68   * public class MyChannelInitializer extends {@link ChannelInitializer}&lt;{@link Channel}&gt; {
69   *     {@code @Override}
70   *     public void initChannel({@link Channel} channel) {
71   *         channel.pipeline().addLast("idleStateHandler", new {@link IdleStateHandler}(60, 30, 0));
72   *         channel.pipeline().addLast("myHandler", new MyHandler());
73   *     }
74   * }
75   *
76   * // Handler should handle the {@link IdleStateEvent} triggered by {@link IdleStateHandler}.
77   * public class MyHandler extends {@link ChannelDuplexHandler} {
78   *     {@code @Override}
79   *     public void userEventTriggered({@link ChannelHandlerContext} ctx, {@link Object} evt) throws {@link Exception} {
80   *         if (evt instanceof {@link IdleStateEvent}) {
81   *             {@link IdleStateEvent} e = ({@link IdleStateEvent}) evt;
82   *             if (e.state() == {@link IdleState}.READER_IDLE) {
83   *                 ctx.close();
84   *             } else if (e.state() == {@link IdleState}.WRITER_IDLE) {
85   *                 ctx.writeAndFlush(new PingMessage());
86   *             }
87   *         }
88   *     }
89   * }
90   *
91   * {@link ServerBootstrap} bootstrap = ...;
92   * ...
93   * bootstrap.childHandler(new MyChannelInitializer());
94   * ...
95   * </pre>
96   *
97   * @see ReadTimeoutHandler
98   * @see WriteTimeoutHandler
99   */
100 public class IdleStateHandler extends ChannelDuplexHandler {
101     private static final long MIN_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(1);
102 
103     // Not create a new ChannelFutureListener per write operation to reduce GC pressure.
104     private final ChannelFutureListener writeListener = new ChannelFutureListener() {
105         @Override
106         public void operationComplete(ChannelFuture future) throws Exception {
107             lastWriteTime = ticker.nanoTime();
108             firstWriterIdleEvent = firstAllIdleEvent = true;
109         }
110     };
111 
112     private final boolean observeOutput;
113     private final long readerIdleTimeNanos;
114     private final long writerIdleTimeNanos;
115     private final long allIdleTimeNanos;
116 
117     private Ticker ticker = Ticker.systemTicker();
118 
119     private Future<?> readerIdleTimeout;
120     private long lastReadTime;
121     private boolean firstReaderIdleEvent = true;
122 
123     private Future<?> writerIdleTimeout;
124     private long lastWriteTime;
125     private boolean firstWriterIdleEvent = true;
126 
127     private Future<?> allIdleTimeout;
128     private boolean firstAllIdleEvent = true;
129 
130     private byte state;
131     private static final byte ST_INITIALIZED = 1;
132     private static final byte ST_DESTROYED = 2;
133 
134     private boolean reading;
135 
136     private long lastChangeCheckTimeStamp;
137     private int lastMessageHashCode;
138     private long lastPendingWriteBytes;
139     private long lastFlushProgress;
140 
141     /**
142      * Creates a new instance firing {@link IdleStateEvent}s.
143      *
144      * @param readerIdleTimeSeconds
145      *        an {@link IdleStateEvent} whose state is {@link IdleState#READER_IDLE}
146      *        will be triggered when no read was performed for the specified
147      *        period of time.  Specify {@code 0} to disable.
148      * @param writerIdleTimeSeconds
149      *        an {@link IdleStateEvent} whose state is {@link IdleState#WRITER_IDLE}
150      *        will be triggered when no write was performed for the specified
151      *        period of time.  Specify {@code 0} to disable.
152      * @param allIdleTimeSeconds
153      *        an {@link IdleStateEvent} whose state is {@link IdleState#ALL_IDLE}
154      *        will be triggered when neither read nor write was performed for
155      *        the specified period of time.  Specify {@code 0} to disable.
156      */
157     public IdleStateHandler(
158             int readerIdleTimeSeconds,
159             int writerIdleTimeSeconds,
160             int allIdleTimeSeconds) {
161 
162         this(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds,
163              TimeUnit.SECONDS);
164     }
165 
166     /**
167      * @see #IdleStateHandler(boolean, long, long, long, TimeUnit)
168      */
169     public IdleStateHandler(
170             long readerIdleTime, long writerIdleTime, long allIdleTime,
171             TimeUnit unit) {
172         this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
173     }
174 
175     /**
176      * Creates a new instance firing {@link IdleStateEvent}s.
177      *
178      * @param observeOutput
179      *        whether or not the consumption of {@code bytes} should be taken into
180      *        consideration when assessing write idleness. The default is {@code false}.
181      * @param readerIdleTime
182      *        an {@link IdleStateEvent} whose state is {@link IdleState#READER_IDLE}
183      *        will be triggered when no read was performed for the specified
184      *        period of time.  Specify {@code 0} to disable.
185      * @param writerIdleTime
186      *        an {@link IdleStateEvent} whose state is {@link IdleState#WRITER_IDLE}
187      *        will be triggered when no write was performed for the specified
188      *        period of time.  Specify {@code 0} to disable.
189      * @param allIdleTime
190      *        an {@link IdleStateEvent} whose state is {@link IdleState#ALL_IDLE}
191      *        will be triggered when neither read nor write was performed for
192      *        the specified period of time.  Specify {@code 0} to disable.
193      * @param unit
194      *        the {@link TimeUnit} of {@code readerIdleTime},
195      *        {@code writeIdleTime}, and {@code allIdleTime}
196      */
197     public IdleStateHandler(boolean observeOutput,
198             long readerIdleTime, long writerIdleTime, long allIdleTime,
199             TimeUnit unit) {
200         ObjectUtil.checkNotNull(unit, "unit");
201 
202         this.observeOutput = observeOutput;
203 
204         if (readerIdleTime <= 0) {
205             readerIdleTimeNanos = 0;
206         } else {
207             readerIdleTimeNanos = Math.max(unit.toNanos(readerIdleTime), MIN_TIMEOUT_NANOS);
208         }
209         if (writerIdleTime <= 0) {
210             writerIdleTimeNanos = 0;
211         } else {
212             writerIdleTimeNanos = Math.max(unit.toNanos(writerIdleTime), MIN_TIMEOUT_NANOS);
213         }
214         if (allIdleTime <= 0) {
215             allIdleTimeNanos = 0;
216         } else {
217             allIdleTimeNanos = Math.max(unit.toNanos(allIdleTime), MIN_TIMEOUT_NANOS);
218         }
219     }
220 
221     /**
222      * Return the readerIdleTime that was given when instance this class in milliseconds.
223      *
224      */
225     public long getReaderIdleTimeInMillis() {
226         return TimeUnit.NANOSECONDS.toMillis(readerIdleTimeNanos);
227     }
228 
229     /**
230      * Return the writerIdleTime that was given when instance this class in milliseconds.
231      *
232      */
233     public long getWriterIdleTimeInMillis() {
234         return TimeUnit.NANOSECONDS.toMillis(writerIdleTimeNanos);
235     }
236 
237     /**
238      * Return the allIdleTime that was given when instance this class in milliseconds.
239      *
240      */
241     public long getAllIdleTimeInMillis() {
242         return TimeUnit.NANOSECONDS.toMillis(allIdleTimeNanos);
243     }
244 
245     @Override
246     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
247         this.ticker = ctx.executor().ticker();
248         if (ctx.channel().isActive() && ctx.channel().isRegistered()) {
249             // channelActive() event has been fired already, which means this.channelActive() will
250             // not be invoked. We have to initialize here instead.
251             initialize(ctx);
252         } else {
253             // channelActive() event has not been fired yet.  this.channelActive() will be invoked
254             // and initialization will occur there.
255         }
256     }
257 
258     @Override
259     public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
260         destroy();
261     }
262 
263     @Override
264     public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
265         // Initialize early if channel is active already.
266         if (ctx.channel().isActive()) {
267             initialize(ctx);
268         }
269         super.channelRegistered(ctx);
270     }
271 
272     @Override
273     public void channelActive(ChannelHandlerContext ctx) throws Exception {
274         // This method will be invoked only if this handler was added
275         // before channelActive() event is fired.  If a user adds this handler
276         // after the channelActive() event, initialize() will be called by beforeAdd().
277         initialize(ctx);
278         super.channelActive(ctx);
279     }
280 
281     @Override
282     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
283         destroy();
284         super.channelInactive(ctx);
285     }
286 
287     @Override
288     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
289         if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
290             reading = true;
291             firstReaderIdleEvent = firstAllIdleEvent = true;
292         }
293         ctx.fireChannelRead(msg);
294     }
295 
296     @Override
297     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
298         if ((readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) && reading) {
299             lastReadTime = ticker.nanoTime();
300             reading = false;
301         }
302         ctx.fireChannelReadComplete();
303     }
304 
305     @Override
306     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
307         // Allow writing with void promise if handler is only configured for read timeout events.
308         if (writerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
309             ctx.write(msg, promise.unvoid()).addListener(writeListener);
310         } else {
311             ctx.write(msg, promise);
312         }
313     }
314 
315     /**
316      * Reset the read timeout. As this handler is not thread-safe, this method <b>must</b> be called on the event loop.
317      */
318     public void resetReadTimeout() {
319         if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
320             lastReadTime = ticker.nanoTime();
321             reading = false;
322         }
323     }
324 
325     /**
326      * Reset the write timeout. As this handler is not thread-safe, this method <b>must</b> be called on the event loop.
327      */
328     public void resetWriteTimeout() {
329         if (writerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
330             lastWriteTime = ticker.nanoTime();
331         }
332     }
333 
334     private void initialize(ChannelHandlerContext ctx) {
335         // Avoid the case where destroy() is called before scheduling timeouts.
336         // See: https://github.com/netty/netty/issues/143
337         switch (state) {
338         case 1:
339         case 2:
340             return;
341         default:
342              break;
343         }
344 
345         state = ST_INITIALIZED;
346         initOutputChanged(ctx);
347 
348         lastReadTime = lastWriteTime = ticker.nanoTime();
349         if (readerIdleTimeNanos > 0) {
350             readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
351                     readerIdleTimeNanos, TimeUnit.NANOSECONDS);
352         }
353         if (writerIdleTimeNanos > 0) {
354             writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
355                     writerIdleTimeNanos, TimeUnit.NANOSECONDS);
356         }
357         if (allIdleTimeNanos > 0) {
358             allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
359                     allIdleTimeNanos, TimeUnit.NANOSECONDS);
360         }
361     }
362 
363     /**
364      * This method is visible for testing!
365      */
366     Future<?> schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) {
367         return ctx.executor().schedule(task, delay, unit);
368     }
369 
370     private void destroy() {
371         state = ST_DESTROYED;
372 
373         if (readerIdleTimeout != null) {
374             readerIdleTimeout.cancel(false);
375             readerIdleTimeout = null;
376         }
377         if (writerIdleTimeout != null) {
378             writerIdleTimeout.cancel(false);
379             writerIdleTimeout = null;
380         }
381         if (allIdleTimeout != null) {
382             allIdleTimeout.cancel(false);
383             allIdleTimeout = null;
384         }
385     }
386 
387     /**
388      * Is called when an {@link IdleStateEvent} should be fired. This implementation calls
389      * {@link ChannelHandlerContext#fireUserEventTriggered(Object)}.
390      */
391     protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
392         ctx.fireUserEventTriggered(evt);
393     }
394 
395     /**
396      * Returns a {@link IdleStateEvent}.
397      */
398     protected IdleStateEvent newIdleStateEvent(IdleState state, boolean first) {
399         switch (state) {
400             case ALL_IDLE:
401                 return first ? IdleStateEvent.FIRST_ALL_IDLE_STATE_EVENT : IdleStateEvent.ALL_IDLE_STATE_EVENT;
402             case READER_IDLE:
403                 return first ? IdleStateEvent.FIRST_READER_IDLE_STATE_EVENT : IdleStateEvent.READER_IDLE_STATE_EVENT;
404             case WRITER_IDLE:
405                 return first ? IdleStateEvent.FIRST_WRITER_IDLE_STATE_EVENT : IdleStateEvent.WRITER_IDLE_STATE_EVENT;
406             default:
407                 throw new IllegalArgumentException("Unhandled: state=" + state + ", first=" + first);
408         }
409     }
410 
411     /**
412      * @see #hasOutputChanged(ChannelHandlerContext, boolean)
413      */
414     private void initOutputChanged(ChannelHandlerContext ctx) {
415         if (observeOutput) {
416             Channel channel = ctx.channel();
417             Unsafe unsafe = channel.unsafe();
418             ChannelOutboundBuffer buf = unsafe.outboundBuffer();
419 
420             if (buf != null) {
421                 lastMessageHashCode = System.identityHashCode(buf.current());
422                 lastPendingWriteBytes = buf.totalPendingWriteBytes();
423                 lastFlushProgress = buf.currentProgress();
424             }
425         }
426     }
427 
428     /**
429      * Returns {@code true} if and only if the {@link IdleStateHandler} was constructed
430      * with {@link #observeOutput} enabled and there has been an observed change in the
431      * {@link ChannelOutboundBuffer} between two consecutive calls of this method.
432      *
433      * https://github.com/netty/netty/issues/6150
434      */
435     private boolean hasOutputChanged(ChannelHandlerContext ctx, boolean first) {
436         if (observeOutput) {
437 
438             // We can take this shortcut if the ChannelPromises that got passed into write()
439             // appear to complete. It indicates "change" on message level and we simply assume
440             // that there's change happening on byte level. If the user doesn't observe channel
441             // writability events then they'll eventually OOME and there's clearly a different
442             // problem and idleness is least of their concerns.
443             if (lastChangeCheckTimeStamp != lastWriteTime) {
444                 lastChangeCheckTimeStamp = lastWriteTime;
445 
446                 // But this applies only if it's the non-first call.
447                 if (!first) {
448                     return true;
449                 }
450             }
451 
452             Channel channel = ctx.channel();
453             Unsafe unsafe = channel.unsafe();
454             ChannelOutboundBuffer buf = unsafe.outboundBuffer();
455 
456             if (buf != null) {
457                 int messageHashCode = System.identityHashCode(buf.current());
458                 long pendingWriteBytes = buf.totalPendingWriteBytes();
459 
460                 if (messageHashCode != lastMessageHashCode || pendingWriteBytes != lastPendingWriteBytes) {
461                     lastMessageHashCode = messageHashCode;
462                     lastPendingWriteBytes = pendingWriteBytes;
463 
464                     if (!first) {
465                         return true;
466                     }
467                 }
468 
469                 long flushProgress = buf.currentProgress();
470                 if (flushProgress != lastFlushProgress) {
471                     lastFlushProgress = flushProgress;
472                     return !first;
473                 }
474             }
475         }
476 
477         return false;
478     }
479 
480     private abstract static class AbstractIdleTask implements Runnable {
481 
482         private final ChannelHandlerContext ctx;
483 
484         AbstractIdleTask(ChannelHandlerContext ctx) {
485             this.ctx = ctx;
486         }
487 
488         @Override
489         public void run() {
490             if (!ctx.channel().isOpen()) {
491                 return;
492             }
493 
494             run(ctx);
495         }
496 
497         protected abstract void run(ChannelHandlerContext ctx);
498     }
499 
500     private final class ReaderIdleTimeoutTask extends AbstractIdleTask {
501 
502         ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
503             super(ctx);
504         }
505 
506         @Override
507         protected void run(ChannelHandlerContext ctx) {
508             long nextDelay = readerIdleTimeNanos;
509             if (!reading) {
510                 nextDelay -= ticker.nanoTime() - lastReadTime;
511             }
512 
513             if (nextDelay <= 0) {
514                 // Reader is idle - set a new timeout and notify the callback.
515                 readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
516 
517                 boolean first = firstReaderIdleEvent;
518                 firstReaderIdleEvent = false;
519 
520                 try {
521                     IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
522                     channelIdle(ctx, event);
523                 } catch (Throwable t) {
524                     ctx.fireExceptionCaught(t);
525                 }
526             } else {
527                 // Read occurred before the timeout - set a new timeout with shorter delay.
528                 readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
529             }
530         }
531     }
532 
533     private final class WriterIdleTimeoutTask extends AbstractIdleTask {
534 
535         WriterIdleTimeoutTask(ChannelHandlerContext ctx) {
536             super(ctx);
537         }
538 
539         @Override
540         protected void run(ChannelHandlerContext ctx) {
541 
542             long lastWriteTime = IdleStateHandler.this.lastWriteTime;
543             long nextDelay = writerIdleTimeNanos - (ticker.nanoTime() - lastWriteTime);
544             if (nextDelay <= 0) {
545                 // Writer is idle - set a new timeout and notify the callback.
546                 writerIdleTimeout = schedule(ctx, this, writerIdleTimeNanos, TimeUnit.NANOSECONDS);
547 
548                 boolean first = firstWriterIdleEvent;
549                 firstWriterIdleEvent = false;
550 
551                 try {
552                     if (hasOutputChanged(ctx, first)) {
553                         return;
554                     }
555 
556                     IdleStateEvent event = newIdleStateEvent(IdleState.WRITER_IDLE, first);
557                     channelIdle(ctx, event);
558                 } catch (Throwable t) {
559                     ctx.fireExceptionCaught(t);
560                 }
561             } else {
562                 // Write occurred before the timeout - set a new timeout with shorter delay.
563                 writerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
564             }
565         }
566     }
567 
568     private final class AllIdleTimeoutTask extends AbstractIdleTask {
569 
570         AllIdleTimeoutTask(ChannelHandlerContext ctx) {
571             super(ctx);
572         }
573 
574         @Override
575         protected void run(ChannelHandlerContext ctx) {
576 
577             long nextDelay = allIdleTimeNanos;
578             if (!reading) {
579                 nextDelay -= ticker.nanoTime() - Math.max(lastReadTime, lastWriteTime);
580             }
581             if (nextDelay <= 0) {
582                 // Both reader and writer are idle - set a new timeout and
583                 // notify the callback.
584                 allIdleTimeout = schedule(ctx, this, allIdleTimeNanos, TimeUnit.NANOSECONDS);
585 
586                 boolean first = firstAllIdleEvent;
587                 firstAllIdleEvent = false;
588 
589                 try {
590                     if (hasOutputChanged(ctx, first)) {
591                         return;
592                     }
593 
594                     IdleStateEvent event = newIdleStateEvent(IdleState.ALL_IDLE, first);
595                     channelIdle(ctx, event);
596                 } catch (Throwable t) {
597                     ctx.fireExceptionCaught(t);
598                 }
599             } else {
600                 // Either read or write occurred before the timeout - set a new
601                 // timeout with shorter delay.
602                 allIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
603             }
604         }
605     }
606 }