1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.oio;
17
18 import io.netty.channel.AbstractChannel;
19 import io.netty.channel.Channel;
20 import io.netty.channel.ChannelPromise;
21 import io.netty.channel.EventLoop;
22 import io.netty.channel.ThreadPerChannelEventLoop;
23
24 import java.net.SocketAddress;
25
26
27
28
29 public abstract class AbstractOioChannel extends AbstractChannel {
30
31 protected static final int SO_TIMEOUT = 1000;
32
33 private volatile boolean readPending;
34
35 private final Runnable readTask = new Runnable() {
36 @Override
37 public void run() {
38 if (!isReadPending() && !config().isAutoRead()) {
39
40 return;
41 }
42
43 setReadPending(false);
44 doRead();
45 }
46 };
47
48
49
50
51 protected AbstractOioChannel(Channel parent) {
52 super(parent);
53 }
54
55 @Override
56 protected AbstractUnsafe newUnsafe() {
57 return new DefaultOioUnsafe();
58 }
59
60 private final class DefaultOioUnsafe extends AbstractUnsafe {
61 @Override
62 public void connect(
63 final SocketAddress remoteAddress,
64 final SocketAddress localAddress, final ChannelPromise promise) {
65 if (!promise.setUncancellable() || !ensureOpen(promise)) {
66 return;
67 }
68
69 try {
70 boolean wasActive = isActive();
71 doConnect(remoteAddress, localAddress);
72
73
74
75 boolean active = isActive();
76
77 safeSetSuccess(promise);
78 if (!wasActive && active) {
79 pipeline().fireChannelActive();
80 }
81 } catch (Throwable t) {
82 safeSetFailure(promise, annotateConnectException(t, remoteAddress));
83 closeIfClosed();
84 }
85 }
86 }
87
88 @Override
89 protected boolean isCompatible(EventLoop loop) {
90 return loop instanceof ThreadPerChannelEventLoop;
91 }
92
93
94
95
96 protected abstract void doConnect(
97 SocketAddress remoteAddress, SocketAddress localAddress) throws Exception;
98
99 @Override
100 protected void doBeginRead() throws Exception {
101 if (isReadPending()) {
102 return;
103 }
104
105 setReadPending(true);
106 eventLoop().execute(readTask);
107 }
108
109 protected abstract void doRead();
110
111 protected boolean isReadPending() {
112 return readPending;
113 }
114
115 protected void setReadPending(boolean readPending) {
116 this.readPending = readPending;
117 }
118 }