1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.sctp.oio;
17
18 import com.sun.nio.sctp.Association;
19 import com.sun.nio.sctp.MessageInfo;
20 import com.sun.nio.sctp.NotificationHandler;
21 import com.sun.nio.sctp.SctpChannel;
22
23 import io.netty.buffer.ByteBuf;
24 import io.netty.channel.Channel;
25 import io.netty.channel.ChannelException;
26 import io.netty.channel.ChannelFuture;
27 import io.netty.channel.ChannelMetadata;
28 import io.netty.channel.ChannelOutboundBuffer;
29 import io.netty.channel.ChannelPromise;
30 import io.netty.channel.RecvByteBufAllocator;
31 import io.netty.channel.oio.AbstractOioMessageChannel;
32 import io.netty.channel.sctp.DefaultSctpChannelConfig;
33 import io.netty.channel.sctp.SctpChannelConfig;
34 import io.netty.channel.sctp.SctpMessage;
35 import io.netty.channel.sctp.SctpNotificationHandler;
36 import io.netty.channel.sctp.SctpServerChannel;
37 import io.netty.util.internal.PlatformDependent;
38 import io.netty.util.internal.StringUtil;
39 import io.netty.util.internal.logging.InternalLogger;
40 import io.netty.util.internal.logging.InternalLoggerFactory;
41
42 import java.io.IOException;
43 import java.net.InetAddress;
44 import java.net.InetSocketAddress;
45 import java.net.SocketAddress;
46 import java.nio.ByteBuffer;
47 import java.nio.channels.SelectionKey;
48 import java.nio.channels.Selector;
49 import java.util.Collections;
50 import java.util.Iterator;
51 import java.util.LinkedHashSet;
52 import java.util.List;
53 import java.util.Set;
54
55
56
57
58
59
60
61
62
63
64 @Deprecated
65 public class OioSctpChannel extends AbstractOioMessageChannel
66 implements io.netty.channel.sctp.SctpChannel {
67
68 private static final InternalLogger logger =
69 InternalLoggerFactory.getInstance(OioSctpChannel.class);
70
71 private static final ChannelMetadata METADATA = new ChannelMetadata(false);
72 private static final String EXPECTED_TYPE = " (expected: " + StringUtil.simpleClassName(SctpMessage.class) + ')';
73
74 private final SctpChannel ch;
75 private final SctpChannelConfig config;
76
77 private final Selector readSelector;
78 private final Selector writeSelector;
79 private final Selector connectSelector;
80
81 private final NotificationHandler<?> notificationHandler;
82
83 private static SctpChannel openChannel() {
84 try {
85 return SctpChannel.open();
86 } catch (IOException e) {
87 throw new ChannelException("Failed to open a sctp channel.", e);
88 }
89 }
90
91
92
93
/**
 * Creates a channel backed by a newly opened JDK {@link SctpChannel} and no parent.
 *
 * @throws ChannelException if the underlying channel cannot be opened or initialized
 */
public OioSctpChannel() {
    this(openChannel());
}
97
98
99
100
101
102
/**
 * Creates a parentless channel that wraps the given JDK {@link SctpChannel}.
 *
 * @param ch the JDK channel to wrap
 * @throws ChannelException if the channel cannot be initialized
 */
public OioSctpChannel(SctpChannel ch) {
    this((Channel) null, ch);
}
106
107
108
109
110
111
112
113
114 public OioSctpChannel(Channel parent, SctpChannel ch) {
115 super(parent);
116 this.ch = ch;
117 boolean success = false;
118 try {
119 ch.configureBlocking(false);
120 readSelector = Selector.open();
121 writeSelector = Selector.open();
122 connectSelector = Selector.open();
123
124 ch.register(readSelector, SelectionKey.OP_READ);
125 ch.register(writeSelector, SelectionKey.OP_WRITE);
126 ch.register(connectSelector, SelectionKey.OP_CONNECT);
127
128 config = new OioSctpChannelConfig(this, ch);
129 notificationHandler = new SctpNotificationHandler(this);
130 success = true;
131 } catch (Exception e) {
132 throw new ChannelException("failed to initialize a sctp channel", e);
133 } finally {
134 if (!success) {
135 try {
136 ch.close();
137 } catch (IOException e) {
138 logger.warn("Failed to close a sctp channel.", e);
139 }
140 }
141 }
142 }
143
144 @Override
145 public InetSocketAddress localAddress() {
146 return (InetSocketAddress) super.localAddress();
147 }
148
149 @Override
150 public InetSocketAddress remoteAddress() {
151 return (InetSocketAddress) super.remoteAddress();
152 }
153
154 @Override
155 public SctpServerChannel parent() {
156 return (SctpServerChannel) super.parent();
157 }
158
159 @Override
160 public ChannelMetadata metadata() {
161 return METADATA;
162 }
163
164 @Override
165 public SctpChannelConfig config() {
166 return config;
167 }
168
169 @Override
170 public boolean isOpen() {
171 return ch.isOpen();
172 }
173
174 @Override
175 protected int doReadMessages(List<Object> msgs) throws Exception {
176 if (!readSelector.isOpen()) {
177 return 0;
178 }
179
180 int readMessages = 0;
181
182 final int selectedKeys = readSelector.select(SO_TIMEOUT);
183 final boolean keysSelected = selectedKeys > 0;
184
185 if (!keysSelected) {
186 return readMessages;
187 }
188
189
190
191
192
193 readSelector.selectedKeys().clear();
194 final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
195 ByteBuf buffer = allocHandle.allocate(config().getAllocator());
196 boolean free = true;
197
198 try {
199 ByteBuffer data = buffer.nioBuffer(buffer.writerIndex(), buffer.writableBytes());
200 MessageInfo messageInfo = ch.receive(data, null, notificationHandler);
201 if (messageInfo == null) {
202 return readMessages;
203 }
204
205 data.flip();
206 allocHandle.lastBytesRead(data.remaining());
207 msgs.add(new SctpMessage(messageInfo,
208 buffer.writerIndex(buffer.writerIndex() + allocHandle.lastBytesRead())));
209 free = false;
210 ++readMessages;
211 } catch (Throwable cause) {
212 PlatformDependent.throwException(cause);
213 } finally {
214 if (free) {
215 buffer.release();
216 }
217 }
218 return readMessages;
219 }
220
221 @Override
222 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
223 if (!writeSelector.isOpen()) {
224 return;
225 }
226 final int size = in.size();
227 final int selectedKeys = writeSelector.select(SO_TIMEOUT);
228 if (selectedKeys > 0) {
229 final Set<SelectionKey> writableKeys = writeSelector.selectedKeys();
230 if (writableKeys.isEmpty()) {
231 return;
232 }
233 Iterator<SelectionKey> writableKeysIt = writableKeys.iterator();
234 int written = 0;
235 for (;;) {
236 if (written == size) {
237
238 return;
239 }
240 writableKeysIt.next();
241 writableKeysIt.remove();
242
243 SctpMessage packet = (SctpMessage) in.current();
244 if (packet == null) {
245 return;
246 }
247
248 ByteBuf data = packet.content();
249 int dataLen = data.readableBytes();
250 ByteBuffer nioData;
251
252 if (data.nioBufferCount() != -1) {
253 nioData = data.nioBuffer();
254 } else {
255 nioData = ByteBuffer.allocate(dataLen);
256 data.getBytes(data.readerIndex(), nioData);
257 nioData.flip();
258 }
259
260 final MessageInfo mi = MessageInfo.createOutgoing(association(), null, packet.streamIdentifier());
261 mi.payloadProtocolID(packet.protocolIdentifier());
262 mi.streamNumber(packet.streamIdentifier());
263 mi.unordered(packet.isUnordered());
264
265 ch.send(nioData, mi);
266 written ++;
267 in.remove();
268
269 if (!writableKeysIt.hasNext()) {
270 return;
271 }
272 }
273 }
274 }
275
276 @Override
277 protected Object filterOutboundMessage(Object msg) throws Exception {
278 if (msg instanceof SctpMessage) {
279 return msg;
280 }
281
282 throw new UnsupportedOperationException(
283 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPE);
284 }
285
286 @Override
287 public Association association() {
288 try {
289 return ch.association();
290 } catch (IOException ignored) {
291 return null;
292 }
293 }
294
295 @Override
296 public boolean isActive() {
297 return isOpen() && association() != null;
298 }
299
300 @Override
301 protected SocketAddress localAddress0() {
302 try {
303 Iterator<SocketAddress> i = ch.getAllLocalAddresses().iterator();
304 if (i.hasNext()) {
305 return i.next();
306 }
307 } catch (IOException e) {
308
309 }
310 return null;
311 }
312
313 @Override
314 public Set<InetSocketAddress> allLocalAddresses() {
315 try {
316 final Set<SocketAddress> allLocalAddresses = ch.getAllLocalAddresses();
317 final Set<InetSocketAddress> addresses = new LinkedHashSet<InetSocketAddress>(allLocalAddresses.size());
318 for (SocketAddress socketAddress : allLocalAddresses) {
319 addresses.add((InetSocketAddress) socketAddress);
320 }
321 return addresses;
322 } catch (Throwable ignored) {
323 return Collections.emptySet();
324 }
325 }
326
327 @Override
328 protected SocketAddress remoteAddress0() {
329 try {
330 Iterator<SocketAddress> i = ch.getRemoteAddresses().iterator();
331 if (i.hasNext()) {
332 return i.next();
333 }
334 } catch (IOException e) {
335
336 }
337 return null;
338 }
339
340 @Override
341 public Set<InetSocketAddress> allRemoteAddresses() {
342 try {
343 final Set<SocketAddress> allLocalAddresses = ch.getRemoteAddresses();
344 final Set<InetSocketAddress> addresses = new LinkedHashSet<InetSocketAddress>(allLocalAddresses.size());
345 for (SocketAddress socketAddress : allLocalAddresses) {
346 addresses.add((InetSocketAddress) socketAddress);
347 }
348 return addresses;
349 } catch (Throwable ignored) {
350 return Collections.emptySet();
351 }
352 }
353
354 @Override
355 protected void doBind(SocketAddress localAddress) throws Exception {
356 ch.bind(localAddress);
357 }
358
359 @Override
360 protected void doConnect(SocketAddress remoteAddress,
361 SocketAddress localAddress) throws Exception {
362 if (localAddress != null) {
363 ch.bind(localAddress);
364 }
365
366 boolean success = false;
367 try {
368 ch.connect(remoteAddress);
369 boolean finishConnect = false;
370 while (!finishConnect) {
371 if (connectSelector.select(SO_TIMEOUT) >= 0) {
372 final Set<SelectionKey> selectionKeys = connectSelector.selectedKeys();
373 for (SelectionKey key : selectionKeys) {
374 if (key.isConnectable()) {
375 selectionKeys.clear();
376 finishConnect = true;
377 break;
378 }
379 }
380 selectionKeys.clear();
381 }
382 }
383 success = ch.finishConnect();
384 } finally {
385 if (!success) {
386 doClose();
387 }
388 }
389 }
390
391 @Override
392 protected void doDisconnect() throws Exception {
393 doClose();
394 }
395
396 @Override
397 protected void doClose() throws Exception {
398 closeSelector("read", readSelector);
399 closeSelector("write", writeSelector);
400 closeSelector("connect", connectSelector);
401 ch.close();
402 }
403
404 private static void closeSelector(String selectorName, Selector selector) {
405 try {
406 selector.close();
407 } catch (IOException e) {
408 if (logger.isWarnEnabled()) {
409 logger.warn("Failed to close a " + selectorName + " selector.", e);
410 }
411 }
412 }
413
414 @Override
415 public ChannelFuture bindAddress(InetAddress localAddress) {
416 return bindAddress(localAddress, newPromise());
417 }
418
419 @Override
420 public ChannelFuture bindAddress(final InetAddress localAddress, final ChannelPromise promise) {
421 if (eventLoop().inEventLoop()) {
422 try {
423 ch.bindAddress(localAddress);
424 promise.setSuccess();
425 } catch (Throwable t) {
426 promise.setFailure(t);
427 }
428 } else {
429 eventLoop().execute(new Runnable() {
430 @Override
431 public void run() {
432 bindAddress(localAddress, promise);
433 }
434 });
435 }
436 return promise;
437 }
438
439 @Override
440 public ChannelFuture unbindAddress(InetAddress localAddress) {
441 return unbindAddress(localAddress, newPromise());
442 }
443
444 @Override
445 public ChannelFuture unbindAddress(final InetAddress localAddress, final ChannelPromise promise) {
446 if (eventLoop().inEventLoop()) {
447 try {
448 ch.unbindAddress(localAddress);
449 promise.setSuccess();
450 } catch (Throwable t) {
451 promise.setFailure(t);
452 }
453 } else {
454 eventLoop().execute(new Runnable() {
455 @Override
456 public void run() {
457 unbindAddress(localAddress, promise);
458 }
459 });
460 }
461 return promise;
462 }
463
464 private final class OioSctpChannelConfig extends DefaultSctpChannelConfig {
465 private OioSctpChannelConfig(OioSctpChannel channel, SctpChannel javaChannel) {
466 super(channel, javaChannel);
467 }
468
469 @Override
470 protected void autoReadCleared() {
471 clearReadPending();
472 }
473 }
474 }