View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.socket.oio;
17  
18  import static org.jboss.netty.channel.Channels.*;
19  
20  import java.io.IOException;
21  import java.net.SocketTimeoutException;
22  import java.util.Queue;
23  import java.util.concurrent.ConcurrentLinkedQueue;
24  
25  import org.jboss.netty.channel.Channel;
26  import org.jboss.netty.channel.ChannelFuture;
27  import org.jboss.netty.channel.Channels;
28  import org.jboss.netty.channel.socket.Worker;
29  
30  /**
31   * Abstract base class for Oio-Worker implementations
32   *
33   * @param <C> {@link AbstractOioChannel}
34   */
35  abstract class AbstractOioWorker<C extends AbstractOioChannel> implements Worker {
36  
37      private final Queue<Runnable> eventQueue = new ConcurrentLinkedQueue<Runnable>();
38  
39      protected final C channel;
40  
41      /**
42       * If this worker has been started thread will be a reference to the thread
43       * used when starting. i.e. the current thread when the run method is executed.
44       */
45      protected volatile Thread thread;
46  
47      private volatile boolean done;
48  
49      protected AbstractOioWorker(C channel) {
50          this.channel = channel;
51          channel.worker = this;
52      }
53  
54  
55      public void run() {
56          thread = channel.workerThread = Thread.currentThread();
57  
58          while (channel.isOpen()) {
59              synchronized (channel.interestOpsLock) {
60                  while (!channel.isReadable()) {
61                      try {
62                          // notify() is not called at all.
63                          // close() and setInterestOps() calls Thread.interrupt()
64                          channel.interestOpsLock.wait();
65                      } catch (InterruptedException e) {
66                          if (!channel.isOpen()) {
67                              break;
68                          }
69                      }
70                  }
71              }
72  
73              boolean cont = false;
74              try {
75                  cont = process();
76              } catch (Throwable t) {
77                  boolean readTimeout = t instanceof SocketTimeoutException;
78                  if (!readTimeout && !channel.isSocketClosed()) {
79                      fireExceptionCaught(channel, t);
80                  }
81                  if (readTimeout) {
82                      // the readTimeout was triggered because of the SO_TIMEOUT,
83                      // so  just continue with the loop here
84                      cont = true;
85                  }
86              } finally {
87                  processEventQueue();
88  
89                  if (!cont) {
90                      break;
91                  }
92              }
93          }
94  
95          // Setting the workerThread to null will prevent any channel
96          // operations from interrupting this thread from now on.
97          channel.workerThread = null;
98  
99          // Clean up.
100         close(channel, succeededFuture(channel), true);
101 
102         // Mark the worker event loop as done so we know that we need to run tasks directly and not queue them
103         // See #287
104         done = true;
105 
106         // just to make we don't have something left
107         processEventQueue();
108 
109     }
110 
111     static boolean isIoThread(AbstractOioChannel channel) {
112         return Thread.currentThread() == channel.workerThread;
113     }
114 
115 
116     public void executeInIoThread(Runnable task) {
117         // check if the current thread is the worker thread
118         //
119         // Also check if the event loop of the worker is complete. If so we need to run the task now.
120         // See #287
121         if (Thread.currentThread() == thread || done) {
122             task.run();
123         } else {
124             boolean added = eventQueue.offer(task);
125 
126             if (added) {
127                 // as we set the SO_TIMEOUT to 1 second this task will get picked up in 1 second at latest
128             }
129         }
130     }
131 
132     private void processEventQueue() {
133         for (;;) {
134             final Runnable task = eventQueue.poll();
135             if (task == null) {
136                 break;
137             }
138             task.run();
139         }
140     }
141 
142 
143     /**
144      * Process the incoming messages and also is responsible for call
145      * {@link Channels#fireMessageReceived(Channel, Object)} once a message was processed without
146      * errors.
147      *
148      * @return continue returns <code>true</code> as long as this worker should continue to try
149      *         processing incoming messages
150      * @throws IOException
151      */
152     abstract boolean process() throws IOException;
153 
154     static void setInterestOps(
155             AbstractOioChannel channel, ChannelFuture future, int interestOps) {
156         boolean iothread = isIoThread(channel);
157 
158         // Override OP_WRITE flag - a user cannot change this flag.
159         interestOps &= ~Channel.OP_WRITE;
160         interestOps |= channel.getInterestOps() & Channel.OP_WRITE;
161 
162         boolean changed = false;
163         try {
164             if (channel.getInterestOps() != interestOps) {
165                 if ((interestOps & Channel.OP_READ) != 0) {
166                     channel.setInterestOpsNow(Channel.OP_READ);
167                 } else {
168                     channel.setInterestOpsNow(Channel.OP_NONE);
169                 }
170                 changed = true;
171             }
172 
173             future.setSuccess();
174             if (changed) {
175                 synchronized (channel.interestOpsLock) {
176                     channel.setInterestOpsNow(interestOps);
177 
178                     // Notify the worker so it stops or continues reading.
179                     Thread currentThread = Thread.currentThread();
180                     Thread workerThread = channel.workerThread;
181                     if (workerThread != null && currentThread != workerThread) {
182                         workerThread.interrupt();
183                     }
184                 }
185                 if (iothread) {
186                     fireChannelInterestChanged(channel);
187                 } else {
188                     fireChannelInterestChangedLater(channel);
189                 }
190             }
191         } catch (Throwable t) {
192             future.setFailure(t);
193             if (iothread) {
194                 fireExceptionCaught(channel, t);
195             } else {
196                 fireExceptionCaughtLater(channel, t);
197             }
198         }
199     }
200 
201     static void close(AbstractOioChannel channel, ChannelFuture future) {
202         close(channel, future, isIoThread(channel));
203     }
204 
205     private static void close(AbstractOioChannel channel, ChannelFuture future, boolean iothread) {
206         boolean connected = channel.isConnected();
207         boolean bound = channel.isBound();
208 
209         try {
210             channel.closeSocket();
211             if (channel.setClosed()) {
212                 future.setSuccess();
213                 if (connected) {
214                     // Notify the worker so it stops reading.
215                     Thread currentThread = Thread.currentThread();
216                     Thread workerThread = channel.workerThread;
217                     if (workerThread != null && currentThread != workerThread) {
218                         workerThread.interrupt();
219                     }
220                     if (iothread) {
221                         fireChannelDisconnected(channel);
222                     } else {
223                         fireChannelDisconnectedLater(channel);
224                     }
225                 }
226                 if (bound) {
227                     if (iothread) {
228                         fireChannelUnbound(channel);
229                     } else {
230                         fireChannelUnboundLater(channel);
231                     }
232                 }
233                 if (iothread) {
234                     fireChannelClosed(channel);
235                 } else {
236                     fireChannelClosedLater(channel);
237                 }
238             } else {
239                 future.setSuccess();
240             }
241         } catch (Throwable t) {
242             future.setFailure(t);
243             if (iothread) {
244                 fireExceptionCaught(channel, t);
245             } else {
246                 fireExceptionCaughtLater(channel, t);
247             }
248         }
249     }
250 }