View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.timeout;
17  
18  import io.netty.bootstrap.ServerBootstrap;
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelDuplexHandler;
21  import io.netty.channel.ChannelFuture;
22  import io.netty.channel.ChannelFutureListener;
23  import io.netty.channel.ChannelHandlerContext;
24  import io.netty.channel.ChannelInitializer;
25  import io.netty.channel.ChannelOutboundHandlerAdapter;
26  import io.netty.channel.ChannelPromise;
27  import io.netty.util.concurrent.Future;
28  import io.netty.util.internal.ObjectUtil;
29  
30  import java.util.concurrent.TimeUnit;
31  
32  /**
33   * Raises a {@link WriteTimeoutException} when a write operation cannot finish in a certain period of time.
34   *
35   * <pre>
36   * // The connection is closed when a write operation cannot finish in 30 seconds.
37   *
38   * public class MyChannelInitializer extends {@link ChannelInitializer}&lt;{@link Channel}&gt; {
39   *     public void initChannel({@link Channel} channel) {
40   *         channel.pipeline().addLast("writeTimeoutHandler", new {@link WriteTimeoutHandler}(30);
41   *         channel.pipeline().addLast("myHandler", new MyHandler());
42   *     }
43   * }
44   *
45   * // Handler should handle the {@link WriteTimeoutException}.
46   * public class MyHandler extends {@link ChannelDuplexHandler} {
47   *     {@code @Override}
48   *     public void exceptionCaught({@link ChannelHandlerContext} ctx, {@link Throwable} cause)
49   *             throws {@link Exception} {
50   *         if (cause instanceof {@link WriteTimeoutException}) {
51   *             // do something
52   *         } else {
53   *             super.exceptionCaught(ctx, cause);
54   *         }
55   *     }
56   * }
57   *
58   * {@link ServerBootstrap} bootstrap = ...;
59   * ...
60   * bootstrap.childHandler(new MyChannelInitializer());
61   * ...
62   * </pre>
63   * @see ReadTimeoutHandler
64   * @see IdleStateHandler
65   */
66  public class WriteTimeoutHandler extends ChannelOutboundHandlerAdapter {
67      private static final long MIN_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(1);
68  
69      private final long timeoutNanos;
70  
71      /**
72       * A doubly-linked list to track all WriteTimeoutTasks
73       */
74      private WriteTimeoutTask lastTask;
75  
76      private boolean closed;
77  
78      /**
79       * Creates a new instance.
80       *
81       * @param timeoutSeconds
82       *        write timeout in seconds
83       */
84      public WriteTimeoutHandler(int timeoutSeconds) {
85          this(timeoutSeconds, TimeUnit.SECONDS);
86      }
87  
88      /**
89       * Creates a new instance.
90       *
91       * @param timeout
92       *        write timeout
93       * @param unit
94       *        the {@link TimeUnit} of {@code timeout}
95       */
96      public WriteTimeoutHandler(long timeout, TimeUnit unit) {
97          ObjectUtil.checkNotNull(unit, "unit");
98  
99          if (timeout <= 0) {
100             timeoutNanos = 0;
101         } else {
102             timeoutNanos = Math.max(unit.toNanos(timeout), MIN_TIMEOUT_NANOS);
103         }
104     }
105 
106     @Override
107     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
108         if (timeoutNanos > 0) {
109             promise = promise.unvoid();
110             scheduleTimeout(ctx, promise);
111         }
112         ctx.write(msg, promise);
113     }
114 
115     @Override
116     public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
117         assert ctx.executor().inEventLoop();
118         WriteTimeoutTask task = lastTask;
119         lastTask = null;
120         while (task != null) {
121             assert task.ctx.executor().inEventLoop();
122             task.scheduledFuture.cancel(false);
123             WriteTimeoutTask prev = task.prev;
124             task.prev = null;
125             task.next = null;
126             task = prev;
127         }
128     }
129 
130     private void scheduleTimeout(final ChannelHandlerContext ctx, final ChannelPromise promise) {
131         // Schedule a timeout.
132         final WriteTimeoutTask task = new WriteTimeoutTask(ctx, promise);
133         task.scheduledFuture = ctx.executor().schedule(task, timeoutNanos, TimeUnit.NANOSECONDS);
134 
135         if (!task.scheduledFuture.isDone()) {
136             addWriteTimeoutTask(task);
137 
138             // Cancel the scheduled timeout if the flush promise is complete.
139             promise.addListener(task);
140         }
141     }
142 
143     private void addWriteTimeoutTask(WriteTimeoutTask task) {
144         assert task.ctx.executor().inEventLoop();
145         if (lastTask != null) {
146             lastTask.next = task;
147             task.prev = lastTask;
148         }
149         lastTask = task;
150     }
151 
152     private void removeWriteTimeoutTask(WriteTimeoutTask task) {
153         assert task.ctx.executor().inEventLoop();
154         if (task == lastTask) {
155             // task is the tail of list
156             assert task.next == null;
157             lastTask = lastTask.prev;
158             if (lastTask != null) {
159                 lastTask.next = null;
160             }
161         } else if (task.prev == null && task.next == null) {
162             // Since task is not lastTask, then it has been removed or not been added.
163             return;
164         } else if (task.prev == null) {
165             // task is the head of list and the list has at least 2 nodes
166             task.next.prev = null;
167         } else {
168             task.prev.next = task.next;
169             task.next.prev = task.prev;
170         }
171         task.prev = null;
172         task.next = null;
173     }
174 
175     /**
176      * Is called when a write timeout was detected
177      */
178     protected void writeTimedOut(ChannelHandlerContext ctx) throws Exception {
179         if (!closed) {
180             ctx.fireExceptionCaught(WriteTimeoutException.INSTANCE);
181             ctx.close();
182             closed = true;
183         }
184     }
185 
186     private final class WriteTimeoutTask implements Runnable, ChannelFutureListener {
187 
188         private final ChannelHandlerContext ctx;
189         private final ChannelPromise promise;
190 
191         // WriteTimeoutTask is also a node of a doubly-linked list
192         WriteTimeoutTask prev;
193         WriteTimeoutTask next;
194 
195         Future<?> scheduledFuture;
196 
197         WriteTimeoutTask(ChannelHandlerContext ctx, ChannelPromise promise) {
198             this.ctx = ctx;
199             this.promise = promise;
200         }
201 
202         @Override
203         public void run() {
204             // Was not written yet so issue a write timeout
205             // The promise itself will be failed with a ClosedChannelException once the close() was issued
206             // See https://github.com/netty/netty/issues/2159
207             if (!promise.isDone()) {
208                 try {
209                     writeTimedOut(ctx);
210                 } catch (Throwable t) {
211                     ctx.fireExceptionCaught(t);
212                 }
213             }
214             removeWriteTimeoutTask(this);
215         }
216 
217         @Override
218         public void operationComplete(ChannelFuture future) throws Exception {
219             // scheduledFuture has already be set when reaching here
220             scheduledFuture.cancel(false);
221 
222             // Check if its safe to modify the "doubly-linked-list" that we maintain. If its not we will schedule the
223             // modification so its picked up by the executor..
224             if (ctx.executor().inEventLoop()) {
225                 removeWriteTimeoutTask(this);
226             } else {
227                 // So let's just pass outself to the executor which will then take care of remove this task
228                 // from the doubly-linked list. Schedule ourself is fine as the promise itself is done.
229                 //
230                 // This fixes https://github.com/netty/netty/issues/11053
231                 assert promise.isDone();
232                 ctx.executor().execute(this);
233             }
234         }
235     }
236 }