1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.epoll;
17
18 import io.netty.channel.Channel;
19 import io.netty.channel.ChannelConfig;
20 import io.netty.channel.ChannelOutboundBuffer;
21 import io.netty.channel.ChannelPipeline;
22 import io.netty.channel.unix.DomainSocketAddress;
23 import io.netty.channel.unix.DomainSocketChannel;
24 import io.netty.channel.unix.FileDescriptor;
25 import io.netty.channel.unix.PeerCredentials;
26 import io.netty.channel.unix.Socket;
27
28 import java.net.SocketAddress;
29
30 import static io.netty.channel.unix.Socket.newSocketDomain;
31 import java.io.IOException;
32
33 public final class EpollDomainSocketChannel extends AbstractEpollStreamChannel implements DomainSocketChannel {
34 private final EpollDomainSocketChannelConfig config = new EpollDomainSocketChannelConfig(this);
35
36 private volatile DomainSocketAddress local;
37 private volatile DomainSocketAddress remote;
38
39 public EpollDomainSocketChannel() {
40 super(newSocketDomain(), false);
41 }
42
43
44
45
46 @Deprecated
47 public EpollDomainSocketChannel(Channel parent, FileDescriptor fd) {
48 super(parent, new Socket(fd.intValue()));
49 }
50
51
52
53
54
55
56 @Deprecated
57 public EpollDomainSocketChannel(FileDescriptor fd) {
58 super(fd);
59 }
60
61 public EpollDomainSocketChannel(Channel parent, Socket fd) {
62 super(parent, fd);
63 }
64
65
66
67
68 public EpollDomainSocketChannel(Socket fd, boolean active) {
69 super(fd, active);
70 }
71
72 @Override
73 protected AbstractEpollUnsafe newUnsafe() {
74 return new EpollDomainUnsafe();
75 }
76
77 @Override
78 protected DomainSocketAddress localAddress0() {
79 return local;
80 }
81
82 @Override
83 protected DomainSocketAddress remoteAddress0() {
84 return remote;
85 }
86
87 @Override
88 protected void doBind(SocketAddress localAddress) throws Exception {
89 fd().bind(localAddress);
90 local = (DomainSocketAddress) localAddress;
91 }
92
93 @Override
94 public EpollDomainSocketChannelConfig config() {
95 return config;
96 }
97
98 @Override
99 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
100 if (super.doConnect(remoteAddress, localAddress)) {
101 local = (DomainSocketAddress) localAddress;
102 remote = (DomainSocketAddress) remoteAddress;
103 return true;
104 }
105 return false;
106 }
107
108 @Override
109 public DomainSocketAddress remoteAddress() {
110 return (DomainSocketAddress) super.remoteAddress();
111 }
112
113 @Override
114 public DomainSocketAddress localAddress() {
115 return (DomainSocketAddress) super.localAddress();
116 }
117
118 @Override
119 protected boolean doWriteSingle(ChannelOutboundBuffer in, int writeSpinCount) throws Exception {
120 Object msg = in.current();
121 if (msg instanceof FileDescriptor && Native.sendFd(fd().intValue(), ((FileDescriptor) msg).intValue()) > 0) {
122
123 in.remove();
124 return true;
125 }
126 return super.doWriteSingle(in, writeSpinCount);
127 }
128
129 @Override
130 protected Object filterOutboundMessage(Object msg) {
131 if (msg instanceof FileDescriptor) {
132 return msg;
133 }
134 return super.filterOutboundMessage(msg);
135 }
136
137
138
139
140
141 public PeerCredentials peerCredentials() throws IOException {
142 return fd().getPeerCredentials();
143 }
144
145 private final class EpollDomainUnsafe extends EpollStreamUnsafe {
146 @Override
147 void epollInReady() {
148 switch (config().getReadMode()) {
149 case BYTES:
150 super.epollInReady();
151 break;
152 case FILE_DESCRIPTORS:
153 epollInReadFd();
154 break;
155 default:
156 throw new Error();
157 }
158 }
159
160 private void epollInReadFd() {
161 if (fd().isInputShutdown()) {
162 return;
163 }
164 boolean edgeTriggered = isFlagSet(Native.EPOLLET);
165 final ChannelConfig config = config();
166 if (!readPending && !edgeTriggered && !config.isAutoRead()) {
167
168 clearEpollIn0();
169 return;
170 }
171
172 final ChannelPipeline pipeline = pipeline();
173
174 try {
175
176 final int maxMessagesPerRead = edgeTriggered
177 ? Integer.MAX_VALUE : config.getMaxMessagesPerRead();
178 int messages = 0;
179 do {
180 int socketFd = Native.recvFd(fd().intValue());
181 if (socketFd == 0) {
182 break;
183 }
184 if (socketFd == -1) {
185 close(voidPromise());
186 return;
187 }
188 readPending = false;
189
190 try {
191 pipeline.fireChannelRead(new FileDescriptor(socketFd));
192 } catch (Throwable t) {
193
194 pipeline.fireChannelReadComplete();
195 pipeline.fireExceptionCaught(t);
196 } finally {
197 if (!edgeTriggered && !config.isAutoRead()) {
198
199
200
201 break;
202 }
203 }
204 } while (++ messages < maxMessagesPerRead || isRdHup());
205
206 pipeline.fireChannelReadComplete();
207
208 } catch (Throwable t) {
209 pipeline.fireChannelReadComplete();
210 pipeline.fireExceptionCaught(t);
211
212
213 eventLoop().execute(new Runnable() {
214 @Override
215 public void run() {
216 epollInReady();
217 }
218 });
219 } finally {
220
221
222
223
224
225
226 if (!readPending && !config.isAutoRead()) {
227 clearEpollIn0();
228 }
229 }
230 }
231 }
232 }