View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.timeout;
17  
18  import io.netty.bootstrap.ServerBootstrap;
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelFuture;
21  import io.netty.channel.ChannelFutureListener;
22  import io.netty.channel.ChannelHandlerAdapter;
23  import io.netty.channel.ChannelHandlerContext;
24  import io.netty.channel.ChannelInitializer;
25  import io.netty.channel.ChannelPromise;
26  
27  import java.util.concurrent.ScheduledFuture;
28  import java.util.concurrent.TimeUnit;
29  
30  /**
31   * Raises a {@link WriteTimeoutException} when no data was written within a
32   * certain period of time.
33   *
34   * <pre>
35   * // The connection is closed when there is no outbound traffic
36   * // for 30 seconds.
37   *
38   * public class MyChannelInitializer extends {@link ChannelInitializer}&lt;{@link Channel}&gt; {
39   *     public void initChannel({@link Channel} channel) {
40   *         channel.pipeline().addLast("writeTimeoutHandler", new {@link WriteTimeoutHandler}(30);
41   *         channel.pipeline().addLast("myHandler", new MyHandler());
42   *     }
43   * }
44   *
45   * // Handler should handle the {@link WriteTimeoutException}.
46   * public class MyHandler extends {@link ChannelHandlerAdapter} {
47   *     {@code @Override}
48   *     public void exceptionCaught({@link ChannelHandlerContext} ctx, {@link Throwable} cause)
49   *             throws {@link Exception} {
50   *         if (cause instanceof {@link WriteTimeoutException}) {
51   *             // do something
52   *         } else {
53   *             super.exceptionCaught(ctx, cause);
54   *         }
55   *     }
56   * }
57   *
58   * {@link ServerBootstrap} bootstrap = ...;
59   * ...
60   * bootstrap.childHandler(new MyChannelInitializer());
61   * ...
62   * </pre>
63   * @see ReadTimeoutHandler
64   * @see IdleStateHandler
65   */
66  public class WriteTimeoutHandler extends ChannelHandlerAdapter {
67      private static final long MIN_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(1);
68  
69      private final long timeoutNanos;
70  
71      private boolean closed;
72  
73      /**
74       * Creates a new instance.
75       *
76       * @param timeoutSeconds
77       *        write timeout in seconds
78       */
79      public WriteTimeoutHandler(int timeoutSeconds) {
80          this(timeoutSeconds, TimeUnit.SECONDS);
81      }
82  
83      /**
84       * Creates a new instance.
85       *
86       * @param timeout
87       *        write timeout
88       * @param unit
89       *        the {@link TimeUnit} of {@code timeout}
90       */
91      public WriteTimeoutHandler(long timeout, TimeUnit unit) {
92          if (unit == null) {
93              throw new NullPointerException("unit");
94          }
95  
96          if (timeout <= 0) {
97              timeoutNanos = 0;
98          } else {
99              timeoutNanos = Math.max(unit.toNanos(timeout), MIN_TIMEOUT_NANOS);
100         }
101     }
102 
103     @Override
104     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
105         if (timeoutNanos > 0) {
106             promise = promise.unvoid();
107             scheduleTimeout(ctx, promise);
108         }
109         ctx.write(msg, promise);
110     }
111 
112     private void scheduleTimeout(final ChannelHandlerContext ctx, final ChannelPromise future) {
113         // Schedule a timeout.
114         final ScheduledFuture<?> sf = ctx.executor().schedule(new Runnable() {
115             @Override
116             public void run() {
117                 // Was not written yet so issue a write timeout
118                 // The future itself will be failed with a ClosedChannelException once the close() was issued
119                 // See https://github.com/netty/netty/issues/2159
120                 if (!future.isDone()) {
121                     try {
122                         writeTimedOut(ctx);
123                     } catch (Throwable t) {
124                         ctx.fireExceptionCaught(t);
125                     }
126                 }
127             }
128         }, timeoutNanos, TimeUnit.NANOSECONDS);
129 
130         // Cancel the scheduled timeout if the flush future is complete.
131         future.addListener(new ChannelFutureListener() {
132             @Override
133             public void operationComplete(ChannelFuture future) throws Exception {
134                 sf.cancel(false);
135             }
136         });
137     }
138 
139     /**
140      * Is called when a write timeout was detected
141      */
142     protected void writeTimedOut(ChannelHandlerContext ctx) throws Exception {
143         if (!closed) {
144             ctx.fireExceptionCaught(WriteTimeoutException.INSTANCE);
145             ctx.close();
146             closed = true;
147         }
148     }
149 }