1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.oio;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.channel.Channel;
20 import io.netty.channel.ChannelConfig;
21 import io.netty.channel.ChannelMetadata;
22 import io.netty.channel.ChannelOption;
23 import io.netty.channel.ChannelOutboundBuffer;
24 import io.netty.channel.ChannelPipeline;
25 import io.netty.channel.FileRegion;
26 import io.netty.channel.RecvByteBufAllocator;
27 import io.netty.channel.socket.ChannelInputShutdownEvent;
28 import io.netty.util.internal.StringUtil;
29
30 import java.io.IOException;
31
32
33
34
35 public abstract class AbstractOioByteChannel extends AbstractOioChannel {
36
37 private static final ChannelMetadata METADATA = new ChannelMetadata(false);
38 private static final String EXPECTED_TYPES =
39 " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
40 StringUtil.simpleClassName(FileRegion.class) + ')';
41
42 private RecvByteBufAllocator.Handle allocHandle;
43 private volatile boolean inputShutdown;
44
45
46
47
48 protected AbstractOioByteChannel(Channel parent) {
49 super(parent);
50 }
51
52 protected boolean isInputShutdown() {
53 return inputShutdown;
54 }
55
56 @Override
57 public ChannelMetadata metadata() {
58 return METADATA;
59 }
60
61
62
63
64
65 protected boolean checkInputShutdown() {
66 if (inputShutdown) {
67 try {
68 Thread.sleep(SO_TIMEOUT);
69 } catch (InterruptedException e) {
70
71 }
72 return true;
73 }
74 return false;
75 }
76
77 @Override
78 protected void doRead() {
79 if (checkInputShutdown()) {
80 return;
81 }
82 final ChannelConfig config = config();
83 final ChannelPipeline pipeline = pipeline();
84
85 RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
86 if (allocHandle == null) {
87 this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
88 }
89
90 ByteBuf byteBuf = allocHandle.allocate(alloc());
91
92 boolean closed = false;
93 boolean read = false;
94 Throwable exception = null;
95 int localReadAmount = 0;
96 try {
97 int totalReadAmount = 0;
98
99 for (;;) {
100 localReadAmount = doReadBytes(byteBuf);
101 if (localReadAmount > 0) {
102 read = true;
103 } else if (localReadAmount < 0) {
104 closed = true;
105 }
106
107 final int available = available();
108 if (available <= 0) {
109 break;
110 }
111
112 if (!byteBuf.isWritable()) {
113 final int capacity = byteBuf.capacity();
114 final int maxCapacity = byteBuf.maxCapacity();
115 if (capacity == maxCapacity) {
116 if (read) {
117 read = false;
118 pipeline.fireChannelRead(byteBuf);
119 byteBuf = alloc().buffer();
120 }
121 } else {
122 final int writerIndex = byteBuf.writerIndex();
123 if (writerIndex + available > maxCapacity) {
124 byteBuf.capacity(maxCapacity);
125 } else {
126 byteBuf.ensureWritable(available);
127 }
128 }
129 }
130
131 if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {
132
133 totalReadAmount = Integer.MAX_VALUE;
134 break;
135 }
136
137 totalReadAmount += localReadAmount;
138
139 if (!config.isAutoRead()) {
140
141
142 break;
143 }
144 }
145 allocHandle.record(totalReadAmount);
146
147 } catch (Throwable t) {
148 exception = t;
149 } finally {
150 if (read) {
151 pipeline.fireChannelRead(byteBuf);
152 } else {
153
154 byteBuf.release();
155 }
156
157 pipeline.fireChannelReadComplete();
158 if (exception != null) {
159 if (exception instanceof IOException) {
160 closed = true;
161 pipeline().fireExceptionCaught(exception);
162 } else {
163 pipeline.fireExceptionCaught(exception);
164 unsafe().close(voidPromise());
165 }
166 }
167
168 if (closed) {
169
170 setReadPending(false);
171
172 inputShutdown = true;
173 if (isOpen()) {
174 if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) {
175 pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
176 } else {
177 unsafe().close(unsafe().voidPromise());
178 }
179 }
180 }
181 if (localReadAmount == 0 && isActive()) {
182
183
184
185
186
187
188 read();
189 }
190 }
191 }
192
193 @Override
194 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
195 for (;;) {
196 Object msg = in.current();
197 if (msg == null) {
198
199 break;
200 }
201 if (msg instanceof ByteBuf) {
202 ByteBuf buf = (ByteBuf) msg;
203 int readableBytes = buf.readableBytes();
204 while (readableBytes > 0) {
205 doWriteBytes(buf);
206 int newReadableBytes = buf.readableBytes();
207 in.progress(readableBytes - newReadableBytes);
208 readableBytes = newReadableBytes;
209 }
210 in.remove();
211 } else if (msg instanceof FileRegion) {
212 FileRegion region = (FileRegion) msg;
213 long transfered = region.transfered();
214 doWriteFileRegion(region);
215 in.progress(region.transfered() - transfered);
216 in.remove();
217 } else {
218 in.remove(new UnsupportedOperationException(
219 "unsupported message type: " + StringUtil.simpleClassName(msg)));
220 }
221 }
222 }
223
224 @Override
225 protected final Object filterOutboundMessage(Object msg) throws Exception {
226 if (msg instanceof ByteBuf || msg instanceof FileRegion) {
227 return msg;
228 }
229
230 throw new UnsupportedOperationException(
231 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
232 }
233
234
235
236
237 protected abstract int available();
238
239
240
241
242
243
244
245
246
247 protected abstract int doReadBytes(ByteBuf buf) throws Exception;
248
249
250
251
252
253
254
255 protected abstract void doWriteBytes(ByteBuf buf) throws Exception;
256
257
258
259
260
261
262
263 protected abstract void doWriteFileRegion(FileRegion region) throws Exception;
264 }