1 /*
2 * Copyright 2012 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package io.netty.channel.oio;
17
18 import io.netty.channel.Channel;
19 import io.netty.channel.ChannelConfig;
20 import io.netty.channel.ChannelPipeline;
21
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.List;
25
26 /**
27 * Abstract base class for OIO which reads and writes objects from/to a Socket
28 */
29 public abstract class AbstractOioMessageChannel extends AbstractOioChannel {
30
31 private final List<Object> readBuf = new ArrayList<Object>();
32
33 protected AbstractOioMessageChannel(Channel parent) {
34 super(parent);
35 }
36
37 @Override
38 protected void doRead() {
39 final ChannelConfig config = config();
40 final ChannelPipeline pipeline = pipeline();
41 boolean closed = false;
42 final int maxMessagesPerRead = config.getMaxMessagesPerRead();
43
44 Throwable exception = null;
45 int localRead = 0;
46 int totalRead = 0;
47 try {
48 for (;;) {
49 // Perform a read.
50 localRead = doReadMessages(readBuf);
51 if (localRead == 0) {
52 break;
53 }
54 if (localRead < 0) {
55 closed = true;
56 break;
57 }
58
59 // Notify with the received messages and clear the buffer.
60 int size = readBuf.size();
61 for (int i = 0; i < size; i ++) {
62 pipeline.fireChannelRead(readBuf.get(i));
63 }
64 readBuf.clear();
65
66 // Do not read beyond maxMessagesPerRead.
67 // Do not continue reading if autoRead has been turned off.
68 totalRead += localRead;
69 if (totalRead >= maxMessagesPerRead || !config.isAutoRead()) {
70 break;
71 }
72 }
73 } catch (Throwable t) {
74 exception = t;
75 }
76
77 pipeline.fireChannelReadComplete();
78
79 if (exception != null) {
80 if (exception instanceof IOException) {
81 closed = true;
82 }
83
84 pipeline().fireExceptionCaught(exception);
85 }
86
87 if (closed) {
88 if (isOpen()) {
89 unsafe().close(unsafe().voidPromise());
90 }
91 } else if (localRead == 0 && isActive()) {
92 // If the read amount was 0 and the channel is still active we need to trigger a new read()
93 // as otherwise we will never try to read again and the user will never know.
94 // Just call read() is ok here as it will be submitted to the EventLoop as a task and so we are
95 // able to process the rest of the tasks in the queue first.
96 //
97 // See https://github.com/netty/netty/issues/2404
98 read();
99 }
100 }
101
102 /**
103 * Read messages into the given array and return the amount which was read.
104 */
105 protected abstract int doReadMessages(List<Object> msgs) throws Exception;
106 }