1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.handler.timeout;
17
18 import static org.jboss.netty.channel.Channels.*;
19
20 import java.util.concurrent.TimeUnit;
21
22 import org.jboss.netty.bootstrap.ServerBootstrap;
23 import org.jboss.netty.channel.Channel;
24 import org.jboss.netty.channel.ChannelHandler;
25 import org.jboss.netty.channel.ChannelHandler.Sharable;
26 import org.jboss.netty.channel.ChannelHandlerContext;
27 import org.jboss.netty.channel.ChannelPipeline;
28 import org.jboss.netty.channel.ChannelPipelineFactory;
29 import org.jboss.netty.channel.ChannelStateEvent;
30 import org.jboss.netty.channel.Channels;
31 import org.jboss.netty.channel.LifeCycleAwareChannelHandler;
32 import org.jboss.netty.channel.MessageEvent;
33 import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
34 import org.jboss.netty.channel.WriteCompletionEvent;
35 import org.jboss.netty.util.ExternalResourceReleasable;
36 import org.jboss.netty.util.HashedWheelTimer;
37 import org.jboss.netty.util.Timeout;
38 import org.jboss.netty.util.Timer;
39 import org.jboss.netty.util.TimerTask;
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122 @Sharable
123 public class IdleStateHandler extends SimpleChannelUpstreamHandler
124 implements LifeCycleAwareChannelHandler,
125 ExternalResourceReleasable {
126
127 final Timer timer;
128
129 final long readerIdleTimeMillis;
130 final long writerIdleTimeMillis;
131 final long allIdleTimeMillis;
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152 public IdleStateHandler(
153 Timer timer,
154 int readerIdleTimeSeconds,
155 int writerIdleTimeSeconds,
156 int allIdleTimeSeconds) {
157
158 this(timer,
159 readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds,
160 TimeUnit.SECONDS);
161 }
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185 public IdleStateHandler(
186 Timer timer,
187 long readerIdleTime, long writerIdleTime, long allIdleTime,
188 TimeUnit unit) {
189
190 if (timer == null) {
191 throw new NullPointerException("timer");
192 }
193 if (unit == null) {
194 throw new NullPointerException("unit");
195 }
196
197 this.timer = timer;
198 if (readerIdleTime <= 0) {
199 readerIdleTimeMillis = 0;
200 } else {
201 readerIdleTimeMillis = Math.max(unit.toMillis(readerIdleTime), 1);
202 }
203 if (writerIdleTime <= 0) {
204 writerIdleTimeMillis = 0;
205 } else {
206 writerIdleTimeMillis = Math.max(unit.toMillis(writerIdleTime), 1);
207 }
208 if (allIdleTime <= 0) {
209 allIdleTimeMillis = 0;
210 } else {
211 allIdleTimeMillis = Math.max(unit.toMillis(allIdleTime), 1);
212 }
213 }
214
215
216
217
218
219 public long getReaderIdleTimeInMillis() {
220 return readerIdleTimeMillis;
221 }
222
223
224
225
226
227 public long getWriterIdleTimeInMillis() {
228 return writerIdleTimeMillis;
229 }
230
231
232
233
234
235 public long getAllIdleTimeInMillis() {
236 return allIdleTimeMillis;
237 }
238
239
240
241
242
243
244 public void releaseExternalResources() {
245 timer.stop();
246 }
247
248 public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
249 if (ctx.getPipeline().isAttached()) {
250
251
252
253 initialize(ctx);
254 } else {
255
256
257 }
258 }
259
260 public void afterAdd(ChannelHandlerContext ctx) throws Exception {
261
262 }
263
264 public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
265 destroy(ctx);
266 }
267
268 public void afterRemove(ChannelHandlerContext ctx) throws Exception {
269
270 }
271
272 @Override
273 public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e)
274 throws Exception {
275
276
277
278 initialize(ctx);
279 ctx.sendUpstream(e);
280 }
281
282 @Override
283 public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
284 throws Exception {
285 destroy(ctx);
286 ctx.sendUpstream(e);
287 }
288
289 @Override
290 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
291 throws Exception {
292 State state = (State) ctx.getAttachment();
293 state.lastReadTime = System.currentTimeMillis();
294 ctx.sendUpstream(e);
295 }
296
297 @Override
298 public void writeComplete(ChannelHandlerContext ctx, WriteCompletionEvent e)
299 throws Exception {
300 if (e.getWrittenAmount() > 0) {
301 State state = (State) ctx.getAttachment();
302 state.lastWriteTime = System.currentTimeMillis();
303 }
304 ctx.sendUpstream(e);
305 }
306
307 private void initialize(ChannelHandlerContext ctx) {
308 State state = state(ctx);
309
310
311
312 synchronized (state) {
313 switch (state.state) {
314 case 1:
315 case 2:
316 return;
317 }
318 state.state = 1;
319 }
320
321 state.lastReadTime = state.lastWriteTime = System.currentTimeMillis();
322 if (readerIdleTimeMillis > 0) {
323 state.readerIdleTimeout = timer.newTimeout(
324 new ReaderIdleTimeoutTask(ctx),
325 readerIdleTimeMillis, TimeUnit.MILLISECONDS);
326 }
327 if (writerIdleTimeMillis > 0) {
328 state.writerIdleTimeout = timer.newTimeout(
329 new WriterIdleTimeoutTask(ctx),
330 writerIdleTimeMillis, TimeUnit.MILLISECONDS);
331 }
332 if (allIdleTimeMillis > 0) {
333 state.allIdleTimeout = timer.newTimeout(
334 new AllIdleTimeoutTask(ctx),
335 allIdleTimeMillis, TimeUnit.MILLISECONDS);
336 }
337 }
338
339 private static void destroy(ChannelHandlerContext ctx) {
340 State state = state(ctx);
341 synchronized (state) {
342 if (state.state != 1) {
343 return;
344 }
345 state.state = 2;
346 }
347
348 if (state.readerIdleTimeout != null) {
349 state.readerIdleTimeout.cancel();
350 state.readerIdleTimeout = null;
351 }
352 if (state.writerIdleTimeout != null) {
353 state.writerIdleTimeout.cancel();
354 state.writerIdleTimeout = null;
355 }
356 if (state.allIdleTimeout != null) {
357 state.allIdleTimeout.cancel();
358 state.allIdleTimeout = null;
359 }
360 }
361
362 private static State state(ChannelHandlerContext ctx) {
363 State state;
364 synchronized (ctx) {
365
366 state = (State) ctx.getAttachment();
367 if (state != null) {
368 return state;
369 }
370 state = new State();
371 ctx.setAttachment(state);
372 }
373 return state;
374 }
375
376 private void fireChannelIdle(
377 final ChannelHandlerContext ctx, final IdleState state, final long lastActivityTimeMillis) {
378 ctx.getPipeline().execute(new Runnable() {
379
380 public void run() {
381 try {
382 channelIdle(ctx, state, lastActivityTimeMillis);
383 } catch (Throwable t) {
384 fireExceptionCaught(ctx, t);
385 }
386 }
387 });
388 }
389
390 protected void channelIdle(
391 ChannelHandlerContext ctx, IdleState state, long lastActivityTimeMillis) throws Exception {
392 ctx.sendUpstream(new DefaultIdleStateEvent(ctx.getChannel(), state, lastActivityTimeMillis));
393 }
394
395 private final class ReaderIdleTimeoutTask implements TimerTask {
396
397 private final ChannelHandlerContext ctx;
398
399 ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
400 this.ctx = ctx;
401 }
402
403 public void run(Timeout timeout) throws Exception {
404 if (timeout.isCancelled() || !ctx.getChannel().isOpen()) {
405 return;
406 }
407
408 State state = (State) ctx.getAttachment();
409 long currentTime = System.currentTimeMillis();
410 long lastReadTime = state.lastReadTime;
411 long nextDelay = readerIdleTimeMillis - (currentTime - lastReadTime);
412 if (nextDelay <= 0) {
413
414 state.readerIdleTimeout =
415 timer.newTimeout(this, readerIdleTimeMillis, TimeUnit.MILLISECONDS);
416 fireChannelIdle(ctx, IdleState.READER_IDLE, lastReadTime);
417 } else {
418
419 state.readerIdleTimeout =
420 timer.newTimeout(this, nextDelay, TimeUnit.MILLISECONDS);
421 }
422 }
423 }
424
425 private final class WriterIdleTimeoutTask implements TimerTask {
426
427 private final ChannelHandlerContext ctx;
428
429 WriterIdleTimeoutTask(ChannelHandlerContext ctx) {
430 this.ctx = ctx;
431 }
432
433 public void run(Timeout timeout) throws Exception {
434 if (timeout.isCancelled() || !ctx.getChannel().isOpen()) {
435 return;
436 }
437
438 State state = (State) ctx.getAttachment();
439 long currentTime = System.currentTimeMillis();
440 long lastWriteTime = state.lastWriteTime;
441 long nextDelay = writerIdleTimeMillis - (currentTime - lastWriteTime);
442 if (nextDelay <= 0) {
443
444 state.writerIdleTimeout =
445 timer.newTimeout(this, writerIdleTimeMillis, TimeUnit.MILLISECONDS);
446 fireChannelIdle(ctx, IdleState.WRITER_IDLE, lastWriteTime);
447 } else {
448
449 state.writerIdleTimeout =
450 timer.newTimeout(this, nextDelay, TimeUnit.MILLISECONDS);
451 }
452 }
453 }
454
455 private final class AllIdleTimeoutTask implements TimerTask {
456
457 private final ChannelHandlerContext ctx;
458
459 AllIdleTimeoutTask(ChannelHandlerContext ctx) {
460 this.ctx = ctx;
461 }
462
463 public void run(Timeout timeout) throws Exception {
464 if (timeout.isCancelled() || !ctx.getChannel().isOpen()) {
465 return;
466 }
467
468 State state = (State) ctx.getAttachment();
469 long currentTime = System.currentTimeMillis();
470 long lastIoTime = Math.max(state.lastReadTime, state.lastWriteTime);
471 long nextDelay = allIdleTimeMillis - (currentTime - lastIoTime);
472 if (nextDelay <= 0) {
473
474
475 state.allIdleTimeout =
476 timer.newTimeout(this, allIdleTimeMillis, TimeUnit.MILLISECONDS);
477 fireChannelIdle(ctx, IdleState.ALL_IDLE, lastIoTime);
478 } else {
479
480
481 state.allIdleTimeout =
482 timer.newTimeout(this, nextDelay, TimeUnit.MILLISECONDS);
483 }
484 }
485 }
486
487 private static final class State {
488
489 int state;
490
491 volatile Timeout readerIdleTimeout;
492 volatile long lastReadTime;
493
494 volatile Timeout writerIdleTimeout;
495 volatile long lastWriteTime;
496
497 volatile Timeout allIdleTimeout;
498
499 State() {
500 }
501 }
502 }