View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.AddressedEnvelope;
20  import io.netty.channel.ChannelHandlerContext;
21  import io.netty.channel.ChannelPipeline;
22  import io.netty.channel.ChannelPromise;
23  import io.netty.channel.socket.DatagramPacket;
24  import io.netty.handler.codec.protobuf.ProtobufEncoder;
25  import io.netty.util.internal.StringUtil;
26  import static io.netty.util.internal.ObjectUtil.checkNotNull;
27  
28  import java.net.InetSocketAddress;
29  import java.net.SocketAddress;
30  import java.util.List;
31  
32  /**
33   * An encoder that encodes the content in {@link AddressedEnvelope} to {@link DatagramPacket} using
34   * the specified message encoder. E.g.,
35   *
36   * <pre><code>
37   * {@link ChannelPipeline} pipeline = ...;
38   * pipeline.addLast("udpEncoder", new {@link DatagramPacketEncoder}(new {@link ProtobufEncoder}(...));
39   * </code></pre>
40   *
41   * Note: As UDP packets are out-of-order, you should make sure the encoded message size are not greater than
42   * the max safe packet size in your particular network path which guarantees no packet fragmentation.
43   *
44   * @param <M> the type of message to be encoded
45   */
46  public class DatagramPacketEncoder<M> extends MessageToMessageEncoder<AddressedEnvelope<M, InetSocketAddress>> {
47  
48      private final MessageToMessageEncoder<? super M> encoder;
49  
50      /**
51       * Create an encoder that encodes the content in {@link AddressedEnvelope} to {@link DatagramPacket} using
52       * the specified message encoder.
53       *
54       * @param encoder the specified message encoder
55       */
56      public DatagramPacketEncoder(MessageToMessageEncoder<? super M> encoder) {
57          this.encoder = checkNotNull(encoder, "encoder");
58      }
59  
60      @Override
61      public boolean acceptOutboundMessage(Object msg) throws Exception {
62          if (super.acceptOutboundMessage(msg)) {
63              @SuppressWarnings("rawtypes")
64              AddressedEnvelope envelope = (AddressedEnvelope) msg;
65              return encoder.acceptOutboundMessage(envelope.content())
66                      && (envelope.sender() instanceof InetSocketAddress || envelope.sender() == null)
67                      && envelope.recipient() instanceof InetSocketAddress;
68          }
69          return false;
70      }
71  
72      @Override
73      protected void encode(
74              ChannelHandlerContext ctx, AddressedEnvelope<M, InetSocketAddress> msg, List<Object> out) throws Exception {
75          assert out.isEmpty();
76  
77          encoder.encode(ctx, msg.content(), out);
78          if (out.size() != 1) {
79              throw new EncoderException(
80                      StringUtil.simpleClassName(encoder) + " must produce only one message.");
81          }
82          Object content = out.get(0);
83          if (content instanceof ByteBuf) {
84              // Replace the ByteBuf with a DatagramPacket.
85              out.set(0, new DatagramPacket((ByteBuf) content, msg.recipient(), msg.sender()));
86          } else {
87              throw new EncoderException(
88                      StringUtil.simpleClassName(encoder) + " must produce only ByteBuf.");
89          }
90      }
91  
92      @Override
93      public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
94          encoder.bind(ctx, localAddress, promise);
95      }
96  
97      @Override
98      public void connect(
99              ChannelHandlerContext ctx, SocketAddress remoteAddress,
100             SocketAddress localAddress, ChannelPromise promise) throws Exception {
101         encoder.connect(ctx, remoteAddress, localAddress, promise);
102     }
103 
104     @Override
105     public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
106         encoder.disconnect(ctx, promise);
107     }
108 
109     @Override
110     public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
111         encoder.close(ctx, promise);
112     }
113 
114     @Override
115     public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
116         encoder.deregister(ctx, promise);
117     }
118 
119     @Override
120     public void read(ChannelHandlerContext ctx) throws Exception {
121         encoder.read(ctx);
122     }
123 
124     @Override
125     public void flush(ChannelHandlerContext ctx) throws Exception {
126         encoder.flush(ctx);
127     }
128 
129     @Override
130     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
131         encoder.handlerAdded(ctx);
132     }
133 
134     @Override
135     public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
136         encoder.handlerRemoved(ctx);
137     }
138 
139     @Override
140     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
141         encoder.exceptionCaught(ctx, cause);
142     }
143 
144     @Override
145     public boolean isSharable() {
146         return encoder.isSharable();
147     }
148 }