1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.sctp.nio;
17
18 import com.sun.nio.sctp.Association;
19 import com.sun.nio.sctp.MessageInfo;
20 import com.sun.nio.sctp.NotificationHandler;
21 import com.sun.nio.sctp.SctpChannel;
22 import io.netty.buffer.ByteBuf;
23 import io.netty.channel.Channel;
24 import io.netty.channel.ChannelException;
25 import io.netty.channel.ChannelFuture;
26 import io.netty.channel.ChannelMetadata;
27 import io.netty.channel.ChannelOutboundBuffer;
28 import io.netty.channel.ChannelPromise;
29 import io.netty.channel.RecvByteBufAllocator;
30 import io.netty.channel.nio.AbstractNioMessageChannel;
31 import io.netty.channel.nio.NioIoOps;
32 import io.netty.channel.sctp.DefaultSctpChannelConfig;
33 import io.netty.channel.sctp.SctpChannelConfig;
34 import io.netty.channel.sctp.SctpMessage;
35 import io.netty.channel.sctp.SctpNotificationHandler;
36 import io.netty.channel.sctp.SctpServerChannel;
37 import io.netty.util.internal.PlatformDependent;
38 import io.netty.util.internal.StringUtil;
39 import io.netty.util.internal.logging.InternalLogger;
40 import io.netty.util.internal.logging.InternalLoggerFactory;
41
42 import java.io.IOException;
43 import java.net.InetAddress;
44 import java.net.InetSocketAddress;
45 import java.net.SocketAddress;
46 import java.nio.ByteBuffer;
47 import java.nio.channels.SelectionKey;
48 import java.util.Collections;
49 import java.util.HashSet;
50 import java.util.Iterator;
51 import java.util.LinkedHashSet;
52 import java.util.List;
53 import java.util.Set;
54
55
56
57
58
59
60
61
62 public class NioSctpChannel extends AbstractNioMessageChannel implements io.netty.channel.sctp.SctpChannel {
// Channel metadata shared by every instance; built with {@code false}
// (presumably the "has disconnect" flag -- confirm against ChannelMetadata).
private static final ChannelMetadata METADATA = new ChannelMetadata(false);

// Class-wide logger; used when closing a partially initialized channel fails.
private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioSctpChannel.class);

// Configuration object created in the constructor and returned by config().
private final SctpChannelConfig config;

// Handler passed to SctpChannel.receive(...) in doReadMessages.
private final NotificationHandler<?> notificationHandler;
// Lazily allocated direct buffer that doReadMessages receives into on Java 22-24
// when the target buffer is direct; re-allocated when it is too small.
private ByteBuffer inputCopy;
// Lazily allocated direct buffer that doWriteMessage copies the payload into
// when the payload cannot be handed to the JDK channel directly.
private ByteBuffer outputCopy;
72
73 private static SctpChannel newSctpChannel() {
74 try {
75 return SctpChannel.open();
76 } catch (IOException e) {
77 throw new ChannelException("Failed to open a sctp channel.", e);
78 }
79 }
80
81
82
83
/**
 * Creates a new instance backed by a channel obtained from {@code newSctpChannel()}.
 */
public NioSctpChannel() {
    this(newSctpChannel());
}
87
88
89
90
/**
 * Creates a new instance that wraps the given {@link SctpChannel} and has no parent
 * ({@code null} is passed as the parent channel).
 *
 * @param sctpChannel the JDK channel to wrap
 */
public NioSctpChannel(SctpChannel sctpChannel) {
    this(null, sctpChannel);
}
94
95
96
97
98
99
100
101
/**
 * Creates a new instance wrapping {@code sctpChannel}, registering interest in
 * {@link SelectionKey#OP_READ} and switching the channel to non-blocking mode.
 *
 * @param parent      the parent channel, or {@code null} if there is none
 * @param sctpChannel the JDK channel to wrap
 * @throws ChannelException if initialization fails with an {@link IOException};
 *                          the JDK channel is closed before the exception is thrown
 */
public NioSctpChannel(Channel parent, SctpChannel sctpChannel) {
    super(parent, sctpChannel, SelectionKey.OP_READ);
    try {
        sctpChannel.configureBlocking(false);
        config = new NioSctpChannelConfig(this, sctpChannel);
        notificationHandler = new SctpNotificationHandler(this);
    } catch (IOException cause) {
        closeAfterFailedInit(sctpChannel);
        throw new ChannelException("Failed to enter non-blocking mode.", cause);
    }
}

/**
 * Closes a channel whose initialization failed; a failure to close is only logged.
 */
private static void closeAfterFailedInit(SctpChannel sctpChannel) {
    try {
        sctpChannel.close();
    } catch (IOException closeFailure) {
        if (logger.isWarnEnabled()) {
            logger.warn(
                    "Failed to close a partially initialized sctp channel.", closeFailure);
        }
    }
}
121
122 @Override
123 public InetSocketAddress localAddress() {
124 return (InetSocketAddress) super.localAddress();
125 }
126
127 @Override
128 public InetSocketAddress remoteAddress() {
129 return (InetSocketAddress) super.remoteAddress();
130 }
131
132 @Override
133 public SctpServerChannel parent() {
134 return (SctpServerChannel) super.parent();
135 }
136
/**
 * Returns the {@link ChannelMetadata} constant shared by all instances of this class.
 */
@Override
public ChannelMetadata metadata() {
    return METADATA;
}
141
142 @Override
143 public Association association() {
144 try {
145 return javaChannel().association();
146 } catch (IOException ignored) {
147 return null;
148 }
149 }
150
151 @Override
152 public Set<InetSocketAddress> allLocalAddresses() {
153 try {
154 final Set<SocketAddress> allLocalAddresses = javaChannel().getAllLocalAddresses();
155 final Set<InetSocketAddress> addresses = new LinkedHashSet<InetSocketAddress>(allLocalAddresses.size());
156 for (SocketAddress socketAddress : allLocalAddresses) {
157 addresses.add((InetSocketAddress) socketAddress);
158 }
159 return addresses;
160 } catch (Throwable ignored) {
161 return Collections.emptySet();
162 }
163 }
164
/**
 * Returns the configuration object that was created in the constructor.
 */
@Override
public SctpChannelConfig config() {
    return config;
}
169
170 @Override
171 public Set<InetSocketAddress> allRemoteAddresses() {
172 try {
173 final Set<SocketAddress> allLocalAddresses = javaChannel().getRemoteAddresses();
174 final Set<InetSocketAddress> addresses = new HashSet<InetSocketAddress>(allLocalAddresses.size());
175 for (SocketAddress socketAddress : allLocalAddresses) {
176 addresses.add((InetSocketAddress) socketAddress);
177 }
178 return addresses;
179 } catch (Throwable ignored) {
180 return Collections.emptySet();
181 }
182 }
183
184 @Override
185 protected SctpChannel javaChannel() {
186 return (SctpChannel) super.javaChannel();
187 }
188
189 @Override
190 public boolean isActive() {
191 SctpChannel ch = javaChannel();
192 return ch.isOpen() && association() != null;
193 }
194
195 @Override
196 protected SocketAddress localAddress0() {
197 try {
198 Iterator<SocketAddress> i = javaChannel().getAllLocalAddresses().iterator();
199 if (i.hasNext()) {
200 return i.next();
201 }
202 } catch (IOException e) {
203
204 }
205 return null;
206 }
207
208 @Override
209 protected SocketAddress remoteAddress0() {
210 try {
211 Iterator<SocketAddress> i = javaChannel().getRemoteAddresses().iterator();
212 if (i.hasNext()) {
213 return i.next();
214 }
215 } catch (IOException e) {
216
217 }
218 return null;
219 }
220
221 @Override
222 protected void doBind(SocketAddress localAddress) throws Exception {
223 javaChannel().bind(localAddress);
224 }
225
226 @Override
227 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
228 if (localAddress != null) {
229 javaChannel().bind(localAddress);
230 }
231
232 boolean success = false;
233 try {
234 boolean connected = javaChannel().connect(remoteAddress);
235 if (!connected) {
236 addAndSubmit(NioIoOps.CONNECT);
237 }
238 success = true;
239 return connected;
240 } finally {
241 if (!success) {
242 doClose();
243 }
244 }
245 }
246
247 @Override
248 protected void doFinishConnect() throws Exception {
249 if (!javaChannel().finishConnect()) {
250 throw new Error();
251 }
252 }
253
/**
 * Disconnects this channel; implemented by delegating to {@link #doClose()}.
 */
@Override
protected void doDisconnect() throws Exception {
    doClose();
}
258
259 @Override
260 protected void doClose() throws Exception {
261 javaChannel().close();
262 }
263
264 @Override
265 protected int doReadMessages(List<Object> buf) throws Exception {
266 SctpChannel ch = javaChannel();
267
268 RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
269 ByteBuf buffer = allocHandle.allocate(config().getAllocator());
270 boolean free = true;
271 try {
272 ByteBuffer data = buffer.internalNioBuffer(buffer.writerIndex(), buffer.writableBytes());
273 boolean useInputCopy = false;
274 int javaVersion = PlatformDependent.javaVersion();
275 if (javaVersion >= 22 && javaVersion < 25 && data.isDirect()) {
276
277
278 if (inputCopy == null || inputCopy.capacity() < data.remaining()) {
279 inputCopy = ByteBuffer.allocateDirect(data.remaining());
280 }
281 inputCopy.clear();
282 inputCopy.limit(data.remaining());
283 useInputCopy = true;
284 }
285 int pos = data.position();
286
287 MessageInfo messageInfo = ch.receive(useInputCopy ? inputCopy : data, null, notificationHandler);
288 if (messageInfo == null) {
289 return 0;
290 }
291 if (useInputCopy) {
292 inputCopy.flip();
293 data.put(inputCopy);
294 }
295
296 allocHandle.lastBytesRead(data.position() - pos);
297 buf.add(new SctpMessage(messageInfo,
298 buffer.writerIndex(buffer.writerIndex() + allocHandle.lastBytesRead())));
299 free = false;
300 return 1;
301 } catch (Throwable cause) {
302 PlatformDependent.throwException(cause);
303 return -1;
304 } finally {
305 if (free) {
306 buffer.release();
307 }
308 }
309 }
310
311 @Override
312 protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
313 SctpMessage packet = (SctpMessage) msg;
314 ByteBuf data = packet.content();
315 int dataLen = data.readableBytes();
316 if (dataLen == 0) {
317 return true;
318 }
319
320 ByteBuffer nioData;
321 int javaVersion = PlatformDependent.javaVersion();
322 if (javaVersion >= 22 && javaVersion < 25 && data.isDirect() ||
323 !data.isDirect() || data.nioBufferCount() != 1) {
324
325
326
327
328 if (outputCopy == null || outputCopy.capacity() < dataLen) {
329 outputCopy = ByteBuffer.allocateDirect(dataLen);
330 }
331 outputCopy.clear();
332 outputCopy.limit(dataLen);
333 data.readBytes(outputCopy);
334 outputCopy.flip();
335 nioData = outputCopy;
336 } else {
337 nioData = data.nioBuffer();
338 }
339
340 final MessageInfo mi = MessageInfo.createOutgoing(association(), null, packet.streamIdentifier());
341 mi.payloadProtocolID(packet.protocolIdentifier());
342 mi.streamNumber(packet.streamIdentifier());
343 mi.unordered(packet.isUnordered());
344
345 final int writtenBytes = javaChannel().send(nioData, mi);
346 return writtenBytes > 0;
347 }
348
349 @Override
350 protected final Object filterOutboundMessage(Object msg) throws Exception {
351 if (msg instanceof SctpMessage) {
352 SctpMessage m = (SctpMessage) msg;
353 ByteBuf buf = m.content();
354 if (buf.isDirect() && buf.nioBufferCount() == 1) {
355 return m;
356 }
357
358 return new SctpMessage(m.protocolIdentifier(), m.streamIdentifier(), m.isUnordered(),
359 newDirectBuffer(m, buf));
360 }
361
362 throw new UnsupportedOperationException(
363 "unsupported message type: " + StringUtil.simpleClassName(msg) +
364 " (expected: " + StringUtil.simpleClassName(SctpMessage.class));
365 }
366
367 @Override
368 public ChannelFuture bindAddress(InetAddress localAddress) {
369 return bindAddress(localAddress, newPromise());
370 }
371
372 @Override
373 public ChannelFuture bindAddress(final InetAddress localAddress, final ChannelPromise promise) {
374 if (eventLoop().inEventLoop()) {
375 try {
376 javaChannel().bindAddress(localAddress);
377 promise.setSuccess();
378 } catch (Throwable t) {
379 promise.setFailure(t);
380 }
381 } else {
382 eventLoop().execute(new Runnable() {
383 @Override
384 public void run() {
385 bindAddress(localAddress, promise);
386 }
387 });
388 }
389 return promise;
390 }
391
392 @Override
393 public ChannelFuture unbindAddress(InetAddress localAddress) {
394 return unbindAddress(localAddress, newPromise());
395 }
396
397 @Override
398 public ChannelFuture unbindAddress(final InetAddress localAddress, final ChannelPromise promise) {
399 if (eventLoop().inEventLoop()) {
400 try {
401 javaChannel().unbindAddress(localAddress);
402 promise.setSuccess();
403 } catch (Throwable t) {
404 promise.setFailure(t);
405 }
406 } else {
407 eventLoop().execute(new Runnable() {
408 @Override
409 public void run() {
410 unbindAddress(localAddress, promise);
411 }
412 });
413 }
414 return promise;
415 }
416
/**
 * Configuration used by {@link NioSctpChannel}; extends
 * {@link DefaultSctpChannelConfig} and overrides {@code autoReadCleared()}.
 */
private final class NioSctpChannelConfig extends DefaultSctpChannelConfig {
    // Passes the owning channel and its JDK channel straight to the superclass.
    private NioSctpChannelConfig(NioSctpChannel channel, SctpChannel javaChannel) {
        super(channel, javaChannel);
    }

    // Presumably resolves to the enclosing channel's clearReadPending(), which would
    // explain why this class is not static -- confirm against the superclass.
    @Override
    protected void autoReadCleared() {
        clearReadPending();
    }
}
427 }