View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.sctp.oio;
17  
18  import com.sun.nio.sctp.SctpChannel;
19  import com.sun.nio.sctp.SctpServerChannel;
20  import io.netty.channel.ChannelException;
21  import io.netty.channel.ChannelFuture;
22  import io.netty.channel.ChannelMetadata;
23  import io.netty.channel.ChannelOutboundBuffer;
24  import io.netty.channel.ChannelPromise;
25  import io.netty.channel.oio.AbstractOioMessageChannel;
26  import io.netty.channel.sctp.DefaultSctpServerChannelConfig;
27  import io.netty.channel.sctp.SctpServerChannelConfig;
28  import io.netty.util.internal.logging.InternalLogger;
29  import io.netty.util.internal.logging.InternalLoggerFactory;
30  
31  import java.io.IOException;
32  import java.net.InetAddress;
33  import java.net.InetSocketAddress;
34  import java.net.SocketAddress;
35  import java.nio.channels.SelectionKey;
36  import java.nio.channels.Selector;
37  import java.util.Collections;
38  import java.util.Iterator;
39  import java.util.LinkedHashSet;
40  import java.util.List;
41  import java.util.Set;
42  
43  /**
44   * {@link io.netty.channel.sctp.SctpServerChannel} implementation which use blocking mode to accept new
45   * connections and create the {@link OioSctpChannel} for them.
46   *
47   * Be aware that not all operations systems support SCTP. Please refer to the documentation of your operation system,
48   * to understand what you need to do to use it. Also this feature is only supported on Java 7+.
49   */
50  public class OioSctpServerChannel extends AbstractOioMessageChannel
51          implements io.netty.channel.sctp.SctpServerChannel {
52  
53      private static final InternalLogger logger =
54              InternalLoggerFactory.getInstance(OioSctpServerChannel.class);
55  
56      private static final ChannelMetadata METADATA = new ChannelMetadata(false);
57  
58      private static SctpServerChannel newServerSocket() {
59          try {
60              return SctpServerChannel.open();
61          } catch (IOException e) {
62              throw new ChannelException("failed to create a sctp server channel", e);
63          }
64      }
65  
66      private final SctpServerChannel sch;
67      private final SctpServerChannelConfig config;
68      private final Selector selector;
69  
70      /**
71       * Create a new instance with an new {@link SctpServerChannel}
72       */
73      public OioSctpServerChannel() {
74          this(newServerSocket());
75      }
76  
77      /**
78       * Create a new instance from the given {@link SctpServerChannel}
79       *
80       * @param sch    the {@link SctpServerChannel} which is used by this instance
81       */
82      public OioSctpServerChannel(SctpServerChannel sch) {
83          super(null);
84          if (sch == null) {
85              throw new NullPointerException("sctp server channel");
86          }
87  
88          this.sch = sch;
89          boolean success = false;
90          try {
91              sch.configureBlocking(false);
92              selector = Selector.open();
93              sch.register(selector, SelectionKey.OP_ACCEPT);
94              config = new OioSctpServerChannelConfig(this, sch);
95              success = true;
96          } catch (Exception e) {
97              throw new ChannelException("failed to initialize a sctp server channel", e);
98          } finally {
99              if (!success) {
100                 try {
101                     sch.close();
102                 } catch (IOException e) {
103                     logger.warn("Failed to close a sctp server channel.", e);
104                 }
105             }
106         }
107     }
108 
109     @Override
110     public ChannelMetadata metadata() {
111         return METADATA;
112     }
113 
114     @Override
115     public SctpServerChannelConfig config() {
116         return config;
117     }
118 
119     @Override
120     public InetSocketAddress remoteAddress() {
121         return null;
122     }
123 
124     @Override
125     public InetSocketAddress localAddress() {
126         return (InetSocketAddress) super.localAddress();
127     }
128 
129     @Override
130     public boolean isOpen() {
131         return sch.isOpen();
132     }
133 
134     @Override
135     protected SocketAddress localAddress0() {
136         try {
137             Iterator<SocketAddress> i = sch.getAllLocalAddresses().iterator();
138             if (i.hasNext()) {
139                 return i.next();
140             }
141         } catch (IOException e) {
142             // ignore
143         }
144         return null;
145     }
146 
147     @Override
148     public Set<InetSocketAddress> allLocalAddresses() {
149         try {
150             final Set<SocketAddress> allLocalAddresses = sch.getAllLocalAddresses();
151             final Set<InetSocketAddress> addresses = new LinkedHashSet<InetSocketAddress>(allLocalAddresses.size());
152             for (SocketAddress socketAddress : allLocalAddresses) {
153                 addresses.add((InetSocketAddress) socketAddress);
154             }
155             return addresses;
156         } catch (Throwable ignored) {
157             return Collections.emptySet();
158         }
159     }
160 
161     @Override
162     public boolean isActive() {
163         return isOpen() && localAddress0() != null;
164     }
165 
166     @Override
167     protected void doBind(SocketAddress localAddress) throws Exception {
168         sch.bind(localAddress, config.getBacklog());
169     }
170 
171     @Override
172     protected void doClose() throws Exception {
173         try {
174             selector.close();
175         } catch (IOException e) {
176             logger.warn("Failed to close a selector.", e);
177         }
178         sch.close();
179     }
180 
181     @Override
182     protected int doReadMessages(List<Object> buf) throws Exception {
183         if (!isActive()) {
184             return -1;
185         }
186 
187         SctpChannel s = null;
188         int acceptedChannels = 0;
189         try {
190             final int selectedKeys = selector.select(SO_TIMEOUT);
191             if (selectedKeys > 0) {
192                 final Iterator<SelectionKey> selectionKeys = selector.selectedKeys().iterator();
193                 for (;;) {
194                     SelectionKey key = selectionKeys.next();
195                     selectionKeys.remove();
196                     if (key.isAcceptable()) {
197                         s = sch.accept();
198                         if (s != null) {
199                             buf.add(new OioSctpChannel(this, s));
200                             acceptedChannels ++;
201                         }
202                     }
203                     if (!selectionKeys.hasNext()) {
204                         return acceptedChannels;
205                     }
206                 }
207             }
208         } catch (Throwable t) {
209             logger.warn("Failed to create a new channel from an accepted sctp channel.", t);
210             if (s != null) {
211                 try {
212                     s.close();
213                 } catch (Throwable t2) {
214                     logger.warn("Failed to close a sctp channel.", t2);
215                 }
216             }
217         }
218 
219         return acceptedChannels;
220     }
221 
222     @Override
223     public ChannelFuture bindAddress(InetAddress localAddress) {
224         return bindAddress(localAddress, newPromise());
225     }
226 
227     @Override
228     public ChannelFuture bindAddress(final InetAddress localAddress, final ChannelPromise promise) {
229         if (eventLoop().inEventLoop()) {
230             try {
231                 sch.bindAddress(localAddress);
232                 promise.setSuccess();
233             } catch (Throwable t) {
234                 promise.setFailure(t);
235             }
236         } else {
237             eventLoop().execute(new Runnable() {
238                 @Override
239                 public void run() {
240                     bindAddress(localAddress, promise);
241                 }
242             });
243         }
244         return promise;
245     }
246 
247     @Override
248     public ChannelFuture unbindAddress(InetAddress localAddress) {
249         return unbindAddress(localAddress, newPromise());
250     }
251 
252     @Override
253     public ChannelFuture unbindAddress(final InetAddress localAddress, final ChannelPromise promise) {
254         if (eventLoop().inEventLoop()) {
255             try {
256                 sch.unbindAddress(localAddress);
257                 promise.setSuccess();
258             } catch (Throwable t) {
259                 promise.setFailure(t);
260             }
261         } else {
262             eventLoop().execute(new Runnable() {
263                 @Override
264                 public void run() {
265                     unbindAddress(localAddress, promise);
266                 }
267             });
268         }
269         return promise;
270     }
271 
272     @Override
273     protected void doConnect(
274             SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
275         throw new UnsupportedOperationException();
276     }
277 
278     @Override
279     protected SocketAddress remoteAddress0() {
280         return null;
281     }
282 
283     @Override
284     protected void doDisconnect() throws Exception {
285         throw new UnsupportedOperationException();
286     }
287 
288     @Override
289     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
290         throw new UnsupportedOperationException();
291     }
292 
293     @Override
294     protected Object filterOutboundMessage(Object msg) throws Exception {
295         throw new UnsupportedOperationException();
296     }
297 
298     private final class OioSctpServerChannelConfig extends DefaultSctpServerChannelConfig {
299         private OioSctpServerChannelConfig(OioSctpServerChannel channel, SctpServerChannel javaChannel) {
300             super(channel, javaChannel);
301         }
302 
303         @Override
304         protected void autoReadCleared() {
305             setReadPending(false);
306         }
307     }
308 }