1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.sctp.oio;
17
18 import com.sun.nio.sctp.Association;
19 import com.sun.nio.sctp.MessageInfo;
20 import com.sun.nio.sctp.NotificationHandler;
21 import com.sun.nio.sctp.SctpChannel;
22
23 import io.netty.buffer.ByteBuf;
24 import io.netty.channel.Channel;
25 import io.netty.channel.ChannelException;
26 import io.netty.channel.ChannelFuture;
27 import io.netty.channel.ChannelMetadata;
28 import io.netty.channel.ChannelOutboundBuffer;
29 import io.netty.channel.ChannelPromise;
30 import io.netty.channel.RecvByteBufAllocator;
31 import io.netty.channel.oio.AbstractOioMessageChannel;
32 import io.netty.channel.sctp.DefaultSctpChannelConfig;
33 import io.netty.channel.sctp.SctpChannelConfig;
34 import io.netty.channel.sctp.SctpMessage;
35 import io.netty.channel.sctp.SctpNotificationHandler;
36 import io.netty.channel.sctp.SctpServerChannel;
37 import io.netty.util.internal.PlatformDependent;
38 import io.netty.util.internal.StringUtil;
39 import io.netty.util.internal.logging.InternalLogger;
40 import io.netty.util.internal.logging.InternalLoggerFactory;
41
42 import java.io.IOException;
43 import java.net.InetAddress;
44 import java.net.InetSocketAddress;
45 import java.net.SocketAddress;
46 import java.nio.ByteBuffer;
47 import java.nio.channels.SelectionKey;
48 import java.nio.channels.Selector;
49 import java.util.Collections;
50 import java.util.Iterator;
51 import java.util.LinkedHashSet;
52 import java.util.List;
53 import java.util.Set;
54
55
56
57
58
59
60
61
62
63
64 @Deprecated
65 public class OioSctpChannel extends AbstractOioMessageChannel
66 implements io.netty.channel.sctp.SctpChannel {
67
68 private static final InternalLogger logger =
69 InternalLoggerFactory.getInstance(OioSctpChannel.class);
70
71 private static final ChannelMetadata METADATA = new ChannelMetadata(false);
72 private static final String EXPECTED_TYPE = " (expected: " + StringUtil.simpleClassName(SctpMessage.class) + ')';
73
74 private final SctpChannel ch;
75 private final SctpChannelConfig config;
76
77 private final Selector readSelector;
78 private final Selector writeSelector;
79 private final Selector connectSelector;
80
81 private final NotificationHandler<?> notificationHandler;
82 private ByteBuffer inputCopy;
83 private ByteBuffer outputCopy;
84
85 private static SctpChannel openChannel() {
86 try {
87 return SctpChannel.open();
88 } catch (IOException e) {
89 throw new ChannelException("Failed to open a sctp channel.", e);
90 }
91 }
92
93
94
95
96 public OioSctpChannel() {
97 this(openChannel());
98 }
99
100
101
102
103
104
105 public OioSctpChannel(SctpChannel ch) {
106 this(null, ch);
107 }
108
109
110
111
112
113
114
115
116 public OioSctpChannel(Channel parent, SctpChannel ch) {
117 super(parent);
118 this.ch = ch;
119 boolean success = false;
120 try {
121 ch.configureBlocking(false);
122 readSelector = Selector.open();
123 writeSelector = Selector.open();
124 connectSelector = Selector.open();
125
126 ch.register(readSelector, SelectionKey.OP_READ);
127 ch.register(writeSelector, SelectionKey.OP_WRITE);
128 ch.register(connectSelector, SelectionKey.OP_CONNECT);
129
130 config = new OioSctpChannelConfig(this, ch);
131 notificationHandler = new SctpNotificationHandler(this);
132 success = true;
133 } catch (Exception e) {
134 throw new ChannelException("failed to initialize a sctp channel", e);
135 } finally {
136 if (!success) {
137 try {
138 ch.close();
139 } catch (IOException e) {
140 logger.warn("Failed to close a sctp channel.", e);
141 }
142 }
143 }
144 }
145
146 @Override
147 public InetSocketAddress localAddress() {
148 return (InetSocketAddress) super.localAddress();
149 }
150
151 @Override
152 public InetSocketAddress remoteAddress() {
153 return (InetSocketAddress) super.remoteAddress();
154 }
155
156 @Override
157 public SctpServerChannel parent() {
158 return (SctpServerChannel) super.parent();
159 }
160
161 @Override
162 public ChannelMetadata metadata() {
163 return METADATA;
164 }
165
166 @Override
167 public SctpChannelConfig config() {
168 return config;
169 }
170
171 @Override
172 public boolean isOpen() {
173 return ch.isOpen();
174 }
175
176 @Override
177 protected int doReadMessages(List<Object> msgs) throws Exception {
178 if (!readSelector.isOpen()) {
179 return 0;
180 }
181
182 int readMessages = 0;
183
184 final int selectedKeys = readSelector.select(SO_TIMEOUT);
185 final boolean keysSelected = selectedKeys > 0;
186
187 if (!keysSelected) {
188 return readMessages;
189 }
190
191
192
193
194
195 readSelector.selectedKeys().clear();
196 final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
197 ByteBuf buffer = allocHandle.allocate(config().getAllocator());
198 boolean free = true;
199
200 try {
201 ByteBuffer data = buffer.nioBuffer(buffer.writerIndex(), buffer.writableBytes());
202 boolean useInputCopy = false;
203 int javaVersion = PlatformDependent.javaVersion();
204 if (javaVersion >= 22 && javaVersion < 25 && data.isDirect()) {
205
206
207 if (inputCopy == null || inputCopy.capacity() < data.remaining()) {
208 inputCopy = ByteBuffer.allocateDirect(data.remaining());
209 }
210 inputCopy.clear();
211 inputCopy.limit(data.remaining());
212 useInputCopy = true;
213 }
214 MessageInfo messageInfo = ch.receive(useInputCopy ? inputCopy : data, null, notificationHandler);
215 if (messageInfo == null) {
216 return readMessages;
217 }
218 if (useInputCopy) {
219 inputCopy.flip();
220 data.put(inputCopy);
221 }
222
223 data.flip();
224 allocHandle.lastBytesRead(data.remaining());
225 msgs.add(new SctpMessage(messageInfo,
226 buffer.writerIndex(buffer.writerIndex() + allocHandle.lastBytesRead())));
227 free = false;
228 ++readMessages;
229 } catch (Throwable cause) {
230 PlatformDependent.throwException(cause);
231 } finally {
232 if (free) {
233 buffer.release();
234 }
235 }
236 return readMessages;
237 }
238
239 @Override
240 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
241 if (!writeSelector.isOpen()) {
242 return;
243 }
244 final int size = in.size();
245 final int selectedKeys = writeSelector.select(SO_TIMEOUT);
246 if (selectedKeys > 0) {
247 final Set<SelectionKey> writableKeys = writeSelector.selectedKeys();
248 if (writableKeys.isEmpty()) {
249 return;
250 }
251 Iterator<SelectionKey> writableKeysIt = writableKeys.iterator();
252 int written = 0;
253 for (;;) {
254 if (written == size) {
255
256 return;
257 }
258 writableKeysIt.next();
259 writableKeysIt.remove();
260
261 SctpMessage packet = (SctpMessage) in.current();
262 if (packet == null) {
263 return;
264 }
265
266 ByteBuf data = packet.content();
267 int dataLen = data.readableBytes();
268 ByteBuffer nioData;
269
270 int javaVersion = PlatformDependent.javaVersion();
271 if (javaVersion >= 22 && javaVersion < 25 && data.isDirect() ||
272 !data.isDirect() || data.nioBufferCount() != 1) {
273
274
275
276
277 if (outputCopy == null || outputCopy.capacity() < dataLen) {
278 outputCopy = ByteBuffer.allocateDirect(dataLen);
279 }
280 outputCopy.clear();
281 outputCopy.limit(dataLen);
282 data.readBytes(outputCopy);
283 outputCopy.flip();
284 nioData = outputCopy;
285 } else {
286 nioData = data.nioBuffer();
287 }
288
289 final MessageInfo mi = MessageInfo.createOutgoing(association(), null, packet.streamIdentifier());
290 mi.payloadProtocolID(packet.protocolIdentifier());
291 mi.streamNumber(packet.streamIdentifier());
292 mi.unordered(packet.isUnordered());
293
294 ch.send(nioData, mi);
295 written ++;
296 in.remove();
297
298 if (!writableKeysIt.hasNext()) {
299 return;
300 }
301 }
302 }
303 }
304
305 @Override
306 protected Object filterOutboundMessage(Object msg) throws Exception {
307 if (msg instanceof SctpMessage) {
308 return msg;
309 }
310
311 throw new UnsupportedOperationException(
312 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPE);
313 }
314
315 @Override
316 public Association association() {
317 try {
318 return ch.association();
319 } catch (IOException ignored) {
320 return null;
321 }
322 }
323
324 @Override
325 public boolean isActive() {
326 return isOpen() && association() != null;
327 }
328
329 @Override
330 protected SocketAddress localAddress0() {
331 try {
332 Iterator<SocketAddress> i = ch.getAllLocalAddresses().iterator();
333 if (i.hasNext()) {
334 return i.next();
335 }
336 } catch (IOException e) {
337
338 }
339 return null;
340 }
341
342 @Override
343 public Set<InetSocketAddress> allLocalAddresses() {
344 try {
345 final Set<SocketAddress> allLocalAddresses = ch.getAllLocalAddresses();
346 final Set<InetSocketAddress> addresses = new LinkedHashSet<InetSocketAddress>(allLocalAddresses.size());
347 for (SocketAddress socketAddress : allLocalAddresses) {
348 addresses.add((InetSocketAddress) socketAddress);
349 }
350 return addresses;
351 } catch (Throwable ignored) {
352 return Collections.emptySet();
353 }
354 }
355
356 @Override
357 protected SocketAddress remoteAddress0() {
358 try {
359 Iterator<SocketAddress> i = ch.getRemoteAddresses().iterator();
360 if (i.hasNext()) {
361 return i.next();
362 }
363 } catch (IOException e) {
364
365 }
366 return null;
367 }
368
369 @Override
370 public Set<InetSocketAddress> allRemoteAddresses() {
371 try {
372 final Set<SocketAddress> allLocalAddresses = ch.getRemoteAddresses();
373 final Set<InetSocketAddress> addresses = new LinkedHashSet<InetSocketAddress>(allLocalAddresses.size());
374 for (SocketAddress socketAddress : allLocalAddresses) {
375 addresses.add((InetSocketAddress) socketAddress);
376 }
377 return addresses;
378 } catch (Throwable ignored) {
379 return Collections.emptySet();
380 }
381 }
382
383 @Override
384 protected void doBind(SocketAddress localAddress) throws Exception {
385 ch.bind(localAddress);
386 }
387
388 @Override
389 protected void doConnect(SocketAddress remoteAddress,
390 SocketAddress localAddress) throws Exception {
391 if (localAddress != null) {
392 ch.bind(localAddress);
393 }
394
395 boolean success = false;
396 try {
397 ch.connect(remoteAddress);
398 boolean finishConnect = false;
399 while (!finishConnect) {
400 if (connectSelector.select(SO_TIMEOUT) >= 0) {
401 final Set<SelectionKey> selectionKeys = connectSelector.selectedKeys();
402 for (SelectionKey key : selectionKeys) {
403 if (key.isConnectable()) {
404 selectionKeys.clear();
405 finishConnect = true;
406 break;
407 }
408 }
409 selectionKeys.clear();
410 }
411 }
412 success = ch.finishConnect();
413 } finally {
414 if (!success) {
415 doClose();
416 }
417 }
418 }
419
420 @Override
421 protected void doDisconnect() throws Exception {
422 doClose();
423 }
424
425 @Override
426 protected void doClose() throws Exception {
427 closeSelector("read", readSelector);
428 closeSelector("write", writeSelector);
429 closeSelector("connect", connectSelector);
430 ch.close();
431 }
432
433 private static void closeSelector(String selectorName, Selector selector) {
434 try {
435 selector.close();
436 } catch (IOException e) {
437 if (logger.isWarnEnabled()) {
438 logger.warn("Failed to close a " + selectorName + " selector.", e);
439 }
440 }
441 }
442
443 @Override
444 public ChannelFuture bindAddress(InetAddress localAddress) {
445 return bindAddress(localAddress, newPromise());
446 }
447
448 @Override
449 public ChannelFuture bindAddress(final InetAddress localAddress, final ChannelPromise promise) {
450 if (eventLoop().inEventLoop()) {
451 try {
452 ch.bindAddress(localAddress);
453 promise.setSuccess();
454 } catch (Throwable t) {
455 promise.setFailure(t);
456 }
457 } else {
458 eventLoop().execute(new Runnable() {
459 @Override
460 public void run() {
461 bindAddress(localAddress, promise);
462 }
463 });
464 }
465 return promise;
466 }
467
468 @Override
469 public ChannelFuture unbindAddress(InetAddress localAddress) {
470 return unbindAddress(localAddress, newPromise());
471 }
472
473 @Override
474 public ChannelFuture unbindAddress(final InetAddress localAddress, final ChannelPromise promise) {
475 if (eventLoop().inEventLoop()) {
476 try {
477 ch.unbindAddress(localAddress);
478 promise.setSuccess();
479 } catch (Throwable t) {
480 promise.setFailure(t);
481 }
482 } else {
483 eventLoop().execute(new Runnable() {
484 @Override
485 public void run() {
486 unbindAddress(localAddress, promise);
487 }
488 });
489 }
490 return promise;
491 }
492
493 private final class OioSctpChannelConfig extends DefaultSctpChannelConfig {
494 private OioSctpChannelConfig(OioSctpChannel channel, SctpChannel javaChannel) {
495 super(channel, javaChannel);
496 }
497
498 @Override
499 protected void autoReadCleared() {
500 clearReadPending();
501 }
502 }
503 }