1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.socket.nio;
17
18 import org.jboss.netty.channel.Channel;
19 import org.jboss.netty.channel.ChannelFuture;
20 import org.jboss.netty.channel.MessageEvent;
21 import org.jboss.netty.channel.socket.Worker;
22 import org.jboss.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer;
23 import org.jboss.netty.util.ThreadNameDeterminer;
24 import org.jboss.netty.util.ThreadRenamingRunnable;
25
26 import java.io.IOException;
27 import java.nio.channels.AsynchronousCloseException;
28 import java.nio.channels.CancelledKeyException;
29 import java.nio.channels.ClosedChannelException;
30 import java.nio.channels.NotYetConnectedException;
31 import java.nio.channels.SelectionKey;
32 import java.nio.channels.Selector;
33 import java.nio.channels.WritableByteChannel;
34 import java.util.ArrayList;
35 import java.util.Iterator;
36 import java.util.List;
37 import java.util.Queue;
38 import java.util.Set;
39 import java.util.concurrent.Executor;
40
41
42 import static org.jboss.netty.channel.Channels.*;
43
44 abstract class AbstractNioWorker extends AbstractNioSelector implements Worker {
45
46 protected final SocketSendBufferPool sendBufferPool = new SocketSendBufferPool();
47
48 AbstractNioWorker(Executor executor) {
49 super(executor);
50 }
51
52 AbstractNioWorker(Executor executor, ThreadNameDeterminer determiner) {
53 super(executor, determiner);
54 }
55
56 public void executeInIoThread(Runnable task) {
57 executeInIoThread(task, false);
58 }
59
60
61
62
63
64
65
66
67
68
69 public void executeInIoThread(Runnable task, boolean alwaysAsync) {
70 if (!alwaysAsync && isIoThread()) {
71 task.run();
72 } else {
73 registerTask(task);
74 }
75 }
76
77 @Override
78 protected void close(SelectionKey k) {
79 AbstractNioChannel<?> ch = (AbstractNioChannel<?>) k.attachment();
80 close(ch, succeededFuture(ch));
81 }
82
83 @Override
84 protected ThreadRenamingRunnable newThreadRenamingRunnable(int id, ThreadNameDeterminer determiner) {
85 return new ThreadRenamingRunnable(this, "New I/O worker #" + id, determiner);
86 }
87
88 @Override
89 public void run() {
90 super.run();
91 sendBufferPool.releaseExternalResources();
92 }
93
94 @Override
95 protected void process(Selector selector) throws IOException {
96 Set<SelectionKey> selectedKeys = selector.selectedKeys();
97
98
99
100 if (selectedKeys.isEmpty()) {
101 return;
102 }
103 for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
104 SelectionKey k = i.next();
105 i.remove();
106 try {
107 int readyOps = k.readyOps();
108 if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) {
109 if (!read(k)) {
110
111 continue;
112 }
113 }
114 if ((readyOps & SelectionKey.OP_WRITE) != 0) {
115 writeFromSelectorLoop(k);
116 }
117 } catch (CancelledKeyException e) {
118 close(k);
119 }
120
121 if (cleanUpCancelledKeys()) {
122 break;
123 }
124 }
125 }
126
127 void writeFromUserCode(final AbstractNioChannel<?> channel) {
128 if (!channel.isConnected()) {
129 cleanUpWriteBuffer(channel);
130 return;
131 }
132
133 if (scheduleWriteIfNecessary(channel)) {
134 return;
135 }
136
137
138
139 if (channel.writeSuspended) {
140 return;
141 }
142
143 if (channel.inWriteNowLoop) {
144 return;
145 }
146
147 write0(channel);
148 }
149
150 void writeFromTaskLoop(AbstractNioChannel<?> ch) {
151 if (!ch.writeSuspended) {
152 write0(ch);
153 }
154 }
155
156 void writeFromSelectorLoop(final SelectionKey k) {
157 AbstractNioChannel<?> ch = (AbstractNioChannel<?>) k.attachment();
158 ch.writeSuspended = false;
159 write0(ch);
160 }
161
162 protected abstract boolean scheduleWriteIfNecessary(AbstractNioChannel<?> channel);
163
164 protected void write0(AbstractNioChannel<?> channel) {
165 boolean open = true;
166 boolean addOpWrite = false;
167 boolean removeOpWrite = false;
168 boolean iothread = isIoThread(channel);
169
170 long writtenBytes = 0;
171
172 final SocketSendBufferPool sendBufferPool = this.sendBufferPool;
173 final WritableByteChannel ch = channel.channel;
174 final Queue<MessageEvent> writeBuffer = channel.writeBufferQueue;
175 final int writeSpinCount = channel.getConfig().getWriteSpinCount();
176 List<Throwable> causes = null;
177
178 synchronized (channel.writeLock) {
179 channel.inWriteNowLoop = true;
180 for (;;) {
181
182 MessageEvent evt = channel.currentWriteEvent;
183 SendBuffer buf = null;
184 ChannelFuture future = null;
185 try {
186 if (evt == null) {
187 if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) {
188 removeOpWrite = true;
189 channel.writeSuspended = false;
190 break;
191 }
192 future = evt.getFuture();
193
194 channel.currentWriteBuffer = buf = sendBufferPool.acquire(evt.getMessage());
195 } else {
196 future = evt.getFuture();
197 buf = channel.currentWriteBuffer;
198 }
199
200 long localWrittenBytes = 0;
201 for (int i = writeSpinCount; i > 0; i --) {
202 localWrittenBytes = buf.transferTo(ch);
203 if (localWrittenBytes != 0) {
204 writtenBytes += localWrittenBytes;
205 break;
206 }
207 if (buf.finished()) {
208 break;
209 }
210 }
211
212 if (buf.finished()) {
213
214 buf.release();
215 channel.currentWriteEvent = null;
216 channel.currentWriteBuffer = null;
217
218
219 evt = null;
220 buf = null;
221 future.setSuccess();
222 } else {
223
224 addOpWrite = true;
225 channel.writeSuspended = true;
226
227 if (localWrittenBytes > 0) {
228
229 future.setProgress(
230 localWrittenBytes,
231 buf.writtenBytes(), buf.totalBytes());
232 }
233 break;
234 }
235 } catch (AsynchronousCloseException e) {
236
237 } catch (Throwable t) {
238 if (buf != null) {
239 buf.release();
240 }
241 channel.currentWriteEvent = null;
242 channel.currentWriteBuffer = null;
243
244
245 buf = null;
246
247 evt = null;
248 if (future != null) {
249 future.setFailure(t);
250 }
251 if (iothread) {
252
253
254
255
256 if (causes == null) {
257 causes = new ArrayList<Throwable>(1);
258 }
259 causes.add(t);
260 } else {
261 fireExceptionCaughtLater(channel, t);
262 }
263 if (t instanceof IOException) {
264
265
266
267
268
269 open = false;
270 }
271 }
272 }
273 channel.inWriteNowLoop = false;
274
275
276
277
278
279
280
281 if (open) {
282 if (addOpWrite) {
283 setOpWrite(channel);
284 } else if (removeOpWrite) {
285 clearOpWrite(channel);
286 }
287 }
288 }
289 if (causes != null) {
290 for (Throwable cause: causes) {
291
292 fireExceptionCaught(channel, cause);
293 }
294 }
295 if (!open) {
296
297 close(channel, succeededFuture(channel));
298 }
299 if (iothread) {
300 fireWriteComplete(channel, writtenBytes);
301 } else {
302 fireWriteCompleteLater(channel, writtenBytes);
303 }
304 }
305
306 static boolean isIoThread(AbstractNioChannel<?> channel) {
307 return Thread.currentThread() == channel.worker.thread;
308 }
309
310 protected void setOpWrite(AbstractNioChannel<?> channel) {
311 Selector selector = this.selector;
312 SelectionKey key = channel.channel.keyFor(selector);
313 if (key == null) {
314 return;
315 }
316 if (!key.isValid()) {
317 close(key);
318 return;
319 }
320
321 int interestOps = channel.getRawInterestOps();
322 if ((interestOps & SelectionKey.OP_WRITE) == 0) {
323 interestOps |= SelectionKey.OP_WRITE;
324 key.interestOps(interestOps);
325 channel.setRawInterestOpsNow(interestOps);
326 }
327 }
328
329 protected void clearOpWrite(AbstractNioChannel<?> channel) {
330 Selector selector = this.selector;
331 SelectionKey key = channel.channel.keyFor(selector);
332 if (key == null) {
333 return;
334 }
335 if (!key.isValid()) {
336 close(key);
337 return;
338 }
339
340 int interestOps = channel.getRawInterestOps();
341 if ((interestOps & SelectionKey.OP_WRITE) != 0) {
342 interestOps &= ~SelectionKey.OP_WRITE;
343 key.interestOps(interestOps);
344 channel.setRawInterestOpsNow(interestOps);
345 }
346 }
347
348 protected void close(AbstractNioChannel<?> channel, ChannelFuture future) {
349 boolean connected = channel.isConnected();
350 boolean bound = channel.isBound();
351 boolean iothread = isIoThread(channel);
352
353 try {
354 channel.channel.close();
355 increaseCancelledKeys();
356
357 if (channel.setClosed()) {
358 future.setSuccess();
359 if (connected) {
360 if (iothread) {
361 fireChannelDisconnected(channel);
362 } else {
363 fireChannelDisconnectedLater(channel);
364 }
365 }
366 if (bound) {
367 if (iothread) {
368 fireChannelUnbound(channel);
369 } else {
370 fireChannelUnboundLater(channel);
371 }
372 }
373
374 cleanUpWriteBuffer(channel);
375 if (iothread) {
376 fireChannelClosed(channel);
377 } else {
378 fireChannelClosedLater(channel);
379 }
380 } else {
381 future.setSuccess();
382 }
383 } catch (Throwable t) {
384 future.setFailure(t);
385 if (iothread) {
386 fireExceptionCaught(channel, t);
387 } else {
388 fireExceptionCaughtLater(channel, t);
389 }
390 }
391 }
392
393 protected static void cleanUpWriteBuffer(AbstractNioChannel<?> channel) {
394 Exception cause = null;
395 boolean fireExceptionCaught = false;
396
397
398 synchronized (channel.writeLock) {
399 MessageEvent evt = channel.currentWriteEvent;
400 if (evt != null) {
401
402
403 if (channel.isOpen()) {
404 cause = new NotYetConnectedException();
405 } else {
406 cause = new ClosedChannelException();
407 }
408
409 ChannelFuture future = evt.getFuture();
410 if (channel.currentWriteBuffer != null) {
411 channel.currentWriteBuffer.release();
412 channel.currentWriteBuffer = null;
413 }
414 channel.currentWriteEvent = null;
415
416
417 evt = null;
418 future.setFailure(cause);
419 fireExceptionCaught = true;
420 }
421
422 Queue<MessageEvent> writeBuffer = channel.writeBufferQueue;
423 for (;;) {
424 evt = writeBuffer.poll();
425 if (evt == null) {
426 break;
427 }
428
429
430 if (cause == null) {
431 if (channel.isOpen()) {
432 cause = new NotYetConnectedException();
433 } else {
434 cause = new ClosedChannelException();
435 }
436 fireExceptionCaught = true;
437 }
438 evt.getFuture().setFailure(cause);
439 }
440 }
441
442 if (fireExceptionCaught) {
443 if (isIoThread(channel)) {
444 fireExceptionCaught(channel, cause);
445 } else {
446 fireExceptionCaughtLater(channel, cause);
447 }
448 }
449 }
450
451 void setInterestOps(final AbstractNioChannel<?> channel, final ChannelFuture future, final int interestOps) {
452 boolean iothread = isIoThread(channel);
453 if (!iothread) {
454 channel.getPipeline().execute(new Runnable() {
455 public void run() {
456 setInterestOps(channel, future, interestOps);
457 }
458 });
459 return;
460 }
461
462 boolean changed = false;
463 try {
464 Selector selector = this.selector;
465 SelectionKey key = channel.channel.keyFor(selector);
466
467
468 int newInterestOps = interestOps & ~Channel.OP_WRITE | channel.getRawInterestOps() & Channel.OP_WRITE;
469
470 if (key == null || selector == null) {
471 if (channel.getRawInterestOps() != newInterestOps) {
472 changed = true;
473 }
474
475
476
477 channel.setRawInterestOpsNow(newInterestOps);
478
479 future.setSuccess();
480 if (changed) {
481 if (iothread) {
482 fireChannelInterestChanged(channel);
483 } else {
484 fireChannelInterestChangedLater(channel);
485 }
486 }
487
488 return;
489 }
490
491 if (channel.getRawInterestOps() != newInterestOps) {
492 key.interestOps(newInterestOps);
493 if (Thread.currentThread() != thread &&
494 wakenUp.compareAndSet(false, true)) {
495 selector.wakeup();
496 }
497 channel.setRawInterestOpsNow(newInterestOps);
498 }
499
500 future.setSuccess();
501 if (changed) {
502 fireChannelInterestChanged(channel);
503 }
504 } catch (CancelledKeyException e) {
505
506 ClosedChannelException cce = new ClosedChannelException();
507 future.setFailure(cce);
508 fireExceptionCaught(channel, cce);
509 } catch (Throwable t) {
510 future.setFailure(t);
511 fireExceptionCaught(channel, t);
512 }
513 }
514
515
516
517
518
519
520
521
522 protected abstract boolean read(SelectionKey k);
523
524 }