View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.sctp.oio;
17  
18  import com.sun.nio.sctp.Association;
19  import com.sun.nio.sctp.MessageInfo;
20  import com.sun.nio.sctp.NotificationHandler;
21  import com.sun.nio.sctp.SctpChannel;
22  
23  import io.netty.buffer.ByteBuf;
24  import io.netty.channel.Channel;
25  import io.netty.channel.ChannelException;
26  import io.netty.channel.ChannelFuture;
27  import io.netty.channel.ChannelMetadata;
28  import io.netty.channel.ChannelOutboundBuffer;
29  import io.netty.channel.ChannelPromise;
30  import io.netty.channel.RecvByteBufAllocator;
31  import io.netty.channel.oio.AbstractOioMessageChannel;
32  import io.netty.channel.sctp.DefaultSctpChannelConfig;
33  import io.netty.channel.sctp.SctpChannelConfig;
34  import io.netty.channel.sctp.SctpMessage;
35  import io.netty.channel.sctp.SctpNotificationHandler;
36  import io.netty.channel.sctp.SctpServerChannel;
37  import io.netty.util.internal.PlatformDependent;
38  import io.netty.util.internal.StringUtil;
39  import io.netty.util.internal.logging.InternalLogger;
40  import io.netty.util.internal.logging.InternalLoggerFactory;
41  
42  import java.io.IOException;
43  import java.net.InetAddress;
44  import java.net.InetSocketAddress;
45  import java.net.SocketAddress;
46  import java.nio.ByteBuffer;
47  import java.nio.channels.SelectionKey;
48  import java.nio.channels.Selector;
49  import java.util.Collections;
50  import java.util.Iterator;
51  import java.util.LinkedHashSet;
52  import java.util.List;
53  import java.util.Set;
54  
55  /**
56   * {@link io.netty.channel.sctp.SctpChannel} implementation which use blocking mode and allows to read / write
57   * {@link SctpMessage}s to the underlying {@link SctpChannel}.
58   *
59   * Be aware that not all operations systems support SCTP. Please refer to the documentation of your operation system,
60   * to understand what you need to do to use it. Also this feature is only supported on Java 7+.
61   *
62   * @deprecated use {@link io.netty.channel.sctp.nio.NioSctpChannel}.
63   */
64  @Deprecated
65  public class OioSctpChannel extends AbstractOioMessageChannel
66          implements io.netty.channel.sctp.SctpChannel {
67  
68      private static final InternalLogger logger =
69              InternalLoggerFactory.getInstance(OioSctpChannel.class);
70  
71      private static final ChannelMetadata METADATA = new ChannelMetadata(false);
72      private static final String EXPECTED_TYPE = " (expected: " + StringUtil.simpleClassName(SctpMessage.class) + ')';
73  
74      private final SctpChannel ch;
75      private final SctpChannelConfig config;
76  
77      private final Selector readSelector;
78      private final Selector writeSelector;
79      private final Selector connectSelector;
80  
81      private final NotificationHandler<?> notificationHandler;
82      private ByteBuffer inputCopy;
83      private ByteBuffer outputCopy;
84  
85      private static SctpChannel openChannel() {
86          try {
87              return SctpChannel.open();
88          } catch (IOException e) {
89              throw new ChannelException("Failed to open a sctp channel.", e);
90          }
91      }
92  
93      /**
94       * Create a new instance with an new {@link SctpChannel}.
95       */
96      public OioSctpChannel() {
97          this(openChannel());
98      }
99  
100     /**
101      * Create a new instance from the given {@link SctpChannel}.
102      *
103      * @param ch    the {@link SctpChannel} which is used by this instance
104      */
105     public OioSctpChannel(SctpChannel ch) {
106         this(null, ch);
107     }
108 
109     /**
110      * Create a new instance from the given {@link SctpChannel}.
111      *
112      * @param parent    the parent {@link Channel} which was used to create this instance. This can be null if the
113      *                  {@link} has no parent as it was created by your self.
114      * @param ch        the {@link SctpChannel} which is used by this instance
115      */
116     public OioSctpChannel(Channel parent, SctpChannel ch) {
117         super(parent);
118         this.ch = ch;
119         boolean success = false;
120         try {
121             ch.configureBlocking(false);
122             readSelector = Selector.open();
123             writeSelector = Selector.open();
124             connectSelector = Selector.open();
125 
126             ch.register(readSelector, SelectionKey.OP_READ);
127             ch.register(writeSelector, SelectionKey.OP_WRITE);
128             ch.register(connectSelector, SelectionKey.OP_CONNECT);
129 
130             config = new OioSctpChannelConfig(this, ch);
131             notificationHandler = new SctpNotificationHandler(this);
132             success = true;
133         } catch (Exception e) {
134             throw new ChannelException("failed to initialize a sctp channel", e);
135         } finally {
136             if (!success) {
137                 try {
138                     ch.close();
139                 } catch (IOException e) {
140                     logger.warn("Failed to close a sctp channel.", e);
141                 }
142             }
143         }
144     }
145 
146     @Override
147     public InetSocketAddress localAddress() {
148         return (InetSocketAddress) super.localAddress();
149     }
150 
151     @Override
152     public InetSocketAddress remoteAddress() {
153         return (InetSocketAddress) super.remoteAddress();
154     }
155 
156     @Override
157     public SctpServerChannel parent() {
158         return (SctpServerChannel) super.parent();
159     }
160 
161     @Override
162     public ChannelMetadata metadata() {
163         return METADATA;
164     }
165 
166     @Override
167     public SctpChannelConfig config() {
168         return config;
169     }
170 
171     @Override
172     public boolean isOpen() {
173         return ch.isOpen();
174     }
175 
176     @Override
177     protected int doReadMessages(List<Object> msgs) throws Exception {
178         if (!readSelector.isOpen()) {
179             return 0;
180         }
181 
182         int readMessages = 0;
183 
184         final int selectedKeys = readSelector.select(SO_TIMEOUT);
185         final boolean keysSelected = selectedKeys > 0;
186 
187         if (!keysSelected) {
188             return readMessages;
189         }
190         // We must clear the selectedKeys because the Selector will never do it. If we do not clear it, the selectionKey
191         // will always be returned even if there is no data can be read which causes performance issue. And in some
192         // implementation of Selector, the select method may return 0 if the selectionKey which is ready for process has
193         // already been in the selectedKeys and cause the keysSelected above to be false even if we actually have
194         // something to read.
195         readSelector.selectedKeys().clear();
196         final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
197         ByteBuf buffer = allocHandle.allocate(config().getAllocator());
198         boolean free = true;
199 
200         try {
201             ByteBuffer data = buffer.nioBuffer(buffer.writerIndex(), buffer.writableBytes());
202             boolean useInputCopy = false;
203             int javaVersion = PlatformDependent.javaVersion();
204             if (javaVersion >= 22 && javaVersion < 25 && data.isDirect()) {
205                 // On Java 22 through 24, we need to avoid using ByteBuffer instances that are
206                 // backed by MemorySegments, because of https://bugs.openjdk.org/browse/JDK-8357268
207                 if (inputCopy == null || inputCopy.capacity() < data.remaining()) {
208                     inputCopy = ByteBuffer.allocateDirect(data.remaining());
209                 }
210                 inputCopy.clear();
211                 inputCopy.limit(data.remaining());
212                 useInputCopy = true;
213             }
214             MessageInfo messageInfo = ch.receive(useInputCopy ? inputCopy : data, null, notificationHandler);
215             if (messageInfo == null) {
216                 return readMessages;
217             }
218             if (useInputCopy) {
219                 inputCopy.flip();
220                 data.put(inputCopy);
221             }
222 
223             data.flip();
224             allocHandle.lastBytesRead(data.remaining());
225             msgs.add(new SctpMessage(messageInfo,
226                     buffer.writerIndex(buffer.writerIndex() + allocHandle.lastBytesRead())));
227             free = false;
228             ++readMessages;
229         } catch (Throwable cause) {
230             PlatformDependent.throwException(cause);
231         }  finally {
232             if (free) {
233                 buffer.release();
234             }
235         }
236         return readMessages;
237     }
238 
239     @Override
240     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
241         if (!writeSelector.isOpen()) {
242             return;
243         }
244         final int size = in.size();
245         final int selectedKeys = writeSelector.select(SO_TIMEOUT);
246         if (selectedKeys > 0) {
247             final Set<SelectionKey> writableKeys = writeSelector.selectedKeys();
248             if (writableKeys.isEmpty()) {
249                 return;
250             }
251             Iterator<SelectionKey> writableKeysIt = writableKeys.iterator();
252             int written = 0;
253             for (;;) {
254                 if (written == size) {
255                     // all written
256                     return;
257                 }
258                 writableKeysIt.next();
259                 writableKeysIt.remove();
260 
261                 SctpMessage packet = (SctpMessage) in.current();
262                 if (packet == null) {
263                     return;
264                 }
265 
266                 ByteBuf data = packet.content();
267                 int dataLen = data.readableBytes();
268                 ByteBuffer nioData;
269 
270                 int javaVersion = PlatformDependent.javaVersion();
271                 if (javaVersion >= 22 && javaVersion < 25 && data.isDirect() ||
272                         !data.isDirect() || data.nioBufferCount() != 1) {
273                     // Ensure that we only use a single, direct ByteBuffer when doing SCTP IO.
274                     // If the ByteBuf is composite, or is on-heap, we do a copy.
275                     // On Java 22 through 24, we additionally need to avoid using ByteBuffer instances that are
276                     // backed by MemorySegments, because of https://bugs.openjdk.org/browse/JDK-8357268
277                     if (outputCopy == null || outputCopy.capacity() < dataLen) {
278                         outputCopy = ByteBuffer.allocateDirect(dataLen);
279                     }
280                     outputCopy.clear();
281                     outputCopy.limit(dataLen);
282                     data.readBytes(outputCopy);
283                     outputCopy.flip();
284                     nioData = outputCopy;
285                 } else {
286                     nioData = data.nioBuffer();
287                 }
288 
289                 final MessageInfo mi = MessageInfo.createOutgoing(association(), null, packet.streamIdentifier());
290                 mi.payloadProtocolID(packet.protocolIdentifier());
291                 mi.streamNumber(packet.streamIdentifier());
292                 mi.unordered(packet.isUnordered());
293 
294                 ch.send(nioData, mi);
295                 written ++;
296                 in.remove();
297 
298                 if (!writableKeysIt.hasNext()) {
299                     return;
300                 }
301             }
302         }
303     }
304 
305     @Override
306     protected Object filterOutboundMessage(Object msg) throws Exception {
307         if (msg instanceof SctpMessage) {
308             return msg;
309         }
310 
311         throw new UnsupportedOperationException(
312                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPE);
313     }
314 
315     @Override
316     public Association association() {
317         try {
318             return ch.association();
319         } catch (IOException ignored) {
320             return null;
321         }
322     }
323 
324     @Override
325     public boolean isActive() {
326         return isOpen() && association() != null;
327     }
328 
329     @Override
330     protected SocketAddress localAddress0() {
331         try {
332             Iterator<SocketAddress> i = ch.getAllLocalAddresses().iterator();
333             if (i.hasNext()) {
334                 return i.next();
335             }
336         } catch (IOException e) {
337             // ignore
338         }
339         return null;
340     }
341 
342     @Override
343     public Set<InetSocketAddress> allLocalAddresses() {
344         try {
345             final Set<SocketAddress> allLocalAddresses = ch.getAllLocalAddresses();
346             final Set<InetSocketAddress> addresses = new LinkedHashSet<InetSocketAddress>(allLocalAddresses.size());
347             for (SocketAddress socketAddress : allLocalAddresses) {
348                 addresses.add((InetSocketAddress) socketAddress);
349             }
350             return addresses;
351         } catch (Throwable ignored) {
352             return Collections.emptySet();
353         }
354     }
355 
356     @Override
357     protected SocketAddress remoteAddress0() {
358         try {
359             Iterator<SocketAddress> i = ch.getRemoteAddresses().iterator();
360             if (i.hasNext()) {
361                 return i.next();
362             }
363         } catch (IOException e) {
364             // ignore
365         }
366         return null;
367     }
368 
369     @Override
370     public Set<InetSocketAddress> allRemoteAddresses() {
371         try {
372             final Set<SocketAddress> allLocalAddresses = ch.getRemoteAddresses();
373             final Set<InetSocketAddress> addresses = new LinkedHashSet<InetSocketAddress>(allLocalAddresses.size());
374             for (SocketAddress socketAddress : allLocalAddresses) {
375                 addresses.add((InetSocketAddress) socketAddress);
376             }
377             return addresses;
378         } catch (Throwable ignored) {
379             return Collections.emptySet();
380         }
381     }
382 
383     @Override
384     protected void doBind(SocketAddress localAddress) throws Exception {
385         ch.bind(localAddress);
386     }
387 
388     @Override
389     protected void doConnect(SocketAddress remoteAddress,
390                              SocketAddress localAddress) throws Exception {
391         if (localAddress != null) {
392             ch.bind(localAddress);
393         }
394 
395         boolean success = false;
396         try {
397             ch.connect(remoteAddress);
398             boolean  finishConnect = false;
399             while (!finishConnect) {
400                 if (connectSelector.select(SO_TIMEOUT) >= 0) {
401                     final Set<SelectionKey> selectionKeys = connectSelector.selectedKeys();
402                     for (SelectionKey key : selectionKeys) {
403                        if (key.isConnectable()) {
404                            selectionKeys.clear();
405                            finishConnect = true;
406                            break;
407                        }
408                     }
409                     selectionKeys.clear();
410                 }
411             }
412             success = ch.finishConnect();
413         } finally {
414             if (!success) {
415                 doClose();
416             }
417         }
418     }
419 
420     @Override
421     protected void doDisconnect() throws Exception {
422         doClose();
423     }
424 
425     @Override
426     protected void doClose() throws Exception {
427         closeSelector("read", readSelector);
428         closeSelector("write", writeSelector);
429         closeSelector("connect", connectSelector);
430         ch.close();
431     }
432 
433     private static void closeSelector(String selectorName, Selector selector) {
434         try {
435             selector.close();
436         } catch (IOException e) {
437             if (logger.isWarnEnabled()) {
438                 logger.warn("Failed to close a " + selectorName + " selector.", e);
439             }
440         }
441     }
442 
443     @Override
444     public ChannelFuture bindAddress(InetAddress localAddress) {
445         return bindAddress(localAddress, newPromise());
446     }
447 
448     @Override
449     public ChannelFuture bindAddress(final InetAddress localAddress, final ChannelPromise promise) {
450         if (eventLoop().inEventLoop()) {
451             try {
452                 ch.bindAddress(localAddress);
453                 promise.setSuccess();
454             } catch (Throwable t) {
455                 promise.setFailure(t);
456             }
457         } else {
458             eventLoop().execute(new Runnable() {
459                 @Override
460                 public void run() {
461                     bindAddress(localAddress, promise);
462                 }
463             });
464         }
465         return promise;
466     }
467 
468     @Override
469     public ChannelFuture unbindAddress(InetAddress localAddress) {
470         return unbindAddress(localAddress, newPromise());
471     }
472 
473     @Override
474     public ChannelFuture unbindAddress(final InetAddress localAddress, final ChannelPromise promise) {
475         if (eventLoop().inEventLoop()) {
476             try {
477                 ch.unbindAddress(localAddress);
478                 promise.setSuccess();
479             } catch (Throwable t) {
480                 promise.setFailure(t);
481             }
482         } else {
483             eventLoop().execute(new Runnable() {
484                 @Override
485                 public void run() {
486                     unbindAddress(localAddress, promise);
487                 }
488             });
489         }
490         return promise;
491     }
492 
493     private final class OioSctpChannelConfig extends DefaultSctpChannelConfig {
494         private OioSctpChannelConfig(OioSctpChannel channel, SctpChannel javaChannel) {
495             super(channel, javaChannel);
496         }
497 
498         @Override
499         protected void autoReadCleared() {
500             clearReadPending();
501         }
502     }
503 }