View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufHolder;
20  import io.netty.buffer.CompositeByteBuf;
21  import io.netty.channel.ChannelFuture;
22  import io.netty.channel.ChannelFutureListener;
23  import io.netty.channel.ChannelHandler;
24  import io.netty.channel.ChannelHandlerContext;
25  import io.netty.channel.ChannelPipeline;
26  import io.netty.util.ReferenceCountUtil;
27  
28  import java.util.List;
29  
30  import static io.netty.buffer.Unpooled.EMPTY_BUFFER;
31  
32  /**
33   * An abstract {@link ChannelHandler} that aggregates a series of message objects into a single aggregated message.
34   * <p>
35   * 'A series of messages' is composed of the following:
36   * <ul>
37   * <li>a single start message which optionally contains the first part of the content, and</li>
38   * <li>1 or more content messages.</li>
39   * </ul>
40   * The content of the aggregated message will be the merged content of the start message and its following content
41   * messages. If this aggregator encounters a content message where {@link #isLastContentMessage(ByteBufHolder)}
42   * return {@code true} for, the aggregator will finish the aggregation and produce the aggregated message and expect
43   * another start message.
44   * </p>
45   *
46   * @param <I> the type that covers both start message and content message
47   * @param <S> the type of the start message
48   * @param <C> the type of the content message (must be a subtype of {@link ByteBufHolder})
49   * @param <O> the type of the aggregated message (must be a subtype of {@code S} and {@link ByteBufHolder})
50   */
51  public abstract class MessageAggregator<I, S, C extends ByteBufHolder, O extends ByteBufHolder>
52          extends MessageToMessageDecoder<I> {
53  
54      private static final int DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS = 1024;
55  
56      private final int maxContentLength;
57      private O currentMessage;
58      private boolean handlingOversizedMessage;
59  
60      private int maxCumulationBufferComponents = DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS;
61      private ChannelHandlerContext ctx;
62      private ChannelFutureListener continueResponseWriteListener;
63  
64      /**
65       * Creates a new instance.
66       *
67       * @param maxContentLength
68       *        the maximum length of the aggregated content.
69       *        If the length of the aggregated content exceeds this value,
70       *        {@link #handleOversizedMessage(ChannelHandlerContext, Object)} will be called.
71       */
72      protected MessageAggregator(int maxContentLength) {
73          validateMaxContentLength(maxContentLength);
74          this.maxContentLength = maxContentLength;
75      }
76  
77      protected MessageAggregator(int maxContentLength, Class<? extends I> inboundMessageType) {
78          super(inboundMessageType);
79          validateMaxContentLength(maxContentLength);
80          this.maxContentLength = maxContentLength;
81      }
82  
83      private static void validateMaxContentLength(int maxContentLength) {
84          if (maxContentLength < 0) {
85              throw new IllegalArgumentException("maxContentLength: " + maxContentLength + " (expected: >= 0)");
86          }
87      }
88  
89      @Override
90      public boolean acceptInboundMessage(Object msg) throws Exception {
91          // No need to match last and full types because they are subset of first and middle types.
92          if (!super.acceptInboundMessage(msg)) {
93              return false;
94          }
95  
96          @SuppressWarnings("unchecked")
97          I in = (I) msg;
98  
99          return (isContentMessage(in) || isStartMessage(in)) && !isAggregated(in);
100     }
101 
102     /**
103      * Returns {@code true} if and only if the specified message is a start message. Typically, this method is
104      * implemented as a single {@code return} statement with {@code instanceof}:
105      * <pre>
106      * return msg instanceof MyStartMessage;
107      * </pre>
108      */
109     protected abstract boolean isStartMessage(I msg) throws Exception;
110 
111     /**
112      * Returns {@code true} if and only if the specified message is a content message. Typically, this method is
113      * implemented as a single {@code return} statement with {@code instanceof}:
114      * <pre>
115      * return msg instanceof MyContentMessage;
116      * </pre>
117      */
118     protected abstract boolean isContentMessage(I msg) throws Exception;
119 
120     /**
121      * Returns {@code true} if and only if the specified message is the last content message. Typically, this method is
122      * implemented as a single {@code return} statement with {@code instanceof}:
123      * <pre>
124      * return msg instanceof MyLastContentMessage;
125      * </pre>
126      * or with {@code instanceof} and boolean field check:
127      * <pre>
128      * return msg instanceof MyContentMessage && msg.isLastFragment();
129      * </pre>
130      */
131     protected abstract boolean isLastContentMessage(C msg) throws Exception;
132 
133     /**
134      * Returns {@code true} if and only if the specified message is already aggregated.  If this method returns
135      * {@code true}, this handler will simply forward the message to the next handler as-is.
136      */
137     protected abstract boolean isAggregated(I msg) throws Exception;
138 
139     /**
140      * Returns the maximum allowed length of the aggregated message in bytes.
141      */
142     public final int maxContentLength() {
143         return maxContentLength;
144     }
145 
146     /**
147      * Returns the maximum number of components in the cumulation buffer.  If the number of
148      * the components in the cumulation buffer exceeds this value, the components of the
149      * cumulation buffer are consolidated into a single component, involving memory copies.
150      * The default value of this property is {@value #DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS}.
151      */
152     public final int maxCumulationBufferComponents() {
153         return maxCumulationBufferComponents;
154     }
155 
156     /**
157      * Sets the maximum number of components in the cumulation buffer.  If the number of
158      * the components in the cumulation buffer exceeds this value, the components of the
159      * cumulation buffer are consolidated into a single component, involving memory copies.
160      * The default value of this property is {@value #DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS}
161      * and its minimum allowed value is {@code 2}.
162      */
163     public final void setMaxCumulationBufferComponents(int maxCumulationBufferComponents) {
164         if (maxCumulationBufferComponents < 2) {
165             throw new IllegalArgumentException(
166                     "maxCumulationBufferComponents: " + maxCumulationBufferComponents +
167                     " (expected: >= 2)");
168         }
169 
170         if (ctx == null) {
171             this.maxCumulationBufferComponents = maxCumulationBufferComponents;
172         } else {
173             throw new IllegalStateException(
174                     "decoder properties cannot be changed once the decoder is added to a pipeline.");
175         }
176     }
177 
178     /**
179      * @deprecated This method will be removed in future releases.
180      */
181     @Deprecated
182     public final boolean isHandlingOversizedMessage() {
183         return handlingOversizedMessage;
184     }
185 
186     protected final ChannelHandlerContext ctx() {
187         if (ctx == null) {
188             throw new IllegalStateException("not added to a pipeline yet");
189         }
190         return ctx;
191     }
192 
193     @Override
194     protected void decode(final ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception {
195         if (isStartMessage(msg)) {
196             handlingOversizedMessage = false;
197             if (currentMessage != null) {
198                 currentMessage.release();
199                 currentMessage = null;
200                 throw new MessageAggregationException();
201             }
202 
203             @SuppressWarnings("unchecked")
204             S m = (S) msg;
205 
206             // Send the continue response if necessary (e.g. 'Expect: 100-continue' header)
207             // Check before content length. Failing an expectation may result in a different response being sent.
208             Object continueResponse = newContinueResponse(m, maxContentLength, ctx.pipeline());
209             if (continueResponse != null) {
210                 // Cache the write listener for reuse.
211                 ChannelFutureListener listener = continueResponseWriteListener;
212                 if (listener == null) {
213                     continueResponseWriteListener = listener = new ChannelFutureListener() {
214                         @Override
215                         public void operationComplete(ChannelFuture future) throws Exception {
216                             if (!future.isSuccess()) {
217                                 ctx.fireExceptionCaught(future.cause());
218                             }
219                         }
220                     };
221                 }
222 
223                 // Make sure to call this before writing, otherwise reference counts may be invalid.
224                 boolean closeAfterWrite = closeAfterContinueResponse(continueResponse);
225                 handlingOversizedMessage = ignoreContentAfterContinueResponse(continueResponse);
226 
227                 final ChannelFuture future = ctx.writeAndFlush(continueResponse).addListener(listener);
228 
229                 if (closeAfterWrite) {
230                     future.addListener(ChannelFutureListener.CLOSE);
231                     return;
232                 }
233                 if (handlingOversizedMessage) {
234                     return;
235                 }
236             } else if (isContentLengthInvalid(m, maxContentLength)) {
237                 // if content length is set, preemptively close if it's too large
238                 invokeHandleOversizedMessage(ctx, m);
239                 return;
240             }
241 
242             if (m instanceof DecoderResultProvider && !((DecoderResultProvider) m).decoderResult().isSuccess()) {
243                 O aggregated;
244                 if (m instanceof ByteBufHolder) {
245                     aggregated = beginAggregation(m, ((ByteBufHolder) m).content().retain());
246                 } else {
247                     aggregated = beginAggregation(m, EMPTY_BUFFER);
248                 }
249                 finishAggregation(aggregated);
250                 out.add(aggregated);
251                 return;
252             }
253 
254             // A streamed message - initialize the cumulative buffer, and wait for incoming chunks.
255             CompositeByteBuf content = ctx.alloc().compositeBuffer(maxCumulationBufferComponents);
256             if (m instanceof ByteBufHolder) {
257                 appendPartialContent(content, ((ByteBufHolder) m).content());
258             }
259             currentMessage = beginAggregation(m, content);
260         } else if (isContentMessage(msg)) {
261             if (currentMessage == null) {
262                 // it is possible that a TooLongFrameException was already thrown but we can still discard data
263                 // until the begging of the next request/response.
264                 return;
265             }
266 
267             // Merge the received chunk into the content of the current message.
268             CompositeByteBuf content = (CompositeByteBuf) currentMessage.content();
269 
270             @SuppressWarnings("unchecked")
271             final C m = (C) msg;
272             // Handle oversized message.
273             if (content.readableBytes() > maxContentLength - m.content().readableBytes()) {
274                 // By convention, full message type extends first message type.
275                 @SuppressWarnings("unchecked")
276                 S s = (S) currentMessage;
277                 invokeHandleOversizedMessage(ctx, s);
278                 return;
279             }
280 
281             // Append the content of the chunk.
282             appendPartialContent(content, m.content());
283 
284             // Give the subtypes a chance to merge additional information such as trailing headers.
285             aggregate(currentMessage, m);
286 
287             final boolean last;
288             if (m instanceof DecoderResultProvider) {
289                 DecoderResult decoderResult = ((DecoderResultProvider) m).decoderResult();
290                 if (!decoderResult.isSuccess()) {
291                     if (currentMessage instanceof DecoderResultProvider) {
292                         ((DecoderResultProvider) currentMessage).setDecoderResult(
293                                 DecoderResult.failure(decoderResult.cause()));
294                     }
295                     last = true;
296                 } else {
297                     last = isLastContentMessage(m);
298                 }
299             } else {
300                 last = isLastContentMessage(m);
301             }
302 
303             if (last) {
304                 finishAggregation(currentMessage);
305 
306                 // All done
307                 out.add(currentMessage);
308                 currentMessage = null;
309             }
310         } else {
311             throw new MessageAggregationException();
312         }
313     }
314 
315     private static void appendPartialContent(CompositeByteBuf content, ByteBuf partialContent) {
316         if (partialContent.isReadable()) {
317             content.addComponent(true, partialContent.retain());
318         }
319     }
320 
321     /**
322      * Determine if the message {@code start}'s content length is known, and if it greater than
323      * {@code maxContentLength}.
324      * @param start The message which may indicate the content length.
325      * @param maxContentLength The maximum allowed content length.
326      * @return {@code true} if the message {@code start}'s content length is known, and if it greater than
327      * {@code maxContentLength}. {@code false} otherwise.
328      */
329     protected abstract boolean isContentLengthInvalid(S start, int maxContentLength) throws Exception;
330 
331     /**
332      * Returns the 'continue response' for the specified start message if necessary. For example, this method is
333      * useful to handle an HTTP 100-continue header.
334      *
335      * @return the 'continue response', or {@code null} if there's no message to send
336      */
337     protected abstract Object newContinueResponse(S start, int maxContentLength, ChannelPipeline pipeline)
338             throws Exception;
339 
340     /**
341      * Determine if the channel should be closed after the result of
342      * {@link #newContinueResponse(Object, int, ChannelPipeline)} is written.
343      * @param msg The return value from {@link #newContinueResponse(Object, int, ChannelPipeline)}.
344      * @return {@code true} if the channel should be closed after the result of
345      * {@link #newContinueResponse(Object, int, ChannelPipeline)} is written. {@code false} otherwise.
346      */
347     protected abstract boolean closeAfterContinueResponse(Object msg) throws Exception;
348 
349     /**
350      * Determine if all objects for the current request/response should be ignored or not.
351      * Messages will stop being ignored the next time {@link #isContentMessage(Object)} returns {@code true}.
352      *
353      * @param msg The return value from {@link #newContinueResponse(Object, int, ChannelPipeline)}.
354      * @return {@code true} if all objects for the current request/response should be ignored or not.
355      * {@code false} otherwise.
356      */
357     protected abstract boolean ignoreContentAfterContinueResponse(Object msg) throws Exception;
358 
359     /**
360      * Creates a new aggregated message from the specified start message and the specified content.  If the start
361      * message implements {@link ByteBufHolder}, its content is appended to the specified {@code content}.
362      * This aggregator will continue to append the received content to the specified {@code content}.
363      */
364     protected abstract O beginAggregation(S start, ByteBuf content) throws Exception;
365 
366     /**
367      * Transfers the information provided by the specified content message to the specified aggregated message.
368      * Note that the content of the specified content message has been appended to the content of the specified
369      * aggregated message already, so that you don't need to.  Use this method to transfer the additional information
370      * that the content message provides to {@code aggregated}.
371      */
372     protected void aggregate(O aggregated, C content) throws Exception { }
373 
374     /**
375      * Invoked when the specified {@code aggregated} message is about to be passed to the next handler in the pipeline.
376      */
377     protected void finishAggregation(O aggregated) throws Exception { }
378 
379     private void invokeHandleOversizedMessage(ChannelHandlerContext ctx, S oversized) throws Exception {
380         handlingOversizedMessage = true;
381         currentMessage = null;
382         try {
383             handleOversizedMessage(ctx, oversized);
384         } finally {
385             // Release the message in case it is a full one.
386             ReferenceCountUtil.release(oversized);
387         }
388     }
389 
390     /**
391      * Invoked when an incoming request exceeds the maximum content length.  The default behvaior is to trigger an
392      * {@code exceptionCaught()} event with a {@link TooLongFrameException}.
393      *
394      * @param ctx the {@link ChannelHandlerContext}
395      * @param oversized the accumulated message up to this point, whose type is {@code S} or {@code O}
396      */
397     protected void handleOversizedMessage(ChannelHandlerContext ctx, S oversized) throws Exception {
398         ctx.fireExceptionCaught(
399                 new TooLongFrameException("content length exceeded " + maxContentLength() + " bytes."));
400     }
401 
402     @Override
403     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
404         // We might need keep reading the channel until the full message is aggregated.
405         //
406         // See https://github.com/netty/netty/issues/6583
407         if (currentMessage != null && !ctx.channel().config().isAutoRead()) {
408             ctx.read();
409         }
410         ctx.fireChannelReadComplete();
411     }
412 
413     @Override
414     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
415         try {
416             // release current message if it is not null as it may be a left-over
417             super.channelInactive(ctx);
418         } finally {
419             releaseCurrentMessage();
420         }
421     }
422 
423     @Override
424     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
425         this.ctx = ctx;
426     }
427 
428     @Override
429     public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
430         try {
431             super.handlerRemoved(ctx);
432         } finally {
433             // release current message if it is not null as it may be a left-over as there is not much more we can do in
434             // this case
435             releaseCurrentMessage();
436         }
437     }
438 
439     private void releaseCurrentMessage() {
440         if (currentMessage != null) {
441             currentMessage.release();
442             currentMessage = null;
443             handlingOversizedMessage = false;
444         }
445     }
446 }