View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.sctp.oio;
17  
18  import com.sun.nio.sctp.Association;
19  import com.sun.nio.sctp.MessageInfo;
20  import com.sun.nio.sctp.NotificationHandler;
21  import com.sun.nio.sctp.SctpChannel;
22  import io.netty.buffer.ByteBuf;
23  import io.netty.channel.Channel;
24  import io.netty.channel.ChannelException;
25  import io.netty.channel.ChannelFuture;
26  import io.netty.channel.ChannelMetadata;
27  import io.netty.channel.ChannelOutboundBuffer;
28  import io.netty.channel.ChannelPromise;
29  import io.netty.channel.RecvByteBufAllocator;
30  import io.netty.channel.oio.AbstractOioMessageChannel;
31  import io.netty.channel.sctp.DefaultSctpChannelConfig;
32  import io.netty.channel.sctp.SctpChannelConfig;
33  import io.netty.channel.sctp.SctpMessage;
34  import io.netty.channel.sctp.SctpNotificationHandler;
35  import io.netty.channel.sctp.SctpServerChannel;
36  import io.netty.util.internal.PlatformDependent;
37  import io.netty.util.internal.StringUtil;
38  import io.netty.util.internal.logging.InternalLogger;
39  import io.netty.util.internal.logging.InternalLoggerFactory;
40  
41  import java.io.IOException;
42  import java.net.InetAddress;
43  import java.net.InetSocketAddress;
44  import java.net.SocketAddress;
45  import java.nio.ByteBuffer;
46  import java.nio.channels.SelectionKey;
47  import java.nio.channels.Selector;
48  import java.util.Collections;
49  import java.util.Iterator;
50  import java.util.LinkedHashSet;
51  import java.util.List;
52  import java.util.Set;
53  
54  /**
55   * {@link io.netty.channel.sctp.SctpChannel} implementation which use blocking mode and allows to read / write
56   * {@link SctpMessage}s to the underlying {@link SctpChannel}.
57   *
58   * Be aware that not all operations systems support SCTP. Please refer to the documentation of your operation system,
59   * to understand what you need to do to use it. Also this feature is only supported on Java 7+.
60   */
61  public class OioSctpChannel extends AbstractOioMessageChannel
62          implements io.netty.channel.sctp.SctpChannel {
63  
64      private static final InternalLogger logger =
65              InternalLoggerFactory.getInstance(OioSctpChannel.class);
66  
67      private static final ChannelMetadata METADATA = new ChannelMetadata(false);
68      private static final String EXPECTED_TYPE = " (expected: " + StringUtil.simpleClassName(SctpMessage.class) + ')';
69  
70      private final SctpChannel ch;
71      private final SctpChannelConfig config;
72  
73      private final Selector readSelector;
74      private final Selector writeSelector;
75      private final Selector connectSelector;
76  
77      private final NotificationHandler<?> notificationHandler;
78  
79      private static SctpChannel openChannel() {
80          try {
81              return SctpChannel.open();
82          } catch (IOException e) {
83              throw new ChannelException("Failed to open a sctp channel.", e);
84          }
85      }
86  
87      /**
88       * Create a new instance with an new {@link SctpChannel}.
89       */
90      public OioSctpChannel() {
91          this(openChannel());
92      }
93  
94      /**
95       * Create a new instance from the given {@link SctpChannel}.
96       *
97       * @param ch    the {@link SctpChannel} which is used by this instance
98       */
99      public OioSctpChannel(SctpChannel ch) {
100         this(null, ch);
101     }
102 
103     /**
104      * Create a new instance from the given {@link SctpChannel}.
105      *
106      * @param parent    the parent {@link Channel} which was used to create this instance. This can be null if the
107      *                  {@link} has no parent as it was created by your self.
108      * @param ch        the {@link SctpChannel} which is used by this instance
109      */
110     public OioSctpChannel(Channel parent, SctpChannel ch) {
111         super(parent);
112         this.ch = ch;
113         boolean success = false;
114         try {
115             ch.configureBlocking(false);
116             readSelector = Selector.open();
117             writeSelector = Selector.open();
118             connectSelector = Selector.open();
119 
120             ch.register(readSelector, SelectionKey.OP_READ);
121             ch.register(writeSelector, SelectionKey.OP_WRITE);
122             ch.register(connectSelector, SelectionKey.OP_CONNECT);
123 
124             config = new OioSctpChannelConfig(this, ch);
125             notificationHandler = new SctpNotificationHandler(this);
126             success = true;
127         } catch (Exception e) {
128             throw new ChannelException("failed to initialize a sctp channel", e);
129         } finally {
130             if (!success) {
131                 try {
132                     ch.close();
133                 } catch (IOException e) {
134                     logger.warn("Failed to close a sctp channel.", e);
135                 }
136             }
137         }
138     }
139 
140     @Override
141     public InetSocketAddress localAddress() {
142         return (InetSocketAddress) super.localAddress();
143     }
144 
145     @Override
146     public InetSocketAddress remoteAddress() {
147         return (InetSocketAddress) super.remoteAddress();
148     }
149 
150     @Override
151     public SctpServerChannel parent() {
152         return (SctpServerChannel) super.parent();
153     }
154 
155     @Override
156     public ChannelMetadata metadata() {
157         return METADATA;
158     }
159 
160     @Override
161     public SctpChannelConfig config() {
162         return config;
163     }
164 
165     @Override
166     public boolean isOpen() {
167         return ch.isOpen();
168     }
169 
170     @Override
171     protected int doReadMessages(List<Object> msgs) throws Exception {
172         if (!readSelector.isOpen()) {
173             return 0;
174         }
175 
176         int readMessages = 0;
177 
178         final int selectedKeys = readSelector.select(SO_TIMEOUT);
179         final boolean keysSelected = selectedKeys > 0;
180 
181         if (!keysSelected) {
182             return readMessages;
183         }
184 
185         Set<SelectionKey> reableKeys = readSelector.selectedKeys();
186         try {
187             for (SelectionKey ignored : reableKeys) {
188                 RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
189                 ByteBuf buffer = allocHandle.allocate(config().getAllocator());
190                 boolean free = true;
191 
192                 try {
193                     ByteBuffer data = buffer.nioBuffer(buffer.writerIndex(), buffer.writableBytes());
194                     MessageInfo messageInfo = ch.receive(data, null, notificationHandler);
195                     if (messageInfo == null) {
196                         return readMessages;
197                     }
198 
199                     data.flip();
200                     msgs.add(new SctpMessage(messageInfo, buffer.writerIndex(buffer.writerIndex() + data.remaining())));
201                     free = false;
202                     readMessages ++;
203                 } catch (Throwable cause) {
204                     PlatformDependent.throwException(cause);
205                 }  finally {
206                     int bytesRead = buffer.readableBytes();
207                     allocHandle.record(bytesRead);
208                     if (free) {
209                         buffer.release();
210                     }
211                 }
212             }
213         } finally {
214             reableKeys.clear();
215         }
216         return readMessages;
217     }
218 
219     @Override
220     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
221         if (!writeSelector.isOpen()) {
222             return;
223         }
224         final int size = in.size();
225         final int selectedKeys = writeSelector.select(SO_TIMEOUT);
226         if (selectedKeys > 0) {
227             final Set<SelectionKey> writableKeys = writeSelector.selectedKeys();
228             if (writableKeys.isEmpty()) {
229                 return;
230             }
231             Iterator<SelectionKey> writableKeysIt = writableKeys.iterator();
232             int written = 0;
233             for (;;) {
234                 if (written == size) {
235                     // all written
236                     return;
237                 }
238                 writableKeysIt.next();
239                 writableKeysIt.remove();
240 
241                 SctpMessage packet = (SctpMessage) in.current();
242                 if (packet == null) {
243                     return;
244                 }
245 
246                 ByteBuf data = packet.content();
247                 int dataLen = data.readableBytes();
248                 ByteBuffer nioData;
249 
250                 if (data.nioBufferCount() != -1) {
251                     nioData = data.nioBuffer();
252                 } else {
253                     nioData = ByteBuffer.allocate(dataLen);
254                     data.getBytes(data.readerIndex(), nioData);
255                     nioData.flip();
256                 }
257 
258                 final MessageInfo mi = MessageInfo.createOutgoing(association(), null, packet.streamIdentifier());
259                 mi.payloadProtocolID(packet.protocolIdentifier());
260                 mi.streamNumber(packet.streamIdentifier());
261 
262                 ch.send(nioData, mi);
263                 written ++;
264                 in.remove();
265 
266                 if (!writableKeysIt.hasNext()) {
267                     return;
268                 }
269             }
270         }
271     }
272 
273     @Override
274     protected Object filterOutboundMessage(Object msg) throws Exception {
275         if (msg instanceof SctpMessage) {
276             return msg;
277         }
278 
279         throw new UnsupportedOperationException(
280                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPE);
281     }
282 
283     @Override
284     public Association association() {
285         try {
286             return ch.association();
287         } catch (IOException ignored) {
288             return null;
289         }
290     }
291 
292     @Override
293     public boolean isActive() {
294         return isOpen() && association() != null;
295     }
296 
297     @Override
298     protected SocketAddress localAddress0() {
299         try {
300             Iterator<SocketAddress> i = ch.getAllLocalAddresses().iterator();
301             if (i.hasNext()) {
302                 return i.next();
303             }
304         } catch (IOException e) {
305             // ignore
306         }
307         return null;
308     }
309 
310     @Override
311     public Set<InetSocketAddress> allLocalAddresses() {
312         try {
313             final Set<SocketAddress> allLocalAddresses = ch.getAllLocalAddresses();
314             final Set<InetSocketAddress> addresses = new LinkedHashSet<InetSocketAddress>(allLocalAddresses.size());
315             for (SocketAddress socketAddress : allLocalAddresses) {
316                 addresses.add((InetSocketAddress) socketAddress);
317             }
318             return addresses;
319         } catch (Throwable ignored) {
320             return Collections.emptySet();
321         }
322     }
323 
324     @Override
325     protected SocketAddress remoteAddress0() {
326         try {
327             Iterator<SocketAddress> i = ch.getRemoteAddresses().iterator();
328             if (i.hasNext()) {
329                 return i.next();
330             }
331         } catch (IOException e) {
332             // ignore
333         }
334         return null;
335     }
336 
337     @Override
338     public Set<InetSocketAddress> allRemoteAddresses() {
339         try {
340             final Set<SocketAddress> allLocalAddresses = ch.getRemoteAddresses();
341             final Set<InetSocketAddress> addresses = new LinkedHashSet<InetSocketAddress>(allLocalAddresses.size());
342             for (SocketAddress socketAddress : allLocalAddresses) {
343                 addresses.add((InetSocketAddress) socketAddress);
344             }
345             return addresses;
346         } catch (Throwable ignored) {
347             return Collections.emptySet();
348         }
349     }
350 
351     @Override
352     protected void doBind(SocketAddress localAddress) throws Exception {
353         ch.bind(localAddress);
354     }
355 
356     @Override
357     protected void doConnect(SocketAddress remoteAddress,
358                              SocketAddress localAddress) throws Exception {
359         if (localAddress != null) {
360             ch.bind(localAddress);
361         }
362 
363         boolean success = false;
364         try {
365             ch.connect(remoteAddress);
366             boolean  finishConnect = false;
367             while (!finishConnect) {
368                 if (connectSelector.select(SO_TIMEOUT) >= 0) {
369                     final Set<SelectionKey> selectionKeys = connectSelector.selectedKeys();
370                     for (SelectionKey key : selectionKeys) {
371                        if (key.isConnectable()) {
372                            selectionKeys.clear();
373                            finishConnect = true;
374                            break;
375                        }
376                     }
377                     selectionKeys.clear();
378                 }
379             }
380             success = ch.finishConnect();
381         } finally {
382             if (!success) {
383                 doClose();
384             }
385         }
386     }
387 
388     @Override
389     protected void doDisconnect() throws Exception {
390         doClose();
391     }
392 
393     @Override
394     protected void doClose() throws Exception {
395         closeSelector("read", readSelector);
396         closeSelector("write", writeSelector);
397         closeSelector("connect", connectSelector);
398         ch.close();
399     }
400 
401     private static void closeSelector(String selectorName, Selector selector) {
402         try {
403             selector.close();
404         } catch (IOException e) {
405             logger.warn("Failed to close a " + selectorName + " selector.", e);
406         }
407     }
408 
409     @Override
410     public ChannelFuture bindAddress(InetAddress localAddress) {
411         return bindAddress(localAddress, newPromise());
412     }
413 
414     @Override
415     public ChannelFuture bindAddress(final InetAddress localAddress, final ChannelPromise promise) {
416         if (eventLoop().inEventLoop()) {
417             try {
418                 ch.bindAddress(localAddress);
419                 promise.setSuccess();
420             } catch (Throwable t) {
421                 promise.setFailure(t);
422             }
423         } else {
424             eventLoop().execute(new Runnable() {
425                 @Override
426                 public void run() {
427                     bindAddress(localAddress, promise);
428                 }
429             });
430         }
431         return promise;
432     }
433 
434     @Override
435     public ChannelFuture unbindAddress(InetAddress localAddress) {
436         return unbindAddress(localAddress, newPromise());
437     }
438 
439     @Override
440     public ChannelFuture unbindAddress(final InetAddress localAddress, final ChannelPromise promise) {
441         if (eventLoop().inEventLoop()) {
442             try {
443                 ch.unbindAddress(localAddress);
444                 promise.setSuccess();
445             } catch (Throwable t) {
446                 promise.setFailure(t);
447             }
448         } else {
449             eventLoop().execute(new Runnable() {
450                 @Override
451                 public void run() {
452                     unbindAddress(localAddress, promise);
453                 }
454             });
455         }
456         return promise;
457     }
458 
459     private final class OioSctpChannelConfig extends DefaultSctpChannelConfig {
460         private OioSctpChannelConfig(OioSctpChannel channel, SctpChannel javaChannel) {
461             super(channel, javaChannel);
462         }
463 
464         @Override
465         protected void autoReadCleared() {
466             setReadPending(false);
467         }
468     }
469 }