1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.socket.oio;
17
18 import static org.jboss.netty.channel.Channels.*;
19
20 import java.io.IOException;
21 import java.net.SocketTimeoutException;
22 import java.util.Queue;
23 import java.util.concurrent.ConcurrentLinkedQueue;
24
25 import org.jboss.netty.channel.Channel;
26 import org.jboss.netty.channel.ChannelFuture;
27 import org.jboss.netty.channel.Channels;
28 import org.jboss.netty.channel.socket.Worker;
29
30
31
32
33
34
35 abstract class AbstractOioWorker<C extends AbstractOioChannel> implements Worker {
36
37 private final Queue<Runnable> eventQueue = new ConcurrentLinkedQueue<Runnable>();
38
39 protected final C channel;
40
41
42
43
44
45 protected volatile Thread thread;
46
47 private volatile boolean done;
48
49 protected AbstractOioWorker(C channel) {
50 this.channel = channel;
51 channel.worker = this;
52 }
53
54
55 public void run() {
56 thread = channel.workerThread = Thread.currentThread();
57
58 while (channel.isOpen()) {
59 synchronized (channel.interestOpsLock) {
60 while (!channel.isReadable()) {
61 try {
62
63
64 channel.interestOpsLock.wait();
65 } catch (InterruptedException e) {
66 if (!channel.isOpen()) {
67 break;
68 }
69 }
70 }
71 }
72
73 boolean cont = false;
74 try {
75 cont = process();
76 } catch (Throwable t) {
77 boolean readTimeout = t instanceof SocketTimeoutException;
78 if (!readTimeout && !channel.isSocketClosed()) {
79 fireExceptionCaught(channel, t);
80 }
81 if (readTimeout) {
82
83
84 cont = true;
85 }
86 } finally {
87 processEventQueue();
88
89 if (!cont) {
90 break;
91 }
92 }
93 }
94
95
96
97 channel.workerThread = null;
98
99
100 close(channel, succeededFuture(channel), true);
101
102
103
104 done = true;
105
106
107 processEventQueue();
108
109 }
110
111 static boolean isIoThread(AbstractOioChannel channel) {
112 return Thread.currentThread() == channel.workerThread;
113 }
114
115
116 public void executeInIoThread(Runnable task) {
117
118
119
120
121 if (Thread.currentThread() == thread || done) {
122 task.run();
123 } else {
124 boolean added = eventQueue.offer(task);
125
126 if (added) {
127
128 }
129 }
130 }
131
132 private void processEventQueue() {
133 for (;;) {
134 final Runnable task = eventQueue.poll();
135 if (task == null) {
136 break;
137 }
138 task.run();
139 }
140 }
141
142
143
144
145
146
147
148
149
150
151
152 abstract boolean process() throws IOException;
153
154 static void setInterestOps(
155 AbstractOioChannel channel, ChannelFuture future, int interestOps) {
156 boolean iothread = isIoThread(channel);
157
158
159 interestOps &= ~Channel.OP_WRITE;
160 interestOps |= channel.getInterestOps() & Channel.OP_WRITE;
161
162 boolean changed = false;
163 try {
164 if (channel.getInterestOps() != interestOps) {
165 if ((interestOps & Channel.OP_READ) != 0) {
166 channel.setInterestOpsNow(Channel.OP_READ);
167 } else {
168 channel.setInterestOpsNow(Channel.OP_NONE);
169 }
170 changed = true;
171 }
172
173 future.setSuccess();
174 if (changed) {
175 synchronized (channel.interestOpsLock) {
176 channel.setInterestOpsNow(interestOps);
177
178
179 Thread currentThread = Thread.currentThread();
180 Thread workerThread = channel.workerThread;
181 if (workerThread != null && currentThread != workerThread) {
182 workerThread.interrupt();
183 }
184 }
185 if (iothread) {
186 fireChannelInterestChanged(channel);
187 } else {
188 fireChannelInterestChangedLater(channel);
189 }
190 }
191 } catch (Throwable t) {
192 future.setFailure(t);
193 if (iothread) {
194 fireExceptionCaught(channel, t);
195 } else {
196 fireExceptionCaughtLater(channel, t);
197 }
198 }
199 }
200
201 static void close(AbstractOioChannel channel, ChannelFuture future) {
202 close(channel, future, isIoThread(channel));
203 }
204
205 private static void close(AbstractOioChannel channel, ChannelFuture future, boolean iothread) {
206 boolean connected = channel.isConnected();
207 boolean bound = channel.isBound();
208
209 try {
210 channel.closeSocket();
211 if (channel.setClosed()) {
212 future.setSuccess();
213 if (connected) {
214
215 Thread currentThread = Thread.currentThread();
216 Thread workerThread = channel.workerThread;
217 if (workerThread != null && currentThread != workerThread) {
218 workerThread.interrupt();
219 }
220 if (iothread) {
221 fireChannelDisconnected(channel);
222 } else {
223 fireChannelDisconnectedLater(channel);
224 }
225 }
226 if (bound) {
227 if (iothread) {
228 fireChannelUnbound(channel);
229 } else {
230 fireChannelUnboundLater(channel);
231 }
232 }
233 if (iothread) {
234 fireChannelClosed(channel);
235 } else {
236 fireChannelClosedLater(channel);
237 }
238 } else {
239 future.setSuccess();
240 }
241 } catch (Throwable t) {
242 future.setFailure(t);
243 if (iothread) {
244 fireExceptionCaught(channel, t);
245 } else {
246 fireExceptionCaughtLater(channel, t);
247 }
248 }
249 }
250 }