View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.socket.nio;
17  
18  import org.jboss.netty.channel.Channel;
19  import org.jboss.netty.channel.ChannelException;
20  import org.jboss.netty.channel.ChannelFuture;
21  import org.jboss.netty.channel.MessageEvent;
22  import org.jboss.netty.channel.socket.Worker;
23  import org.jboss.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer;
24  import org.jboss.netty.logging.InternalLogger;
25  import org.jboss.netty.logging.InternalLoggerFactory;
26  import org.jboss.netty.util.ExternalResourceReleasable;
27  import org.jboss.netty.util.ThreadRenamingRunnable;
28  import org.jboss.netty.util.internal.DeadLockProofWorker;
29  
30  import java.io.IOException;
31  import java.nio.channels.AsynchronousCloseException;
32  import java.nio.channels.CancelledKeyException;
33  import java.nio.channels.ClosedChannelException;
34  import java.nio.channels.DatagramChannel;
35  import java.nio.channels.NotYetConnectedException;
36  import java.nio.channels.SelectableChannel;
37  import java.nio.channels.SelectionKey;
38  import java.nio.channels.Selector;
39  import java.nio.channels.SocketChannel;
40  import java.nio.channels.WritableByteChannel;
41  import java.util.Iterator;
42  import java.util.Queue;
43  import java.util.Set;
44  import java.util.concurrent.ConcurrentLinkedQueue;
45  import java.util.concurrent.Executor;
46  import java.util.concurrent.ExecutorService;
47  import java.util.concurrent.RejectedExecutionException;
48  import java.util.concurrent.atomic.AtomicBoolean;
49  import java.util.concurrent.atomic.AtomicInteger;
50  import java.util.concurrent.locks.ReadWriteLock;
51  import java.util.concurrent.locks.ReentrantReadWriteLock;
52  
53  import static org.jboss.netty.channel.Channels.*;
54  
55  abstract class AbstractNioWorker implements Worker, ExternalResourceReleasable {
56  
57  
    /**
     * Source of worker ids; incremented once for every worker instance created.
     */
    private static final AtomicInteger nextId = new AtomicInteger();

    /**
     * Id of this worker. It is part of the name handed to the
     * {@link ThreadRenamingRunnable} that wraps the selector loop.
     */
    final int id = nextId.incrementAndGet();

    /**
     * Internal Netty logger.
     */
    private static final InternalLogger logger = InternalLoggerFactory
            .getInstance(AbstractNioWorker.class);

    /**
     * Copy of {@link NioProviderMetadata#CONSTRAINT_LEVEL}. When it is non-zero,
     * {@link #run()} passes through the write lock of {@link #selectorGuard}
     * before every select.
     */
    private static final int CONSTRAINT_LEVEL = NioProviderMetadata.CONSTRAINT_LEVEL;

    /**
     * Number of counted cancelled keys after which {@code cleanUpCancelledKeys()}
     * calls {@link Selector#selectNow()}.
     */
    static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization.


    /**
     * Executor the selector loop of this worker is started on
     * (see {@code openSelector()}).
     */
    private final Executor executor;

    /**
     * If this worker has been started thread will be a reference to the thread
     * used when starting. i.e. the current thread when the run method is executed.
     */
    protected volatile Thread thread;

    /**
     * The NIO {@link Selector}. It is set to {@code null} once the selector loop
     * has closed it, which is how other methods detect a shut down worker.
     */
    volatile Selector selector;

    /**
     * Flag that determines whether a blocked Selector.select should break out
     * of its selection process. The select call is made with a timeout and
     * blocks for that time unless the selector is woken up. Callers flip this
     * flag with {@code compareAndSet(false, true)} before calling
     * {@link Selector#wakeup()} so that only one of them pays for the wakeup.
     */
    protected final AtomicBoolean wakenUp = new AtomicBoolean();

    /**
     * Lock for this worker's Selector.
     */
    private final ReadWriteLock selectorGuard = new ReentrantReadWriteLock();

    /**
     * Monitor object used to synchronize selector open/close.
     */
    private final Object startStopLock = new Object();

    /**
     * Queue of channel registration tasks.
     */
    private final Queue<Runnable> registerTaskQueue = new ConcurrentLinkedQueue<Runnable>();

    /**
     * Queue of WriteTasks
     */
    protected final Queue<Runnable> writeTaskQueue = new ConcurrentLinkedQueue<Runnable>();

    /**
     * Queue of tasks submitted through {@code executeInIoThread(..)}.
     */
    private final Queue<Runnable> eventQueue = new ConcurrentLinkedQueue<Runnable>();


    /**
     * Counter incremented by {@code close(channel, future)} and reset by the
     * selector loop and by {@code cleanUpCancelledKeys()}.
     */
    private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation

    /**
     * Pool from which {@code write0(..)} acquires the send buffers for outgoing
     * messages.
     */
    protected final SocketSendBufferPool sendBufferPool = new SocketSendBufferPool();
124 
    /**
     * Creates a worker, opens its {@link Selector} and immediately starts the
     * selector loop on the given executor.
     *
     * @param executor the {@link Executor} the selector loop is started on
     * @throws ChannelException if the selector cannot be opened
     */
    AbstractNioWorker(Executor executor) {
        this.executor = executor;
        openSelector();
    }
129 
130     void register(AbstractNioChannel<?> channel, ChannelFuture future) {
131 
132         synchronized (startStopLock) {
133             if (selector == null) {
134                 // the selector was null this means the Worker has already been shutdown.
135                 throw new RejectedExecutionException("Worker has already been shutdown");
136             }
137             Runnable registerTask = createRegisterTask(channel, future);
138 
139             boolean offered = registerTaskQueue.offer(registerTask);
140             assert offered;
141 
142             if (wakenUp.compareAndSet(false, true)) {
143                 selector.wakeup();
144             }
145 
146         }
147     }
148 
149     // Create a new selector and "transfer" all channels from the old
150     // selector to the new one
151     private Selector recreateSelector() throws IOException {
152         Selector newSelector = Selector.open();
153         Selector selector = this.selector;
154         this.selector = newSelector;
155 
156         // loop over all the keys that are registered with the old Selector
157         // and register them with the new one
158         for (SelectionKey key: selector.keys()) {
159             SelectableChannel ch = key.channel();
160             int ops = key.interestOps();
161             Object att = key.attachment();
162             // cancel the old key
163             key.cancel();
164 
165             try {
166                 // register the channel with the new selector now
167                 ch.register(newSelector, ops, att);
168             } catch (ClosedChannelException e) {
169                 // close channel
170                 AbstractNioChannel<?> channel = (AbstractNioChannel<?>) att;
171                 close(channel, succeededFuture(channel));
172             }
173         }
174         try {
175             // time to close the old selector as everything else is registered to the new one
176             selector.close();
177         } catch (Throwable t) {
178             if (logger.isWarnEnabled()) {
179                 logger.warn("Failed to close a selector.", t);
180             }
181         }
182         if (logger.isWarnEnabled()) {
183             logger.warn("Recreated Selector because of possible jdk epoll(..) bug");
184         }
185         return newSelector;
186     }
187 
188     /**
189      * Start the {@link AbstractNioWorker} and return the {@link Selector} that will be used for
190      * the {@link AbstractNioChannel}'s when they get registered
191      *
192      * @return selector
193      */
194     private void openSelector() {
195         try {
196             selector = Selector.open();
197         } catch (Throwable t) {
198             throw new ChannelException("Failed to create a selector.", t);
199         }
200 
201         // Start the worker thread with the new Selector.
202         boolean success = false;
203         try {
204             DeadLockProofWorker.start(executor, new ThreadRenamingRunnable(this, "New I/O  worker #" + id));
205             success = true;
206         } finally {
207             if (!success) {
208                 // Release the Selector if the execution fails.
209                 try {
210                     selector.close();
211                 } catch (Throwable t) {
212                     logger.warn("Failed to close a selector.", t);
213                 }
214                 selector = null;
215                 // The method will return to the caller at this point.
216             }
217         }
218         assert selector != null && selector.isOpen();
219     }
220 
221 
    /**
     * The selector loop of this worker. Each round it
     * <ol>
     * <li>resets {@link #wakenUp} and selects on the selector,</li>
     * <li>counts selects that returned early with nothing selected and recreates
     *     the selector after 1024 such rounds in a row (workaround for the jdk
     *     epoll(..) bug, only when {@code SelectorUtil.EPOLL_BUG_WORKAROUND} is set),</li>
     * <li>processes the register task queue, the event queue, the write task
     *     queue and finally the selected keys,</li>
     * <li>closes the selector and leaves the loop once no keys are registered,
     *     no registration is pending and either the shutdown flag is set or the
     *     executor is a shut down {@link ExecutorService}.</li>
     * </ol>
     * Any {@link Throwable} raised inside a round is logged and followed by a
     * one second pause before the next round.
     */
    public void run() {
        // Remember the thread running the loop; isIoThread(..) compares against it.
        thread = Thread.currentThread();

        boolean shutdown = false;
        int selectReturnsImmediately = 0;
        Selector selector = this.selector;

        // use 80% of the timeout for measure
        final long minSelectTimeout = SelectorUtil.SELECT_TIMEOUT_NANOS * 80 / 100;
        boolean wakenupFromLoop = false;
        for (;;) {
            wakenUp.set(false);

            if (CONSTRAINT_LEVEL != 0) {
                selectorGuard.writeLock().lock();
                // This empty lock/unlock pair makes the loop wait until no
                // other thread holds selectorGuard before it selects.
                // NOTE(review): the code taking the read lock is not visible
                // in this part of the file.
                selectorGuard.writeLock().unlock();
            }

            try {
                long beforeSelect = System.nanoTime();
                int selected = SelectorUtil.select(selector);
                if (SelectorUtil.EPOLL_BUG_WORKAROUND && selected == 0 && !wakenupFromLoop && !wakenUp.get()) {
                    long timeBlocked = System.nanoTime() - beforeSelect;

                    if (timeBlocked < minSelectTimeout) {
                        boolean notConnected = false;
                        // loop over all keys as the selector may have been unblocked
                        // because of a closed (no longer connected) channel
                        for (SelectionKey key: selector.keys()) {
                            SelectableChannel ch = key.channel();
                            try {
                                if (ch instanceof DatagramChannel && !((DatagramChannel) ch).isConnected() ||
                                        ch instanceof SocketChannel && !((SocketChannel) ch).isConnected()) {
                                    notConnected = true;
                                    // cancel the key just to be on the safe side
                                    key.cancel();
                                }
                            } catch (CancelledKeyException e) {
                                // ignore
                            }
                        }
                        if (notConnected) {
                            selectReturnsImmediately = 0;
                        } else {
                            // returned before the minSelectTimeout elapsed with nothing selected.
                            // this may be caused by the jdk epoll(..) bug, so increment the counter
                            // which we use later to see if it is really the jdk bug.
                            selectReturnsImmediately ++;

                        }

                    } else {
                        selectReturnsImmediately = 0;
                    }

                    if (selectReturnsImmediately == 1024) {
                        // The selector returned immediately for 1024 times in a row,
                        // so recreate the selector as it seems like we hit the
                        // famous epoll(..) jdk bug.
                        selector = recreateSelector();
                        selectReturnsImmediately = 0;
                        wakenupFromLoop = false;
                        // try to select again
                        continue;
                    }
                } else {
                    // reset counter
                    selectReturnsImmediately = 0;
                }

                // 'wakenUp.compareAndSet(false, true)' is always evaluated
                // before calling 'selector.wakeup()' to reduce the wake-up
                // overhead. (Selector.wakeup() is an expensive operation.)
                //
                // However, there is a race condition in this approach.
                // The race condition is triggered when 'wakenUp' is set to
                // true too early.
                //
                // 'wakenUp' is set to true too early if:
                // 1) Selector is woken up between 'wakenUp.set(false)' and
                //    'selector.select(...)'. (BAD)
                // 2) Selector is woken up between 'selector.select(...)' and
                //    'if (wakenUp.get()) { ... }'. (OK)
                //
                // In the first case, 'wakenUp' is set to true and the
                // following 'selector.select(...)' will wake up immediately.
                // Until 'wakenUp' is set to false again in the next round,
                // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
                // any attempt to wake up the Selector will fail, too, causing
                // the following 'selector.select(...)' call to block
                // unnecessarily.
                //
                // To fix this problem, we wake up the selector again if wakenUp
                // is true immediately after selector.select(...).
                // It is inefficient in that it wakes up the selector for both
                // the first case (BAD - wake-up required) and the second case
                // (OK - no wake-up required).

                if (wakenUp.get()) {
                    wakenupFromLoop = true;
                    selector.wakeup();
                } else {
                    wakenupFromLoop = false;
                }

                // Start a new counting period for cleanUpCancelledKeys().
                cancelledKeys = 0;
                processRegisterTaskQueue();
                processEventQueue();
                processWriteTaskQueue();
                processSelectedKeys(selector.selectedKeys());

                // Exit the loop when there's nothing to handle.
                // The shutdown flag is used to delay the shutdown of this
                // loop to avoid excessive Selector creation when
                // connections are registered in a one-by-one manner instead of
                // concurrent manner.
                if (selector.keys().isEmpty()) {
                    if (shutdown ||
                        executor instanceof ExecutorService && ((ExecutorService) executor).isShutdown()) {

                        // register(..) and executeInIoThread(..) take the same lock,
                        // so they observe either the open selector or null.
                        synchronized (startStopLock) {
                            if (registerTaskQueue.isEmpty() && selector.keys().isEmpty()) {
                                try {
                                    selector.close();
                                } catch (IOException e) {
                                    logger.warn(
                                            "Failed to close a selector.", e);
                                } finally {
                                    this.selector = null;
                                }
                                break;
                            } else {
                                shutdown = false;
                            }
                        }
                    }
                } else {
                    shutdown = false;
                }
            } catch (Throwable t) {
                logger.warn(
                        "Unexpected exception in the selector loop.", t);

                // Prevent possible consecutive immediate failures that lead to
                // excessive CPU consumption.
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Ignore.
                }
            }
        }
    }
376 
    /**
     * Execute the {@link Runnable} in a IO-Thread. Same as calling
     * {@code executeInIoThread(task, false)}.
     *
     * @param task
     *            the {@link Runnable} to execute
     */
    public void executeInIoThread(Runnable task) {
        executeInIoThread(task, false);
    }
380 
381     /**
382      * Execute the {@link Runnable} in a IO-Thread
383      *
384      * @param task
385      *            the {@link Runnable} to execute
386      * @param alwaysAsync
387      *            <code>true</code> if the {@link Runnable} should be executed
388      *            in an async fashion even if the current Thread == IO Thread
389      */
390     public void executeInIoThread(Runnable task, boolean alwaysAsync) {
391         if (!alwaysAsync && Thread.currentThread() == thread) {
392             task.run();
393         } else {
394             eventQueue.offer(task);
395 
396             synchronized (startStopLock) {
397                 // check if the selector was shutdown already or was not started yet. If so execute all
398                 // submitted tasks in the calling thread
399                 if (selector == null) {
400                     // execute everything in the event queue as the
401                     for (;;) {
402                         Runnable r = eventQueue.poll();
403                         if (r == null) {
404                             break;
405                         }
406                         r.run();
407                     }
408                 } else {
409                     if (wakenUp.compareAndSet(false, true))  {
410                         // wake up the selector to speed things
411                         Selector selector = this.selector;
412                         if (selector != null) {
413                             selector.wakeup();
414                         }
415                     }
416                 }
417             }
418         }
419 
420     }
421 
422 
423     private void processRegisterTaskQueue() throws IOException {
424         for (;;) {
425             final Runnable task = registerTaskQueue.poll();
426             if (task == null) {
427                 break;
428             }
429 
430             task.run();
431             cleanUpCancelledKeys();
432         }
433     }
434 
435     private void processWriteTaskQueue() throws IOException {
436         for (;;) {
437             final Runnable task = writeTaskQueue.poll();
438             if (task == null) {
439                 break;
440             }
441 
442             task.run();
443             cleanUpCancelledKeys();
444         }
445     }
446 
447     private void processEventQueue() throws IOException {
448         for (;;) {
449             final Runnable task = eventQueue.poll();
450             if (task == null) {
451                 break;
452             }
453             task.run();
454             cleanUpCancelledKeys();
455         }
456     }
457 
458     private void processSelectedKeys(Set<SelectionKey> selectedKeys) throws IOException {
459         // check if the set is empty and if so just return to not create garbage by
460         // creating a new Iterator every time even if there is nothing to process.
461         // See https://github.com/netty/netty/issues/597
462         if (selectedKeys.isEmpty()) {
463             return;
464         }
465         for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
466             SelectionKey k = i.next();
467             i.remove();
468             try {
469                 int readyOps = k.readyOps();
470                 if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) {
471                     if (!read(k)) {
472                         // Connection already closed - no need to handle write.
473                         continue;
474                     }
475                 }
476                 if ((readyOps & SelectionKey.OP_WRITE) != 0) {
477                     writeFromSelectorLoop(k);
478                 }
479             } catch (CancelledKeyException e) {
480                 close(k);
481             }
482 
483             if (cleanUpCancelledKeys()) {
484                 break; // break the loop to avoid ConcurrentModificationException
485             }
486         }
487     }
488 
489     private boolean cleanUpCancelledKeys() throws IOException {
490         if (cancelledKeys >= CLEANUP_INTERVAL) {
491             cancelledKeys = 0;
492             selector.selectNow();
493             return true;
494         }
495         return false;
496     }
497 
498 
499 
500     private void close(SelectionKey k) {
501         AbstractNioChannel<?> ch = (AbstractNioChannel<?>) k.attachment();
502         close(ch, succeededFuture(ch));
503     }
504 
505     void writeFromUserCode(final AbstractNioChannel<?> channel) {
506         if (!channel.isConnected()) {
507             cleanUpWriteBuffer(channel);
508             return;
509         }
510 
511         if (scheduleWriteIfNecessary(channel)) {
512             return;
513         }
514 
515         // From here, we are sure Thread.currentThread() == workerThread.
516 
517         if (channel.writeSuspended) {
518             return;
519         }
520 
521         if (channel.inWriteNowLoop) {
522             return;
523         }
524 
525         write0(channel);
526     }
527 
528     void writeFromTaskLoop(AbstractNioChannel<?> ch) {
529         if (!ch.writeSuspended) {
530             write0(ch);
531         }
532     }
533 
534     void writeFromSelectorLoop(final SelectionKey k) {
535         AbstractNioChannel<?> ch = (AbstractNioChannel<?>) k.attachment();
536         ch.writeSuspended = false;
537         write0(ch);
538     }
539 
    /**
     * Hook consulted by {@link #writeFromUserCode(AbstractNioChannel)} before it
     * writes. When this returns {@code true} the caller returns without writing;
     * when it returns {@code false} the caller goes on and writes in the calling
     * thread.
     *
     * @param channel the channel that has messages to write
     * @return {@code true} if the caller must not write itself (presumably
     *         because the write was handed over to the worker thread; the
     *         implementations are not part of this class)
     */
    protected abstract boolean scheduleWriteIfNecessary(AbstractNioChannel<?> channel);
541 
    /**
     * Writes the queued messages of the given channel to its underlying
     * {@link WritableByteChannel} until the queue is empty or a message cannot
     * be written completely.
     *
     * <p>The whole loop runs while holding {@code channel.writeLock}. When the
     * queue has been drained OP_WRITE is cleared; when a message is only
     * partially written OP_WRITE is set and writing is marked as suspended.
     * Failures fail the future of the message and fire an exception event;
     * an {@link IOException} additionally closes the channel. At the end a
     * write-complete event carrying the number of written bytes is fired,
     * directly when called from the IO thread and deferred otherwise.
     *
     * @param channel the channel to write to
     */
    protected void write0(AbstractNioChannel<?> channel) {
        boolean open = true;
        boolean addOpWrite = false;
        boolean removeOpWrite = false;
        boolean iothread = isIoThread(channel);

        long writtenBytes = 0;

        final SocketSendBufferPool sendBufferPool = this.sendBufferPool;
        final WritableByteChannel ch = channel.channel;
        final Queue<MessageEvent> writeBuffer = channel.writeBufferQueue;
        final int writeSpinCount = channel.getConfig().getWriteSpinCount();
        synchronized (channel.writeLock) {
            // writeFromUserCode(..) checks this flag and does not start another write.
            channel.inWriteNowLoop = true;
            for (;;) {

                MessageEvent evt = channel.currentWriteEvent;
                SendBuffer buf = null;
                ChannelFuture future = null;
                try {
                    if (evt == null) {
                        // No message in flight: take the next one from the queue.
                        if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) {
                            // Nothing left to write.
                            removeOpWrite = true;
                            channel.writeSuspended = false;
                            break;
                        }
                        future = evt.getFuture();

                        channel.currentWriteBuffer = buf = sendBufferPool.acquire(evt.getMessage());
                    } else {
                        // Continue the message left unfinished by an earlier call.
                        future = evt.getFuture();
                        buf = channel.currentWriteBuffer;

                    }

                    // Try up to writeSpinCount times until some bytes are
                    // written or the buffer is finished.
                    long localWrittenBytes = 0;
                    for (int i = writeSpinCount; i > 0; i --) {
                        localWrittenBytes = buf.transferTo(ch);
                        if (localWrittenBytes != 0) {
                            writtenBytes += localWrittenBytes;
                            break;
                        }
                        if (buf.finished()) {
                            break;
                        }
                    }

                    if (buf.finished()) {
                        // Successful write - proceed to the next message.
                        buf.release();
                        channel.currentWriteEvent = null;
                        channel.currentWriteBuffer = null;
                        evt = null;
                        buf = null;
                        future.setSuccess();
                    } else {
                        // Not written fully - perhaps the kernel buffer is full.
                        addOpWrite = true;
                        channel.writeSuspended = true;

                        if (localWrittenBytes > 0) {
                            // Notify progress listeners if necessary.
                            future.setProgress(
                                    localWrittenBytes,
                                    buf.writtenBytes(), buf.totalBytes());
                        }
                        break;
                    }
                } catch (AsynchronousCloseException e) {
                    // Doesn't need a user attention - ignore.
                } catch (Throwable t) {
                    // Drop the failed message and report the failure.
                    if (buf != null) {
                        buf.release();
                    }
                    channel.currentWriteEvent = null;
                    channel.currentWriteBuffer = null;
                    buf = null;
                    evt = null;
                    if (future != null) {
                        future.setFailure(t);
                    }
                    if (iothread) {
                        fireExceptionCaught(channel, t);
                    } else {
                        fireExceptionCaughtLater(channel, t);
                    }
                    if (t instanceof IOException) {
                        // Remember the close so that OP_WRITE is left untouched below.
                        open = false;
                        close(channel, succeededFuture(channel));
                    }
                }
            }
            channel.inWriteNowLoop = false;

            // Initially, the following block was executed after releasing
            // the writeLock, but there was a race condition, and it has to be
            // executed before releasing the writeLock:
            //
            //     https://issues.jboss.org/browse/NETTY-410
            //
            if (open) {
                if (addOpWrite) {
                    setOpWrite(channel);
                } else if (removeOpWrite) {
                    clearOpWrite(channel);
                }
            }
        }
        if (iothread) {
            fireWriteComplete(channel, writtenBytes);
        } else {
            fireWriteCompleteLater(channel, writtenBytes);
        }
    }
656 
657     static boolean isIoThread(AbstractNioChannel<?> channel) {
658         return Thread.currentThread() == channel.worker.thread;
659     }
660 
661     protected void setOpWrite(AbstractNioChannel<?> channel) {
662         Selector selector = this.selector;
663         SelectionKey key = channel.channel.keyFor(selector);
664         if (key == null) {
665             return;
666         }
667         if (!key.isValid()) {
668             close(key);
669             return;
670         }
671 
672         // interestOps can change at any time and at any thread.
673         // Acquire a lock to avoid possible race condition.
674         synchronized (channel.interestOpsLock) {
675             int interestOps = channel.getRawInterestOps();
676             if ((interestOps & SelectionKey.OP_WRITE) == 0) {
677                 interestOps |= SelectionKey.OP_WRITE;
678                 key.interestOps(interestOps);
679                 channel.setRawInterestOpsNow(interestOps);
680             }
681         }
682     }
683 
684     protected void clearOpWrite(AbstractNioChannel<?> channel) {
685         Selector selector = this.selector;
686         SelectionKey key = channel.channel.keyFor(selector);
687         if (key == null) {
688             return;
689         }
690         if (!key.isValid()) {
691             close(key);
692             return;
693         }
694 
695         // interestOps can change at any time and at any thread.
696         // Acquire a lock to avoid possible race condition.
697         synchronized (channel.interestOpsLock) {
698             int interestOps = channel.getRawInterestOps();
699             if ((interestOps & SelectionKey.OP_WRITE) != 0) {
700                 interestOps &= ~SelectionKey.OP_WRITE;
701                 key.interestOps(interestOps);
702                 channel.setRawInterestOpsNow(interestOps);
703             }
704         }
705     }
706 
707 
708     void close(AbstractNioChannel<?> channel, ChannelFuture future) {
709         boolean connected = channel.isConnected();
710         boolean bound = channel.isBound();
711         boolean iothread = isIoThread(channel);
712 
713         try {
714             channel.channel.close();
715             cancelledKeys ++;
716 
717             if (channel.setClosed()) {
718                 future.setSuccess();
719                 if (connected) {
720                     if (iothread) {
721                         fireChannelDisconnected(channel);
722                     } else {
723                         fireChannelDisconnectedLater(channel);
724                     }
725                 }
726                 if (bound) {
727                     if (iothread) {
728                         fireChannelUnbound(channel);
729                     } else {
730                         fireChannelUnboundLater(channel);
731                     }
732                 }
733 
734                 cleanUpWriteBuffer(channel);
735                 if (iothread) {
736                     fireChannelClosed(channel);
737                 } else {
738                     fireChannelClosedLater(channel);
739                 }
740             } else {
741                 future.setSuccess();
742             }
743         } catch (Throwable t) {
744             future.setFailure(t);
745             if (iothread) {
746                 fireExceptionCaught(channel, t);
747             } else {
748                 fireExceptionCaughtLater(channel, t);
749             }
750         }
751     }
752 
753     protected static void cleanUpWriteBuffer(AbstractNioChannel<?> channel) {
754         Exception cause = null;
755         boolean fireExceptionCaught = false;
756 
757         // Clean up the stale messages in the write buffer.
758         synchronized (channel.writeLock) {
759             MessageEvent evt = channel.currentWriteEvent;
760             if (evt != null) {
761                 // Create the exception only once to avoid the excessive overhead
762                 // caused by fillStackTrace.
763                 if (channel.isOpen()) {
764                     cause = new NotYetConnectedException();
765                 } else {
766                     cause = new ClosedChannelException();
767                 }
768 
769                 ChannelFuture future = evt.getFuture();
770                 if (channel.currentWriteBuffer != null) {
771                     channel.currentWriteBuffer.release();
772                     channel.currentWriteBuffer = null;
773                 }
774                 channel.currentWriteEvent = null;
775                 evt = null;
776                 future.setFailure(cause);
777                 fireExceptionCaught = true;
778             }
779 
780             Queue<MessageEvent> writeBuffer = channel.writeBufferQueue;
781             for (;;) {
782                 evt = writeBuffer.poll();
783                 if (evt == null) {
784                     break;
785                 }
786                 // Create the exception only once to avoid the excessive overhead
787                 // caused by fillStackTrace.
788                 if (cause == null) {
789                     if (channel.isOpen()) {
790                         cause = new NotYetConnectedException();
791                     } else {
792                         cause = new ClosedChannelException();
793                     }
794                     fireExceptionCaught = true;
795                 }
796                 evt.getFuture().setFailure(cause);
797 
798 
799             }
800         }
801 
802         if (fireExceptionCaught) {
803             if (isIoThread(channel)) {
804                 fireExceptionCaught(channel, cause);
805             } else {
806                 fireExceptionCaughtLater(channel, cause);
807             }
808         }
809     }
810 
811     void setInterestOps(AbstractNioChannel<?> channel, ChannelFuture future, int interestOps) {
812         boolean changed = false;
813         boolean iothread = isIoThread(channel);
814         try {
815             // interestOps can change at any time and at any thread.
816             // Acquire a lock to avoid possible race condition.
817             synchronized (channel.interestOpsLock) {
818                 Selector selector = this.selector;
819                 SelectionKey key = channel.channel.keyFor(selector);
820 
821                 // Override OP_WRITE flag - a user cannot change this flag.
822                 interestOps &= ~Channel.OP_WRITE;
823                 interestOps |= channel.getRawInterestOps() & Channel.OP_WRITE;
824 
825                 if (key == null || selector == null) {
826                     if (channel.getRawInterestOps() != interestOps) {
827                         changed = true;
828                     }
829 
830                     // Not registered to the worker yet.
831                     // Set the rawInterestOps immediately; RegisterTask will pick it up.
832                     channel.setRawInterestOpsNow(interestOps);
833 
834                     future.setSuccess();
835                     if (changed) {
836                         if (iothread) {
837                             fireChannelInterestChanged(channel);
838                         } else {
839                             fireChannelInterestChangedLater(channel);
840                         }
841                     }
842 
843                     return;
844                 }
845 
846                 switch (CONSTRAINT_LEVEL) {
847                 case 0:
848                     if (channel.getRawInterestOps() != interestOps) {
849                         key.interestOps(interestOps);
850                         if (Thread.currentThread() != thread &&
851                             wakenUp.compareAndSet(false, true)) {
852                             selector.wakeup();
853                         }
854                         changed = true;
855                     }
856                     break;
857                 case 1:
858                 case 2:
859                     if (channel.getRawInterestOps() != interestOps) {
860                         if (Thread.currentThread() == thread) {
861                             key.interestOps(interestOps);
862                             changed = true;
863                         } else {
864                             selectorGuard.readLock().lock();
865                             try {
866                                 if (wakenUp.compareAndSet(false, true)) {
867                                     selector.wakeup();
868                                 }
869                                 key.interestOps(interestOps);
870                                 changed = true;
871                             } finally {
872                                 selectorGuard.readLock().unlock();
873                             }
874                         }
875                     }
876                     break;
877                 default:
878                     throw new Error();
879                 }
880 
881                 if (changed) {
882                     channel.setRawInterestOpsNow(interestOps);
883                 }
884             }
885 
886             future.setSuccess();
887             if (changed) {
888                 if (iothread) {
889                     fireChannelInterestChanged(channel);
890                 } else {
891                     fireChannelInterestChangedLater(channel);
892                 }
893             }
894         } catch (CancelledKeyException e) {
895             // setInterestOps() was called on a closed channel.
896             ClosedChannelException cce = new ClosedChannelException();
897             future.setFailure(cce);
898             if (iothread) {
899                 fireExceptionCaught(channel, cce);
900             } else {
901                 fireExceptionCaughtLater(channel, cce);
902             }
903         } catch (Throwable t) {
904             future.setFailure(t);
905             if (iothread) {
906                 fireExceptionCaught(channel, t);
907             } else {
908                 fireExceptionCaughtLater(channel, t);
909             }
910         }
911     }
912 
913 
    /**
     * Releases external resources by delegating to
     * {@code sendBufferPool.releaseExternalResources()}.
     */
    public void releaseExternalResources() {
        sendBufferPool.releaseExternalResources();
    }
917 
    /**
     * Called when a Selector has been notified that the underlying channel
     * has something to be read. The channel would previously have registered its interest
     * in read operations.
     *
     * @param k The selection key which contains the Selector registration information.
     * @return a status flag defined by the implementing subclass.
     *         NOTE(review): the meaning of {@code true}/{@code false} is not visible in this
     *         part of the file - confirm against the implementations and the caller.
     */
    protected abstract boolean read(SelectionKey k);
926 
    /**
     * Creates a new {@link Runnable} which will register the {@link AbstractNioWorker} with the {@link Channel}.
     *
     * @param channel the channel to register
     * @param future  the future handed to the created task.
     *                NOTE(review): presumably notified once the registration has finished -
     *                confirm against the implementations.
     * @return the task which performs the registration when it is run
     */
    protected abstract Runnable createRegisterTask(AbstractNioChannel<?> channel, ChannelFuture future);
935 
936 }