View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.sctp.oio;
17  
18  import com.sun.nio.sctp.Association;
19  import com.sun.nio.sctp.MessageInfo;
20  import com.sun.nio.sctp.NotificationHandler;
21  import com.sun.nio.sctp.SctpChannel;
22  
23  import io.netty.buffer.ByteBuf;
24  import io.netty.channel.Channel;
25  import io.netty.channel.ChannelException;
26  import io.netty.channel.ChannelFuture;
27  import io.netty.channel.ChannelMetadata;
28  import io.netty.channel.ChannelOutboundBuffer;
29  import io.netty.channel.ChannelPromise;
30  import io.netty.channel.RecvByteBufAllocator;
31  import io.netty.channel.oio.AbstractOioMessageChannel;
32  import io.netty.channel.sctp.DefaultSctpChannelConfig;
33  import io.netty.channel.sctp.SctpChannelConfig;
34  import io.netty.channel.sctp.SctpMessage;
35  import io.netty.channel.sctp.SctpNotificationHandler;
36  import io.netty.channel.sctp.SctpServerChannel;
37  import io.netty.util.internal.PlatformDependent;
38  import io.netty.util.internal.StringUtil;
39  import io.netty.util.internal.logging.InternalLogger;
40  import io.netty.util.internal.logging.InternalLoggerFactory;
41  
42  import java.io.IOException;
43  import java.net.InetAddress;
44  import java.net.InetSocketAddress;
45  import java.net.SocketAddress;
46  import java.nio.ByteBuffer;
47  import java.nio.channels.SelectionKey;
48  import java.nio.channels.Selector;
49  import java.util.Collections;
50  import java.util.Iterator;
51  import java.util.LinkedHashSet;
52  import java.util.List;
53  import java.util.Set;
54  
55  /**
56   * {@link io.netty.channel.sctp.SctpChannel} implementation which uses blocking mode and allows reading / writing
57   * {@link SctpMessage}s from / to the underlying {@link SctpChannel}.
58   *
59   * Be aware that not all operating systems support SCTP. Please refer to the documentation of your operating system
60   * to understand what you need to do to use it. Also, this feature is only supported on Java 7+.
61   *
62   * @deprecated use {@link io.netty.channel.sctp.nio.NioSctpChannel}.
63   */
64  @Deprecated
65  public class OioSctpChannel extends AbstractOioMessageChannel
66          implements io.netty.channel.sctp.SctpChannel {
67  
68      private static final InternalLogger logger =
69              InternalLoggerFactory.getInstance(OioSctpChannel.class);
70  
71      private static final ChannelMetadata METADATA = new ChannelMetadata(false);
72      private static final String EXPECTED_TYPE = " (expected: " + StringUtil.simpleClassName(SctpMessage.class) + ')';
73  
74      private final SctpChannel ch;
75      private final SctpChannelConfig config;
76  
77      private final Selector readSelector;
78      private final Selector writeSelector;
79      private final Selector connectSelector;
80  
81      private final NotificationHandler<?> notificationHandler;
82  
83      private static SctpChannel openChannel() {
84          try {
85              return SctpChannel.open();
86          } catch (IOException e) {
87              throw new ChannelException("Failed to open a sctp channel.", e);
88          }
89      }
90  
91      /**
92       * Create a new instance with an new {@link SctpChannel}.
93       */
94      public OioSctpChannel() {
95          this(openChannel());
96      }
97  
98      /**
99       * Create a new instance from the given {@link SctpChannel}.
100      *
101      * @param ch    the {@link SctpChannel} which is used by this instance
102      */
103     public OioSctpChannel(SctpChannel ch) {
104         this(null, ch);
105     }
106 
107     /**
108      * Create a new instance from the given {@link SctpChannel}.
109      *
110      * @param parent    the parent {@link Channel} which was used to create this instance. This can be null if the
111      *                  {@link} has no parent as it was created by your self.
112      * @param ch        the {@link SctpChannel} which is used by this instance
113      */
114     public OioSctpChannel(Channel parent, SctpChannel ch) {
115         super(parent);
116         this.ch = ch;
117         boolean success = false;
118         try {
119             ch.configureBlocking(false);
120             readSelector = Selector.open();
121             writeSelector = Selector.open();
122             connectSelector = Selector.open();
123 
124             ch.register(readSelector, SelectionKey.OP_READ);
125             ch.register(writeSelector, SelectionKey.OP_WRITE);
126             ch.register(connectSelector, SelectionKey.OP_CONNECT);
127 
128             config = new OioSctpChannelConfig(this, ch);
129             notificationHandler = new SctpNotificationHandler(this);
130             success = true;
131         } catch (Exception e) {
132             throw new ChannelException("failed to initialize a sctp channel", e);
133         } finally {
134             if (!success) {
135                 try {
136                     ch.close();
137                 } catch (IOException e) {
138                     logger.warn("Failed to close a sctp channel.", e);
139                 }
140             }
141         }
142     }
143 
144     @Override
145     public InetSocketAddress localAddress() {
146         return (InetSocketAddress) super.localAddress();
147     }
148 
149     @Override
150     public InetSocketAddress remoteAddress() {
151         return (InetSocketAddress) super.remoteAddress();
152     }
153 
154     @Override
155     public SctpServerChannel parent() {
156         return (SctpServerChannel) super.parent();
157     }
158 
159     @Override
160     public ChannelMetadata metadata() {
161         return METADATA;
162     }
163 
164     @Override
165     public SctpChannelConfig config() {
166         return config;
167     }
168 
169     @Override
170     public boolean isOpen() {
171         return ch.isOpen();
172     }
173 
174     @Override
175     protected int doReadMessages(List<Object> msgs) throws Exception {
176         if (!readSelector.isOpen()) {
177             return 0;
178         }
179 
180         int readMessages = 0;
181 
182         final int selectedKeys = readSelector.select(SO_TIMEOUT);
183         final boolean keysSelected = selectedKeys > 0;
184 
185         if (!keysSelected) {
186             return readMessages;
187         }
188         // We must clear the selectedKeys because the Selector will never do it. If we do not clear it, the selectionKey
189         // will always be returned even if there is no data can be read which causes performance issue. And in some
190         // implementation of Selector, the select method may return 0 if the selectionKey which is ready for process has
191         // already been in the selectedKeys and cause the keysSelected above to be false even if we actually have
192         // something to read.
193         readSelector.selectedKeys().clear();
194         final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
195         ByteBuf buffer = allocHandle.allocate(config().getAllocator());
196         boolean free = true;
197 
198         try {
199             ByteBuffer data = buffer.nioBuffer(buffer.writerIndex(), buffer.writableBytes());
200             MessageInfo messageInfo = ch.receive(data, null, notificationHandler);
201             if (messageInfo == null) {
202                 return readMessages;
203             }
204 
205             data.flip();
206             allocHandle.lastBytesRead(data.remaining());
207             msgs.add(new SctpMessage(messageInfo,
208                     buffer.writerIndex(buffer.writerIndex() + allocHandle.lastBytesRead())));
209             free = false;
210             ++readMessages;
211         } catch (Throwable cause) {
212             PlatformDependent.throwException(cause);
213         }  finally {
214             if (free) {
215                 buffer.release();
216             }
217         }
218         return readMessages;
219     }
220 
221     @Override
222     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
223         if (!writeSelector.isOpen()) {
224             return;
225         }
226         final int size = in.size();
227         final int selectedKeys = writeSelector.select(SO_TIMEOUT);
228         if (selectedKeys > 0) {
229             final Set<SelectionKey> writableKeys = writeSelector.selectedKeys();
230             if (writableKeys.isEmpty()) {
231                 return;
232             }
233             Iterator<SelectionKey> writableKeysIt = writableKeys.iterator();
234             int written = 0;
235             for (;;) {
236                 if (written == size) {
237                     // all written
238                     return;
239                 }
240                 writableKeysIt.next();
241                 writableKeysIt.remove();
242 
243                 SctpMessage packet = (SctpMessage) in.current();
244                 if (packet == null) {
245                     return;
246                 }
247 
248                 ByteBuf data = packet.content();
249                 int dataLen = data.readableBytes();
250                 ByteBuffer nioData;
251 
252                 if (data.nioBufferCount() != -1) {
253                     nioData = data.nioBuffer();
254                 } else {
255                     nioData = ByteBuffer.allocate(dataLen);
256                     data.getBytes(data.readerIndex(), nioData);
257                     nioData.flip();
258                 }
259 
260                 final MessageInfo mi = MessageInfo.createOutgoing(association(), null, packet.streamIdentifier());
261                 mi.payloadProtocolID(packet.protocolIdentifier());
262                 mi.streamNumber(packet.streamIdentifier());
263                 mi.unordered(packet.isUnordered());
264 
265                 ch.send(nioData, mi);
266                 written ++;
267                 in.remove();
268 
269                 if (!writableKeysIt.hasNext()) {
270                     return;
271                 }
272             }
273         }
274     }
275 
276     @Override
277     protected Object filterOutboundMessage(Object msg) throws Exception {
278         if (msg instanceof SctpMessage) {
279             return msg;
280         }
281 
282         throw new UnsupportedOperationException(
283                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPE);
284     }
285 
286     @Override
287     public Association association() {
288         try {
289             return ch.association();
290         } catch (IOException ignored) {
291             return null;
292         }
293     }
294 
295     @Override
296     public boolean isActive() {
297         return isOpen() && association() != null;
298     }
299 
300     @Override
301     protected SocketAddress localAddress0() {
302         try {
303             Iterator<SocketAddress> i = ch.getAllLocalAddresses().iterator();
304             if (i.hasNext()) {
305                 return i.next();
306             }
307         } catch (IOException e) {
308             // ignore
309         }
310         return null;
311     }
312 
313     @Override
314     public Set<InetSocketAddress> allLocalAddresses() {
315         try {
316             final Set<SocketAddress> allLocalAddresses = ch.getAllLocalAddresses();
317             final Set<InetSocketAddress> addresses = new LinkedHashSet<InetSocketAddress>(allLocalAddresses.size());
318             for (SocketAddress socketAddress : allLocalAddresses) {
319                 addresses.add((InetSocketAddress) socketAddress);
320             }
321             return addresses;
322         } catch (Throwable ignored) {
323             return Collections.emptySet();
324         }
325     }
326 
327     @Override
328     protected SocketAddress remoteAddress0() {
329         try {
330             Iterator<SocketAddress> i = ch.getRemoteAddresses().iterator();
331             if (i.hasNext()) {
332                 return i.next();
333             }
334         } catch (IOException e) {
335             // ignore
336         }
337         return null;
338     }
339 
340     @Override
341     public Set<InetSocketAddress> allRemoteAddresses() {
342         try {
343             final Set<SocketAddress> allLocalAddresses = ch.getRemoteAddresses();
344             final Set<InetSocketAddress> addresses = new LinkedHashSet<InetSocketAddress>(allLocalAddresses.size());
345             for (SocketAddress socketAddress : allLocalAddresses) {
346                 addresses.add((InetSocketAddress) socketAddress);
347             }
348             return addresses;
349         } catch (Throwable ignored) {
350             return Collections.emptySet();
351         }
352     }
353 
354     @Override
355     protected void doBind(SocketAddress localAddress) throws Exception {
356         ch.bind(localAddress);
357     }
358 
359     @Override
360     protected void doConnect(SocketAddress remoteAddress,
361                              SocketAddress localAddress) throws Exception {
362         if (localAddress != null) {
363             ch.bind(localAddress);
364         }
365 
366         boolean success = false;
367         try {
368             ch.connect(remoteAddress);
369             boolean  finishConnect = false;
370             while (!finishConnect) {
371                 if (connectSelector.select(SO_TIMEOUT) >= 0) {
372                     final Set<SelectionKey> selectionKeys = connectSelector.selectedKeys();
373                     for (SelectionKey key : selectionKeys) {
374                        if (key.isConnectable()) {
375                            selectionKeys.clear();
376                            finishConnect = true;
377                            break;
378                        }
379                     }
380                     selectionKeys.clear();
381                 }
382             }
383             success = ch.finishConnect();
384         } finally {
385             if (!success) {
386                 doClose();
387             }
388         }
389     }
390 
391     @Override
392     protected void doDisconnect() throws Exception {
393         doClose();
394     }
395 
396     @Override
397     protected void doClose() throws Exception {
398         closeSelector("read", readSelector);
399         closeSelector("write", writeSelector);
400         closeSelector("connect", connectSelector);
401         ch.close();
402     }
403 
404     private static void closeSelector(String selectorName, Selector selector) {
405         try {
406             selector.close();
407         } catch (IOException e) {
408             if (logger.isWarnEnabled()) {
409                 logger.warn("Failed to close a " + selectorName + " selector.", e);
410             }
411         }
412     }
413 
414     @Override
415     public ChannelFuture bindAddress(InetAddress localAddress) {
416         return bindAddress(localAddress, newPromise());
417     }
418 
419     @Override
420     public ChannelFuture bindAddress(final InetAddress localAddress, final ChannelPromise promise) {
421         if (eventLoop().inEventLoop()) {
422             try {
423                 ch.bindAddress(localAddress);
424                 promise.setSuccess();
425             } catch (Throwable t) {
426                 promise.setFailure(t);
427             }
428         } else {
429             eventLoop().execute(new Runnable() {
430                 @Override
431                 public void run() {
432                     bindAddress(localAddress, promise);
433                 }
434             });
435         }
436         return promise;
437     }
438 
439     @Override
440     public ChannelFuture unbindAddress(InetAddress localAddress) {
441         return unbindAddress(localAddress, newPromise());
442     }
443 
444     @Override
445     public ChannelFuture unbindAddress(final InetAddress localAddress, final ChannelPromise promise) {
446         if (eventLoop().inEventLoop()) {
447             try {
448                 ch.unbindAddress(localAddress);
449                 promise.setSuccess();
450             } catch (Throwable t) {
451                 promise.setFailure(t);
452             }
453         } else {
454             eventLoop().execute(new Runnable() {
455                 @Override
456                 public void run() {
457                     unbindAddress(localAddress, promise);
458                 }
459             });
460         }
461         return promise;
462     }
463 
464     private final class OioSctpChannelConfig extends DefaultSctpChannelConfig {
465         private OioSctpChannelConfig(OioSctpChannel channel, SctpChannel javaChannel) {
466             super(channel, javaChannel);
467         }
468 
469         @Override
470         protected void autoReadCleared() {
471             clearReadPending();
472         }
473     }
474 }