/*
 * Copyright 2014 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
16  package io.netty.handler.codec.stomp;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufUtil;
20  import io.netty.channel.ChannelHandlerContext;
21  import io.netty.handler.codec.MessageToMessageEncoder;
22  
23  import java.util.List;
24  import java.util.Map.Entry;
25  
26  import static io.netty.handler.codec.stomp.StompConstants.*;
27  
28  /**
29   * Encodes a {@link StompFrame} or a {@link StompSubframe} into a {@link ByteBuf}.
30   */
31  public class StompSubframeEncoder extends MessageToMessageEncoder<StompSubframe> {
32  
33      @Override
34      protected void encode(ChannelHandlerContext ctx, StompSubframe msg, List<Object> out) throws Exception {
35          if (msg instanceof StompFrame) {
36              StompFrame stompFrame = (StompFrame) msg;
37              ByteBuf buf = encodeFullFrame(stompFrame, ctx);
38  
39              out.add(convertFullFrame(stompFrame, buf));
40          } else if (msg instanceof StompHeadersSubframe) {
41              StompHeadersSubframe stompHeadersSubframe = (StompHeadersSubframe) msg;
42              ByteBuf buf = ctx.alloc().buffer(headersSubFrameSize(stompHeadersSubframe));
43              encodeHeaders(stompHeadersSubframe, buf);
44  
45              out.add(convertHeadersSubFrame(stompHeadersSubframe, buf));
46          } else if (msg instanceof StompContentSubframe) {
47              StompContentSubframe stompContentSubframe = (StompContentSubframe) msg;
48              ByteBuf buf = encodeContent(stompContentSubframe, ctx);
49  
50              out.add(convertContentSubFrame(stompContentSubframe, buf));
51          }
52      }
53  
54      /**
55       * An extension method to convert a STOMP encoded buffer to a different message type
56       * based on an original {@link StompFrame} full frame.
57       *
58       * <p>By default an encoded buffer is returned as is.
59       */
60      protected Object convertFullFrame(StompFrame original, ByteBuf encoded) {
61          return encoded;
62      }
63  
64      /**
65       * An extension method to convert a STOMP encoded buffer to a different message type
66       * based on an original {@link StompHeadersSubframe} headers sub frame.
67       *
68       * <p>By default an encoded buffer is returned as is.
69       */
70      protected Object convertHeadersSubFrame(StompHeadersSubframe original, ByteBuf encoded) {
71          return encoded;
72      }
73  
74      /**
75       * An extension method to convert a STOMP encoded buffer to a different message type
76       * based on an original {@link StompHeadersSubframe} content sub frame.
77       *
78       * <p>By default an encoded buffer is returned as is.
79       */
80      protected Object convertContentSubFrame(StompContentSubframe original, ByteBuf encoded) {
81          return encoded;
82      }
83  
84      /**
85       * Returns a heuristic size for headers (32 bytes per header line) + (2 bytes for colon and eol) + (additional
86       * command buffer).
87       */
88      protected int headersSubFrameSize(StompHeadersSubframe headersSubframe) {
89          int estimatedSize = headersSubframe.headers().size() * 34 + 48;
90          if (estimatedSize < 128) {
91              return 128;
92          } else if (estimatedSize < 256) {
93              return 256;
94          }
95  
96          return estimatedSize;
97      }
98  
99      private ByteBuf encodeFullFrame(StompFrame frame, ChannelHandlerContext ctx) {
100         int contentReadableBytes = frame.content().readableBytes();
101         ByteBuf buf = ctx.alloc().buffer(headersSubFrameSize(frame) + contentReadableBytes);
102         encodeHeaders(frame, buf);
103 
104         if (contentReadableBytes > 0) {
105             buf.writeBytes(frame.content());
106         }
107 
108         return buf.writeByte(NUL);
109     }
110 
111     private static void encodeHeaders(StompHeadersSubframe frame, ByteBuf buf) {
112         ByteBufUtil.writeUtf8(buf, frame.command().toString());
113         buf.writeByte(StompConstants.LF);
114 
115         for (Entry<CharSequence, CharSequence> entry : frame.headers()) {
116             ByteBufUtil.writeUtf8(buf, entry.getKey());
117             buf.writeByte(StompConstants.COLON);
118             ByteBufUtil.writeUtf8(buf, entry.getValue());
119             buf.writeByte(StompConstants.LF);
120         }
121 
122         buf.writeByte(StompConstants.LF);
123     }
124 
125     private static ByteBuf encodeContent(StompContentSubframe content, ChannelHandlerContext ctx) {
126         if (content instanceof LastStompContentSubframe) {
127             ByteBuf buf = ctx.alloc().buffer(content.content().readableBytes() + 1);
128             buf.writeBytes(content.content());
129             buf.writeByte(StompConstants.NUL);
130             return buf;
131         }
132 
133         return content.content().retain();
134     }
135 }