View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.timeout;
17  
18  import io.netty.bootstrap.ServerBootstrap;
19  import io.netty.channel.Channel;
20  import io.netty.channel.Channel.Unsafe;
21  import io.netty.channel.ChannelDuplexHandler;
22  import io.netty.channel.ChannelFuture;
23  import io.netty.channel.ChannelFutureListener;
24  import io.netty.channel.ChannelHandlerContext;
25  import io.netty.channel.ChannelInitializer;
26  import io.netty.channel.ChannelOutboundBuffer;
27  import io.netty.channel.ChannelPromise;
28  import io.netty.util.concurrent.Future;
29  import io.netty.util.internal.ObjectUtil;
30  
31  import java.util.concurrent.TimeUnit;
32  
33  /**
34   * Triggers an {@link IdleStateEvent} when a {@link Channel} has not performed
35   * a read, a write, or both for a while.
36   *
37   * <h3>Supported idle states</h3>
38   * <table border="1">
39   * <tr>
40   * <th>Property</th><th>Meaning</th>
41   * </tr>
42   * <tr>
43   * <td>{@code readerIdleTime}</td>
44   * <td>an {@link IdleStateEvent} whose state is {@link IdleState#READER_IDLE}
45   *     will be triggered when no read was performed for the specified period of
46   *     time.  Specify {@code 0} to disable.</td>
47   * </tr>
48   * <tr>
49   * <td>{@code writerIdleTime}</td>
50   * <td>an {@link IdleStateEvent} whose state is {@link IdleState#WRITER_IDLE}
51   *     will be triggered when no write was performed for the specified period of
52   *     time.  Specify {@code 0} to disable.</td>
53   * </tr>
54   * <tr>
55   * <td>{@code allIdleTime}</td>
56   * <td>an {@link IdleStateEvent} whose state is {@link IdleState#ALL_IDLE}
57   *     will be triggered when neither read nor write was performed for the
58   *     specified period of time.  Specify {@code 0} to disable.</td>
59   * </tr>
60   * </table>
61   *
62   * <pre>
63   * // An example that sends a ping message when there is no outbound traffic
64   * // for 30 seconds.  The connection is closed when there is no inbound traffic
65   * // for 60 seconds.
66   *
67   * public class MyChannelInitializer extends {@link ChannelInitializer}&lt;{@link Channel}&gt; {
68   *     {@code @Override}
69   *     public void initChannel({@link Channel} channel) {
70   *         channel.pipeline().addLast("idleStateHandler", new {@link IdleStateHandler}(60, 30, 0));
71   *         channel.pipeline().addLast("myHandler", new MyHandler());
72   *     }
73   * }
74   *
75   * // Handler should handle the {@link IdleStateEvent} triggered by {@link IdleStateHandler}.
76   * public class MyHandler extends {@link ChannelDuplexHandler} {
77   *     {@code @Override}
78   *     public void userEventTriggered({@link ChannelHandlerContext} ctx, {@link Object} evt) throws {@link Exception} {
79   *         if (evt instanceof {@link IdleStateEvent}) {
80   *             {@link IdleStateEvent} e = ({@link IdleStateEvent}) evt;
81   *             if (e.state() == {@link IdleState}.READER_IDLE) {
82   *                 ctx.close();
83   *             } else if (e.state() == {@link IdleState}.WRITER_IDLE) {
84   *                 ctx.writeAndFlush(new PingMessage());
85   *             }
86   *         }
87   *     }
88   * }
89   *
90   * {@link ServerBootstrap} bootstrap = ...;
91   * ...
92   * bootstrap.childHandler(new MyChannelInitializer());
93   * ...
94   * </pre>
95   *
96   * @see ReadTimeoutHandler
97   * @see WriteTimeoutHandler
98   */
99  public class IdleStateHandler extends ChannelDuplexHandler {
100     private static final long MIN_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(1);
101 
102     // Not create a new ChannelFutureListener per write operation to reduce GC pressure.
103     private final ChannelFutureListener writeListener = new ChannelFutureListener() {
104         @Override
105         public void operationComplete(ChannelFuture future) throws Exception {
106             lastWriteTime = ticksInNanos();
107             firstWriterIdleEvent = firstAllIdleEvent = true;
108         }
109     };
110 
111     private final boolean observeOutput;
112     private final long readerIdleTimeNanos;
113     private final long writerIdleTimeNanos;
114     private final long allIdleTimeNanos;
115 
116     private Future<?> readerIdleTimeout;
117     private long lastReadTime;
118     private boolean firstReaderIdleEvent = true;
119 
120     private Future<?> writerIdleTimeout;
121     private long lastWriteTime;
122     private boolean firstWriterIdleEvent = true;
123 
124     private Future<?> allIdleTimeout;
125     private boolean firstAllIdleEvent = true;
126 
127     private byte state; // 0 - none, 1 - initialized, 2 - destroyed
128     private boolean reading;
129 
130     private long lastChangeCheckTimeStamp;
131     private int lastMessageHashCode;
132     private long lastPendingWriteBytes;
133     private long lastFlushProgress;
134 
135     /**
136      * Creates a new instance firing {@link IdleStateEvent}s.
137      *
138      * @param readerIdleTimeSeconds
139      *        an {@link IdleStateEvent} whose state is {@link IdleState#READER_IDLE}
140      *        will be triggered when no read was performed for the specified
141      *        period of time.  Specify {@code 0} to disable.
142      * @param writerIdleTimeSeconds
143      *        an {@link IdleStateEvent} whose state is {@link IdleState#WRITER_IDLE}
144      *        will be triggered when no write was performed for the specified
145      *        period of time.  Specify {@code 0} to disable.
146      * @param allIdleTimeSeconds
147      *        an {@link IdleStateEvent} whose state is {@link IdleState#ALL_IDLE}
148      *        will be triggered when neither read nor write was performed for
149      *        the specified period of time.  Specify {@code 0} to disable.
150      */
151     public IdleStateHandler(
152             int readerIdleTimeSeconds,
153             int writerIdleTimeSeconds,
154             int allIdleTimeSeconds) {
155 
156         this(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds,
157              TimeUnit.SECONDS);
158     }
159 
160     /**
161      * @see #IdleStateHandler(boolean, long, long, long, TimeUnit)
162      */
163     public IdleStateHandler(
164             long readerIdleTime, long writerIdleTime, long allIdleTime,
165             TimeUnit unit) {
166         this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
167     }
168 
169     /**
170      * Creates a new instance firing {@link IdleStateEvent}s.
171      *
172      * @param observeOutput
173      *        whether or not the consumption of {@code bytes} should be taken into
174      *        consideration when assessing write idleness. The default is {@code false}.
175      * @param readerIdleTime
176      *        an {@link IdleStateEvent} whose state is {@link IdleState#READER_IDLE}
177      *        will be triggered when no read was performed for the specified
178      *        period of time.  Specify {@code 0} to disable.
179      * @param writerIdleTime
180      *        an {@link IdleStateEvent} whose state is {@link IdleState#WRITER_IDLE}
181      *        will be triggered when no write was performed for the specified
182      *        period of time.  Specify {@code 0} to disable.
183      * @param allIdleTime
184      *        an {@link IdleStateEvent} whose state is {@link IdleState#ALL_IDLE}
185      *        will be triggered when neither read nor write was performed for
186      *        the specified period of time.  Specify {@code 0} to disable.
187      * @param unit
188      *        the {@link TimeUnit} of {@code readerIdleTime},
189      *        {@code writeIdleTime}, and {@code allIdleTime}
190      */
191     public IdleStateHandler(boolean observeOutput,
192             long readerIdleTime, long writerIdleTime, long allIdleTime,
193             TimeUnit unit) {
194         ObjectUtil.checkNotNull(unit, "unit");
195 
196         this.observeOutput = observeOutput;
197 
198         if (readerIdleTime <= 0) {
199             readerIdleTimeNanos = 0;
200         } else {
201             readerIdleTimeNanos = Math.max(unit.toNanos(readerIdleTime), MIN_TIMEOUT_NANOS);
202         }
203         if (writerIdleTime <= 0) {
204             writerIdleTimeNanos = 0;
205         } else {
206             writerIdleTimeNanos = Math.max(unit.toNanos(writerIdleTime), MIN_TIMEOUT_NANOS);
207         }
208         if (allIdleTime <= 0) {
209             allIdleTimeNanos = 0;
210         } else {
211             allIdleTimeNanos = Math.max(unit.toNanos(allIdleTime), MIN_TIMEOUT_NANOS);
212         }
213     }
214 
215     /**
216      * Return the readerIdleTime that was given when instance this class in milliseconds.
217      *
218      */
219     public long getReaderIdleTimeInMillis() {
220         return TimeUnit.NANOSECONDS.toMillis(readerIdleTimeNanos);
221     }
222 
223     /**
224      * Return the writerIdleTime that was given when instance this class in milliseconds.
225      *
226      */
227     public long getWriterIdleTimeInMillis() {
228         return TimeUnit.NANOSECONDS.toMillis(writerIdleTimeNanos);
229     }
230 
231     /**
232      * Return the allIdleTime that was given when instance this class in milliseconds.
233      *
234      */
235     public long getAllIdleTimeInMillis() {
236         return TimeUnit.NANOSECONDS.toMillis(allIdleTimeNanos);
237     }
238 
239     @Override
240     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
241         if (ctx.channel().isActive() && ctx.channel().isRegistered()) {
242             // channelActive() event has been fired already, which means this.channelActive() will
243             // not be invoked. We have to initialize here instead.
244             initialize(ctx);
245         } else {
246             // channelActive() event has not been fired yet.  this.channelActive() will be invoked
247             // and initialization will occur there.
248         }
249     }
250 
251     @Override
252     public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
253         destroy();
254     }
255 
256     @Override
257     public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
258         // Initialize early if channel is active already.
259         if (ctx.channel().isActive()) {
260             initialize(ctx);
261         }
262         super.channelRegistered(ctx);
263     }
264 
265     @Override
266     public void channelActive(ChannelHandlerContext ctx) throws Exception {
267         // This method will be invoked only if this handler was added
268         // before channelActive() event is fired.  If a user adds this handler
269         // after the channelActive() event, initialize() will be called by beforeAdd().
270         initialize(ctx);
271         super.channelActive(ctx);
272     }
273 
274     @Override
275     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
276         destroy();
277         super.channelInactive(ctx);
278     }
279 
280     @Override
281     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
282         if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
283             reading = true;
284             firstReaderIdleEvent = firstAllIdleEvent = true;
285         }
286         ctx.fireChannelRead(msg);
287     }
288 
289     @Override
290     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
291         if ((readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) && reading) {
292             lastReadTime = ticksInNanos();
293             reading = false;
294         }
295         ctx.fireChannelReadComplete();
296     }
297 
298     @Override
299     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
300         // Allow writing with void promise if handler is only configured for read timeout events.
301         if (writerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
302             ctx.write(msg, promise.unvoid()).addListener(writeListener);
303         } else {
304             ctx.write(msg, promise);
305         }
306     }
307 
308     private void initialize(ChannelHandlerContext ctx) {
309         // Avoid the case where destroy() is called before scheduling timeouts.
310         // See: https://github.com/netty/netty/issues/143
311         switch (state) {
312         case 1:
313         case 2:
314             return;
315         default:
316              break;
317         }
318 
319         state = 1;
320         initOutputChanged(ctx);
321 
322         lastReadTime = lastWriteTime = ticksInNanos();
323         if (readerIdleTimeNanos > 0) {
324             readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
325                     readerIdleTimeNanos, TimeUnit.NANOSECONDS);
326         }
327         if (writerIdleTimeNanos > 0) {
328             writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
329                     writerIdleTimeNanos, TimeUnit.NANOSECONDS);
330         }
331         if (allIdleTimeNanos > 0) {
332             allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
333                     allIdleTimeNanos, TimeUnit.NANOSECONDS);
334         }
335     }
336 
337     /**
338      * This method is visible for testing!
339      */
340     long ticksInNanos() {
341         return System.nanoTime();
342     }
343 
344     /**
345      * This method is visible for testing!
346      */
347     Future<?> schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) {
348         return ctx.executor().schedule(task, delay, unit);
349     }
350 
351     private void destroy() {
352         state = 2;
353 
354         if (readerIdleTimeout != null) {
355             readerIdleTimeout.cancel(false);
356             readerIdleTimeout = null;
357         }
358         if (writerIdleTimeout != null) {
359             writerIdleTimeout.cancel(false);
360             writerIdleTimeout = null;
361         }
362         if (allIdleTimeout != null) {
363             allIdleTimeout.cancel(false);
364             allIdleTimeout = null;
365         }
366     }
367 
368     /**
369      * Is called when an {@link IdleStateEvent} should be fired. This implementation calls
370      * {@link ChannelHandlerContext#fireUserEventTriggered(Object)}.
371      */
372     protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
373         ctx.fireUserEventTriggered(evt);
374     }
375 
376     /**
377      * Returns a {@link IdleStateEvent}.
378      */
379     protected IdleStateEvent newIdleStateEvent(IdleState state, boolean first) {
380         switch (state) {
381             case ALL_IDLE:
382                 return first ? IdleStateEvent.FIRST_ALL_IDLE_STATE_EVENT : IdleStateEvent.ALL_IDLE_STATE_EVENT;
383             case READER_IDLE:
384                 return first ? IdleStateEvent.FIRST_READER_IDLE_STATE_EVENT : IdleStateEvent.READER_IDLE_STATE_EVENT;
385             case WRITER_IDLE:
386                 return first ? IdleStateEvent.FIRST_WRITER_IDLE_STATE_EVENT : IdleStateEvent.WRITER_IDLE_STATE_EVENT;
387             default:
388                 throw new IllegalArgumentException("Unhandled: state=" + state + ", first=" + first);
389         }
390     }
391 
392     /**
393      * @see #hasOutputChanged(ChannelHandlerContext, boolean)
394      */
395     private void initOutputChanged(ChannelHandlerContext ctx) {
396         if (observeOutput) {
397             Channel channel = ctx.channel();
398             Unsafe unsafe = channel.unsafe();
399             ChannelOutboundBuffer buf = unsafe.outboundBuffer();
400 
401             if (buf != null) {
402                 lastMessageHashCode = System.identityHashCode(buf.current());
403                 lastPendingWriteBytes = buf.totalPendingWriteBytes();
404                 lastFlushProgress = buf.currentProgress();
405             }
406         }
407     }
408 
409     /**
410      * Returns {@code true} if and only if the {@link IdleStateHandler} was constructed
411      * with {@link #observeOutput} enabled and there has been an observed change in the
412      * {@link ChannelOutboundBuffer} between two consecutive calls of this method.
413      *
414      * https://github.com/netty/netty/issues/6150
415      */
416     private boolean hasOutputChanged(ChannelHandlerContext ctx, boolean first) {
417         if (observeOutput) {
418 
419             // We can take this shortcut if the ChannelPromises that got passed into write()
420             // appear to complete. It indicates "change" on message level and we simply assume
421             // that there's change happening on byte level. If the user doesn't observe channel
422             // writability events then they'll eventually OOME and there's clearly a different
423             // problem and idleness is least of their concerns.
424             if (lastChangeCheckTimeStamp != lastWriteTime) {
425                 lastChangeCheckTimeStamp = lastWriteTime;
426 
427                 // But this applies only if it's the non-first call.
428                 if (!first) {
429                     return true;
430                 }
431             }
432 
433             Channel channel = ctx.channel();
434             Unsafe unsafe = channel.unsafe();
435             ChannelOutboundBuffer buf = unsafe.outboundBuffer();
436 
437             if (buf != null) {
438                 int messageHashCode = System.identityHashCode(buf.current());
439                 long pendingWriteBytes = buf.totalPendingWriteBytes();
440 
441                 if (messageHashCode != lastMessageHashCode || pendingWriteBytes != lastPendingWriteBytes) {
442                     lastMessageHashCode = messageHashCode;
443                     lastPendingWriteBytes = pendingWriteBytes;
444 
445                     if (!first) {
446                         return true;
447                     }
448                 }
449 
450                 long flushProgress = buf.currentProgress();
451                 if (flushProgress != lastFlushProgress) {
452                     lastFlushProgress = flushProgress;
453                     return !first;
454                 }
455             }
456         }
457 
458         return false;
459     }
460 
461     private abstract static class AbstractIdleTask implements Runnable {
462 
463         private final ChannelHandlerContext ctx;
464 
465         AbstractIdleTask(ChannelHandlerContext ctx) {
466             this.ctx = ctx;
467         }
468 
469         @Override
470         public void run() {
471             if (!ctx.channel().isOpen()) {
472                 return;
473             }
474 
475             run(ctx);
476         }
477 
478         protected abstract void run(ChannelHandlerContext ctx);
479     }
480 
481     private final class ReaderIdleTimeoutTask extends AbstractIdleTask {
482 
483         ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
484             super(ctx);
485         }
486 
487         @Override
488         protected void run(ChannelHandlerContext ctx) {
489             long nextDelay = readerIdleTimeNanos;
490             if (!reading) {
491                 nextDelay -= ticksInNanos() - lastReadTime;
492             }
493 
494             if (nextDelay <= 0) {
495                 // Reader is idle - set a new timeout and notify the callback.
496                 readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
497 
498                 boolean first = firstReaderIdleEvent;
499                 firstReaderIdleEvent = false;
500 
501                 try {
502                     IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
503                     channelIdle(ctx, event);
504                 } catch (Throwable t) {
505                     ctx.fireExceptionCaught(t);
506                 }
507             } else {
508                 // Read occurred before the timeout - set a new timeout with shorter delay.
509                 readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
510             }
511         }
512     }
513 
514     private final class WriterIdleTimeoutTask extends AbstractIdleTask {
515 
516         WriterIdleTimeoutTask(ChannelHandlerContext ctx) {
517             super(ctx);
518         }
519 
520         @Override
521         protected void run(ChannelHandlerContext ctx) {
522 
523             long lastWriteTime = IdleStateHandler.this.lastWriteTime;
524             long nextDelay = writerIdleTimeNanos - (ticksInNanos() - lastWriteTime);
525             if (nextDelay <= 0) {
526                 // Writer is idle - set a new timeout and notify the callback.
527                 writerIdleTimeout = schedule(ctx, this, writerIdleTimeNanos, TimeUnit.NANOSECONDS);
528 
529                 boolean first = firstWriterIdleEvent;
530                 firstWriterIdleEvent = false;
531 
532                 try {
533                     if (hasOutputChanged(ctx, first)) {
534                         return;
535                     }
536 
537                     IdleStateEvent event = newIdleStateEvent(IdleState.WRITER_IDLE, first);
538                     channelIdle(ctx, event);
539                 } catch (Throwable t) {
540                     ctx.fireExceptionCaught(t);
541                 }
542             } else {
543                 // Write occurred before the timeout - set a new timeout with shorter delay.
544                 writerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
545             }
546         }
547     }
548 
549     private final class AllIdleTimeoutTask extends AbstractIdleTask {
550 
551         AllIdleTimeoutTask(ChannelHandlerContext ctx) {
552             super(ctx);
553         }
554 
555         @Override
556         protected void run(ChannelHandlerContext ctx) {
557 
558             long nextDelay = allIdleTimeNanos;
559             if (!reading) {
560                 nextDelay -= ticksInNanos() - Math.max(lastReadTime, lastWriteTime);
561             }
562             if (nextDelay <= 0) {
563                 // Both reader and writer are idle - set a new timeout and
564                 // notify the callback.
565                 allIdleTimeout = schedule(ctx, this, allIdleTimeNanos, TimeUnit.NANOSECONDS);
566 
567                 boolean first = firstAllIdleEvent;
568                 firstAllIdleEvent = false;
569 
570                 try {
571                     if (hasOutputChanged(ctx, first)) {
572                         return;
573                     }
574 
575                     IdleStateEvent event = newIdleStateEvent(IdleState.ALL_IDLE, first);
576                     channelIdle(ctx, event);
577                 } catch (Throwable t) {
578                     ctx.fireExceptionCaught(t);
579                 }
580             } else {
581                 // Either read or write occurred before the timeout - set a new
582                 // timeout with shorter delay.
583                 allIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
584             }
585         }
586     }
587 }