1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.testsuite.transport.socket;
17
18 import io.netty5.bootstrap.Bootstrap;
19 import io.netty5.bootstrap.ServerBootstrap;
20 import io.netty5.buffer.api.Buffer;
21 import io.netty5.buffer.api.BufferAllocator;
22 import io.netty5.buffer.api.DefaultBufferAllocators;
23 import io.netty5.util.Resource;
24 import io.netty5.channel.Channel;
25 import io.netty5.channel.ChannelHandler;
26 import io.netty5.channel.ChannelHandlerContext;
27 import io.netty5.channel.ChannelInitializer;
28 import io.netty5.channel.ChannelOption;
29 import io.netty5.channel.RecvBufferAllocator;
30 import org.junit.jupiter.api.Test;
31 import org.junit.jupiter.api.TestInfo;
32
33 import java.util.concurrent.CountDownLatch;
34 import java.util.concurrent.TimeUnit;
35 import java.util.concurrent.atomic.AtomicInteger;
36 import java.util.function.Predicate;
37
38 import static org.junit.jupiter.api.Assertions.assertEquals;
39 import static org.junit.jupiter.api.Assertions.assertTrue;
40
41 public class SocketAutoReadTest extends AbstractSocketTest {
42 @Test
43 public void testAutoReadOffDuringReadOnlyReadsOne(TestInfo testInfo) throws Throwable {
44 run(testInfo, this::testAutoReadOffDuringReadOnlyReadsOne);
45 }
46
47 public void testAutoReadOffDuringReadOnlyReadsOne(ServerBootstrap sb, Bootstrap cb) throws Throwable {
48 testAutoReadOffDuringReadOnlyReadsOne(true, sb, cb);
49 testAutoReadOffDuringReadOnlyReadsOne(false, sb, cb);
50 }
51
52 private static void testAutoReadOffDuringReadOnlyReadsOne(boolean readOutsideEventLoopThread,
53 ServerBootstrap sb, Bootstrap cb) throws Throwable {
54 Channel serverChannel = null;
55 Channel clientChannel = null;
56 try {
57 AutoReadInitializer serverInitializer = new AutoReadInitializer(!readOutsideEventLoopThread);
58 AutoReadInitializer clientInitializer = new AutoReadInitializer(!readOutsideEventLoopThread);
59 sb.option(ChannelOption.SO_BACKLOG, 1024)
60 .option(ChannelOption.AUTO_READ, true)
61 .childOption(ChannelOption.AUTO_READ, true)
62
63
64 .childOption(ChannelOption.RCVBUFFER_ALLOCATOR, new TestRecvBufferAllocator())
65 .childHandler(serverInitializer);
66
67 serverChannel = sb.bind().asStage().get();
68
69 cb.option(ChannelOption.AUTO_READ, true)
70
71
72 .option(ChannelOption.RCVBUFFER_ALLOCATOR, new TestRecvBufferAllocator())
73 .handler(clientInitializer);
74
75 clientChannel = cb.connect(serverChannel.localAddress()).asStage().get();
76 BufferAllocator alloc = DefaultBufferAllocators.onHeapAllocator();
77
78
79 clientChannel.writeAndFlush(alloc.copyOf(new byte[3]));
80 serverInitializer.autoReadHandler.assertSingleRead();
81
82
83 serverInitializer.channel.writeAndFlush(alloc.copyOf(new byte[3]));
84 clientInitializer.autoReadHandler.assertSingleRead();
85
86 if (readOutsideEventLoopThread) {
87 serverInitializer.channel.read();
88 }
89 serverInitializer.autoReadHandler.assertSingleReadSecondTry();
90
91 if (readOutsideEventLoopThread) {
92 clientChannel.read();
93 }
94 clientInitializer.autoReadHandler.assertSingleReadSecondTry();
95 } finally {
96 if (clientChannel != null) {
97 clientChannel.close().asStage().sync();
98 }
99 if (serverChannel != null) {
100 serverChannel.close().asStage().sync();
101 }
102 }
103 }
104
105 private static class AutoReadInitializer extends ChannelInitializer<Channel> {
106 final AutoReadHandler autoReadHandler;
107 volatile Channel channel;
108
109 AutoReadInitializer(boolean readInEventLoop) {
110 autoReadHandler = new AutoReadHandler(readInEventLoop);
111 }
112
113 @Override
114 protected void initChannel(Channel ch) throws Exception {
115 channel = ch;
116 ch.pipeline().addLast(autoReadHandler);
117 }
118 }
119
120 private static final class AutoReadHandler implements ChannelHandler {
121 private final AtomicInteger count = new AtomicInteger();
122 private final CountDownLatch latch = new CountDownLatch(1);
123 private final CountDownLatch latch2;
124 private final boolean callRead;
125
126 AutoReadHandler(boolean callRead) {
127 this.callRead = callRead;
128 latch2 = new CountDownLatch(callRead ? 3 : 2);
129 }
130
131 @Override
132 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
133 if (msg instanceof Resource<?>) {
134 ((Resource<?>) msg).close();
135 } else {
136 Resource.dispose(msg);
137 }
138 if (count.incrementAndGet() == 1) {
139 ctx.channel().setOption(ChannelOption.AUTO_READ, false);
140 }
141 if (callRead) {
142
143 ctx.read();
144 }
145 }
146
147 @Override
148 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
149 latch.countDown();
150 latch2.countDown();
151 }
152
153 void assertSingleRead() throws InterruptedException {
154 assertTrue(latch.await(5, TimeUnit.SECONDS));
155 assertTrue(count.get() > 0);
156 }
157
158 void assertSingleReadSecondTry() throws InterruptedException {
159 assertTrue(latch2.await(5, TimeUnit.SECONDS));
160 assertEquals(callRead ? 3 : 2, count.get());
161 }
162 }
163
164
165
166
167 private static final class TestRecvBufferAllocator implements RecvBufferAllocator {
168 @Override
169 public Handle newHandle() {
170 return new Handle() {
171 private int attemptedBytesRead;
172 private int lastBytesRead;
173
174 @Override
175 public Buffer allocate(BufferAllocator alloc) {
176 return alloc.allocate(guess());
177 }
178
179 @Override
180 public int guess() {
181 return 1;
182 }
183
184 @Override
185 public void reset() {
186 }
187
188 @Override
189 public void incMessagesRead(int numMessages) {
190
191 }
192
193 @Override
194 public void lastBytesRead(int bytes) {
195 lastBytesRead = bytes;
196 }
197
198 @Override
199 public int lastBytesRead() {
200 return lastBytesRead;
201 }
202
203 @Override
204 public void attemptedBytesRead(int bytes) {
205 attemptedBytesRead = bytes;
206 }
207
208 @Override
209 public int attemptedBytesRead() {
210 return attemptedBytesRead;
211 }
212
213 @Override
214 public boolean continueReading(boolean autoRead) {
215 return autoRead;
216 }
217
218 @Override
219 public boolean continueReading(boolean autoRead, Predicate<Handle> maybeMoreDataSupplier) {
220 return autoRead;
221 }
222
223 @Override
224 public void readComplete() {
225
226 }
227 };
228 }
229 }
230 }