1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.testsuite.transport.socket;
17
18 import io.netty5.bootstrap.Bootstrap;
19 import io.netty5.bootstrap.ServerBootstrap;
20 import io.netty5.buffer.api.Buffer;
21 import io.netty5.util.Resource;
22 import io.netty5.channel.Channel;
23 import io.netty5.channel.ChannelHandlerContext;
24 import io.netty5.channel.SimpleChannelInboundHandler;
25 import io.netty5.util.concurrent.Future;
26 import org.junit.jupiter.api.Test;
27 import org.junit.jupiter.api.TestInfo;
28 import org.junit.jupiter.api.Timeout;
29
30 import java.io.IOException;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.atomic.AtomicInteger;
33 import java.util.concurrent.atomic.AtomicReference;
34
35 import static io.netty5.buffer.api.DefaultBufferAllocators.preferredAllocator;
36 import static org.junit.jupiter.api.Assertions.assertEquals;
37 import static org.junit.jupiter.api.Assertions.assertNull;
38 import static org.junit.jupiter.api.Assertions.assertTrue;
39
40 public class SocketCancelWriteTest extends AbstractSocketTest {
41 @Test
42 @Timeout(value = 30000, unit = TimeUnit.MILLISECONDS)
43 public void testCancelWrite(TestInfo testInfo) throws Throwable {
44 run(testInfo, this::testCancelWrite);
45 }
46
47 public void testCancelWrite(ServerBootstrap sb, Bootstrap cb) throws Throwable {
48 final TestHandler sh = new TestHandler();
49 final TestHandler ch = new TestHandler();
50 final Buffer a = preferredAllocator().allocate(1).writeByte((byte) 'a');
51 final Buffer b = preferredAllocator().allocate(1).writeByte((byte) 'b');
52 final Buffer c = preferredAllocator().allocate(1).writeByte((byte) 'c');
53 final Buffer d = preferredAllocator().allocate(1).writeByte((byte) 'd');
54 final Buffer e = preferredAllocator().allocate(1).writeByte((byte) 'e');
55
56 cb.handler(ch);
57 sb.childHandler(sh);
58
59 Channel sc = sb.bind().asStage().get();
60 Channel cc = cb.connect(sc.localAddress()).asStage().get();
61
62 Future<Void> f = cc.write(a);
63 assertTrue(f.cancel());
64 cc.writeAndFlush(b);
65 cc.write(c);
66 Future<Void> f2 = cc.write(d);
67 assertTrue(f2.cancel());
68 cc.writeAndFlush(e);
69
70 while (sh.counter.get() < 3) {
71 if (sh.exception.get() != null) {
72 break;
73 }
74 if (ch.exception.get() != null) {
75 break;
76 }
77 try {
78 Thread.sleep(50);
79 } catch (InterruptedException ignore) {
80
81 }
82 }
83 sh.channel.close().asStage().sync();
84 ch.channel.close().asStage().sync();
85 sc.close().asStage().sync();
86
87 if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) {
88 throw sh.exception.get();
89 }
90 if (sh.exception.get() != null) {
91 throw sh.exception.get();
92 }
93 if (ch.exception.get() != null && !(ch.exception.get() instanceof IOException)) {
94 throw ch.exception.get();
95 }
96 if (ch.exception.get() != null) {
97 throw ch.exception.get();
98 }
99 assertEquals(0, ch.counter.get());
100 assertNull(ch.received);
101 assertEquals(preferredAllocator().copyOf(new byte[] { 'b', 'c', 'e' }), sh.received);
102 Resource.dispose(sh.received);
103 }
104
105 private static class TestHandler extends SimpleChannelInboundHandler<Buffer> {
106 volatile Channel channel;
107 final AtomicReference<Throwable> exception = new AtomicReference<>();
108 final AtomicInteger counter = new AtomicInteger();
109 Buffer received;
110 @Override
111 public void channelActive(ChannelHandlerContext ctx)
112 throws Exception {
113 channel = ctx.channel();
114 }
115
116 @Override
117 public void messageReceived(ChannelHandlerContext ctx, Buffer in) throws Exception {
118 counter.getAndAdd(in.readableBytes());
119 if (received == null) {
120 received = preferredAllocator().allocate(32);
121 }
122 received.writeBytes(in);
123 }
124
125 @Override
126 public void channelExceptionCaught(ChannelHandlerContext ctx,
127 Throwable cause) throws Exception {
128 if (exception.compareAndSet(null, cause)) {
129 ctx.close();
130 }
131 }
132 }
133 }