View Javadoc
1   /*
2    * Copyright 2022 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.testsuite.transport.socket;
17  
18  import io.netty5.bootstrap.Bootstrap;
19  import io.netty5.buffer.api.Buffer;
20  import io.netty5.channel.Channel;
21  import io.netty5.channel.ChannelHandlerAdapter;
22  import io.netty5.channel.ChannelHandlerContext;
23  import io.netty5.channel.ChannelInitializer;
24  import io.netty5.channel.SimpleChannelInboundHandler;
25  import io.netty5.channel.socket.DatagramChannel;
26  import io.netty5.channel.socket.DatagramPacket;
27  import io.netty5.channel.socket.DomainSocketAddress;
28  import io.netty5.testsuite.transport.TestsuitePermutation;
29  import io.netty5.util.concurrent.Future;
30  import org.junit.jupiter.api.Test;
31  import org.junit.jupiter.api.TestInfo;
32  import org.junit.jupiter.api.condition.EnabledIf;
33  
34  import java.net.SocketAddress;
35  import java.util.List;
36  import java.util.concurrent.CountDownLatch;
37  import java.util.concurrent.atomic.AtomicReference;
38  
39  import static org.assertj.core.api.Assertions.assertThat;
40  import static org.junit.jupiter.api.Assertions.assertEquals;
41  import static org.junit.jupiter.api.Assertions.assertNotNull;
42  
43  @EnabledIf("isSupported")
44  public class DomainDatagramUnicastTest extends DatagramUnicastTest {
45  
46      static boolean isSupported() {
47          return NioDomainSocketTestUtil.isDatagramSupported();
48      }
49  
50      @Test
51      void testBind(TestInfo testInfo) throws Throwable {
52          run(testInfo, (bootstrap, bootstrap2) -> testBind(bootstrap2));
53      }
54  
55      private void testBind(Bootstrap cb) throws Throwable {
56          Channel channel = null;
57          try {
58              channel = cb.handler(new ChannelHandlerAdapter() { })
59                      .bind(newSocketAddress()).asStage().get();
60              assertThat(channel.localAddress()).isNotNull()
61                      .isInstanceOf(DomainSocketAddress.class);
62          } finally {
63              closeChannel(channel);
64          }
65      }
66  
67      @Override
68      protected boolean supportDisconnect() {
69          return false;
70      }
71  
72      @Override
73      protected boolean isConnected(Channel channel) {
74          return ((DatagramChannel) channel).isConnected();
75      }
76  
77      @Override
78      protected List<TestsuitePermutation.BootstrapComboFactory<Bootstrap, Bootstrap>> newFactories() {
79          return SocketTestPermutation.INSTANCE.datagram(NioDomainSocketTestUtil.domainSocketFamily());
80      }
81  
82      @Override
83      protected SocketAddress newSocketAddress() {
84          return SocketTestPermutation.newDomainSocketAddress();
85      }
86  
87      @Override
88      protected Channel setupClientChannel(Bootstrap cb, final byte[] bytes, final CountDownLatch latch,
89                                           final AtomicReference<Throwable> errorRef) throws Throwable {
90          cb.handler(new SimpleChannelInboundHandler<DatagramPacket>() {
91  
92              @Override
93              public void messageReceived(ChannelHandlerContext ctx, DatagramPacket msg) {
94                  try {
95                      Buffer buf = msg.content();
96                      assertEquals(bytes.length, buf.readableBytes());
97                      for (int i = 0; i < bytes.length; i++) {
98                          assertEquals(bytes[i], buf.getByte(buf.readerOffset() + i));
99                      }
100 
101                     assertEquals(ctx.channel().localAddress(), msg.recipient());
102                 } finally {
103                     latch.countDown();
104                 }
105             }
106 
107             @Override
108             public void channelExceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
109                 errorRef.compareAndSet(null, cause);
110             }
111         });
112         return cb.bind(newSocketAddress()).asStage().get();
113     }
114 
115     @Override
116     protected Channel setupServerChannel(Bootstrap sb, final byte[] bytes, final SocketAddress sender,
117                                          final CountDownLatch latch, final AtomicReference<Throwable> errorRef,
118                                          final boolean echo) throws Throwable {
119         sb.handler(new ChannelInitializer<Channel>() {
120 
121             @Override
122             protected void initChannel(Channel ch) {
123                 ch.pipeline().addLast(new SimpleChannelInboundHandler<DatagramPacket>() {
124                     @Override
125                     public void messageReceived(ChannelHandlerContext ctx, DatagramPacket msg) {
126                         try {
127                             if (sender == null) {
128                                 assertNotNull(msg.sender());
129                             } else {
130                                 assertEquals(sender, msg.sender());
131                             }
132 
133                             Buffer buf = msg.content();
134                             assertEquals(bytes.length, buf.readableBytes());
135                             for (int i = 0; i < bytes.length; i++) {
136                                 assertEquals(bytes[i], buf.getByte(buf.readerOffset() + i));
137                             }
138 
139                             assertEquals(ctx.channel().localAddress(), msg.recipient());
140 
141                             if (echo) {
142                                 ctx.writeAndFlush(new DatagramPacket(buf.split(), msg.sender()));
143                             }
144                         } finally {
145                             latch.countDown();
146                         }
147                     }
148 
149                     @Override
150                     public void channelExceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
151                         errorRef.compareAndSet(null, cause);
152                     }
153                 });
154             }
155         });
156         return sb.bind(newSocketAddress()).asStage().get();
157     }
158 
159     @Override
160     protected Future<Void> write(Channel cc, Buffer buf, SocketAddress remote) {
161         return cc.write(new DatagramPacket(buf, remote));
162     }
163 }