1 /*
2 * Copyright 2012 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package org.jboss.netty.handler.codec.frame;
17
18 import org.jboss.netty.buffer.ChannelBuffer;
19 import org.jboss.netty.buffer.ChannelBufferFactory;
20 import org.jboss.netty.buffer.ChannelBuffers;
21 import org.jboss.netty.buffer.CompositeChannelBuffer;
22 import org.jboss.netty.channel.Channel;
23 import org.jboss.netty.channel.ChannelHandler;
24 import org.jboss.netty.channel.ChannelHandlerContext;
25 import org.jboss.netty.channel.ChannelPipeline;
26 import org.jboss.netty.channel.ChannelStateEvent;
27 import org.jboss.netty.channel.ChannelUpstreamHandler;
28 import org.jboss.netty.channel.Channels;
29 import org.jboss.netty.channel.ExceptionEvent;
30 import org.jboss.netty.channel.LifeCycleAwareChannelHandler;
31 import org.jboss.netty.channel.MessageEvent;
32 import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
33 import org.jboss.netty.handler.codec.replay.ReplayingDecoder;
34
35 import java.net.SocketAddress;
36
37 /**
38 * Decodes the received {@link ChannelBuffer}s into a meaningful frame object.
39 * <p>
40 * In a stream-based transport such as TCP/IP, packets can be fragmented and
41 * reassembled during transmission even in a LAN environment. For example,
42 * let us assume you have received three packets:
43 * <pre>
44 * +-----+-----+-----+
45 * | ABC | DEF | GHI |
46 * +-----+-----+-----+
47 * </pre>
48 * because of the packet fragmentation, a server can receive them like the
49 * following:
50 * <pre>
51 * +----+-------+---+---+
52 * | AB | CDEFG | H | I |
53 * +----+-------+---+---+
54 * </pre>
55 * <p>
56 * {@link FrameDecoder} helps you defrag the received packets into one or more
57 * meaningful <strong>frames</strong> that could be easily understood by the
58 * application logic. In case of the example above, your {@link FrameDecoder}
59 * implementation could defrag the received packets like the following:
60 * <pre>
61 * +-----+-----+-----+
62 * | ABC | DEF | GHI |
63 * +-----+-----+-----+
64 * </pre>
65 * <p>
66 * The following code shows an example handler which decodes a frame whose
67 * first 4 bytes header represents the length of the frame, excluding the
68 * header.
69 * <pre>
70 * MESSAGE FORMAT
71 * ==============
72 *
73 * Offset: 0 4 (Length + 4)
74 * +--------+------------------------+
75 * Fields: | Length | Actual message content |
76 * +--------+------------------------+
77 *
78 * DECODER IMPLEMENTATION
79 * ======================
80 *
81 * public class IntegerHeaderFrameDecoder extends {@link FrameDecoder} {
82 *
83 * {@code @Override}
84 * protected Object decode({@link ChannelHandlerContext} ctx,
85 * {@link Channel channel},
86 * {@link ChannelBuffer} buf) throws Exception {
87 *
88 * // Make sure if the length field was received.
89 * if (buf.readableBytes() < 4) {
90 * // The length field was not received yet - return null.
91 * // This method will be invoked again when more packets are
92 * // received and appended to the buffer.
93 * return <strong>null</strong>;
94 * }
95 *
96 * // The length field is in the buffer.
97 *
98 * // Mark the current buffer position before reading the length field
99 * // because the whole frame might not be in the buffer yet.
100 * // We will reset the buffer position to the marked position if
101 * // there's not enough bytes in the buffer.
102 * buf.markReaderIndex();
103 *
104 * // Read the length field.
105 * int length = buf.readInt();
106 *
107 * // Make sure if there's enough bytes in the buffer.
108 * if (buf.readableBytes() < length) {
109 * // The whole bytes were not received yet - return null.
110 * // This method will be invoked again when more packets are
111 * // received and appended to the buffer.
112 *
113 * // Reset to the marked position to read the length field again
114 * // next time.
115 * buf.resetReaderIndex();
116 *
117 * return <strong>null</strong>;
118 * }
119 *
120 * // There's enough bytes in the buffer. Read it.
121 * {@link ChannelBuffer} frame = buf.readBytes(length);
122 *
123 * // Successfully decoded a frame. Return the decoded frame.
124 * return <strong>frame</strong>;
125 * }
126 * }
127 * </pre>
128 *
129 * <h3>Returning a POJO rather than a {@link ChannelBuffer}</h3>
130 * <p>
131 * Please note that you can return an object of a different type than
132 * {@link ChannelBuffer} in your {@code decode()} and {@code decodeLast()}
133 * implementation. For example, you could return a
134 * <a href="http://en.wikipedia.org/wiki/POJO">POJO</a> so that the next
135 * {@link ChannelUpstreamHandler} receives a {@link MessageEvent} which
136 * contains a POJO rather than a {@link ChannelBuffer}.
137 *
138 * <h3>Replacing a decoder with another decoder in a pipeline</h3>
139 * <p>
140 * If you are going to write a protocol multiplexer, you will probably want to
141 * replace a {@link FrameDecoder} (protocol detector) with another
142 * {@link FrameDecoder} or {@link ReplayingDecoder} (actual protocol decoder).
143 * It is not possible to achieve this simply by calling
144 * {@link ChannelPipeline#replace(ChannelHandler, String, ChannelHandler)}, but
145 * some additional steps are required:
146 * <pre>
147 * public class FirstDecoder extends {@link FrameDecoder} {
148 *
149 * public FirstDecoder() {
150 * super(true); // Enable unfold
151 * }
152 *
153 * {@code @Override}
154 * protected Object decode({@link ChannelHandlerContext} ctx,
155 * {@link Channel} channel,
156 * {@link ChannelBuffer} buf) {
157 * ...
158 * // Decode the first message
159 * Object firstMessage = ...;
160 *
161 * // Add the second decoder
162 * ctx.getPipeline().addLast("second", new SecondDecoder());
163 *
164 * // Remove the first decoder (me)
165 * ctx.getPipeline().remove(this);
166 *
167 * if (buf.readable()) {
168 * // Hand off the remaining data to the second decoder
169 * return new Object[] { firstMessage, buf.readBytes(buf.readableBytes()) };
170 * } else {
171 * // Nothing to hand off
172 * return firstMessage;
173 * }
174 * }
175 * }
176 * </pre>
177 *
178 * @apiviz.landmark
179 */
180 public abstract class FrameDecoder extends SimpleChannelUpstreamHandler implements LifeCycleAwareChannelHandler {
181
182 public static final int DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS = 1024;
183
184 private boolean unfold;
185 protected ChannelBuffer cumulation;
186 private volatile ChannelHandlerContext ctx;
187 private int copyThreshold;
188 private int maxCumulationBufferComponents = DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS;
189
190 protected FrameDecoder() {
191 this(false);
192 }
193
194 protected FrameDecoder(boolean unfold) {
195 this.unfold = unfold;
196 }
197
198 public final boolean isUnfold() {
199 return unfold;
200 }
201
202 public final void setUnfold(boolean unfold) {
203 if (ctx == null) {
204 this.unfold = unfold;
205 } else {
206 throw new IllegalStateException(
207 "decoder properties cannot be changed once the decoder is added to a pipeline.");
208 }
209 }
210
211 /**
212 * See {@link #setMaxCumulationBufferCapacity(int)} for explaintation of this setting
213 *
214 */
215 public final int getMaxCumulationBufferCapacity() {
216 return copyThreshold;
217 }
218
219 /**
220 * Set the maximal capacity of the internal cumulation ChannelBuffer to use
221 * before the {@link FrameDecoder} tries to minimize the memory usage by
222 * "byte copy".
223 *
224 *
225 * What you use here really depends on your application and need. Using
226 * {@link Integer#MAX_VALUE} will disable all byte copies but give you the
227 * cost of a higher memory usage if big {@link ChannelBuffer}'s will be
228 * received.
229 *
230 * By default a threshold of <code>0</code> is used, which means it will
231 * always copy to try to reduce memory usage
232 *
233 *
234 * @param copyThreshold
235 * the threshold (in bytes) or {@link Integer#MAX_VALUE} to
236 * disable it. The value must be at least 0
237 * @throws IllegalStateException
238 * get thrown if someone tries to change this setting after the
239 * Decoder was added to the {@link ChannelPipeline}
240 */
241 public final void setMaxCumulationBufferCapacity(int copyThreshold) {
242 if (copyThreshold < 0) {
243 throw new IllegalArgumentException("maxCumulationBufferCapacity must be >= 0");
244 }
245 if (ctx == null) {
246 this.copyThreshold = copyThreshold;
247 } else {
248 throw new IllegalStateException(
249 "decoder properties cannot be changed once the decoder is added to a pipeline.");
250 }
251 }
252
253 /**
254 * Returns the maximum number of components in the cumulation buffer. If the number of
255 * the components in the cumulation buffer exceeds this value, the components of the
256 * cumulation buffer are consolidated into a single component, involving memory copies.
257 * The default value of this property {@link #DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS}.
258 */
259 public final int getMaxCumulationBufferComponents() {
260 return maxCumulationBufferComponents;
261 }
262
263 /**
264 * Sets the maximum number of components in the cumulation buffer. If the number of
265 * the components in the cumulation buffer exceeds this value, the components of the
266 * cumulation buffer are consolidated into a single component, involving memory copies.
267 * The default value of this property is {@link #DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS}
268 * and its minimum allowed value is {@code 2}.
269 */
270 public final void setMaxCumulationBufferComponents(int maxCumulationBufferComponents) {
271 if (maxCumulationBufferComponents < 2) {
272 throw new IllegalArgumentException(
273 "maxCumulationBufferComponents: " + maxCumulationBufferComponents +
274 " (expected: >= 2)");
275 }
276
277 if (ctx == null) {
278 this.maxCumulationBufferComponents = maxCumulationBufferComponents;
279 } else {
280 throw new IllegalStateException(
281 "decoder properties cannot be changed once the decoder is added to a pipeline.");
282 }
283 }
284
285 @Override
286 public void messageReceived(
287 ChannelHandlerContext ctx, MessageEvent e) throws Exception {
288
289 Object m = e.getMessage();
290 if (!(m instanceof ChannelBuffer)) {
291 ctx.sendUpstream(e);
292 return;
293 }
294
295 ChannelBuffer input = (ChannelBuffer) m;
296 if (!input.readable()) {
297 return;
298 }
299
300 if (cumulation == null) {
301 try {
302 // the cumulation buffer is not created yet so just pass the input to callDecode(...) method
303 callDecode(ctx, e.getChannel(), input, e.getRemoteAddress());
304 } finally {
305 updateCumulation(ctx, input);
306 }
307
308 } else {
309 input = appendToCumulation(input);
310 try {
311 callDecode(ctx, e.getChannel(), input, e.getRemoteAddress());
312 } finally {
313 updateCumulation(ctx, input);
314 }
315 }
316 }
317
318 protected ChannelBuffer appendToCumulation(ChannelBuffer input) {
319 ChannelBuffer cumulation = this.cumulation;
320 assert cumulation.readable();
321 if (cumulation instanceof CompositeChannelBuffer) {
322 // Make sure the resulting cumulation buffer has no more than the configured components.
323 CompositeChannelBuffer composite = (CompositeChannelBuffer) cumulation;
324 if (composite.numComponents() >= maxCumulationBufferComponents) {
325 cumulation = composite.copy();
326 }
327 }
328
329 this.cumulation = input = ChannelBuffers.wrappedBuffer(cumulation, input);
330 return input;
331 }
332
333 protected ChannelBuffer updateCumulation(ChannelHandlerContext ctx, ChannelBuffer input) {
334 ChannelBuffer newCumulation;
335 int readableBytes = input.readableBytes();
336 if (readableBytes > 0) {
337 int inputCapacity = input.capacity();
338
339 // If input.readableBytes() == input.capacity() (i.e. input is full),
340 // there's nothing to save from creating a new cumulation buffer
341 // even if input.capacity() exceeds the threshold, because the new cumulation
342 // buffer will have the same capacity and content with input.
343 if (readableBytes < inputCapacity && inputCapacity > copyThreshold) {
344 // At least one byte was consumed by callDecode() and input.capacity()
345 // exceeded the threshold.
346 cumulation = newCumulation = newCumulationBuffer(ctx, input.readableBytes());
347 cumulation.writeBytes(input);
348 } else {
349 // Nothing was consumed by callDecode() or input.capacity() did not
350 // exceed the threshold.
351 if (input.readerIndex() != 0) {
352 cumulation = newCumulation = input.slice();
353 } else {
354 cumulation = newCumulation = input;
355 }
356 }
357 } else {
358 cumulation = newCumulation = null;
359 }
360 return newCumulation;
361 }
362
363 @Override
364 public void channelDisconnected(
365 ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
366 cleanup(ctx, e);
367 }
368
369 @Override
370 public void channelClosed(
371 ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
372 cleanup(ctx, e);
373 }
374
375 @Override
376 public void exceptionCaught(
377 ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
378 ctx.sendUpstream(e);
379 }
380
381 /**
382 * Decodes the received packets so far into a frame.
383 *
384 * @param ctx the context of this handler
385 * @param channel the current channel
386 * @param buffer the cumulative buffer of received packets so far.
387 * Note that the buffer might be empty, which means you
388 * should not make an assumption that the buffer contains
389 * at least one byte in your decoder implementation.
390 *
391 * @return the decoded frame if a full frame was received and decoded.
392 * {@code null} if there's not enough data in the buffer to decode a frame.
393 */
394 protected abstract Object decode(
395 ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception;
396
397 /**
398 * Decodes the received data so far into a frame when the channel is
399 * disconnected.
400 *
401 * @param ctx the context of this handler
402 * @param channel the current channel
403 * @param buffer the cumulative buffer of received packets so far.
404 * Note that the buffer might be empty, which means you
405 * should not make an assumption that the buffer contains
406 * at least one byte in your decoder implementation.
407 *
408 * @return the decoded frame if a full frame was received and decoded.
409 * {@code null} if there's not enough data in the buffer to decode a frame.
410 */
411 protected Object decodeLast(
412 ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
413 return decode(ctx, channel, buffer);
414 }
415
416 private void callDecode(
417 ChannelHandlerContext context, Channel channel,
418 ChannelBuffer cumulation, SocketAddress remoteAddress) throws Exception {
419
420 while (cumulation.readable()) {
421 int oldReaderIndex = cumulation.readerIndex();
422 Object frame = decode(context, channel, cumulation);
423 if (frame == null) {
424 if (oldReaderIndex == cumulation.readerIndex()) {
425 // Seems like more data is required.
426 // Let us wait for the next notification.
427 break;
428 } else {
429 // Previous data has been discarded.
430 // Probably it is reading on.
431 continue;
432 }
433 } else if (oldReaderIndex == cumulation.readerIndex()) {
434 throw new IllegalStateException(
435 "decode() method must read at least one byte " +
436 "if it returned a frame (caused by: " + getClass() + ')');
437 }
438
439 unfoldAndFireMessageReceived(context, remoteAddress, frame);
440 }
441 }
442
443 protected final void unfoldAndFireMessageReceived(
444 ChannelHandlerContext context, SocketAddress remoteAddress, Object result) {
445 if (unfold) {
446 if (result instanceof Object[]) {
447 for (Object r: (Object[]) result) {
448 Channels.fireMessageReceived(context, r, remoteAddress);
449 }
450 } else if (result instanceof Iterable<?>) {
451 for (Object r: (Iterable<?>) result) {
452 Channels.fireMessageReceived(context, r, remoteAddress);
453 }
454 } else {
455 Channels.fireMessageReceived(context, result, remoteAddress);
456 }
457 } else {
458 Channels.fireMessageReceived(context, result, remoteAddress);
459 }
460 }
461
462 /**
463 * Gets called on {@link #channelDisconnected(ChannelHandlerContext, ChannelStateEvent)} and
464 * {@link #channelClosed(ChannelHandlerContext, ChannelStateEvent)}
465 */
466 protected void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e)
467 throws Exception {
468 try {
469 ChannelBuffer cumulation = this.cumulation;
470 if (cumulation == null) {
471 return;
472 }
473
474 this.cumulation = null;
475
476 if (cumulation.readable()) {
477 // Make sure all frames are read before notifying a closed channel.
478 callDecode(ctx, ctx.getChannel(), cumulation, null);
479 }
480
481 // Call decodeLast() finally. Please note that decodeLast() is
482 // called even if there's nothing more to read from the buffer to
483 // notify a user that the connection was closed explicitly.
484 Object partialFrame = decodeLast(ctx, ctx.getChannel(), cumulation);
485 if (partialFrame != null) {
486 unfoldAndFireMessageReceived(ctx, null, partialFrame);
487 }
488 } finally {
489 ctx.sendUpstream(e);
490 }
491 }
492
493 /**
494 * Create a new {@link ChannelBuffer} which is used for the cumulation.
495 * Sub-classes may override this.
496 *
497 * @param ctx {@link ChannelHandlerContext} for this handler
498 * @return buffer the {@link ChannelBuffer} which is used for cumulation
499 */
500 protected ChannelBuffer newCumulationBuffer(
501 ChannelHandlerContext ctx, int minimumCapacity) {
502 ChannelBufferFactory factory = ctx.getChannel().getConfig().getBufferFactory();
503 return factory.getBuffer(Math.max(minimumCapacity, 256));
504 }
505
506 /**
507 * Replace this {@link FrameDecoder} in the {@link ChannelPipeline} with the given {@link ChannelHandler}. All
508 * remaining bytes in the {@link ChannelBuffer} will get send to the new {@link ChannelHandler} that was used
509 * as replacement
510 *
511 */
512 public void replace(String handlerName, ChannelHandler handler) {
513 if (ctx == null) {
514 throw new IllegalStateException(
515 "Replace cann only be called once the FrameDecoder is added to the ChannelPipeline");
516 }
517 ChannelPipeline pipeline = ctx.getPipeline();
518 pipeline.addAfter(ctx.getName(), handlerName, handler);
519
520 try {
521 if (cumulation != null) {
522 Channels.fireMessageReceived(ctx, cumulation.readBytes(actualReadableBytes()));
523 }
524 } finally {
525 pipeline.remove(this);
526 }
527
528 }
529
530 /**
531 * Returns the actual number of readable bytes in the internal cumulative
532 * buffer of this decoder. You usually do not need to rely on this value
533 * to write a decoder. Use it only when you muse use it at your own risk.
534 * This method is a shortcut to {@link #internalBuffer() internalBuffer().readableBytes()}.
535 */
536 protected int actualReadableBytes() {
537 return internalBuffer().readableBytes();
538 }
539
540
541
542 /**
543 * Returns the internal cumulative buffer of this decoder. You usually
544 * do not need to access the internal buffer directly to write a decoder.
545 * Use it only when you must use it at your own risk.
546 */
547 protected ChannelBuffer internalBuffer() {
548 ChannelBuffer buf = cumulation;
549 if (buf == null) {
550 return ChannelBuffers.EMPTY_BUFFER;
551 }
552 return buf;
553 }
554
555 public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
556 this.ctx = ctx;
557 }
558
559 public void afterAdd(ChannelHandlerContext ctx) throws Exception {
560 // Nothing to do..
561 }
562
563 public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
564 // Nothing to do..
565 }
566
567 public void afterRemove(ChannelHandlerContext ctx) throws Exception {
568 // Nothing to do..
569 }
570
571 }