1 /*
2 * Copyright 2012 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package org.jboss.netty.handler.codec.replay;
17
18 import java.net.SocketAddress;
19
20 import org.jboss.netty.buffer.ChannelBuffer;
21 import org.jboss.netty.channel.Channel;
22 import org.jboss.netty.channel.ChannelHandler;
23 import org.jboss.netty.channel.ChannelHandlerContext;
24 import org.jboss.netty.channel.ChannelPipeline;
25 import org.jboss.netty.channel.ChannelStateEvent;
26 import org.jboss.netty.channel.MessageEvent;
27 import org.jboss.netty.handler.codec.frame.FrameDecoder;
28
29 /**
30 * A specialized variation of {@link FrameDecoder} which enables implementation
31 * of a non-blocking decoder in the blocking I/O paradigm.
32 * <p>
33 * The biggest difference between {@link ReplayingDecoder} and
34 * {@link FrameDecoder} is that {@link ReplayingDecoder} allows you to
35 * implement the {@code decode()} and {@code decodeLast()} methods just like
36 * all required bytes were received already, rather than checking the
37 * availability of the required bytes. For example, the following
38 * {@link FrameDecoder} implementation:
39 * <pre>
40 * public class IntegerHeaderFrameDecoder extends {@link FrameDecoder} {
41 *
42 * {@code @Override}
43 * protected Object decode({@link ChannelHandlerContext} ctx,
44 * {@link Channel} channel,
45 * {@link ChannelBuffer} buf) throws Exception {
46 *
47 * if (buf.readableBytes() < 4) {
48 * return <strong>null</strong>;
49 * }
50 *
51 * buf.markReaderIndex();
52 * int length = buf.readInt();
53 *
54 * if (buf.readableBytes() < length) {
55 * buf.resetReaderIndex();
56 * return <strong>null</strong>;
57 * }
58 *
59 * return buf.readBytes(length);
60 * }
61 * }
62 * </pre>
63 * is simplified like the following with {@link ReplayingDecoder}:
64 * <pre>
65 * public class IntegerHeaderFrameDecoder
66 * extends {@link ReplayingDecoder}<{@link VoidEnum}> {
67 *
68 * protected Object decode({@link ChannelHandlerContext} ctx,
69 * {@link Channel} channel,
70 * {@link ChannelBuffer} buf,
71 * {@link VoidEnum} state) throws Exception {
72 *
73 * return buf.readBytes(buf.readInt());
74 * }
75 * }
76 * </pre>
77 *
78 * <h3>How does this work?</h3>
79 * <p>
80 * {@link ReplayingDecoder} passes a specialized {@link ChannelBuffer}
81 * implementation which throws an {@link Error} of certain type when there's not
82 * enough data in the buffer. In the {@code IntegerHeaderFrameDecoder} above,
83 * you just assumed that there will be 4 or more bytes in the buffer when
84 * you call {@code buf.readInt()}. If there's really 4 bytes in the buffer,
85 * it will return the integer header as you expected. Otherwise, the
86 * {@link Error} will be raised and the control will be returned to
87 * {@link ReplayingDecoder}. If {@link ReplayingDecoder} catches the
88 * {@link Error}, then it will rewind the {@code readerIndex} of the buffer
89 * back to the 'initial' position (i.e. the beginning of the buffer) and call
90 * the {@code decode(..)} method again when more data is received into the
91 * buffer.
92 * <p>
93 * Please note that {@link ReplayingDecoder} always throws the same cached
94 * {@link Error} instance to avoid the overhead of creating a new {@link Error}
95 * and filling its stack trace for every throw.
96 *
97 * <h3>Limitations</h3>
98 * <p>
99 * At the cost of the simplicity, {@link ReplayingDecoder} enforces you a few
100 * limitations:
101 * <ul>
102 * <li>Some buffer operations are prohibited.</li>
103 * <li>Performance can be worse if the network is slow and the message
104 * format is complicated unlike the example above. In this case, your
105 * decoder might have to decode the same part of the message over and over
106 * again.</li>
107 * <li>You must keep in mind that {@code decode(..)} method can be called many
108 * times to decode a single message. For example, the following code will
109 * not work:
110 * <pre> public class MyDecoder extends {@link ReplayingDecoder}<{@link VoidEnum}> {
111 *
112 * private final Queue<Integer> values = new LinkedList<Integer>();
113 *
114 * {@code @Override}
115 * public Object decode(.., {@link ChannelBuffer} buffer, ..) throws Exception {
116 *
117 * // A message contains 2 integers.
118 * values.offer(buffer.readInt());
119 * values.offer(buffer.readInt());
120 *
121 * // This assertion will fail intermittently since values.offer()
122 * // can be called more than two times!
123 * assert values.size() == 2;
124 * return values.poll() + values.poll();
125 * }
126 * }</pre>
127 * The correct implementation looks like the following, and you can also
128 * utilize the 'checkpoint' feature which is explained in detail in the
129 * next section.
130 * <pre> public class MyDecoder extends {@link ReplayingDecoder}<{@link VoidEnum}> {
131 *
132 * private final Queue<Integer> values = new LinkedList<Integer>();
133 *
134 * {@code @Override}
135 * public Object decode(.., {@link ChannelBuffer} buffer, ..) throws Exception {
136 *
137 * // Revert the state of the variable that might have been changed
138 * // since the last partial decode.
139 * values.clear();
140 *
141 * // A message contains 2 integers.
142 * values.offer(buffer.readInt());
143 * values.offer(buffer.readInt());
144 *
145 * // Now we know this assertion will never fail.
146 * assert values.size() == 2;
147 * return values.poll() + values.poll();
148 * }
149 * }</pre>
150 * </li>
151 * </ul>
152 *
153 * <h3>Improving the performance</h3>
154 * <p>
155 * Fortunately, the performance of a complex decoder implementation can be
156 * improved significantly with the {@code checkpoint()} method. The
157 * {@code checkpoint()} method updates the 'initial' position of the buffer so
158 * that {@link ReplayingDecoder} rewinds the {@code readerIndex} of the buffer
159 * to the last position where you called the {@code checkpoint()} method.
160 *
161 * <h4>Calling {@code checkpoint(T)} with an {@link Enum}</h4>
162 * <p>
163 * Although you can just use {@code checkpoint()} method and manage the state
164 * of the decoder by yourself, the easiest way to manage the state of the
165 * decoder is to create an {@link Enum} type which represents the current state
166 * of the decoder and to call {@code checkpoint(T)} method whenever the state
167 * changes. You can have as many states as you want depending on the
168 * complexity of the message you want to decode:
169 *
170 * <pre>
171 * public enum MyDecoderState {
172 * READ_LENGTH,
173 * READ_CONTENT;
174 * }
175 *
176 * public class IntegerHeaderFrameDecoder
177 * extends {@link ReplayingDecoder}<<strong>MyDecoderState</strong>> {
178 *
179 * private int length;
180 *
181 * public IntegerHeaderFrameDecoder() {
182 * // Set the initial state.
183 * <strong>super(MyDecoderState.READ_LENGTH);</strong>
184 * }
185 *
186 * {@code @Override}
187 * protected Object decode({@link ChannelHandlerContext} ctx,
188 * {@link Channel} channel,
189 * {@link ChannelBuffer} buf,
190 * <b>MyDecoderState</b> state) throws Exception {
191 * switch (state) {
192 * case READ_LENGTH:
193 * length = buf.readInt();
194 * <strong>checkpoint(MyDecoderState.READ_CONTENT);</strong>
195 * case READ_CONTENT:
196 * ChannelBuffer frame = buf.readBytes(length);
197 * <strong>checkpoint(MyDecoderState.READ_LENGTH);</strong>
198 * return frame;
199 * default:
200 * throw new Error("Shouldn't reach here.");
201 * }
202 * }
203 * }
204 * </pre>
205 *
206 * <h4>Calling {@code checkpoint()} with no parameter</h4>
207 * <p>
208 * An alternative way to manage the decoder state is to manage it by yourself.
209 * <pre>
210 * public class IntegerHeaderFrameDecoder
211 * extends {@link ReplayingDecoder}<<strong>{@link VoidEnum}</strong>> {
212 *
213 * <strong>private boolean readLength;</strong>
214 * private int length;
215 *
216 * {@code @Override}
217 * protected Object decode({@link ChannelHandlerContext} ctx,
218 * {@link Channel} channel,
219 * {@link ChannelBuffer} buf,
220 * {@link VoidEnum} state) throws Exception {
221 * if (!readLength) {
222 * length = buf.readInt();
223 * <strong>readLength = true;</strong>
224 * <strong>checkpoint();</strong>
225 * }
226 *
227 * if (readLength) {
228 * ChannelBuffer frame = buf.readBytes(length);
229 * <strong>readLength = false;</strong>
230 * <strong>checkpoint();</strong>
231 * return frame;
232 * }
233 * }
234 * }
235 * </pre>
236 *
237 * <h3>Replacing a decoder with another decoder in a pipeline</h3>
238 * <p>
239 * If you are going to write a protocol multiplexer, you will probably want to
240 * replace a {@link ReplayingDecoder} (protocol detector) with another
241 * {@link ReplayingDecoder} or {@link FrameDecoder} (actual protocol decoder).
242 * It is not possible to achieve this simply by calling
243 * {@link ChannelPipeline#replace(ChannelHandler, String, ChannelHandler)}, but
244 * some additional steps are required:
245 * <pre>
246 * public class FirstDecoder extends {@link ReplayingDecoder}<{@link VoidEnum}> {
247 *
248 * public FirstDecoder() {
249 * super(true); // Enable unfold
250 * }
251 *
252 * {@code @Override}
253 * protected Object decode({@link ChannelHandlerContext} ctx,
254 * {@link Channel} ch,
255 * {@link ChannelBuffer} buf,
256 * {@link VoidEnum} state) {
257 * ...
258 * // Decode the first message
259 * Object firstMessage = ...;
260 *
261 * // Add the second decoder
262 * ctx.getPipeline().addLast("second", new SecondDecoder());
263 *
264 * // Remove the first decoder (me)
265 * ctx.getPipeline().remove(this);
266 *
267 * if (buf.readable()) {
268 * // Hand off the remaining data to the second decoder
269 * return new Object[] { firstMessage, buf.readBytes(<b>super.actualReadableBytes()</b>) };
270 * } else {
271 * // Nothing to hand off
272 * return firstMessage;
273 * }
274 * }
275 * </pre>
276 *
277 * @param <T>
278 * the state type; use {@link VoidEnum} if state management is unused
279 *
280 * @apiviz.landmark
281 * @apiviz.has org.jboss.netty.handler.codec.replay.UnreplayableOperationException oneway - - throws
282 */
283 public abstract class ReplayingDecoder<T extends Enum<T>>
284 extends FrameDecoder {
285
286
287 private final ReplayingDecoderBuffer replayable = new ReplayingDecoderBuffer(this);
288 private T state;
289 private int checkpoint;
290 private boolean needsCleanup;
291
292
293 /**
294 * Creates a new instance with no initial state (i.e: {@code null}).
295 */
296 protected ReplayingDecoder() {
297 this(null);
298 }
299
300 protected ReplayingDecoder(boolean unfold) {
301 this(null, unfold);
302 }
303
304 /**
305 * Creates a new instance with the specified initial state.
306 */
307 protected ReplayingDecoder(T initialState) {
308 this(initialState, false);
309 }
310
311 protected ReplayingDecoder(T initialState, boolean unfold) {
312 super(unfold);
313 state = initialState;
314 }
315
316 @Override
317 protected ChannelBuffer internalBuffer() {
318 return super.internalBuffer();
319 }
320
321 /**
322 * Stores the internal cumulative buffer's reader position.
323 */
324 protected void checkpoint() {
325 ChannelBuffer cumulation = this.cumulation;
326 if (cumulation != null) {
327 checkpoint = cumulation.readerIndex();
328 } else {
329 checkpoint = -1; // buffer not available (already cleaned up)
330 }
331 }
332
333 /**
334 * Stores the internal cumulative buffer's reader position and updates
335 * the current decoder state.
336 */
337 protected void checkpoint(T state) {
338 checkpoint();
339 setState(state);
340 }
341
342 /**
343 * Returns the current state of this decoder.
344 * @return the current state of this decoder
345 */
346 protected T getState() {
347 return state;
348 }
349
350 /**
351 * Sets the current state of this decoder.
352 * @return the old state of this decoder
353 */
354 protected T setState(T newState) {
355 T oldState = state;
356 state = newState;
357 return oldState;
358 }
359
360 /**
361 * Decodes the received packets so far into a frame.
362 *
363 * @param ctx the context of this handler
364 * @param channel the current channel
365 * @param buffer the cumulative buffer of received packets so far.
366 * Note that the buffer might be empty, which means you
367 * should not make an assumption that the buffer contains
368 * at least one byte in your decoder implementation.
369 * @param state the current decoder state ({@code null} if unused)
370 *
371 * @return the decoded frame
372 */
373 protected abstract Object decode(ChannelHandlerContext ctx,
374 Channel channel, ChannelBuffer buffer, T state) throws Exception;
375
376 /**
377 * Decodes the received data so far into a frame when the channel is
378 * disconnected.
379 *
380 * @param ctx the context of this handler
381 * @param channel the current channel
382 * @param buffer the cumulative buffer of received packets so far.
383 * Note that the buffer might be empty, which means you
384 * should not make an assumption that the buffer contains
385 * at least one byte in your decoder implementation.
386 * @param state the current decoder state ({@code null} if unused)
387 *
388 * @return the decoded frame
389 */
390 protected Object decodeLast(
391 ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer, T state) throws Exception {
392 return decode(ctx, channel, buffer, state);
393 }
394
395 /**
396 * Calls {@link #decode(ChannelHandlerContext, Channel, ChannelBuffer, Enum)}. This method
397 * should be never used by {@link ReplayingDecoder} itself. But to be safe we should handle it
398 * anyway
399 */
400 @Override
401 protected final Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
402 return decode(ctx, channel, buffer, state);
403 }
404
405 @Override
406 protected final Object decodeLast(
407 ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
408 return decodeLast(ctx, channel, buffer, state);
409 }
410
411 @Override
412 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
413 throws Exception {
414
415 Object m = e.getMessage();
416 if (!(m instanceof ChannelBuffer)) {
417 ctx.sendUpstream(e);
418 return;
419 }
420
421 ChannelBuffer input = (ChannelBuffer) m;
422 if (!input.readable()) {
423 return;
424 }
425
426 needsCleanup = true;
427
428 if (cumulation == null) {
429 // the cumulation buffer is not created yet so just pass the input
430 // to callDecode(...) method
431 cumulation = input;
432
433 int oldReaderIndex = input.readerIndex();
434 int inputSize = input.readableBytes();
435
436 try {
437 callDecode(
438 ctx, e.getChannel(),
439 input, replayable,
440 e.getRemoteAddress());
441 } finally {
442 int readableBytes = input.readableBytes();
443 if (readableBytes > 0) {
444 int inputCapacity = input.capacity();
445 // check if readableBytes == capacity we can safe the copy as we will not be able to
446 // optimize memory usage anyway
447 boolean copy =
448 readableBytes != inputCapacity &&
449 inputCapacity > getMaxCumulationBufferCapacity();
450
451 // seems like there is something readable left in the input buffer
452 // or decoder wants a replay - create the cumulation buffer and
453 // copy the input into it
454 ChannelBuffer cumulation;
455 if (checkpoint > 0) {
456 int bytesToPreserve = inputSize - (checkpoint - oldReaderIndex);
457 if (copy) {
458 this.cumulation = cumulation = newCumulationBuffer(ctx, bytesToPreserve);
459 cumulation.writeBytes(input, checkpoint, bytesToPreserve);
460 } else {
461 this.cumulation = cumulation = input.slice(checkpoint, bytesToPreserve);
462 }
463 } else if (checkpoint == 0) {
464 if (copy) {
465 this.cumulation = cumulation = newCumulationBuffer(ctx, inputSize);
466 cumulation.writeBytes(input, oldReaderIndex, inputSize);
467 cumulation.readerIndex(input.readerIndex());
468 } else {
469 this.cumulation = cumulation = input.slice(oldReaderIndex, inputSize);
470 cumulation.readerIndex(input.readerIndex());
471 }
472 } else {
473 if (copy) {
474 this.cumulation = cumulation = newCumulationBuffer(ctx, input.readableBytes());
475 cumulation.writeBytes(input);
476 } else {
477 this.cumulation = cumulation = input;
478 }
479 }
480 } else {
481 cumulation = null;
482 }
483 }
484 } else {
485 input = appendToCumulation(input);
486 try {
487 callDecode(ctx, e.getChannel(), input, replayable, e.getRemoteAddress());
488 } finally {
489 updateCumulation(ctx, input);
490 }
491 }
492 }
493
494 private void callDecode(
495 ChannelHandlerContext context, Channel channel,
496 ChannelBuffer input, ChannelBuffer replayableInput, SocketAddress remoteAddress) throws Exception {
497 while (input.readable()) {
498 int oldReaderIndex = checkpoint = input.readerIndex();
499 Object result = null;
500 T oldState = state;
501 try {
502 result = decode(context, channel, replayableInput, state);
503 if (result == null) {
504 if (oldReaderIndex == input.readerIndex() && oldState == state) {
505 throw new IllegalStateException(
506 "null cannot be returned if no data is consumed and state didn't change.");
507 } else {
508 // Previous data has been discarded or caused state transition.
509 // Probably it is reading on.
510 continue;
511 }
512 }
513 } catch (ReplayError replay) {
514 // Return to the checkpoint (or oldPosition) and retry.
515 int checkpoint = this.checkpoint;
516 if (checkpoint >= 0) {
517 input.readerIndex(checkpoint);
518 } else {
519 // Called by cleanup() - no need to maintain the readerIndex
520 // anymore because the buffer has been released already.
521 }
522 }
523
524 if (result == null) {
525 // Seems like more data is required.
526 // Let us wait for the next notification.
527 break;
528 }
529
530 if (oldReaderIndex == input.readerIndex() && oldState == state) {
531 throw new IllegalStateException(
532 "decode() method must consume at least one byte " +
533 "if it returned a decoded message (caused by: " +
534 getClass() + ')');
535 }
536
537 // A successful decode
538 unfoldAndFireMessageReceived(context, remoteAddress, result);
539 }
540 }
541
542 @Override
543 protected void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e)
544 throws Exception {
545 try {
546 ChannelBuffer cumulation = this.cumulation;
547 if (!needsCleanup) {
548 return;
549 } else {
550 needsCleanup = false;
551 }
552
553 replayable.terminate();
554
555 if (cumulation != null && cumulation.readable()) {
556 // Make sure all data was read before notifying a closed channel.
557 callDecode(ctx, e.getChannel(), cumulation, replayable, null);
558 }
559
560 // Call decodeLast() finally. Please note that decodeLast() is
561 // called even if there's nothing more to read from the buffer to
562 // notify a user that the connection was closed explicitly.
563 Object partiallyDecoded = decodeLast(ctx, e.getChannel(), replayable, state);
564
565 this.cumulation = null;
566
567 if (partiallyDecoded != null) {
568 unfoldAndFireMessageReceived(ctx, null, partiallyDecoded);
569 }
570 } catch (ReplayError replay) {
571 // Ignore
572 } finally {
573 ctx.sendUpstream(e);
574 }
575 }
576 }