/*
 * Copyright 2026 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
16  package io.netty.testsuite.transport.socket;
17  
18  import io.netty.bootstrap.Bootstrap;
19  import io.netty.buffer.ByteBuf;
20  import io.netty.buffer.Unpooled;
21  import io.netty.channel.Channel;
22  import io.netty.channel.ChannelHandlerContext;
23  import io.netty.channel.ChannelOption;
24  import io.netty.channel.SimpleChannelInboundHandler;
25  import io.netty.channel.socket.DatagramPacket;
26  import io.netty.testsuite.transport.TestsuitePermutation;
27  import io.netty.util.CharsetUtil;
28  import io.netty.util.NetUtil;
29  import org.junit.jupiter.api.Test;
30  import org.junit.jupiter.api.TestInfo;
31  import org.junit.jupiter.api.Timeout;
32  import org.junit.jupiter.api.condition.DisabledOnOs;
33  import org.junit.jupiter.api.condition.OS;
34  
35  import java.net.InetSocketAddress;
36  import java.net.PortUnreachableException;
37  import java.util.List;
38  import java.util.concurrent.CountDownLatch;
39  import java.util.concurrent.TimeUnit;
40  import java.util.concurrent.atomic.AtomicReference;
41  
42  import static org.junit.jupiter.api.Assertions.assertInstanceOf;
43  import static org.junit.jupiter.api.Assertions.assertNotNull;
44  import static org.junit.jupiter.api.Assertions.assertTrue;
45  
46  public class DatagramConnectedWriteExceptionTest extends AbstractClientSocketTest {
47  
48      @Override
49      protected List<TestsuitePermutation.BootstrapFactory<Bootstrap>> newFactories() {
50          return SocketTestPermutation.INSTANCE.datagramSocket();
51      }
52  
53      @Test
54      @Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
55      @DisabledOnOs(OS.WINDOWS)
56      public void testWriteThrowsPortUnreachableException(TestInfo testInfo) throws Throwable {
57          run(testInfo, (Runner<Bootstrap>) this::testWriteExceptionAfterServerStop);
58      }
59  
60      protected void testWriteExceptionAfterServerStop(Bootstrap clientBootstrap) throws Throwable {
61          CountDownLatch serverReceivedLatch = new CountDownLatch(1);
62          Bootstrap serverBootstrap = clientBootstrap.clone()
63                  .option(ChannelOption.SO_BROADCAST, false)
64                  .handler(new SimpleChannelInboundHandler<DatagramPacket>() {
65  
66                      @Override
67                      protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) {
68                          serverReceivedLatch.countDown();
69                      }
70                  });
71  
72          Channel serverChannel = serverBootstrap.bind(new InetSocketAddress(NetUtil.LOCALHOST, 0)).sync().channel();
73          InetSocketAddress serverAddress = (InetSocketAddress) serverChannel.localAddress();
74  
75          clientBootstrap.option(ChannelOption.AUTO_READ, false)
76                  .handler(new SimpleChannelInboundHandler<DatagramPacket>() {
77  
78                      @Override
79                      protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) {
80                          // no-op
81                      }
82                  });
83  
84          Channel clientChannel = clientBootstrap.connect(serverAddress).sync().channel();
85  
86          CountDownLatch clientFirstSendLatch = new CountDownLatch(1);
87          try {
88              ByteBuf firstMessage = Unpooled.wrappedBuffer("First message".getBytes(CharsetUtil.UTF_8));
89              clientChannel.writeAndFlush(firstMessage)
90                      .addListener(future -> {
91                          if (future.isSuccess()) {
92                              clientFirstSendLatch.countDown();
93                          }
94                      });
95  
96              assertTrue(serverReceivedLatch.await(5, TimeUnit.SECONDS), "Server should receive first message");
97              assertTrue(clientFirstSendLatch.await(5, TimeUnit.SECONDS), "Client should send first message");
98  
99              serverChannel.close().sync();
100 
101             AtomicReference<Throwable> writeException = new AtomicReference<>();
102             CountDownLatch writesCompleteLatch = new CountDownLatch(10);
103 
104             for (int i = 0; i < 10; i++) {
105                 ByteBuf message = Unpooled.wrappedBuffer(("Message " + i).getBytes(CharsetUtil.UTF_8));
106                 clientChannel.writeAndFlush(message)
107                         .addListener(future -> {
108                             if (!future.isSuccess()) {
109                                 writeException.compareAndSet(null, future.cause());
110                             }
111                             writesCompleteLatch.countDown();
112                         });
113                 Thread.sleep(50);
114             }
115 
116             assertTrue(writesCompleteLatch.await(5, TimeUnit.SECONDS), "All writes should complete");
117 
118             assertNotNull(writeException.get(), "Should have captured a write exception");
119 
120             assertInstanceOf(PortUnreachableException.class, writeException.get(), "Expected " +
121                     "PortUnreachableException but got: " + writeException.get().getClass().getName());
122         } finally {
123             if (clientChannel != null) {
124                 clientChannel.close().sync();
125             }
126         }
127     }
128 }