1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.sctp.nio;
17
18 import com.sun.nio.sctp.Association;
19 import com.sun.nio.sctp.MessageInfo;
20 import com.sun.nio.sctp.NotificationHandler;
21 import com.sun.nio.sctp.SctpChannel;
22 import io.netty.buffer.ByteBuf;
23 import io.netty.buffer.ByteBufAllocator;
24 import io.netty.channel.Channel;
25 import io.netty.channel.ChannelException;
26 import io.netty.channel.ChannelFuture;
27 import io.netty.channel.ChannelMetadata;
28 import io.netty.channel.ChannelOutboundBuffer;
29 import io.netty.channel.ChannelPromise;
30 import io.netty.channel.RecvByteBufAllocator;
31 import io.netty.channel.nio.AbstractNioMessageChannel;
32 import io.netty.channel.nio.NioIoOps;
33 import io.netty.channel.sctp.DefaultSctpChannelConfig;
34 import io.netty.channel.sctp.SctpChannelConfig;
35 import io.netty.channel.sctp.SctpMessage;
36 import io.netty.channel.sctp.SctpNotificationHandler;
37 import io.netty.channel.sctp.SctpServerChannel;
38 import io.netty.util.internal.PlatformDependent;
39 import io.netty.util.internal.StringUtil;
40 import io.netty.util.internal.logging.InternalLogger;
41 import io.netty.util.internal.logging.InternalLoggerFactory;
42
43 import java.io.IOException;
44 import java.net.InetAddress;
45 import java.net.InetSocketAddress;
46 import java.net.SocketAddress;
47 import java.nio.ByteBuffer;
48 import java.nio.channels.SelectionKey;
49 import java.util.Collections;
50 import java.util.HashSet;
51 import java.util.Iterator;
52 import java.util.LinkedHashSet;
53 import java.util.List;
54 import java.util.Set;
55
56
57
58
59
60
61
62
63 public class NioSctpChannel extends AbstractNioMessageChannel implements io.netty.channel.sctp.SctpChannel {
64 private static final ChannelMetadata METADATA = new ChannelMetadata(false);
65
66 private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioSctpChannel.class);
67
68 private final SctpChannelConfig config;
69
70 private final NotificationHandler<?> notificationHandler;
71
72 private static SctpChannel newSctpChannel() {
73 try {
74 return SctpChannel.open();
75 } catch (IOException e) {
76 throw new ChannelException("Failed to open a sctp channel.", e);
77 }
78 }
79
80
81
82
83 public NioSctpChannel() {
84 this(newSctpChannel());
85 }
86
87
88
89
90 public NioSctpChannel(SctpChannel sctpChannel) {
91 this(null, sctpChannel);
92 }
93
94
95
96
97
98
99
100
101 public NioSctpChannel(Channel parent, SctpChannel sctpChannel) {
102 super(parent, sctpChannel, SelectionKey.OP_READ);
103 try {
104 sctpChannel.configureBlocking(false);
105 config = new NioSctpChannelConfig(this, sctpChannel);
106 notificationHandler = new SctpNotificationHandler(this);
107 } catch (IOException e) {
108 try {
109 sctpChannel.close();
110 } catch (IOException e2) {
111 if (logger.isWarnEnabled()) {
112 logger.warn(
113 "Failed to close a partially initialized sctp channel.", e2);
114 }
115 }
116
117 throw new ChannelException("Failed to enter non-blocking mode.", e);
118 }
119 }
120
121 @Override
122 public InetSocketAddress localAddress() {
123 return (InetSocketAddress) super.localAddress();
124 }
125
126 @Override
127 public InetSocketAddress remoteAddress() {
128 return (InetSocketAddress) super.remoteAddress();
129 }
130
131 @Override
132 public SctpServerChannel parent() {
133 return (SctpServerChannel) super.parent();
134 }
135
136 @Override
137 public ChannelMetadata metadata() {
138 return METADATA;
139 }
140
141 @Override
142 public Association association() {
143 try {
144 return javaChannel().association();
145 } catch (IOException ignored) {
146 return null;
147 }
148 }
149
150 @Override
151 public Set<InetSocketAddress> allLocalAddresses() {
152 try {
153 final Set<SocketAddress> allLocalAddresses = javaChannel().getAllLocalAddresses();
154 final Set<InetSocketAddress> addresses = new LinkedHashSet<InetSocketAddress>(allLocalAddresses.size());
155 for (SocketAddress socketAddress : allLocalAddresses) {
156 addresses.add((InetSocketAddress) socketAddress);
157 }
158 return addresses;
159 } catch (Throwable ignored) {
160 return Collections.emptySet();
161 }
162 }
163
164 @Override
165 public SctpChannelConfig config() {
166 return config;
167 }
168
169 @Override
170 public Set<InetSocketAddress> allRemoteAddresses() {
171 try {
172 final Set<SocketAddress> allLocalAddresses = javaChannel().getRemoteAddresses();
173 final Set<InetSocketAddress> addresses = new HashSet<InetSocketAddress>(allLocalAddresses.size());
174 for (SocketAddress socketAddress : allLocalAddresses) {
175 addresses.add((InetSocketAddress) socketAddress);
176 }
177 return addresses;
178 } catch (Throwable ignored) {
179 return Collections.emptySet();
180 }
181 }
182
183 @Override
184 protected SctpChannel javaChannel() {
185 return (SctpChannel) super.javaChannel();
186 }
187
188 @Override
189 public boolean isActive() {
190 SctpChannel ch = javaChannel();
191 return ch.isOpen() && association() != null;
192 }
193
194 @Override
195 protected SocketAddress localAddress0() {
196 try {
197 Iterator<SocketAddress> i = javaChannel().getAllLocalAddresses().iterator();
198 if (i.hasNext()) {
199 return i.next();
200 }
201 } catch (IOException e) {
202
203 }
204 return null;
205 }
206
207 @Override
208 protected SocketAddress remoteAddress0() {
209 try {
210 Iterator<SocketAddress> i = javaChannel().getRemoteAddresses().iterator();
211 if (i.hasNext()) {
212 return i.next();
213 }
214 } catch (IOException e) {
215
216 }
217 return null;
218 }
219
220 @Override
221 protected void doBind(SocketAddress localAddress) throws Exception {
222 javaChannel().bind(localAddress);
223 }
224
225 @Override
226 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
227 if (localAddress != null) {
228 javaChannel().bind(localAddress);
229 }
230
231 boolean success = false;
232 try {
233 boolean connected = javaChannel().connect(remoteAddress);
234 if (!connected) {
235 addAndSubmit(NioIoOps.CONNECT);
236 }
237 success = true;
238 return connected;
239 } finally {
240 if (!success) {
241 doClose();
242 }
243 }
244 }
245
246 @Override
247 protected void doFinishConnect() throws Exception {
248 if (!javaChannel().finishConnect()) {
249 throw new Error();
250 }
251 }
252
253 @Override
254 protected void doDisconnect() throws Exception {
255 doClose();
256 }
257
258 @Override
259 protected void doClose() throws Exception {
260 javaChannel().close();
261 }
262
263 @Override
264 protected int doReadMessages(List<Object> buf) throws Exception {
265 SctpChannel ch = javaChannel();
266
267 RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
268 ByteBuf buffer = allocHandle.allocate(config().getAllocator());
269 boolean free = true;
270 try {
271 ByteBuffer data = buffer.internalNioBuffer(buffer.writerIndex(), buffer.writableBytes());
272 int pos = data.position();
273
274 MessageInfo messageInfo = ch.receive(data, null, notificationHandler);
275 if (messageInfo == null) {
276 return 0;
277 }
278
279 allocHandle.lastBytesRead(data.position() - pos);
280 buf.add(new SctpMessage(messageInfo,
281 buffer.writerIndex(buffer.writerIndex() + allocHandle.lastBytesRead())));
282 free = false;
283 return 1;
284 } catch (Throwable cause) {
285 PlatformDependent.throwException(cause);
286 return -1;
287 } finally {
288 if (free) {
289 buffer.release();
290 }
291 }
292 }
293
294 @Override
295 protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
296 SctpMessage packet = (SctpMessage) msg;
297 ByteBuf data = packet.content();
298 int dataLen = data.readableBytes();
299 if (dataLen == 0) {
300 return true;
301 }
302
303 ByteBufAllocator alloc = alloc();
304 boolean needsCopy = data.nioBufferCount() != 1;
305 if (!needsCopy) {
306 if (!data.isDirect() && alloc.isDirectBufferPooled()) {
307 needsCopy = true;
308 }
309 }
310 ByteBuffer nioData;
311 if (needsCopy) {
312 data = alloc.directBuffer(dataLen).writeBytes(data);
313 }
314 nioData = data.nioBuffer();
315 final MessageInfo mi = MessageInfo.createOutgoing(association(), null, packet.streamIdentifier());
316 mi.payloadProtocolID(packet.protocolIdentifier());
317 mi.streamNumber(packet.streamIdentifier());
318 mi.unordered(packet.isUnordered());
319
320 final int writtenBytes = javaChannel().send(nioData, mi);
321 return writtenBytes > 0;
322 }
323
324 @Override
325 protected final Object filterOutboundMessage(Object msg) throws Exception {
326 if (msg instanceof SctpMessage) {
327 SctpMessage m = (SctpMessage) msg;
328 ByteBuf buf = m.content();
329 if (buf.isDirect() && buf.nioBufferCount() == 1) {
330 return m;
331 }
332
333 return new SctpMessage(m.protocolIdentifier(), m.streamIdentifier(), m.isUnordered(),
334 newDirectBuffer(m, buf));
335 }
336
337 throw new UnsupportedOperationException(
338 "unsupported message type: " + StringUtil.simpleClassName(msg) +
339 " (expected: " + StringUtil.simpleClassName(SctpMessage.class));
340 }
341
342 @Override
343 public ChannelFuture bindAddress(InetAddress localAddress) {
344 return bindAddress(localAddress, newPromise());
345 }
346
347 @Override
348 public ChannelFuture bindAddress(final InetAddress localAddress, final ChannelPromise promise) {
349 if (eventLoop().inEventLoop()) {
350 try {
351 javaChannel().bindAddress(localAddress);
352 promise.setSuccess();
353 } catch (Throwable t) {
354 promise.setFailure(t);
355 }
356 } else {
357 eventLoop().execute(new Runnable() {
358 @Override
359 public void run() {
360 bindAddress(localAddress, promise);
361 }
362 });
363 }
364 return promise;
365 }
366
367 @Override
368 public ChannelFuture unbindAddress(InetAddress localAddress) {
369 return unbindAddress(localAddress, newPromise());
370 }
371
372 @Override
373 public ChannelFuture unbindAddress(final InetAddress localAddress, final ChannelPromise promise) {
374 if (eventLoop().inEventLoop()) {
375 try {
376 javaChannel().unbindAddress(localAddress);
377 promise.setSuccess();
378 } catch (Throwable t) {
379 promise.setFailure(t);
380 }
381 } else {
382 eventLoop().execute(new Runnable() {
383 @Override
384 public void run() {
385 unbindAddress(localAddress, promise);
386 }
387 });
388 }
389 return promise;
390 }
391
392 private final class NioSctpChannelConfig extends DefaultSctpChannelConfig {
393 private NioSctpChannelConfig(NioSctpChannel channel, SctpChannel javaChannel) {
394 super(channel, javaChannel);
395 }
396
397 @Override
398 protected void autoReadCleared() {
399 clearReadPending();
400 }
401 }
402 }