View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.socket.nio;
17  
18  import org.jboss.netty.buffer.ChannelBuffer;
19  import org.jboss.netty.channel.AbstractChannel;
20  import org.jboss.netty.channel.Channel;
21  import org.jboss.netty.channel.ChannelFactory;
22  import org.jboss.netty.channel.ChannelPipeline;
23  import org.jboss.netty.channel.ChannelSink;
24  import org.jboss.netty.channel.MessageEvent;
25  import org.jboss.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer;
26  import org.jboss.netty.util.internal.ThreadLocalBoolean;
27  
28  import java.net.InetSocketAddress;
29  import java.nio.channels.SelectableChannel;
30  import java.nio.channels.WritableByteChannel;
31  import java.util.Collection;
32  import java.util.Iterator;
33  import java.util.Queue;
34  import java.util.concurrent.ConcurrentLinkedQueue;
35  import java.util.concurrent.atomic.AtomicBoolean;
36  import java.util.concurrent.atomic.AtomicInteger;
37  
38  import static org.jboss.netty.channel.Channels.*;
39  
40  abstract class AbstractNioChannel<C extends SelectableChannel & WritableByteChannel> extends AbstractChannel {
41  
42      /**
43       * The {@link AbstractNioWorker}.
44       */
45      final AbstractNioWorker worker;
46  
47      /**
48       * Monitor object to synchronize access to InterestedOps.
49       */
50      final Object interestOpsLock = new Object();
51  
52      /**
53       * Monitor object for synchronizing access to the {@link WriteRequestQueue}.
54       */
55      final Object writeLock = new Object();
56  
57      /**
58       * WriteTask that performs write operations.
59       */
60      final Runnable writeTask = new WriteTask();
61  
62      /**
63       * Indicates if there is a {@link WriteTask} in the task queue.
64       */
65      final AtomicBoolean writeTaskInTaskQueue = new AtomicBoolean();
66  
67      /**
68       * Queue of write {@link MessageEvent}s.
69       */
70      final Queue<MessageEvent> writeBufferQueue = new WriteRequestQueue();
71  
72      /**
73       * Keeps track of the number of bytes that the {@link WriteRequestQueue} currently
74       * contains.
75       */
76      final AtomicInteger writeBufferSize = new AtomicInteger();
77  
78      /**
79       * Keeps track of the highWaterMark.
80       */
81      final AtomicInteger highWaterMarkCounter = new AtomicInteger();
82  
83      /**
84       * The current write {@link MessageEvent}
85       */
86      MessageEvent currentWriteEvent;
87      SendBuffer currentWriteBuffer;
88  
89      /**
90       * Boolean that indicates that write operation is in progress.
91       */
92      boolean inWriteNowLoop;
93      boolean writeSuspended;
94  
95  
96      private volatile InetSocketAddress localAddress;
97      volatile InetSocketAddress remoteAddress;
98  
99      final C channel;
100 
101     protected AbstractNioChannel(
102             Integer id, Channel parent, ChannelFactory factory, ChannelPipeline pipeline,
103             ChannelSink sink, AbstractNioWorker worker, C ch) {
104         super(id, parent, factory, pipeline, sink);
105         this.worker = worker;
106         channel = ch;
107     }
108 
109     protected AbstractNioChannel(
110             Channel parent, ChannelFactory factory,
111             ChannelPipeline pipeline, ChannelSink sink, AbstractNioWorker worker, C ch)  {
112         super(parent, factory, pipeline, sink);
113         this.worker = worker;
114         channel = ch;
115     }
116 
117     /**
118      * Return the {@link AbstractNioWorker} that handle the IO of the
119      * {@link AbstractNioChannel}
120      *
121      * @return worker
122      */
123     public AbstractNioWorker getWorker() {
124         return worker;
125     }
126 
127     public InetSocketAddress getLocalAddress() {
128         InetSocketAddress localAddress = this.localAddress;
129         if (localAddress == null) {
130             try {
131                 localAddress = getLocalSocketAddress();
132                 if (localAddress.getAddress().isAnyLocalAddress()) {
133                     // Don't cache on a wildcard address so the correct one
134                     // will be cached once the channel is connected/bound
135                     return localAddress;
136                 }
137                 this.localAddress = localAddress;
138             } catch (Throwable t) {
139                 // Sometimes fails on a closed socket in Windows.
140                 return null;
141             }
142         }
143         return localAddress;
144     }
145 
146     public InetSocketAddress getRemoteAddress() {
147         InetSocketAddress remoteAddress = this.remoteAddress;
148         if (remoteAddress == null) {
149             try {
150                 this.remoteAddress = remoteAddress =
151                     getRemoteSocketAddress();
152             } catch (Throwable t) {
153                 // Sometimes fails on a closed socket in Windows.
154                 return null;
155             }
156         }
157         return remoteAddress;
158     }
159 
160     public abstract NioChannelConfig getConfig();
161 
162     int getRawInterestOps() {
163         return super.getInterestOps();
164     }
165 
166     void setRawInterestOpsNow(int interestOps) {
167         setInterestOpsNow(interestOps);
168     }
169 
170 
171     @Override
172     public int getInterestOps() {
173         if (!isOpen()) {
174             return Channel.OP_WRITE;
175         }
176 
177         int interestOps = getRawInterestOps();
178         int writeBufferSize = this.writeBufferSize.get();
179         if (writeBufferSize != 0) {
180             if (highWaterMarkCounter.get() > 0) {
181                 int lowWaterMark = getConfig().getWriteBufferLowWaterMark();
182                 if (writeBufferSize >= lowWaterMark) {
183                     interestOps |= Channel.OP_WRITE;
184                 } else {
185                     interestOps &= ~Channel.OP_WRITE;
186                 }
187             } else {
188                 int highWaterMark = getConfig().getWriteBufferHighWaterMark();
189                 if (writeBufferSize >= highWaterMark) {
190                     interestOps |= Channel.OP_WRITE;
191                 } else {
192                     interestOps &= ~Channel.OP_WRITE;
193                 }
194             }
195         } else {
196             interestOps &= ~Channel.OP_WRITE;
197         }
198 
199         return interestOps;
200     }
201 
202     @Override
203     protected boolean setClosed() {
204         return super.setClosed();
205     }
206 
207     abstract InetSocketAddress getLocalSocketAddress() throws Exception;
208 
209     abstract InetSocketAddress getRemoteSocketAddress() throws Exception;
210 
211     private final class WriteRequestQueue implements Queue<MessageEvent> {
212         private final ThreadLocalBoolean notifying = new ThreadLocalBoolean();
213 
214         private final Queue<MessageEvent> queue;
215 
216         public WriteRequestQueue() {
217             queue = new ConcurrentLinkedQueue<MessageEvent>();
218         }
219 
220         public MessageEvent remove() {
221             return queue.remove();
222         }
223 
224         public MessageEvent element() {
225             return queue.element();
226         }
227 
228         public MessageEvent peek() {
229             return queue.peek();
230         }
231 
232         public int size() {
233             return queue.size();
234         }
235 
236         public boolean isEmpty() {
237             return queue.isEmpty();
238         }
239 
240         public Iterator<MessageEvent> iterator() {
241             return queue.iterator();
242         }
243 
244         public Object[] toArray() {
245             return queue.toArray();
246         }
247 
248         public <T> T[] toArray(T[] a) {
249             return queue.toArray(a);
250         }
251 
252         public boolean containsAll(Collection<?> c) {
253             return queue.containsAll(c);
254         }
255 
256         public boolean addAll(Collection<? extends MessageEvent> c) {
257             return queue.addAll(c);
258         }
259 
260         public boolean removeAll(Collection<?> c) {
261             return queue.removeAll(c);
262         }
263 
264         public boolean retainAll(Collection<?> c) {
265             return queue.retainAll(c);
266         }
267 
268         public void clear() {
269             queue.clear();
270         }
271 
272         public boolean add(MessageEvent e) {
273             return queue.add(e);
274         }
275 
276         public boolean remove(Object o) {
277             return queue.remove(o);
278         }
279 
280         public boolean contains(Object o) {
281             return queue.contains(o);
282         }
283 
284         public boolean offer(MessageEvent e) {
285             boolean success = queue.offer(e);
286             assert success;
287 
288             int messageSize = getMessageSize(e);
289             int newWriteBufferSize = writeBufferSize.addAndGet(messageSize);
290             int highWaterMark =  getConfig().getWriteBufferHighWaterMark();
291 
292             if (newWriteBufferSize >= highWaterMark) {
293                 if (newWriteBufferSize - messageSize < highWaterMark) {
294                     highWaterMarkCounter.incrementAndGet();
295                     if (!notifying.get()) {
296                         notifying.set(Boolean.TRUE);
297                         fireChannelInterestChanged(AbstractNioChannel.this);
298                         notifying.set(Boolean.FALSE);
299                     }
300                 }
301             }
302             return true;
303         }
304 
305         public MessageEvent poll() {
306             MessageEvent e = queue.poll();
307             if (e != null) {
308                 int messageSize = getMessageSize(e);
309                 int newWriteBufferSize = writeBufferSize.addAndGet(-messageSize);
310                 int lowWaterMark = getConfig().getWriteBufferLowWaterMark();
311 
312                 if (newWriteBufferSize == 0 || newWriteBufferSize < lowWaterMark) {
313                     if (newWriteBufferSize + messageSize >= lowWaterMark) {
314                         highWaterMarkCounter.decrementAndGet();
315                         if (isConnected() && !notifying.get()) {
316                             notifying.set(Boolean.TRUE);
317                             fireChannelInterestChanged(AbstractNioChannel.this);
318                             notifying.set(Boolean.FALSE);
319                         }
320                     }
321                 }
322             }
323             return e;
324         }
325 
326         private int getMessageSize(MessageEvent e) {
327             Object m = e.getMessage();
328             if (m instanceof ChannelBuffer) {
329                 return ((ChannelBuffer) m).readableBytes();
330             }
331             return 0;
332         }
333     }
334 
335     private final class WriteTask implements Runnable {
336 
337         WriteTask() {
338         }
339 
340         public void run() {
341             writeTaskInTaskQueue.set(false);
342             worker.writeFromTaskLoop(AbstractNioChannel.this);
343         }
344     }
345 
346 }