1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.testsuite.transport.socket;
17
18 import io.netty.bootstrap.Bootstrap;
19 import io.netty.buffer.ByteBuf;
20 import io.netty.buffer.Unpooled;
21 import io.netty.channel.Channel;
22 import io.netty.channel.ChannelHandlerContext;
23 import io.netty.channel.ChannelOption;
24 import io.netty.channel.SimpleChannelInboundHandler;
25 import io.netty.channel.socket.DatagramPacket;
26 import io.netty.testsuite.transport.TestsuitePermutation;
27 import io.netty.util.CharsetUtil;
28 import io.netty.util.NetUtil;
29 import org.junit.jupiter.api.Test;
30 import org.junit.jupiter.api.TestInfo;
31 import org.junit.jupiter.api.Timeout;
32 import org.junit.jupiter.api.condition.DisabledOnOs;
33 import org.junit.jupiter.api.condition.OS;
34
35 import java.net.InetSocketAddress;
36 import java.net.PortUnreachableException;
37 import java.util.List;
38 import java.util.concurrent.CountDownLatch;
39 import java.util.concurrent.TimeUnit;
40 import java.util.concurrent.atomic.AtomicReference;
41
42 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
43 import static org.junit.jupiter.api.Assertions.assertNotNull;
44 import static org.junit.jupiter.api.Assertions.assertTrue;
45
46 public class DatagramConnectedWriteExceptionTest extends AbstractClientSocketTest {
47
48 @Override
49 protected List<TestsuitePermutation.BootstrapFactory<Bootstrap>> newFactories() {
50 return SocketTestPermutation.INSTANCE.datagramSocket();
51 }
52
53 @Test
54 @Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
55 @DisabledOnOs(OS.WINDOWS)
56 public void testWriteThrowsPortUnreachableException(TestInfo testInfo) throws Throwable {
57 run(testInfo, (Runner<Bootstrap>) this::testWriteExceptionAfterServerStop);
58 }
59
60 protected void testWriteExceptionAfterServerStop(Bootstrap clientBootstrap) throws Throwable {
61 CountDownLatch serverReceivedLatch = new CountDownLatch(1);
62 Bootstrap serverBootstrap = clientBootstrap.clone()
63 .option(ChannelOption.SO_BROADCAST, false)
64 .handler(new SimpleChannelInboundHandler<DatagramPacket>() {
65
66 @Override
67 protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) {
68 serverReceivedLatch.countDown();
69 }
70 });
71
72 Channel serverChannel = serverBootstrap.bind(new InetSocketAddress(NetUtil.LOCALHOST, 0)).sync().channel();
73 InetSocketAddress serverAddress = (InetSocketAddress) serverChannel.localAddress();
74
75 clientBootstrap.option(ChannelOption.AUTO_READ, false)
76 .handler(new SimpleChannelInboundHandler<DatagramPacket>() {
77
78 @Override
79 protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) {
80
81 }
82 });
83
84 Channel clientChannel = clientBootstrap.connect(serverAddress).sync().channel();
85
86 CountDownLatch clientFirstSendLatch = new CountDownLatch(1);
87 try {
88 ByteBuf firstMessage = Unpooled.wrappedBuffer("First message".getBytes(CharsetUtil.UTF_8));
89 clientChannel.writeAndFlush(firstMessage)
90 .addListener(future -> {
91 if (future.isSuccess()) {
92 clientFirstSendLatch.countDown();
93 }
94 });
95
96 assertTrue(serverReceivedLatch.await(5, TimeUnit.SECONDS), "Server should receive first message");
97 assertTrue(clientFirstSendLatch.await(5, TimeUnit.SECONDS), "Client should send first message");
98
99 serverChannel.close().sync();
100
101 AtomicReference<Throwable> writeException = new AtomicReference<>();
102 CountDownLatch writesCompleteLatch = new CountDownLatch(10);
103
104 for (int i = 0; i < 10; i++) {
105 ByteBuf message = Unpooled.wrappedBuffer(("Message " + i).getBytes(CharsetUtil.UTF_8));
106 clientChannel.writeAndFlush(message)
107 .addListener(future -> {
108 if (!future.isSuccess()) {
109 writeException.compareAndSet(null, future.cause());
110 }
111 writesCompleteLatch.countDown();
112 });
113 Thread.sleep(50);
114 }
115
116 assertTrue(writesCompleteLatch.await(5, TimeUnit.SECONDS), "All writes should complete");
117
118 assertNotNull(writeException.get(), "Should have captured a write exception");
119
120 assertInstanceOf(PortUnreachableException.class, writeException.get(), "Expected " +
121 "PortUnreachableException but got: " + writeException.get().getClass().getName());
122 } finally {
123 if (clientChannel != null) {
124 clientChannel.close().sync();
125 }
126 }
127 }
128 }