1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.socket.nio;
17
18 import org.jboss.netty.buffer.ChannelBuffer;
19 import org.jboss.netty.channel.AbstractChannel;
20 import org.jboss.netty.channel.Channel;
21 import org.jboss.netty.channel.ChannelFactory;
22 import org.jboss.netty.channel.ChannelPipeline;
23 import org.jboss.netty.channel.ChannelSink;
24 import org.jboss.netty.channel.MessageEvent;
25 import org.jboss.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer;
26 import org.jboss.netty.util.internal.ThreadLocalBoolean;
27
28 import java.net.InetSocketAddress;
29 import java.nio.channels.SelectableChannel;
30 import java.nio.channels.WritableByteChannel;
31 import java.util.Queue;
32 import java.util.concurrent.ConcurrentLinkedQueue;
33 import java.util.concurrent.atomic.AtomicBoolean;
34 import java.util.concurrent.atomic.AtomicInteger;
35
36 import static org.jboss.netty.channel.Channels.*;
37 import static org.jboss.netty.channel.socket.nio.AbstractNioWorker.isIoThread;
38
39 abstract class AbstractNioChannel<C extends SelectableChannel & WritableByteChannel> extends AbstractChannel {
40
41
42
43
44 final AbstractNioWorker worker;
45
46
47
48
49 final Object writeLock = new Object();
50
51
52
53
54 final Runnable writeTask = new WriteTask();
55
56
57
58
59 final AtomicBoolean writeTaskInTaskQueue = new AtomicBoolean();
60
61
62
63
64 final WriteRequestQueue writeBufferQueue = new WriteRequestQueue();
65
66
67
68
69
70 final AtomicInteger writeBufferSize = new AtomicInteger();
71
72
73
74
75 final AtomicInteger highWaterMarkCounter = new AtomicInteger();
76
77
78
79
80 MessageEvent currentWriteEvent;
81 SendBuffer currentWriteBuffer;
82
83
84
85
86 boolean inWriteNowLoop;
87 boolean writeSuspended;
88
89 private volatile InetSocketAddress localAddress;
90 volatile InetSocketAddress remoteAddress;
91
92 final C channel;
93
94 protected AbstractNioChannel(
95 Integer id, Channel parent, ChannelFactory factory, ChannelPipeline pipeline,
96 ChannelSink sink, AbstractNioWorker worker, C ch) {
97 super(id, parent, factory, pipeline, sink);
98 this.worker = worker;
99 channel = ch;
100 }
101
102 protected AbstractNioChannel(
103 Channel parent, ChannelFactory factory,
104 ChannelPipeline pipeline, ChannelSink sink, AbstractNioWorker worker, C ch) {
105 super(parent, factory, pipeline, sink);
106 this.worker = worker;
107 channel = ch;
108 }
109
110
111
112
113
114
115
116 public AbstractNioWorker getWorker() {
117 return worker;
118 }
119
120 public InetSocketAddress getLocalAddress() {
121 InetSocketAddress localAddress = this.localAddress;
122 if (localAddress == null) {
123 try {
124 localAddress = getLocalSocketAddress();
125 if (localAddress.getAddress().isAnyLocalAddress()) {
126
127
128 return localAddress;
129 }
130 this.localAddress = localAddress;
131 } catch (Throwable t) {
132
133 return null;
134 }
135 }
136 return localAddress;
137 }
138
139 public InetSocketAddress getRemoteAddress() {
140 InetSocketAddress remoteAddress = this.remoteAddress;
141 if (remoteAddress == null) {
142 try {
143 this.remoteAddress = remoteAddress =
144 getRemoteSocketAddress();
145 } catch (Throwable t) {
146
147 return null;
148 }
149 }
150 return remoteAddress;
151 }
152
153 public abstract NioChannelConfig getConfig();
154
155 @Override
156 protected int getInternalInterestOps() {
157 return super.getInternalInterestOps();
158 }
159
160 @Override
161 protected void setInternalInterestOps(int interestOps) {
162 super.setInternalInterestOps(interestOps);
163 }
164
165 @Override
166 protected boolean setClosed() {
167 return super.setClosed();
168 }
169
170 abstract InetSocketAddress getLocalSocketAddress() throws Exception;
171
172 abstract InetSocketAddress getRemoteSocketAddress() throws Exception;
173
174 final class WriteRequestQueue {
175 private final ThreadLocalBoolean notifying = new ThreadLocalBoolean();
176
177 private final Queue<MessageEvent> queue;
178
179 public WriteRequestQueue() {
180 queue = new ConcurrentLinkedQueue<MessageEvent>();
181 }
182
183 public boolean isEmpty() {
184 return queue.isEmpty();
185 }
186
187 public boolean offer(MessageEvent e) {
188 boolean success = queue.offer(e);
189 assert success;
190
191 int messageSize = getMessageSize(e);
192 int newWriteBufferSize = writeBufferSize.addAndGet(messageSize);
193 final int highWaterMark = getConfig().getWriteBufferHighWaterMark();
194
195 if (newWriteBufferSize >= highWaterMark) {
196 if (newWriteBufferSize - messageSize < highWaterMark) {
197 highWaterMarkCounter.incrementAndGet();
198 if (setUnwritable()) {
199 if (isIoThread(AbstractNioChannel.this)) {
200 if (!notifying.get()) {
201 notifying.set(Boolean.TRUE);
202 fireChannelInterestChanged(AbstractNioChannel.this);
203 notifying.set(Boolean.FALSE);
204 }
205 } else {
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233 worker.executeInIoThread(new Runnable() {
234 @Override
235 public void run() {
236 if (writeBufferSize.get() >= highWaterMark || setWritable()) {
237 fireChannelInterestChanged(AbstractNioChannel.this);
238 }
239 }
240 });
241 }
242 }
243 }
244 }
245 return true;
246 }
247
248 public MessageEvent poll() {
249 MessageEvent e = queue.poll();
250 if (e != null) {
251 int messageSize = getMessageSize(e);
252 int newWriteBufferSize = writeBufferSize.addAndGet(-messageSize);
253 int lowWaterMark = getConfig().getWriteBufferLowWaterMark();
254
255 if (newWriteBufferSize == 0 || newWriteBufferSize < lowWaterMark) {
256 if (newWriteBufferSize + messageSize >= lowWaterMark) {
257 highWaterMarkCounter.decrementAndGet();
258 if (isConnected()) {
259
260
261
262
263 assert isIoThread(AbstractNioChannel.this);
264 if (setWritable()) {
265 notifying.set(Boolean.TRUE);
266 fireChannelInterestChanged(AbstractNioChannel.this);
267 notifying.set(Boolean.FALSE);
268 }
269 }
270 }
271 }
272 }
273 return e;
274 }
275
276 private int getMessageSize(MessageEvent e) {
277 Object m = e.getMessage();
278 if (m instanceof ChannelBuffer) {
279 return ((ChannelBuffer) m).readableBytes();
280 }
281 return 0;
282 }
283 }
284
285 private final class WriteTask implements Runnable {
286
287 WriteTask() {
288 }
289
290 public void run() {
291 writeTaskInTaskQueue.set(false);
292 worker.writeFromTaskLoop(AbstractNioChannel.this);
293 }
294 }
295
296 }