1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.oio;
17
18 import io.netty.channel.AbstractChannel;
19 import io.netty.channel.Channel;
20 import io.netty.channel.ChannelPromise;
21 import io.netty.channel.EventLoop;
22 import io.netty.channel.ThreadPerChannelEventLoop;
23
24 import java.net.SocketAddress;
25
26
27
28
29 public abstract class AbstractOioChannel extends AbstractChannel {
30
31 protected static final int SO_TIMEOUT = 1000;
32
33 private volatile boolean readPending;
34
35 private final Runnable readTask = new Runnable() {
36 @Override
37 public void run() {
38 if (!isReadPending() && !config().isAutoRead()) {
39
40 return;
41 }
42
43 setReadPending(false);
44 doRead();
45 }
46 };
47
48
49
50
51 protected AbstractOioChannel(Channel parent) {
52 super(parent);
53 }
54
55 @Override
56 protected AbstractUnsafe newUnsafe() {
57 return new DefaultOioUnsafe();
58 }
59
60 private final class DefaultOioUnsafe extends AbstractUnsafe {
61 @Override
62 public void connect(
63 final SocketAddress remoteAddress,
64 final SocketAddress localAddress, final ChannelPromise promise) {
65 if (!promise.setUncancellable() || !ensureOpen(promise)) {
66 return;
67 }
68
69 try {
70 boolean wasActive = isActive();
71 doConnect(remoteAddress, localAddress);
72 safeSetSuccess(promise);
73 if (!wasActive && isActive()) {
74 pipeline().fireChannelActive();
75 }
76 } catch (Throwable t) {
77 safeSetFailure(promise, annotateConnectException(t, remoteAddress));
78 closeIfClosed();
79 }
80 }
81 }
82
83 @Override
84 protected boolean isCompatible(EventLoop loop) {
85 return loop instanceof ThreadPerChannelEventLoop;
86 }
87
88
89
90
91 protected abstract void doConnect(
92 SocketAddress remoteAddress, SocketAddress localAddress) throws Exception;
93
94 @Override
95 protected void doBeginRead() throws Exception {
96 if (isReadPending()) {
97 return;
98 }
99
100 setReadPending(true);
101 eventLoop().execute(readTask);
102 }
103
104 protected abstract void doRead();
105
106 protected boolean isReadPending() {
107 return readPending;
108 }
109
110 protected void setReadPending(boolean readPending) {
111 this.readPending = readPending;
112 }
113 }