View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.socket.oio;
17  
18  import org.jboss.netty.channel.Channel;
19  import org.jboss.netty.channel.ChannelFuture;
20  import org.jboss.netty.channel.Channels;
21  import org.jboss.netty.channel.socket.Worker;
22  
23  import java.io.IOException;
24  import java.net.SocketTimeoutException;
25  import java.util.Queue;
26  import java.util.concurrent.ConcurrentLinkedQueue;
27  
28  import static org.jboss.netty.channel.Channels.*;
29  
30  /**
31   * Abstract base class for Oio-Worker implementations
32   *
33   * @param <C> {@link AbstractOioChannel}
34   */
35  abstract class AbstractOioWorker<C extends AbstractOioChannel> implements Worker {
36  
37      private final Queue<Runnable> eventQueue = new ConcurrentLinkedQueue<Runnable>();
38  
39      protected final C channel;
40  
41      /**
42       * If this worker has been started thread will be a reference to the thread
43       * used when starting. i.e. the current thread when the run method is executed.
44       */
45      protected volatile Thread thread;
46  
47      private volatile boolean done;
48  
49      protected AbstractOioWorker(C channel) {
50          this.channel = channel;
51          channel.worker = this;
52      }
53  
54      public void run() {
55          thread = channel.workerThread = Thread.currentThread();
56          while (channel.isOpen()) {
57              synchronized (channel.interestOpsLock) {
58                  while (!channel.isReadable()) {
59                      try {
60                          // notify() is not called at all.
61                          // close() and setInterestOps() calls Thread.interrupt()
62                          channel.interestOpsLock.wait();
63                      } catch (InterruptedException e) {
64                          if (!channel.isOpen()) {
65                              break;
66                          }
67                      }
68                  }
69              }
70  
71              boolean cont = false;
72              try {
73                  cont = process();
74              } catch (Throwable t) {
75                  boolean readTimeout = t instanceof SocketTimeoutException;
76                  if (!readTimeout && !channel.isSocketClosed()) {
77                      fireExceptionCaught(channel, t);
78                  }
79                  if (readTimeout) {
80                      // the readTimeout was triggered because of the SO_TIMEOUT,
81                      // so  just continue with the loop here
82                      cont = true;
83                  }
84              } finally {
85                  processEventQueue();
86              }
87  
88              if (!cont) {
89                  break;
90              }
91          }
92  
93          synchronized (channel.interestOpsLock) {
94              // Setting the workerThread to null will prevent any channel
95              // operations from interrupting this thread from now on.
96              //
97              //
98              // Do this while holding the lock to not race with close(...) or
99              // setInterestOps(...)
100             channel.workerThread = null;
101         }
102 
103         // Clean up.
104         close(channel, succeededFuture(channel), true);
105 
106         // Mark the worker event loop as done so we know that we need to run tasks directly and not queue them
107         // See #287
108         done = true;
109 
110         // just to make we don't have something left
111         processEventQueue();
112     }
113 
114     static boolean isIoThread(AbstractOioChannel channel) {
115         return Thread.currentThread() == channel.workerThread;
116     }
117 
118     public void executeInIoThread(Runnable task) {
119         // check if the current thread is the worker thread
120         //
121         // Also check if the event loop of the worker is complete. If so we need to run the task now.
122         // See #287
123         if (Thread.currentThread() == thread || done) {
124             task.run();
125         } else {
126             boolean added = eventQueue.offer(task);
127 
128             if (added) {
129                 // as we set the SO_TIMEOUT to 1 second this task will get picked up in 1 second at latest
130             }
131         }
132     }
133 
134     private void processEventQueue() {
135         for (;;) {
136             final Runnable task = eventQueue.poll();
137             if (task == null) {
138                 break;
139             }
140             task.run();
141         }
142     }
143 
144     /**
145      * Process the incoming messages and also is responsible for call
146      * {@link Channels#fireMessageReceived(Channel, Object)} once a message was processed without
147      * errors.
148      *
149      * @return continue returns {@code true} as long as this worker should continue to try
150      *         processing incoming messages
151      * @throws IOException
152      */
153     abstract boolean process() throws IOException;
154 
155     static void setInterestOps(
156             AbstractOioChannel channel, ChannelFuture future, int interestOps) {
157         boolean iothread = isIoThread(channel);
158 
159         // Override OP_WRITE flag - a user cannot change this flag.
160         interestOps &= ~Channel.OP_WRITE;
161         interestOps |= channel.getInternalInterestOps() & Channel.OP_WRITE;
162 
163         boolean changed = false;
164         try {
165             if (channel.getInternalInterestOps() != interestOps) {
166                 if ((interestOps & Channel.OP_READ) != 0) {
167                     channel.setInternalInterestOps(Channel.OP_READ);
168                 } else {
169                     channel.setInternalInterestOps(Channel.OP_NONE);
170                 }
171                 changed = true;
172             }
173 
174             future.setSuccess();
175             if (changed) {
176                 synchronized (channel.interestOpsLock) {
177                     channel.setInternalInterestOps(interestOps);
178 
179                     // Notify the worker so it stops or continues reading.
180                     Thread currentThread = Thread.currentThread();
181                     Thread workerThread = channel.workerThread;
182                     if (workerThread != null && currentThread != workerThread) {
183                         workerThread.interrupt();
184                     }
185                 }
186                 if (iothread) {
187                     fireChannelInterestChanged(channel);
188                 } else {
189                     fireChannelInterestChangedLater(channel);
190                 }
191             }
192         } catch (Throwable t) {
193             future.setFailure(t);
194             if (iothread) {
195                 fireExceptionCaught(channel, t);
196             } else {
197                 fireExceptionCaughtLater(channel, t);
198             }
199         }
200     }
201 
202     static void close(AbstractOioChannel channel, ChannelFuture future) {
203         close(channel, future, isIoThread(channel));
204     }
205 
206     private static void close(AbstractOioChannel channel, ChannelFuture future, boolean iothread) {
207         boolean connected = channel.isConnected();
208         boolean bound = channel.isBound();
209 
210         try {
211             channel.closeSocket();
212             if (channel.setClosed()) {
213                 future.setSuccess();
214                 if (connected) {
215                     Thread currentThread = Thread.currentThread();
216                     synchronized (channel.interestOpsLock) {
217                         // We need to do this while hold the lock as otherwise
218                         // we may race and so interrupt the workerThread even
219                         // if we are in the workerThread now.
220                         // This can happen if run() set channel.workerThread to null
221                         // between workerThread != null and currentThread!= workerThread
222                         Thread workerThread = channel.workerThread;
223                         if (workerThread != null && currentThread != workerThread) {
224                             workerThread.interrupt();
225                         }
226                     }
227 
228                     if (iothread) {
229                         fireChannelDisconnected(channel);
230                     } else {
231                         fireChannelDisconnectedLater(channel);
232                     }
233                 }
234                 if (bound) {
235                     if (iothread) {
236                         fireChannelUnbound(channel);
237                     } else {
238                         fireChannelUnboundLater(channel);
239                     }
240                 }
241                 if (iothread) {
242                     fireChannelClosed(channel);
243                 } else {
244                     fireChannelClosedLater(channel);
245                 }
246             } else {
247                 future.setSuccess();
248             }
249         } catch (Throwable t) {
250             future.setFailure(t);
251             if (iothread) {
252                 fireExceptionCaught(channel, t);
253             } else {
254                 fireExceptionCaughtLater(channel, t);
255             }
256         }
257     }
258 }