1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.oio;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.channel.Channel;
21 import io.netty.channel.ChannelConfig;
22 import io.netty.channel.ChannelFuture;
23 import io.netty.channel.ChannelMetadata;
24 import io.netty.channel.ChannelOption;
25 import io.netty.channel.ChannelOutboundBuffer;
26 import io.netty.channel.ChannelPipeline;
27 import io.netty.channel.FileRegion;
28 import io.netty.channel.RecvByteBufAllocator;
29 import io.netty.channel.socket.ChannelInputShutdownEvent;
30 import io.netty.channel.socket.ChannelInputShutdownReadComplete;
31 import io.netty.util.LeakPresenceDetector;
32 import io.netty.util.internal.StringUtil;
33
34 import java.io.IOException;
35
36
37
38
39
40
41 public abstract class AbstractOioByteChannel extends AbstractOioChannel {
42
43 private static final ChannelMetadata METADATA = new ChannelMetadata(false);
44 private static final String EXPECTED_TYPES =
45 " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
46 StringUtil.simpleClassName(FileRegion.class) + ')';
47
48
49
50
51 protected AbstractOioByteChannel(Channel parent) {
52 super(parent);
53 }
54
55 @Override
56 public ChannelMetadata metadata() {
57 return METADATA;
58 }
59
60
61
62
63
64 protected abstract boolean isInputShutdown();
65
66
67
68
69
70 protected abstract ChannelFuture shutdownInput();
71
72 private void closeOnRead(ChannelPipeline pipeline) {
73 if (isOpen()) {
74 if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) {
75 shutdownInput();
76 pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
77 } else {
78 unsafe().close(unsafe().voidPromise());
79 }
80 pipeline.fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
81 }
82 }
83
84 private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
85 RecvByteBufAllocator.Handle allocHandle) {
86 if (byteBuf != null) {
87 if (byteBuf.isReadable()) {
88 readPending = false;
89 pipeline.fireChannelRead(byteBuf);
90 } else {
91 byteBuf.release();
92 }
93 }
94 allocHandle.readComplete();
95 pipeline.fireChannelReadComplete();
96 pipeline.fireExceptionCaught(cause);
97
98
99
100 if (close ||
101 cause instanceof OutOfMemoryError ||
102 cause instanceof LeakPresenceDetector.AllocationProhibitedException ||
103 cause instanceof IOException) {
104 closeOnRead(pipeline);
105 }
106 }
107
108 @Override
109 protected void doRead() {
110 final ChannelConfig config = config();
111 if (isInputShutdown() || !readPending) {
112
113
114 return;
115 }
116
117
118 readPending = false;
119
120 final ChannelPipeline pipeline = pipeline();
121 final ByteBufAllocator allocator = config.getAllocator();
122 final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
123 allocHandle.reset(config);
124
125 ByteBuf byteBuf = null;
126 boolean close = false;
127 boolean readData = false;
128 try {
129 byteBuf = allocHandle.allocate(allocator);
130 do {
131 allocHandle.lastBytesRead(doReadBytes(byteBuf));
132 if (allocHandle.lastBytesRead() <= 0) {
133 if (!byteBuf.isReadable()) {
134 byteBuf.release();
135 byteBuf = null;
136 close = allocHandle.lastBytesRead() < 0;
137 if (close) {
138
139 readPending = false;
140 }
141 }
142 break;
143 } else {
144 readData = true;
145 }
146
147 final int available = available();
148 if (available <= 0) {
149 break;
150 }
151
152
153 if (!byteBuf.isWritable()) {
154 final int capacity = byteBuf.capacity();
155 final int maxCapacity = byteBuf.maxCapacity();
156 if (capacity == maxCapacity) {
157 allocHandle.incMessagesRead(1);
158 readPending = false;
159 pipeline.fireChannelRead(byteBuf);
160 byteBuf = allocHandle.allocate(allocator);
161 } else {
162 final int writerIndex = byteBuf.writerIndex();
163 if (writerIndex + available > maxCapacity) {
164 byteBuf.capacity(maxCapacity);
165 } else {
166 byteBuf.ensureWritable(available);
167 }
168 }
169 }
170 } while (allocHandle.continueReading());
171
172 if (byteBuf != null) {
173
174
175 if (byteBuf.isReadable()) {
176 readPending = false;
177 pipeline.fireChannelRead(byteBuf);
178 } else {
179 byteBuf.release();
180 }
181 byteBuf = null;
182 }
183
184 if (readData) {
185 allocHandle.readComplete();
186 pipeline.fireChannelReadComplete();
187 }
188
189 if (close) {
190 closeOnRead(pipeline);
191 }
192 } catch (Throwable t) {
193 handleReadException(pipeline, byteBuf, t, close, allocHandle);
194 } finally {
195 if (readPending || config.isAutoRead() || !readData && isActive()) {
196
197
198 read();
199 }
200 }
201 }
202
203 @Override
204 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
205 for (;;) {
206 Object msg = in.current();
207 if (msg == null) {
208
209 break;
210 }
211 if (msg instanceof ByteBuf) {
212 ByteBuf buf = (ByteBuf) msg;
213 int readableBytes = buf.readableBytes();
214 while (readableBytes > 0) {
215 doWriteBytes(buf);
216 int newReadableBytes = buf.readableBytes();
217 in.progress(readableBytes - newReadableBytes);
218 readableBytes = newReadableBytes;
219 }
220 in.remove();
221 } else if (msg instanceof FileRegion) {
222 FileRegion region = (FileRegion) msg;
223 long transferred = region.transferred();
224 doWriteFileRegion(region);
225 in.progress(region.transferred() - transferred);
226 in.remove();
227 } else {
228 in.remove(new UnsupportedOperationException(
229 "unsupported message type: " + StringUtil.simpleClassName(msg)));
230 }
231 }
232 }
233
234 @Override
235 protected final Object filterOutboundMessage(Object msg) throws Exception {
236 if (msg instanceof ByteBuf || msg instanceof FileRegion) {
237 return msg;
238 }
239
240 throw new UnsupportedOperationException(
241 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
242 }
243
244
245
246
247 protected abstract int available();
248
249
250
251
252
253
254
255
256
257 protected abstract int doReadBytes(ByteBuf buf) throws Exception;
258
259
260
261
262
263
264
265 protected abstract void doWriteBytes(ByteBuf buf) throws Exception;
266
267
268
269
270
271
272
273 protected abstract void doWriteFileRegion(FileRegion region) throws Exception;
274 }