View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.sctp.oio;
17  
18  import com.sun.nio.sctp.Association;
19  import com.sun.nio.sctp.MessageInfo;
20  import com.sun.nio.sctp.NotificationHandler;
21  import com.sun.nio.sctp.SctpChannel;
22  
23  import io.netty.buffer.ByteBuf;
24  import io.netty.channel.Channel;
25  import io.netty.channel.ChannelException;
26  import io.netty.channel.ChannelFuture;
27  import io.netty.channel.ChannelMetadata;
28  import io.netty.channel.ChannelOutboundBuffer;
29  import io.netty.channel.ChannelPromise;
30  import io.netty.channel.RecvByteBufAllocator;
31  import io.netty.channel.oio.AbstractOioMessageChannel;
32  import io.netty.channel.sctp.DefaultSctpChannelConfig;
33  import io.netty.channel.sctp.SctpChannelConfig;
34  import io.netty.channel.sctp.SctpMessage;
35  import io.netty.channel.sctp.SctpNotificationHandler;
36  import io.netty.channel.sctp.SctpServerChannel;
37  import io.netty.util.internal.PlatformDependent;
38  import io.netty.util.internal.StringUtil;
39  import io.netty.util.internal.logging.InternalLogger;
40  import io.netty.util.internal.logging.InternalLoggerFactory;
41  
42  import java.io.IOException;
43  import java.net.InetAddress;
44  import java.net.InetSocketAddress;
45  import java.net.SocketAddress;
46  import java.nio.ByteBuffer;
47  import java.nio.channels.SelectionKey;
48  import java.nio.channels.Selector;
49  import java.util.Collections;
50  import java.util.Iterator;
51  import java.util.LinkedHashSet;
52  import java.util.List;
53  import java.util.Set;
54  
55  /**
56   * {@link io.netty.channel.sctp.SctpChannel} implementation which use blocking mode and allows to read / write
57   * {@link SctpMessage}s to the underlying {@link SctpChannel}.
58   *
59   * Be aware that not all operations systems support SCTP. Please refer to the documentation of your operation system,
60   * to understand what you need to do to use it. Also this feature is only supported on Java 7+.
61   */
62  public class OioSctpChannel extends AbstractOioMessageChannel
63          implements io.netty.channel.sctp.SctpChannel {
64  
65      private static final InternalLogger logger =
66              InternalLoggerFactory.getInstance(OioSctpChannel.class);
67  
68      private static final ChannelMetadata METADATA = new ChannelMetadata(false);
69      private static final String EXPECTED_TYPE = " (expected: " + StringUtil.simpleClassName(SctpMessage.class) + ')';
70  
71      private final SctpChannel ch;
72      private final SctpChannelConfig config;
73  
74      private final Selector readSelector;
75      private final Selector writeSelector;
76      private final Selector connectSelector;
77  
78      private final NotificationHandler<?> notificationHandler;
79  
80      private static SctpChannel openChannel() {
81          try {
82              return SctpChannel.open();
83          } catch (IOException e) {
84              throw new ChannelException("Failed to open a sctp channel.", e);
85          }
86      }
87  
88      /**
89       * Create a new instance with an new {@link SctpChannel}.
90       */
91      public OioSctpChannel() {
92          this(openChannel());
93      }
94  
95      /**
96       * Create a new instance from the given {@link SctpChannel}.
97       *
98       * @param ch    the {@link SctpChannel} which is used by this instance
99       */
100     public OioSctpChannel(SctpChannel ch) {
101         this(null, ch);
102     }
103 
104     /**
105      * Create a new instance from the given {@link SctpChannel}.
106      *
107      * @param parent    the parent {@link Channel} which was used to create this instance. This can be null if the
108      *                  {@link} has no parent as it was created by your self.
109      * @param ch        the {@link SctpChannel} which is used by this instance
110      */
111     public OioSctpChannel(Channel parent, SctpChannel ch) {
112         super(parent);
113         this.ch = ch;
114         boolean success = false;
115         try {
116             ch.configureBlocking(false);
117             readSelector = Selector.open();
118             writeSelector = Selector.open();
119             connectSelector = Selector.open();
120 
121             ch.register(readSelector, SelectionKey.OP_READ);
122             ch.register(writeSelector, SelectionKey.OP_WRITE);
123             ch.register(connectSelector, SelectionKey.OP_CONNECT);
124 
125             config = new OioSctpChannelConfig(this, ch);
126             notificationHandler = new SctpNotificationHandler(this);
127             success = true;
128         } catch (Exception e) {
129             throw new ChannelException("failed to initialize a sctp channel", e);
130         } finally {
131             if (!success) {
132                 try {
133                     ch.close();
134                 } catch (IOException e) {
135                     logger.warn("Failed to close a sctp channel.", e);
136                 }
137             }
138         }
139     }
140 
141     @Override
142     public InetSocketAddress localAddress() {
143         return (InetSocketAddress) super.localAddress();
144     }
145 
146     @Override
147     public InetSocketAddress remoteAddress() {
148         return (InetSocketAddress) super.remoteAddress();
149     }
150 
151     @Override
152     public SctpServerChannel parent() {
153         return (SctpServerChannel) super.parent();
154     }
155 
156     @Override
157     public ChannelMetadata metadata() {
158         return METADATA;
159     }
160 
161     @Override
162     public SctpChannelConfig config() {
163         return config;
164     }
165 
166     @Override
167     public boolean isOpen() {
168         return ch.isOpen();
169     }
170 
171     @Override
172     protected int doReadMessages(List<Object> msgs) throws Exception {
173         if (!readSelector.isOpen()) {
174             return 0;
175         }
176 
177         int readMessages = 0;
178 
179         final int selectedKeys = readSelector.select(SO_TIMEOUT);
180         final boolean keysSelected = selectedKeys > 0;
181 
182         if (!keysSelected) {
183             return readMessages;
184         }
185         // We must clear the selectedKeys because the Selector will never do it. If we do not clear it, the selectionKey
186         // will always be returned even if there is no data can be read which causes performance issue. And in some
187         // implementation of Selector, the select method may return 0 if the selectionKey which is ready for process has
188         // already been in the selectedKeys and cause the keysSelected above to be false even if we actually have
189         // something to read.
190         readSelector.selectedKeys().clear();
191         final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
192         ByteBuf buffer = allocHandle.allocate(config().getAllocator());
193         boolean free = true;
194 
195         try {
196             ByteBuffer data = buffer.nioBuffer(buffer.writerIndex(), buffer.writableBytes());
197             MessageInfo messageInfo = ch.receive(data, null, notificationHandler);
198             if (messageInfo == null) {
199                 return readMessages;
200             }
201 
202             data.flip();
203             allocHandle.lastBytesRead(data.remaining());
204             msgs.add(new SctpMessage(messageInfo,
205                     buffer.writerIndex(buffer.writerIndex() + allocHandle.lastBytesRead())));
206             free = false;
207             ++readMessages;
208         } catch (Throwable cause) {
209             PlatformDependent.throwException(cause);
210         }  finally {
211             if (free) {
212                 buffer.release();
213             }
214         }
215         return readMessages;
216     }
217 
218     @Override
219     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
220         if (!writeSelector.isOpen()) {
221             return;
222         }
223         final int size = in.size();
224         final int selectedKeys = writeSelector.select(SO_TIMEOUT);
225         if (selectedKeys > 0) {
226             final Set<SelectionKey> writableKeys = writeSelector.selectedKeys();
227             if (writableKeys.isEmpty()) {
228                 return;
229             }
230             Iterator<SelectionKey> writableKeysIt = writableKeys.iterator();
231             int written = 0;
232             for (;;) {
233                 if (written == size) {
234                     // all written
235                     return;
236                 }
237                 writableKeysIt.next();
238                 writableKeysIt.remove();
239 
240                 SctpMessage packet = (SctpMessage) in.current();
241                 if (packet == null) {
242                     return;
243                 }
244 
245                 ByteBuf data = packet.content();
246                 int dataLen = data.readableBytes();
247                 ByteBuffer nioData;
248 
249                 if (data.nioBufferCount() != -1) {
250                     nioData = data.nioBuffer();
251                 } else {
252                     nioData = ByteBuffer.allocate(dataLen);
253                     data.getBytes(data.readerIndex(), nioData);
254                     nioData.flip();
255                 }
256 
257                 final MessageInfo mi = MessageInfo.createOutgoing(association(), null, packet.streamIdentifier());
258                 mi.payloadProtocolID(packet.protocolIdentifier());
259                 mi.streamNumber(packet.streamIdentifier());
260                 mi.unordered(packet.isUnordered());
261 
262                 ch.send(nioData, mi);
263                 written ++;
264                 in.remove();
265 
266                 if (!writableKeysIt.hasNext()) {
267                     return;
268                 }
269             }
270         }
271     }
272 
273     @Override
274     protected Object filterOutboundMessage(Object msg) throws Exception {
275         if (msg instanceof SctpMessage) {
276             return msg;
277         }
278 
279         throw new UnsupportedOperationException(
280                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPE);
281     }
282 
283     @Override
284     public Association association() {
285         try {
286             return ch.association();
287         } catch (IOException ignored) {
288             return null;
289         }
290     }
291 
292     @Override
293     public boolean isActive() {
294         return isOpen() && association() != null;
295     }
296 
297     @Override
298     protected SocketAddress localAddress0() {
299         try {
300             Iterator<SocketAddress> i = ch.getAllLocalAddresses().iterator();
301             if (i.hasNext()) {
302                 return i.next();
303             }
304         } catch (IOException e) {
305             // ignore
306         }
307         return null;
308     }
309 
310     @Override
311     public Set<InetSocketAddress> allLocalAddresses() {
312         try {
313             final Set<SocketAddress> allLocalAddresses = ch.getAllLocalAddresses();
314             final Set<InetSocketAddress> addresses = new LinkedHashSet<InetSocketAddress>(allLocalAddresses.size());
315             for (SocketAddress socketAddress : allLocalAddresses) {
316                 addresses.add((InetSocketAddress) socketAddress);
317             }
318             return addresses;
319         } catch (Throwable ignored) {
320             return Collections.emptySet();
321         }
322     }
323 
324     @Override
325     protected SocketAddress remoteAddress0() {
326         try {
327             Iterator<SocketAddress> i = ch.getRemoteAddresses().iterator();
328             if (i.hasNext()) {
329                 return i.next();
330             }
331         } catch (IOException e) {
332             // ignore
333         }
334         return null;
335     }
336 
337     @Override
338     public Set<InetSocketAddress> allRemoteAddresses() {
339         try {
340             final Set<SocketAddress> allLocalAddresses = ch.getRemoteAddresses();
341             final Set<InetSocketAddress> addresses = new LinkedHashSet<InetSocketAddress>(allLocalAddresses.size());
342             for (SocketAddress socketAddress : allLocalAddresses) {
343                 addresses.add((InetSocketAddress) socketAddress);
344             }
345             return addresses;
346         } catch (Throwable ignored) {
347             return Collections.emptySet();
348         }
349     }
350 
351     @Override
352     protected void doBind(SocketAddress localAddress) throws Exception {
353         ch.bind(localAddress);
354     }
355 
356     @Override
357     protected void doConnect(SocketAddress remoteAddress,
358                              SocketAddress localAddress) throws Exception {
359         if (localAddress != null) {
360             ch.bind(localAddress);
361         }
362 
363         boolean success = false;
364         try {
365             ch.connect(remoteAddress);
366             boolean  finishConnect = false;
367             while (!finishConnect) {
368                 if (connectSelector.select(SO_TIMEOUT) >= 0) {
369                     final Set<SelectionKey> selectionKeys = connectSelector.selectedKeys();
370                     for (SelectionKey key : selectionKeys) {
371                        if (key.isConnectable()) {
372                            selectionKeys.clear();
373                            finishConnect = true;
374                            break;
375                        }
376                     }
377                     selectionKeys.clear();
378                 }
379             }
380             success = ch.finishConnect();
381         } finally {
382             if (!success) {
383                 doClose();
384             }
385         }
386     }
387 
388     @Override
389     protected void doDisconnect() throws Exception {
390         doClose();
391     }
392 
393     @Override
394     protected void doClose() throws Exception {
395         closeSelector("read", readSelector);
396         closeSelector("write", writeSelector);
397         closeSelector("connect", connectSelector);
398         ch.close();
399     }
400 
401     private static void closeSelector(String selectorName, Selector selector) {
402         try {
403             selector.close();
404         } catch (IOException e) {
405             logger.warn("Failed to close a " + selectorName + " selector.", e);
406         }
407     }
408 
409     @Override
410     public ChannelFuture bindAddress(InetAddress localAddress) {
411         return bindAddress(localAddress, newPromise());
412     }
413 
414     @Override
415     public ChannelFuture bindAddress(final InetAddress localAddress, final ChannelPromise promise) {
416         if (eventLoop().inEventLoop()) {
417             try {
418                 ch.bindAddress(localAddress);
419                 promise.setSuccess();
420             } catch (Throwable t) {
421                 promise.setFailure(t);
422             }
423         } else {
424             eventLoop().execute(new Runnable() {
425                 @Override
426                 public void run() {
427                     bindAddress(localAddress, promise);
428                 }
429             });
430         }
431         return promise;
432     }
433 
434     @Override
435     public ChannelFuture unbindAddress(InetAddress localAddress) {
436         return unbindAddress(localAddress, newPromise());
437     }
438 
439     @Override
440     public ChannelFuture unbindAddress(final InetAddress localAddress, final ChannelPromise promise) {
441         if (eventLoop().inEventLoop()) {
442             try {
443                 ch.unbindAddress(localAddress);
444                 promise.setSuccess();
445             } catch (Throwable t) {
446                 promise.setFailure(t);
447             }
448         } else {
449             eventLoop().execute(new Runnable() {
450                 @Override
451                 public void run() {
452                     unbindAddress(localAddress, promise);
453                 }
454             });
455         }
456         return promise;
457     }
458 
459     private final class OioSctpChannelConfig extends DefaultSctpChannelConfig {
460         private OioSctpChannelConfig(OioSctpChannel channel, SctpChannel javaChannel) {
461             super(channel, javaChannel);
462         }
463 
464         @Override
465         protected void autoReadCleared() {
466             clearReadPending();
467         }
468     }
469 }