View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.handler.timeout;
17  
18  import io.netty5.bootstrap.ServerBootstrap;
19  import io.netty5.channel.Channel;
20  import io.netty5.channel.ChannelHandler;
21  import io.netty5.channel.ChannelHandlerContext;
22  import io.netty5.channel.ChannelInitializer;
23  import io.netty5.util.concurrent.Future;
24  import io.netty5.util.concurrent.FutureListener;
25  
26  import java.util.concurrent.TimeUnit;
27  
28  import static java.util.Objects.requireNonNull;
29  
30  /**
31   * Triggers an {@link IdleStateEvent} when a {@link Channel} has not performed
32   * read, write, or both operation for a while.
33   *
34   * <h3>Supported idle states</h3>
35   * <table border="1">
36   * <tr>
37   * <th>Property</th><th>Meaning</th>
38   * </tr>
39   * <tr>
40   * <td>{@code readerIdleTime}</td>
41   * <td>an {@link IdleStateEvent} whose state is {@link IdleState#READER_IDLE}
42   *     will be triggered when no read was performed for the specified period of
43   *     time.  Specify {@code 0} to disable.</td>
44   * </tr>
45   * <tr>
46   * <td>{@code writerIdleTime}</td>
47   * <td>an {@link IdleStateEvent} whose state is {@link IdleState#WRITER_IDLE}
48   *     will be triggered when no write was performed for the specified period of
49   *     time.  Specify {@code 0} to disable.</td>
50   * </tr>
51   * <tr>
52   * <td>{@code allIdleTime}</td>
53   * <td>an {@link IdleStateEvent} whose state is {@link IdleState#ALL_IDLE}
54   *     will be triggered when neither read nor write was performed for the
55   *     specified period of time.  Specify {@code 0} to disable.</td>
56   * </tr>
57   * </table>
58   *
59   * <pre>
60   * // An example that sends a ping message when there is no outbound traffic
61   * // for 30 seconds.  The connection is closed when there is no inbound traffic
62   * // for 60 seconds.
63   *
64   * public class MyChannelInitializer extends {@link ChannelInitializer}&lt;{@link Channel}&gt; {
65   *     {@code @Override}
66   *     public void initChannel({@link Channel} channel) {
67   *         channel.pipeline().addLast("idleStateHandler", new {@link IdleStateHandler}(60, 30, 0));
68   *         channel.pipeline().addLast("myHandler", new MyHandler());
69   *     }
70   * }
71   *
72   * // Handler should handle the {@link IdleStateEvent} triggered by {@link IdleStateHandler}.
73   * public class MyHandler implements {@link ChannelHandler} {
74   *     {@code @Override}
75   *     public void userEventTriggered({@link ChannelHandlerContext} ctx, {@link Object} evt) throws {@link Exception} {
76   *         if (evt instanceof {@link IdleStateEvent}) {
77   *             {@link IdleStateEvent} e = ({@link IdleStateEvent}) evt;
78   *             if (e.state() == {@link IdleState}.READER_IDLE) {
79   *                 ctx.close();
80   *             } else if (e.state() == {@link IdleState}.WRITER_IDLE) {
81   *                 ctx.writeAndFlush(new PingMessage());
82   *             }
83   *         }
84   *     }
85   * }
86   *
87   * {@link ServerBootstrap} bootstrap = ...;
88   * ...
89   * bootstrap.childHandler(new MyChannelInitializer());
90   * ...
91   * </pre>
92   *
93   * @see ReadTimeoutHandler
94   * @see WriteTimeoutHandler
95   */
96  public class IdleStateHandler implements ChannelHandler {
97      private static final long MIN_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(1);
98  
99      // Not create a new ChannelFutureListeners per write operation to reduce GC pressure.
100     private final FutureListener<Void> writeListener = future -> {
101         lastWriteTime = ticksInNanos();
102         firstWriterIdleEvent = firstAllIdleEvent = true;
103     };
104 
105     private final long readerIdleTimeNanos;
106     private final long writerIdleTimeNanos;
107     private final long allIdleTimeNanos;
108 
109     private Future<?> readerIdleTimeout;
110     private long lastReadTime;
111     private boolean firstReaderIdleEvent = true;
112 
113     private Future<?> writerIdleTimeout;
114     private long lastWriteTime;
115     private boolean firstWriterIdleEvent = true;
116 
117     private Future<?> allIdleTimeout;
118     private boolean firstAllIdleEvent = true;
119 
120     private byte state; // 0 - none, 1 - initialized, 2 - destroyed
121     private boolean reading;
122 
123     /**
124      * Creates a new instance firing {@link IdleStateEvent}s.
125      *
126      * @param readerIdleTimeSeconds
127      *        an {@link IdleStateEvent} whose state is {@link IdleState#READER_IDLE}
128      *        will be triggered when no read was performed for the specified
129      *        period of time.  Specify {@code 0} to disable.
130      * @param writerIdleTimeSeconds
131      *        an {@link IdleStateEvent} whose state is {@link IdleState#WRITER_IDLE}
132      *        will be triggered when no write was performed for the specified
133      *        period of time.  Specify {@code 0} to disable.
134      * @param allIdleTimeSeconds
135      *        an {@link IdleStateEvent} whose state is {@link IdleState#ALL_IDLE}
136      *        will be triggered when neither read nor write was performed for
137      *        the specified period of time.  Specify {@code 0} to disable.
138      */
139     public IdleStateHandler(
140             int readerIdleTimeSeconds,
141             int writerIdleTimeSeconds,
142             int allIdleTimeSeconds) {
143 
144         this(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds,
145              TimeUnit.SECONDS);
146     }
147 
148     /**
149      * Creates a new instance firing {@link IdleStateEvent}s.
150      * @param readerIdleTime
151      *        an {@link IdleStateEvent} whose state is {@link IdleState#READER_IDLE}
152      *        will be triggered when no read was performed for the specified
153      *        period of time.  Specify {@code 0} to disable.
154      * @param writerIdleTime
155      *        an {@link IdleStateEvent} whose state is {@link IdleState#WRITER_IDLE}
156      *        will be triggered when no write was performed for the specified
157      *        period of time.  Specify {@code 0} to disable.
158      * @param allIdleTime
159      *        an {@link IdleStateEvent} whose state is {@link IdleState#ALL_IDLE}
160      *        will be triggered when neither read nor write was performed for
161      *        the specified period of time.  Specify {@code 0} to disable.
162      * @param unit
163      *        the {@link TimeUnit} of {@code readerIdleTime},
164      *        {@code writeIdleTime}, and {@code allIdleTime}
165      */
166     public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime,
167             TimeUnit unit) {
168         requireNonNull(unit, "unit");
169 
170         if (readerIdleTime <= 0) {
171             readerIdleTimeNanos = 0;
172         } else {
173             readerIdleTimeNanos = Math.max(unit.toNanos(readerIdleTime), MIN_TIMEOUT_NANOS);
174         }
175         if (writerIdleTime <= 0) {
176             writerIdleTimeNanos = 0;
177         } else {
178             writerIdleTimeNanos = Math.max(unit.toNanos(writerIdleTime), MIN_TIMEOUT_NANOS);
179         }
180         if (allIdleTime <= 0) {
181             allIdleTimeNanos = 0;
182         } else {
183             allIdleTimeNanos = Math.max(unit.toNanos(allIdleTime), MIN_TIMEOUT_NANOS);
184         }
185     }
186 
187     /**
188      * Return the readerIdleTime that was given when instance this class in milliseconds.
189      *
190      */
191     public long getReaderIdleTimeInMillis() {
192         return TimeUnit.NANOSECONDS.toMillis(readerIdleTimeNanos);
193     }
194 
195     /**
196      * Return the writerIdleTime that was given when instance this class in milliseconds.
197      *
198      */
199     public long getWriterIdleTimeInMillis() {
200         return TimeUnit.NANOSECONDS.toMillis(writerIdleTimeNanos);
201     }
202 
203     /**
204      * Return the allIdleTime that was given when instance this class in milliseconds.
205      *
206      */
207     public long getAllIdleTimeInMillis() {
208         return TimeUnit.NANOSECONDS.toMillis(allIdleTimeNanos);
209     }
210 
211     @Override
212     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
213         if (ctx.channel().isActive() && ctx.channel().isRegistered()) {
214             // channelActive() event has been fired already, which means this.channelActive() will
215             // not be invoked. We have to initialize here instead.
216             initialize(ctx);
217         } else {
218             // channelActive() event has not been fired yet.  this.channelActive() will be invoked
219             // and initialization will occur there.
220         }
221     }
222 
223     @Override
224     public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
225         destroy();
226     }
227 
228     @Override
229     public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
230         // Initialize early if channel is active already.
231         if (ctx.channel().isActive()) {
232             initialize(ctx);
233         }
234         ctx.fireChannelRegistered();
235     }
236 
237     @Override
238     public void channelActive(ChannelHandlerContext ctx) throws Exception {
239         // This method will be invoked only if this handler was added
240         // before channelActive() event is fired.  If a user adds this handler
241         // after the channelActive() event, initialize() will be called by beforeAdd().
242         initialize(ctx);
243         ctx.fireChannelActive();
244     }
245 
246     @Override
247     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
248         destroy();
249         ctx.fireChannelInactive();
250     }
251 
252     @Override
253     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
254         if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
255             reading = true;
256             firstReaderIdleEvent = firstAllIdleEvent = true;
257         }
258         ctx.fireChannelRead(msg);
259     }
260 
261     @Override
262     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
263         if ((readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) && reading) {
264             lastReadTime = ticksInNanos();
265             reading = false;
266         }
267         ctx.fireChannelReadComplete();
268     }
269 
270     @Override
271     public Future<Void> write(ChannelHandlerContext ctx, Object msg) {
272         Future<Void> future = ctx.write(msg);
273         // Allow writing with void promise if handler is only configured for read timeout events.
274         if (writerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
275             future.addListener(writeListener);
276         }
277         return future;
278     }
279 
280     private void initialize(ChannelHandlerContext ctx) {
281         // Avoid the case where destroy() is called before scheduling timeouts.
282         // See: https://github.com/netty/netty/issues/143
283         switch (state) {
284         case 1:
285         case 2:
286             return;
287         default:
288              break;
289         }
290 
291         state = 1;
292 
293         lastReadTime = lastWriteTime = ticksInNanos();
294         if (readerIdleTimeNanos > 0) {
295             readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
296                     readerIdleTimeNanos, TimeUnit.NANOSECONDS);
297         }
298         if (writerIdleTimeNanos > 0) {
299             writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
300                     writerIdleTimeNanos, TimeUnit.NANOSECONDS);
301         }
302         if (allIdleTimeNanos > 0) {
303             allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
304                     allIdleTimeNanos, TimeUnit.NANOSECONDS);
305         }
306     }
307 
308     /**
309      * This method is visible for testing!
310      */
311     long ticksInNanos() {
312         return System.nanoTime();
313     }
314 
315     /**
316      * This method is visible for testing!
317      */
318     Future<?> schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) {
319         return ctx.executor().schedule(task, delay, unit);
320     }
321 
322     private void destroy() {
323         state = 2;
324 
325         if (readerIdleTimeout != null) {
326             readerIdleTimeout.cancel();
327             readerIdleTimeout = null;
328         }
329         if (writerIdleTimeout != null) {
330             writerIdleTimeout.cancel();
331             writerIdleTimeout = null;
332         }
333         if (allIdleTimeout != null) {
334             allIdleTimeout.cancel();
335             allIdleTimeout = null;
336         }
337     }
338 
339     /**
340      * Is called when an {@link IdleStateEvent} should be fired. This implementation calls
341      * {@link ChannelHandlerContext#fireChannelInboundEvent(Object)}.
342      */
343     protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
344         ctx.fireChannelInboundEvent(evt);
345     }
346 
347     /**
348      * Returns a {@link IdleStateEvent}.
349      */
350     protected IdleStateEvent newIdleStateEvent(IdleState state, boolean first) {
351         switch (state) {
352             case ALL_IDLE:
353                 return first ? IdleStateEvent.FIRST_ALL_IDLE_STATE_EVENT : IdleStateEvent.ALL_IDLE_STATE_EVENT;
354             case READER_IDLE:
355                 return first ? IdleStateEvent.FIRST_READER_IDLE_STATE_EVENT : IdleStateEvent.READER_IDLE_STATE_EVENT;
356             case WRITER_IDLE:
357                 return first ? IdleStateEvent.FIRST_WRITER_IDLE_STATE_EVENT : IdleStateEvent.WRITER_IDLE_STATE_EVENT;
358             default:
359                 throw new IllegalArgumentException("Unhandled: state=" + state + ", first=" + first);
360         }
361     }
362 
363     private abstract static class AbstractIdleTask implements Runnable {
364 
365         private final ChannelHandlerContext ctx;
366 
367         AbstractIdleTask(ChannelHandlerContext ctx) {
368             this.ctx = ctx;
369         }
370 
371         @Override
372         public void run() {
373             if (!ctx.channel().isOpen()) {
374                 return;
375             }
376 
377             run(ctx);
378         }
379 
380         protected abstract void run(ChannelHandlerContext ctx);
381     }
382 
383     private final class ReaderIdleTimeoutTask extends AbstractIdleTask {
384 
385         ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
386             super(ctx);
387         }
388 
389         @Override
390         protected void run(ChannelHandlerContext ctx) {
391             long nextDelay = readerIdleTimeNanos;
392             if (!reading) {
393                 nextDelay -= ticksInNanos() - lastReadTime;
394             }
395 
396             if (nextDelay <= 0) {
397                 // Reader is idle - set a new timeout and notify the callback.
398                 readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
399 
400                 boolean first = firstReaderIdleEvent;
401                 firstReaderIdleEvent = false;
402 
403                 try {
404                     IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
405                     channelIdle(ctx, event);
406                 } catch (Throwable t) {
407                     ctx.fireChannelExceptionCaught(t);
408                 }
409             } else {
410                 // Read occurred before the timeout - set a new timeout with shorter delay.
411                 readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
412             }
413         }
414     }
415 
416     private final class WriterIdleTimeoutTask extends AbstractIdleTask {
417 
418         WriterIdleTimeoutTask(ChannelHandlerContext ctx) {
419             super(ctx);
420         }
421 
422         @Override
423         protected void run(ChannelHandlerContext ctx) {
424 
425             long lastWriteTime = IdleStateHandler.this.lastWriteTime;
426             long nextDelay = writerIdleTimeNanos - (ticksInNanos() - lastWriteTime);
427             if (nextDelay <= 0) {
428                 // Writer is idle - set a new timeout and notify the callback.
429                 writerIdleTimeout = schedule(ctx, this, writerIdleTimeNanos, TimeUnit.NANOSECONDS);
430 
431                 boolean first = firstWriterIdleEvent;
432                 firstWriterIdleEvent = false;
433 
434                 try {
435                     IdleStateEvent event = newIdleStateEvent(IdleState.WRITER_IDLE, first);
436                     channelIdle(ctx, event);
437                 } catch (Throwable t) {
438                     ctx.fireChannelExceptionCaught(t);
439                 }
440             } else {
441                 // Write occurred before the timeout - set a new timeout with shorter delay.
442                 writerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
443             }
444         }
445     }
446 
447     private final class AllIdleTimeoutTask extends AbstractIdleTask {
448 
449         AllIdleTimeoutTask(ChannelHandlerContext ctx) {
450             super(ctx);
451         }
452 
453         @Override
454         protected void run(ChannelHandlerContext ctx) {
455 
456             long nextDelay = allIdleTimeNanos;
457             if (!reading) {
458                 nextDelay -= ticksInNanos() - Math.max(lastReadTime, lastWriteTime);
459             }
460             if (nextDelay <= 0) {
461                 // Both reader and writer are idle - set a new timeout and
462                 // notify the callback.
463                 allIdleTimeout = schedule(ctx, this, allIdleTimeNanos, TimeUnit.NANOSECONDS);
464 
465                 boolean first = firstAllIdleEvent;
466                 firstAllIdleEvent = false;
467 
468                 try {
469                     IdleStateEvent event = newIdleStateEvent(IdleState.ALL_IDLE, first);
470                     channelIdle(ctx, event);
471                 } catch (Throwable t) {
472                     ctx.fireChannelExceptionCaught(t);
473                 }
474             } else {
475                 // Either read or write occurred before the timeout - set a new
476                 // timeout with shorter delay.
477                 allIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
478             }
479         }
480     }
481 }