View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.timeout;
17  
18  import io.netty.bootstrap.ServerBootstrap;
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelHandlerAdapter;
21  import io.netty.channel.ChannelHandlerContext;
22  import io.netty.channel.ChannelInitializer;
23  
24  import java.util.concurrent.ScheduledFuture;
25  import java.util.concurrent.TimeUnit;
26  
27  /**
28   * Raises a {@link ReadTimeoutException} when no data was read within a certain
29   * period of time.
30   *
31   * <pre>
32   * // The connection is closed when there is no inbound traffic
33   * // for 30 seconds.
34   *
35   * public class MyChannelInitializer extends {@link ChannelInitializer}&lt;{@link Channel}&gt; {
36   *     public void initChannel({@link Channel} channel) {
37   *         channel.pipeline().addLast("readTimeoutHandler", new {@link ReadTimeoutHandler}(30);
38   *         channel.pipeline().addLast("myHandler", new MyHandler());
39   *     }
40   * }
41   *
42   * // Handler should handle the {@link ReadTimeoutException}.
43   * public class MyHandler extends {@link ChannelHandlerAdapter} {
44   *     {@code @Override}
45   *     public void exceptionCaught({@link ChannelHandlerContext} ctx, {@link Throwable} cause)
46   *             throws {@link Exception} {
47   *         if (cause instanceof {@link ReadTimeoutException}) {
48   *             // do something
49   *         } else {
50   *             super.exceptionCaught(ctx, cause);
51   *         }
52   *     }
53   * }
54   *
55   * {@link ServerBootstrap} bootstrap = ...;
56   * ...
57   * bootstrap.childHandler(new MyChannelInitializer());
58   * ...
59   * </pre>
60   * @see WriteTimeoutHandler
61   * @see IdleStateHandler
62   */
63  public class ReadTimeoutHandler extends ChannelHandlerAdapter {
64      private static final long MIN_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(1);
65  
66      private final long timeoutNanos;
67  
68      private volatile ScheduledFuture<?> timeout;
69      private volatile long lastReadTime;
70  
71      private volatile int state; // 0 - none, 1 - Initialized, 2 - Destroyed;
72  
73      private boolean closed;
74  
75      /**
76       * Creates a new instance.
77       *
78       * @param timeoutSeconds
79       *        read timeout in seconds
80       */
81      public ReadTimeoutHandler(int timeoutSeconds) {
82          this(timeoutSeconds, TimeUnit.SECONDS);
83      }
84  
85      /**
86       * Creates a new instance.
87       *
88       * @param timeout
89       *        read timeout
90       * @param unit
91       *        the {@link TimeUnit} of {@code timeout}
92       */
93      public ReadTimeoutHandler(long timeout, TimeUnit unit) {
94          if (unit == null) {
95              throw new NullPointerException("unit");
96          }
97  
98          if (timeout <= 0) {
99              timeoutNanos = 0;
100         } else {
101             timeoutNanos = Math.max(unit.toNanos(timeout), MIN_TIMEOUT_NANOS);
102         }
103     }
104 
105     @Override
106     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
107         if (ctx.channel().isActive() && ctx.channel().isRegistered()) {
108             // channelActvie() event has been fired already, which means this.channelActive() will
109             // not be invoked. We have to initialize here instead.
110             initialize(ctx);
111         } else {
112             // channelActive() event has not been fired yet.  this.channelActive() will be invoked
113             // and initialization will occur there.
114         }
115     }
116 
117     @Override
118     public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
119         destroy();
120     }
121 
122     @Override
123     public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
124         // Initialize early if channel is active already.
125         if (ctx.channel().isActive()) {
126             initialize(ctx);
127         }
128         super.channelRegistered(ctx);
129     }
130 
131     @Override
132     public void channelActive(ChannelHandlerContext ctx) throws Exception {
133         // This method will be invoked only if this handler was added
134         // before channelActive() event is fired.  If a user adds this handler
135         // after the channelActive() event, initialize() will be called by beforeAdd().
136         initialize(ctx);
137         super.channelActive(ctx);
138     }
139 
140     @Override
141     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
142         destroy();
143         super.channelInactive(ctx);
144     }
145 
146     @Override
147     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
148         lastReadTime = System.nanoTime();
149         ctx.fireChannelRead(msg);
150     }
151 
152     private void initialize(ChannelHandlerContext ctx) {
153         // Avoid the case where destroy() is called before scheduling timeouts.
154         // See: https://github.com/netty/netty/issues/143
155         switch (state) {
156         case 1:
157         case 2:
158             return;
159         }
160 
161         state = 1;
162 
163         lastReadTime = System.nanoTime();
164         if (timeoutNanos > 0) {
165             timeout = ctx.executor().schedule(
166                     new ReadTimeoutTask(ctx),
167                     timeoutNanos, TimeUnit.NANOSECONDS);
168         }
169     }
170 
171     private void destroy() {
172         state = 2;
173 
174         if (timeout != null) {
175             timeout.cancel(false);
176             timeout = null;
177         }
178     }
179 
180     /**
181      * Is called when a read timeout was detected.
182      */
183     protected void readTimedOut(ChannelHandlerContext ctx) throws Exception {
184         if (!closed) {
185             ctx.fireExceptionCaught(ReadTimeoutException.INSTANCE);
186             ctx.close();
187             closed = true;
188         }
189     }
190 
191     private final class ReadTimeoutTask implements Runnable {
192 
193         private final ChannelHandlerContext ctx;
194 
195         ReadTimeoutTask(ChannelHandlerContext ctx) {
196             this.ctx = ctx;
197         }
198 
199         @Override
200         public void run() {
201             if (!ctx.channel().isOpen()) {
202                 return;
203             }
204 
205             long currentTime = System.nanoTime();
206             long nextDelay = timeoutNanos - (currentTime - lastReadTime);
207             if (nextDelay <= 0) {
208                 // Read timed out - set a new timeout and notify the callback.
209                 timeout = ctx.executor().schedule(this, timeoutNanos, TimeUnit.NANOSECONDS);
210                 try {
211                     readTimedOut(ctx);
212                 } catch (Throwable t) {
213                     ctx.fireExceptionCaught(t);
214                 }
215             } else {
216                 // Read occurred before the timeout - set a new timeout with shorter delay.
217                 timeout = ctx.executor().schedule(this, nextDelay, TimeUnit.NANOSECONDS);
218             }
219         }
220     }
221 }