View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.socket.nio;
17  
18  import org.jboss.netty.buffer.ChannelBuffer;
19  import org.jboss.netty.channel.AbstractChannel;
20  import org.jboss.netty.channel.Channel;
21  import org.jboss.netty.channel.ChannelFactory;
22  import org.jboss.netty.channel.ChannelPipeline;
23  import org.jboss.netty.channel.ChannelSink;
24  import org.jboss.netty.channel.MessageEvent;
25  import org.jboss.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer;
26  import org.jboss.netty.util.internal.ThreadLocalBoolean;
27  
28  import java.net.InetSocketAddress;
29  import java.nio.channels.SelectableChannel;
30  import java.nio.channels.WritableByteChannel;
31  import java.util.Collection;
32  import java.util.Iterator;
33  import java.util.Queue;
34  import java.util.concurrent.ConcurrentLinkedQueue;
35  import java.util.concurrent.atomic.AtomicBoolean;
36  import java.util.concurrent.atomic.AtomicInteger;
37  
38  import static org.jboss.netty.channel.Channels.*;
39  
40  abstract class AbstractNioChannel<C extends SelectableChannel & WritableByteChannel> extends AbstractChannel {
41  
42      /**
43       * The {@link AbstractNioWorker}.
44       */
45      final AbstractNioWorker worker;
46  
47      /**
48       * Monitor object for synchronizing access to the {@link WriteRequestQueue}.
49       */
50      final Object writeLock = new Object();
51  
52      /**
53       * WriteTask that performs write operations.
54       */
55      final Runnable writeTask = new WriteTask();
56  
57      /**
58       * Indicates if there is a {@link WriteTask} in the task queue.
59       */
60      final AtomicBoolean writeTaskInTaskQueue = new AtomicBoolean();
61  
62      /**
63       * Queue of write {@link MessageEvent}s.
64       */
65      final Queue<MessageEvent> writeBufferQueue = new WriteRequestQueue();
66  
67      /**
68       * Keeps track of the number of bytes that the {@link WriteRequestQueue} currently
69       * contains.
70       */
71      final AtomicInteger writeBufferSize = new AtomicInteger();
72  
73      /**
74       * Keeps track of the highWaterMark.
75       */
76      final AtomicInteger highWaterMarkCounter = new AtomicInteger();
77  
78      /**
79       * The current write {@link MessageEvent}
80       */
81      MessageEvent currentWriteEvent;
82      SendBuffer currentWriteBuffer;
83  
84      /**
85       * Boolean that indicates that write operation is in progress.
86       */
87      boolean inWriteNowLoop;
88      boolean writeSuspended;
89  
90      private volatile InetSocketAddress localAddress;
91      volatile InetSocketAddress remoteAddress;
92  
93      final C channel;
94  
95      protected AbstractNioChannel(
96              Integer id, Channel parent, ChannelFactory factory, ChannelPipeline pipeline,
97              ChannelSink sink, AbstractNioWorker worker, C ch) {
98          super(id, parent, factory, pipeline, sink);
99          this.worker = worker;
100         channel = ch;
101     }
102 
103     protected AbstractNioChannel(
104             Channel parent, ChannelFactory factory,
105             ChannelPipeline pipeline, ChannelSink sink, AbstractNioWorker worker, C ch)  {
106         super(parent, factory, pipeline, sink);
107         this.worker = worker;
108         channel = ch;
109     }
110 
111     /**
112      * Return the {@link AbstractNioWorker} that handle the IO of the
113      * {@link AbstractNioChannel}
114      *
115      * @return worker
116      */
117     public AbstractNioWorker getWorker() {
118         return worker;
119     }
120 
121     public InetSocketAddress getLocalAddress() {
122         InetSocketAddress localAddress = this.localAddress;
123         if (localAddress == null) {
124             try {
125                 localAddress = getLocalSocketAddress();
126                 if (localAddress.getAddress().isAnyLocalAddress()) {
127                     // Don't cache on a wildcard address so the correct one
128                     // will be cached once the channel is connected/bound
129                     return localAddress;
130                 }
131                 this.localAddress = localAddress;
132             } catch (Throwable t) {
133                 // Sometimes fails on a closed socket in Windows.
134                 return null;
135             }
136         }
137         return localAddress;
138     }
139 
140     public InetSocketAddress getRemoteAddress() {
141         InetSocketAddress remoteAddress = this.remoteAddress;
142         if (remoteAddress == null) {
143             try {
144                 this.remoteAddress = remoteAddress =
145                     getRemoteSocketAddress();
146             } catch (Throwable t) {
147                 // Sometimes fails on a closed socket in Windows.
148                 return null;
149             }
150         }
151         return remoteAddress;
152     }
153 
154     public abstract NioChannelConfig getConfig();
155 
156     int getRawInterestOps() {
157         return super.getInterestOps();
158     }
159 
160     void setRawInterestOpsNow(int interestOps) {
161         setInterestOpsNow(interestOps);
162     }
163 
164     @Override
165     public int getInterestOps() {
166         if (!isOpen()) {
167             return Channel.OP_WRITE;
168         }
169 
170         int interestOps = getRawInterestOps();
171         int writeBufferSize = this.writeBufferSize.get();
172         if (writeBufferSize != 0) {
173             if (highWaterMarkCounter.get() > 0) {
174                 int lowWaterMark = getConfig().getWriteBufferLowWaterMark();
175                 if (writeBufferSize >= lowWaterMark) {
176                     interestOps |= Channel.OP_WRITE;
177                 } else {
178                     interestOps &= ~Channel.OP_WRITE;
179                 }
180             } else {
181                 int highWaterMark = getConfig().getWriteBufferHighWaterMark();
182                 if (writeBufferSize >= highWaterMark) {
183                     interestOps |= Channel.OP_WRITE;
184                 } else {
185                     interestOps &= ~Channel.OP_WRITE;
186                 }
187             }
188         } else {
189             interestOps &= ~Channel.OP_WRITE;
190         }
191 
192         return interestOps;
193     }
194 
195     @Override
196     protected boolean setClosed() {
197         return super.setClosed();
198     }
199 
200     abstract InetSocketAddress getLocalSocketAddress() throws Exception;
201 
202     abstract InetSocketAddress getRemoteSocketAddress() throws Exception;
203 
204     private final class WriteRequestQueue implements Queue<MessageEvent> {
205         private final ThreadLocalBoolean notifying = new ThreadLocalBoolean();
206 
207         private final Queue<MessageEvent> queue;
208 
209         public WriteRequestQueue() {
210             queue = new ConcurrentLinkedQueue<MessageEvent>();
211         }
212 
213         public MessageEvent remove() {
214             return queue.remove();
215         }
216 
217         public MessageEvent element() {
218             return queue.element();
219         }
220 
221         public MessageEvent peek() {
222             return queue.peek();
223         }
224 
225         public int size() {
226             return queue.size();
227         }
228 
229         public boolean isEmpty() {
230             return queue.isEmpty();
231         }
232 
233         public Iterator<MessageEvent> iterator() {
234             return queue.iterator();
235         }
236 
237         public Object[] toArray() {
238             return queue.toArray();
239         }
240 
241         public <T> T[] toArray(T[] a) {
242             return queue.toArray(a);
243         }
244 
245         public boolean containsAll(Collection<?> c) {
246             return queue.containsAll(c);
247         }
248 
249         public boolean addAll(Collection<? extends MessageEvent> c) {
250             return queue.addAll(c);
251         }
252 
253         public boolean removeAll(Collection<?> c) {
254             return queue.removeAll(c);
255         }
256 
257         public boolean retainAll(Collection<?> c) {
258             return queue.retainAll(c);
259         }
260 
261         public void clear() {
262             queue.clear();
263         }
264 
265         public boolean add(MessageEvent e) {
266             return queue.add(e);
267         }
268 
269         public boolean remove(Object o) {
270             return queue.remove(o);
271         }
272 
273         public boolean contains(Object o) {
274             return queue.contains(o);
275         }
276 
277         public boolean offer(MessageEvent e) {
278             boolean success = queue.offer(e);
279             assert success;
280 
281             int messageSize = getMessageSize(e);
282             int newWriteBufferSize = writeBufferSize.addAndGet(messageSize);
283             int highWaterMark =  getConfig().getWriteBufferHighWaterMark();
284 
285             if (newWriteBufferSize >= highWaterMark) {
286                 if (newWriteBufferSize - messageSize < highWaterMark) {
287                     highWaterMarkCounter.incrementAndGet();
288                     if (!notifying.get()) {
289                         notifying.set(Boolean.TRUE);
290                         fireChannelInterestChanged(AbstractNioChannel.this);
291                         notifying.set(Boolean.FALSE);
292                     }
293                 }
294             }
295             return true;
296         }
297 
298         public MessageEvent poll() {
299             MessageEvent e = queue.poll();
300             if (e != null) {
301                 int messageSize = getMessageSize(e);
302                 int newWriteBufferSize = writeBufferSize.addAndGet(-messageSize);
303                 int lowWaterMark = getConfig().getWriteBufferLowWaterMark();
304 
305                 if (newWriteBufferSize == 0 || newWriteBufferSize < lowWaterMark) {
306                     if (newWriteBufferSize + messageSize >= lowWaterMark) {
307                         highWaterMarkCounter.decrementAndGet();
308                         if (isConnected() && !notifying.get()) {
309                             notifying.set(Boolean.TRUE);
310                             fireChannelInterestChanged(AbstractNioChannel.this);
311                             notifying.set(Boolean.FALSE);
312                         }
313                     }
314                 }
315             }
316             return e;
317         }
318 
319         private int getMessageSize(MessageEvent e) {
320             Object m = e.getMessage();
321             if (m instanceof ChannelBuffer) {
322                 return ((ChannelBuffer) m).readableBytes();
323             }
324             return 0;
325         }
326     }
327 
328     private final class WriteTask implements Runnable {
329 
330         WriteTask() {
331         }
332 
333         public void run() {
334             writeTaskInTaskQueue.set(false);
335             worker.writeFromTaskLoop(AbstractNioChannel.this);
336         }
337     }
338 
339 }