1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.channel.unix.tests;
17
18 import io.netty5.bootstrap.Bootstrap;
19 import io.netty5.bootstrap.ServerBootstrap;
20 import io.netty5.buffer.api.Buffer;
21 import io.netty5.channel.Channel;
22 import io.netty5.channel.ChannelFutureListeners;
23 import io.netty5.channel.ChannelHandler;
24 import io.netty5.channel.ChannelHandlerContext;
25 import io.netty5.channel.ChannelInitializer;
26 import io.netty5.channel.ChannelOption;
27 import io.netty5.channel.EventLoopGroup;
28 import io.netty5.channel.FixedRecvBufferAllocator;
29 import io.netty5.channel.ServerChannel;
30 import io.netty5.channel.SimpleChannelInboundHandler;
31 import org.junit.jupiter.api.Test;
32 import org.junit.jupiter.api.Timeout;
33
34 import java.net.InetSocketAddress;
35 import java.util.concurrent.CountDownLatch;
36 import java.util.concurrent.TimeUnit;
37 import java.util.concurrent.atomic.AtomicInteger;
38
39 import static org.junit.jupiter.api.Assertions.assertEquals;
40
41 public abstract class DetectPeerCloseWithoutReadTest {
42 protected abstract EventLoopGroup newGroup();
43 protected abstract Class<? extends ServerChannel> serverChannel();
44 protected abstract Class<? extends Channel> clientChannel();
45
46 @Test
47 @Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
48 public void clientCloseWithoutServerReadIsDetectedNoExtraReadRequested() throws Exception {
49 clientCloseWithoutServerReadIsDetected0(false);
50 }
51
52 @Test
53 @Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
54 public void clientCloseWithoutServerReadIsDetectedExtraReadRequested() throws Exception {
55 clientCloseWithoutServerReadIsDetected0(true);
56 }
57
58 private void clientCloseWithoutServerReadIsDetected0(final boolean extraReadRequested) throws Exception {
59 EventLoopGroup serverGroup = null;
60 EventLoopGroup clientGroup = null;
61 Channel serverChannel = null;
62 try {
63 final CountDownLatch latch = new CountDownLatch(1);
64 final AtomicInteger bytesRead = new AtomicInteger();
65 final int expectedBytes = 100;
66 serverGroup = newGroup();
67 clientGroup = newGroup();
68 ServerBootstrap sb = new ServerBootstrap();
69 sb.group(serverGroup);
70 sb.channel(serverChannel());
71
72
73 sb.childOption(ChannelOption.AUTO_READ, false);
74 sb.childOption(ChannelOption.MAX_MESSAGES_PER_READ, 1);
75 sb.childOption(ChannelOption.RCVBUFFER_ALLOCATOR, new FixedRecvBufferAllocator(expectedBytes / 10));
76 sb.childHandler(new ChannelInitializer<>() {
77 @Override
78 protected void initChannel(Channel ch) {
79 ch.pipeline().addLast(new TestHandler(bytesRead, extraReadRequested, latch));
80 }
81 });
82
83 serverChannel = sb.bind(new InetSocketAddress(0)).asStage().get();
84
85 Bootstrap cb = new Bootstrap();
86 cb.group(serverGroup);
87 cb.channel(clientChannel());
88 cb.handler(new ChannelHandler() { });
89 Channel clientChannel = cb.connect(serverChannel.localAddress()).asStage().get();
90 Buffer buf = clientChannel.bufferAllocator().allocate(expectedBytes);
91 buf.skipWritableBytes(expectedBytes);
92 clientChannel.writeAndFlush(buf).addListener(clientChannel, ChannelFutureListeners.CLOSE);
93
94 latch.await();
95 assertEquals(expectedBytes, bytesRead.get());
96 } finally {
97 if (serverChannel != null) {
98 serverChannel.close().asStage().sync();
99 }
100 if (serverGroup != null) {
101 serverGroup.shutdownGracefully();
102 }
103 if (clientGroup != null) {
104 clientGroup.shutdownGracefully();
105 }
106 }
107 }
108
109 @Test
110 @Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
111 public void serverCloseWithoutClientReadIsDetectedNoExtraReadRequested() throws Exception {
112 serverCloseWithoutClientReadIsDetected0(false);
113 }
114
115 @Test
116 @Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
117 public void serverCloseWithoutClientReadIsDetectedExtraReadRequested() throws Exception {
118 serverCloseWithoutClientReadIsDetected0(true);
119 }
120
121 private void serverCloseWithoutClientReadIsDetected0(final boolean extraReadRequested) throws Exception {
122 EventLoopGroup serverGroup = null;
123 EventLoopGroup clientGroup = null;
124 Channel serverChannel = null;
125 Channel clientChannel = null;
126 try {
127 final CountDownLatch latch = new CountDownLatch(1);
128 final AtomicInteger bytesRead = new AtomicInteger();
129 final int expectedBytes = 100;
130 serverGroup = newGroup();
131 clientGroup = newGroup();
132 ServerBootstrap sb = new ServerBootstrap();
133 sb.group(serverGroup);
134 sb.channel(serverChannel());
135 sb.childHandler(new ChannelInitializer<>() {
136 @Override
137 protected void initChannel(Channel ch) {
138 ch.pipeline().addLast(new ChannelHandler() {
139 @Override
140 public void channelActive(ChannelHandlerContext ctx) {
141 Buffer buf = ctx.bufferAllocator().allocate(expectedBytes);
142 buf.skipWritableBytes(expectedBytes);
143 ctx.writeAndFlush(buf).addListener(ctx.channel(), ChannelFutureListeners.CLOSE);
144 ctx.fireChannelActive();
145 }
146 });
147 }
148 });
149
150 serverChannel = sb.bind(new InetSocketAddress(0)).asStage().get();
151
152 Bootstrap cb = new Bootstrap();
153 cb.group(serverGroup);
154 cb.channel(clientChannel());
155
156
157 cb.option(ChannelOption.AUTO_READ, false);
158 cb.option(ChannelOption.MAX_MESSAGES_PER_READ, 1);
159 cb.option(ChannelOption.RCVBUFFER_ALLOCATOR, new FixedRecvBufferAllocator(expectedBytes / 10));
160 cb.handler(new ChannelInitializer<>() {
161 @Override
162 protected void initChannel(Channel ch) throws Exception {
163 ch.pipeline().addLast(new TestHandler(bytesRead, extraReadRequested, latch));
164 }
165 });
166 clientChannel = cb.connect(serverChannel.localAddress()).asStage().get();
167
168 latch.await();
169 assertEquals(expectedBytes, bytesRead.get());
170 } finally {
171 if (serverChannel != null) {
172 serverChannel.close().asStage().sync();
173 }
174 if (clientChannel != null) {
175 clientChannel.close().asStage().sync();
176 }
177 if (serverGroup != null) {
178 serverGroup.shutdownGracefully();
179 }
180 if (clientGroup != null) {
181 clientGroup.shutdownGracefully();
182 }
183 }
184 }
185
186 private static final class TestHandler extends SimpleChannelInboundHandler<Buffer> {
187 private final AtomicInteger bytesRead;
188 private final boolean extraReadRequested;
189 private final CountDownLatch latch;
190
191 TestHandler(AtomicInteger bytesRead, boolean extraReadRequested, CountDownLatch latch) {
192 this.bytesRead = bytesRead;
193 this.extraReadRequested = extraReadRequested;
194 this.latch = latch;
195 }
196
197 @Override
198 protected void messageReceived(ChannelHandlerContext ctx, Buffer msg) {
199 bytesRead.addAndGet(msg.readableBytes());
200
201 if (extraReadRequested) {
202
203 ctx.read();
204 }
205 }
206
207 @Override
208 public void channelInactive(ChannelHandlerContext ctx) {
209 latch.countDown();
210 ctx.fireChannelInactive();
211 }
212 }
213 }