1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.handler.timeout;
17
18 import static org.jboss.netty.channel.Channels.*;
19
20 import java.util.concurrent.TimeUnit;
21
22 import org.jboss.netty.bootstrap.ServerBootstrap;
23 import org.jboss.netty.channel.Channel;
24 import org.jboss.netty.channel.ChannelHandler;
25 import org.jboss.netty.channel.ChannelHandler.Sharable;
26 import org.jboss.netty.channel.ChannelHandlerContext;
27 import org.jboss.netty.channel.ChannelPipeline;
28 import org.jboss.netty.channel.ChannelPipelineFactory;
29 import org.jboss.netty.channel.ChannelStateEvent;
30 import org.jboss.netty.channel.Channels;
31 import org.jboss.netty.channel.LifeCycleAwareChannelHandler;
32 import org.jboss.netty.channel.MessageEvent;
33 import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
34 import org.jboss.netty.channel.WriteCompletionEvent;
35 import org.jboss.netty.util.ExternalResourceReleasable;
36 import org.jboss.netty.util.HashedWheelTimer;
37 import org.jboss.netty.util.Timeout;
38 import org.jboss.netty.util.Timer;
39 import org.jboss.netty.util.TimerTask;
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122 @Sharable
123 public class IdleStateHandler extends SimpleChannelUpstreamHandler
124 implements LifeCycleAwareChannelHandler,
125 ExternalResourceReleasable {
126
127 final Timer timer;
128
129 final long readerIdleTimeMillis;
130 final long writerIdleTimeMillis;
131 final long allIdleTimeMillis;
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152 public IdleStateHandler(
153 Timer timer,
154 int readerIdleTimeSeconds,
155 int writerIdleTimeSeconds,
156 int allIdleTimeSeconds) {
157
158 this(timer,
159 readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds,
160 TimeUnit.SECONDS);
161 }
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185 public IdleStateHandler(
186 Timer timer,
187 long readerIdleTime, long writerIdleTime, long allIdleTime,
188 TimeUnit unit) {
189
190 if (timer == null) {
191 throw new NullPointerException("timer");
192 }
193 if (unit == null) {
194 throw new NullPointerException("unit");
195 }
196
197 this.timer = timer;
198 if (readerIdleTime <= 0) {
199 readerIdleTimeMillis = 0;
200 } else {
201 readerIdleTimeMillis = Math.max(unit.toMillis(readerIdleTime), 1);
202 }
203 if (writerIdleTime <= 0) {
204 writerIdleTimeMillis = 0;
205 } else {
206 writerIdleTimeMillis = Math.max(unit.toMillis(writerIdleTime), 1);
207 }
208 if (allIdleTime <= 0) {
209 allIdleTimeMillis = 0;
210 } else {
211 allIdleTimeMillis = Math.max(unit.toMillis(allIdleTime), 1);
212 }
213 }
214
215
216
217
218
219 public long getReaderIdleTimeInMillis() {
220 return readerIdleTimeMillis;
221 }
222
223
224
225
226
227 public long getWriterIdleTimeInMillis() {
228 return writerIdleTimeMillis;
229 }
230
231
232
233
234
235 public long getAllIdleTimeInMillis() {
236 return allIdleTimeMillis;
237 }
238
239
240
241
242
243
244 public void releaseExternalResources() {
245 timer.stop();
246 }
247
248 public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
249 if (ctx.getPipeline().isAttached()) {
250
251
252
253 initialize(ctx);
254 } else {
255
256
257 }
258 }
259
260 public void afterAdd(ChannelHandlerContext ctx) throws Exception {
261
262 }
263
264 public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
265 destroy(ctx);
266 }
267
268 public void afterRemove(ChannelHandlerContext ctx) throws Exception {
269
270 }
271
272 @Override
273 public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e)
274 throws Exception {
275
276
277
278 initialize(ctx);
279 ctx.sendUpstream(e);
280 }
281
282 @Override
283 public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
284 throws Exception {
285 destroy(ctx);
286 ctx.sendUpstream(e);
287 }
288
289 @Override
290 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
291 throws Exception {
292 State state = (State) ctx.getAttachment();
293 state.lastReadTime = System.currentTimeMillis();
294 ctx.sendUpstream(e);
295 }
296
297 @Override
298 public void writeComplete(ChannelHandlerContext ctx, WriteCompletionEvent e)
299 throws Exception {
300 if (e.getWrittenAmount() > 0) {
301 State state = (State) ctx.getAttachment();
302 state.lastWriteTime = System.currentTimeMillis();
303 }
304 ctx.sendUpstream(e);
305 }
306
307 private void initialize(ChannelHandlerContext ctx) {
308 State state = state(ctx);
309
310
311
312 synchronized (state) {
313 switch (state.state) {
314 case 1:
315 case 2:
316 return;
317 }
318 state.state = 1;
319 }
320
321 state.lastReadTime = state.lastWriteTime = System.currentTimeMillis();
322 if (readerIdleTimeMillis > 0) {
323 state.readerIdleTimeout = timer.newTimeout(
324 new ReaderIdleTimeoutTask(ctx),
325 readerIdleTimeMillis, TimeUnit.MILLISECONDS);
326 }
327 if (writerIdleTimeMillis > 0) {
328 state.writerIdleTimeout = timer.newTimeout(
329 new WriterIdleTimeoutTask(ctx),
330 writerIdleTimeMillis, TimeUnit.MILLISECONDS);
331 }
332 if (allIdleTimeMillis > 0) {
333 state.allIdleTimeout = timer.newTimeout(
334 new AllIdleTimeoutTask(ctx),
335 allIdleTimeMillis, TimeUnit.MILLISECONDS);
336 }
337 }
338
339 private static void destroy(ChannelHandlerContext ctx) {
340 State state = state(ctx);
341 synchronized (state) {
342 if (state.state != 1) {
343 return;
344 }
345 state.state = 2;
346 }
347
348 if (state.readerIdleTimeout != null) {
349 state.readerIdleTimeout.cancel();
350 state.readerIdleTimeout = null;
351 }
352 if (state.writerIdleTimeout != null) {
353 state.writerIdleTimeout.cancel();
354 state.writerIdleTimeout = null;
355 }
356 if (state.allIdleTimeout != null) {
357 state.allIdleTimeout.cancel();
358 state.allIdleTimeout = null;
359 }
360 }
361
362 private static State state(ChannelHandlerContext ctx) {
363 State state;
364 synchronized (ctx) {
365
366 state = (State) ctx.getAttachment();
367 if (state != null) {
368 return state;
369 }
370 state = new State();
371 ctx.setAttachment(state);
372 }
373 return state;
374 }
375
376 private void fireChannelIdle(
377 final ChannelHandlerContext ctx, final IdleState state, final long lastActivityTimeMillis) {
378 ctx.getPipeline().execute(new Runnable() {
379
380 public void run() {
381 try {
382 channelIdle(ctx, state, lastActivityTimeMillis);
383 } catch (Throwable t) {
384 fireExceptionCaught(ctx, t);
385 }
386 }
387 });
388 }
389
390 protected void channelIdle(
391 ChannelHandlerContext ctx, IdleState state, long lastActivityTimeMillis) throws Exception {
392 ctx.sendUpstream(new DefaultIdleStateEvent(ctx.getChannel(), state, lastActivityTimeMillis));
393 }
394
395 private final class ReaderIdleTimeoutTask implements TimerTask {
396
397 private final ChannelHandlerContext ctx;
398
399 ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
400 this.ctx = ctx;
401 }
402
403 public void run(Timeout timeout) throws Exception {
404 if (timeout.isCancelled() || !ctx.getChannel().isOpen()) {
405 return;
406 }
407
408 State state = (State) ctx.getAttachment();
409 long currentTime = System.currentTimeMillis();
410 long lastReadTime = state.lastReadTime;
411 long nextDelay = readerIdleTimeMillis - (currentTime - lastReadTime);
412 if (nextDelay <= 0) {
413
414 state.readerIdleTimeout =
415 timer.newTimeout(this, readerIdleTimeMillis, TimeUnit.MILLISECONDS);
416 fireChannelIdle(ctx, IdleState.READER_IDLE, lastReadTime);
417 } else {
418
419 state.readerIdleTimeout =
420 timer.newTimeout(this, nextDelay, TimeUnit.MILLISECONDS);
421 }
422 }
423
424 }
425
426 private final class WriterIdleTimeoutTask implements TimerTask {
427
428 private final ChannelHandlerContext ctx;
429
430 WriterIdleTimeoutTask(ChannelHandlerContext ctx) {
431 this.ctx = ctx;
432 }
433
434 public void run(Timeout timeout) throws Exception {
435 if (timeout.isCancelled() || !ctx.getChannel().isOpen()) {
436 return;
437 }
438
439 State state = (State) ctx.getAttachment();
440 long currentTime = System.currentTimeMillis();
441 long lastWriteTime = state.lastWriteTime;
442 long nextDelay = writerIdleTimeMillis - (currentTime - lastWriteTime);
443 if (nextDelay <= 0) {
444
445 state.writerIdleTimeout =
446 timer.newTimeout(this, writerIdleTimeMillis, TimeUnit.MILLISECONDS);
447 fireChannelIdle(ctx, IdleState.WRITER_IDLE, lastWriteTime);
448 } else {
449
450 state.writerIdleTimeout =
451 timer.newTimeout(this, nextDelay, TimeUnit.MILLISECONDS);
452 }
453 }
454 }
455
456 private final class AllIdleTimeoutTask implements TimerTask {
457
458 private final ChannelHandlerContext ctx;
459
460 AllIdleTimeoutTask(ChannelHandlerContext ctx) {
461 this.ctx = ctx;
462 }
463
464 public void run(Timeout timeout) throws Exception {
465 if (timeout.isCancelled() || !ctx.getChannel().isOpen()) {
466 return;
467 }
468
469 State state = (State) ctx.getAttachment();
470 long currentTime = System.currentTimeMillis();
471 long lastIoTime = Math.max(state.lastReadTime, state.lastWriteTime);
472 long nextDelay = allIdleTimeMillis - (currentTime - lastIoTime);
473 if (nextDelay <= 0) {
474
475
476 state.allIdleTimeout =
477 timer.newTimeout(this, allIdleTimeMillis, TimeUnit.MILLISECONDS);
478 fireChannelIdle(ctx, IdleState.ALL_IDLE, lastIoTime);
479 } else {
480
481
482 state.allIdleTimeout =
483 timer.newTimeout(this, nextDelay, TimeUnit.MILLISECONDS);
484 }
485 }
486 }
487
488 private static final class State {
489
490 int state;
491
492 volatile Timeout readerIdleTimeout;
493 volatile long lastReadTime;
494
495 volatile Timeout writerIdleTimeout;
496 volatile long lastWriteTime;
497
498 volatile Timeout allIdleTimeout;
499
500 State() {
501 }
502 }
503 }