View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.testsuite.transport.socket;
17  
18  import io.netty5.bootstrap.Bootstrap;
19  import io.netty5.bootstrap.ServerBootstrap;
20  import io.netty5.buffer.api.Buffer;
21  import io.netty5.util.Resource;
22  import io.netty5.channel.Channel;
23  import io.netty5.channel.ChannelHandlerContext;
24  import io.netty5.channel.SimpleChannelInboundHandler;
25  import io.netty5.util.concurrent.Future;
26  import org.junit.jupiter.api.Test;
27  import org.junit.jupiter.api.TestInfo;
28  import org.junit.jupiter.api.Timeout;
29  
30  import java.io.IOException;
31  import java.util.concurrent.TimeUnit;
32  import java.util.concurrent.atomic.AtomicInteger;
33  import java.util.concurrent.atomic.AtomicReference;
34  
35  import static io.netty5.buffer.api.DefaultBufferAllocators.preferredAllocator;
36  import static org.junit.jupiter.api.Assertions.assertEquals;
37  import static org.junit.jupiter.api.Assertions.assertNull;
38  import static org.junit.jupiter.api.Assertions.assertTrue;
39  
40  public class SocketCancelWriteTest extends AbstractSocketTest {
41      @Test
42      @Timeout(value = 30000, unit = TimeUnit.MILLISECONDS)
43      public void testCancelWrite(TestInfo testInfo) throws Throwable {
44          run(testInfo, this::testCancelWrite);
45      }
46  
47      public void testCancelWrite(ServerBootstrap sb, Bootstrap cb) throws Throwable {
48          final TestHandler sh = new TestHandler();
49          final TestHandler ch = new TestHandler();
50          final Buffer a = preferredAllocator().allocate(1).writeByte((byte) 'a');
51          final Buffer b = preferredAllocator().allocate(1).writeByte((byte) 'b');
52          final Buffer c = preferredAllocator().allocate(1).writeByte((byte) 'c');
53          final Buffer d = preferredAllocator().allocate(1).writeByte((byte) 'd');
54          final Buffer e = preferredAllocator().allocate(1).writeByte((byte) 'e');
55  
56          cb.handler(ch);
57          sb.childHandler(sh);
58  
59          Channel sc = sb.bind().asStage().get();
60          Channel cc = cb.connect(sc.localAddress()).asStage().get();
61  
62          Future<Void> f = cc.write(a);
63          assertTrue(f.cancel());
64          cc.writeAndFlush(b);
65          cc.write(c);
66          Future<Void> f2 = cc.write(d);
67          assertTrue(f2.cancel());
68          cc.writeAndFlush(e);
69  
70          while (sh.counter.get() < 3) {
71              if (sh.exception.get() != null) {
72                  break;
73              }
74              if (ch.exception.get() != null) {
75                  break;
76              }
77              try {
78                  Thread.sleep(50);
79              } catch (InterruptedException ignore) {
80                  // Ignore.
81              }
82          }
83          sh.channel.close().asStage().sync();
84          ch.channel.close().asStage().sync();
85          sc.close().asStage().sync();
86  
87          if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) {
88              throw sh.exception.get();
89          }
90          if (sh.exception.get() != null) {
91              throw sh.exception.get();
92          }
93          if (ch.exception.get() != null && !(ch.exception.get() instanceof IOException)) {
94              throw ch.exception.get();
95          }
96          if (ch.exception.get() != null) {
97              throw ch.exception.get();
98          }
99          assertEquals(0, ch.counter.get());
100         assertNull(ch.received);
101         assertEquals(preferredAllocator().copyOf(new byte[] { 'b', 'c', 'e' }), sh.received);
102         Resource.dispose(sh.received);
103     }
104 
105     private static class TestHandler extends SimpleChannelInboundHandler<Buffer> {
106         volatile Channel channel;
107         final AtomicReference<Throwable> exception = new AtomicReference<>();
108         final AtomicInteger counter = new AtomicInteger();
109         Buffer received;
110         @Override
111         public void channelActive(ChannelHandlerContext ctx)
112                 throws Exception {
113             channel = ctx.channel();
114         }
115 
116         @Override
117         public void messageReceived(ChannelHandlerContext ctx, Buffer in) throws Exception {
118             counter.getAndAdd(in.readableBytes());
119             if (received == null) {
120                 received = preferredAllocator().allocate(32);
121             }
122             received.writeBytes(in);
123         }
124 
125         @Override
126         public void channelExceptionCaught(ChannelHandlerContext ctx,
127                                            Throwable cause) throws Exception {
128             if (exception.compareAndSet(null, cause)) {
129                 ctx.close();
130             }
131         }
132     }
133 }