View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.timeout;
17  
18  import io.netty.bootstrap.ServerBootstrap;
19  import io.netty.channel.Channel;
20  import io.netty.channel.Channel.Unsafe;
21  import io.netty.channel.ChannelDuplexHandler;
22  import io.netty.channel.ChannelFuture;
23  import io.netty.channel.ChannelFutureListener;
24  import io.netty.channel.ChannelHandlerContext;
25  import io.netty.channel.ChannelInitializer;
26  import io.netty.channel.ChannelOutboundBuffer;
27  import io.netty.channel.ChannelPromise;
28  
29  import java.util.concurrent.ScheduledFuture;
30  import java.util.concurrent.TimeUnit;
31  
32  /**
33   * Triggers an {@link IdleStateEvent} when a {@link Channel} has not performed
34   * read, write, or both operation for a while.
35   *
36   * <h3>Supported idle states</h3>
37   * <table border="1">
38   * <tr>
39   * <th>Property</th><th>Meaning</th>
40   * </tr>
41   * <tr>
42   * <td>{@code readerIdleTime}</td>
43   * <td>an {@link IdleStateEvent} whose state is {@link IdleState#READER_IDLE}
44   *     will be triggered when no read was performed for the specified period of
45   *     time.  Specify {@code 0} to disable.</td>
46   * </tr>
47   * <tr>
48   * <td>{@code writerIdleTime}</td>
49   * <td>an {@link IdleStateEvent} whose state is {@link IdleState#WRITER_IDLE}
50   *     will be triggered when no write was performed for the specified period of
51   *     time.  Specify {@code 0} to disable.</td>
52   * </tr>
53   * <tr>
54   * <td>{@code allIdleTime}</td>
55   * <td>an {@link IdleStateEvent} whose state is {@link IdleState#ALL_IDLE}
56   *     will be triggered when neither read nor write was performed for the
57   *     specified period of time.  Specify {@code 0} to disable.</td>
58   * </tr>
59   * </table>
60   *
61   * <pre>
62   * // An example that sends a ping message when there is no outbound traffic
63   * // for 30 seconds.  The connection is closed when there is no inbound traffic
64   * // for 60 seconds.
65   *
66   * public class MyChannelInitializer extends {@link ChannelInitializer}&lt;{@link Channel}&gt; {
67   *     {@code @Override}
68   *     public void initChannel({@link Channel} channel) {
69   *         channel.pipeline().addLast("idleStateHandler", new {@link IdleStateHandler}(60, 30, 0));
70   *         channel.pipeline().addLast("myHandler", new MyHandler());
71   *     }
72   * }
73   *
74   * // Handler should handle the {@link IdleStateEvent} triggered by {@link IdleStateHandler}.
75   * public class MyHandler extends {@link ChannelDuplexHandler} {
76   *     {@code @Override}
77   *     public void userEventTriggered({@link ChannelHandlerContext} ctx, {@link Object} evt) throws {@link Exception} {
78   *         if (evt instanceof {@link IdleStateEvent}) {
79   *             {@link IdleStateEvent} e = ({@link IdleStateEvent}) evt;
80   *             if (e.state() == {@link IdleState}.READER_IDLE) {
81   *                 ctx.close();
82   *             } else if (e.state() == {@link IdleState}.WRITER_IDLE) {
83   *                 ctx.writeAndFlush(new PingMessage());
84   *             }
85   *         }
86   *     }
87   * }
88   *
89   * {@link ServerBootstrap} bootstrap = ...;
90   * ...
91   * bootstrap.childHandler(new MyChannelInitializer());
92   * ...
93   * </pre>
94   *
95   * @see ReadTimeoutHandler
96   * @see WriteTimeoutHandler
97   */
98  public class IdleStateHandler extends ChannelDuplexHandler {
99      private static final long MIN_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(1);
100 
101     // Not create a new ChannelFutureListener per write operation to reduce GC pressure.
102     private final ChannelFutureListener writeListener = new ChannelFutureListener() {
103         @Override
104         public void operationComplete(ChannelFuture future) throws Exception {
105             lastWriteTime = ticksInNanos();
106             firstWriterIdleEvent = firstAllIdleEvent = true;
107         }
108     };
109 
110     private final boolean observeOutput;
111     private final long readerIdleTimeNanos;
112     private final long writerIdleTimeNanos;
113     private final long allIdleTimeNanos;
114 
115     private ScheduledFuture<?> readerIdleTimeout;
116     private long lastReadTime;
117     private boolean firstReaderIdleEvent = true;
118 
119     private ScheduledFuture<?> writerIdleTimeout;
120     private long lastWriteTime;
121     private boolean firstWriterIdleEvent = true;
122 
123     private ScheduledFuture<?> allIdleTimeout;
124     private boolean firstAllIdleEvent = true;
125 
126     private byte state; // 0 - none, 1 - initialized, 2 - destroyed
127     private boolean reading;
128 
129     private long lastChangeCheckTimeStamp;
130     private int lastMessageHashCode;
131     private long lastPendingWriteBytes;
132 
133     /**
134      * Creates a new instance firing {@link IdleStateEvent}s.
135      *
136      * @param readerIdleTimeSeconds
137      *        an {@link IdleStateEvent} whose state is {@link IdleState#READER_IDLE}
138      *        will be triggered when no read was performed for the specified
139      *        period of time.  Specify {@code 0} to disable.
140      * @param writerIdleTimeSeconds
141      *        an {@link IdleStateEvent} whose state is {@link IdleState#WRITER_IDLE}
142      *        will be triggered when no write was performed for the specified
143      *        period of time.  Specify {@code 0} to disable.
144      * @param allIdleTimeSeconds
145      *        an {@link IdleStateEvent} whose state is {@link IdleState#ALL_IDLE}
146      *        will be triggered when neither read nor write was performed for
147      *        the specified period of time.  Specify {@code 0} to disable.
148      */
149     public IdleStateHandler(
150             int readerIdleTimeSeconds,
151             int writerIdleTimeSeconds,
152             int allIdleTimeSeconds) {
153 
154         this(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds,
155              TimeUnit.SECONDS);
156     }
157 
158     /**
159      * @see #IdleStateHandler(boolean, long, long, long, TimeUnit)
160      */
161     public IdleStateHandler(
162             long readerIdleTime, long writerIdleTime, long allIdleTime,
163             TimeUnit unit) {
164         this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
165     }
166 
167     /**
168      * Creates a new instance firing {@link IdleStateEvent}s.
169      *
170      * @param observeOutput
171      *        whether or not the consumption of {@code bytes} should be taken into
172      *        consideration when assessing write idleness. The default is {@code false}.
173      * @param readerIdleTime
174      *        an {@link IdleStateEvent} whose state is {@link IdleState#READER_IDLE}
175      *        will be triggered when no read was performed for the specified
176      *        period of time.  Specify {@code 0} to disable.
177      * @param writerIdleTime
178      *        an {@link IdleStateEvent} whose state is {@link IdleState#WRITER_IDLE}
179      *        will be triggered when no write was performed for the specified
180      *        period of time.  Specify {@code 0} to disable.
181      * @param allIdleTime
182      *        an {@link IdleStateEvent} whose state is {@link IdleState#ALL_IDLE}
183      *        will be triggered when neither read nor write was performed for
184      *        the specified period of time.  Specify {@code 0} to disable.
185      * @param unit
186      *        the {@link TimeUnit} of {@code readerIdleTime},
187      *        {@code writeIdleTime}, and {@code allIdleTime}
188      */
189     public IdleStateHandler(boolean observeOutput,
190             long readerIdleTime, long writerIdleTime, long allIdleTime,
191             TimeUnit unit) {
192         if (unit == null) {
193             throw new NullPointerException("unit");
194         }
195 
196         this.observeOutput = observeOutput;
197 
198         if (readerIdleTime <= 0) {
199             readerIdleTimeNanos = 0;
200         } else {
201             readerIdleTimeNanos = Math.max(unit.toNanos(readerIdleTime), MIN_TIMEOUT_NANOS);
202         }
203         if (writerIdleTime <= 0) {
204             writerIdleTimeNanos = 0;
205         } else {
206             writerIdleTimeNanos = Math.max(unit.toNanos(writerIdleTime), MIN_TIMEOUT_NANOS);
207         }
208         if (allIdleTime <= 0) {
209             allIdleTimeNanos = 0;
210         } else {
211             allIdleTimeNanos = Math.max(unit.toNanos(allIdleTime), MIN_TIMEOUT_NANOS);
212         }
213     }
214 
215     /**
216      * Return the readerIdleTime that was given when instance this class in milliseconds.
217      *
218      */
219     public long getReaderIdleTimeInMillis() {
220         return TimeUnit.NANOSECONDS.toMillis(readerIdleTimeNanos);
221     }
222 
223     /**
224      * Return the writerIdleTime that was given when instance this class in milliseconds.
225      *
226      */
227     public long getWriterIdleTimeInMillis() {
228         return TimeUnit.NANOSECONDS.toMillis(writerIdleTimeNanos);
229     }
230 
231     /**
232      * Return the allIdleTime that was given when instance this class in milliseconds.
233      *
234      */
235     public long getAllIdleTimeInMillis() {
236         return TimeUnit.NANOSECONDS.toMillis(allIdleTimeNanos);
237     }
238 
239     @Override
240     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
241         if (ctx.channel().isActive() && ctx.channel().isRegistered()) {
242             // channelActive() event has been fired already, which means this.channelActive() will
243             // not be invoked. We have to initialize here instead.
244             initialize(ctx);
245         } else {
246             // channelActive() event has not been fired yet.  this.channelActive() will be invoked
247             // and initialization will occur there.
248         }
249     }
250 
251     @Override
252     public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
253         destroy();
254     }
255 
256     @Override
257     public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
258         // Initialize early if channel is active already.
259         if (ctx.channel().isActive()) {
260             initialize(ctx);
261         }
262         super.channelRegistered(ctx);
263     }
264 
265     @Override
266     public void channelActive(ChannelHandlerContext ctx) throws Exception {
267         // This method will be invoked only if this handler was added
268         // before channelActive() event is fired.  If a user adds this handler
269         // after the channelActive() event, initialize() will be called by beforeAdd().
270         initialize(ctx);
271         super.channelActive(ctx);
272     }
273 
274     @Override
275     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
276         destroy();
277         super.channelInactive(ctx);
278     }
279 
280     @Override
281     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
282         if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
283             reading = true;
284             firstReaderIdleEvent = firstAllIdleEvent = true;
285         }
286         ctx.fireChannelRead(msg);
287     }
288 
289     @Override
290     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
291         if ((readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) && reading) {
292             lastReadTime = ticksInNanos();
293             reading = false;
294         }
295         ctx.fireChannelReadComplete();
296     }
297 
298     @Override
299     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
300         // Allow writing with void promise if handler is only configured for read timeout events.
301         if (writerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
302             ctx.write(msg, promise).addListener(writeListener);
303         } else {
304             ctx.write(msg, promise);
305         }
306     }
307 
308     private void initialize(ChannelHandlerContext ctx) {
309         // Avoid the case where destroy() is called before scheduling timeouts.
310         // See: https://github.com/netty/netty/issues/143
311         switch (state) {
312         case 1:
313         case 2:
314             return;
315         }
316 
317         state = 1;
318         initOutputChanged(ctx);
319 
320         lastReadTime = lastWriteTime = ticksInNanos();
321         if (readerIdleTimeNanos > 0) {
322             readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
323                     readerIdleTimeNanos, TimeUnit.NANOSECONDS);
324         }
325         if (writerIdleTimeNanos > 0) {
326             writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
327                     writerIdleTimeNanos, TimeUnit.NANOSECONDS);
328         }
329         if (allIdleTimeNanos > 0) {
330             allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
331                     allIdleTimeNanos, TimeUnit.NANOSECONDS);
332         }
333     }
334 
335     /**
336      * This method is visible for testing!
337      */
338     long ticksInNanos() {
339         return System.nanoTime();
340     }
341 
342     /**
343      * This method is visible for testing!
344      */
345     ScheduledFuture<?> schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) {
346         return ctx.executor().schedule(task, delay, unit);
347     }
348 
349     private void destroy() {
350         state = 2;
351 
352         if (readerIdleTimeout != null) {
353             readerIdleTimeout.cancel(false);
354             readerIdleTimeout = null;
355         }
356         if (writerIdleTimeout != null) {
357             writerIdleTimeout.cancel(false);
358             writerIdleTimeout = null;
359         }
360         if (allIdleTimeout != null) {
361             allIdleTimeout.cancel(false);
362             allIdleTimeout = null;
363         }
364     }
365 
366     /**
367      * Is called when an {@link IdleStateEvent} should be fired. This implementation calls
368      * {@link ChannelHandlerContext#fireUserEventTriggered(Object)}.
369      */
370     protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
371         ctx.fireUserEventTriggered(evt);
372     }
373 
374     /**
375      * Returns a {@link IdleStateEvent}.
376      */
377     protected IdleStateEvent newIdleStateEvent(IdleState state, boolean first) {
378         switch (state) {
379             case ALL_IDLE:
380                 return first ? IdleStateEvent.FIRST_ALL_IDLE_STATE_EVENT : IdleStateEvent.ALL_IDLE_STATE_EVENT;
381             case READER_IDLE:
382                 return first ? IdleStateEvent.FIRST_READER_IDLE_STATE_EVENT : IdleStateEvent.READER_IDLE_STATE_EVENT;
383             case WRITER_IDLE:
384                 return first ? IdleStateEvent.FIRST_WRITER_IDLE_STATE_EVENT : IdleStateEvent.WRITER_IDLE_STATE_EVENT;
385             default:
386                 throw new IllegalArgumentException("Unhandled: state=" + state + ", first=" + first);
387         }
388     }
389 
390     /**
391      * @see #hasOutputChanged(ChannelHandlerContext, boolean)
392      */
393     private void initOutputChanged(ChannelHandlerContext ctx) {
394         if (observeOutput) {
395             Channel channel = ctx.channel();
396             Unsafe unsafe = channel.unsafe();
397             ChannelOutboundBuffer buf = unsafe.outboundBuffer();
398 
399             if (buf != null) {
400                 lastMessageHashCode = System.identityHashCode(buf.current());
401                 lastPendingWriteBytes = buf.totalPendingWriteBytes();
402             }
403         }
404     }
405 
406     /**
407      * Returns {@code true} if and only if the {@link IdleStateHandler} was constructed
408      * with {@link #observeOutput} enabled and there has been an observed change in the
409      * {@link ChannelOutboundBuffer} between two consecutive calls of this method.
410      *
411      * https://github.com/netty/netty/issues/6150
412      */
413     private boolean hasOutputChanged(ChannelHandlerContext ctx, boolean first) {
414         if (observeOutput) {
415 
416             // We can take this shortcut if the ChannelPromises that got passed into write()
417             // appear to complete. It indicates "change" on message level and we simply assume
418             // that there's change happening on byte level. If the user doesn't observe channel
419             // writability events then they'll eventually OOME and there's clearly a different
420             // problem and idleness is least of their concerns.
421             if (lastChangeCheckTimeStamp != lastWriteTime) {
422                 lastChangeCheckTimeStamp = lastWriteTime;
423 
424                 // But this applies only if it's the non-first call.
425                 if (!first) {
426                     return true;
427                 }
428             }
429 
430             Channel channel = ctx.channel();
431             Unsafe unsafe = channel.unsafe();
432             ChannelOutboundBuffer buf = unsafe.outboundBuffer();
433 
434             if (buf != null) {
435                 int messageHashCode = System.identityHashCode(buf.current());
436                 long pendingWriteBytes = buf.totalPendingWriteBytes();
437 
438                 if (messageHashCode != lastMessageHashCode || pendingWriteBytes != lastPendingWriteBytes) {
439                     lastMessageHashCode = messageHashCode;
440                     lastPendingWriteBytes = pendingWriteBytes;
441 
442                     if (!first) {
443                         return true;
444                     }
445                 }
446             }
447         }
448 
449         return false;
450     }
451 
452     private abstract static class AbstractIdleTask implements Runnable {
453 
454         private final ChannelHandlerContext ctx;
455 
456         AbstractIdleTask(ChannelHandlerContext ctx) {
457             this.ctx = ctx;
458         }
459 
460         @Override
461         public void run() {
462             if (!ctx.channel().isOpen()) {
463                 return;
464             }
465 
466             run(ctx);
467         }
468 
469         protected abstract void run(ChannelHandlerContext ctx);
470     }
471 
472     private final class ReaderIdleTimeoutTask extends AbstractIdleTask {
473 
474         ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
475             super(ctx);
476         }
477 
478         @Override
479         protected void run(ChannelHandlerContext ctx) {
480             long nextDelay = readerIdleTimeNanos;
481             if (!reading) {
482                 nextDelay -= ticksInNanos() - lastReadTime;
483             }
484 
485             if (nextDelay <= 0) {
486                 // Reader is idle - set a new timeout and notify the callback.
487                 readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
488 
489                 boolean first = firstReaderIdleEvent;
490                 firstReaderIdleEvent = false;
491 
492                 try {
493                     IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
494                     channelIdle(ctx, event);
495                 } catch (Throwable t) {
496                     ctx.fireExceptionCaught(t);
497                 }
498             } else {
499                 // Read occurred before the timeout - set a new timeout with shorter delay.
500                 readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
501             }
502         }
503     }
504 
505     private final class WriterIdleTimeoutTask extends AbstractIdleTask {
506 
507         WriterIdleTimeoutTask(ChannelHandlerContext ctx) {
508             super(ctx);
509         }
510 
511         @Override
512         protected void run(ChannelHandlerContext ctx) {
513 
514             long lastWriteTime = IdleStateHandler.this.lastWriteTime;
515             long nextDelay = writerIdleTimeNanos - (ticksInNanos() - lastWriteTime);
516             if (nextDelay <= 0) {
517                 // Writer is idle - set a new timeout and notify the callback.
518                 writerIdleTimeout = schedule(ctx, this, writerIdleTimeNanos, TimeUnit.NANOSECONDS);
519 
520                 boolean first = firstWriterIdleEvent;
521                 firstWriterIdleEvent = false;
522 
523                 try {
524                     if (hasOutputChanged(ctx, first)) {
525                         return;
526                     }
527 
528                     IdleStateEvent event = newIdleStateEvent(IdleState.WRITER_IDLE, first);
529                     channelIdle(ctx, event);
530                 } catch (Throwable t) {
531                     ctx.fireExceptionCaught(t);
532                 }
533             } else {
534                 // Write occurred before the timeout - set a new timeout with shorter delay.
535                 writerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
536             }
537         }
538     }
539 
540     private final class AllIdleTimeoutTask extends AbstractIdleTask {
541 
542         AllIdleTimeoutTask(ChannelHandlerContext ctx) {
543             super(ctx);
544         }
545 
546         @Override
547         protected void run(ChannelHandlerContext ctx) {
548 
549             long nextDelay = allIdleTimeNanos;
550             if (!reading) {
551                 nextDelay -= ticksInNanos() - Math.max(lastReadTime, lastWriteTime);
552             }
553             if (nextDelay <= 0) {
554                 // Both reader and writer are idle - set a new timeout and
555                 // notify the callback.
556                 allIdleTimeout = schedule(ctx, this, allIdleTimeNanos, TimeUnit.NANOSECONDS);
557 
558                 boolean first = firstAllIdleEvent;
559                 firstAllIdleEvent = false;
560 
561                 try {
562                     if (hasOutputChanged(ctx, first)) {
563                         return;
564                     }
565 
566                     IdleStateEvent event = newIdleStateEvent(IdleState.ALL_IDLE, first);
567                     channelIdle(ctx, event);
568                 } catch (Throwable t) {
569                     ctx.fireExceptionCaught(t);
570                 }
571             } else {
572                 // Either read or write occurred before the timeout - set a new
573                 // timeout with shorter delay.
574                 allIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
575             }
576         }
577     }
578 }