View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.handler.timeout;
17  
18  import io.netty5.bootstrap.ServerBootstrap;
19  import io.netty5.channel.Channel;
20  import io.netty5.channel.ChannelHandler;
21  import io.netty5.channel.ChannelHandlerContext;
22  import io.netty5.channel.ChannelInitializer;
23  import io.netty5.util.concurrent.Future;
24  import io.netty5.util.concurrent.FutureListener;
25  
26  import java.util.concurrent.TimeUnit;
27  
28  import static java.util.Objects.requireNonNull;
29  
30  /**
31   * Raises a {@link WriteTimeoutException} when a write operation cannot finish in a certain period of time.
32   *
33   * <pre>
34   * // The connection is closed when a write operation cannot finish in 30 seconds.
35   *
36   * public class MyChannelInitializer extends {@link ChannelInitializer}&lt;{@link Channel}&gt; {
37   *     public void initChannel({@link Channel} channel) {
38   *         channel.pipeline().addLast("writeTimeoutHandler", new {@link WriteTimeoutHandler}(30));
39   *         channel.pipeline().addLast("myHandler", new MyHandler());
40   *     }
41   * }
42   *
43   * // Handler should handle the {@link WriteTimeoutException}.
44   * public class MyHandler implements {@link ChannelHandler} {
45   *     {@code @Override}
46   *     public void exceptionCaught({@link ChannelHandlerContext} ctx, {@link Throwable} cause)
47   *             throws {@link Exception} {
48   *         if (cause instanceof {@link WriteTimeoutException}) {
49   *             // do something
50   *         } else {
51   *             super.exceptionCaught(ctx, cause);
52   *         }
53   *     }
54   * }
55   *
56   * {@link ServerBootstrap} bootstrap = ...;
57   * ...
58   * bootstrap.childHandler(new MyChannelInitializer());
59   * ...
60   * </pre>
61   * @see ReadTimeoutHandler
62   * @see IdleStateHandler
63   */
64  public class WriteTimeoutHandler implements ChannelHandler {
65      private static final long MIN_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(1);
66  
67      private final long timeoutNanos;
68  
69      /**
70       * A doubly-linked list to track all WriteTimeoutTasks
71       */
72      private WriteTimeoutTask lastTask;
73  
74      private boolean closed;
75  
76      /**
77       * Creates a new instance.
78       *
79       * @param timeoutSeconds
80       *        write timeout in seconds
81       */
82      public WriteTimeoutHandler(int timeoutSeconds) {
83          this(timeoutSeconds, TimeUnit.SECONDS);
84      }
85  
86      /**
87       * Creates a new instance.
88       *
89       * @param timeout
90       *        write timeout
91       * @param unit
92       *        the {@link TimeUnit} of {@code timeout}
93       */
94      public WriteTimeoutHandler(long timeout, TimeUnit unit) {
95          requireNonNull(unit, "unit");
96  
97          if (timeout <= 0) {
98              timeoutNanos = 0;
99          } else {
100             timeoutNanos = Math.max(unit.toNanos(timeout), MIN_TIMEOUT_NANOS);
101         }
102     }
103 
104     @Override
105     public Future<Void> write(ChannelHandlerContext ctx, Object msg) {
106         Future<Void> f = ctx.write(msg);
107         if (timeoutNanos > 0) {
108             scheduleTimeout(ctx, f);
109         }
110         return f;
111     }
112 
113     @Override
114     public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
115         assert ctx.executor().inEventLoop();
116         WriteTimeoutTask task = lastTask;
117         lastTask = null;
118         while (task != null) {
119             assert task.ctx.executor().inEventLoop();
120             task.scheduledFuture.cancel();
121             WriteTimeoutTask prev = task.prev;
122             task.prev = null;
123             task.next = null;
124             task = prev;
125         }
126     }
127 
128     private void scheduleTimeout(final ChannelHandlerContext ctx, final Future<Void> future) {
129         // Schedule a timeout.
130         final WriteTimeoutTask task = new WriteTimeoutTask(ctx, future);
131         task.scheduledFuture = ctx.executor().schedule(task, timeoutNanos, TimeUnit.NANOSECONDS);
132 
133         if (!task.scheduledFuture.isDone()) {
134             addWriteTimeoutTask(task);
135 
136             // Cancel the scheduled timeout if the flush promise is complete.
137             future.addListener(task);
138         }
139     }
140 
141     private void addWriteTimeoutTask(WriteTimeoutTask task) {
142         assert task.ctx.executor().inEventLoop();
143         if (lastTask != null) {
144             lastTask.next = task;
145             task.prev = lastTask;
146         }
147         lastTask = task;
148     }
149 
150     private void removeWriteTimeoutTask(WriteTimeoutTask task) {
151         assert task.ctx.executor().inEventLoop();
152         if (task == lastTask) {
153             // task is the tail of list
154             assert task.next == null;
155             lastTask = lastTask.prev;
156             if (lastTask != null) {
157                 lastTask.next = null;
158             }
159         } else if (task.prev == null && task.next == null) {
160             // Since task is not lastTask, then it has been removed or not been added.
161             return;
162         } else if (task.prev == null) {
163             // task is the head of list and the list has at least 2 nodes
164             task.next.prev = null;
165         } else {
166             task.prev.next = task.next;
167             task.next.prev = task.prev;
168         }
169         task.prev = null;
170         task.next = null;
171     }
172 
173     /**
174      * Is called when a write timeout was detected
175      */
176     protected void writeTimedOut(ChannelHandlerContext ctx) throws Exception {
177         if (!closed) {
178             ctx.fireChannelExceptionCaught(WriteTimeoutException.INSTANCE);
179             ctx.close();
180             closed = true;
181         }
182     }
183 
184     private final class WriteTimeoutTask implements Runnable, FutureListener<Void> {
185 
186         private final ChannelHandlerContext ctx;
187         private final Future<Void> future;
188 
189         // WriteTimeoutTask is also a node of a doubly-linked list
190         WriteTimeoutTask prev;
191         WriteTimeoutTask next;
192 
193         Future<?> scheduledFuture;
194         WriteTimeoutTask(ChannelHandlerContext ctx, Future<Void> future) {
195             this.ctx = ctx;
196             this.future = future;
197         }
198 
199         @Override
200         public void run() {
201             // Was not written yet so issue a write timeout
202             // The promise itself will be failed with a ClosedChannelException once the close() was issued
203             // See https://github.com/netty/netty/issues/2159
204             if (!future.isDone()) {
205                 try {
206                     writeTimedOut(ctx);
207                 } catch (Throwable t) {
208                     ctx.fireChannelExceptionCaught(t);
209                 }
210             }
211             removeWriteTimeoutTask(this);
212         }
213 
214         @Override
215         public void operationComplete(Future<? extends Void> future) throws Exception {
216             // scheduledFuture has already be set when reaching here
217             scheduledFuture.cancel();
218 
219             // Check if its safe to modify the "doubly-linked-list" that we maintain. If its not we will schedule the
220             // modification so its picked up by the executor..
221             if (ctx.executor().inEventLoop()) {
222                 removeWriteTimeoutTask(this);
223             } else {
224                 // So let's just pass outself to the executor which will then take care of remove this task
225                 // from the doubly-linked list. Schedule ourself is fine as the promise itself is done.
226                 //
227                 // This fixes https://github.com/netty/netty/issues/11053
228                 assert future.isDone();
229                 ctx.executor().execute(this);
230             }
231         }
232     }
233 }