View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.timeout;
17  
18  import io.netty.bootstrap.ServerBootstrap;
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelDuplexHandler;
21  import io.netty.channel.ChannelFuture;
22  import io.netty.channel.ChannelFutureListener;
23  import io.netty.channel.ChannelHandlerContext;
24  import io.netty.channel.ChannelInitializer;
25  import io.netty.channel.ChannelOutboundHandlerAdapter;
26  import io.netty.channel.ChannelPromise;
27  
28  import java.util.concurrent.ScheduledFuture;
29  import java.util.concurrent.TimeUnit;
30  
31  /**
32   * Raises a {@link WriteTimeoutException} when a write operation cannot finish in a certain period of time.
33   *
34   * <pre>
35   * // The connection is closed when a write operation cannot finish in 30 seconds.
36   *
37   * public class MyChannelInitializer extends {@link ChannelInitializer}&lt;{@link Channel}&gt; {
38   *     public void initChannel({@link Channel} channel) {
39   *         channel.pipeline().addLast("writeTimeoutHandler", new {@link WriteTimeoutHandler}(30);
40   *         channel.pipeline().addLast("myHandler", new MyHandler());
41   *     }
42   * }
43   *
44   * // Handler should handle the {@link WriteTimeoutException}.
45   * public class MyHandler extends {@link ChannelDuplexHandler} {
46   *     {@code @Override}
47   *     public void exceptionCaught({@link ChannelHandlerContext} ctx, {@link Throwable} cause)
48   *             throws {@link Exception} {
49   *         if (cause instanceof {@link WriteTimeoutException}) {
50   *             // do something
51   *         } else {
52   *             super.exceptionCaught(ctx, cause);
53   *         }
54   *     }
55   * }
56   *
57   * {@link ServerBootstrap} bootstrap = ...;
58   * ...
59   * bootstrap.childHandler(new MyChannelInitializer());
60   * ...
61   * </pre>
62   * @see ReadTimeoutHandler
63   * @see IdleStateHandler
64   */
65  public class WriteTimeoutHandler extends ChannelOutboundHandlerAdapter {
66      private static final long MIN_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(1);
67  
68      private final long timeoutNanos;
69  
70      /**
71       * A doubly-linked list to track all WriteTimeoutTasks
72       */
73      private WriteTimeoutTask lastTask;
74  
75      private boolean closed;
76  
77      /**
78       * Creates a new instance.
79       *
80       * @param timeoutSeconds
81       *        write timeout in seconds
82       */
83      public WriteTimeoutHandler(int timeoutSeconds) {
84          this(timeoutSeconds, TimeUnit.SECONDS);
85      }
86  
87      /**
88       * Creates a new instance.
89       *
90       * @param timeout
91       *        write timeout
92       * @param unit
93       *        the {@link TimeUnit} of {@code timeout}
94       */
95      public WriteTimeoutHandler(long timeout, TimeUnit unit) {
96          if (unit == null) {
97              throw new NullPointerException("unit");
98          }
99  
100         if (timeout <= 0) {
101             timeoutNanos = 0;
102         } else {
103             timeoutNanos = Math.max(unit.toNanos(timeout), MIN_TIMEOUT_NANOS);
104         }
105     }
106 
107     @Override
108     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
109         if (timeoutNanos > 0) {
110             promise = promise.unvoid();
111             scheduleTimeout(ctx, promise);
112         }
113         ctx.write(msg, promise);
114     }
115 
116     @Override
117     public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
118         WriteTimeoutTask task = lastTask;
119         lastTask = null;
120         while (task != null) {
121             task.scheduledFuture.cancel(false);
122             WriteTimeoutTask prev = task.prev;
123             task.prev = null;
124             task.next = null;
125             task = prev;
126         }
127     }
128 
129     private void scheduleTimeout(final ChannelHandlerContext ctx, final ChannelPromise promise) {
130         // Schedule a timeout.
131         final WriteTimeoutTask task = new WriteTimeoutTask(ctx, promise);
132         task.scheduledFuture = ctx.executor().schedule(task, timeoutNanos, TimeUnit.NANOSECONDS);
133 
134         if (!task.scheduledFuture.isDone()) {
135             addWriteTimeoutTask(task);
136 
137             // Cancel the scheduled timeout if the flush promise is complete.
138             promise.addListener(task);
139         }
140     }
141 
142     private void addWriteTimeoutTask(WriteTimeoutTask task) {
143         if (lastTask == null) {
144             lastTask = task;
145         } else {
146             lastTask.next = task;
147             task.prev = lastTask;
148             lastTask = task;
149         }
150     }
151 
152     private void removeWriteTimeoutTask(WriteTimeoutTask task) {
153         if (task == lastTask) {
154             // task is the tail of list
155             assert task.next == null;
156             lastTask = lastTask.prev;
157             if (lastTask != null) {
158                 lastTask.next = null;
159             }
160         } else if (task.prev == null && task.next == null) {
161             // Since task is not lastTask, then it has been removed or not been added.
162             return;
163         } else if (task.prev == null) {
164             // task is the head of list and the list has at least 2 nodes
165             task.next.prev = null;
166         } else {
167             task.prev.next = task.next;
168             task.next.prev = task.prev;
169         }
170         task.prev = null;
171         task.next = null;
172     }
173 
174     /**
175      * Is called when a write timeout was detected
176      */
177     protected void writeTimedOut(ChannelHandlerContext ctx) throws Exception {
178         if (!closed) {
179             ctx.fireExceptionCaught(WriteTimeoutException.INSTANCE);
180             ctx.close();
181             closed = true;
182         }
183     }
184 
185     private final class WriteTimeoutTask implements Runnable, ChannelFutureListener {
186 
187         private final ChannelHandlerContext ctx;
188         private final ChannelPromise promise;
189 
190         // WriteTimeoutTask is also a node of a doubly-linked list
191         WriteTimeoutTask prev;
192         WriteTimeoutTask next;
193 
194         ScheduledFuture<?> scheduledFuture;
195 
196         WriteTimeoutTask(ChannelHandlerContext ctx, ChannelPromise promise) {
197             this.ctx = ctx;
198             this.promise = promise;
199         }
200 
201         @Override
202         public void run() {
203             // Was not written yet so issue a write timeout
204             // The promise itself will be failed with a ClosedChannelException once the close() was issued
205             // See https://github.com/netty/netty/issues/2159
206             if (!promise.isDone()) {
207                 try {
208                     writeTimedOut(ctx);
209                 } catch (Throwable t) {
210                     ctx.fireExceptionCaught(t);
211                 }
212             }
213             removeWriteTimeoutTask(this);
214         }
215 
216         @Override
217         public void operationComplete(ChannelFuture future) throws Exception {
218             // scheduledFuture has already be set when reaching here
219             scheduledFuture.cancel(false);
220             removeWriteTimeoutTask(this);
221         }
222     }
223 }