1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.socket.nio;
17
18 import org.jboss.netty.buffer.ChannelBuffer;
19 import org.jboss.netty.channel.AbstractChannel;
20 import org.jboss.netty.channel.Channel;
21 import org.jboss.netty.channel.ChannelFactory;
22 import org.jboss.netty.channel.ChannelPipeline;
23 import org.jboss.netty.channel.ChannelSink;
24 import org.jboss.netty.channel.MessageEvent;
25 import org.jboss.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer;
26 import org.jboss.netty.util.internal.ThreadLocalBoolean;
27
28 import java.net.InetSocketAddress;
29 import java.nio.channels.SelectableChannel;
30 import java.nio.channels.WritableByteChannel;
31 import java.util.Collection;
32 import java.util.Iterator;
33 import java.util.Queue;
34 import java.util.concurrent.ConcurrentLinkedQueue;
35 import java.util.concurrent.atomic.AtomicBoolean;
36 import java.util.concurrent.atomic.AtomicInteger;
37
38 import static org.jboss.netty.channel.Channels.*;
39
40 abstract class AbstractNioChannel<C extends SelectableChannel & WritableByteChannel> extends AbstractChannel {
41
42
43
44
45 final AbstractNioWorker worker;
46
47
48
49
50 final Object interestOpsLock = new Object();
51
52
53
54
55 final Object writeLock = new Object();
56
57
58
59
60 final Runnable writeTask = new WriteTask();
61
62
63
64
65 final AtomicBoolean writeTaskInTaskQueue = new AtomicBoolean();
66
67
68
69
70 final Queue<MessageEvent> writeBufferQueue = new WriteRequestQueue();
71
72
73
74
75
76 final AtomicInteger writeBufferSize = new AtomicInteger();
77
78
79
80
81 final AtomicInteger highWaterMarkCounter = new AtomicInteger();
82
83
84
85
86 MessageEvent currentWriteEvent;
87 SendBuffer currentWriteBuffer;
88
89
90
91
92 boolean inWriteNowLoop;
93 boolean writeSuspended;
94
95
96 private volatile InetSocketAddress localAddress;
97 volatile InetSocketAddress remoteAddress;
98
99 final C channel;
100
101 protected AbstractNioChannel(
102 Integer id, Channel parent, ChannelFactory factory, ChannelPipeline pipeline,
103 ChannelSink sink, AbstractNioWorker worker, C ch) {
104 super(id, parent, factory, pipeline, sink);
105 this.worker = worker;
106 channel = ch;
107 }
108
109 protected AbstractNioChannel(
110 Channel parent, ChannelFactory factory,
111 ChannelPipeline pipeline, ChannelSink sink, AbstractNioWorker worker, C ch) {
112 super(parent, factory, pipeline, sink);
113 this.worker = worker;
114 channel = ch;
115 }
116
117
118
119
120
121
122
123 public AbstractNioWorker getWorker() {
124 return worker;
125 }
126
127 public InetSocketAddress getLocalAddress() {
128 InetSocketAddress localAddress = this.localAddress;
129 if (localAddress == null) {
130 try {
131 localAddress = getLocalSocketAddress();
132 if (localAddress.getAddress().isAnyLocalAddress()) {
133
134
135 return localAddress;
136 }
137 this.localAddress = localAddress;
138 } catch (Throwable t) {
139
140 return null;
141 }
142 }
143 return localAddress;
144 }
145
146 public InetSocketAddress getRemoteAddress() {
147 InetSocketAddress remoteAddress = this.remoteAddress;
148 if (remoteAddress == null) {
149 try {
150 this.remoteAddress = remoteAddress =
151 getRemoteSocketAddress();
152 } catch (Throwable t) {
153
154 return null;
155 }
156 }
157 return remoteAddress;
158 }
159
160 public abstract NioChannelConfig getConfig();
161
162 int getRawInterestOps() {
163 return super.getInterestOps();
164 }
165
166 void setRawInterestOpsNow(int interestOps) {
167 setInterestOpsNow(interestOps);
168 }
169
170
171 @Override
172 public int getInterestOps() {
173 if (!isOpen()) {
174 return Channel.OP_WRITE;
175 }
176
177 int interestOps = getRawInterestOps();
178 int writeBufferSize = this.writeBufferSize.get();
179 if (writeBufferSize != 0) {
180 if (highWaterMarkCounter.get() > 0) {
181 int lowWaterMark = getConfig().getWriteBufferLowWaterMark();
182 if (writeBufferSize >= lowWaterMark) {
183 interestOps |= Channel.OP_WRITE;
184 } else {
185 interestOps &= ~Channel.OP_WRITE;
186 }
187 } else {
188 int highWaterMark = getConfig().getWriteBufferHighWaterMark();
189 if (writeBufferSize >= highWaterMark) {
190 interestOps |= Channel.OP_WRITE;
191 } else {
192 interestOps &= ~Channel.OP_WRITE;
193 }
194 }
195 } else {
196 interestOps &= ~Channel.OP_WRITE;
197 }
198
199 return interestOps;
200 }
201
202 @Override
203 protected boolean setClosed() {
204 return super.setClosed();
205 }
206
207 abstract InetSocketAddress getLocalSocketAddress() throws Exception;
208
209 abstract InetSocketAddress getRemoteSocketAddress() throws Exception;
210
211 private final class WriteRequestQueue implements Queue<MessageEvent> {
212 private final ThreadLocalBoolean notifying = new ThreadLocalBoolean();
213
214 private final Queue<MessageEvent> queue;
215
216 public WriteRequestQueue() {
217 queue = new ConcurrentLinkedQueue<MessageEvent>();
218 }
219
220 public MessageEvent remove() {
221 return queue.remove();
222 }
223
224 public MessageEvent element() {
225 return queue.element();
226 }
227
228 public MessageEvent peek() {
229 return queue.peek();
230 }
231
232 public int size() {
233 return queue.size();
234 }
235
236 public boolean isEmpty() {
237 return queue.isEmpty();
238 }
239
240 public Iterator<MessageEvent> iterator() {
241 return queue.iterator();
242 }
243
244 public Object[] toArray() {
245 return queue.toArray();
246 }
247
248 public <T> T[] toArray(T[] a) {
249 return queue.toArray(a);
250 }
251
252 public boolean containsAll(Collection<?> c) {
253 return queue.containsAll(c);
254 }
255
256 public boolean addAll(Collection<? extends MessageEvent> c) {
257 return queue.addAll(c);
258 }
259
260 public boolean removeAll(Collection<?> c) {
261 return queue.removeAll(c);
262 }
263
264 public boolean retainAll(Collection<?> c) {
265 return queue.retainAll(c);
266 }
267
268 public void clear() {
269 queue.clear();
270 }
271
272 public boolean add(MessageEvent e) {
273 return queue.add(e);
274 }
275
276 public boolean remove(Object o) {
277 return queue.remove(o);
278 }
279
280 public boolean contains(Object o) {
281 return queue.contains(o);
282 }
283
284 public boolean offer(MessageEvent e) {
285 boolean success = queue.offer(e);
286 assert success;
287
288 int messageSize = getMessageSize(e);
289 int newWriteBufferSize = writeBufferSize.addAndGet(messageSize);
290 int highWaterMark = getConfig().getWriteBufferHighWaterMark();
291
292 if (newWriteBufferSize >= highWaterMark) {
293 if (newWriteBufferSize - messageSize < highWaterMark) {
294 highWaterMarkCounter.incrementAndGet();
295 if (!notifying.get()) {
296 notifying.set(Boolean.TRUE);
297 fireChannelInterestChanged(AbstractNioChannel.this);
298 notifying.set(Boolean.FALSE);
299 }
300 }
301 }
302 return true;
303 }
304
305 public MessageEvent poll() {
306 MessageEvent e = queue.poll();
307 if (e != null) {
308 int messageSize = getMessageSize(e);
309 int newWriteBufferSize = writeBufferSize.addAndGet(-messageSize);
310 int lowWaterMark = getConfig().getWriteBufferLowWaterMark();
311
312 if (newWriteBufferSize == 0 || newWriteBufferSize < lowWaterMark) {
313 if (newWriteBufferSize + messageSize >= lowWaterMark) {
314 highWaterMarkCounter.decrementAndGet();
315 if (isConnected() && !notifying.get()) {
316 notifying.set(Boolean.TRUE);
317 fireChannelInterestChanged(AbstractNioChannel.this);
318 notifying.set(Boolean.FALSE);
319 }
320 }
321 }
322 }
323 return e;
324 }
325
326 private int getMessageSize(MessageEvent e) {
327 Object m = e.getMessage();
328 if (m instanceof ChannelBuffer) {
329 return ((ChannelBuffer) m).readableBytes();
330 }
331 return 0;
332 }
333 }
334
335 private final class WriteTask implements Runnable {
336
337 WriteTask() {
338 }
339
340 public void run() {
341 writeTaskInTaskQueue.set(false);
342 worker.writeFromTaskLoop(AbstractNioChannel.this);
343 }
344 }
345
346 }