1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.sctp.nio;
17
18 import com.sun.nio.sctp.Association;
19 import com.sun.nio.sctp.MessageInfo;
20 import com.sun.nio.sctp.NotificationHandler;
21 import com.sun.nio.sctp.SctpChannel;
22 import io.netty.buffer.ByteBuf;
23 import io.netty.buffer.ByteBufAllocator;
24 import io.netty.channel.Channel;
25 import io.netty.channel.ChannelException;
26 import io.netty.channel.ChannelFuture;
27 import io.netty.channel.ChannelMetadata;
28 import io.netty.channel.ChannelOutboundBuffer;
29 import io.netty.channel.ChannelPromise;
30 import io.netty.channel.RecvByteBufAllocator;
31 import io.netty.channel.nio.AbstractNioMessageChannel;
32 import io.netty.channel.sctp.DefaultSctpChannelConfig;
33 import io.netty.channel.sctp.SctpChannelConfig;
34 import io.netty.channel.sctp.SctpMessage;
35 import io.netty.channel.sctp.SctpNotificationHandler;
36 import io.netty.channel.sctp.SctpServerChannel;
37 import io.netty.util.internal.PlatformDependent;
38 import io.netty.util.internal.StringUtil;
39 import io.netty.util.internal.logging.InternalLogger;
40 import io.netty.util.internal.logging.InternalLoggerFactory;
41
42 import java.io.IOException;
43 import java.net.InetAddress;
44 import java.net.InetSocketAddress;
45 import java.net.SocketAddress;
46 import java.nio.ByteBuffer;
47 import java.nio.channels.SelectionKey;
48 import java.util.Collections;
49 import java.util.HashSet;
50 import java.util.Iterator;
51 import java.util.LinkedHashSet;
52 import java.util.List;
53 import java.util.Set;
54
55
56
57
58
59
60
61
62 public class NioSctpChannel extends AbstractNioMessageChannel implements io.netty.channel.sctp.SctpChannel {
63 private static final ChannelMetadata METADATA = new ChannelMetadata(false);
64
65 private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioSctpChannel.class);
66
67 private final SctpChannelConfig config;
68
69 private final NotificationHandler<?> notificationHandler;
70
71 private RecvByteBufAllocator.Handle allocHandle;
72
73 private static SctpChannel newSctpChannel() {
74 try {
75 return SctpChannel.open();
76 } catch (IOException e) {
77 throw new ChannelException("Failed to open a sctp channel.", e);
78 }
79 }
80
81
82
83
84 public NioSctpChannel() {
85 this(newSctpChannel());
86 }
87
88
89
90
91 public NioSctpChannel(SctpChannel sctpChannel) {
92 this(null, sctpChannel);
93 }
94
95
96
97
98
99
100
101
102 public NioSctpChannel(Channel parent, SctpChannel sctpChannel) {
103 super(parent, sctpChannel, SelectionKey.OP_READ);
104 try {
105 sctpChannel.configureBlocking(false);
106 config = new NioSctpChannelConfig(this, sctpChannel);
107 notificationHandler = new SctpNotificationHandler(this);
108 } catch (IOException e) {
109 try {
110 sctpChannel.close();
111 } catch (IOException e2) {
112 if (logger.isWarnEnabled()) {
113 logger.warn(
114 "Failed to close a partially initialized sctp channel.", e2);
115 }
116 }
117
118 throw new ChannelException("Failed to enter non-blocking mode.", e);
119 }
120 }
121
122 @Override
123 public InetSocketAddress localAddress() {
124 return (InetSocketAddress) super.localAddress();
125 }
126
127 @Override
128 public InetSocketAddress remoteAddress() {
129 return (InetSocketAddress) super.remoteAddress();
130 }
131
132 @Override
133 public SctpServerChannel parent() {
134 return (SctpServerChannel) super.parent();
135 }
136
137 @Override
138 public ChannelMetadata metadata() {
139 return METADATA;
140 }
141
142 @Override
143 public Association association() {
144 try {
145 return javaChannel().association();
146 } catch (IOException ignored) {
147 return null;
148 }
149 }
150
151 @Override
152 public Set<InetSocketAddress> allLocalAddresses() {
153 try {
154 final Set<SocketAddress> allLocalAddresses = javaChannel().getAllLocalAddresses();
155 final Set<InetSocketAddress> addresses = new LinkedHashSet<InetSocketAddress>(allLocalAddresses.size());
156 for (SocketAddress socketAddress : allLocalAddresses) {
157 addresses.add((InetSocketAddress) socketAddress);
158 }
159 return addresses;
160 } catch (Throwable ignored) {
161 return Collections.emptySet();
162 }
163 }
164
165 @Override
166 public SctpChannelConfig config() {
167 return config;
168 }
169
170 @Override
171 public Set<InetSocketAddress> allRemoteAddresses() {
172 try {
173 final Set<SocketAddress> allLocalAddresses = javaChannel().getRemoteAddresses();
174 final Set<InetSocketAddress> addresses = new HashSet<InetSocketAddress>(allLocalAddresses.size());
175 for (SocketAddress socketAddress : allLocalAddresses) {
176 addresses.add((InetSocketAddress) socketAddress);
177 }
178 return addresses;
179 } catch (Throwable ignored) {
180 return Collections.emptySet();
181 }
182 }
183
184 @Override
185 protected SctpChannel javaChannel() {
186 return (SctpChannel) super.javaChannel();
187 }
188
189 @Override
190 public boolean isActive() {
191 SctpChannel ch = javaChannel();
192 return ch.isOpen() && association() != null;
193 }
194
195 @Override
196 protected SocketAddress localAddress0() {
197 try {
198 Iterator<SocketAddress> i = javaChannel().getAllLocalAddresses().iterator();
199 if (i.hasNext()) {
200 return i.next();
201 }
202 } catch (IOException e) {
203
204 }
205 return null;
206 }
207
208 @Override
209 protected SocketAddress remoteAddress0() {
210 try {
211 Iterator<SocketAddress> i = javaChannel().getRemoteAddresses().iterator();
212 if (i.hasNext()) {
213 return i.next();
214 }
215 } catch (IOException e) {
216
217 }
218 return null;
219 }
220
221 @Override
222 protected void doBind(SocketAddress localAddress) throws Exception {
223 javaChannel().bind(localAddress);
224 }
225
226 @Override
227 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
228 if (localAddress != null) {
229 javaChannel().bind(localAddress);
230 }
231
232 boolean success = false;
233 try {
234 boolean connected = javaChannel().connect(remoteAddress);
235 if (!connected) {
236 selectionKey().interestOps(SelectionKey.OP_CONNECT);
237 }
238 success = true;
239 return connected;
240 } finally {
241 if (!success) {
242 doClose();
243 }
244 }
245 }
246
247 @Override
248 protected void doFinishConnect() throws Exception {
249 if (!javaChannel().finishConnect()) {
250 throw new Error();
251 }
252 }
253
254 @Override
255 protected void doDisconnect() throws Exception {
256 doClose();
257 }
258
259 @Override
260 protected void doClose() throws Exception {
261 javaChannel().close();
262 }
263
264 @Override
265 protected int doReadMessages(List<Object> buf) throws Exception {
266 SctpChannel ch = javaChannel();
267
268 RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
269 if (allocHandle == null) {
270 this.allocHandle = allocHandle = config().getRecvByteBufAllocator().newHandle();
271 }
272 ByteBuf buffer = allocHandle.allocate(config().getAllocator());
273 boolean free = true;
274 try {
275 ByteBuffer data = buffer.internalNioBuffer(buffer.writerIndex(), buffer.writableBytes());
276 int pos = data.position();
277
278 MessageInfo messageInfo = ch.receive(data, null, notificationHandler);
279 if (messageInfo == null) {
280 return 0;
281 }
282 buf.add(new SctpMessage(messageInfo, buffer.writerIndex(buffer.writerIndex() + data.position() - pos)));
283 free = false;
284 return 1;
285 } catch (Throwable cause) {
286 PlatformDependent.throwException(cause);
287 return -1;
288 } finally {
289 int bytesRead = buffer.readableBytes();
290 allocHandle.record(bytesRead);
291 if (free) {
292 buffer.release();
293 }
294 }
295 }
296
297 @Override
298 protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
299 SctpMessage packet = (SctpMessage) msg;
300 ByteBuf data = packet.content();
301 int dataLen = data.readableBytes();
302 if (dataLen == 0) {
303 return true;
304 }
305
306 ByteBufAllocator alloc = alloc();
307 boolean needsCopy = data.nioBufferCount() != 1;
308 if (!needsCopy) {
309 if (!data.isDirect() && alloc.isDirectBufferPooled()) {
310 needsCopy = true;
311 }
312 }
313 ByteBuffer nioData;
314 if (!needsCopy) {
315 nioData = data.nioBuffer();
316 } else {
317 data = alloc.directBuffer(dataLen).writeBytes(data);
318 nioData = data.nioBuffer();
319 }
320 final MessageInfo mi = MessageInfo.createOutgoing(association(), null, packet.streamIdentifier());
321 mi.payloadProtocolID(packet.protocolIdentifier());
322 mi.streamNumber(packet.streamIdentifier());
323 mi.unordered(packet.isUnordered());
324
325 final int writtenBytes = javaChannel().send(nioData, mi);
326 return writtenBytes > 0;
327 }
328
329 @Override
330 protected final Object filterOutboundMessage(Object msg) throws Exception {
331 if (msg instanceof SctpMessage) {
332 SctpMessage m = (SctpMessage) msg;
333 ByteBuf buf = m.content();
334 if (buf.isDirect() && buf.nioBufferCount() == 1) {
335 return m;
336 }
337
338 return new SctpMessage(m.protocolIdentifier(), m.streamIdentifier(), m.isUnordered(),
339 newDirectBuffer(m, buf));
340 }
341
342 throw new UnsupportedOperationException(
343 "unsupported message type: " + StringUtil.simpleClassName(msg) +
344 " (expected: " + StringUtil.simpleClassName(SctpMessage.class));
345 }
346
347 @Override
348 public ChannelFuture bindAddress(InetAddress localAddress) {
349 return bindAddress(localAddress, newPromise());
350 }
351
352 @Override
353 public ChannelFuture bindAddress(final InetAddress localAddress, final ChannelPromise promise) {
354 if (eventLoop().inEventLoop()) {
355 try {
356 javaChannel().bindAddress(localAddress);
357 promise.setSuccess();
358 } catch (Throwable t) {
359 promise.setFailure(t);
360 }
361 } else {
362 eventLoop().execute(new Runnable() {
363 @Override
364 public void run() {
365 bindAddress(localAddress, promise);
366 }
367 });
368 }
369 return promise;
370 }
371
372 @Override
373 public ChannelFuture unbindAddress(InetAddress localAddress) {
374 return unbindAddress(localAddress, newPromise());
375 }
376
377 @Override
378 public ChannelFuture unbindAddress(final InetAddress localAddress, final ChannelPromise promise) {
379 if (eventLoop().inEventLoop()) {
380 try {
381 javaChannel().unbindAddress(localAddress);
382 promise.setSuccess();
383 } catch (Throwable t) {
384 promise.setFailure(t);
385 }
386 } else {
387 eventLoop().execute(new Runnable() {
388 @Override
389 public void run() {
390 unbindAddress(localAddress, promise);
391 }
392 });
393 }
394 return promise;
395 }
396
397 private final class NioSctpChannelConfig extends DefaultSctpChannelConfig {
398 private NioSctpChannelConfig(NioSctpChannel channel, SctpChannel javaChannel) {
399 super(channel, javaChannel);
400 }
401
402 @Override
403 protected void autoReadCleared() {
404 setReadPending(false);
405 }
406 }
407 }