View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.handler.codec.string;
17  
18  import io.netty5.buffer.api.Buffer;
19  import io.netty5.channel.ChannelHandlerContext;
20  import io.netty5.channel.ChannelPipeline;
21  import io.netty5.handler.codec.LineBasedFrameDecoder;
22  import io.netty5.handler.codec.MessageToMessageEncoder;
23  import io.netty5.util.CharsetUtil;
24  import io.netty5.util.internal.SilentDispose;
25  import io.netty5.util.internal.logging.InternalLogger;
26  import io.netty5.util.internal.logging.InternalLoggerFactory;
27  
28  import java.nio.ByteBuffer;
29  import java.nio.CharBuffer;
30  import java.nio.charset.Charset;
31  import java.nio.charset.CharsetEncoder;
32  import java.nio.charset.CoderResult;
33  import java.util.List;
34  
35  import static java.util.Objects.requireNonNull;
36  
37  /**
38   * Apply a line separator to the requested {@link String} and encode it into a {@link Buffer}.
39   * A typical setup for a text-based line protocol in a TCP/IP socket would be:
40   * <pre>
41   * {@link ChannelPipeline} pipeline = ...;
42   *
43   * // Decoders
44   * pipeline.addLast("frameDecoder", new {@link LineBasedFrameDecoder}(80));
45   * pipeline.addLast("stringDecoder", new {@link StringDecoder}(CharsetUtil.UTF_8));
46   *
47   * // Encoder
48   * pipeline.addLast("lineEncoder", new {@link LineEncoder}(LineSeparator.UNIX, CharsetUtil.UTF_8));
49   * </pre>
50   * and then you can use a {@link String} instead of a {@link Buffer}
51   * as a message:
52   * <pre>
53   * void channelRead({@link ChannelHandlerContext} ctx, {@link String} msg) {
54   *     ch.write("Did you say '" + msg + "'?");
55   * }
56   * </pre>
57   */
58  public class LineEncoder extends MessageToMessageEncoder<CharSequence> {
59      private static final InternalLogger logger = InternalLoggerFactory.getInstance(LineEncoder.class);
60  
61      private final Charset charset;
62      private final byte[] lineSeparator;
63  
64      /**
65       * Creates a new instance with the current system line separator and UTF-8 charset encoding.
66       */
67      public LineEncoder() {
68          this(LineSeparator.DEFAULT, CharsetUtil.UTF_8);
69      }
70  
71      /**
72       * Creates a new instance with the specified line separator and UTF-8 charset encoding.
73       */
74      public LineEncoder(LineSeparator lineSeparator) {
75          this(lineSeparator, CharsetUtil.UTF_8);
76      }
77  
78      /**
79       * Creates a new instance with the specified character set.
80       */
81      public LineEncoder(Charset charset) {
82          this(LineSeparator.DEFAULT, charset);
83      }
84  
85      /**
86       * Creates a new instance with the specified line separator and character set.
87       */
88      public LineEncoder(LineSeparator lineSeparator, Charset charset) {
89          this.charset = requireNonNull(charset, "charset");
90          this.lineSeparator = requireNonNull(lineSeparator, "lineSeparator").value().getBytes(charset);
91      }
92  
93      @Override
94      public boolean isSharable() {
95          return true;
96      }
97  
98      @Override
99      protected void encode(ChannelHandlerContext ctx, CharSequence msg, List<Object> out) throws Exception {
100         CharsetEncoder encoder = CharsetUtil.encoder(charset);
101         int size = (int) (msg.length() * encoder.maxBytesPerChar() + lineSeparator.length);
102         Buffer buf = ctx.bufferAllocator().allocate(size);
103         assert buf.countWritableComponents() == 1;
104         boolean release = true;
105         CharBuffer chars = CharBuffer.wrap(msg);
106         try (var iterator = buf.forEachWritable()) {
107             var component = requireNonNull(iterator.first(), "writable component");
108             ByteBuffer byteBuffer = component.writableBuffer();
109             int start = byteBuffer.position();
110             CoderResult result = encoder.encode(chars, byteBuffer, true);
111             if (!result.isUnderflow()) {
112                 result.throwException();
113             }
114             encoder.flush(byteBuffer);
115             if (!result.isUnderflow()) {
116                 result.throwException();
117             }
118             component.skipWritableBytes(byteBuffer.position() - start);
119             release = false;
120         } finally {
121             if (release) {
122                 SilentDispose.dispose(buf, logger);
123             }
124         }
125         buf.writeBytes(lineSeparator);
126         out.add(buf);
127     }
128 }