1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.timeout;
17
18 import io.netty.bootstrap.ServerBootstrap;
19 import io.netty.channel.Channel;
20 import io.netty.channel.ChannelDuplexHandler;
21 import io.netty.channel.ChannelFuture;
22 import io.netty.channel.ChannelFutureListener;
23 import io.netty.channel.ChannelHandlerContext;
24 import io.netty.channel.ChannelInitializer;
25 import io.netty.channel.ChannelOutboundHandlerAdapter;
26 import io.netty.channel.ChannelPromise;
27
28 import java.util.concurrent.ScheduledFuture;
29 import java.util.concurrent.TimeUnit;
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65 public class WriteTimeoutHandler extends ChannelOutboundHandlerAdapter {
66 private static final long MIN_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(1);
67
68 private final long timeoutNanos;
69
70
71
72
73 private WriteTimeoutTask lastTask;
74
75 private boolean closed;
76
77
78
79
80
81
82
83 public WriteTimeoutHandler(int timeoutSeconds) {
84 this(timeoutSeconds, TimeUnit.SECONDS);
85 }
86
87
88
89
90
91
92
93
94
95 public WriteTimeoutHandler(long timeout, TimeUnit unit) {
96 if (unit == null) {
97 throw new NullPointerException("unit");
98 }
99
100 if (timeout <= 0) {
101 timeoutNanos = 0;
102 } else {
103 timeoutNanos = Math.max(unit.toNanos(timeout), MIN_TIMEOUT_NANOS);
104 }
105 }
106
107 @Override
108 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
109 scheduleTimeout(ctx, promise);
110 ctx.write(msg, promise);
111 }
112
113 @Override
114 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
115 WriteTimeoutTask task = lastTask;
116 lastTask = null;
117 while (task != null) {
118 task.scheduledFuture.cancel(false);
119 WriteTimeoutTask prev = task.prev;
120 task.prev = null;
121 task.next = null;
122 task = prev;
123 }
124 }
125
126 private void scheduleTimeout(final ChannelHandlerContext ctx, final ChannelPromise promise) {
127
128 final WriteTimeoutTask task = new WriteTimeoutTask(ctx, promise);
129 task.scheduledFuture = ctx.executor().schedule(task, timeoutNanos, TimeUnit.NANOSECONDS);
130
131 if (!task.scheduledFuture.isDone()) {
132 addWriteTimeoutTask(task);
133
134
135 promise.addListener(task);
136 }
137 }
138
139 private void addWriteTimeoutTask(WriteTimeoutTask task) {
140 if (lastTask == null) {
141 lastTask = task;
142 } else {
143 lastTask.next = task;
144 task.prev = lastTask;
145 lastTask = task;
146 }
147 }
148
149 private void removeWriteTimeoutTask(WriteTimeoutTask task) {
150 if (task == lastTask) {
151
152 assert task.next == null;
153 lastTask = lastTask.prev;
154 if (lastTask != null) {
155 lastTask.next = null;
156 }
157 } else if (task.prev == null && task.next == null) {
158
159 return;
160 } else if (task.prev == null) {
161
162 task.next.prev = null;
163 } else {
164 task.prev.next = task.next;
165 task.next.prev = task.prev;
166 }
167 task.prev = null;
168 task.next = null;
169 }
170
171
172
173
174 protected void writeTimedOut(ChannelHandlerContext ctx) throws Exception {
175 if (!closed) {
176 ctx.fireExceptionCaught(WriteTimeoutException.INSTANCE);
177 ctx.close();
178 closed = true;
179 }
180 }
181
182 private final class WriteTimeoutTask implements Runnable, ChannelFutureListener {
183
184 private final ChannelHandlerContext ctx;
185 private final ChannelPromise promise;
186
187
188 WriteTimeoutTask prev;
189 WriteTimeoutTask next;
190
191 ScheduledFuture<?> scheduledFuture;
192
193 WriteTimeoutTask(ChannelHandlerContext ctx, ChannelPromise promise) {
194 this.ctx = ctx;
195 this.promise = promise;
196 }
197
198 @Override
199 public void run() {
200
201
202
203 if (!promise.isDone()) {
204 try {
205 writeTimedOut(ctx);
206 } catch (Throwable t) {
207 ctx.fireExceptionCaught(t);
208 }
209 }
210 removeWriteTimeoutTask(this);
211 }
212
213 @Override
214 public void operationComplete(ChannelFuture future) throws Exception {
215
216 scheduledFuture.cancel(false);
217 removeWriteTimeoutTask(this);
218 }
219 }
220 }