View Javadoc

1   /*
2    * Copyright 2011 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.sctp.nio;
17  
18  import com.sun.nio.sctp.Association;
19  import com.sun.nio.sctp.MessageInfo;
20  import com.sun.nio.sctp.NotificationHandler;
21  import com.sun.nio.sctp.SctpChannel;
22  import io.netty.buffer.ByteBuf;
23  import io.netty.buffer.ByteBufAllocator;
24  import io.netty.channel.Channel;
25  import io.netty.channel.ChannelException;
26  import io.netty.channel.ChannelFuture;
27  import io.netty.channel.ChannelMetadata;
28  import io.netty.channel.ChannelOutboundBuffer;
29  import io.netty.channel.ChannelPromise;
30  import io.netty.channel.RecvByteBufAllocator;
31  import io.netty.channel.nio.AbstractNioMessageChannel;
32  import io.netty.channel.sctp.DefaultSctpChannelConfig;
33  import io.netty.channel.sctp.SctpChannelConfig;
34  import io.netty.channel.sctp.SctpMessage;
35  import io.netty.channel.sctp.SctpNotificationHandler;
36  import io.netty.channel.sctp.SctpServerChannel;
37  import io.netty.util.internal.PlatformDependent;
38  import io.netty.util.internal.StringUtil;
39  import io.netty.util.internal.logging.InternalLogger;
40  import io.netty.util.internal.logging.InternalLoggerFactory;
41  
42  import java.io.IOException;
43  import java.net.InetAddress;
44  import java.net.InetSocketAddress;
45  import java.net.SocketAddress;
46  import java.nio.ByteBuffer;
47  import java.nio.channels.SelectionKey;
48  import java.util.Collections;
49  import java.util.HashSet;
50  import java.util.Iterator;
51  import java.util.LinkedHashSet;
52  import java.util.List;
53  import java.util.Set;
54  
/**
 * {@link io.netty.channel.sctp.SctpChannel} implementation which uses non-blocking mode and allows reading and
 * writing {@link SctpMessage}s from / to the underlying {@link SctpChannel}.
 *
 * Be aware that not all operating systems support SCTP. Please refer to the documentation of your operating system
 * to understand what you need to do to use it. Also, this feature is only supported on Java 7+.
 */
62  public class NioSctpChannel extends AbstractNioMessageChannel implements io.netty.channel.sctp.SctpChannel {
    // Metadata instance shared by every NioSctpChannel; returned from metadata().
    private static final ChannelMetadata METADATA = new ChannelMetadata(false);

    // Used to report a failure to close a partially initialized channel.
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioSctpChannel.class);

    // Channel configuration; assigned once during construction and exposed through config().
    private final SctpChannelConfig config;

    // Handler handed to SctpChannel.receive(...) whenever a message is read.
    private final NotificationHandler<?> notificationHandler;

    // Receive-buffer allocation handle; created lazily on the first doReadMessages(...) call.
    private RecvByteBufAllocator.Handle allocHandle;
72  
73      private static SctpChannel newSctpChannel() {
74          try {
75              return SctpChannel.open();
76          } catch (IOException e) {
77              throw new ChannelException("Failed to open a sctp channel.", e);
78          }
79      }
80  
    /**
     * Create a new instance backed by a newly opened {@link SctpChannel}.
     *
     * @throws ChannelException if the underlying {@link SctpChannel} cannot be opened
     */
    public NioSctpChannel() {
        this(newSctpChannel());
    }
87  
    /**
     * Create a new instance using the given {@link SctpChannel} and no parent {@link Channel}.
     *
     * @param sctpChannel   the underlying {@link SctpChannel}
     */
    public NioSctpChannel(SctpChannel sctpChannel) {
        this(null, sctpChannel);
    }
94  
95      /**
96       * Create a new instance
97       *
98       * @param parent        the {@link Channel} which is the parent of this {@link NioSctpChannel}
99       *                      or {@code null}.
100      * @param sctpChannel   the underlying {@link SctpChannel}
101      */
102     public NioSctpChannel(Channel parent, SctpChannel sctpChannel) {
103         super(parent, sctpChannel, SelectionKey.OP_READ);
104         try {
105             sctpChannel.configureBlocking(false);
106             config = new NioSctpChannelConfig(this, sctpChannel);
107             notificationHandler = new SctpNotificationHandler(this);
108         } catch (IOException e) {
109             try {
110                 sctpChannel.close();
111             } catch (IOException e2) {
112                 if (logger.isWarnEnabled()) {
113                     logger.warn(
114                             "Failed to close a partially initialized sctp channel.", e2);
115                 }
116             }
117 
118             throw new ChannelException("Failed to enter non-blocking mode.", e);
119         }
120     }
121 
122     @Override
123     public InetSocketAddress localAddress() {
124         return (InetSocketAddress) super.localAddress();
125     }
126 
127     @Override
128     public InetSocketAddress remoteAddress() {
129         return (InetSocketAddress) super.remoteAddress();
130     }
131 
132     @Override
133     public SctpServerChannel parent() {
134         return (SctpServerChannel) super.parent();
135     }
136 
    /**
     * Returns the {@link ChannelMetadata} constant shared by all instances of this class.
     */
    @Override
    public ChannelMetadata metadata() {
        return METADATA;
    }
141 
142     @Override
143     public Association association() {
144         try {
145             return javaChannel().association();
146         } catch (IOException ignored) {
147             return null;
148         }
149     }
150 
151     @Override
152     public Set<InetSocketAddress> allLocalAddresses() {
153         try {
154             final Set<SocketAddress> allLocalAddresses = javaChannel().getAllLocalAddresses();
155             final Set<InetSocketAddress> addresses = new LinkedHashSet<InetSocketAddress>(allLocalAddresses.size());
156             for (SocketAddress socketAddress : allLocalAddresses) {
157                 addresses.add((InetSocketAddress) socketAddress);
158             }
159             return addresses;
160         } catch (Throwable ignored) {
161             return Collections.emptySet();
162         }
163     }
164 
    /**
     * Returns the {@link SctpChannelConfig} created for this channel at construction time.
     */
    @Override
    public SctpChannelConfig config() {
        return config;
    }
169 
170     @Override
171     public Set<InetSocketAddress> allRemoteAddresses() {
172         try {
173             final Set<SocketAddress> allLocalAddresses = javaChannel().getRemoteAddresses();
174             final Set<InetSocketAddress> addresses = new HashSet<InetSocketAddress>(allLocalAddresses.size());
175             for (SocketAddress socketAddress : allLocalAddresses) {
176                 addresses.add((InetSocketAddress) socketAddress);
177             }
178             return addresses;
179         } catch (Throwable ignored) {
180             return Collections.emptySet();
181         }
182     }
183 
    /**
     * Returns the channel held by the superclass, narrowed to {@link SctpChannel}.
     */
    @Override
    protected SctpChannel javaChannel() {
        return (SctpChannel) super.javaChannel();
    }
188 
189     @Override
190     public boolean isActive() {
191         SctpChannel ch = javaChannel();
192         return ch.isOpen() && association() != null;
193     }
194 
195     @Override
196     protected SocketAddress localAddress0() {
197         try {
198             Iterator<SocketAddress> i = javaChannel().getAllLocalAddresses().iterator();
199             if (i.hasNext()) {
200                 return i.next();
201             }
202         } catch (IOException e) {
203             // ignore
204         }
205         return null;
206     }
207 
208     @Override
209     protected SocketAddress remoteAddress0() {
210         try {
211             Iterator<SocketAddress> i = javaChannel().getRemoteAddresses().iterator();
212             if (i.hasNext()) {
213                 return i.next();
214             }
215         } catch (IOException e) {
216             // ignore
217         }
218         return null;
219     }
220 
    /**
     * Binds the underlying {@link SctpChannel} to the given local address.
     *
     * @param localAddress the address to bind to
     * @throws Exception whatever the underlying {@code bind} call throws
     */
    @Override
    protected void doBind(SocketAddress localAddress) throws Exception {
        javaChannel().bind(localAddress);
    }
225 
226     @Override
227     protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
228         if (localAddress != null) {
229             javaChannel().bind(localAddress);
230         }
231 
232         boolean success = false;
233         try {
234             boolean connected = javaChannel().connect(remoteAddress);
235             if (!connected) {
236                 selectionKey().interestOps(SelectionKey.OP_CONNECT);
237             }
238             success = true;
239             return connected;
240         } finally {
241             if (!success) {
242                 doClose();
243             }
244         }
245     }
246 
247     @Override
248     protected void doFinishConnect() throws Exception {
249         if (!javaChannel().finishConnect()) {
250             throw new Error();
251         }
252     }
253 
    /**
     * Disconnecting is implemented by closing the channel: this simply delegates to {@link #doClose()}.
     */
    @Override
    protected void doDisconnect() throws Exception {
        doClose();
    }
258 
    /**
     * Closes the underlying {@link SctpChannel}.
     *
     * @throws Exception whatever the underlying {@code close()} call throws
     */
    @Override
    protected void doClose() throws Exception {
        javaChannel().close();
    }
263 
264     @Override
265     protected int doReadMessages(List<Object> buf) throws Exception {
266         SctpChannel ch = javaChannel();
267 
268         RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
269         if (allocHandle == null) {
270             this.allocHandle = allocHandle = config().getRecvByteBufAllocator().newHandle();
271         }
272         ByteBuf buffer = allocHandle.allocate(config().getAllocator());
273         boolean free = true;
274         try {
275             ByteBuffer data = buffer.internalNioBuffer(buffer.writerIndex(), buffer.writableBytes());
276             int pos = data.position();
277 
278             MessageInfo messageInfo = ch.receive(data, null, notificationHandler);
279             if (messageInfo == null) {
280                 return 0;
281             }
282             buf.add(new SctpMessage(messageInfo, buffer.writerIndex(buffer.writerIndex() + data.position() - pos)));
283             free = false;
284             return 1;
285         } catch (Throwable cause) {
286             PlatformDependent.throwException(cause);
287             return -1;
288         }  finally {
289             int bytesRead = buffer.readableBytes();
290             allocHandle.record(bytesRead);
291             if (free) {
292                 buffer.release();
293             }
294         }
295     }
296 
297     @Override
298     protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
299         SctpMessage packet = (SctpMessage) msg;
300         ByteBuf data = packet.content();
301         int dataLen = data.readableBytes();
302         if (dataLen == 0) {
303             return true;
304         }
305 
306         ByteBufAllocator alloc = alloc();
307         boolean needsCopy = data.nioBufferCount() != 1;
308         if (!needsCopy) {
309             if (!data.isDirect() && alloc.isDirectBufferPooled()) {
310                 needsCopy = true;
311             }
312         }
313         ByteBuffer nioData;
314         if (!needsCopy) {
315             nioData = data.nioBuffer();
316         } else {
317             data = alloc.directBuffer(dataLen).writeBytes(data);
318             nioData = data.nioBuffer();
319         }
320         final MessageInfo mi = MessageInfo.createOutgoing(association(), null, packet.streamIdentifier());
321         mi.payloadProtocolID(packet.protocolIdentifier());
322         mi.streamNumber(packet.streamIdentifier());
323         mi.unordered(packet.isUnordered());
324 
325         final int writtenBytes = javaChannel().send(nioData, mi);
326         return writtenBytes > 0;
327     }
328 
329     @Override
330     protected final Object filterOutboundMessage(Object msg) throws Exception {
331         if (msg instanceof SctpMessage) {
332             SctpMessage m = (SctpMessage) msg;
333             ByteBuf buf = m.content();
334             if (buf.isDirect() && buf.nioBufferCount() == 1) {
335                 return m;
336             }
337 
338             return new SctpMessage(m.protocolIdentifier(), m.streamIdentifier(), m.isUnordered(),
339                                    newDirectBuffer(m, buf));
340         }
341 
342         throw new UnsupportedOperationException(
343                 "unsupported message type: " + StringUtil.simpleClassName(msg) +
344                 " (expected: " + StringUtil.simpleClassName(SctpMessage.class));
345     }
346 
    /**
     * Same as {@link #bindAddress(InetAddress, ChannelPromise)}, using a promise obtained from
     * {@code newPromise()}.
     *
     * @param localAddress the address to add to the channel's bound addresses
     * @return the future that is notified once the operation completes
     */
    @Override
    public ChannelFuture bindAddress(InetAddress localAddress) {
        return bindAddress(localAddress, newPromise());
    }
351 
352     @Override
353     public ChannelFuture bindAddress(final InetAddress localAddress, final ChannelPromise promise) {
354         if (eventLoop().inEventLoop()) {
355             try {
356                 javaChannel().bindAddress(localAddress);
357                 promise.setSuccess();
358             } catch (Throwable t) {
359                 promise.setFailure(t);
360             }
361         } else {
362             eventLoop().execute(new Runnable() {
363                 @Override
364                 public void run() {
365                     bindAddress(localAddress, promise);
366                 }
367             });
368         }
369         return promise;
370     }
371 
    /**
     * Same as {@link #unbindAddress(InetAddress, ChannelPromise)}, using a promise obtained from
     * {@code newPromise()}.
     *
     * @param localAddress the address to remove from the channel's bound addresses
     * @return the future that is notified once the operation completes
     */
    @Override
    public ChannelFuture unbindAddress(InetAddress localAddress) {
        return unbindAddress(localAddress, newPromise());
    }
376 
377     @Override
378     public ChannelFuture unbindAddress(final InetAddress localAddress, final ChannelPromise promise) {
379         if (eventLoop().inEventLoop()) {
380             try {
381                 javaChannel().unbindAddress(localAddress);
382                 promise.setSuccess();
383             } catch (Throwable t) {
384                 promise.setFailure(t);
385             }
386         } else {
387             eventLoop().execute(new Runnable() {
388                 @Override
389                 public void run() {
390                     unbindAddress(localAddress, promise);
391                 }
392             });
393         }
394         return promise;
395     }
396 
    /**
     * {@link DefaultSctpChannelConfig} specialization used by this channel.
     *
     * Deliberately a non-static inner class: {@link #autoReadCleared()} calls {@code setReadPending(false)},
     * which is not declared in this class.
     * NOTE(review): presumably that resolves to the enclosing channel's method — confirm it is not inherited
     * from the config superclass.
     */
    private final class NioSctpChannelConfig extends DefaultSctpChannelConfig {
        private NioSctpChannelConfig(NioSctpChannel channel, SctpChannel javaChannel) {
            super(channel, javaChannel);
        }

        // NOTE(review): presumably invoked by the config superclass when auto-read is switched off —
        // confirm against DefaultSctpChannelConfig.
        @Override
        protected void autoReadCleared() {
            setReadPending(false);
        }
    }
407 }