1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.socket.nio;
17
18 import org.jboss.netty.buffer.ChannelBuffer;
19 import org.jboss.netty.channel.AbstractChannel;
20 import org.jboss.netty.channel.Channel;
21 import org.jboss.netty.channel.ChannelFactory;
22 import org.jboss.netty.channel.ChannelPipeline;
23 import org.jboss.netty.channel.ChannelSink;
24 import org.jboss.netty.channel.MessageEvent;
25 import org.jboss.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer;
26 import org.jboss.netty.util.internal.ThreadLocalBoolean;
27
28 import java.net.InetSocketAddress;
29 import java.nio.channels.SelectableChannel;
30 import java.nio.channels.WritableByteChannel;
31 import java.util.Collection;
32 import java.util.Iterator;
33 import java.util.Queue;
34 import java.util.concurrent.ConcurrentLinkedQueue;
35 import java.util.concurrent.atomic.AtomicBoolean;
36 import java.util.concurrent.atomic.AtomicInteger;
37
38 import static org.jboss.netty.channel.Channels.*;
39
40 abstract class AbstractNioChannel<C extends SelectableChannel & WritableByteChannel> extends AbstractChannel {
41
42
43
44
45 final AbstractNioWorker worker;
46
47
48
49
50 final Object writeLock = new Object();
51
52
53
54
55 final Runnable writeTask = new WriteTask();
56
57
58
59
60 final AtomicBoolean writeTaskInTaskQueue = new AtomicBoolean();
61
62
63
64
65 final Queue<MessageEvent> writeBufferQueue = new WriteRequestQueue();
66
67
68
69
70
71 final AtomicInteger writeBufferSize = new AtomicInteger();
72
73
74
75
76 final AtomicInteger highWaterMarkCounter = new AtomicInteger();
77
78
79
80
81 MessageEvent currentWriteEvent;
82 SendBuffer currentWriteBuffer;
83
84
85
86
87 boolean inWriteNowLoop;
88 boolean writeSuspended;
89
90 private volatile InetSocketAddress localAddress;
91 volatile InetSocketAddress remoteAddress;
92
93 final C channel;
94
95 protected AbstractNioChannel(
96 Integer id, Channel parent, ChannelFactory factory, ChannelPipeline pipeline,
97 ChannelSink sink, AbstractNioWorker worker, C ch) {
98 super(id, parent, factory, pipeline, sink);
99 this.worker = worker;
100 channel = ch;
101 }
102
103 protected AbstractNioChannel(
104 Channel parent, ChannelFactory factory,
105 ChannelPipeline pipeline, ChannelSink sink, AbstractNioWorker worker, C ch) {
106 super(parent, factory, pipeline, sink);
107 this.worker = worker;
108 channel = ch;
109 }
110
111
112
113
114
115
116
117 public AbstractNioWorker getWorker() {
118 return worker;
119 }
120
121 public InetSocketAddress getLocalAddress() {
122 InetSocketAddress localAddress = this.localAddress;
123 if (localAddress == null) {
124 try {
125 localAddress = getLocalSocketAddress();
126 if (localAddress.getAddress().isAnyLocalAddress()) {
127
128
129 return localAddress;
130 }
131 this.localAddress = localAddress;
132 } catch (Throwable t) {
133
134 return null;
135 }
136 }
137 return localAddress;
138 }
139
140 public InetSocketAddress getRemoteAddress() {
141 InetSocketAddress remoteAddress = this.remoteAddress;
142 if (remoteAddress == null) {
143 try {
144 this.remoteAddress = remoteAddress =
145 getRemoteSocketAddress();
146 } catch (Throwable t) {
147
148 return null;
149 }
150 }
151 return remoteAddress;
152 }
153
154 public abstract NioChannelConfig getConfig();
155
156 int getRawInterestOps() {
157 return super.getInterestOps();
158 }
159
160 void setRawInterestOpsNow(int interestOps) {
161 setInterestOpsNow(interestOps);
162 }
163
164 @Override
165 public int getInterestOps() {
166 if (!isOpen()) {
167 return Channel.OP_WRITE;
168 }
169
170 int interestOps = getRawInterestOps();
171 int writeBufferSize = this.writeBufferSize.get();
172 if (writeBufferSize != 0) {
173 if (highWaterMarkCounter.get() > 0) {
174 int lowWaterMark = getConfig().getWriteBufferLowWaterMark();
175 if (writeBufferSize >= lowWaterMark) {
176 interestOps |= Channel.OP_WRITE;
177 } else {
178 interestOps &= ~Channel.OP_WRITE;
179 }
180 } else {
181 int highWaterMark = getConfig().getWriteBufferHighWaterMark();
182 if (writeBufferSize >= highWaterMark) {
183 interestOps |= Channel.OP_WRITE;
184 } else {
185 interestOps &= ~Channel.OP_WRITE;
186 }
187 }
188 } else {
189 interestOps &= ~Channel.OP_WRITE;
190 }
191
192 return interestOps;
193 }
194
195 @Override
196 protected boolean setClosed() {
197 return super.setClosed();
198 }
199
200 abstract InetSocketAddress getLocalSocketAddress() throws Exception;
201
202 abstract InetSocketAddress getRemoteSocketAddress() throws Exception;
203
204 private final class WriteRequestQueue implements Queue<MessageEvent> {
205 private final ThreadLocalBoolean notifying = new ThreadLocalBoolean();
206
207 private final Queue<MessageEvent> queue;
208
209 public WriteRequestQueue() {
210 queue = new ConcurrentLinkedQueue<MessageEvent>();
211 }
212
213 public MessageEvent remove() {
214 return queue.remove();
215 }
216
217 public MessageEvent element() {
218 return queue.element();
219 }
220
221 public MessageEvent peek() {
222 return queue.peek();
223 }
224
225 public int size() {
226 return queue.size();
227 }
228
229 public boolean isEmpty() {
230 return queue.isEmpty();
231 }
232
233 public Iterator<MessageEvent> iterator() {
234 return queue.iterator();
235 }
236
237 public Object[] toArray() {
238 return queue.toArray();
239 }
240
241 public <T> T[] toArray(T[] a) {
242 return queue.toArray(a);
243 }
244
245 public boolean containsAll(Collection<?> c) {
246 return queue.containsAll(c);
247 }
248
249 public boolean addAll(Collection<? extends MessageEvent> c) {
250 return queue.addAll(c);
251 }
252
253 public boolean removeAll(Collection<?> c) {
254 return queue.removeAll(c);
255 }
256
257 public boolean retainAll(Collection<?> c) {
258 return queue.retainAll(c);
259 }
260
261 public void clear() {
262 queue.clear();
263 }
264
265 public boolean add(MessageEvent e) {
266 return queue.add(e);
267 }
268
269 public boolean remove(Object o) {
270 return queue.remove(o);
271 }
272
273 public boolean contains(Object o) {
274 return queue.contains(o);
275 }
276
277 public boolean offer(MessageEvent e) {
278 boolean success = queue.offer(e);
279 assert success;
280
281 int messageSize = getMessageSize(e);
282 int newWriteBufferSize = writeBufferSize.addAndGet(messageSize);
283 int highWaterMark = getConfig().getWriteBufferHighWaterMark();
284
285 if (newWriteBufferSize >= highWaterMark) {
286 if (newWriteBufferSize - messageSize < highWaterMark) {
287 highWaterMarkCounter.incrementAndGet();
288 if (!notifying.get()) {
289 notifying.set(Boolean.TRUE);
290 fireChannelInterestChanged(AbstractNioChannel.this);
291 notifying.set(Boolean.FALSE);
292 }
293 }
294 }
295 return true;
296 }
297
298 public MessageEvent poll() {
299 MessageEvent e = queue.poll();
300 if (e != null) {
301 int messageSize = getMessageSize(e);
302 int newWriteBufferSize = writeBufferSize.addAndGet(-messageSize);
303 int lowWaterMark = getConfig().getWriteBufferLowWaterMark();
304
305 if (newWriteBufferSize == 0 || newWriteBufferSize < lowWaterMark) {
306 if (newWriteBufferSize + messageSize >= lowWaterMark) {
307 highWaterMarkCounter.decrementAndGet();
308 if (isConnected() && !notifying.get()) {
309 notifying.set(Boolean.TRUE);
310 fireChannelInterestChanged(AbstractNioChannel.this);
311 notifying.set(Boolean.FALSE);
312 }
313 }
314 }
315 }
316 return e;
317 }
318
319 private int getMessageSize(MessageEvent e) {
320 Object m = e.getMessage();
321 if (m instanceof ChannelBuffer) {
322 return ((ChannelBuffer) m).readableBytes();
323 }
324 return 0;
325 }
326 }
327
328 private final class WriteTask implements Runnable {
329
330 WriteTask() {
331 }
332
333 public void run() {
334 writeTaskInTaskQueue.set(false);
335 worker.writeFromTaskLoop(AbstractNioChannel.this);
336 }
337 }
338
339 }