1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.udt.nio;
17
18 import com.barchart.udt.TypeUDT;
19 import com.barchart.udt.nio.SocketChannelUDT;
20 import io.netty.buffer.ByteBuf;
21 import io.netty.channel.Channel;
22 import io.netty.channel.ChannelException;
23 import io.netty.channel.ChannelMetadata;
24 import io.netty.channel.ChannelOutboundBuffer;
25 import io.netty.channel.nio.NioIoOps;
26 import io.netty.channel.nio.NioIoRegistration;
27 import io.netty.util.internal.SocketUtils;
28 import io.netty.channel.nio.AbstractNioMessageChannel;
29 import io.netty.channel.udt.DefaultUdtChannelConfig;
30 import io.netty.channel.udt.UdtChannel;
31 import io.netty.channel.udt.UdtChannelConfig;
32 import io.netty.channel.udt.UdtMessage;
33 import io.netty.util.internal.logging.InternalLogger;
34 import io.netty.util.internal.logging.InternalLoggerFactory;
35
36 import java.io.IOException;
37 import java.net.InetSocketAddress;
38 import java.net.SocketAddress;
39 import java.security.AccessController;
40 import java.security.PrivilegedActionException;
41 import java.security.PrivilegedExceptionAction;
42 import java.util.List;
43
44
45
46
47
48
49
50
51 @Deprecated
52 public class NioUdtMessageConnectorChannel extends AbstractNioMessageChannel implements UdtChannel {
53
54 private static final InternalLogger logger =
55 InternalLoggerFactory.getInstance(NioUdtMessageConnectorChannel.class);
56
57 private static final ChannelMetadata METADATA = new ChannelMetadata(false);
58
59 private final UdtChannelConfig config;
60
61 public NioUdtMessageConnectorChannel() {
62 this(TypeUDT.DATAGRAM);
63 }
64
65 public NioUdtMessageConnectorChannel(final Channel parent, final SocketChannelUDT channelUDT) {
66 super(parent, channelUDT, NioIoOps.READ);
67 try {
68 channelUDT.configureBlocking(false);
69 switch (channelUDT.socketUDT().status()) {
70 case INIT:
71 case OPENED:
72 config = new DefaultUdtChannelConfig(this, channelUDT, true);
73 break;
74 default:
75 config = new DefaultUdtChannelConfig(this, channelUDT, false);
76 break;
77 }
78 } catch (final Exception e) {
79 try {
80 channelUDT.close();
81 } catch (final Exception e2) {
82 logger.warn("Failed to close channel.", e2);
83 }
84 throw new ChannelException("Failed to configure channel.", e);
85 }
86 }
87
88 public NioUdtMessageConnectorChannel(final SocketChannelUDT channelUDT) {
89 this(null, channelUDT);
90 }
91
92 public NioUdtMessageConnectorChannel(final TypeUDT type) {
93 this(NioUdtProvider.newConnectorChannelUDT(type));
94 }
95
96 @Override
97 public UdtChannelConfig config() {
98 return config;
99 }
100
101 @Override
102 protected void doBind(final SocketAddress localAddress) throws Exception {
103 privilegedBind(javaChannel(), localAddress);
104 }
105
106 @Override
107 protected void doClose() throws Exception {
108 javaChannel().close();
109 }
110
111 @Override
112 protected boolean doConnect(final SocketAddress remoteAddress,
113 final SocketAddress localAddress) throws Exception {
114 doBind(localAddress != null? localAddress : new InetSocketAddress(0));
115 boolean success = false;
116 try {
117 final boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
118 if (!connected) {
119 addAndSubmit(NioIoOps.CONNECT);
120 }
121 success = true;
122 return connected;
123 } finally {
124 if (!success) {
125 doClose();
126 }
127 }
128 }
129
130 @Override
131 protected void doDisconnect() throws Exception {
132 doClose();
133 }
134
135 @Override
136 protected void doFinishConnect() throws Exception {
137 if (javaChannel().finishConnect()) {
138 NioIoRegistration registration = registration();
139 removeAndSubmit(NioIoOps.CONNECT);
140 } else {
141 throw new Error(
142 "Provider error: failed to finish connect. Provider library should be upgraded.");
143 }
144 }
145
146 @Override
147 protected int doReadMessages(List<Object> buf) throws Exception {
148
149 final int maximumMessageSize = config.getReceiveBufferSize();
150
151 final ByteBuf byteBuf = config.getAllocator().directBuffer(
152 maximumMessageSize);
153
154 final int receivedMessageSize = byteBuf.writeBytes(javaChannel(),
155 maximumMessageSize);
156
157 if (receivedMessageSize <= 0) {
158 byteBuf.release();
159 return 0;
160 }
161
162 if (receivedMessageSize >= maximumMessageSize) {
163 javaChannel().close();
164 throw new ChannelException(
165 "Invalid config : increase receive buffer size to avoid message truncation");
166 }
167
168
169 buf.add(new UdtMessage(byteBuf));
170
171 return 1;
172 }
173
174 @Override
175 protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
176
177 final UdtMessage message = (UdtMessage) msg;
178
179 final ByteBuf byteBuf = message.content();
180
181 final int messageSize = byteBuf.readableBytes();
182 if (messageSize == 0) {
183 return true;
184 }
185
186 final long writtenBytes;
187 if (byteBuf.nioBufferCount() == 1) {
188 writtenBytes = javaChannel().write(byteBuf.nioBuffer());
189 } else {
190 writtenBytes = javaChannel().write(byteBuf.nioBuffers());
191 }
192
193
194 if (writtenBytes > 0 && writtenBytes != messageSize) {
195 throw new Error(
196 "Provider error: failed to write message. Provider library should be upgraded.");
197 }
198
199 return writtenBytes > 0;
200 }
201
202 @Override
203 public boolean isActive() {
204 final SocketChannelUDT channelUDT = javaChannel();
205 return channelUDT.isOpen() && channelUDT.isConnectFinished();
206 }
207
208 @Override
209 protected SocketChannelUDT javaChannel() {
210 return (SocketChannelUDT) super.javaChannel();
211 }
212
213 @Override
214 protected SocketAddress localAddress0() {
215 return javaChannel().socket().getLocalSocketAddress();
216 }
217
218 @Override
219 public ChannelMetadata metadata() {
220 return METADATA;
221 }
222
223 @Override
224 protected SocketAddress remoteAddress0() {
225 return javaChannel().socket().getRemoteSocketAddress();
226 }
227
228 @Override
229 public InetSocketAddress localAddress() {
230 return (InetSocketAddress) super.localAddress();
231 }
232
233 @Override
234 public InetSocketAddress remoteAddress() {
235 return (InetSocketAddress) super.remoteAddress();
236 }
237
238 private static void privilegedBind(final SocketChannelUDT socketChannel, final SocketAddress localAddress)
239 throws IOException {
240 try {
241 AccessController.doPrivileged(new PrivilegedExceptionAction<Void>() {
242 @Override
243 public Void run() throws IOException {
244 socketChannel.bind(localAddress);
245 return null;
246 }
247 });
248 } catch (PrivilegedActionException e) {
249 throw (IOException) e.getCause();
250 }
251 }
252 }