1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.epoll;
17
18 import io.netty.channel.Channel;
19 import io.netty.channel.ChannelConfig;
20 import io.netty.channel.ChannelOutboundBuffer;
21 import io.netty.channel.ChannelPipeline;
22 import io.netty.channel.unix.DomainSocketAddress;
23 import io.netty.channel.unix.DomainSocketChannel;
24 import io.netty.channel.unix.FileDescriptor;
25 import io.netty.channel.unix.PeerCredentials;
26
27 import java.io.IOException;
28 import java.net.SocketAddress;
29
30 import static io.netty.channel.epoll.LinuxSocket.newSocketDomain;
31
32 public final class EpollDomainSocketChannel extends AbstractEpollStreamChannel implements DomainSocketChannel {
33 private final EpollDomainSocketChannelConfig config = new EpollDomainSocketChannelConfig(this);
34
35 private volatile DomainSocketAddress local;
36 private volatile DomainSocketAddress remote;
37
38 public EpollDomainSocketChannel() {
39 super(newSocketDomain(), false);
40 }
41
42 EpollDomainSocketChannel(Channel parent, FileDescriptor fd) {
43 this(parent, new LinuxSocket(fd.intValue()));
44 }
45
46 public EpollDomainSocketChannel(int fd) {
47 super(fd);
48 }
49
50 public EpollDomainSocketChannel(Channel parent, LinuxSocket fd) {
51 super(parent, fd);
52 local = fd.localDomainSocketAddress();
53 remote = fd.remoteDomainSocketAddress();
54 }
55
56 public EpollDomainSocketChannel(int fd, boolean active) {
57 super(new LinuxSocket(fd), active);
58 }
59
60 @Override
61 protected AbstractEpollUnsafe newUnsafe() {
62 return new EpollDomainUnsafe();
63 }
64
65 @Override
66 protected DomainSocketAddress localAddress0() {
67 return local;
68 }
69
70 @Override
71 protected DomainSocketAddress remoteAddress0() {
72 return remote;
73 }
74
75 @Override
76 protected void doBind(SocketAddress localAddress) throws Exception {
77 socket.bind(localAddress);
78 local = (DomainSocketAddress) localAddress;
79 }
80
81 @Override
82 public EpollDomainSocketChannelConfig config() {
83 return config;
84 }
85
86 @Override
87 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
88 if (super.doConnect(remoteAddress, localAddress)) {
89 local = localAddress != null ? (DomainSocketAddress) localAddress : socket.localDomainSocketAddress();
90 remote = (DomainSocketAddress) remoteAddress;
91 return true;
92 }
93 return false;
94 }
95
96 @Override
97 public DomainSocketAddress remoteAddress() {
98 return (DomainSocketAddress) super.remoteAddress();
99 }
100
101 @Override
102 public DomainSocketAddress localAddress() {
103 return (DomainSocketAddress) super.localAddress();
104 }
105
106 @Override
107 protected int doWriteSingle(ChannelOutboundBuffer in) throws Exception {
108 Object msg = in.current();
109 if (msg instanceof FileDescriptor && socket.sendFd(((FileDescriptor) msg).intValue()) > 0) {
110
111 in.remove();
112 return 1;
113 }
114 return super.doWriteSingle(in);
115 }
116
117 @Override
118 protected Object filterOutboundMessage(Object msg) {
119 if (msg instanceof FileDescriptor) {
120 return msg;
121 }
122 return super.filterOutboundMessage(msg);
123 }
124
125
126
127
128
129 public PeerCredentials peerCredentials() throws IOException {
130 return socket.getPeerCredentials();
131 }
132
133 private final class EpollDomainUnsafe extends EpollStreamUnsafe {
134 @Override
135 void epollInReady() {
136 switch (config().getReadMode()) {
137 case BYTES:
138 super.epollInReady();
139 break;
140 case FILE_DESCRIPTORS:
141 epollInReadFd();
142 break;
143 default:
144 throw new Error();
145 }
146 }
147
148 private void epollInReadFd() {
149 if (socket.isInputShutdown()) {
150 clearEpollIn0();
151 return;
152 }
153 final ChannelConfig config = config();
154 final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
155 allocHandle.edgeTriggered(isFlagSet(Native.EPOLLET));
156
157 final ChannelPipeline pipeline = pipeline();
158 allocHandle.reset(config);
159 epollInBefore();
160
161 try {
162 readLoop: do {
163
164
165
166 allocHandle.lastBytesRead(socket.recvFd());
167 switch(allocHandle.lastBytesRead()) {
168 case 0:
169 break readLoop;
170 case -1:
171 close(voidPromise());
172 return;
173 default:
174 allocHandle.incMessagesRead(1);
175 readPending = false;
176 pipeline.fireChannelRead(new FileDescriptor(allocHandle.lastBytesRead()));
177 break;
178 }
179 } while (allocHandle.continueReading());
180
181 allocHandle.readComplete();
182 pipeline.fireChannelReadComplete();
183 } catch (Throwable t) {
184 allocHandle.readComplete();
185 pipeline.fireChannelReadComplete();
186 pipeline.fireExceptionCaught(t);
187 } finally {
188 epollInFinally(config);
189 }
190 }
191 }
192 }