1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.sctp.oio;
17
18 import com.sun.nio.sctp.Association;
19 import com.sun.nio.sctp.MessageInfo;
20 import com.sun.nio.sctp.NotificationHandler;
21 import com.sun.nio.sctp.SctpChannel;
22 import io.netty.buffer.ByteBuf;
23 import io.netty.channel.Channel;
24 import io.netty.channel.ChannelException;
25 import io.netty.channel.ChannelFuture;
26 import io.netty.channel.ChannelMetadata;
27 import io.netty.channel.ChannelOutboundBuffer;
28 import io.netty.channel.ChannelPromise;
29 import io.netty.channel.RecvByteBufAllocator;
30 import io.netty.channel.oio.AbstractOioMessageChannel;
31 import io.netty.channel.sctp.DefaultSctpChannelConfig;
32 import io.netty.channel.sctp.SctpChannelConfig;
33 import io.netty.channel.sctp.SctpMessage;
34 import io.netty.channel.sctp.SctpNotificationHandler;
35 import io.netty.channel.sctp.SctpServerChannel;
36 import io.netty.util.internal.PlatformDependent;
37 import io.netty.util.internal.StringUtil;
38 import io.netty.util.internal.logging.InternalLogger;
39 import io.netty.util.internal.logging.InternalLoggerFactory;
40
41 import java.io.IOException;
42 import java.net.InetAddress;
43 import java.net.InetSocketAddress;
44 import java.net.SocketAddress;
45 import java.nio.ByteBuffer;
46 import java.nio.channels.SelectionKey;
47 import java.nio.channels.Selector;
48 import java.util.Collections;
49 import java.util.Iterator;
50 import java.util.LinkedHashSet;
51 import java.util.List;
52 import java.util.Set;
53
54
55
56
57
58
59
60
61 public class OioSctpChannel extends AbstractOioMessageChannel
62 implements io.netty.channel.sctp.SctpChannel {
63
64 private static final InternalLogger logger =
65 InternalLoggerFactory.getInstance(OioSctpChannel.class);
66
67 private static final ChannelMetadata METADATA = new ChannelMetadata(false);
68 private static final String EXPECTED_TYPE = " (expected: " + StringUtil.simpleClassName(SctpMessage.class) + ')';
69
70 private final SctpChannel ch;
71 private final SctpChannelConfig config;
72
73 private final Selector readSelector;
74 private final Selector writeSelector;
75 private final Selector connectSelector;
76
77 private final NotificationHandler<?> notificationHandler;
78
79 private RecvByteBufAllocator.Handle allocHandle;
80
81 private static SctpChannel openChannel() {
82 try {
83 return SctpChannel.open();
84 } catch (IOException e) {
85 throw new ChannelException("Failed to open a sctp channel.", e);
86 }
87 }
88
89
90
91
92 public OioSctpChannel() {
93 this(openChannel());
94 }
95
96
97
98
99
100
101 public OioSctpChannel(SctpChannel ch) {
102 this(null, ch);
103 }
104
105
106
107
108
109
110
111
112 public OioSctpChannel(Channel parent, SctpChannel ch) {
113 super(parent);
114 this.ch = ch;
115 boolean success = false;
116 try {
117 ch.configureBlocking(false);
118 readSelector = Selector.open();
119 writeSelector = Selector.open();
120 connectSelector = Selector.open();
121
122 ch.register(readSelector, SelectionKey.OP_READ);
123 ch.register(writeSelector, SelectionKey.OP_WRITE);
124 ch.register(connectSelector, SelectionKey.OP_CONNECT);
125
126 config = new OioSctpChannelConfig(this, ch);
127 notificationHandler = new SctpNotificationHandler(this);
128 success = true;
129 } catch (Exception e) {
130 throw new ChannelException("failed to initialize a sctp channel", e);
131 } finally {
132 if (!success) {
133 try {
134 ch.close();
135 } catch (IOException e) {
136 logger.warn("Failed to close a sctp channel.", e);
137 }
138 }
139 }
140 }
141
142 @Override
143 public InetSocketAddress localAddress() {
144 return (InetSocketAddress) super.localAddress();
145 }
146
147 @Override
148 public InetSocketAddress remoteAddress() {
149 return (InetSocketAddress) super.remoteAddress();
150 }
151
152 @Override
153 public SctpServerChannel parent() {
154 return (SctpServerChannel) super.parent();
155 }
156
157 @Override
158 public ChannelMetadata metadata() {
159 return METADATA;
160 }
161
162 @Override
163 public SctpChannelConfig config() {
164 return config;
165 }
166
167 @Override
168 public boolean isOpen() {
169 return ch.isOpen();
170 }
171
172 @Override
173 protected int doReadMessages(List<Object> msgs) throws Exception {
174 if (!readSelector.isOpen()) {
175 return 0;
176 }
177
178 int readMessages = 0;
179
180 final int selectedKeys = readSelector.select(SO_TIMEOUT);
181 final boolean keysSelected = selectedKeys > 0;
182
183 if (!keysSelected) {
184 return readMessages;
185 }
186
187
188
189
190
191
192 readSelector.selectedKeys().clear();
193
194 RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
195 if (allocHandle == null) {
196 this.allocHandle = allocHandle = config().getRecvByteBufAllocator().newHandle();
197 }
198
199 ByteBuf buffer = allocHandle.allocate(config().getAllocator());
200 boolean free = true;
201
202 try {
203 ByteBuffer data = buffer.nioBuffer(buffer.writerIndex(), buffer.writableBytes());
204 MessageInfo messageInfo = ch.receive(data, null, notificationHandler);
205 if (messageInfo == null) {
206 return readMessages;
207 }
208
209 data.flip();
210 msgs.add(new SctpMessage(messageInfo, buffer.writerIndex(buffer.writerIndex() + data.remaining())));
211 free = false;
212 readMessages ++;
213 } catch (Throwable cause) {
214 PlatformDependent.throwException(cause);
215 } finally {
216 int bytesRead = buffer.readableBytes();
217 allocHandle.record(bytesRead);
218 if (free) {
219 buffer.release();
220 }
221 }
222 return readMessages;
223 }
224
225 @Override
226 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
227 if (!writeSelector.isOpen()) {
228 return;
229 }
230 final int size = in.size();
231 final int selectedKeys = writeSelector.select(SO_TIMEOUT);
232 if (selectedKeys > 0) {
233 final Set<SelectionKey> writableKeys = writeSelector.selectedKeys();
234 if (writableKeys.isEmpty()) {
235 return;
236 }
237 Iterator<SelectionKey> writableKeysIt = writableKeys.iterator();
238 int written = 0;
239 for (;;) {
240 if (written == size) {
241
242 return;
243 }
244 writableKeysIt.next();
245 writableKeysIt.remove();
246
247 SctpMessage packet = (SctpMessage) in.current();
248 if (packet == null) {
249 return;
250 }
251
252 ByteBuf data = packet.content();
253 int dataLen = data.readableBytes();
254 ByteBuffer nioData;
255
256 if (data.nioBufferCount() != -1) {
257 nioData = data.nioBuffer();
258 } else {
259 nioData = ByteBuffer.allocate(dataLen);
260 data.getBytes(data.readerIndex(), nioData);
261 nioData.flip();
262 }
263
264 final MessageInfo mi = MessageInfo.createOutgoing(association(), null, packet.streamIdentifier());
265 mi.payloadProtocolID(packet.protocolIdentifier());
266 mi.streamNumber(packet.streamIdentifier());
267 mi.unordered(packet.isUnordered());
268
269 ch.send(nioData, mi);
270 written ++;
271 in.remove();
272
273 if (!writableKeysIt.hasNext()) {
274 return;
275 }
276 }
277 }
278 }
279
280 @Override
281 protected Object filterOutboundMessage(Object msg) throws Exception {
282 if (msg instanceof SctpMessage) {
283 return msg;
284 }
285
286 throw new UnsupportedOperationException(
287 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPE);
288 }
289
290 @Override
291 public Association association() {
292 try {
293 return ch.association();
294 } catch (IOException ignored) {
295 return null;
296 }
297 }
298
299 @Override
300 public boolean isActive() {
301 return isOpen() && association() != null;
302 }
303
304 @Override
305 protected SocketAddress localAddress0() {
306 try {
307 Iterator<SocketAddress> i = ch.getAllLocalAddresses().iterator();
308 if (i.hasNext()) {
309 return i.next();
310 }
311 } catch (IOException e) {
312
313 }
314 return null;
315 }
316
317 @Override
318 public Set<InetSocketAddress> allLocalAddresses() {
319 try {
320 final Set<SocketAddress> allLocalAddresses = ch.getAllLocalAddresses();
321 final Set<InetSocketAddress> addresses = new LinkedHashSet<InetSocketAddress>(allLocalAddresses.size());
322 for (SocketAddress socketAddress : allLocalAddresses) {
323 addresses.add((InetSocketAddress) socketAddress);
324 }
325 return addresses;
326 } catch (Throwable ignored) {
327 return Collections.emptySet();
328 }
329 }
330
331 @Override
332 protected SocketAddress remoteAddress0() {
333 try {
334 Iterator<SocketAddress> i = ch.getRemoteAddresses().iterator();
335 if (i.hasNext()) {
336 return i.next();
337 }
338 } catch (IOException e) {
339
340 }
341 return null;
342 }
343
344 @Override
345 public Set<InetSocketAddress> allRemoteAddresses() {
346 try {
347 final Set<SocketAddress> allLocalAddresses = ch.getRemoteAddresses();
348 final Set<InetSocketAddress> addresses = new LinkedHashSet<InetSocketAddress>(allLocalAddresses.size());
349 for (SocketAddress socketAddress : allLocalAddresses) {
350 addresses.add((InetSocketAddress) socketAddress);
351 }
352 return addresses;
353 } catch (Throwable ignored) {
354 return Collections.emptySet();
355 }
356 }
357
358 @Override
359 protected void doBind(SocketAddress localAddress) throws Exception {
360 ch.bind(localAddress);
361 }
362
363 @Override
364 protected void doConnect(SocketAddress remoteAddress,
365 SocketAddress localAddress) throws Exception {
366 if (localAddress != null) {
367 ch.bind(localAddress);
368 }
369
370 boolean success = false;
371 try {
372 ch.connect(remoteAddress);
373 boolean finishConnect = false;
374 while (!finishConnect) {
375 if (connectSelector.select(SO_TIMEOUT) >= 0) {
376 final Set<SelectionKey> selectionKeys = connectSelector.selectedKeys();
377 for (SelectionKey key : selectionKeys) {
378 if (key.isConnectable()) {
379 selectionKeys.clear();
380 finishConnect = true;
381 break;
382 }
383 }
384 selectionKeys.clear();
385 }
386 }
387 success = ch.finishConnect();
388 } finally {
389 if (!success) {
390 doClose();
391 }
392 }
393 }
394
395 @Override
396 protected void doDisconnect() throws Exception {
397 doClose();
398 }
399
400 @Override
401 protected void doClose() throws Exception {
402 closeSelector("read", readSelector);
403 closeSelector("write", writeSelector);
404 closeSelector("connect", connectSelector);
405 ch.close();
406 }
407
408 private static void closeSelector(String selectorName, Selector selector) {
409 try {
410 selector.close();
411 } catch (IOException e) {
412 logger.warn("Failed to close a " + selectorName + " selector.", e);
413 }
414 }
415
416 @Override
417 public ChannelFuture bindAddress(InetAddress localAddress) {
418 return bindAddress(localAddress, newPromise());
419 }
420
421 @Override
422 public ChannelFuture bindAddress(final InetAddress localAddress, final ChannelPromise promise) {
423 if (eventLoop().inEventLoop()) {
424 try {
425 ch.bindAddress(localAddress);
426 promise.setSuccess();
427 } catch (Throwable t) {
428 promise.setFailure(t);
429 }
430 } else {
431 eventLoop().execute(new Runnable() {
432 @Override
433 public void run() {
434 bindAddress(localAddress, promise);
435 }
436 });
437 }
438 return promise;
439 }
440
441 @Override
442 public ChannelFuture unbindAddress(InetAddress localAddress) {
443 return unbindAddress(localAddress, newPromise());
444 }
445
446 @Override
447 public ChannelFuture unbindAddress(final InetAddress localAddress, final ChannelPromise promise) {
448 if (eventLoop().inEventLoop()) {
449 try {
450 ch.unbindAddress(localAddress);
451 promise.setSuccess();
452 } catch (Throwable t) {
453 promise.setFailure(t);
454 }
455 } else {
456 eventLoop().execute(new Runnable() {
457 @Override
458 public void run() {
459 unbindAddress(localAddress, promise);
460 }
461 });
462 }
463 return promise;
464 }
465
466 private final class OioSctpChannelConfig extends DefaultSctpChannelConfig {
467 private OioSctpChannelConfig(OioSctpChannel channel, SctpChannel javaChannel) {
468 super(channel, javaChannel);
469 }
470
471 @Override
472 protected void autoReadCleared() {
473 setReadPending(false);
474 }
475 }
476 }