1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.epoll;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.channel.AddressedEnvelope;
21 import io.netty.channel.ChannelMetadata;
22 import io.netty.channel.ChannelOutboundBuffer;
23 import io.netty.channel.ChannelPipeline;
24 import io.netty.channel.DefaultAddressedEnvelope;
25 import io.netty.channel.unix.DomainDatagramChannel;
26 import io.netty.channel.unix.DomainDatagramChannelConfig;
27 import io.netty.channel.unix.DomainDatagramPacket;
28 import io.netty.channel.unix.DomainDatagramSocketAddress;
29 import io.netty.channel.unix.DomainSocketAddress;
30 import io.netty.channel.unix.IovArray;
31 import io.netty.channel.unix.PeerCredentials;
32 import io.netty.channel.unix.UnixChannelUtil;
33 import io.netty.util.CharsetUtil;
34 import io.netty.util.UncheckedBooleanSupplier;
35 import io.netty.util.internal.StringUtil;
36
37 import java.io.IOException;
38 import java.net.SocketAddress;
39 import java.nio.ByteBuffer;
40
41 import static io.netty.channel.epoll.LinuxSocket.newSocketDomainDgram;
42
43 public final class EpollDomainDatagramChannel extends AbstractEpollChannel implements DomainDatagramChannel {
44
45 private static final ChannelMetadata METADATA = new ChannelMetadata(true, 16);
46
47 private static final String EXPECTED_TYPES =
48 " (expected: " +
49 StringUtil.simpleClassName(DomainDatagramPacket.class) + ", " +
50 StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
51 StringUtil.simpleClassName(ByteBuf.class) + ", " +
52 StringUtil.simpleClassName(DomainSocketAddress.class) + ">, " +
53 StringUtil.simpleClassName(ByteBuf.class) + ')';
54
55 private volatile boolean connected;
56 private volatile DomainSocketAddress local;
57 private volatile DomainSocketAddress remote;
58
59 private final EpollDomainDatagramChannelConfig config;
60
/**
 * Creates a channel on top of a socket obtained from
 * {@code LinuxSocket.newSocketDomainDgram()}. The channel starts out not active.
 */
public EpollDomainDatagramChannel() {
    this(newSocketDomainDgram(), false);
}
64
/**
 * Creates a channel that wraps an existing file descriptor. The channel is
 * flagged active from the start.
 *
 * @param fd the file descriptor wrapped in a {@link LinuxSocket}
 */
public EpollDomainDatagramChannel(int fd) {
    this(new LinuxSocket(fd), true);
}
68
/**
 * Shared constructor: hands the socket to the superclass with a {@code null}
 * first argument and {@code EpollIoOps.valueOf(0)}, then creates the channel
 * configuration.
 *
 * @param socket the socket backing this channel
 * @param active the initial active flag passed to the superclass
 */
private EpollDomainDatagramChannel(LinuxSocket socket, boolean active) {
    super(null, socket, active, EpollIoOps.valueOf(0));
    this.config = new EpollDomainDatagramChannelConfig(this);
}
73
74 @Override
75 public EpollDomainDatagramChannelConfig config() {
76 return config;
77 }
78
79 @Override
80 protected void doBind(SocketAddress localAddress) throws Exception {
81 super.doBind(localAddress);
82 local = (DomainSocketAddress) localAddress;
83 active = true;
84 }
85
86 @Override
87 protected void doClose() throws Exception {
88 super.doClose();
89 connected = active = false;
90 local = null;
91 remote = null;
92 }
93
94 @Override
95 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
96 if (super.doConnect(remoteAddress, localAddress)) {
97 if (localAddress != null) {
98 local = (DomainSocketAddress) localAddress;
99 }
100 remote = (DomainSocketAddress) remoteAddress;
101 connected = true;
102 return true;
103 }
104 return false;
105 }
106
107 @Override
108 protected void doDisconnect() throws Exception {
109 doClose();
110 }
111
112 @Override
113 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
114 int maxMessagesPerWrite = maxMessagesPerWrite();
115 while (maxMessagesPerWrite > 0) {
116 Object msg = in.current();
117 if (msg == null) {
118 break;
119 }
120
121 try {
122 boolean done = false;
123 for (int i = config().getWriteSpinCount(); i > 0; --i) {
124 if (doWriteMessage(msg)) {
125 done = true;
126 break;
127 }
128 }
129
130 if (done) {
131 in.remove();
132 maxMessagesPerWrite--;
133 } else {
134 break;
135 }
136 } catch (IOException e) {
137 maxMessagesPerWrite--;
138
139
140
141
142 in.remove(e);
143 }
144 }
145
146 if (in.isEmpty()) {
147
148 clearFlag(Native.EPOLLOUT);
149 } else {
150
151 setFlag(Native.EPOLLOUT);
152 }
153 }
154
155 private boolean doWriteMessage(Object msg) throws Exception {
156 final ByteBuf data;
157 DomainSocketAddress remoteAddress;
158 if (msg instanceof AddressedEnvelope) {
159 @SuppressWarnings("unchecked")
160 AddressedEnvelope<ByteBuf, DomainSocketAddress> envelope =
161 (AddressedEnvelope<ByteBuf, DomainSocketAddress>) msg;
162 data = envelope.content();
163 remoteAddress = envelope.recipient();
164 } else {
165 data = (ByteBuf) msg;
166 remoteAddress = null;
167 }
168
169 final int dataLen = data.readableBytes();
170 if (dataLen == 0) {
171 return true;
172 }
173
174 final long writtenBytes;
175 if (data.hasMemoryAddress()) {
176 long memoryAddress = data.memoryAddress();
177 if (remoteAddress == null) {
178 writtenBytes = socket.sendAddress(memoryAddress, data.readerIndex(), data.writerIndex());
179 } else {
180 writtenBytes = socket.sendToAddressDomainSocket(memoryAddress, data.readerIndex(), data.writerIndex(),
181 remoteAddress.path().getBytes(CharsetUtil.UTF_8));
182 }
183 } else if (data.nioBufferCount() > 1) {
184 IovArray array = registration().ioHandler().cleanIovArray();
185 array.add(data, data.readerIndex(), data.readableBytes());
186 int cnt = array.count();
187 assert cnt != 0;
188
189 if (remoteAddress == null) {
190 writtenBytes = socket.writevAddresses(array.memoryAddress(0), cnt);
191 } else {
192 writtenBytes = socket.sendToAddressesDomainSocket(array.memoryAddress(0), cnt,
193 remoteAddress.path().getBytes(CharsetUtil.UTF_8));
194 }
195 } else {
196 ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), data.readableBytes());
197 if (remoteAddress == null) {
198 writtenBytes = socket.send(nioData, nioData.position(), nioData.limit());
199 } else {
200 writtenBytes = socket.sendToDomainSocket(nioData, nioData.position(), nioData.limit(),
201 remoteAddress.path().getBytes(CharsetUtil.UTF_8));
202 }
203 }
204
205 return writtenBytes > 0;
206 }
207
208 @Override
209 protected Object filterOutboundMessage(Object msg) {
210 if (msg instanceof DomainDatagramPacket) {
211 DomainDatagramPacket packet = (DomainDatagramPacket) msg;
212 ByteBuf content = packet.content();
213 return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
214 new DomainDatagramPacket(newDirectBuffer(packet, content), packet.recipient()) : msg;
215 }
216
217 if (msg instanceof ByteBuf) {
218 ByteBuf buf = (ByteBuf) msg;
219 return UnixChannelUtil.isBufferCopyNeededForWrite(buf) ? newDirectBuffer(buf) : buf;
220 }
221
222 if (msg instanceof AddressedEnvelope) {
223 @SuppressWarnings("unchecked")
224 AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
225 if (e.content() instanceof ByteBuf &&
226 (e.recipient() == null || e.recipient() instanceof DomainSocketAddress)) {
227
228 ByteBuf content = (ByteBuf) e.content();
229 return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
230 new DefaultAddressedEnvelope<ByteBuf, DomainSocketAddress>(
231 newDirectBuffer(e, content), (DomainSocketAddress) e.recipient()) : e;
232 }
233 }
234
235 throw new UnsupportedOperationException(
236 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
237 }
238
239 @Override
240 public boolean isActive() {
241 return socket.isOpen() && (config.getActiveOnOpen() && isRegistered() || active);
242 }
243
244 @Override
245 public boolean isConnected() {
246 return connected;
247 }
248
249 @Override
250 public DomainSocketAddress localAddress() {
251 return (DomainSocketAddress) super.localAddress();
252 }
253
254 @Override
255 protected DomainSocketAddress localAddress0() {
256 return local;
257 }
258
259 @Override
260 public ChannelMetadata metadata() {
261 return METADATA;
262 }
263
264 @Override
265 protected AbstractEpollUnsafe newUnsafe() {
266 return new EpollDomainDatagramChannelUnsafe();
267 }
268
269
270
271
272
273 public PeerCredentials peerCredentials() throws IOException {
274 return socket.getPeerCredentials();
275 }
276
277 @Override
278 public DomainSocketAddress remoteAddress() {
279 return (DomainSocketAddress) super.remoteAddress();
280 }
281
282 @Override
283 protected DomainSocketAddress remoteAddress0() {
284 return remote;
285 }
286
287 final class EpollDomainDatagramChannelUnsafe extends AbstractEpollUnsafe {
288
289 @Override
290 void epollInReady() {
291 assert eventLoop().inEventLoop();
292 final DomainDatagramChannelConfig config = config();
293 if (shouldBreakEpollInReady(config)) {
294 clearEpollIn0();
295 return;
296 }
297 final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
298 final ChannelPipeline pipeline = pipeline();
299 final ByteBufAllocator allocator = config.getAllocator();
300 allocHandle.reset(config);
301
302 Throwable exception = null;
303 try {
304 ByteBuf byteBuf = null;
305 try {
306 boolean connected = isConnected();
307 do {
308 byteBuf = allocHandle.allocate(allocator);
309 allocHandle.attemptedBytesRead(byteBuf.writableBytes());
310
311 final DomainDatagramPacket packet;
312 if (connected) {
313 allocHandle.lastBytesRead(doReadBytes(byteBuf));
314 if (allocHandle.lastBytesRead() <= 0) {
315
316 byteBuf.release();
317 break;
318 }
319 packet = new DomainDatagramPacket(byteBuf, (DomainSocketAddress) localAddress(),
320 (DomainSocketAddress) remoteAddress());
321 } else {
322 final DomainDatagramSocketAddress remoteAddress;
323 if (byteBuf.hasMemoryAddress()) {
324
325 remoteAddress = socket.recvFromAddressDomainSocket(byteBuf.memoryAddress(),
326 byteBuf.writerIndex(), byteBuf.capacity());
327 } else {
328 ByteBuffer nioData = byteBuf.internalNioBuffer(
329 byteBuf.writerIndex(), byteBuf.writableBytes());
330 remoteAddress =
331 socket.recvFromDomainSocket(nioData, nioData.position(), nioData.limit());
332 }
333
334 if (remoteAddress == null) {
335 allocHandle.lastBytesRead(-1);
336 byteBuf.release();
337 break;
338 }
339 DomainSocketAddress localAddress = remoteAddress.localAddress();
340 if (localAddress == null) {
341 localAddress = (DomainSocketAddress) localAddress();
342 }
343 allocHandle.lastBytesRead(remoteAddress.receivedAmount());
344 byteBuf.writerIndex(byteBuf.writerIndex() + allocHandle.lastBytesRead());
345
346 packet = new DomainDatagramPacket(byteBuf, localAddress, remoteAddress);
347 }
348
349 allocHandle.incMessagesRead(1);
350
351 readPending = false;
352 pipeline.fireChannelRead(packet);
353
354 byteBuf = null;
355
356
357
358 } while (allocHandle.continueReading(UncheckedBooleanSupplier.TRUE_SUPPLIER));
359 } catch (Throwable t) {
360 if (byteBuf != null) {
361 byteBuf.release();
362 }
363 exception = t;
364 }
365
366 allocHandle.readComplete();
367 pipeline.fireChannelReadComplete();
368
369 if (exception != null) {
370 pipeline.fireExceptionCaught(exception);
371 }
372 } finally {
373 if (shouldStopReading(config)) {
374 clearEpollIn();
375 }
376 }
377 }
378 }
379 }