1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.epoll;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.channel.AddressedEnvelope;
21 import io.netty.channel.ChannelMetadata;
22 import io.netty.channel.ChannelOutboundBuffer;
23 import io.netty.channel.ChannelPipeline;
24 import io.netty.channel.DefaultAddressedEnvelope;
25 import io.netty.channel.unix.DomainDatagramChannel;
26 import io.netty.channel.unix.DomainDatagramChannelConfig;
27 import io.netty.channel.unix.DomainDatagramPacket;
28 import io.netty.channel.unix.DomainDatagramSocketAddress;
29 import io.netty.channel.unix.DomainSocketAddress;
30 import io.netty.channel.unix.IovArray;
31 import io.netty.channel.unix.PeerCredentials;
32 import io.netty.channel.unix.UnixChannelUtil;
33 import io.netty.util.CharsetUtil;
34 import io.netty.util.UncheckedBooleanSupplier;
35 import io.netty.util.internal.StringUtil;
36 import io.netty.util.internal.UnstableApi;
37
38 import java.io.IOException;
39 import java.net.SocketAddress;
40 import java.nio.ByteBuffer;
41
42 import static io.netty.channel.epoll.LinuxSocket.newSocketDomainDgram;
43
44 @UnstableApi
45 public final class EpollDomainDatagramChannel extends AbstractEpollChannel implements DomainDatagramChannel {
46
47 private static final ChannelMetadata METADATA = new ChannelMetadata(true, 16);
48
49 private static final String EXPECTED_TYPES =
50 " (expected: " +
51 StringUtil.simpleClassName(DomainDatagramPacket.class) + ", " +
52 StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
53 StringUtil.simpleClassName(ByteBuf.class) + ", " +
54 StringUtil.simpleClassName(DomainSocketAddress.class) + ">, " +
55 StringUtil.simpleClassName(ByteBuf.class) + ')';
56
57 private volatile boolean connected;
58 private volatile DomainSocketAddress local;
59 private volatile DomainSocketAddress remote;
60
61 private final EpollDomainDatagramChannelConfig config;
62
63 public EpollDomainDatagramChannel() {
64 this(newSocketDomainDgram(), false);
65 }
66
67 public EpollDomainDatagramChannel(int fd) {
68 this(new LinuxSocket(fd), true);
69 }
70
71 private EpollDomainDatagramChannel(LinuxSocket socket, boolean active) {
72 super(null, socket, active);
73 config = new EpollDomainDatagramChannelConfig(this);
74 }
75
76 @Override
77 public EpollDomainDatagramChannelConfig config() {
78 return config;
79 }
80
81 @Override
82 protected void doBind(SocketAddress localAddress) throws Exception {
83 super.doBind(localAddress);
84 local = (DomainSocketAddress) localAddress;
85 active = true;
86 }
87
88 @Override
89 protected void doClose() throws Exception {
90 super.doClose();
91 connected = active = false;
92 local = null;
93 remote = null;
94 }
95
96 @Override
97 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
98 if (super.doConnect(remoteAddress, localAddress)) {
99 if (localAddress != null) {
100 local = (DomainSocketAddress) localAddress;
101 }
102 remote = (DomainSocketAddress) remoteAddress;
103 connected = true;
104 return true;
105 }
106 return false;
107 }
108
109 @Override
110 protected void doDisconnect() throws Exception {
111 doClose();
112 }
113
114 @Override
115 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
116 int maxMessagesPerWrite = maxMessagesPerWrite();
117 while (maxMessagesPerWrite > 0) {
118 Object msg = in.current();
119 if (msg == null) {
120 break;
121 }
122
123 try {
124 boolean done = false;
125 for (int i = config().getWriteSpinCount(); i > 0; --i) {
126 if (doWriteMessage(msg)) {
127 done = true;
128 break;
129 }
130 }
131
132 if (done) {
133 in.remove();
134 maxMessagesPerWrite--;
135 } else {
136 break;
137 }
138 } catch (IOException e) {
139 maxMessagesPerWrite--;
140
141
142
143
144 in.remove(e);
145 }
146 }
147
148 if (in.isEmpty()) {
149
150 clearFlag(Native.EPOLLOUT);
151 } else {
152
153 setFlag(Native.EPOLLOUT);
154 }
155 }
156
157 private boolean doWriteMessage(Object msg) throws Exception {
158 final ByteBuf data;
159 DomainSocketAddress remoteAddress;
160 if (msg instanceof AddressedEnvelope) {
161 @SuppressWarnings("unchecked")
162 AddressedEnvelope<ByteBuf, DomainSocketAddress> envelope =
163 (AddressedEnvelope<ByteBuf, DomainSocketAddress>) msg;
164 data = envelope.content();
165 remoteAddress = envelope.recipient();
166 } else {
167 data = (ByteBuf) msg;
168 remoteAddress = null;
169 }
170
171 final int dataLen = data.readableBytes();
172 if (dataLen == 0) {
173 return true;
174 }
175
176 final long writtenBytes;
177 if (data.hasMemoryAddress()) {
178 long memoryAddress = data.memoryAddress();
179 if (remoteAddress == null) {
180 writtenBytes = socket.sendAddress(memoryAddress, data.readerIndex(), data.writerIndex());
181 } else {
182 writtenBytes = socket.sendToAddressDomainSocket(memoryAddress, data.readerIndex(), data.writerIndex(),
183 remoteAddress.path().getBytes(CharsetUtil.UTF_8));
184 }
185 } else if (data.nioBufferCount() > 1) {
186 IovArray array = ((EpollEventLoop) eventLoop()).cleanIovArray();
187 array.add(data, data.readerIndex(), data.readableBytes());
188 int cnt = array.count();
189 assert cnt != 0;
190
191 if (remoteAddress == null) {
192 writtenBytes = socket.writevAddresses(array.memoryAddress(0), cnt);
193 } else {
194 writtenBytes = socket.sendToAddressesDomainSocket(array.memoryAddress(0), cnt,
195 remoteAddress.path().getBytes(CharsetUtil.UTF_8));
196 }
197 } else {
198 ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), data.readableBytes());
199 if (remoteAddress == null) {
200 writtenBytes = socket.send(nioData, nioData.position(), nioData.limit());
201 } else {
202 writtenBytes = socket.sendToDomainSocket(nioData, nioData.position(), nioData.limit(),
203 remoteAddress.path().getBytes(CharsetUtil.UTF_8));
204 }
205 }
206
207 return writtenBytes > 0;
208 }
209
210 @Override
211 protected Object filterOutboundMessage(Object msg) {
212 if (msg instanceof DomainDatagramPacket) {
213 DomainDatagramPacket packet = (DomainDatagramPacket) msg;
214 ByteBuf content = packet.content();
215 return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
216 new DomainDatagramPacket(newDirectBuffer(packet, content), packet.recipient()) : msg;
217 }
218
219 if (msg instanceof ByteBuf) {
220 ByteBuf buf = (ByteBuf) msg;
221 return UnixChannelUtil.isBufferCopyNeededForWrite(buf) ? newDirectBuffer(buf) : buf;
222 }
223
224 if (msg instanceof AddressedEnvelope) {
225 @SuppressWarnings("unchecked")
226 AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
227 if (e.content() instanceof ByteBuf &&
228 (e.recipient() == null || e.recipient() instanceof DomainSocketAddress)) {
229
230 ByteBuf content = (ByteBuf) e.content();
231 return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
232 new DefaultAddressedEnvelope<ByteBuf, DomainSocketAddress>(
233 newDirectBuffer(e, content), (DomainSocketAddress) e.recipient()) : e;
234 }
235 }
236
237 throw new UnsupportedOperationException(
238 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
239 }
240
241 @Override
242 public boolean isActive() {
243 return socket.isOpen() && (config.getActiveOnOpen() && isRegistered() || active);
244 }
245
246 @Override
247 public boolean isConnected() {
248 return connected;
249 }
250
251 @Override
252 public DomainSocketAddress localAddress() {
253 return (DomainSocketAddress) super.localAddress();
254 }
255
256 @Override
257 protected DomainSocketAddress localAddress0() {
258 return local;
259 }
260
261 @Override
262 public ChannelMetadata metadata() {
263 return METADATA;
264 }
265
266 @Override
267 protected AbstractEpollUnsafe newUnsafe() {
268 return new EpollDomainDatagramChannelUnsafe();
269 }
270
271
272
273
274
275 public PeerCredentials peerCredentials() throws IOException {
276 return socket.getPeerCredentials();
277 }
278
279 @Override
280 public DomainSocketAddress remoteAddress() {
281 return (DomainSocketAddress) super.remoteAddress();
282 }
283
284 @Override
285 protected DomainSocketAddress remoteAddress0() {
286 return remote;
287 }
288
289 final class EpollDomainDatagramChannelUnsafe extends AbstractEpollUnsafe {
290
291 @Override
292 void epollInReady() {
293 assert eventLoop().inEventLoop();
294 final DomainDatagramChannelConfig config = config();
295 if (shouldBreakEpollInReady(config)) {
296 clearEpollIn0();
297 return;
298 }
299 final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
300 allocHandle.edgeTriggered(isFlagSet(Native.EPOLLET));
301
302 final ChannelPipeline pipeline = pipeline();
303 final ByteBufAllocator allocator = config.getAllocator();
304 allocHandle.reset(config);
305 epollInBefore();
306
307 Throwable exception = null;
308 try {
309 ByteBuf byteBuf = null;
310 try {
311 boolean connected = isConnected();
312 do {
313 byteBuf = allocHandle.allocate(allocator);
314 allocHandle.attemptedBytesRead(byteBuf.writableBytes());
315
316 final DomainDatagramPacket packet;
317 if (connected) {
318 allocHandle.lastBytesRead(doReadBytes(byteBuf));
319 if (allocHandle.lastBytesRead() <= 0) {
320
321 byteBuf.release();
322 break;
323 }
324 packet = new DomainDatagramPacket(byteBuf, (DomainSocketAddress) localAddress(),
325 (DomainSocketAddress) remoteAddress());
326 } else {
327 final DomainDatagramSocketAddress remoteAddress;
328 if (byteBuf.hasMemoryAddress()) {
329
330 remoteAddress = socket.recvFromAddressDomainSocket(byteBuf.memoryAddress(),
331 byteBuf.writerIndex(), byteBuf.capacity());
332 } else {
333 ByteBuffer nioData = byteBuf.internalNioBuffer(
334 byteBuf.writerIndex(), byteBuf.writableBytes());
335 remoteAddress =
336 socket.recvFromDomainSocket(nioData, nioData.position(), nioData.limit());
337 }
338
339 if (remoteAddress == null) {
340 allocHandle.lastBytesRead(-1);
341 byteBuf.release();
342 break;
343 }
344 DomainSocketAddress localAddress = remoteAddress.localAddress();
345 if (localAddress == null) {
346 localAddress = (DomainSocketAddress) localAddress();
347 }
348 allocHandle.lastBytesRead(remoteAddress.receivedAmount());
349 byteBuf.writerIndex(byteBuf.writerIndex() + allocHandle.lastBytesRead());
350
351 packet = new DomainDatagramPacket(byteBuf, localAddress, remoteAddress);
352 }
353
354 allocHandle.incMessagesRead(1);
355
356 readPending = false;
357 pipeline.fireChannelRead(packet);
358
359 byteBuf = null;
360
361
362
363 } while (allocHandle.continueReading(UncheckedBooleanSupplier.TRUE_SUPPLIER));
364 } catch (Throwable t) {
365 if (byteBuf != null) {
366 byteBuf.release();
367 }
368 exception = t;
369 }
370
371 allocHandle.readComplete();
372 pipeline.fireChannelReadComplete();
373
374 if (exception != null) {
375 pipeline.fireExceptionCaught(exception);
376 }
377 } finally {
378 epollInFinally(config);
379 }
380 }
381 }
382 }