1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.socket.oio;
17
18 import org.jboss.netty.channel.Channel;
19 import org.jboss.netty.channel.ChannelFuture;
20 import org.jboss.netty.channel.Channels;
21 import org.jboss.netty.channel.socket.Worker;
22
23 import java.io.IOException;
24 import java.net.SocketTimeoutException;
25 import java.util.Queue;
26 import java.util.concurrent.ConcurrentLinkedQueue;
27
28 import static org.jboss.netty.channel.Channels.*;
29
30
31
32
33
34
35 abstract class AbstractOioWorker<C extends AbstractOioChannel> implements Worker {
36
37 private final Queue<Runnable> eventQueue = new ConcurrentLinkedQueue<Runnable>();
38
39 protected final C channel;
40
41
42
43
44
45 protected volatile Thread thread;
46
47 private volatile boolean done;
48
49 protected AbstractOioWorker(C channel) {
50 this.channel = channel;
51 channel.worker = this;
52 }
53
54 public void run() {
55 thread = channel.workerThread = Thread.currentThread();
56 while (channel.isOpen()) {
57 synchronized (channel.interestOpsLock) {
58 while (!channel.isReadable()) {
59 try {
60
61
62 channel.interestOpsLock.wait();
63 } catch (InterruptedException e) {
64 if (!channel.isOpen()) {
65 break;
66 }
67 }
68 }
69 }
70
71 boolean cont = false;
72 try {
73 cont = process();
74 } catch (Throwable t) {
75 boolean readTimeout = t instanceof SocketTimeoutException;
76 if (!readTimeout && !channel.isSocketClosed()) {
77 fireExceptionCaught(channel, t);
78 }
79 if (readTimeout) {
80
81
82 cont = true;
83 }
84 } finally {
85 processEventQueue();
86 }
87
88 if (!cont) {
89 break;
90 }
91 }
92
93 synchronized (channel.interestOpsLock) {
94
95
96
97
98
99
100 channel.workerThread = null;
101 }
102
103
104 close(channel, succeededFuture(channel), true);
105
106
107
108 done = true;
109
110
111 processEventQueue();
112 }
113
114 static boolean isIoThread(AbstractOioChannel channel) {
115 return Thread.currentThread() == channel.workerThread;
116 }
117
118 public void executeInIoThread(Runnable task) {
119
120
121
122
123 if (Thread.currentThread() == thread || done) {
124 task.run();
125 } else {
126 boolean added = eventQueue.offer(task);
127
128 if (added) {
129
130 }
131 }
132 }
133
134 private void processEventQueue() {
135 for (;;) {
136 final Runnable task = eventQueue.poll();
137 if (task == null) {
138 break;
139 }
140 task.run();
141 }
142 }
143
144
145
146
147
148
149
150
151
152
153 abstract boolean process() throws IOException;
154
155 static void setInterestOps(
156 AbstractOioChannel channel, ChannelFuture future, int interestOps) {
157 boolean iothread = isIoThread(channel);
158
159
160 interestOps &= ~Channel.OP_WRITE;
161 interestOps |= channel.getInternalInterestOps() & Channel.OP_WRITE;
162
163 boolean changed = false;
164 try {
165 if (channel.getInternalInterestOps() != interestOps) {
166 if ((interestOps & Channel.OP_READ) != 0) {
167 channel.setInternalInterestOps(Channel.OP_READ);
168 } else {
169 channel.setInternalInterestOps(Channel.OP_NONE);
170 }
171 changed = true;
172 }
173
174 future.setSuccess();
175 if (changed) {
176 synchronized (channel.interestOpsLock) {
177 channel.setInternalInterestOps(interestOps);
178
179
180 Thread currentThread = Thread.currentThread();
181 Thread workerThread = channel.workerThread;
182 if (workerThread != null && currentThread != workerThread) {
183 workerThread.interrupt();
184 }
185 }
186 if (iothread) {
187 fireChannelInterestChanged(channel);
188 } else {
189 fireChannelInterestChangedLater(channel);
190 }
191 }
192 } catch (Throwable t) {
193 future.setFailure(t);
194 if (iothread) {
195 fireExceptionCaught(channel, t);
196 } else {
197 fireExceptionCaughtLater(channel, t);
198 }
199 }
200 }
201
202 static void close(AbstractOioChannel channel, ChannelFuture future) {
203 close(channel, future, isIoThread(channel));
204 }
205
206 private static void close(AbstractOioChannel channel, ChannelFuture future, boolean iothread) {
207 boolean connected = channel.isConnected();
208 boolean bound = channel.isBound();
209
210 try {
211 channel.closeSocket();
212 if (channel.setClosed()) {
213 future.setSuccess();
214 if (connected) {
215 Thread currentThread = Thread.currentThread();
216 synchronized (channel.interestOpsLock) {
217
218
219
220
221
222 Thread workerThread = channel.workerThread;
223 if (workerThread != null && currentThread != workerThread) {
224 workerThread.interrupt();
225 }
226 }
227
228 if (iothread) {
229 fireChannelDisconnected(channel);
230 } else {
231 fireChannelDisconnectedLater(channel);
232 }
233 }
234 if (bound) {
235 if (iothread) {
236 fireChannelUnbound(channel);
237 } else {
238 fireChannelUnboundLater(channel);
239 }
240 }
241 if (iothread) {
242 fireChannelClosed(channel);
243 } else {
244 fireChannelClosedLater(channel);
245 }
246 } else {
247 future.setSuccess();
248 }
249 } catch (Throwable t) {
250 future.setFailure(t);
251 if (iothread) {
252 fireExceptionCaught(channel, t);
253 } else {
254 fireExceptionCaughtLater(channel, t);
255 }
256 }
257 }
258 }