1 /*
2 * Copyright 2012 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package org.jboss.netty.channel.socket.oio;
17
18 import org.jboss.netty.channel.Channel;
19 import org.jboss.netty.channel.ChannelFuture;
20 import org.jboss.netty.channel.Channels;
21 import org.jboss.netty.channel.socket.Worker;
22
23 import java.io.IOException;
24 import java.net.SocketTimeoutException;
25 import java.util.Queue;
26 import java.util.concurrent.ConcurrentLinkedQueue;
27
28 import static org.jboss.netty.channel.Channels.*;
29
30 /**
31 * Abstract base class for Oio-Worker implementations
32 *
33 * @param <C> {@link AbstractOioChannel}
34 */
35 abstract class AbstractOioWorker<C extends AbstractOioChannel> implements Worker {
36
37 private final Queue<Runnable> eventQueue = new ConcurrentLinkedQueue<Runnable>();
38
39 protected final C channel;
40
41 /**
42 * If this worker has been started thread will be a reference to the thread
43 * used when starting. i.e. the current thread when the run method is executed.
44 */
45 protected volatile Thread thread;
46
47 private volatile boolean done;
48
49 protected AbstractOioWorker(C channel) {
50 this.channel = channel;
51 channel.worker = this;
52 }
53
54 public void run() {
55 thread = channel.workerThread = Thread.currentThread();
56 while (channel.isOpen()) {
57 synchronized (channel.interestOpsLock) {
58 while (!channel.isReadable()) {
59 try {
60 // notify() is not called at all.
61 // close() and setInterestOps() calls Thread.interrupt()
62 channel.interestOpsLock.wait();
63 } catch (InterruptedException e) {
64 if (!channel.isOpen()) {
65 break;
66 }
67 }
68 }
69 }
70
71 boolean cont = false;
72 try {
73 cont = process();
74 } catch (Throwable t) {
75 boolean readTimeout = t instanceof SocketTimeoutException;
76 if (!readTimeout && !channel.isSocketClosed()) {
77 fireExceptionCaught(channel, t);
78 }
79 if (readTimeout) {
80 // the readTimeout was triggered because of the SO_TIMEOUT,
81 // so just continue with the loop here
82 cont = true;
83 }
84 } finally {
85 processEventQueue();
86 }
87
88 if (!cont) {
89 break;
90 }
91 }
92
93 synchronized (channel.interestOpsLock) {
94 // Setting the workerThread to null will prevent any channel
95 // operations from interrupting this thread from now on.
96 //
97 //
98 // Do this while holding the lock to not race with close(...) or
99 // setInterestOps(...)
100 channel.workerThread = null;
101 }
102
103 // Clean up.
104 close(channel, succeededFuture(channel), true);
105
106 // Mark the worker event loop as done so we know that we need to run tasks directly and not queue them
107 // See #287
108 done = true;
109
110 // just to make we don't have something left
111 processEventQueue();
112 }
113
114 static boolean isIoThread(AbstractOioChannel channel) {
115 return Thread.currentThread() == channel.workerThread;
116 }
117
118 public void executeInIoThread(Runnable task) {
119 // check if the current thread is the worker thread
120 //
121 // Also check if the event loop of the worker is complete. If so we need to run the task now.
122 // See #287
123 if (Thread.currentThread() == thread || done) {
124 task.run();
125 } else {
126 boolean added = eventQueue.offer(task);
127
128 if (added) {
129 // as we set the SO_TIMEOUT to 1 second this task will get picked up in 1 second at latest
130 }
131 }
132 }
133
134 private void processEventQueue() {
135 for (;;) {
136 final Runnable task = eventQueue.poll();
137 if (task == null) {
138 break;
139 }
140 task.run();
141 }
142 }
143
144 /**
145 * Process the incoming messages and also is responsible for call
146 * {@link Channels#fireMessageReceived(Channel, Object)} once a message was processed without
147 * errors.
148 *
149 * @return continue returns {@code true} as long as this worker should continue to try
150 * processing incoming messages
151 * @throws IOException
152 */
153 abstract boolean process() throws IOException;
154
155 static void setInterestOps(
156 AbstractOioChannel channel, ChannelFuture future, int interestOps) {
157 boolean iothread = isIoThread(channel);
158
159 // Override OP_WRITE flag - a user cannot change this flag.
160 interestOps &= ~Channel.OP_WRITE;
161 interestOps |= channel.getInterestOps() & Channel.OP_WRITE;
162
163 boolean changed = false;
164 try {
165 if (channel.getInterestOps() != interestOps) {
166 if ((interestOps & Channel.OP_READ) != 0) {
167 channel.setInterestOpsNow(Channel.OP_READ);
168 } else {
169 channel.setInterestOpsNow(Channel.OP_NONE);
170 }
171 changed = true;
172 }
173
174 future.setSuccess();
175 if (changed) {
176 synchronized (channel.interestOpsLock) {
177 channel.setInterestOpsNow(interestOps);
178
179 // Notify the worker so it stops or continues reading.
180 Thread currentThread = Thread.currentThread();
181 Thread workerThread = channel.workerThread;
182 if (workerThread != null && currentThread != workerThread) {
183 workerThread.interrupt();
184 }
185 }
186 if (iothread) {
187 fireChannelInterestChanged(channel);
188 } else {
189 fireChannelInterestChangedLater(channel);
190 }
191 }
192 } catch (Throwable t) {
193 future.setFailure(t);
194 if (iothread) {
195 fireExceptionCaught(channel, t);
196 } else {
197 fireExceptionCaughtLater(channel, t);
198 }
199 }
200 }
201
202 static void close(AbstractOioChannel channel, ChannelFuture future) {
203 close(channel, future, isIoThread(channel));
204 }
205
206 private static void close(AbstractOioChannel channel, ChannelFuture future, boolean iothread) {
207 boolean connected = channel.isConnected();
208 boolean bound = channel.isBound();
209
210 try {
211 channel.closeSocket();
212 if (channel.setClosed()) {
213 future.setSuccess();
214 if (connected) {
215 Thread currentThread = Thread.currentThread();
216 synchronized (channel.interestOpsLock) {
217 // We need to do this while hold the lock as otherwise
218 // we may race and so interrupt the workerThread even
219 // if we are in the workerThread now.
220 // This can happen if run() set channel.workerThread to null
221 // between workerThread != null and currentThread!= workerThread
222 Thread workerThread = channel.workerThread;
223 if (workerThread != null && currentThread != workerThread) {
224 workerThread.interrupt();
225 }
226 }
227
228 if (iothread) {
229 fireChannelDisconnected(channel);
230 } else {
231 fireChannelDisconnectedLater(channel);
232 }
233 }
234 if (bound) {
235 if (iothread) {
236 fireChannelUnbound(channel);
237 } else {
238 fireChannelUnboundLater(channel);
239 }
240 }
241 if (iothread) {
242 fireChannelClosed(channel);
243 } else {
244 fireChannelClosedLater(channel);
245 }
246 } else {
247 future.setSuccess();
248 }
249 } catch (Throwable t) {
250 future.setFailure(t);
251 if (iothread) {
252 fireExceptionCaught(channel, t);
253 } else {
254 fireExceptionCaughtLater(channel, t);
255 }
256 }
257 }
258 }