View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.timeout;
17  
18  import io.netty.bootstrap.ServerBootstrap;
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelDuplexHandler;
21  import io.netty.channel.ChannelFuture;
22  import io.netty.channel.ChannelFutureListener;
23  import io.netty.channel.ChannelHandlerContext;
24  import io.netty.channel.ChannelInitializer;
25  import io.netty.channel.ChannelOutboundHandlerAdapter;
26  import io.netty.channel.ChannelPromise;
27  
28  import java.util.concurrent.ScheduledFuture;
29  import java.util.concurrent.TimeUnit;
30  
31  /**
32   * Raises a {@link WriteTimeoutException} when a write operation cannot finish in a certain period of time.
33   *
34   * <pre>
35   * // The connection is closed when a write operation cannot finish in 30 seconds.
36   *
37   * public class MyChannelInitializer extends {@link ChannelInitializer}&lt;{@link Channel}&gt; {
38   *     public void initChannel({@link Channel} channel) {
39   *         channel.pipeline().addLast("writeTimeoutHandler", new {@link WriteTimeoutHandler}(30);
40   *         channel.pipeline().addLast("myHandler", new MyHandler());
41   *     }
42   * }
43   *
44   * // Handler should handle the {@link WriteTimeoutException}.
45   * public class MyHandler extends {@link ChannelDuplexHandler} {
46   *     {@code @Override}
47   *     public void exceptionCaught({@link ChannelHandlerContext} ctx, {@link Throwable} cause)
48   *             throws {@link Exception} {
49   *         if (cause instanceof {@link WriteTimeoutException}) {
50   *             // do something
51   *         } else {
52   *             super.exceptionCaught(ctx, cause);
53   *         }
54   *     }
55   * }
56   *
57   * {@link ServerBootstrap} bootstrap = ...;
58   * ...
59   * bootstrap.childHandler(new MyChannelInitializer());
60   * ...
61   * </pre>
62   * @see ReadTimeoutHandler
63   * @see IdleStateHandler
64   */
65  public class WriteTimeoutHandler extends ChannelOutboundHandlerAdapter {
66      private static final long MIN_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(1);
67  
68      private final long timeoutNanos;
69  
70      /**
71       * A doubly-linked list to track all WriteTimeoutTasks
72       */
73      private WriteTimeoutTask lastTask;
74  
75      private boolean closed;
76  
77      /**
78       * Creates a new instance.
79       *
80       * @param timeoutSeconds
81       *        write timeout in seconds
82       */
83      public WriteTimeoutHandler(int timeoutSeconds) {
84          this(timeoutSeconds, TimeUnit.SECONDS);
85      }
86  
87      /**
88       * Creates a new instance.
89       *
90       * @param timeout
91       *        write timeout
92       * @param unit
93       *        the {@link TimeUnit} of {@code timeout}
94       */
95      public WriteTimeoutHandler(long timeout, TimeUnit unit) {
96          if (unit == null) {
97              throw new NullPointerException("unit");
98          }
99  
100         if (timeout <= 0) {
101             timeoutNanos = 0;
102         } else {
103             timeoutNanos = Math.max(unit.toNanos(timeout), MIN_TIMEOUT_NANOS);
104         }
105     }
106 
107     @Override
108     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
109         scheduleTimeout(ctx, promise);
110         ctx.write(msg, promise);
111     }
112 
113     @Override
114     public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
115         WriteTimeoutTask task = lastTask;
116         lastTask = null;
117         while (task != null) {
118             task.scheduledFuture.cancel(false);
119             WriteTimeoutTask prev = task.prev;
120             task.prev = null;
121             task.next = null;
122             task = prev;
123         }
124     }
125 
126     private void scheduleTimeout(final ChannelHandlerContext ctx, final ChannelPromise promise) {
127         // Schedule a timeout.
128         final WriteTimeoutTask task = new WriteTimeoutTask(ctx, promise);
129         task.scheduledFuture = ctx.executor().schedule(task, timeoutNanos, TimeUnit.NANOSECONDS);
130 
131         if (!task.scheduledFuture.isDone()) {
132             addWriteTimeoutTask(task);
133 
134             // Cancel the scheduled timeout if the flush promise is complete.
135             promise.addListener(task);
136         }
137     }
138 
139     private void addWriteTimeoutTask(WriteTimeoutTask task) {
140         if (lastTask == null) {
141             lastTask = task;
142         } else {
143             lastTask.next = task;
144             task.prev = lastTask;
145             lastTask = task;
146         }
147     }
148 
149     private void removeWriteTimeoutTask(WriteTimeoutTask task) {
150         if (task == lastTask) {
151             // task is the tail of list
152             assert task.next == null;
153             lastTask = lastTask.prev;
154             if (lastTask != null) {
155                 lastTask.next = null;
156             }
157         } else if (task.prev == null && task.next == null) {
158             // Since task is not lastTask, then it has been removed or not been added.
159             return;
160         } else if (task.prev == null) {
161             // task is the head of list and the list has at least 2 nodes
162             task.next.prev = null;
163         } else {
164             task.prev.next = task.next;
165             task.next.prev = task.prev;
166         }
167         task.prev = null;
168         task.next = null;
169     }
170 
171     /**
172      * Is called when a write timeout was detected
173      */
174     protected void writeTimedOut(ChannelHandlerContext ctx) throws Exception {
175         if (!closed) {
176             ctx.fireExceptionCaught(WriteTimeoutException.INSTANCE);
177             ctx.close();
178             closed = true;
179         }
180     }
181 
182     private final class WriteTimeoutTask implements Runnable, ChannelFutureListener {
183 
184         private final ChannelHandlerContext ctx;
185         private final ChannelPromise promise;
186 
187         // WriteTimeoutTask is also a node of a doubly-linked list
188         WriteTimeoutTask prev;
189         WriteTimeoutTask next;
190 
191         ScheduledFuture<?> scheduledFuture;
192 
193         WriteTimeoutTask(ChannelHandlerContext ctx, ChannelPromise promise) {
194             this.ctx = ctx;
195             this.promise = promise;
196         }
197 
198         @Override
199         public void run() {
200             // Was not written yet so issue a write timeout
201             // The promise itself will be failed with a ClosedChannelException once the close() was issued
202             // See https://github.com/netty/netty/issues/2159
203             if (!promise.isDone()) {
204                 try {
205                     writeTimedOut(ctx);
206                 } catch (Throwable t) {
207                     ctx.fireExceptionCaught(t);
208                 }
209             }
210             removeWriteTimeoutTask(this);
211         }
212 
213         @Override
214         public void operationComplete(ChannelFuture future) throws Exception {
215             // scheduledFuture has already be set when reaching here
216             scheduledFuture.cancel(false);
217             removeWriteTimeoutTask(this);
218         }
219     }
220 }