1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.nio;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.buffer.ByteBufUtil;
21 import io.netty.buffer.Unpooled;
22 import io.netty.channel.AbstractChannel;
23 import io.netty.channel.Channel;
24 import io.netty.channel.ChannelException;
25 import io.netty.channel.ChannelFuture;
26 import io.netty.channel.ChannelFutureListener;
27 import io.netty.channel.ChannelPromise;
28 import io.netty.channel.ConnectTimeoutException;
29 import io.netty.channel.EventLoop;
30 import io.netty.util.ReferenceCountUtil;
31 import io.netty.util.ReferenceCounted;
32 import io.netty.util.concurrent.Future;
33 import io.netty.util.internal.logging.InternalLogger;
34 import io.netty.util.internal.logging.InternalLoggerFactory;
35
36 import java.io.IOException;
37 import java.net.SocketAddress;
38 import java.nio.channels.CancelledKeyException;
39 import java.nio.channels.ClosedChannelException;
40 import java.nio.channels.ConnectionPendingException;
41 import java.nio.channels.SelectableChannel;
42 import java.nio.channels.SelectionKey;
43 import java.util.concurrent.TimeUnit;
44
45
46
47
48 public abstract class AbstractNioChannel extends AbstractChannel {
49
50 private static final InternalLogger logger =
51 InternalLoggerFactory.getInstance(AbstractNioChannel.class);
52
53 private final SelectableChannel ch;
54 protected final int readInterestOp;
55 volatile SelectionKey selectionKey;
56 boolean readPending;
57 private final Runnable clearReadPendingRunnable = new Runnable() {
58 @Override
59 public void run() {
60 clearReadPending0();
61 }
62 };
63
64
65
66
67
68 private ChannelPromise connectPromise;
69 private Future<?> connectTimeoutFuture;
70 private SocketAddress requestedRemoteAddress;
71
72
73
74
75
76
77
78
/**
 * Creates a new instance and switches the given channel to non-blocking mode.
 *
 * @param parent         handed to the super constructor
 * @param ch             the underlying {@link SelectableChannel}
 * @param readInterestOp the interest-op bit(s) used to turn reads on and off on the selection key
 * @throws ChannelException if non-blocking mode cannot be configured; closing {@code ch} is
 *                          attempted before the exception is thrown
 */
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    try {
        ch.configureBlocking(false);
    } catch (IOException configureFailure) {
        // Do not leave the half-initialized channel open; a failing close is only logged
        // so that the original cause is the one reported to the caller.
        try {
            ch.close();
        } catch (IOException closeFailure) {
            logger.warn(
                    "Failed to close a partially initialized socket.", closeFailure);
        }

        throw new ChannelException("Failed to enter non-blocking mode.", configureFailure);
    }
}
96
97 @Override
98 public boolean isOpen() {
99 return ch.isOpen();
100 }
101
102 @Override
103 public NioUnsafe unsafe() {
104 return (NioUnsafe) super.unsafe();
105 }
106
107 protected SelectableChannel javaChannel() {
108 return ch;
109 }
110
111 @Override
112 public NioEventLoop eventLoop() {
113 return (NioEventLoop) super.eventLoop();
114 }
115
116
117
118
119 protected SelectionKey selectionKey() {
120 assert selectionKey != null;
121 return selectionKey;
122 }
123
124
125
126
127
128 @Deprecated
129 protected boolean isReadPending() {
130 return readPending;
131 }
132
133
134
135
136
137 @Deprecated
138 protected void setReadPending(final boolean readPending) {
139 if (isRegistered()) {
140 EventLoop eventLoop = eventLoop();
141 if (eventLoop.inEventLoop()) {
142 setReadPending0(readPending);
143 } else {
144 eventLoop.execute(new Runnable() {
145 @Override
146 public void run() {
147 setReadPending0(readPending);
148 }
149 });
150 }
151 } else {
152
153
154
155 this.readPending = readPending;
156 }
157 }
158
159
160
161
162 protected final void clearReadPending() {
163 if (isRegistered()) {
164 EventLoop eventLoop = eventLoop();
165 if (eventLoop.inEventLoop()) {
166 clearReadPending0();
167 } else {
168 eventLoop.execute(clearReadPendingRunnable);
169 }
170 } else {
171
172
173
174 readPending = false;
175 }
176 }
177
178 private void setReadPending0(boolean readPending) {
179 this.readPending = readPending;
180 if (!readPending) {
181 ((AbstractNioUnsafe) unsafe()).removeReadOp();
182 }
183 }
184
185 private void clearReadPending0() {
186 readPending = false;
187 ((AbstractNioUnsafe) unsafe()).removeReadOp();
188 }
189
190
191
192
193 public interface NioUnsafe extends Unsafe {
194
195
196
197 SelectableChannel ch();
198
199
200
201
202 void finishConnect();
203
204
205
206
207 void read();
208
209 void forceFlush();
210 }
211
212 protected abstract class AbstractNioUnsafe extends AbstractUnsafe implements NioUnsafe {
213
214 protected final void removeReadOp() {
215 SelectionKey key = selectionKey();
216
217
218
219 if (!key.isValid()) {
220 return;
221 }
222 int interestOps = key.interestOps();
223 if ((interestOps & readInterestOp) != 0) {
224
225 key.interestOps(interestOps & ~readInterestOp);
226 }
227 }
228
229 @Override
230 public final SelectableChannel ch() {
231 return javaChannel();
232 }
233
234 @Override
235 public final void connect(
236 final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
237 if (!promise.setUncancellable() || !ensureOpen(promise)) {
238 return;
239 }
240
241 try {
242 if (connectPromise != null) {
243
244 throw new ConnectionPendingException();
245 }
246
247 boolean wasActive = isActive();
248 if (doConnect(remoteAddress, localAddress)) {
249 fulfillConnectPromise(promise, wasActive);
250 } else {
251 connectPromise = promise;
252 requestedRemoteAddress = remoteAddress;
253
254
255 int connectTimeoutMillis = config().getConnectTimeoutMillis();
256 if (connectTimeoutMillis > 0) {
257 connectTimeoutFuture = eventLoop().schedule(new Runnable() {
258 @Override
259 public void run() {
260 ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
261 if (connectPromise != null && !connectPromise.isDone()
262 && connectPromise.tryFailure(new ConnectTimeoutException(
263 "connection timed out: " + remoteAddress))) {
264 close(voidPromise());
265 }
266 }
267 }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
268 }
269
270 promise.addListener(new ChannelFutureListener() {
271 @Override
272 public void operationComplete(ChannelFuture future) throws Exception {
273 if (future.isCancelled()) {
274 if (connectTimeoutFuture != null) {
275 connectTimeoutFuture.cancel(false);
276 }
277 connectPromise = null;
278 close(voidPromise());
279 }
280 }
281 });
282 }
283 } catch (Throwable t) {
284 promise.tryFailure(annotateConnectException(t, remoteAddress));
285 closeIfClosed();
286 }
287 }
288
289 private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
290 if (promise == null) {
291
292 return;
293 }
294
295
296
297 boolean active = isActive();
298
299
300 boolean promiseSet = promise.trySuccess();
301
302
303
304 if (!wasActive && active) {
305 pipeline().fireChannelActive();
306 }
307
308
309 if (!promiseSet) {
310 close(voidPromise());
311 }
312 }
313
314 private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
315 if (promise == null) {
316
317 return;
318 }
319
320
321 promise.tryFailure(cause);
322 closeIfClosed();
323 }
324
325 @Override
326 public final void finishConnect() {
327
328
329
330 assert eventLoop().inEventLoop();
331
332 try {
333 boolean wasActive = isActive();
334 doFinishConnect();
335 fulfillConnectPromise(connectPromise, wasActive);
336 } catch (Throwable t) {
337 fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
338 } finally {
339
340
341 if (connectTimeoutFuture != null) {
342 connectTimeoutFuture.cancel(false);
343 }
344 connectPromise = null;
345 }
346 }
347
348 @Override
349 protected final void flush0() {
350
351
352
353 if (!isFlushPending()) {
354 super.flush0();
355 }
356 }
357
358 @Override
359 public final void forceFlush() {
360
361 super.flush0();
362 }
363
364 private boolean isFlushPending() {
365 SelectionKey selectionKey = selectionKey();
366 return selectionKey.isValid() && (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0;
367 }
368 }
369
370 @Override
371 protected boolean isCompatible(EventLoop loop) {
372 return loop instanceof NioEventLoop;
373 }
374
375 @Override
376 protected void doRegister() throws Exception {
377 boolean selected = false;
378 for (;;) {
379 try {
380 selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
381 return;
382 } catch (CancelledKeyException e) {
383 if (!selected) {
384
385
386 eventLoop().selectNow();
387 selected = true;
388 } else {
389
390
391 throw e;
392 }
393 }
394 }
395 }
396
397 @Override
398 protected void doDeregister() throws Exception {
399 eventLoop().cancel(selectionKey());
400 }
401
402 @Override
403 protected void doBeginRead() throws Exception {
404
405 final SelectionKey selectionKey = this.selectionKey;
406 if (!selectionKey.isValid()) {
407 return;
408 }
409
410 readPending = true;
411
412 final int interestOps = selectionKey.interestOps();
413 if ((interestOps & readInterestOp) == 0) {
414 selectionKey.interestOps(interestOps | readInterestOp);
415 }
416 }
417
418
419
420
421 protected abstract boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception;
422
423
424
425
426 protected abstract void doFinishConnect() throws Exception;
427
428
429
430
431
432
433 protected final ByteBuf newDirectBuffer(ByteBuf buf) {
434 final int readableBytes = buf.readableBytes();
435 if (readableBytes == 0) {
436 ReferenceCountUtil.safeRelease(buf);
437 return Unpooled.EMPTY_BUFFER;
438 }
439
440 final ByteBufAllocator alloc = alloc();
441 if (alloc.isDirectBufferPooled()) {
442 ByteBuf directBuf = alloc.directBuffer(readableBytes);
443 directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
444 ReferenceCountUtil.safeRelease(buf);
445 return directBuf;
446 }
447
448 final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
449 if (directBuf != null) {
450 directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
451 ReferenceCountUtil.safeRelease(buf);
452 return directBuf;
453 }
454
455
456 return buf;
457 }
458
459
460
461
462
463
464
465 protected final ByteBuf newDirectBuffer(ReferenceCounted holder, ByteBuf buf) {
466 final int readableBytes = buf.readableBytes();
467 if (readableBytes == 0) {
468 ReferenceCountUtil.safeRelease(holder);
469 return Unpooled.EMPTY_BUFFER;
470 }
471
472 final ByteBufAllocator alloc = alloc();
473 if (alloc.isDirectBufferPooled()) {
474 ByteBuf directBuf = alloc.directBuffer(readableBytes);
475 directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
476 ReferenceCountUtil.safeRelease(holder);
477 return directBuf;
478 }
479
480 final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
481 if (directBuf != null) {
482 directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
483 ReferenceCountUtil.safeRelease(holder);
484 return directBuf;
485 }
486
487
488 if (holder != buf) {
489
490 buf.retain();
491 ReferenceCountUtil.safeRelease(holder);
492 }
493
494 return buf;
495 }
496
497 @Override
498 protected void doClose() throws Exception {
499 ChannelPromise promise = connectPromise;
500 if (promise != null) {
501
502 promise.tryFailure(new ClosedChannelException());
503 connectPromise = null;
504 }
505
506 Future<?> future = connectTimeoutFuture;
507 if (future != null) {
508 future.cancel(false);
509 connectTimeoutFuture = null;
510 }
511 }
512 }