View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.sctp.oio;
17  
18  import com.sun.nio.sctp.Association;
19  import com.sun.nio.sctp.MessageInfo;
20  import com.sun.nio.sctp.NotificationHandler;
21  import com.sun.nio.sctp.SctpChannel;
22  import io.netty.buffer.ByteBuf;
23  import io.netty.channel.Channel;
24  import io.netty.channel.ChannelException;
25  import io.netty.channel.ChannelFuture;
26  import io.netty.channel.ChannelMetadata;
27  import io.netty.channel.ChannelOutboundBuffer;
28  import io.netty.channel.ChannelPromise;
29  import io.netty.channel.RecvByteBufAllocator;
30  import io.netty.channel.oio.AbstractOioMessageChannel;
31  import io.netty.channel.sctp.DefaultSctpChannelConfig;
32  import io.netty.channel.sctp.SctpChannelConfig;
33  import io.netty.channel.sctp.SctpMessage;
34  import io.netty.channel.sctp.SctpNotificationHandler;
35  import io.netty.channel.sctp.SctpServerChannel;
36  import io.netty.util.internal.PlatformDependent;
37  import io.netty.util.internal.StringUtil;
38  import io.netty.util.internal.logging.InternalLogger;
39  import io.netty.util.internal.logging.InternalLoggerFactory;
40  
41  import java.io.IOException;
42  import java.net.InetAddress;
43  import java.net.InetSocketAddress;
44  import java.net.SocketAddress;
45  import java.nio.ByteBuffer;
46  import java.nio.channels.SelectionKey;
47  import java.nio.channels.Selector;
48  import java.util.Collections;
49  import java.util.Iterator;
50  import java.util.LinkedHashSet;
51  import java.util.List;
52  import java.util.Set;
53  
54  /**
55   * {@link io.netty.channel.sctp.SctpChannel} implementation which uses blocking mode and allows reading / writing
56   * {@link SctpMessage}s from / to the underlying {@link SctpChannel}.
57   *
58   * Be aware that not all operating systems support SCTP. Please refer to the documentation of your operating system
59   * to understand what you need to do to use it. Also, this feature is only supported on Java 7+.
60   */
61  public class OioSctpChannel extends AbstractOioMessageChannel
62          implements io.netty.channel.sctp.SctpChannel {
63  
64      private static final InternalLogger logger =
65              InternalLoggerFactory.getInstance(OioSctpChannel.class);
66  
67      private static final ChannelMetadata METADATA = new ChannelMetadata(false);
68      private static final String EXPECTED_TYPE = " (expected: " + StringUtil.simpleClassName(SctpMessage.class) + ')';
69  
70      private final SctpChannel ch;
71      private final SctpChannelConfig config;
72  
73      private final Selector readSelector;
74      private final Selector writeSelector;
75      private final Selector connectSelector;
76  
77      private final NotificationHandler<?> notificationHandler;
78  
79      private RecvByteBufAllocator.Handle allocHandle;
80  
81      private static SctpChannel openChannel() {
82          try {
83              return SctpChannel.open();
84          } catch (IOException e) {
85              throw new ChannelException("Failed to open a sctp channel.", e);
86          }
87      }
88  
89      /**
90       * Create a new instance with an new {@link SctpChannel}.
91       */
92      public OioSctpChannel() {
93          this(openChannel());
94      }
95  
96      /**
97       * Create a new instance from the given {@link SctpChannel}.
98       *
99       * @param ch    the {@link SctpChannel} which is used by this instance
100      */
101     public OioSctpChannel(SctpChannel ch) {
102         this(null, ch);
103     }
104 
105     /**
106      * Create a new instance from the given {@link SctpChannel}.
107      *
108      * @param parent    the parent {@link Channel} which was used to create this instance. This can be null if the
109      *                  {@link} has no parent as it was created by your self.
110      * @param ch        the {@link SctpChannel} which is used by this instance
111      */
112     public OioSctpChannel(Channel parent, SctpChannel ch) {
113         super(parent);
114         this.ch = ch;
115         boolean success = false;
116         try {
117             ch.configureBlocking(false);
118             readSelector = Selector.open();
119             writeSelector = Selector.open();
120             connectSelector = Selector.open();
121 
122             ch.register(readSelector, SelectionKey.OP_READ);
123             ch.register(writeSelector, SelectionKey.OP_WRITE);
124             ch.register(connectSelector, SelectionKey.OP_CONNECT);
125 
126             config = new OioSctpChannelConfig(this, ch);
127             notificationHandler = new SctpNotificationHandler(this);
128             success = true;
129         } catch (Exception e) {
130             throw new ChannelException("failed to initialize a sctp channel", e);
131         } finally {
132             if (!success) {
133                 try {
134                     ch.close();
135                 } catch (IOException e) {
136                     logger.warn("Failed to close a sctp channel.", e);
137                 }
138             }
139         }
140     }
141 
142     @Override
143     public InetSocketAddress localAddress() {
144         return (InetSocketAddress) super.localAddress();
145     }
146 
147     @Override
148     public InetSocketAddress remoteAddress() {
149         return (InetSocketAddress) super.remoteAddress();
150     }
151 
152     @Override
153     public SctpServerChannel parent() {
154         return (SctpServerChannel) super.parent();
155     }
156 
157     @Override
158     public ChannelMetadata metadata() {
159         return METADATA;
160     }
161 
162     @Override
163     public SctpChannelConfig config() {
164         return config;
165     }
166 
167     @Override
168     public boolean isOpen() {
169         return ch.isOpen();
170     }
171 
172     @Override
173     protected int doReadMessages(List<Object> msgs) throws Exception {
174         if (!readSelector.isOpen()) {
175             return 0;
176         }
177 
178         int readMessages = 0;
179 
180         final int selectedKeys = readSelector.select(SO_TIMEOUT);
181         final boolean keysSelected = selectedKeys > 0;
182 
183         if (!keysSelected) {
184             return readMessages;
185         }
186 
187         // We must clear the selectedKeys because the Selector will never do it. If we do not clear it, the selectionKey
188         // will always be returned even if there is no data can be read which causes performance issue. And in some
189         // implementation of Selector, the select method may return 0 if the selectionKey which is ready for process has
190         // already been in the selectedKeys and cause the keysSelected above to be false even if we actually have
191         // something to read.
192         readSelector.selectedKeys().clear();
193 
194         RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
195         if (allocHandle == null) {
196             this.allocHandle = allocHandle = config().getRecvByteBufAllocator().newHandle();
197         }
198 
199         ByteBuf buffer = allocHandle.allocate(config().getAllocator());
200         boolean free = true;
201 
202         try {
203             ByteBuffer data = buffer.nioBuffer(buffer.writerIndex(), buffer.writableBytes());
204             MessageInfo messageInfo = ch.receive(data, null, notificationHandler);
205             if (messageInfo == null) {
206                 return readMessages;
207             }
208 
209             data.flip();
210             msgs.add(new SctpMessage(messageInfo, buffer.writerIndex(buffer.writerIndex() + data.remaining())));
211             free = false;
212             readMessages ++;
213         } catch (Throwable cause) {
214             PlatformDependent.throwException(cause);
215         }  finally {
216             int bytesRead = buffer.readableBytes();
217             allocHandle.record(bytesRead);
218             if (free) {
219                 buffer.release();
220             }
221         }
222         return readMessages;
223     }
224 
225     @Override
226     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
227         if (!writeSelector.isOpen()) {
228             return;
229         }
230         final int size = in.size();
231         final int selectedKeys = writeSelector.select(SO_TIMEOUT);
232         if (selectedKeys > 0) {
233             final Set<SelectionKey> writableKeys = writeSelector.selectedKeys();
234             if (writableKeys.isEmpty()) {
235                 return;
236             }
237             Iterator<SelectionKey> writableKeysIt = writableKeys.iterator();
238             int written = 0;
239             for (;;) {
240                 if (written == size) {
241                     // all written
242                     return;
243                 }
244                 writableKeysIt.next();
245                 writableKeysIt.remove();
246 
247                 SctpMessage packet = (SctpMessage) in.current();
248                 if (packet == null) {
249                     return;
250                 }
251 
252                 ByteBuf data = packet.content();
253                 int dataLen = data.readableBytes();
254                 ByteBuffer nioData;
255 
256                 if (data.nioBufferCount() != -1) {
257                     nioData = data.nioBuffer();
258                 } else {
259                     nioData = ByteBuffer.allocate(dataLen);
260                     data.getBytes(data.readerIndex(), nioData);
261                     nioData.flip();
262                 }
263 
264                 final MessageInfo mi = MessageInfo.createOutgoing(association(), null, packet.streamIdentifier());
265                 mi.payloadProtocolID(packet.protocolIdentifier());
266                 mi.streamNumber(packet.streamIdentifier());
267                 mi.unordered(packet.isUnordered());
268 
269                 ch.send(nioData, mi);
270                 written ++;
271                 in.remove();
272 
273                 if (!writableKeysIt.hasNext()) {
274                     return;
275                 }
276             }
277         }
278     }
279 
280     @Override
281     protected Object filterOutboundMessage(Object msg) throws Exception {
282         if (msg instanceof SctpMessage) {
283             return msg;
284         }
285 
286         throw new UnsupportedOperationException(
287                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPE);
288     }
289 
290     @Override
291     public Association association() {
292         try {
293             return ch.association();
294         } catch (IOException ignored) {
295             return null;
296         }
297     }
298 
299     @Override
300     public boolean isActive() {
301         return isOpen() && association() != null;
302     }
303 
304     @Override
305     protected SocketAddress localAddress0() {
306         try {
307             Iterator<SocketAddress> i = ch.getAllLocalAddresses().iterator();
308             if (i.hasNext()) {
309                 return i.next();
310             }
311         } catch (IOException e) {
312             // ignore
313         }
314         return null;
315     }
316 
317     @Override
318     public Set<InetSocketAddress> allLocalAddresses() {
319         try {
320             final Set<SocketAddress> allLocalAddresses = ch.getAllLocalAddresses();
321             final Set<InetSocketAddress> addresses = new LinkedHashSet<InetSocketAddress>(allLocalAddresses.size());
322             for (SocketAddress socketAddress : allLocalAddresses) {
323                 addresses.add((InetSocketAddress) socketAddress);
324             }
325             return addresses;
326         } catch (Throwable ignored) {
327             return Collections.emptySet();
328         }
329     }
330 
331     @Override
332     protected SocketAddress remoteAddress0() {
333         try {
334             Iterator<SocketAddress> i = ch.getRemoteAddresses().iterator();
335             if (i.hasNext()) {
336                 return i.next();
337             }
338         } catch (IOException e) {
339             // ignore
340         }
341         return null;
342     }
343 
344     @Override
345     public Set<InetSocketAddress> allRemoteAddresses() {
346         try {
347             final Set<SocketAddress> allLocalAddresses = ch.getRemoteAddresses();
348             final Set<InetSocketAddress> addresses = new LinkedHashSet<InetSocketAddress>(allLocalAddresses.size());
349             for (SocketAddress socketAddress : allLocalAddresses) {
350                 addresses.add((InetSocketAddress) socketAddress);
351             }
352             return addresses;
353         } catch (Throwable ignored) {
354             return Collections.emptySet();
355         }
356     }
357 
358     @Override
359     protected void doBind(SocketAddress localAddress) throws Exception {
360         ch.bind(localAddress);
361     }
362 
363     @Override
364     protected void doConnect(SocketAddress remoteAddress,
365                              SocketAddress localAddress) throws Exception {
366         if (localAddress != null) {
367             ch.bind(localAddress);
368         }
369 
370         boolean success = false;
371         try {
372             ch.connect(remoteAddress);
373             boolean  finishConnect = false;
374             while (!finishConnect) {
375                 if (connectSelector.select(SO_TIMEOUT) >= 0) {
376                     final Set<SelectionKey> selectionKeys = connectSelector.selectedKeys();
377                     for (SelectionKey key : selectionKeys) {
378                        if (key.isConnectable()) {
379                            selectionKeys.clear();
380                            finishConnect = true;
381                            break;
382                        }
383                     }
384                     selectionKeys.clear();
385                 }
386             }
387             success = ch.finishConnect();
388         } finally {
389             if (!success) {
390                 doClose();
391             }
392         }
393     }
394 
395     @Override
396     protected void doDisconnect() throws Exception {
397         doClose();
398     }
399 
400     @Override
401     protected void doClose() throws Exception {
402         closeSelector("read", readSelector);
403         closeSelector("write", writeSelector);
404         closeSelector("connect", connectSelector);
405         ch.close();
406     }
407 
408     private static void closeSelector(String selectorName, Selector selector) {
409         try {
410             selector.close();
411         } catch (IOException e) {
412             logger.warn("Failed to close a " + selectorName + " selector.", e);
413         }
414     }
415 
416     @Override
417     public ChannelFuture bindAddress(InetAddress localAddress) {
418         return bindAddress(localAddress, newPromise());
419     }
420 
421     @Override
422     public ChannelFuture bindAddress(final InetAddress localAddress, final ChannelPromise promise) {
423         if (eventLoop().inEventLoop()) {
424             try {
425                 ch.bindAddress(localAddress);
426                 promise.setSuccess();
427             } catch (Throwable t) {
428                 promise.setFailure(t);
429             }
430         } else {
431             eventLoop().execute(new Runnable() {
432                 @Override
433                 public void run() {
434                     bindAddress(localAddress, promise);
435                 }
436             });
437         }
438         return promise;
439     }
440 
441     @Override
442     public ChannelFuture unbindAddress(InetAddress localAddress) {
443         return unbindAddress(localAddress, newPromise());
444     }
445 
446     @Override
447     public ChannelFuture unbindAddress(final InetAddress localAddress, final ChannelPromise promise) {
448         if (eventLoop().inEventLoop()) {
449             try {
450                 ch.unbindAddress(localAddress);
451                 promise.setSuccess();
452             } catch (Throwable t) {
453                 promise.setFailure(t);
454             }
455         } else {
456             eventLoop().execute(new Runnable() {
457                 @Override
458                 public void run() {
459                     unbindAddress(localAddress, promise);
460                 }
461             });
462         }
463         return promise;
464     }
465 
466     private final class OioSctpChannelConfig extends DefaultSctpChannelConfig {
467         private OioSctpChannelConfig(OioSctpChannel channel, SctpChannel javaChannel) {
468             super(channel, javaChannel);
469         }
470 
471         @Override
472         protected void autoReadCleared() {
473             setReadPending(false);
474         }
475     }
476 }