1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.udt.nio;
17
18 import com.barchart.udt.TypeUDT;
19 import com.barchart.udt.nio.SocketChannelUDT;
20 import io.netty.buffer.ByteBuf;
21 import io.netty.channel.Channel;
22 import io.netty.channel.ChannelException;
23 import io.netty.channel.ChannelMetadata;
24 import io.netty.channel.ChannelOutboundBuffer;
25 import io.netty.util.internal.SocketUtils;
26 import io.netty.channel.nio.AbstractNioMessageChannel;
27 import io.netty.channel.udt.DefaultUdtChannelConfig;
28 import io.netty.channel.udt.UdtChannel;
29 import io.netty.channel.udt.UdtChannelConfig;
30 import io.netty.channel.udt.UdtMessage;
31 import io.netty.util.internal.StringUtil;
32 import io.netty.util.internal.logging.InternalLogger;
33 import io.netty.util.internal.logging.InternalLoggerFactory;
34
35 import java.io.IOException;
36 import java.net.InetSocketAddress;
37 import java.net.SocketAddress;
38 import java.security.AccessController;
39 import java.security.PrivilegedActionException;
40 import java.security.PrivilegedExceptionAction;
41 import java.util.List;
42
43 import static java.nio.channels.SelectionKey.*;
44
45
46
47
48
49
50
51
52 @Deprecated
53 public class NioUdtMessageConnectorChannel extends AbstractNioMessageChannel implements UdtChannel {
54
55 private static final InternalLogger logger =
56 InternalLoggerFactory.getInstance(NioUdtMessageConnectorChannel.class);
57
58 private static final ChannelMetadata METADATA = new ChannelMetadata(false);
59 private static final String EXPECTED_TYPE = " (expected: " + StringUtil.simpleClassName(UdtMessage.class) + ')';
60
61 private final UdtChannelConfig config;
62
63 public NioUdtMessageConnectorChannel() {
64 this(TypeUDT.DATAGRAM);
65 }
66
67 public NioUdtMessageConnectorChannel(final Channel parent, final SocketChannelUDT channelUDT) {
68 super(parent, channelUDT, OP_READ);
69 try {
70 channelUDT.configureBlocking(false);
71 switch (channelUDT.socketUDT().status()) {
72 case INIT:
73 case OPENED:
74 config = new DefaultUdtChannelConfig(this, channelUDT, true);
75 break;
76 default:
77 config = new DefaultUdtChannelConfig(this, channelUDT, false);
78 break;
79 }
80 } catch (final Exception e) {
81 try {
82 channelUDT.close();
83 } catch (final Exception e2) {
84 if (logger.isWarnEnabled()) {
85 logger.warn("Failed to close channel.", e2);
86 }
87 }
88 throw new ChannelException("Failed to configure channel.", e);
89 }
90 }
91
92 public NioUdtMessageConnectorChannel(final SocketChannelUDT channelUDT) {
93 this(null, channelUDT);
94 }
95
96 public NioUdtMessageConnectorChannel(final TypeUDT type) {
97 this(NioUdtProvider.newConnectorChannelUDT(type));
98 }
99
100 @Override
101 public UdtChannelConfig config() {
102 return config;
103 }
104
105 @Override
106 protected void doBind(final SocketAddress localAddress) throws Exception {
107 privilegedBind(javaChannel(), localAddress);
108 }
109
110 @Override
111 protected void doClose() throws Exception {
112 javaChannel().close();
113 }
114
115 @Override
116 protected boolean doConnect(final SocketAddress remoteAddress,
117 final SocketAddress localAddress) throws Exception {
118 doBind(localAddress != null? localAddress : new InetSocketAddress(0));
119 boolean success = false;
120 try {
121 final boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
122 if (!connected) {
123 selectionKey().interestOps(
124 selectionKey().interestOps() | OP_CONNECT);
125 }
126 success = true;
127 return connected;
128 } finally {
129 if (!success) {
130 doClose();
131 }
132 }
133 }
134
135 @Override
136 protected void doDisconnect() throws Exception {
137 doClose();
138 }
139
140 @Override
141 protected void doFinishConnect() throws Exception {
142 if (javaChannel().finishConnect()) {
143 selectionKey().interestOps(
144 selectionKey().interestOps() & ~OP_CONNECT);
145 } else {
146 throw new Error(
147 "Provider error: failed to finish connect. Provider library should be upgraded.");
148 }
149 }
150
151 @Override
152 protected int doReadMessages(List<Object> buf) throws Exception {
153
154 final int maximumMessageSize = config.getReceiveBufferSize();
155
156 final ByteBuf byteBuf = config.getAllocator().directBuffer(
157 maximumMessageSize);
158
159 final int receivedMessageSize = byteBuf.writeBytes(javaChannel(),
160 maximumMessageSize);
161
162 if (receivedMessageSize <= 0) {
163 byteBuf.release();
164 return 0;
165 }
166
167 if (receivedMessageSize >= maximumMessageSize) {
168 javaChannel().close();
169 throw new ChannelException(
170 "Invalid config : increase receive buffer size to avoid message truncation");
171 }
172
173
174 buf.add(new UdtMessage(byteBuf));
175
176 return 1;
177 }
178
179 @Override
180 protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
181
182 final UdtMessage message = (UdtMessage) msg;
183
184 final ByteBuf byteBuf = message.content();
185
186 final int messageSize = byteBuf.readableBytes();
187 if (messageSize == 0) {
188 return true;
189 }
190
191 final long writtenBytes;
192 if (byteBuf.nioBufferCount() == 1) {
193 writtenBytes = javaChannel().write(byteBuf.nioBuffer());
194 } else {
195 writtenBytes = javaChannel().write(byteBuf.nioBuffers());
196 }
197
198
199 if (writtenBytes > 0 && writtenBytes != messageSize) {
200 throw new Error(
201 "Provider error: failed to write message. Provider library should be upgraded.");
202 }
203
204 return writtenBytes > 0;
205 }
206
207 @Override
208 protected final Object filterOutboundMessage(Object msg) throws Exception {
209 if (msg instanceof UdtMessage) {
210 return msg;
211 }
212
213 throw new UnsupportedOperationException(
214 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPE);
215 }
216
217 @Override
218 public boolean isActive() {
219 final SocketChannelUDT channelUDT = javaChannel();
220 return channelUDT.isOpen() && channelUDT.isConnectFinished();
221 }
222
223 @Override
224 protected SocketChannelUDT javaChannel() {
225 return (SocketChannelUDT) super.javaChannel();
226 }
227
228 @Override
229 protected SocketAddress localAddress0() {
230 return javaChannel().socket().getLocalSocketAddress();
231 }
232
233 @Override
234 public ChannelMetadata metadata() {
235 return METADATA;
236 }
237
238 @Override
239 protected SocketAddress remoteAddress0() {
240 return javaChannel().socket().getRemoteSocketAddress();
241 }
242
243 @Override
244 public InetSocketAddress localAddress() {
245 return (InetSocketAddress) super.localAddress();
246 }
247
248 @Override
249 public InetSocketAddress remoteAddress() {
250 return (InetSocketAddress) super.remoteAddress();
251 }
252
253 private static void privilegedBind(final SocketChannelUDT socketChannel, final SocketAddress localAddress)
254 throws IOException {
255 try {
256 AccessController.doPrivileged(new PrivilegedExceptionAction<Void>() {
257 @Override
258 public Void run() throws IOException {
259 socketChannel.bind(localAddress);
260 return null;
261 }
262 });
263 } catch (PrivilegedActionException e) {
264 throw (IOException) e.getCause();
265 }
266 }
267 }