View Javadoc
1   /*
2    * Copyright 2011 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.sctp.nio;
17  
18  import com.sun.nio.sctp.Association;
19  import com.sun.nio.sctp.MessageInfo;
20  import com.sun.nio.sctp.NotificationHandler;
21  import com.sun.nio.sctp.SctpChannel;
22  import io.netty.buffer.ByteBuf;
23  import io.netty.buffer.ByteBufAllocator;
24  import io.netty.channel.Channel;
25  import io.netty.channel.ChannelException;
26  import io.netty.channel.ChannelFuture;
27  import io.netty.channel.ChannelMetadata;
28  import io.netty.channel.ChannelOutboundBuffer;
29  import io.netty.channel.ChannelPromise;
30  import io.netty.channel.RecvByteBufAllocator;
31  import io.netty.channel.nio.AbstractNioMessageChannel;
32  import io.netty.channel.nio.NioIoOps;
33  import io.netty.channel.sctp.DefaultSctpChannelConfig;
34  import io.netty.channel.sctp.SctpChannelConfig;
35  import io.netty.channel.sctp.SctpMessage;
36  import io.netty.channel.sctp.SctpNotificationHandler;
37  import io.netty.channel.sctp.SctpServerChannel;
38  import io.netty.util.internal.PlatformDependent;
39  import io.netty.util.internal.StringUtil;
40  import io.netty.util.internal.logging.InternalLogger;
41  import io.netty.util.internal.logging.InternalLoggerFactory;
42  
43  import java.io.IOException;
44  import java.net.InetAddress;
45  import java.net.InetSocketAddress;
46  import java.net.SocketAddress;
47  import java.nio.ByteBuffer;
48  import java.nio.channels.SelectionKey;
49  import java.util.Collections;
50  import java.util.HashSet;
51  import java.util.Iterator;
52  import java.util.LinkedHashSet;
53  import java.util.List;
54  import java.util.Set;
55  
56  /**
57   * {@link io.netty.channel.sctp.SctpChannel} implementation which use non-blocking mode and allows to read /
58   * write {@link SctpMessage}s to the underlying {@link SctpChannel}.
59   *
60   * Be aware that not all operations systems support SCTP. Please refer to the documentation of your operation system,
61   * to understand what you need to do to use it. Also this feature is only supported on Java 7+.
62   */
63  public class NioSctpChannel extends AbstractNioMessageChannel implements io.netty.channel.sctp.SctpChannel {
64      private static final ChannelMetadata METADATA = new ChannelMetadata(false);
65  
66      private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioSctpChannel.class);
67  
68      private final SctpChannelConfig config;
69  
70      private final NotificationHandler<?> notificationHandler;
71  
72      private static SctpChannel newSctpChannel() {
73          try {
74              return SctpChannel.open();
75          } catch (IOException e) {
76              throw new ChannelException("Failed to open a sctp channel.", e);
77          }
78      }
79  
80      /**
81       * Create a new instance
82       */
83      public NioSctpChannel() {
84          this(newSctpChannel());
85      }
86  
87      /**
88       * Create a new instance using {@link SctpChannel}
89       */
90      public NioSctpChannel(SctpChannel sctpChannel) {
91          this(null, sctpChannel);
92      }
93  
94      /**
95       * Create a new instance
96       *
97       * @param parent        the {@link Channel} which is the parent of this {@link NioSctpChannel}
98       *                      or {@code null}.
99       * @param sctpChannel   the underlying {@link SctpChannel}
100      */
101     public NioSctpChannel(Channel parent, SctpChannel sctpChannel) {
102         super(parent, sctpChannel, SelectionKey.OP_READ);
103         try {
104             sctpChannel.configureBlocking(false);
105             config = new NioSctpChannelConfig(this, sctpChannel);
106             notificationHandler = new SctpNotificationHandler(this);
107         } catch (IOException e) {
108             try {
109                 sctpChannel.close();
110             } catch (IOException e2) {
111                 if (logger.isWarnEnabled()) {
112                     logger.warn(
113                             "Failed to close a partially initialized sctp channel.", e2);
114                 }
115             }
116 
117             throw new ChannelException("Failed to enter non-blocking mode.", e);
118         }
119     }
120 
121     @Override
122     public InetSocketAddress localAddress() {
123         return (InetSocketAddress) super.localAddress();
124     }
125 
126     @Override
127     public InetSocketAddress remoteAddress() {
128         return (InetSocketAddress) super.remoteAddress();
129     }
130 
131     @Override
132     public SctpServerChannel parent() {
133         return (SctpServerChannel) super.parent();
134     }
135 
136     @Override
137     public ChannelMetadata metadata() {
138         return METADATA;
139     }
140 
141     @Override
142     public Association association() {
143         try {
144             return javaChannel().association();
145         } catch (IOException ignored) {
146             return null;
147         }
148     }
149 
150     @Override
151     public Set<InetSocketAddress> allLocalAddresses() {
152         try {
153             final Set<SocketAddress> allLocalAddresses = javaChannel().getAllLocalAddresses();
154             final Set<InetSocketAddress> addresses = new LinkedHashSet<InetSocketAddress>(allLocalAddresses.size());
155             for (SocketAddress socketAddress : allLocalAddresses) {
156                 addresses.add((InetSocketAddress) socketAddress);
157             }
158             return addresses;
159         } catch (Throwable ignored) {
160             return Collections.emptySet();
161         }
162     }
163 
164     @Override
165     public SctpChannelConfig config() {
166         return config;
167     }
168 
169     @Override
170     public Set<InetSocketAddress> allRemoteAddresses() {
171         try {
172             final Set<SocketAddress> allLocalAddresses = javaChannel().getRemoteAddresses();
173             final Set<InetSocketAddress> addresses = new HashSet<InetSocketAddress>(allLocalAddresses.size());
174             for (SocketAddress socketAddress : allLocalAddresses) {
175                 addresses.add((InetSocketAddress) socketAddress);
176             }
177             return addresses;
178         } catch (Throwable ignored) {
179             return Collections.emptySet();
180         }
181     }
182 
183     @Override
184     protected SctpChannel javaChannel() {
185         return (SctpChannel) super.javaChannel();
186     }
187 
188     @Override
189     public boolean isActive() {
190         SctpChannel ch = javaChannel();
191         return ch.isOpen() && association() != null;
192     }
193 
194     @Override
195     protected SocketAddress localAddress0() {
196         try {
197             Iterator<SocketAddress> i = javaChannel().getAllLocalAddresses().iterator();
198             if (i.hasNext()) {
199                 return i.next();
200             }
201         } catch (IOException e) {
202             // ignore
203         }
204         return null;
205     }
206 
207     @Override
208     protected SocketAddress remoteAddress0() {
209         try {
210             Iterator<SocketAddress> i = javaChannel().getRemoteAddresses().iterator();
211             if (i.hasNext()) {
212                 return i.next();
213             }
214         } catch (IOException e) {
215             // ignore
216         }
217         return null;
218     }
219 
220     @Override
221     protected void doBind(SocketAddress localAddress) throws Exception {
222         javaChannel().bind(localAddress);
223     }
224 
225     @Override
226     protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
227         if (localAddress != null) {
228             javaChannel().bind(localAddress);
229         }
230 
231         boolean success = false;
232         try {
233             boolean connected = javaChannel().connect(remoteAddress);
234             if (!connected) {
235                 addAndSubmit(NioIoOps.CONNECT);
236             }
237             success = true;
238             return connected;
239         } finally {
240             if (!success) {
241                 doClose();
242             }
243         }
244     }
245 
246     @Override
247     protected void doFinishConnect() throws Exception {
248         if (!javaChannel().finishConnect()) {
249             throw new Error();
250         }
251     }
252 
253     @Override
254     protected void doDisconnect() throws Exception {
255         doClose();
256     }
257 
258     @Override
259     protected void doClose() throws Exception {
260         javaChannel().close();
261     }
262 
263     @Override
264     protected int doReadMessages(List<Object> buf) throws Exception {
265         SctpChannel ch = javaChannel();
266 
267         RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
268         ByteBuf buffer = allocHandle.allocate(config().getAllocator());
269         boolean free = true;
270         try {
271             ByteBuffer data = buffer.internalNioBuffer(buffer.writerIndex(), buffer.writableBytes());
272             int pos = data.position();
273 
274             MessageInfo messageInfo = ch.receive(data, null, notificationHandler);
275             if (messageInfo == null) {
276                 return 0;
277             }
278 
279             allocHandle.lastBytesRead(data.position() - pos);
280             buf.add(new SctpMessage(messageInfo,
281                     buffer.writerIndex(buffer.writerIndex() + allocHandle.lastBytesRead())));
282             free = false;
283             return 1;
284         } catch (Throwable cause) {
285             PlatformDependent.throwException(cause);
286             return -1;
287         }  finally {
288             if (free) {
289                 buffer.release();
290             }
291         }
292     }
293 
294     @Override
295     protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
296         SctpMessage packet = (SctpMessage) msg;
297         ByteBuf data = packet.content();
298         int dataLen = data.readableBytes();
299         if (dataLen == 0) {
300             return true;
301         }
302 
303         ByteBufAllocator alloc = alloc();
304         boolean needsCopy = data.nioBufferCount() != 1;
305         if (!needsCopy) {
306             if (!data.isDirect() && alloc.isDirectBufferPooled()) {
307                 needsCopy = true;
308             }
309         }
310         ByteBuffer nioData;
311         if (needsCopy) {
312             data = alloc.directBuffer(dataLen).writeBytes(data);
313         }
314         nioData = data.nioBuffer();
315         final MessageInfo mi = MessageInfo.createOutgoing(association(), null, packet.streamIdentifier());
316         mi.payloadProtocolID(packet.protocolIdentifier());
317         mi.streamNumber(packet.streamIdentifier());
318         mi.unordered(packet.isUnordered());
319 
320         final int writtenBytes = javaChannel().send(nioData, mi);
321         return writtenBytes > 0;
322     }
323 
324     @Override
325     protected final Object filterOutboundMessage(Object msg) throws Exception {
326         if (msg instanceof SctpMessage) {
327             SctpMessage m = (SctpMessage) msg;
328             ByteBuf buf = m.content();
329             if (buf.isDirect() && buf.nioBufferCount() == 1) {
330                 return m;
331             }
332 
333             return new SctpMessage(m.protocolIdentifier(), m.streamIdentifier(), m.isUnordered(),
334                                    newDirectBuffer(m, buf));
335         }
336 
337         throw new UnsupportedOperationException(
338                 "unsupported message type: " + StringUtil.simpleClassName(msg) +
339                 " (expected: " + StringUtil.simpleClassName(SctpMessage.class));
340     }
341 
342     @Override
343     public ChannelFuture bindAddress(InetAddress localAddress) {
344         return bindAddress(localAddress, newPromise());
345     }
346 
347     @Override
348     public ChannelFuture bindAddress(final InetAddress localAddress, final ChannelPromise promise) {
349         if (eventLoop().inEventLoop()) {
350             try {
351                 javaChannel().bindAddress(localAddress);
352                 promise.setSuccess();
353             } catch (Throwable t) {
354                 promise.setFailure(t);
355             }
356         } else {
357             eventLoop().execute(new Runnable() {
358                 @Override
359                 public void run() {
360                     bindAddress(localAddress, promise);
361                 }
362             });
363         }
364         return promise;
365     }
366 
367     @Override
368     public ChannelFuture unbindAddress(InetAddress localAddress) {
369         return unbindAddress(localAddress, newPromise());
370     }
371 
372     @Override
373     public ChannelFuture unbindAddress(final InetAddress localAddress, final ChannelPromise promise) {
374         if (eventLoop().inEventLoop()) {
375             try {
376                 javaChannel().unbindAddress(localAddress);
377                 promise.setSuccess();
378             } catch (Throwable t) {
379                 promise.setFailure(t);
380             }
381         } else {
382             eventLoop().execute(new Runnable() {
383                 @Override
384                 public void run() {
385                     unbindAddress(localAddress, promise);
386                 }
387             });
388         }
389         return promise;
390     }
391 
392     private final class NioSctpChannelConfig extends DefaultSctpChannelConfig {
393         private NioSctpChannelConfig(NioSctpChannel channel, SctpChannel javaChannel) {
394             super(channel, javaChannel);
395         }
396 
397         @Override
398         protected void autoReadCleared() {
399             clearReadPending();
400         }
401     }
402 }