View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.socket.nio;
17  
18  import org.jboss.netty.channel.Channel;
19  import org.jboss.netty.channel.ChannelFuture;
20  import org.jboss.netty.channel.MessageEvent;
21  import org.jboss.netty.channel.socket.Worker;
22  import org.jboss.netty.channel.socket.nio.AbstractNioChannel.WriteRequestQueue;
23  import org.jboss.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer;
24  import org.jboss.netty.util.ThreadNameDeterminer;
25  import org.jboss.netty.util.ThreadRenamingRunnable;
26  
27  import java.io.IOException;
28  import java.nio.channels.AsynchronousCloseException;
29  import java.nio.channels.CancelledKeyException;
30  import java.nio.channels.ClosedChannelException;
31  import java.nio.channels.NotYetConnectedException;
32  import java.nio.channels.SelectionKey;
33  import java.nio.channels.Selector;
34  import java.nio.channels.WritableByteChannel;
35  import java.util.ArrayList;
36  import java.util.Iterator;
37  import java.util.List;
38  import java.util.Set;
39  import java.util.concurrent.Executor;
40  
41  import static org.jboss.netty.channel.Channels.*;
42  
43  abstract class AbstractNioWorker extends AbstractNioSelector implements Worker {
44  
45      protected final SocketSendBufferPool sendBufferPool = new SocketSendBufferPool();
46  
47      AbstractNioWorker(Executor executor) {
48          super(executor);
49      }
50  
51      AbstractNioWorker(Executor executor, ThreadNameDeterminer determiner) {
52          super(executor, determiner);
53      }
54  
55      public void executeInIoThread(Runnable task) {
56          executeInIoThread(task, false);
57      }
58  
59      /**
60       * Execute the {@link Runnable} in a IO-Thread
61       *
62       * @param task
63       *            the {@link Runnable} to execute
64       * @param alwaysAsync
65       *            {@code true} if the {@link Runnable} should be executed
66       *            in an async fashion even if the current Thread == IO Thread
67       */
68      public void executeInIoThread(Runnable task, boolean alwaysAsync) {
69          if (!alwaysAsync && isIoThread()) {
70              task.run();
71          } else {
72              registerTask(task);
73          }
74      }
75  
76      @Override
77      protected void close(SelectionKey k) {
78          AbstractNioChannel<?> ch = (AbstractNioChannel<?>) k.attachment();
79          close(ch, succeededFuture(ch));
80      }
81  
82      @Override
83      protected ThreadRenamingRunnable newThreadRenamingRunnable(int id, ThreadNameDeterminer determiner) {
84          return new ThreadRenamingRunnable(this, "New I/O worker #" + id, determiner);
85      }
86  
87      @Override
88      public void run() {
89          super.run();
90          sendBufferPool.releaseExternalResources();
91      }
92  
93      @Override
94      protected void process(Selector selector) throws IOException {
95          Set<SelectionKey> selectedKeys = selector.selectedKeys();
96          // check if the set is empty and if so just return to not create garbage by
97          // creating a new Iterator every time even if there is nothing to process.
98          // See https://github.com/netty/netty/issues/597
99          if (selectedKeys.isEmpty()) {
100             return;
101         }
102         for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
103             SelectionKey k = i.next();
104             i.remove();
105             try {
106                 int readyOps = k.readyOps();
107                 if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) {
108                     if (!read(k)) {
109                         // Connection already closed - no need to handle write.
110                         continue;
111                     }
112                 }
113                 if ((readyOps & SelectionKey.OP_WRITE) != 0) {
114                     writeFromSelectorLoop(k);
115                 }
116             } catch (CancelledKeyException e) {
117                 close(k);
118             }
119 
120             if (cleanUpCancelledKeys()) {
121                 break; // break the loop to avoid ConcurrentModificationException
122             }
123         }
124     }
125 
126     void writeFromUserCode(final AbstractNioChannel<?> channel) {
127         if (!channel.isConnected()) {
128             cleanUpWriteBuffer(channel);
129             return;
130         }
131 
132         if (scheduleWriteIfNecessary(channel)) {
133             return;
134         }
135 
136         // From here, we are sure Thread.currentThread() == workerThread.
137 
138         if (channel.writeSuspended) {
139             return;
140         }
141 
142         if (channel.inWriteNowLoop) {
143             return;
144         }
145 
146         write0(channel);
147     }
148 
149     void writeFromTaskLoop(AbstractNioChannel<?> ch) {
150         if (!ch.writeSuspended) {
151             write0(ch);
152         }
153     }
154 
155     void writeFromSelectorLoop(final SelectionKey k) {
156         AbstractNioChannel<?> ch = (AbstractNioChannel<?>) k.attachment();
157         ch.writeSuspended = false;
158         write0(ch);
159     }
160 
161     protected abstract boolean scheduleWriteIfNecessary(AbstractNioChannel<?> channel);
162 
163     protected void write0(AbstractNioChannel<?> channel) {
164         boolean open = true;
165         boolean addOpWrite = false;
166         boolean removeOpWrite = false;
167         boolean iothread = isIoThread(channel);
168 
169         long writtenBytes = 0;
170 
171         final SocketSendBufferPool sendBufferPool = this.sendBufferPool;
172         final WritableByteChannel ch = channel.channel;
173         final WriteRequestQueue writeBuffer = channel.writeBufferQueue;
174         final int writeSpinCount = channel.getConfig().getWriteSpinCount();
175         List<Throwable> causes = null;
176 
177         synchronized (channel.writeLock) {
178             channel.inWriteNowLoop = true;
179             for (;;) {
180 
181                 MessageEvent evt = channel.currentWriteEvent;
182                 SendBuffer buf = null;
183                 ChannelFuture future = null;
184                 try {
185                     if (evt == null) {
186                         if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) {
187                             removeOpWrite = true;
188                             channel.writeSuspended = false;
189                             break;
190                         }
191                         future = evt.getFuture();
192 
193                         channel.currentWriteBuffer = buf = sendBufferPool.acquire(evt.getMessage());
194                     } else {
195                         future = evt.getFuture();
196                         buf = channel.currentWriteBuffer;
197                     }
198 
199                     long localWrittenBytes = 0;
200                     for (int i = writeSpinCount; i > 0; i --) {
201                         localWrittenBytes = buf.transferTo(ch);
202                         if (localWrittenBytes != 0) {
203                             writtenBytes += localWrittenBytes;
204                             break;
205                         }
206                         if (buf.finished()) {
207                             break;
208                         }
209                     }
210 
211                     if (buf.finished()) {
212                         // Successful write - proceed to the next message.
213                         buf.release();
214                         channel.currentWriteEvent = null;
215                         channel.currentWriteBuffer = null;
216                         // Mark the event object for garbage collection.
217                         //noinspection UnusedAssignment
218                         evt = null;
219                         buf = null;
220                         future.setSuccess();
221                     } else {
222                         // Not written fully - perhaps the kernel buffer is full.
223                         addOpWrite = true;
224                         channel.writeSuspended = true;
225 
226                         if (writtenBytes > 0) {
227                             // Notify progress listeners if necessary.
228                             future.setProgress(
229                                     localWrittenBytes,
230                                     buf.writtenBytes(), buf.totalBytes());
231                         }
232                         break;
233                     }
234                 } catch (AsynchronousCloseException e) {
235                     // Doesn't need a user attention - ignore.
236                 } catch (Throwable t) {
237                     if (buf != null) {
238                         buf.release();
239                     }
240                     channel.currentWriteEvent = null;
241                     channel.currentWriteBuffer = null;
242                     // Mark the event object for garbage collection.
243                     //noinspection UnusedAssignment
244                     buf = null;
245                     //noinspection UnusedAssignment
246                     evt = null;
247                     if (future != null) {
248                         future.setFailure(t);
249                     }
250                     if (iothread) {
251                         // An exception was thrown from within a write in the iothread. We store a reference to it
252                         // in a list for now and notify the handlers in the chain after the writeLock was released
253                         // to prevent possible deadlock.
254                         // See #1310
255                         if (causes == null) {
256                             causes = new ArrayList<Throwable>(1);
257                         }
258                         causes.add(t);
259                     } else {
260                         fireExceptionCaughtLater(channel, t);
261                     }
262                     if (t instanceof IOException) {
263                         // close must be handled from outside the write lock to fix a possible deadlock
264                         // which can happen when MemoryAwareThreadPoolExecutor is used and the limit is exceed
265                         // and a close is triggered while the lock is hold. This is because the close(..)
266                         // may try to submit a task to handle it via the ExecutorHandler which then deadlocks.
267                         // See #1310
268                         open = false;
269                     }
270                 }
271             }
272             channel.inWriteNowLoop = false;
273 
274             // Initially, the following block was executed after releasing
275             // the writeLock, but there was a race condition, and it has to be
276             // executed before releasing the writeLock:
277             //
278             //     https://issues.jboss.org/browse/NETTY-410
279             //
280             if (open) {
281                 if (addOpWrite) {
282                     setOpWrite(channel);
283                 } else if (removeOpWrite) {
284                     clearOpWrite(channel);
285                 }
286             }
287         }
288         if (causes != null) {
289             for (Throwable cause: causes) {
290                 // notify about cause now as it was triggered in the write loop
291                 fireExceptionCaught(channel, cause);
292             }
293         }
294         if (!open) {
295             // close the channel now
296             close(channel, succeededFuture(channel));
297         }
298         if (iothread) {
299             fireWriteComplete(channel, writtenBytes);
300         } else {
301             fireWriteCompleteLater(channel, writtenBytes);
302         }
303     }
304 
305     static boolean isIoThread(AbstractNioChannel<?> channel) {
306         return Thread.currentThread() == channel.worker.thread;
307     }
308 
309     protected void setOpWrite(AbstractNioChannel<?> channel) {
310         Selector selector = this.selector;
311         SelectionKey key = channel.channel.keyFor(selector);
312         if (key == null) {
313             return;
314         }
315         if (!key.isValid()) {
316             close(key);
317             return;
318         }
319 
320         int interestOps = channel.getInternalInterestOps();
321         if ((interestOps & SelectionKey.OP_WRITE) == 0) {
322             interestOps |= SelectionKey.OP_WRITE;
323             key.interestOps(interestOps);
324             channel.setInternalInterestOps(interestOps);
325         }
326     }
327 
328     protected void clearOpWrite(AbstractNioChannel<?> channel) {
329         Selector selector = this.selector;
330         SelectionKey key = channel.channel.keyFor(selector);
331         if (key == null) {
332             return;
333         }
334         if (!key.isValid()) {
335             close(key);
336             return;
337         }
338 
339         int interestOps = channel.getInternalInterestOps();
340         if ((interestOps & SelectionKey.OP_WRITE) != 0) {
341             interestOps &= ~SelectionKey.OP_WRITE;
342             key.interestOps(interestOps);
343             channel.setInternalInterestOps(interestOps);
344         }
345     }
346 
347     protected void close(AbstractNioChannel<?> channel, ChannelFuture future) {
348         boolean connected = channel.isConnected();
349         boolean bound = channel.isBound();
350         boolean iothread = isIoThread(channel);
351 
352         try {
353             channel.channel.close();
354             increaseCancelledKeys();
355 
356             if (channel.setClosed()) {
357                 future.setSuccess();
358                 if (connected) {
359                     if (iothread) {
360                         fireChannelDisconnected(channel);
361                     } else {
362                         fireChannelDisconnectedLater(channel);
363                     }
364                 }
365                 if (bound) {
366                     if (iothread) {
367                         fireChannelUnbound(channel);
368                     } else {
369                         fireChannelUnboundLater(channel);
370                     }
371                 }
372 
373                 cleanUpWriteBuffer(channel);
374                 if (iothread) {
375                     fireChannelClosed(channel);
376                 } else {
377                     fireChannelClosedLater(channel);
378                 }
379             } else {
380                 future.setSuccess();
381             }
382         } catch (Throwable t) {
383             future.setFailure(t);
384             if (iothread) {
385                 fireExceptionCaught(channel, t);
386             } else {
387                 fireExceptionCaughtLater(channel, t);
388             }
389         }
390     }
391 
392     protected static void cleanUpWriteBuffer(AbstractNioChannel<?> channel) {
393         Exception cause = null;
394         boolean fireExceptionCaught = false;
395 
396         // Clean up the stale messages in the write buffer.
397         synchronized (channel.writeLock) {
398             MessageEvent evt = channel.currentWriteEvent;
399             if (evt != null) {
400                 // Create the exception only once to avoid the excessive overhead
401                 // caused by fillStackTrace.
402                 if (channel.isOpen()) {
403                     cause = new NotYetConnectedException();
404                 } else {
405                     cause = new ClosedChannelException();
406                 }
407 
408                 ChannelFuture future = evt.getFuture();
409                 if (channel.currentWriteBuffer != null) {
410                     channel.currentWriteBuffer.release();
411                     channel.currentWriteBuffer = null;
412                 }
413                 channel.currentWriteEvent = null;
414                 // Mark the event object for garbage collection.
415                 //noinspection UnusedAssignment
416                 evt = null;
417                 future.setFailure(cause);
418                 fireExceptionCaught = true;
419             }
420 
421             WriteRequestQueue writeBuffer = channel.writeBufferQueue;
422             for (;;) {
423                 evt = writeBuffer.poll();
424                 if (evt == null) {
425                     break;
426                 }
427                 // Create the exception only once to avoid the excessive overhead
428                 // caused by fillStackTrace.
429                 if (cause == null) {
430                     if (channel.isOpen()) {
431                         cause = new NotYetConnectedException();
432                     } else {
433                         cause = new ClosedChannelException();
434                     }
435                     fireExceptionCaught = true;
436                 }
437                 evt.getFuture().setFailure(cause);
438             }
439         }
440 
441         if (fireExceptionCaught) {
442             if (isIoThread(channel)) {
443                 fireExceptionCaught(channel, cause);
444             } else {
445                 fireExceptionCaughtLater(channel, cause);
446             }
447         }
448     }
449 
450     void setInterestOps(final AbstractNioChannel<?> channel, final ChannelFuture future, final int interestOps) {
451         boolean iothread = isIoThread(channel);
452         if (!iothread) {
453             channel.getPipeline().execute(new Runnable() {
454                 public void run() {
455                     setInterestOps(channel, future, interestOps);
456                 }
457             });
458             return;
459         }
460 
461         boolean changed = false;
462         try {
463             Selector selector = this.selector;
464             SelectionKey key = channel.channel.keyFor(selector);
465 
466             // Override OP_WRITE flag - a user cannot change this flag.
467             int newInterestOps = interestOps & ~Channel.OP_WRITE | channel.getInternalInterestOps() & Channel.OP_WRITE;
468 
469             if (key == null || selector == null) {
470                 if (channel.getInternalInterestOps() != newInterestOps) {
471                     changed = true;
472                 }
473 
474                 // Not registered to the worker yet.
475                 // Set the rawInterestOps immediately; RegisterTask will pick it up.
476                 channel.setInternalInterestOps(newInterestOps);
477 
478                 future.setSuccess();
479                 if (changed) {
480                     if (iothread) {
481                         fireChannelInterestChanged(channel);
482                     } else {
483                         fireChannelInterestChangedLater(channel);
484                     }
485                 }
486 
487                 return;
488             }
489 
490             if (channel.getInternalInterestOps() != newInterestOps) {
491                 changed = true;
492                 key.interestOps(newInterestOps);
493                 if (Thread.currentThread() != thread &&
494                     wakenUp.compareAndSet(false, true)) {
495                     selector.wakeup();
496                 }
497                 channel.setInternalInterestOps(newInterestOps);
498             }
499 
500             future.setSuccess();
501             if (changed) {
502                 fireChannelInterestChanged(channel);
503             }
504         } catch (CancelledKeyException e) {
505             // setInterestOps() was called on a closed channel.
506             ClosedChannelException cce = new ClosedChannelException();
507             future.setFailure(cce);
508             fireExceptionCaught(channel, cce);
509         } catch (Throwable t) {
510             future.setFailure(t);
511             fireExceptionCaught(channel, t);
512         }
513     }
514 
515     /**
516      * Read is called when a Selector has been notified that the underlying channel
517      * was something to be read. The channel would previously have registered its interest
518      * in read operations.
519      *
520      * @param k The selection key which contains the Selector registration information.
521      */
522     protected abstract boolean read(SelectionKey k);
523 
524 }