View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.socket.nio;
17  
18  import org.jboss.netty.channel.Channel;
19  import org.jboss.netty.channel.ChannelFuture;
20  import org.jboss.netty.channel.MessageEvent;
21  import org.jboss.netty.channel.socket.Worker;
22  import org.jboss.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer;
23  import org.jboss.netty.util.ThreadNameDeterminer;
24  import org.jboss.netty.util.ThreadRenamingRunnable;
25  
26  import java.io.IOException;
27  import java.nio.channels.AsynchronousCloseException;
28  import java.nio.channels.CancelledKeyException;
29  import java.nio.channels.ClosedChannelException;
30  import java.nio.channels.NotYetConnectedException;
31  import java.nio.channels.SelectionKey;
32  import java.nio.channels.Selector;
33  import java.nio.channels.WritableByteChannel;
34  import java.util.ArrayList;
35  import java.util.Iterator;
36  import java.util.List;
37  import java.util.Queue;
38  import java.util.Set;
39  import java.util.concurrent.Executor;
40  
41  
42  import static org.jboss.netty.channel.Channels.*;
43  
44  abstract class AbstractNioWorker extends AbstractNioSelector implements Worker {
45  
46      protected final SocketSendBufferPool sendBufferPool = new SocketSendBufferPool();
47  
48      AbstractNioWorker(Executor executor) {
49          super(executor);
50      }
51  
52      AbstractNioWorker(Executor executor, ThreadNameDeterminer determiner) {
53          super(executor, determiner);
54      }
55  
56      public void executeInIoThread(Runnable task) {
57          executeInIoThread(task, false);
58      }
59  
60      /**
61       * Execute the {@link Runnable} in a IO-Thread
62       *
63       * @param task
64       *            the {@link Runnable} to execute
65       * @param alwaysAsync
66       *            {@code true} if the {@link Runnable} should be executed
67       *            in an async fashion even if the current Thread == IO Thread
68       */
69      public void executeInIoThread(Runnable task, boolean alwaysAsync) {
70          if (!alwaysAsync && isIoThread()) {
71              task.run();
72          } else {
73              registerTask(task);
74          }
75      }
76  
77      @Override
78      protected void close(SelectionKey k) {
79          AbstractNioChannel<?> ch = (AbstractNioChannel<?>) k.attachment();
80          close(ch, succeededFuture(ch));
81      }
82  
83      @Override
84      protected ThreadRenamingRunnable newThreadRenamingRunnable(int id, ThreadNameDeterminer determiner) {
85          return new ThreadRenamingRunnable(this, "New I/O worker #" + id, determiner);
86      }
87  
88      @Override
89      public void run() {
90          super.run();
91          sendBufferPool.releaseExternalResources();
92      }
93  
94      @Override
95      protected void process(Selector selector) throws IOException {
96          Set<SelectionKey> selectedKeys = selector.selectedKeys();
97          // check if the set is empty and if so just return to not create garbage by
98          // creating a new Iterator every time even if there is nothing to process.
99          // See https://github.com/netty/netty/issues/597
100         if (selectedKeys.isEmpty()) {
101             return;
102         }
103         for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
104             SelectionKey k = i.next();
105             i.remove();
106             try {
107                 int readyOps = k.readyOps();
108                 if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) {
109                     if (!read(k)) {
110                         // Connection already closed - no need to handle write.
111                         continue;
112                     }
113                 }
114                 if ((readyOps & SelectionKey.OP_WRITE) != 0) {
115                     writeFromSelectorLoop(k);
116                 }
117             } catch (CancelledKeyException e) {
118                 close(k);
119             }
120 
121             if (cleanUpCancelledKeys()) {
122                 break; // break the loop to avoid ConcurrentModificationException
123             }
124         }
125     }
126 
127     void writeFromUserCode(final AbstractNioChannel<?> channel) {
128         if (!channel.isConnected()) {
129             cleanUpWriteBuffer(channel);
130             return;
131         }
132 
133         if (scheduleWriteIfNecessary(channel)) {
134             return;
135         }
136 
137         // From here, we are sure Thread.currentThread() == workerThread.
138 
139         if (channel.writeSuspended) {
140             return;
141         }
142 
143         if (channel.inWriteNowLoop) {
144             return;
145         }
146 
147         write0(channel);
148     }
149 
150     void writeFromTaskLoop(AbstractNioChannel<?> ch) {
151         if (!ch.writeSuspended) {
152             write0(ch);
153         }
154     }
155 
156     void writeFromSelectorLoop(final SelectionKey k) {
157         AbstractNioChannel<?> ch = (AbstractNioChannel<?>) k.attachment();
158         ch.writeSuspended = false;
159         write0(ch);
160     }
161 
162     protected abstract boolean scheduleWriteIfNecessary(AbstractNioChannel<?> channel);
163 
164     protected void write0(AbstractNioChannel<?> channel) {
165         boolean open = true;
166         boolean addOpWrite = false;
167         boolean removeOpWrite = false;
168         boolean iothread = isIoThread(channel);
169 
170         long writtenBytes = 0;
171 
172         final SocketSendBufferPool sendBufferPool = this.sendBufferPool;
173         final WritableByteChannel ch = channel.channel;
174         final Queue<MessageEvent> writeBuffer = channel.writeBufferQueue;
175         final int writeSpinCount = channel.getConfig().getWriteSpinCount();
176         List<Throwable> causes = null;
177 
178         synchronized (channel.writeLock) {
179             channel.inWriteNowLoop = true;
180             for (;;) {
181 
182                 MessageEvent evt = channel.currentWriteEvent;
183                 SendBuffer buf = null;
184                 ChannelFuture future = null;
185                 try {
186                     if (evt == null) {
187                         if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) {
188                             removeOpWrite = true;
189                             channel.writeSuspended = false;
190                             break;
191                         }
192                         future = evt.getFuture();
193 
194                         channel.currentWriteBuffer = buf = sendBufferPool.acquire(evt.getMessage());
195                     } else {
196                         future = evt.getFuture();
197                         buf = channel.currentWriteBuffer;
198                     }
199 
200                     long localWrittenBytes = 0;
201                     for (int i = writeSpinCount; i > 0; i --) {
202                         localWrittenBytes = buf.transferTo(ch);
203                         if (localWrittenBytes != 0) {
204                             writtenBytes += localWrittenBytes;
205                             break;
206                         }
207                         if (buf.finished()) {
208                             break;
209                         }
210                     }
211 
212                     if (buf.finished()) {
213                         // Successful write - proceed to the next message.
214                         buf.release();
215                         channel.currentWriteEvent = null;
216                         channel.currentWriteBuffer = null;
217                         // Mark the event object for garbage collection.
218                         //noinspection UnusedAssignment
219                         evt = null;
220                         buf = null;
221                         future.setSuccess();
222                     } else {
223                         // Not written fully - perhaps the kernel buffer is full.
224                         addOpWrite = true;
225                         channel.writeSuspended = true;
226 
227                         if (localWrittenBytes > 0) {
228                             // Notify progress listeners if necessary.
229                             future.setProgress(
230                                     localWrittenBytes,
231                                     buf.writtenBytes(), buf.totalBytes());
232                         }
233                         break;
234                     }
235                 } catch (AsynchronousCloseException e) {
236                     // Doesn't need a user attention - ignore.
237                 } catch (Throwable t) {
238                     if (buf != null) {
239                         buf.release();
240                     }
241                     channel.currentWriteEvent = null;
242                     channel.currentWriteBuffer = null;
243                     // Mark the event object for garbage collection.
244                     //noinspection UnusedAssignment
245                     buf = null;
246                     //noinspection UnusedAssignment
247                     evt = null;
248                     if (future != null) {
249                         future.setFailure(t);
250                     }
251                     if (iothread) {
252                         // An exception was thrown from within a write in the iothread. We store a reference to it
253                         // in a list for now and notify the handlers in the chain after the writeLock was released
254                         // to prevent possible deadlock.
255                         // See #1310
256                         if (causes == null) {
257                             causes = new ArrayList<Throwable>(1);
258                         }
259                         causes.add(t);
260                     } else {
261                         fireExceptionCaughtLater(channel, t);
262                     }
263                     if (t instanceof IOException) {
264                         // close must be handled from outside the write lock to fix a possible deadlock
265                         // which can happen when MemoryAwareThreadPoolExecutor is used and the limit is exceed
266                         // and a close is triggered while the lock is hold. This is because the close(..)
267                         // may try to submit a task to handle it via the ExecutorHandler which then deadlocks.
268                         // See #1310
269                         open = false;
270                     }
271                 }
272             }
273             channel.inWriteNowLoop = false;
274 
275             // Initially, the following block was executed after releasing
276             // the writeLock, but there was a race condition, and it has to be
277             // executed before releasing the writeLock:
278             //
279             //     https://issues.jboss.org/browse/NETTY-410
280             //
281             if (open) {
282                 if (addOpWrite) {
283                     setOpWrite(channel);
284                 } else if (removeOpWrite) {
285                     clearOpWrite(channel);
286                 }
287             }
288         }
289         if (causes != null) {
290             for (Throwable cause: causes) {
291                 // notify about cause now as it was triggered in the write loop
292                 fireExceptionCaught(channel, cause);
293             }
294         }
295         if (!open) {
296             // close the channel now
297             close(channel, succeededFuture(channel));
298         }
299         if (iothread) {
300             fireWriteComplete(channel, writtenBytes);
301         } else {
302             fireWriteCompleteLater(channel, writtenBytes);
303         }
304     }
305 
306     static boolean isIoThread(AbstractNioChannel<?> channel) {
307         return Thread.currentThread() == channel.worker.thread;
308     }
309 
310     protected void setOpWrite(AbstractNioChannel<?> channel) {
311         Selector selector = this.selector;
312         SelectionKey key = channel.channel.keyFor(selector);
313         if (key == null) {
314             return;
315         }
316         if (!key.isValid()) {
317             close(key);
318             return;
319         }
320 
321         int interestOps = channel.getRawInterestOps();
322         if ((interestOps & SelectionKey.OP_WRITE) == 0) {
323             interestOps |= SelectionKey.OP_WRITE;
324             key.interestOps(interestOps);
325             channel.setRawInterestOpsNow(interestOps);
326         }
327     }
328 
329     protected void clearOpWrite(AbstractNioChannel<?> channel) {
330         Selector selector = this.selector;
331         SelectionKey key = channel.channel.keyFor(selector);
332         if (key == null) {
333             return;
334         }
335         if (!key.isValid()) {
336             close(key);
337             return;
338         }
339 
340         int interestOps = channel.getRawInterestOps();
341         if ((interestOps & SelectionKey.OP_WRITE) != 0) {
342             interestOps &= ~SelectionKey.OP_WRITE;
343             key.interestOps(interestOps);
344             channel.setRawInterestOpsNow(interestOps);
345         }
346     }
347 
348     protected void close(AbstractNioChannel<?> channel, ChannelFuture future) {
349         boolean connected = channel.isConnected();
350         boolean bound = channel.isBound();
351         boolean iothread = isIoThread(channel);
352 
353         try {
354             channel.channel.close();
355             increaseCancelledKeys();
356 
357             if (channel.setClosed()) {
358                 future.setSuccess();
359                 if (connected) {
360                     if (iothread) {
361                         fireChannelDisconnected(channel);
362                     } else {
363                         fireChannelDisconnectedLater(channel);
364                     }
365                 }
366                 if (bound) {
367                     if (iothread) {
368                         fireChannelUnbound(channel);
369                     } else {
370                         fireChannelUnboundLater(channel);
371                     }
372                 }
373 
374                 cleanUpWriteBuffer(channel);
375                 if (iothread) {
376                     fireChannelClosed(channel);
377                 } else {
378                     fireChannelClosedLater(channel);
379                 }
380             } else {
381                 future.setSuccess();
382             }
383         } catch (Throwable t) {
384             future.setFailure(t);
385             if (iothread) {
386                 fireExceptionCaught(channel, t);
387             } else {
388                 fireExceptionCaughtLater(channel, t);
389             }
390         }
391     }
392 
393     protected static void cleanUpWriteBuffer(AbstractNioChannel<?> channel) {
394         Exception cause = null;
395         boolean fireExceptionCaught = false;
396 
397         // Clean up the stale messages in the write buffer.
398         synchronized (channel.writeLock) {
399             MessageEvent evt = channel.currentWriteEvent;
400             if (evt != null) {
401                 // Create the exception only once to avoid the excessive overhead
402                 // caused by fillStackTrace.
403                 if (channel.isOpen()) {
404                     cause = new NotYetConnectedException();
405                 } else {
406                     cause = new ClosedChannelException();
407                 }
408 
409                 ChannelFuture future = evt.getFuture();
410                 if (channel.currentWriteBuffer != null) {
411                     channel.currentWriteBuffer.release();
412                     channel.currentWriteBuffer = null;
413                 }
414                 channel.currentWriteEvent = null;
415                 // Mark the event object for garbage collection.
416                 //noinspection UnusedAssignment
417                 evt = null;
418                 future.setFailure(cause);
419                 fireExceptionCaught = true;
420             }
421 
422             Queue<MessageEvent> writeBuffer = channel.writeBufferQueue;
423             for (;;) {
424                 evt = writeBuffer.poll();
425                 if (evt == null) {
426                     break;
427                 }
428                 // Create the exception only once to avoid the excessive overhead
429                 // caused by fillStackTrace.
430                 if (cause == null) {
431                     if (channel.isOpen()) {
432                         cause = new NotYetConnectedException();
433                     } else {
434                         cause = new ClosedChannelException();
435                     }
436                     fireExceptionCaught = true;
437                 }
438                 evt.getFuture().setFailure(cause);
439             }
440         }
441 
442         if (fireExceptionCaught) {
443             if (isIoThread(channel)) {
444                 fireExceptionCaught(channel, cause);
445             } else {
446                 fireExceptionCaughtLater(channel, cause);
447             }
448         }
449     }
450 
451     void setInterestOps(final AbstractNioChannel<?> channel, final ChannelFuture future, final int interestOps) {
452         boolean iothread = isIoThread(channel);
453         if (!iothread) {
454             channel.getPipeline().execute(new Runnable() {
455                 public void run() {
456                     setInterestOps(channel, future, interestOps);
457                 }
458             });
459             return;
460         }
461 
462         boolean changed = false;
463         try {
464             Selector selector = this.selector;
465             SelectionKey key = channel.channel.keyFor(selector);
466 
467             // Override OP_WRITE flag - a user cannot change this flag.
468             int newInterestOps = interestOps & ~Channel.OP_WRITE | channel.getRawInterestOps() & Channel.OP_WRITE;
469 
470             if (key == null || selector == null) {
471                 if (channel.getRawInterestOps() != newInterestOps) {
472                     changed = true;
473                 }
474 
475                 // Not registered to the worker yet.
476                 // Set the rawInterestOps immediately; RegisterTask will pick it up.
477                 channel.setRawInterestOpsNow(newInterestOps);
478 
479                 future.setSuccess();
480                 if (changed) {
481                     if (iothread) {
482                         fireChannelInterestChanged(channel);
483                     } else {
484                         fireChannelInterestChangedLater(channel);
485                     }
486                 }
487 
488                 return;
489             }
490 
491             if (channel.getRawInterestOps() != newInterestOps) {
492                 key.interestOps(newInterestOps);
493                 if (Thread.currentThread() != thread &&
494                     wakenUp.compareAndSet(false, true)) {
495                     selector.wakeup();
496                 }
497                 channel.setRawInterestOpsNow(newInterestOps);
498             }
499 
500             future.setSuccess();
501             if (changed) {
502                 fireChannelInterestChanged(channel);
503             }
504         } catch (CancelledKeyException e) {
505             // setInterestOps() was called on a closed channel.
506             ClosedChannelException cce = new ClosedChannelException();
507             future.setFailure(cce);
508             fireExceptionCaught(channel, cce);
509         } catch (Throwable t) {
510             future.setFailure(t);
511             fireExceptionCaught(channel, t);
512         }
513     }
514 
515     /**
516      * Read is called when a Selector has been notified that the underlying channel
517      * was something to be read. The channel would previously have registered its interest
518      * in read operations.
519      *
520      * @param k The selection key which contains the Selector registration information.
521      */
522     protected abstract boolean read(SelectionKey k);
523 
524 }