1 /* 2 * Copyright 2012 The Netty Project 3 * 4 * The Netty Project licenses this file to you under the Apache License, 5 * version 2.0 (the "License"); you may not use this file except in compliance 6 * with the License. You may obtain a copy of the License at: 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 13 * License for the specific language governing permissions and limitations 14 * under the License. 15 */ 16 package org.jboss.netty.handler.codec.frame; 17 18 import org.jboss.netty.buffer.ChannelBuffer; 19 import org.jboss.netty.buffer.ChannelBufferFactory; 20 import org.jboss.netty.buffer.ChannelBuffers; 21 import org.jboss.netty.buffer.CompositeChannelBuffer; 22 import org.jboss.netty.channel.Channel; 23 import org.jboss.netty.channel.ChannelHandler; 24 import org.jboss.netty.channel.ChannelHandlerContext; 25 import org.jboss.netty.channel.ChannelPipeline; 26 import org.jboss.netty.channel.ChannelStateEvent; 27 import org.jboss.netty.channel.ChannelUpstreamHandler; 28 import org.jboss.netty.channel.Channels; 29 import org.jboss.netty.channel.ExceptionEvent; 30 import org.jboss.netty.channel.LifeCycleAwareChannelHandler; 31 import org.jboss.netty.channel.MessageEvent; 32 import org.jboss.netty.channel.SimpleChannelUpstreamHandler; 33 import org.jboss.netty.handler.codec.replay.ReplayingDecoder; 34 35 import java.net.SocketAddress; 36 37 /** 38 * Decodes the received {@link ChannelBuffer}s into a meaningful frame object. 39 * <p> 40 * In a stream-based transport such as TCP/IP, packets can be fragmented and 41 * reassembled during transmission even in a LAN environment. For example, 42 * let us assume you have received three packets: 43 * <pre> 44 * +-----+-----+-----+ 45 * | ABC | DEF | GHI | 46 * +-----+-----+-----+ 47 * </pre> 48 * because of the packet fragmentation, a server can receive them like the 49 * following: 50 * <pre> 51 * +----+-------+---+---+ 52 * | AB | CDEFG | H | I | 53 * +----+-------+---+---+ 54 * </pre> 55 * <p> 56 * {@link FrameDecoder} helps you defrag the received packets into one or more 57 * meaningful <strong>frames</strong> that could be easily understood by the 58 * application logic. In case of the example above, your {@link FrameDecoder} 59 * implementation could defrag the received packets like the following: 60 * <pre> 61 * +-----+-----+-----+ 62 * | ABC | DEF | GHI | 63 * +-----+-----+-----+ 64 * </pre> 65 * <p> 66 * The following code shows an example handler which decodes a frame whose 67 * first 4 bytes header represents the length of the frame, excluding the 68 * header. 69 * <pre> 70 * MESSAGE FORMAT 71 * ============== 72 * 73 * Offset: 0 4 (Length + 4) 74 * +--------+------------------------+ 75 * Fields: | Length | Actual message content | 76 * +--------+------------------------+ 77 * 78 * DECODER IMPLEMENTATION 79 * ====================== 80 * 81 * public class IntegerHeaderFrameDecoder extends {@link FrameDecoder} { 82 * 83 * {@code @Override} 84 * protected Object decode({@link ChannelHandlerContext} ctx, 85 * {@link Channel channel}, 86 * {@link ChannelBuffer} buf) throws Exception { 87 * 88 * // Make sure if the length field was received. 89 * if (buf.readableBytes() < 4) { 90 * // The length field was not received yet - return null. 91 * // This method will be invoked again when more packets are 92 * // received and appended to the buffer. 93 * return <strong>null</strong>; 94 * } 95 * 96 * // The length field is in the buffer. 97 * 98 * // Mark the current buffer position before reading the length field 99 * // because the whole frame might not be in the buffer yet. 100 * // We will reset the buffer position to the marked position if 101 * // there's not enough bytes in the buffer. 102 * buf.markReaderIndex(); 103 * 104 * // Read the length field. 105 * int length = buf.readInt(); 106 * 107 * // Make sure if there's enough bytes in the buffer. 108 * if (buf.readableBytes() < length) { 109 * // The whole bytes were not received yet - return null. 110 * // This method will be invoked again when more packets are 111 * // received and appended to the buffer. 112 * 113 * // Reset to the marked position to read the length field again 114 * // next time. 115 * buf.resetReaderIndex(); 116 * 117 * return <strong>null</strong>; 118 * } 119 * 120 * // There's enough bytes in the buffer. Read it. 121 * {@link ChannelBuffer} frame = buf.readBytes(length); 122 * 123 * // Successfully decoded a frame. Return the decoded frame. 124 * return <strong>frame</strong>; 125 * } 126 * } 127 * </pre> 128 * 129 * <h3>Returning a POJO rather than a {@link ChannelBuffer}</h3> 130 * <p> 131 * Please note that you can return an object of a different type than 132 * {@link ChannelBuffer} in your {@code decode()} and {@code decodeLast()} 133 * implementation. For example, you could return a 134 * <a href="http://en.wikipedia.org/wiki/POJO">POJO</a> so that the next 135 * {@link ChannelUpstreamHandler} receives a {@link MessageEvent} which 136 * contains a POJO rather than a {@link ChannelBuffer}. 137 * 138 * <h3>Replacing a decoder with another decoder in a pipeline</h3> 139 * <p> 140 * If you are going to write a protocol multiplexer, you will probably want to 141 * replace a {@link FrameDecoder} (protocol detector) with another 142 * {@link FrameDecoder} or {@link ReplayingDecoder} (actual protocol decoder). 143 * It is not possible to achieve this simply by calling 144 * {@link ChannelPipeline#replace(ChannelHandler, String, ChannelHandler)}, but 145 * some additional steps are required: 146 * <pre> 147 * public class FirstDecoder extends {@link FrameDecoder} { 148 * 149 * public FirstDecoder() { 150 * super(true); // Enable unfold 151 * } 152 * 153 * {@code @Override} 154 * protected Object decode({@link ChannelHandlerContext} ctx, 155 * {@link Channel} channel, 156 * {@link ChannelBuffer} buf) { 157 * ... 158 * // Decode the first message 159 * Object firstMessage = ...; 160 * 161 * // Add the second decoder 162 * ctx.getPipeline().addLast("second", new SecondDecoder()); 163 * 164 * // Remove the first decoder (me) 165 * ctx.getPipeline().remove(this); 166 * 167 * if (buf.readable()) { 168 * // Hand off the remaining data to the second decoder 169 * return new Object[] { firstMessage, buf.readBytes(buf.readableBytes()) }; 170 * } else { 171 * // Nothing to hand off 172 * return firstMessage; 173 * } 174 * } 175 * } 176 * </pre> 177 * 178 * @apiviz.landmark 179 */ 180 public abstract class FrameDecoder extends SimpleChannelUpstreamHandler implements LifeCycleAwareChannelHandler { 181 182 public static final int DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS = 1024; 183 184 private boolean unfold; 185 protected ChannelBuffer cumulation; 186 private volatile ChannelHandlerContext ctx; 187 private int copyThreshold; 188 private int maxCumulationBufferComponents = DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS; 189 190 protected FrameDecoder() { 191 this(false); 192 } 193 194 protected FrameDecoder(boolean unfold) { 195 this.unfold = unfold; 196 } 197 198 public final boolean isUnfold() { 199 return unfold; 200 } 201 202 public final void setUnfold(boolean unfold) { 203 if (ctx == null) { 204 this.unfold = unfold; 205 } else { 206 throw new IllegalStateException( 207 "decoder properties cannot be changed once the decoder is added to a pipeline."); 208 } 209 } 210 211 /** 212 * See {@link #setMaxCumulationBufferCapacity(int)} for explaintation of this setting 213 * 214 */ 215 public final int getMaxCumulationBufferCapacity() { 216 return copyThreshold; 217 } 218 219 /** 220 * Set the maximal capacity of the internal cumulation ChannelBuffer to use 221 * before the {@link FrameDecoder} tries to minimize the memory usage by 222 * "byte copy". 223 * 224 * 225 * What you use here really depends on your application and need. Using 226 * {@link Integer#MAX_VALUE} will disable all byte copies but give you the 227 * cost of a higher memory usage if big {@link ChannelBuffer}'s will be 228 * received. 229 * 230 * By default a threshold of {@code 0} is used, which means it will 231 * always copy to try to reduce memory usage 232 * 233 * 234 * @param copyThreshold 235 * the threshold (in bytes) or {@link Integer#MAX_VALUE} to 236 * disable it. The value must be at least 0 237 * @throws IllegalStateException 238 * get thrown if someone tries to change this setting after the 239 * Decoder was added to the {@link ChannelPipeline} 240 */ 241 public final void setMaxCumulationBufferCapacity(int copyThreshold) { 242 if (copyThreshold < 0) { 243 throw new IllegalArgumentException("maxCumulationBufferCapacity must be >= 0"); 244 } 245 if (ctx == null) { 246 this.copyThreshold = copyThreshold; 247 } else { 248 throw new IllegalStateException( 249 "decoder properties cannot be changed once the decoder is added to a pipeline."); 250 } 251 } 252 253 /** 254 * Returns the maximum number of components in the cumulation buffer. If the number of 255 * the components in the cumulation buffer exceeds this value, the components of the 256 * cumulation buffer are consolidated into a single component, involving memory copies. 257 * The default value of this property {@link #DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS}. 258 */ 259 public final int getMaxCumulationBufferComponents() { 260 return maxCumulationBufferComponents; 261 } 262 263 /** 264 * Sets the maximum number of components in the cumulation buffer. If the number of 265 * the components in the cumulation buffer exceeds this value, the components of the 266 * cumulation buffer are consolidated into a single component, involving memory copies. 267 * The default value of this property is {@link #DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS} 268 * and its minimum allowed value is {@code 2}. 269 */ 270 public final void setMaxCumulationBufferComponents(int maxCumulationBufferComponents) { 271 if (maxCumulationBufferComponents < 2) { 272 throw new IllegalArgumentException( 273 "maxCumulationBufferComponents: " + maxCumulationBufferComponents + 274 " (expected: >= 2)"); 275 } 276 277 if (ctx == null) { 278 this.maxCumulationBufferComponents = maxCumulationBufferComponents; 279 } else { 280 throw new IllegalStateException( 281 "decoder properties cannot be changed once the decoder is added to a pipeline."); 282 } 283 } 284 285 @Override 286 public void messageReceived( 287 ChannelHandlerContext ctx, MessageEvent e) throws Exception { 288 289 Object m = e.getMessage(); 290 if (!(m instanceof ChannelBuffer)) { 291 ctx.sendUpstream(e); 292 return; 293 } 294 295 ChannelBuffer input = (ChannelBuffer) m; 296 if (!input.readable()) { 297 return; 298 } 299 300 if (cumulation == null) { 301 try { 302 // the cumulation buffer is not created yet so just pass the input to callDecode(...) method 303 callDecode(ctx, e.getChannel(), input, e.getRemoteAddress()); 304 } finally { 305 updateCumulation(ctx, input); 306 } 307 } else { 308 input = appendToCumulation(input); 309 try { 310 callDecode(ctx, e.getChannel(), input, e.getRemoteAddress()); 311 } finally { 312 updateCumulation(ctx, input); 313 } 314 } 315 } 316 317 protected ChannelBuffer appendToCumulation(ChannelBuffer input) { 318 ChannelBuffer cumulation = this.cumulation; 319 assert cumulation.readable(); 320 if (cumulation instanceof CompositeChannelBuffer) { 321 // Make sure the resulting cumulation buffer has no more than the configured components. 322 CompositeChannelBuffer composite = (CompositeChannelBuffer) cumulation; 323 if (composite.numComponents() >= maxCumulationBufferComponents) { 324 cumulation = composite.copy(); 325 } 326 } 327 328 this.cumulation = input = ChannelBuffers.wrappedBuffer(cumulation, input); 329 return input; 330 } 331 332 protected ChannelBuffer updateCumulation(ChannelHandlerContext ctx, ChannelBuffer input) { 333 ChannelBuffer newCumulation; 334 int readableBytes = input.readableBytes(); 335 if (readableBytes > 0) { 336 int inputCapacity = input.capacity(); 337 338 // If input.readableBytes() == input.capacity() (i.e. input is full), 339 // there's nothing to save from creating a new cumulation buffer 340 // even if input.capacity() exceeds the threshold, because the new cumulation 341 // buffer will have the same capacity and content with input. 342 if (readableBytes < inputCapacity && inputCapacity > copyThreshold) { 343 // At least one byte was consumed by callDecode() and input.capacity() 344 // exceeded the threshold. 345 cumulation = newCumulation = newCumulationBuffer(ctx, input.readableBytes()); 346 cumulation.writeBytes(input); 347 } else { 348 // Nothing was consumed by callDecode() or input.capacity() did not 349 // exceed the threshold. 350 if (input.readerIndex() != 0) { 351 cumulation = newCumulation = input.slice(); 352 } else { 353 cumulation = newCumulation = input; 354 } 355 } 356 } else { 357 cumulation = newCumulation = null; 358 } 359 return newCumulation; 360 } 361 362 @Override 363 public void channelDisconnected( 364 ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { 365 cleanup(ctx, e); 366 } 367 368 @Override 369 public void channelClosed( 370 ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { 371 cleanup(ctx, e); 372 } 373 374 @Override 375 public void exceptionCaught( 376 ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { 377 ctx.sendUpstream(e); 378 } 379 380 /** 381 * Decodes the received packets so far into a frame. 382 * 383 * If an sub-class wants to extract a frame out of the buffer it should use 384 * the {@link #extractFrame(ChannelBuffer, int, int)} method, 385 * to make optimizations easier later. 386 * 387 * @param ctx the context of this handler 388 * @param channel the current channel 389 * @param buffer the cumulative buffer of received packets so far. 390 * Note that the buffer might be empty, which means you 391 * should not make an assumption that the buffer contains 392 * at least one byte in your decoder implementation. 393 * 394 * @return the decoded frame if a full frame was received and decoded. 395 * {@code null} if there's not enough data in the buffer to decode a frame. 396 */ 397 protected abstract Object decode( 398 ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception; 399 400 /** 401 * Decodes the received data so far into a frame when the channel is 402 * disconnected. 403 * 404 * @param ctx the context of this handler 405 * @param channel the current channel 406 * @param buffer the cumulative buffer of received packets so far. 407 * Note that the buffer might be empty, which means you 408 * should not make an assumption that the buffer contains 409 * at least one byte in your decoder implementation. 410 * 411 * @return the decoded frame if a full frame was received and decoded. 412 * {@code null} if there's not enough data in the buffer to decode a frame. 413 */ 414 protected Object decodeLast( 415 ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception { 416 return decode(ctx, channel, buffer); 417 } 418 419 private void callDecode( 420 ChannelHandlerContext context, Channel channel, 421 ChannelBuffer cumulation, SocketAddress remoteAddress) throws Exception { 422 423 while (cumulation.readable()) { 424 int oldReaderIndex = cumulation.readerIndex(); 425 Object frame = decode(context, channel, cumulation); 426 if (frame == null) { 427 if (oldReaderIndex == cumulation.readerIndex()) { 428 // Seems like more data is required. 429 // Let us wait for the next notification. 430 break; 431 } else { 432 // Previous data has been discarded. 433 // Probably it is reading on. 434 continue; 435 } 436 } 437 if (oldReaderIndex == cumulation.readerIndex()) { 438 throw new IllegalStateException( 439 "decode() method must read at least one byte " + 440 "if it returned a frame (caused by: " + getClass() + ')'); 441 } 442 443 unfoldAndFireMessageReceived(context, remoteAddress, frame); 444 } 445 } 446 447 protected final void unfoldAndFireMessageReceived( 448 ChannelHandlerContext context, SocketAddress remoteAddress, Object result) { 449 if (unfold) { 450 if (result instanceof Object[]) { 451 for (Object r: (Object[]) result) { 452 Channels.fireMessageReceived(context, r, remoteAddress); 453 } 454 } else if (result instanceof Iterable<?>) { 455 for (Object r: (Iterable<?>) result) { 456 Channels.fireMessageReceived(context, r, remoteAddress); 457 } 458 } else { 459 Channels.fireMessageReceived(context, result, remoteAddress); 460 } 461 } else { 462 Channels.fireMessageReceived(context, result, remoteAddress); 463 } 464 } 465 466 /** 467 * Gets called on {@link #channelDisconnected(ChannelHandlerContext, ChannelStateEvent)} and 468 * {@link #channelClosed(ChannelHandlerContext, ChannelStateEvent)} 469 */ 470 protected void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e) 471 throws Exception { 472 try { 473 ChannelBuffer cumulation = this.cumulation; 474 if (cumulation == null) { 475 return; 476 } 477 478 this.cumulation = null; 479 480 if (cumulation.readable()) { 481 // Make sure all frames are read before notifying a closed channel. 482 callDecode(ctx, ctx.getChannel(), cumulation, null); 483 } 484 485 // Call decodeLast() finally. Please note that decodeLast() is 486 // called even if there's nothing more to read from the buffer to 487 // notify a user that the connection was closed explicitly. 488 Object partialFrame = decodeLast(ctx, ctx.getChannel(), cumulation); 489 if (partialFrame != null) { 490 unfoldAndFireMessageReceived(ctx, null, partialFrame); 491 } 492 } finally { 493 ctx.sendUpstream(e); 494 } 495 } 496 497 /** 498 * Create a new {@link ChannelBuffer} which is used for the cumulation. 499 * Sub-classes may override this. 500 * 501 * @param ctx {@link ChannelHandlerContext} for this handler 502 * @return buffer the {@link ChannelBuffer} which is used for cumulation 503 */ 504 protected ChannelBuffer newCumulationBuffer( 505 ChannelHandlerContext ctx, int minimumCapacity) { 506 ChannelBufferFactory factory = ctx.getChannel().getConfig().getBufferFactory(); 507 return factory.getBuffer(Math.max(minimumCapacity, 256)); 508 } 509 510 /** 511 * Replace this {@link FrameDecoder} in the {@link ChannelPipeline} with the given {@link ChannelHandler}. All 512 * remaining bytes in the {@link ChannelBuffer} will get send to the new {@link ChannelHandler} that was used 513 * as replacement 514 * 515 */ 516 public void replace(String handlerName, ChannelHandler handler) { 517 if (ctx == null) { 518 throw new IllegalStateException( 519 "Replace cann only be called once the FrameDecoder is added to the ChannelPipeline"); 520 } 521 ChannelPipeline pipeline = ctx.getPipeline(); 522 pipeline.addAfter(ctx.getName(), handlerName, handler); 523 524 try { 525 if (cumulation != null) { 526 Channels.fireMessageReceived(ctx, cumulation.readBytes(actualReadableBytes())); 527 } 528 } finally { 529 pipeline.remove(this); 530 } 531 } 532 533 /** 534 * Returns the actual number of readable bytes in the internal cumulative 535 * buffer of this decoder. You usually do not need to rely on this value 536 * to write a decoder. Use it only when you muse use it at your own risk. 537 * This method is a shortcut to {@link #internalBuffer() internalBuffer().readableBytes()}. 538 */ 539 protected int actualReadableBytes() { 540 return internalBuffer().readableBytes(); 541 } 542 543 /** 544 * Returns the internal cumulative buffer of this decoder. You usually 545 * do not need to access the internal buffer directly to write a decoder. 546 * Use it only when you must use it at your own risk. 547 */ 548 protected ChannelBuffer internalBuffer() { 549 ChannelBuffer buf = cumulation; 550 if (buf == null) { 551 return ChannelBuffers.EMPTY_BUFFER; 552 } 553 return buf; 554 } 555 556 /** 557 * Extract a Frame of the specified buffer. By default this implementation 558 * will return a extract the sub-region of the buffer and create a new one. 559 * If an sub-class want to extract a frame from the buffer it should use this method 560 * by default. 561 * 562 * <strong>Be sure that this method MUST not modify the readerIndex of the given buffer</strong> 563 * 564 */ 565 protected ChannelBuffer extractFrame(ChannelBuffer buffer, int index, int length) { 566 ChannelBuffer frame = buffer.factory().getBuffer(length); 567 frame.writeBytes(buffer, index, length); 568 return frame; 569 } 570 571 public void beforeAdd(ChannelHandlerContext ctx) throws Exception { 572 this.ctx = ctx; 573 } 574 575 public void afterAdd(ChannelHandlerContext ctx) throws Exception { 576 // Nothing to do.. 577 } 578 579 public void beforeRemove(ChannelHandlerContext ctx) throws Exception { 580 // Nothing to do.. 581 } 582 583 public void afterRemove(ChannelHandlerContext ctx) throws Exception { 584 // Nothing to do.. 585 } 586 587 }