View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.socket.nio;
17  
18  import org.jboss.netty.buffer.ChannelBuffer;
19  import org.jboss.netty.channel.AbstractChannel;
20  import org.jboss.netty.channel.Channel;
21  import org.jboss.netty.channel.ChannelFactory;
22  import org.jboss.netty.channel.ChannelPipeline;
23  import org.jboss.netty.channel.ChannelSink;
24  import org.jboss.netty.channel.MessageEvent;
25  import org.jboss.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer;
26  import org.jboss.netty.util.internal.ThreadLocalBoolean;
27  
28  import java.net.InetSocketAddress;
29  import java.nio.channels.SelectableChannel;
30  import java.nio.channels.WritableByteChannel;
31  import java.util.Queue;
32  import java.util.concurrent.ConcurrentLinkedQueue;
33  import java.util.concurrent.atomic.AtomicBoolean;
34  import java.util.concurrent.atomic.AtomicInteger;
35  
36  import static org.jboss.netty.channel.Channels.*;
37  import static org.jboss.netty.channel.socket.nio.AbstractNioWorker.isIoThread;
38  
39  abstract class AbstractNioChannel<C extends SelectableChannel & WritableByteChannel> extends AbstractChannel {
40  
41      /**
42       * The {@link AbstractNioWorker}.
43       */
44      final AbstractNioWorker worker;
45  
46      /**
47       * Monitor object for synchronizing access to the {@link WriteRequestQueue}.
48       */
49      final Object writeLock = new Object();
50  
51      /**
52       * WriteTask that performs write operations.
53       */
54      final Runnable writeTask = new WriteTask();
55  
56      /**
57       * Indicates if there is a {@link WriteTask} in the task queue.
58       */
59      final AtomicBoolean writeTaskInTaskQueue = new AtomicBoolean();
60  
61      /**
62       * Queue of write {@link MessageEvent}s.
63       */
64      final WriteRequestQueue writeBufferQueue = new WriteRequestQueue();
65  
66      /**
67       * Keeps track of the number of bytes that the {@link WriteRequestQueue} currently
68       * contains.
69       */
70      final AtomicInteger writeBufferSize = new AtomicInteger();
71  
72      /**
73       * Keeps track of the highWaterMark.
74       */
75      final AtomicInteger highWaterMarkCounter = new AtomicInteger();
76  
77      /**
78       * The current write {@link MessageEvent}
79       */
80      MessageEvent currentWriteEvent;
81      SendBuffer currentWriteBuffer;
82  
83      /**
84       * Boolean that indicates that write operation is in progress.
85       */
86      boolean inWriteNowLoop;
87      boolean writeSuspended;
88  
89      private volatile InetSocketAddress localAddress;
90      volatile InetSocketAddress remoteAddress;
91  
92      final C channel;
93  
94      protected AbstractNioChannel(
95              Integer id, Channel parent, ChannelFactory factory, ChannelPipeline pipeline,
96              ChannelSink sink, AbstractNioWorker worker, C ch) {
97          super(id, parent, factory, pipeline, sink);
98          this.worker = worker;
99          channel = ch;
100     }
101 
102     protected AbstractNioChannel(
103             Channel parent, ChannelFactory factory,
104             ChannelPipeline pipeline, ChannelSink sink, AbstractNioWorker worker, C ch)  {
105         super(parent, factory, pipeline, sink);
106         this.worker = worker;
107         channel = ch;
108     }
109 
110     /**
111      * Return the {@link AbstractNioWorker} that handle the IO of the
112      * {@link AbstractNioChannel}
113      *
114      * @return worker
115      */
116     public AbstractNioWorker getWorker() {
117         return worker;
118     }
119 
120     public InetSocketAddress getLocalAddress() {
121         InetSocketAddress localAddress = this.localAddress;
122         if (localAddress == null) {
123             try {
124                 localAddress = getLocalSocketAddress();
125                 if (localAddress.getAddress().isAnyLocalAddress()) {
126                     // Don't cache on a wildcard address so the correct one
127                     // will be cached once the channel is connected/bound
128                     return localAddress;
129                 }
130                 this.localAddress = localAddress;
131             } catch (Throwable t) {
132                 // Sometimes fails on a closed socket in Windows.
133                 return null;
134             }
135         }
136         return localAddress;
137     }
138 
139     public InetSocketAddress getRemoteAddress() {
140         InetSocketAddress remoteAddress = this.remoteAddress;
141         if (remoteAddress == null) {
142             try {
143                 this.remoteAddress = remoteAddress =
144                     getRemoteSocketAddress();
145             } catch (Throwable t) {
146                 // Sometimes fails on a closed socket in Windows.
147                 return null;
148             }
149         }
150         return remoteAddress;
151     }
152 
153     public abstract NioChannelConfig getConfig();
154 
155     @Override
156     protected int getInternalInterestOps() {
157         return super.getInternalInterestOps();
158     }
159 
160     @Override
161     protected void setInternalInterestOps(int interestOps) {
162         super.setInternalInterestOps(interestOps);
163     }
164 
165     @Override
166     protected boolean setClosed() {
167         return super.setClosed();
168     }
169 
170     abstract InetSocketAddress getLocalSocketAddress() throws Exception;
171 
172     abstract InetSocketAddress getRemoteSocketAddress() throws Exception;
173 
174     final class WriteRequestQueue {
175         private final ThreadLocalBoolean notifying = new ThreadLocalBoolean();
176 
177         private final Queue<MessageEvent> queue;
178 
179         public WriteRequestQueue() {
180             queue = new ConcurrentLinkedQueue<MessageEvent>();
181         }
182 
183         public boolean isEmpty() {
184             return queue.isEmpty();
185         }
186 
187         public boolean offer(MessageEvent e) {
188             boolean success = queue.offer(e);
189             assert success;
190 
191             int messageSize = getMessageSize(e);
192             int newWriteBufferSize = writeBufferSize.addAndGet(messageSize);
193             final int highWaterMark =  getConfig().getWriteBufferHighWaterMark();
194 
195             if (newWriteBufferSize >= highWaterMark) {
196                 if (newWriteBufferSize - messageSize < highWaterMark) {
197                     highWaterMarkCounter.incrementAndGet();
198                     if (setUnwritable()) {
199                         if (isIoThread(AbstractNioChannel.this)) {
200                             if (!notifying.get()) {
201                                 notifying.set(Boolean.TRUE);
202                                 fireChannelInterestChanged(AbstractNioChannel.this);
203                                 notifying.set(Boolean.FALSE);
204                             }
205                         } else {
206                             // Adjusting writability requires careful synchronization.
207                             //
208                             // Consider for instance:
209                             //
210                             // T1 repeated offer: go above high water mark
211                             //      context switch *before* calling setUnwritable
212                             // T2 repeated poll: go under low water mark
213                             //      context switch *after* calling setWritable
214                             // T1 setUnwritable
215                             //
216                             // At this point the channel is incorrectly marked unwritable and would
217                             // remain so until the high water mark were exceeded again, which may
218                             // never happen if the application did control flow based on writability.
219                             //
220                             // The simplest solution would be to use a mutex to protect both the
221                             // buffer size and the writability bit, however that would impose a
222                             // serious performance penalty.
223                             //
224                             // A better approach would be to always call setUnwritable in the io
225                             // thread, which does not impact performance as the interestChanged
226                             // event has to be fired from there anyway.
227                             //
228                             // However this could break code which expects isWritable to immediately
229                             // be updated after a write() from a non-io thread.
230                             //
231                             // Instead, we re-check the buffer size before firing the interest
232                             // changed event and revert the change to the writability bit if needed.
233                             worker.executeInIoThread(new Runnable() {
234                                 @Override
235                                 public void run() {
236                                     if (writeBufferSize.get() >= highWaterMark || setWritable()) {
237                                         fireChannelInterestChanged(AbstractNioChannel.this);
238                                     }
239                                 }
240                             });
241                         }
242                     }
243                 }
244             }
245             return true;
246         }
247 
248         public MessageEvent poll() {
249             MessageEvent e = queue.poll();
250             if (e != null) {
251                 int messageSize = getMessageSize(e);
252                 int newWriteBufferSize = writeBufferSize.addAndGet(-messageSize);
253                 int lowWaterMark = getConfig().getWriteBufferLowWaterMark();
254 
255                 if (newWriteBufferSize == 0 || newWriteBufferSize < lowWaterMark) {
256                     if (newWriteBufferSize + messageSize >= lowWaterMark) {
257                         highWaterMarkCounter.decrementAndGet();
258                         if (isConnected()) {
259                             // write events are only ever processed from the channel io thread
260                             // except when cleaning up the buffer, which only happens on close()
261                             // changing that would require additional synchronization around
262                             // writeBufferSize and writability changes.
263                             assert isIoThread(AbstractNioChannel.this);
264                             if (setWritable()) {
265                                 notifying.set(Boolean.TRUE);
266                                 fireChannelInterestChanged(AbstractNioChannel.this);
267                                 notifying.set(Boolean.FALSE);
268                             }
269                         }
270                     }
271                 }
272             }
273             return e;
274         }
275 
276         private int getMessageSize(MessageEvent e) {
277             Object m = e.getMessage();
278             if (m instanceof ChannelBuffer) {
279                 return ((ChannelBuffer) m).readableBytes();
280             }
281             return 0;
282         }
283     }
284 
285     private final class WriteTask implements Runnable {
286 
287         WriteTask() {
288         }
289 
290         public void run() {
291             writeTaskInTaskQueue.set(false);
292             worker.writeFromTaskLoop(AbstractNioChannel.this);
293         }
294     }
295 
296 }