1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.sctp.oio;
17
18 import com.sun.nio.sctp.SctpChannel;
19 import com.sun.nio.sctp.SctpServerChannel;
20 import io.netty.channel.ChannelException;
21 import io.netty.channel.ChannelFuture;
22 import io.netty.channel.ChannelMetadata;
23 import io.netty.channel.ChannelOutboundBuffer;
24 import io.netty.channel.ChannelPromise;
25 import io.netty.channel.oio.AbstractOioMessageChannel;
26 import io.netty.channel.sctp.DefaultSctpServerChannelConfig;
27 import io.netty.channel.sctp.SctpServerChannelConfig;
28 import io.netty.util.internal.logging.InternalLogger;
29 import io.netty.util.internal.logging.InternalLoggerFactory;
30
31 import java.io.IOException;
32 import java.net.InetAddress;
33 import java.net.InetSocketAddress;
34 import java.net.SocketAddress;
35 import java.nio.channels.SelectionKey;
36 import java.nio.channels.Selector;
37 import java.util.Collections;
38 import java.util.Iterator;
39 import java.util.LinkedHashSet;
40 import java.util.List;
41 import java.util.Set;
42
43
44
45
46
47
48
49
50 public class OioSctpServerChannel extends AbstractOioMessageChannel
51 implements io.netty.channel.sctp.SctpServerChannel {
52
53 private static final InternalLogger logger =
54 InternalLoggerFactory.getInstance(OioSctpServerChannel.class);
55
56 private static final ChannelMetadata METADATA = new ChannelMetadata(false);
57
58 private static SctpServerChannel newServerSocket() {
59 try {
60 return SctpServerChannel.open();
61 } catch (IOException e) {
62 throw new ChannelException("failed to create a sctp server channel", e);
63 }
64 }
65
66 private final SctpServerChannel sch;
67 private final SctpServerChannelConfig config;
68 private final Selector selector;
69
70
71
72
73 public OioSctpServerChannel() {
74 this(newServerSocket());
75 }
76
77
78
79
80
81
82 public OioSctpServerChannel(SctpServerChannel sch) {
83 super(null);
84 if (sch == null) {
85 throw new NullPointerException("sctp server channel");
86 }
87
88 this.sch = sch;
89 boolean success = false;
90 try {
91 sch.configureBlocking(false);
92 selector = Selector.open();
93 sch.register(selector, SelectionKey.OP_ACCEPT);
94 config = new OioSctpServerChannelConfig(this, sch);
95 success = true;
96 } catch (Exception e) {
97 throw new ChannelException("failed to initialize a sctp server channel", e);
98 } finally {
99 if (!success) {
100 try {
101 sch.close();
102 } catch (IOException e) {
103 logger.warn("Failed to close a sctp server channel.", e);
104 }
105 }
106 }
107 }
108
109 @Override
110 public ChannelMetadata metadata() {
111 return METADATA;
112 }
113
114 @Override
115 public SctpServerChannelConfig config() {
116 return config;
117 }
118
119 @Override
120 public InetSocketAddress remoteAddress() {
121 return null;
122 }
123
124 @Override
125 public InetSocketAddress localAddress() {
126 return (InetSocketAddress) super.localAddress();
127 }
128
129 @Override
130 public boolean isOpen() {
131 return sch.isOpen();
132 }
133
134 @Override
135 protected SocketAddress localAddress0() {
136 try {
137 Iterator<SocketAddress> i = sch.getAllLocalAddresses().iterator();
138 if (i.hasNext()) {
139 return i.next();
140 }
141 } catch (IOException e) {
142
143 }
144 return null;
145 }
146
147 @Override
148 public Set<InetSocketAddress> allLocalAddresses() {
149 try {
150 final Set<SocketAddress> allLocalAddresses = sch.getAllLocalAddresses();
151 final Set<InetSocketAddress> addresses = new LinkedHashSet<InetSocketAddress>(allLocalAddresses.size());
152 for (SocketAddress socketAddress : allLocalAddresses) {
153 addresses.add((InetSocketAddress) socketAddress);
154 }
155 return addresses;
156 } catch (Throwable ignored) {
157 return Collections.emptySet();
158 }
159 }
160
161 @Override
162 public boolean isActive() {
163 return isOpen() && localAddress0() != null;
164 }
165
166 @Override
167 protected void doBind(SocketAddress localAddress) throws Exception {
168 sch.bind(localAddress, config.getBacklog());
169 }
170
171 @Override
172 protected void doClose() throws Exception {
173 try {
174 selector.close();
175 } catch (IOException e) {
176 logger.warn("Failed to close a selector.", e);
177 }
178 sch.close();
179 }
180
181 @Override
182 protected int doReadMessages(List<Object> buf) throws Exception {
183 if (!isActive()) {
184 return -1;
185 }
186
187 SctpChannel s = null;
188 int acceptedChannels = 0;
189 try {
190 final int selectedKeys = selector.select(SO_TIMEOUT);
191 if (selectedKeys > 0) {
192 final Iterator<SelectionKey> selectionKeys = selector.selectedKeys().iterator();
193 for (;;) {
194 SelectionKey key = selectionKeys.next();
195 selectionKeys.remove();
196 if (key.isAcceptable()) {
197 s = sch.accept();
198 if (s != null) {
199 buf.add(new OioSctpChannel(this, s));
200 acceptedChannels ++;
201 }
202 }
203 if (!selectionKeys.hasNext()) {
204 return acceptedChannels;
205 }
206 }
207 }
208 } catch (Throwable t) {
209 logger.warn("Failed to create a new channel from an accepted sctp channel.", t);
210 if (s != null) {
211 try {
212 s.close();
213 } catch (Throwable t2) {
214 logger.warn("Failed to close a sctp channel.", t2);
215 }
216 }
217 }
218
219 return acceptedChannels;
220 }
221
222 @Override
223 public ChannelFuture bindAddress(InetAddress localAddress) {
224 return bindAddress(localAddress, newPromise());
225 }
226
227 @Override
228 public ChannelFuture bindAddress(final InetAddress localAddress, final ChannelPromise promise) {
229 if (eventLoop().inEventLoop()) {
230 try {
231 sch.bindAddress(localAddress);
232 promise.setSuccess();
233 } catch (Throwable t) {
234 promise.setFailure(t);
235 }
236 } else {
237 eventLoop().execute(new Runnable() {
238 @Override
239 public void run() {
240 bindAddress(localAddress, promise);
241 }
242 });
243 }
244 return promise;
245 }
246
247 @Override
248 public ChannelFuture unbindAddress(InetAddress localAddress) {
249 return unbindAddress(localAddress, newPromise());
250 }
251
252 @Override
253 public ChannelFuture unbindAddress(final InetAddress localAddress, final ChannelPromise promise) {
254 if (eventLoop().inEventLoop()) {
255 try {
256 sch.unbindAddress(localAddress);
257 promise.setSuccess();
258 } catch (Throwable t) {
259 promise.setFailure(t);
260 }
261 } else {
262 eventLoop().execute(new Runnable() {
263 @Override
264 public void run() {
265 unbindAddress(localAddress, promise);
266 }
267 });
268 }
269 return promise;
270 }
271
272 @Override
273 protected void doConnect(
274 SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
275 throw new UnsupportedOperationException();
276 }
277
278 @Override
279 protected SocketAddress remoteAddress0() {
280 return null;
281 }
282
283 @Override
284 protected void doDisconnect() throws Exception {
285 throw new UnsupportedOperationException();
286 }
287
288 @Override
289 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
290 throw new UnsupportedOperationException();
291 }
292
293 @Override
294 protected Object filterOutboundMessage(Object msg) throws Exception {
295 throw new UnsupportedOperationException();
296 }
297
298 private final class OioSctpServerChannelConfig extends DefaultSctpServerChannelConfig {
299 private OioSctpServerChannelConfig(OioSctpServerChannel channel, SctpServerChannel javaChannel) {
300 super(channel, javaChannel);
301 }
302
303 @Override
304 protected void autoReadCleared() {
305 setReadPending(false);
306 }
307 }
308 }