1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.testsuite.transport.socket;
17
18 import io.netty.bootstrap.Bootstrap;
19 import io.netty.bootstrap.ServerBootstrap;
20 import io.netty.buffer.ByteBuf;
21 import io.netty.buffer.ByteBufAllocator;
22 import io.netty.channel.Channel;
23 import io.netty.channel.ChannelConfig;
24 import io.netty.channel.ChannelHandlerContext;
25 import io.netty.channel.ChannelInboundHandlerAdapter;
26 import io.netty.channel.ChannelInitializer;
27 import io.netty.channel.ChannelOption;
28 import io.netty.channel.RecvByteBufAllocator;
29 import io.netty.util.ReferenceCountUtil;
30 import io.netty.util.UncheckedBooleanSupplier;
31 import org.junit.jupiter.api.Test;
32 import org.junit.jupiter.api.TestInfo;
33
34 import java.util.concurrent.CountDownLatch;
35 import java.util.concurrent.TimeUnit;
36 import java.util.concurrent.atomic.AtomicInteger;
37
38 import static io.netty.testsuite.transport.TestsuitePermutation.randomBufferType;
39 import static org.junit.jupiter.api.Assertions.assertEquals;
40 import static org.junit.jupiter.api.Assertions.assertTrue;
41
42 public class SocketAutoReadTest extends AbstractSocketTest {
43 @Test
44 public void testAutoReadOffDuringReadOnlyReadsOneTime(TestInfo testInfo) throws Throwable {
45 run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
46 @Override
47 public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
48 testAutoReadOffDuringReadOnlyReadsOneTime(serverBootstrap, bootstrap);
49 }
50 });
51 }
52
53 public void testAutoReadOffDuringReadOnlyReadsOneTime(ServerBootstrap sb, Bootstrap cb) throws Throwable {
54 testAutoReadOffDuringReadOnlyReadsOneTime(true, sb, cb);
55 testAutoReadOffDuringReadOnlyReadsOneTime(false, sb, cb);
56 }
57
58 private static void testAutoReadOffDuringReadOnlyReadsOneTime(boolean readOutsideEventLoopThread,
59 ServerBootstrap sb, Bootstrap cb) throws Throwable {
60 Channel serverChannel = null;
61 Channel clientChannel = null;
62 try {
63 AutoReadInitializer serverInitializer = new AutoReadInitializer(!readOutsideEventLoopThread);
64 AutoReadInitializer clientInitializer = new AutoReadInitializer(!readOutsideEventLoopThread);
65 sb.option(ChannelOption.SO_BACKLOG, 1024)
66 .option(ChannelOption.AUTO_READ, true)
67 .childOption(ChannelOption.AUTO_READ, true)
68
69
70 .childOption(ChannelOption.RCVBUF_ALLOCATOR, new TestRecvByteBufAllocator())
71 .childHandler(serverInitializer);
72
73 serverChannel = sb.bind().syncUninterruptibly().channel();
74
75 cb.option(ChannelOption.AUTO_READ, true)
76
77
78 .option(ChannelOption.RCVBUF_ALLOCATOR, new TestRecvByteBufAllocator())
79 .handler(clientInitializer);
80
81 clientChannel = cb.connect(serverChannel.localAddress()).syncUninterruptibly().channel();
82
83
84 clientChannel.writeAndFlush(randomBufferType(clientChannel.alloc(), new byte[3], 0, 3));
85 serverInitializer.autoReadHandler.assertSingleRead();
86
87
88 serverInitializer.channel.writeAndFlush(
89 randomBufferType(serverInitializer.channel.alloc(), new byte[3], 0, 3));
90 clientInitializer.autoReadHandler.assertSingleRead();
91
92 if (readOutsideEventLoopThread) {
93 serverInitializer.channel.read();
94 }
95 serverInitializer.autoReadHandler.assertSingleReadSecondTry();
96
97 if (readOutsideEventLoopThread) {
98 clientChannel.read();
99 }
100 clientInitializer.autoReadHandler.assertSingleReadSecondTry();
101 } finally {
102 if (clientChannel != null) {
103 clientChannel.close().sync();
104 }
105 if (serverChannel != null) {
106 serverChannel.close().sync();
107 }
108 }
109 }
110
111 private static class AutoReadInitializer extends ChannelInitializer<Channel> {
112 final AutoReadHandler autoReadHandler;
113 volatile Channel channel;
114
115 AutoReadInitializer(boolean readInEventLoop) {
116 autoReadHandler = new AutoReadHandler(readInEventLoop);
117 }
118
119 @Override
120 protected void initChannel(Channel ch) throws Exception {
121 channel = ch;
122 ch.pipeline().addLast(autoReadHandler);
123 }
124 }
125
126 private static final class AutoReadHandler extends ChannelInboundHandlerAdapter {
127 private final AtomicInteger count = new AtomicInteger();
128 private final CountDownLatch latch = new CountDownLatch(1);
129 private final CountDownLatch latch2;
130 private final boolean callRead;
131
132 AutoReadHandler(boolean callRead) {
133 this.callRead = callRead;
134 latch2 = new CountDownLatch(callRead ? 3 : 2);
135 }
136
137 @Override
138 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
139 ReferenceCountUtil.release(msg);
140 if (count.incrementAndGet() == 1) {
141 ctx.channel().config().setAutoRead(false);
142 }
143 if (callRead) {
144
145 ctx.read();
146 }
147 }
148
149 @Override
150 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
151 latch.countDown();
152 latch2.countDown();
153 }
154
155 void assertSingleRead() throws InterruptedException {
156 assertTrue(latch.await(5, TimeUnit.SECONDS));
157 assertTrue(count.get() > 0);
158 }
159
160 void assertSingleReadSecondTry() throws InterruptedException {
161 assertTrue(latch2.await(5, TimeUnit.SECONDS));
162 assertEquals(callRead ? 3 : 2, count.get());
163 }
164 }
165
166
167
168
169 private static final class TestRecvByteBufAllocator implements RecvByteBufAllocator {
170 @Override
171 public ExtendedHandle newHandle() {
172 return new ExtendedHandle() {
173 private ChannelConfig config;
174 private int attemptedBytesRead;
175 private int lastBytesRead;
176 @Override
177 public ByteBuf allocate(ByteBufAllocator alloc) {
178 return alloc.ioBuffer(guess(), guess());
179 }
180
181 @Override
182 public int guess() {
183 return 1;
184 }
185
186 @Override
187 public void reset(ChannelConfig config) {
188 this.config = config;
189 }
190
191 @Override
192 public void incMessagesRead(int numMessages) {
193
194 }
195
196 @Override
197 public void lastBytesRead(int bytes) {
198 lastBytesRead = bytes;
199 }
200
201 @Override
202 public int lastBytesRead() {
203 return lastBytesRead;
204 }
205
206 @Override
207 public void attemptedBytesRead(int bytes) {
208 attemptedBytesRead = bytes;
209 }
210
211 @Override
212 public int attemptedBytesRead() {
213 return attemptedBytesRead;
214 }
215
216 @Override
217 public boolean continueReading() {
218 return config.isAutoRead();
219 }
220
221 @Override
222 public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
223 return config.isAutoRead();
224 }
225
226 @Override
227 public void readComplete() {
228
229 }
230 };
231 }
232 }
233 }