View Javadoc
/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
16  package io.netty.handler.timeout;
17  
18  import io.netty.bootstrap.ServerBootstrap;
19  import io.netty.channel.Channel;
20  import io.netty.channel.Channel.Unsafe;
21  import io.netty.channel.ChannelDuplexHandler;
22  import io.netty.channel.ChannelFutureListener;
23  import io.netty.channel.ChannelHandlerContext;
24  import io.netty.channel.ChannelInitializer;
25  import io.netty.channel.ChannelOutboundBuffer;
26  import io.netty.channel.ChannelPromise;
27  import io.netty.util.concurrent.Future;
28  import io.netty.util.concurrent.Ticker;
29  import io.netty.util.internal.ObjectUtil;
30  
31  import java.util.concurrent.TimeUnit;
32  
33  /**
34   * Triggers an {@link IdleStateEvent} when a {@link Channel} has not performed
35   * read, write, or both operation for a while.
36   *
37   * <h3>Supported idle states</h3>
38   * <table border="1">
39   * <tr>
40   * <th>Property</th><th>Meaning</th>
41   * </tr>
42   * <tr>
43   * <td>{@code readerIdleTime}</td>
44   * <td>an {@link IdleStateEvent} whose state is {@link IdleState#READER_IDLE}
45   *     will be triggered when no read was performed for the specified period of
46   *     time.  Specify {@code 0} to disable.</td>
47   * </tr>
48   * <tr>
49   * <td>{@code writerIdleTime}</td>
50   * <td>an {@link IdleStateEvent} whose state is {@link IdleState#WRITER_IDLE}
51   *     will be triggered when no write was performed for the specified period of
52   *     time.  Specify {@code 0} to disable.</td>
53   * </tr>
54   * <tr>
55   * <td>{@code allIdleTime}</td>
56   * <td>an {@link IdleStateEvent} whose state is {@link IdleState#ALL_IDLE}
57   *     will be triggered when neither read nor write was performed for the
58   *     specified period of time.  Specify {@code 0} to disable.</td>
59   * </tr>
60   * </table>
61   *
62   * <pre>
63   * // An example that sends a ping message when there is no outbound traffic
64   * // for 30 seconds.  The connection is closed when there is no inbound traffic
65   * // for 60 seconds.
66   *
67   * public class MyChannelInitializer extends {@link ChannelInitializer}&lt;{@link Channel}&gt; {
68   *     {@code @Override}
69   *     public void initChannel({@link Channel} channel) {
70   *         channel.pipeline().addLast("idleStateHandler", new {@link IdleStateHandler}(60, 30, 0));
71   *         channel.pipeline().addLast("myHandler", new MyHandler());
72   *     }
73   * }
74   *
75   * // Handler should handle the {@link IdleStateEvent} triggered by {@link IdleStateHandler}.
76   * public class MyHandler extends {@link ChannelDuplexHandler} {
77   *     {@code @Override}
78   *     public void userEventTriggered({@link ChannelHandlerContext} ctx, {@link Object} evt) throws {@link Exception} {
79   *         if (evt instanceof {@link IdleStateEvent}) {
80   *             {@link IdleStateEvent} e = ({@link IdleStateEvent}) evt;
81   *             if (e.state() == {@link IdleState}.READER_IDLE) {
82   *                 ctx.close();
83   *             } else if (e.state() == {@link IdleState}.WRITER_IDLE) {
84   *                 ctx.writeAndFlush(new PingMessage());
85   *             }
86   *         }
87   *     }
88   * }
89   *
90   * {@link ServerBootstrap} bootstrap = ...;
91   * ...
92   * bootstrap.childHandler(new MyChannelInitializer());
93   * ...
94   * </pre>
95   *
96   * @see ReadTimeoutHandler
97   * @see WriteTimeoutHandler
98   */
99  public class IdleStateHandler extends ChannelDuplexHandler {
100     private static final long MIN_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(1);
101 
102     private final boolean observeOutput;
103     private final long readerIdleTimeNanos;
104     private final long writerIdleTimeNanos;
105     private final long allIdleTimeNanos;
106 
107     private Ticker ticker = Ticker.systemTicker();
108 
109     private Future<?> readerIdleTimeout;
110     private long lastReadTime;
111     private boolean firstReaderIdleEvent = true;
112 
113     private Future<?> writerIdleTimeout;
114     private long lastWriteTime;
115     private boolean firstWriterIdleEvent = true;
116 
117     private Future<?> allIdleTimeout;
118     private boolean firstAllIdleEvent = true;
119 
120     private byte state;
121     private static final byte ST_INITIALIZED = 1;
122     private static final byte ST_DESTROYED = 2;
123 
124     private boolean reading;
125 
126     private long lastChangeCheckTimeStamp;
127     private int lastMessageHashCode;
128     private long lastPendingWriteBytes;
129     private long lastFlushProgress;
130     // Not create a new ChannelFutureListener per write operation to reduce GC pressure.
131     private final ChannelFutureListener writeListener = future -> {
132         lastWriteTime = ticker.nanoTime();
133         firstWriterIdleEvent = firstAllIdleEvent = true;
134     };
135 
136     /**
137      * Creates a new instance firing {@link IdleStateEvent}s.
138      *
139      * @param readerIdleTimeSeconds
140      *        an {@link IdleStateEvent} whose state is {@link IdleState#READER_IDLE}
141      *        will be triggered when no read was performed for the specified
142      *        period of time.  Specify {@code 0} to disable.
143      * @param writerIdleTimeSeconds
144      *        an {@link IdleStateEvent} whose state is {@link IdleState#WRITER_IDLE}
145      *        will be triggered when no write was performed for the specified
146      *        period of time.  Specify {@code 0} to disable.
147      * @param allIdleTimeSeconds
148      *        an {@link IdleStateEvent} whose state is {@link IdleState#ALL_IDLE}
149      *        will be triggered when neither read nor write was performed for
150      *        the specified period of time.  Specify {@code 0} to disable.
151      */
152     public IdleStateHandler(
153             int readerIdleTimeSeconds,
154             int writerIdleTimeSeconds,
155             int allIdleTimeSeconds) {
156 
157         this(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds,
158              TimeUnit.SECONDS);
159     }
160 
161     /**
162      * @see #IdleStateHandler(boolean, long, long, long, TimeUnit)
163      */
164     public IdleStateHandler(
165             long readerIdleTime, long writerIdleTime, long allIdleTime,
166             TimeUnit unit) {
167         this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
168     }
169 
170     /**
171      * Creates a new instance firing {@link IdleStateEvent}s.
172      *
173      * @param observeOutput
174      *        whether or not the consumption of {@code bytes} should be taken into
175      *        consideration when assessing write idleness. The default is {@code false}.
176      * @param readerIdleTime
177      *        an {@link IdleStateEvent} whose state is {@link IdleState#READER_IDLE}
178      *        will be triggered when no read was performed for the specified
179      *        period of time.  Specify {@code 0} to disable.
180      * @param writerIdleTime
181      *        an {@link IdleStateEvent} whose state is {@link IdleState#WRITER_IDLE}
182      *        will be triggered when no write was performed for the specified
183      *        period of time.  Specify {@code 0} to disable.
184      * @param allIdleTime
185      *        an {@link IdleStateEvent} whose state is {@link IdleState#ALL_IDLE}
186      *        will be triggered when neither read nor write was performed for
187      *        the specified period of time.  Specify {@code 0} to disable.
188      * @param unit
189      *        the {@link TimeUnit} of {@code readerIdleTime},
190      *        {@code writeIdleTime}, and {@code allIdleTime}
191      */
192     public IdleStateHandler(boolean observeOutput,
193             long readerIdleTime, long writerIdleTime, long allIdleTime,
194             TimeUnit unit) {
195         ObjectUtil.checkNotNull(unit, "unit");
196 
197         this.observeOutput = observeOutput;
198 
199         if (readerIdleTime <= 0) {
200             readerIdleTimeNanos = 0;
201         } else {
202             readerIdleTimeNanos = Math.max(unit.toNanos(readerIdleTime), MIN_TIMEOUT_NANOS);
203         }
204         if (writerIdleTime <= 0) {
205             writerIdleTimeNanos = 0;
206         } else {
207             writerIdleTimeNanos = Math.max(unit.toNanos(writerIdleTime), MIN_TIMEOUT_NANOS);
208         }
209         if (allIdleTime <= 0) {
210             allIdleTimeNanos = 0;
211         } else {
212             allIdleTimeNanos = Math.max(unit.toNanos(allIdleTime), MIN_TIMEOUT_NANOS);
213         }
214     }
215 
216     /**
217      * Return the readerIdleTime that was given when instance this class in milliseconds.
218      *
219      */
220     public long getReaderIdleTimeInMillis() {
221         return TimeUnit.NANOSECONDS.toMillis(readerIdleTimeNanos);
222     }
223 
224     /**
225      * Return the writerIdleTime that was given when instance this class in milliseconds.
226      *
227      */
228     public long getWriterIdleTimeInMillis() {
229         return TimeUnit.NANOSECONDS.toMillis(writerIdleTimeNanos);
230     }
231 
232     /**
233      * Return the allIdleTime that was given when instance this class in milliseconds.
234      *
235      */
236     public long getAllIdleTimeInMillis() {
237         return TimeUnit.NANOSECONDS.toMillis(allIdleTimeNanos);
238     }
239 
240     @Override
241     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
242         this.ticker = ctx.executor().ticker();
243         if (ctx.channel().isActive() && ctx.channel().isRegistered()) {
244             // channelActive() event has been fired already, which means this.channelActive() will
245             // not be invoked. We have to initialize here instead.
246             initialize(ctx);
247         } else {
248             // channelActive() event has not been fired yet.  this.channelActive() will be invoked
249             // and initialization will occur there.
250         }
251     }
252 
253     @Override
254     public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
255         destroy();
256     }
257 
258     @Override
259     public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
260         // Initialize early if channel is active already.
261         if (ctx.channel().isActive()) {
262             initialize(ctx);
263         }
264         super.channelRegistered(ctx);
265     }
266 
267     @Override
268     public void channelActive(ChannelHandlerContext ctx) throws Exception {
269         // This method will be invoked only if this handler was added
270         // before channelActive() event is fired.  If a user adds this handler
271         // after the channelActive() event, initialize() will be called by beforeAdd().
272         initialize(ctx);
273         super.channelActive(ctx);
274     }
275 
276     @Override
277     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
278         destroy();
279         super.channelInactive(ctx);
280     }
281 
282     @Override
283     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
284         if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
285             reading = true;
286             firstReaderIdleEvent = firstAllIdleEvent = true;
287         }
288         ctx.fireChannelRead(msg);
289     }
290 
291     @Override
292     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
293         if ((readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) && reading) {
294             lastReadTime = ticker.nanoTime();
295             reading = false;
296         }
297         ctx.fireChannelReadComplete();
298     }
299 
300     @Override
301     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
302         // Allow writing with void promise if handler is only configured for read timeout events.
303         if (writerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
304             ctx.write(msg, promise.unvoid()).addListener(writeListener);
305         } else {
306             ctx.write(msg, promise);
307         }
308     }
309 
310     /**
311      * Reset the read timeout. As this handler is not thread-safe, this method <b>must</b> be called on the event loop.
312      */
313     public void resetReadTimeout() {
314         if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
315             lastReadTime = ticker.nanoTime();
316             reading = false;
317         }
318     }
319 
320     /**
321      * Reset the write timeout. As this handler is not thread-safe, this method <b>must</b> be called on the event loop.
322      */
323     public void resetWriteTimeout() {
324         if (writerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
325             lastWriteTime = ticker.nanoTime();
326         }
327     }
328 
329     private void initialize(ChannelHandlerContext ctx) {
330         // Avoid the case where destroy() is called before scheduling timeouts.
331         // See: https://github.com/netty/netty/issues/143
332         switch (state) {
333         case 1:
334         case 2:
335             return;
336         default:
337              break;
338         }
339 
340         state = ST_INITIALIZED;
341         initOutputChanged(ctx);
342 
343         lastReadTime = lastWriteTime = ticker.nanoTime();
344         if (readerIdleTimeNanos > 0) {
345             readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
346                     readerIdleTimeNanos, TimeUnit.NANOSECONDS);
347         }
348         if (writerIdleTimeNanos > 0) {
349             writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
350                     writerIdleTimeNanos, TimeUnit.NANOSECONDS);
351         }
352         if (allIdleTimeNanos > 0) {
353             allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
354                     allIdleTimeNanos, TimeUnit.NANOSECONDS);
355         }
356     }
357 
358     /**
359      * This method is visible for testing!
360      */
361     Future<?> schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) {
362         return ctx.executor().schedule(task, delay, unit);
363     }
364 
365     private void destroy() {
366         state = ST_DESTROYED;
367 
368         if (readerIdleTimeout != null) {
369             readerIdleTimeout.cancel(false);
370             readerIdleTimeout = null;
371         }
372         if (writerIdleTimeout != null) {
373             writerIdleTimeout.cancel(false);
374             writerIdleTimeout = null;
375         }
376         if (allIdleTimeout != null) {
377             allIdleTimeout.cancel(false);
378             allIdleTimeout = null;
379         }
380     }
381 
382     /**
383      * Is called when an {@link IdleStateEvent} should be fired. This implementation calls
384      * {@link ChannelHandlerContext#fireUserEventTriggered(Object)}.
385      */
386     protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
387         ctx.fireUserEventTriggered(evt);
388     }
389 
390     /**
391      * Returns a {@link IdleStateEvent}.
392      */
393     protected IdleStateEvent newIdleStateEvent(IdleState state, boolean first) {
394         switch (state) {
395             case ALL_IDLE:
396                 return first ? IdleStateEvent.FIRST_ALL_IDLE_STATE_EVENT : IdleStateEvent.ALL_IDLE_STATE_EVENT;
397             case READER_IDLE:
398                 return first ? IdleStateEvent.FIRST_READER_IDLE_STATE_EVENT : IdleStateEvent.READER_IDLE_STATE_EVENT;
399             case WRITER_IDLE:
400                 return first ? IdleStateEvent.FIRST_WRITER_IDLE_STATE_EVENT : IdleStateEvent.WRITER_IDLE_STATE_EVENT;
401             default:
402                 throw new IllegalArgumentException("Unhandled: state=" + state + ", first=" + first);
403         }
404     }
405 
406     /**
407      * @see #hasOutputChanged(ChannelHandlerContext, boolean)
408      */
409     private void initOutputChanged(ChannelHandlerContext ctx) {
410         if (observeOutput) {
411             Channel channel = ctx.channel();
412             Unsafe unsafe = channel.unsafe();
413             ChannelOutboundBuffer buf = unsafe.outboundBuffer();
414 
415             if (buf != null) {
416                 lastMessageHashCode = System.identityHashCode(buf.current());
417                 lastPendingWriteBytes = buf.totalPendingWriteBytes();
418                 lastFlushProgress = buf.currentProgress();
419             }
420         }
421     }
422 
423     /**
424      * Returns {@code true} if and only if the {@link IdleStateHandler} was constructed
425      * with {@link #observeOutput} enabled and there has been an observed change in the
426      * {@link ChannelOutboundBuffer} between two consecutive calls of this method.
427      *
428      * https://github.com/netty/netty/issues/6150
429      */
430     private boolean hasOutputChanged(ChannelHandlerContext ctx, boolean first) {
431         if (observeOutput) {
432 
433             // We can take this shortcut if the ChannelPromises that got passed into write()
434             // appear to complete. It indicates "change" on message level and we simply assume
435             // that there's change happening on byte level. If the user doesn't observe channel
436             // writability events then they'll eventually OOME and there's clearly a different
437             // problem and idleness is least of their concerns.
438             if (lastChangeCheckTimeStamp != lastWriteTime) {
439                 lastChangeCheckTimeStamp = lastWriteTime;
440 
441                 // But this applies only if it's the non-first call.
442                 if (!first) {
443                     return true;
444                 }
445             }
446 
447             Channel channel = ctx.channel();
448             Unsafe unsafe = channel.unsafe();
449             ChannelOutboundBuffer buf = unsafe.outboundBuffer();
450 
451             if (buf != null) {
452                 int messageHashCode = System.identityHashCode(buf.current());
453                 long pendingWriteBytes = buf.totalPendingWriteBytes();
454 
455                 if (messageHashCode != lastMessageHashCode || pendingWriteBytes != lastPendingWriteBytes) {
456                     lastMessageHashCode = messageHashCode;
457                     lastPendingWriteBytes = pendingWriteBytes;
458 
459                     if (!first) {
460                         return true;
461                     }
462                 }
463 
464                 long flushProgress = buf.currentProgress();
465                 if (flushProgress != lastFlushProgress) {
466                     lastFlushProgress = flushProgress;
467                     return !first;
468                 }
469             }
470         }
471 
472         return false;
473     }
474 
475     private abstract static class AbstractIdleTask implements Runnable {
476 
477         private final ChannelHandlerContext ctx;
478 
479         AbstractIdleTask(ChannelHandlerContext ctx) {
480             this.ctx = ctx;
481         }
482 
483         @Override
484         public void run() {
485             if (!ctx.channel().isOpen()) {
486                 return;
487             }
488 
489             run(ctx);
490         }
491 
492         protected abstract void run(ChannelHandlerContext ctx);
493     }
494 
495     private final class ReaderIdleTimeoutTask extends AbstractIdleTask {
496 
497         ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
498             super(ctx);
499         }
500 
501         @Override
502         protected void run(ChannelHandlerContext ctx) {
503             long nextDelay = readerIdleTimeNanos;
504             if (!reading) {
505                 nextDelay -= ticker.nanoTime() - lastReadTime;
506             }
507 
508             if (nextDelay <= 0) {
509                 // Reader is idle - set a new timeout and notify the callback.
510                 readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
511 
512                 boolean first = firstReaderIdleEvent;
513                 firstReaderIdleEvent = false;
514 
515                 try {
516                     IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
517                     channelIdle(ctx, event);
518                 } catch (Throwable t) {
519                     ctx.fireExceptionCaught(t);
520                 }
521             } else {
522                 // Read occurred before the timeout - set a new timeout with shorter delay.
523                 readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
524             }
525         }
526     }
527 
528     private final class WriterIdleTimeoutTask extends AbstractIdleTask {
529 
530         WriterIdleTimeoutTask(ChannelHandlerContext ctx) {
531             super(ctx);
532         }
533 
534         @Override
535         protected void run(ChannelHandlerContext ctx) {
536 
537             long lastWriteTime = IdleStateHandler.this.lastWriteTime;
538             long nextDelay = writerIdleTimeNanos - (ticker.nanoTime() - lastWriteTime);
539             if (nextDelay <= 0) {
540                 // Writer is idle - set a new timeout and notify the callback.
541                 writerIdleTimeout = schedule(ctx, this, writerIdleTimeNanos, TimeUnit.NANOSECONDS);
542 
543                 boolean first = firstWriterIdleEvent;
544                 firstWriterIdleEvent = false;
545 
546                 try {
547                     if (hasOutputChanged(ctx, first)) {
548                         return;
549                     }
550 
551                     IdleStateEvent event = newIdleStateEvent(IdleState.WRITER_IDLE, first);
552                     channelIdle(ctx, event);
553                 } catch (Throwable t) {
554                     ctx.fireExceptionCaught(t);
555                 }
556             } else {
557                 // Write occurred before the timeout - set a new timeout with shorter delay.
558                 writerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
559             }
560         }
561     }
562 
563     private final class AllIdleTimeoutTask extends AbstractIdleTask {
564 
565         AllIdleTimeoutTask(ChannelHandlerContext ctx) {
566             super(ctx);
567         }
568 
569         @Override
570         protected void run(ChannelHandlerContext ctx) {
571 
572             long nextDelay = allIdleTimeNanos;
573             if (!reading) {
574                 nextDelay -= ticker.nanoTime() - Math.max(lastReadTime, lastWriteTime);
575             }
576             if (nextDelay <= 0) {
577                 // Both reader and writer are idle - set a new timeout and
578                 // notify the callback.
579                 allIdleTimeout = schedule(ctx, this, allIdleTimeNanos, TimeUnit.NANOSECONDS);
580 
581                 boolean first = firstAllIdleEvent;
582                 firstAllIdleEvent = false;
583 
584                 try {
585                     if (hasOutputChanged(ctx, first)) {
586                         return;
587                     }
588 
589                     IdleStateEvent event = newIdleStateEvent(IdleState.ALL_IDLE, first);
590                     channelIdle(ctx, event);
591                 } catch (Throwable t) {
592                     ctx.fireExceptionCaught(t);
593                 }
594             } else {
595                 // Either read or write occurred before the timeout - set a new
596                 // timeout with shorter delay.
597                 allIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
598             }
599         }
600     }
601 }