View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.stomp;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufUtil;
20  import io.netty.channel.ChannelHandlerContext;
21  import io.netty.handler.codec.MessageToMessageEncoder;
22  import io.netty.util.concurrent.FastThreadLocal;
23  import io.netty.util.internal.AppendableCharSequence;
24  
25  import java.util.LinkedHashMap;
26  import java.util.List;
27  import java.util.Map.Entry;
28  
29  import static io.netty.handler.codec.stomp.StompConstants.NUL;
30  import static io.netty.handler.codec.stomp.StompHeaders.ACCEPT_VERSION;
31  import static io.netty.handler.codec.stomp.StompHeaders.ACK;
32  import static io.netty.handler.codec.stomp.StompHeaders.CONTENT_LENGTH;
33  import static io.netty.handler.codec.stomp.StompHeaders.CONTENT_TYPE;
34  import static io.netty.handler.codec.stomp.StompHeaders.DESTINATION;
35  import static io.netty.handler.codec.stomp.StompHeaders.HEART_BEAT;
36  import static io.netty.handler.codec.stomp.StompHeaders.HOST;
37  import static io.netty.handler.codec.stomp.StompHeaders.ID;
38  import static io.netty.handler.codec.stomp.StompHeaders.LOGIN;
39  import static io.netty.handler.codec.stomp.StompHeaders.MESSAGE;
40  import static io.netty.handler.codec.stomp.StompHeaders.MESSAGE_ID;
41  import static io.netty.handler.codec.stomp.StompHeaders.PASSCODE;
42  import static io.netty.handler.codec.stomp.StompHeaders.RECEIPT;
43  import static io.netty.handler.codec.stomp.StompHeaders.RECEIPT_ID;
44  import static io.netty.handler.codec.stomp.StompHeaders.SERVER;
45  import static io.netty.handler.codec.stomp.StompHeaders.SESSION;
46  import static io.netty.handler.codec.stomp.StompHeaders.SUBSCRIPTION;
47  import static io.netty.handler.codec.stomp.StompHeaders.TRANSACTION;
48  import static io.netty.handler.codec.stomp.StompHeaders.VERSION;
49  
50  /**
51   * Encodes a {@link StompFrame} or a {@link StompSubframe} into a {@link ByteBuf}.
52   */
53  public class StompSubframeEncoder extends MessageToMessageEncoder<StompSubframe> {
54  
55      private static final int ESCAPE_HEADER_KEY_CACHE_LIMIT = 32;
56      private static final float DEFAULT_LOAD_FACTOR = 0.75f;
57      private static final FastThreadLocal<LinkedHashMap<CharSequence, CharSequence>> ESCAPE_HEADER_KEY_CACHE =
58              new FastThreadLocal<LinkedHashMap<CharSequence, CharSequence>>() {
59                  @Override
60                  protected LinkedHashMap<CharSequence, CharSequence> initialValue() throws Exception {
61                      LinkedHashMap<CharSequence, CharSequence> cache = new LinkedHashMap<CharSequence, CharSequence>(
62                              ESCAPE_HEADER_KEY_CACHE_LIMIT, DEFAULT_LOAD_FACTOR, true) {
63  
64                          @Override
65                          protected boolean removeEldestEntry(Entry eldest) {
66                              return size() > ESCAPE_HEADER_KEY_CACHE_LIMIT;
67                          }
68                      };
69  
70                      cache.put(ACCEPT_VERSION, ACCEPT_VERSION);
71                      cache.put(HOST, HOST);
72                      cache.put(LOGIN, LOGIN);
73                      cache.put(PASSCODE, PASSCODE);
74                      cache.put(HEART_BEAT, HEART_BEAT);
75                      cache.put(VERSION, VERSION);
76                      cache.put(SESSION, SESSION);
77                      cache.put(SERVER, SERVER);
78                      cache.put(DESTINATION, DESTINATION);
79                      cache.put(ID, ID);
80                      cache.put(ACK, ACK);
81                      cache.put(TRANSACTION, TRANSACTION);
82                      cache.put(RECEIPT, RECEIPT);
83                      cache.put(MESSAGE_ID, MESSAGE_ID);
84                      cache.put(SUBSCRIPTION, SUBSCRIPTION);
85                      cache.put(RECEIPT_ID, RECEIPT_ID);
86                      cache.put(MESSAGE, MESSAGE);
87                      cache.put(CONTENT_LENGTH, CONTENT_LENGTH);
88                      cache.put(CONTENT_TYPE, CONTENT_TYPE);
89  
90                      return cache;
91                  }
92              };
93  
94      @Override
95      protected void encode(ChannelHandlerContext ctx, StompSubframe msg, List<Object> out) throws Exception {
96          if (msg instanceof StompFrame) {
97              StompFrame stompFrame = (StompFrame) msg;
98              ByteBuf buf = encodeFullFrame(stompFrame, ctx);
99  
100             out.add(convertFullFrame(stompFrame, buf));
101         } else if (msg instanceof StompHeadersSubframe) {
102             StompHeadersSubframe stompHeadersSubframe = (StompHeadersSubframe) msg;
103             ByteBuf buf = ctx.alloc().buffer(headersSubFrameSize(stompHeadersSubframe));
104             encodeHeaders(stompHeadersSubframe, buf);
105 
106             out.add(convertHeadersSubFrame(stompHeadersSubframe, buf));
107         } else if (msg instanceof StompContentSubframe) {
108             StompContentSubframe stompContentSubframe = (StompContentSubframe) msg;
109             ByteBuf buf = encodeContent(stompContentSubframe, ctx);
110 
111             out.add(convertContentSubFrame(stompContentSubframe, buf));
112         }
113     }
114 
115     /**
116      * An extension method to convert a STOMP encoded buffer to a different message type
117      * based on an original {@link StompFrame} full frame.
118      *
119      * <p>By default an encoded buffer is returned as is.
120      */
121     protected Object convertFullFrame(StompFrame original, ByteBuf encoded) {
122         return encoded;
123     }
124 
125     /**
126      * An extension method to convert a STOMP encoded buffer to a different message type
127      * based on an original {@link StompHeadersSubframe} headers sub frame.
128      *
129      * <p>By default an encoded buffer is returned as is.
130      */
131     protected Object convertHeadersSubFrame(StompHeadersSubframe original, ByteBuf encoded) {
132         return encoded;
133     }
134 
135     /**
136      * An extension method to convert a STOMP encoded buffer to a different message type
137      * based on an original {@link StompHeadersSubframe} content sub frame.
138      *
139      * <p>By default an encoded buffer is returned as is.
140      */
141     protected Object convertContentSubFrame(StompContentSubframe original, ByteBuf encoded) {
142         return encoded;
143     }
144 
145     /**
146      * Returns a heuristic size for headers (32 bytes per header line) + (2 bytes for colon and eol) + (additional
147      * command buffer).
148      */
149     protected int headersSubFrameSize(StompHeadersSubframe headersSubframe) {
150         int estimatedSize = headersSubframe.headers().size() * 34 + 48;
151         if (estimatedSize < 128) {
152             return 128;
153         }
154 
155         return Math.max(estimatedSize, 256);
156     }
157 
158     private ByteBuf encodeFullFrame(StompFrame frame, ChannelHandlerContext ctx) {
159         int contentReadableBytes = frame.content().readableBytes();
160         ByteBuf buf = ctx.alloc().buffer(headersSubFrameSize(frame) + contentReadableBytes);
161         encodeHeaders(frame, buf);
162 
163         if (contentReadableBytes > 0) {
164             buf.writeBytes(frame.content());
165         }
166 
167         return buf.writeByte(NUL);
168     }
169 
170     private static void encodeHeaders(StompHeadersSubframe frame, ByteBuf buf) {
171         StompCommand command = frame.command();
172         ByteBufUtil.writeUtf8(buf, command.toString());
173         buf.writeByte(StompConstants.LF);
174 
175         boolean shouldEscape = shouldEscape(command);
176         LinkedHashMap<CharSequence, CharSequence> cache = ESCAPE_HEADER_KEY_CACHE.get();
177         for (Entry<CharSequence, CharSequence> entry : frame.headers()) {
178             CharSequence headerKey = entry.getKey();
179             if (shouldEscape) {
180                 CharSequence cachedHeaderKey = cache.get(headerKey);
181                 if (cachedHeaderKey == null) {
182                     cachedHeaderKey = escape(headerKey);
183                     cache.put(headerKey, cachedHeaderKey);
184                 }
185                 headerKey = cachedHeaderKey;
186             }
187 
188             ByteBufUtil.writeUtf8(buf, headerKey);
189             buf.writeByte(StompConstants.COLON);
190 
191             CharSequence headerValue = shouldEscape? escape(entry.getValue()) : entry.getValue();
192             ByteBufUtil.writeUtf8(buf, headerValue);
193             buf.writeByte(StompConstants.LF);
194         }
195 
196         buf.writeByte(StompConstants.LF);
197     }
198 
199     private static ByteBuf encodeContent(StompContentSubframe content, ChannelHandlerContext ctx) {
200         if (content instanceof LastStompContentSubframe) {
201             ByteBuf buf = ctx.alloc().buffer(content.content().readableBytes() + 1);
202             buf.writeBytes(content.content());
203             buf.writeByte(StompConstants.NUL);
204             return buf;
205         }
206 
207         return content.content().retain();
208     }
209 
210     private static boolean shouldEscape(StompCommand command) {
211         return command != StompCommand.CONNECT && command != StompCommand.CONNECTED;
212     }
213 
214     private static CharSequence escape(CharSequence input) {
215         AppendableCharSequence builder = null;
216         for (int i = 0; i < input.length(); i++) {
217             char chr = input.charAt(i);
218             if (chr == '\\') {
219                 builder = escapeBuilder(builder, input, i);
220                 builder.append("\\\\");
221             } else if (chr == ':') {
222                 builder = escapeBuilder(builder, input, i);
223                 builder.append("\\c");
224             } else if (chr == '\n') {
225                 builder = escapeBuilder(builder, input, i);
226                 builder.append("\\n");
227             } else if (chr == '\r') {
228                 builder = escapeBuilder(builder, input, i);
229                 builder.append("\\r");
230             } else if (builder != null) {
231                 builder.append(chr);
232             }
233         }
234 
235         return builder != null? builder : input;
236     }
237 
238     private static AppendableCharSequence escapeBuilder(AppendableCharSequence builder, CharSequence input,
239                                                         int offset) {
240         if (builder != null) {
241             return builder;
242         }
243 
244         // Add extra overhead to the input char sequence to avoid resizing during escaping.
245         return new AppendableCharSequence(input.length() + 8).append(input, 0, offset);
246     }
247 }