1 /*
2 * Copyright 2012 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * https://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package io.netty5.handler.timeout;
17
18 import io.netty5.bootstrap.ServerBootstrap;
19 import io.netty5.channel.Channel;
20 import io.netty5.channel.ChannelHandler;
21 import io.netty5.channel.ChannelHandlerContext;
22 import io.netty5.channel.ChannelInitializer;
23 import io.netty5.util.concurrent.Future;
24 import io.netty5.util.concurrent.FutureListener;
25
26 import java.util.concurrent.TimeUnit;
27
28 import static java.util.Objects.requireNonNull;
29
30 /**
* Triggers an {@link IdleStateEvent} when a {@link Channel} has not performed
* a read, a write, or either operation for a while.
33 *
34 * <h3>Supported idle states</h3>
35 * <table border="1">
36 * <tr>
37 * <th>Property</th><th>Meaning</th>
38 * </tr>
39 * <tr>
40 * <td>{@code readerIdleTime}</td>
41 * <td>an {@link IdleStateEvent} whose state is {@link IdleState#READER_IDLE}
42 * will be triggered when no read was performed for the specified period of
43 * time. Specify {@code 0} to disable.</td>
44 * </tr>
45 * <tr>
46 * <td>{@code writerIdleTime}</td>
47 * <td>an {@link IdleStateEvent} whose state is {@link IdleState#WRITER_IDLE}
48 * will be triggered when no write was performed for the specified period of
49 * time. Specify {@code 0} to disable.</td>
50 * </tr>
51 * <tr>
52 * <td>{@code allIdleTime}</td>
53 * <td>an {@link IdleStateEvent} whose state is {@link IdleState#ALL_IDLE}
54 * will be triggered when neither read nor write was performed for the
55 * specified period of time. Specify {@code 0} to disable.</td>
56 * </tr>
57 * </table>
58 *
59 * <pre>
60 * // An example that sends a ping message when there is no outbound traffic
61 * // for 30 seconds. The connection is closed when there is no inbound traffic
62 * // for 60 seconds.
63 *
64 * public class MyChannelInitializer extends {@link ChannelInitializer}<{@link Channel}> {
65 * {@code @Override}
66 * public void initChannel({@link Channel} channel) {
67 * channel.pipeline().addLast("idleStateHandler", new {@link IdleStateHandler}(60, 30, 0));
68 * channel.pipeline().addLast("myHandler", new MyHandler());
69 * }
70 * }
71 *
72 * // Handler should handle the {@link IdleStateEvent} triggered by {@link IdleStateHandler}.
73 * public class MyHandler implements {@link ChannelHandler} {
74 * {@code @Override}
75 * public void userEventTriggered({@link ChannelHandlerContext} ctx, {@link Object} evt) throws {@link Exception} {
76 * if (evt instanceof {@link IdleStateEvent}) {
77 * {@link IdleStateEvent} e = ({@link IdleStateEvent}) evt;
78 * if (e.state() == {@link IdleState}.READER_IDLE) {
79 * ctx.close();
80 * } else if (e.state() == {@link IdleState}.WRITER_IDLE) {
81 * ctx.writeAndFlush(new PingMessage());
82 * }
83 * }
84 * }
85 * }
86 *
87 * {@link ServerBootstrap} bootstrap = ...;
88 * ...
89 * bootstrap.childHandler(new MyChannelInitializer());
90 * ...
91 * </pre>
92 *
93 * @see ReadTimeoutHandler
94 * @see WriteTimeoutHandler
95 */
96 public class IdleStateHandler implements ChannelHandler {
// Lower bound applied to every enabled idle period: one millisecond, expressed in nanoseconds.
private static final long MIN_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(1);

// A single listener instance is shared by all write operations instead of creating a new
// listener per write, which reduces GC pressure. When notified it records the current tick
// as the last write time and re-arms the "first event" flags for writer-idle and all-idle.
private final FutureListener<Void> writeListener = future -> {
    lastWriteTime = ticksInNanos();
    firstWriterIdleEvent = firstAllIdleEvent = true;
};

// Configured idle periods in nanoseconds; 0 means the corresponding check is disabled.
private final long readerIdleTimeNanos;
private final long writerIdleTimeNanos;
private final long allIdleTimeNanos;

// Reader-idle bookkeeping: the pending timeout, the tick of the last completed read,
// and whether the next reader-idle event is the first one since the last read.
private Future<?> readerIdleTimeout;
private long lastReadTime;
private boolean firstReaderIdleEvent = true;

// Writer-idle bookkeeping: the pending timeout, the tick recorded by writeListener,
// and whether the next writer-idle event is the first one since the last write.
private Future<?> writerIdleTimeout;
private long lastWriteTime;
private boolean firstWriterIdleEvent = true;

// All-idle bookkeeping: the pending timeout and whether the next all-idle event is the
// first one since the last read or write.
private Future<?> allIdleTimeout;
private boolean firstAllIdleEvent = true;

private byte state; // 0 - none, 1 - initialized, 2 - destroyed
// Set by channelRead() and cleared by channelReadComplete(); while true, the reader-idle
// and all-idle tasks do not subtract elapsed time from their next delay.
private boolean reading;
122
/**
 * Creates a new instance firing {@link IdleStateEvent}s, with every idle period
 * expressed in seconds. Delegates to
 * {@link #IdleStateHandler(long, long, long, TimeUnit)} using {@link TimeUnit#SECONDS}.
 *
 * @param readerIdleTimeSeconds
 *        an {@link IdleStateEvent} whose state is {@link IdleState#READER_IDLE}
 *        will be triggered when no read was performed for the specified
 *        period of time. Specify {@code 0} to disable.
 * @param writerIdleTimeSeconds
 *        an {@link IdleStateEvent} whose state is {@link IdleState#WRITER_IDLE}
 *        will be triggered when no write was performed for the specified
 *        period of time. Specify {@code 0} to disable.
 * @param allIdleTimeSeconds
 *        an {@link IdleStateEvent} whose state is {@link IdleState#ALL_IDLE}
 *        will be triggered when neither read nor write was performed for
 *        the specified period of time. Specify {@code 0} to disable.
 */
public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
    this(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds, TimeUnit.SECONDS);
}
147
/**
 * Creates a new instance firing {@link IdleStateEvent}s.
 * <p>
 * Each idle period is converted to nanoseconds. A value of {@code 0} or less disables the
 * corresponding event; a positive value shorter than one millisecond is raised to one
 * millisecond.
 *
 * @param readerIdleTime
 *        an {@link IdleStateEvent} whose state is {@link IdleState#READER_IDLE}
 *        will be triggered when no read was performed for the specified
 *        period of time. Specify {@code 0} to disable.
 * @param writerIdleTime
 *        an {@link IdleStateEvent} whose state is {@link IdleState#WRITER_IDLE}
 *        will be triggered when no write was performed for the specified
 *        period of time. Specify {@code 0} to disable.
 * @param allIdleTime
 *        an {@link IdleStateEvent} whose state is {@link IdleState#ALL_IDLE}
 *        will be triggered when neither read nor write was performed for
 *        the specified period of time. Specify {@code 0} to disable.
 * @param unit
 *        the {@link TimeUnit} of {@code readerIdleTime},
 *        {@code writerIdleTime}, and {@code allIdleTime}
 * @throws NullPointerException if {@code unit} is {@code null}
 */
public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) {
    requireNonNull(unit, "unit");

    // Non-positive periods disable the check; enabled periods are clamped to the minimum.
    readerIdleTimeNanos = readerIdleTime <= 0
            ? 0
            : Math.max(unit.toNanos(readerIdleTime), MIN_TIMEOUT_NANOS);
    writerIdleTimeNanos = writerIdleTime <= 0
            ? 0
            : Math.max(unit.toNanos(writerIdleTime), MIN_TIMEOUT_NANOS);
    allIdleTimeNanos = allIdleTime <= 0
            ? 0
            : Math.max(unit.toNanos(allIdleTime), MIN_TIMEOUT_NANOS);
}
186
187 /**
188 * Return the readerIdleTime that was given when instance this class in milliseconds.
189 *
190 */
191 public long getReaderIdleTimeInMillis() {
192 return TimeUnit.NANOSECONDS.toMillis(readerIdleTimeNanos);
193 }
194
195 /**
196 * Return the writerIdleTime that was given when instance this class in milliseconds.
197 *
198 */
199 public long getWriterIdleTimeInMillis() {
200 return TimeUnit.NANOSECONDS.toMillis(writerIdleTimeNanos);
201 }
202
203 /**
204 * Return the allIdleTime that was given when instance this class in milliseconds.
205 *
206 */
207 public long getAllIdleTimeInMillis() {
208 return TimeUnit.NANOSECONDS.toMillis(allIdleTimeNanos);
209 }
210
211 @Override
212 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
213 if (ctx.channel().isActive() && ctx.channel().isRegistered()) {
214 // channelActive() event has been fired already, which means this.channelActive() will
215 // not be invoked. We have to initialize here instead.
216 initialize(ctx);
217 } else {
218 // channelActive() event has not been fired yet. this.channelActive() will be invoked
219 // and initialization will occur there.
220 }
221 }
222
/**
 * Marks this handler as destroyed and cancels any pending idle timeouts when it is
 * removed from the pipeline.
 */
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    destroy();
}
227
228 @Override
229 public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
230 // Initialize early if channel is active already.
231 if (ctx.channel().isActive()) {
232 initialize(ctx);
233 }
234 ctx.fireChannelRegistered();
235 }
236
/**
 * Initializes the idle timers when the channel becomes active, then forwards the
 * event down the pipeline.
 */
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    // This method will be invoked only if this handler was added
    // before channelActive() event is fired. If a user adds this handler
    // after the channelActive() event, initialize() will be called by handlerAdded().
    initialize(ctx);
    ctx.fireChannelActive();
}
245
/**
 * Marks this handler as destroyed and cancels any pending idle timeouts when the
 * channel becomes inactive, then forwards the event down the pipeline.
 */
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    destroy();
    ctx.fireChannelInactive();
}
251
252 @Override
253 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
254 if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
255 reading = true;
256 firstReaderIdleEvent = firstAllIdleEvent = true;
257 }
258 ctx.fireChannelRead(msg);
259 }
260
261 @Override
262 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
263 if ((readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) && reading) {
264 lastReadTime = ticksInNanos();
265 reading = false;
266 }
267 ctx.fireChannelReadComplete();
268 }
269
270 @Override
271 public Future<Void> write(ChannelHandlerContext ctx, Object msg) {
272 Future<Void> future = ctx.write(msg);
273 // Allow writing with void promise if handler is only configured for read timeout events.
274 if (writerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
275 future.addListener(writeListener);
276 }
277 return future;
278 }
279
280 private void initialize(ChannelHandlerContext ctx) {
281 // Avoid the case where destroy() is called before scheduling timeouts.
282 // See: https://github.com/netty/netty/issues/143
283 switch (state) {
284 case 1:
285 case 2:
286 return;
287 default:
288 break;
289 }
290
291 state = 1;
292
293 lastReadTime = lastWriteTime = ticksInNanos();
294 if (readerIdleTimeNanos > 0) {
295 readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
296 readerIdleTimeNanos, TimeUnit.NANOSECONDS);
297 }
298 if (writerIdleTimeNanos > 0) {
299 writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
300 writerIdleTimeNanos, TimeUnit.NANOSECONDS);
301 }
302 if (allIdleTimeNanos > 0) {
303 allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
304 allIdleTimeNanos, TimeUnit.NANOSECONDS);
305 }
306 }
307
/**
 * Returns the current time source value used for all idle calculations, which is
 * {@link System#nanoTime()}.
 * <p>
 * This method is visible for testing!
 */
long ticksInNanos() {
    return System.nanoTime();
}
314
/**
 * Schedules {@code task} on the executor of {@code ctx} to run after the given delay.
 * <p>
 * This method is visible for testing!
 *
 * @param ctx   the context whose executor runs the task
 * @param task  the task to run
 * @param delay the delay before the task runs
 * @param unit  the unit of {@code delay}
 * @return the future returned by the executor for the scheduled task
 */
Future<?> schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) {
    return ctx.executor().schedule(task, delay, unit);
}
321
322 private void destroy() {
323 state = 2;
324
325 if (readerIdleTimeout != null) {
326 readerIdleTimeout.cancel();
327 readerIdleTimeout = null;
328 }
329 if (writerIdleTimeout != null) {
330 writerIdleTimeout.cancel();
331 writerIdleTimeout = null;
332 }
333 if (allIdleTimeout != null) {
334 allIdleTimeout.cancel();
335 allIdleTimeout = null;
336 }
337 }
338
/**
 * Is called when an {@link IdleStateEvent} should be fired. This implementation calls
 * {@link ChannelHandlerContext#fireChannelInboundEvent(Object)}.
 *
 * @param ctx the context of this handler
 * @param evt the idle event to deliver
 * @throws Exception if delivery fails; the timeout tasks in this class forward any
 *         throwable raised here to {@code ctx.fireChannelExceptionCaught(...)}
 */
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
    ctx.fireChannelInboundEvent(evt);
}
346
347 /**
348 * Returns a {@link IdleStateEvent}.
349 */
350 protected IdleStateEvent newIdleStateEvent(IdleState state, boolean first) {
351 switch (state) {
352 case ALL_IDLE:
353 return first ? IdleStateEvent.FIRST_ALL_IDLE_STATE_EVENT : IdleStateEvent.ALL_IDLE_STATE_EVENT;
354 case READER_IDLE:
355 return first ? IdleStateEvent.FIRST_READER_IDLE_STATE_EVENT : IdleStateEvent.READER_IDLE_STATE_EVENT;
356 case WRITER_IDLE:
357 return first ? IdleStateEvent.FIRST_WRITER_IDLE_STATE_EVENT : IdleStateEvent.WRITER_IDLE_STATE_EVENT;
358 default:
359 throw new IllegalArgumentException("Unhandled: state=" + state + ", first=" + first);
360 }
361 }
362
363 private abstract static class AbstractIdleTask implements Runnable {
364
365 private final ChannelHandlerContext ctx;
366
367 AbstractIdleTask(ChannelHandlerContext ctx) {
368 this.ctx = ctx;
369 }
370
371 @Override
372 public void run() {
373 if (!ctx.channel().isOpen()) {
374 return;
375 }
376
377 run(ctx);
378 }
379
380 protected abstract void run(ChannelHandlerContext ctx);
381 }
382
383 private final class ReaderIdleTimeoutTask extends AbstractIdleTask {
384
385 ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
386 super(ctx);
387 }
388
389 @Override
390 protected void run(ChannelHandlerContext ctx) {
391 long nextDelay = readerIdleTimeNanos;
392 if (!reading) {
393 nextDelay -= ticksInNanos() - lastReadTime;
394 }
395
396 if (nextDelay <= 0) {
397 // Reader is idle - set a new timeout and notify the callback.
398 readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
399
400 boolean first = firstReaderIdleEvent;
401 firstReaderIdleEvent = false;
402
403 try {
404 IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
405 channelIdle(ctx, event);
406 } catch (Throwable t) {
407 ctx.fireChannelExceptionCaught(t);
408 }
409 } else {
410 // Read occurred before the timeout - set a new timeout with shorter delay.
411 readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
412 }
413 }
414 }
415
416 private final class WriterIdleTimeoutTask extends AbstractIdleTask {
417
418 WriterIdleTimeoutTask(ChannelHandlerContext ctx) {
419 super(ctx);
420 }
421
422 @Override
423 protected void run(ChannelHandlerContext ctx) {
424
425 long lastWriteTime = IdleStateHandler.this.lastWriteTime;
426 long nextDelay = writerIdleTimeNanos - (ticksInNanos() - lastWriteTime);
427 if (nextDelay <= 0) {
428 // Writer is idle - set a new timeout and notify the callback.
429 writerIdleTimeout = schedule(ctx, this, writerIdleTimeNanos, TimeUnit.NANOSECONDS);
430
431 boolean first = firstWriterIdleEvent;
432 firstWriterIdleEvent = false;
433
434 try {
435 IdleStateEvent event = newIdleStateEvent(IdleState.WRITER_IDLE, first);
436 channelIdle(ctx, event);
437 } catch (Throwable t) {
438 ctx.fireChannelExceptionCaught(t);
439 }
440 } else {
441 // Write occurred before the timeout - set a new timeout with shorter delay.
442 writerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
443 }
444 }
445 }
446
447 private final class AllIdleTimeoutTask extends AbstractIdleTask {
448
449 AllIdleTimeoutTask(ChannelHandlerContext ctx) {
450 super(ctx);
451 }
452
453 @Override
454 protected void run(ChannelHandlerContext ctx) {
455
456 long nextDelay = allIdleTimeNanos;
457 if (!reading) {
458 nextDelay -= ticksInNanos() - Math.max(lastReadTime, lastWriteTime);
459 }
460 if (nextDelay <= 0) {
461 // Both reader and writer are idle - set a new timeout and
462 // notify the callback.
463 allIdleTimeout = schedule(ctx, this, allIdleTimeNanos, TimeUnit.NANOSECONDS);
464
465 boolean first = firstAllIdleEvent;
466 firstAllIdleEvent = false;
467
468 try {
469 IdleStateEvent event = newIdleStateEvent(IdleState.ALL_IDLE, first);
470 channelIdle(ctx, event);
471 } catch (Throwable t) {
472 ctx.fireChannelExceptionCaught(t);
473 }
474 } else {
475 // Either read or write occurred before the timeout - set a new
476 // timeout with shorter delay.
477 allIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
478 }
479 }
480 }
481 }