View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  
16  package io.netty.handler.codec.redis;
17  
18  import io.netty.channel.ChannelHandlerContext;
19  import io.netty.handler.codec.CodecException;
20  import io.netty.handler.codec.MessageToMessageDecoder;
21  import io.netty.handler.codec.PrematureChannelClosureException;
22  import io.netty.util.ReferenceCountUtil;
23  import io.netty.util.internal.ObjectUtil;
24  import io.netty.util.internal.UnstableApi;
25  
26  import java.util.ArrayDeque;
27  import java.util.ArrayList;
28  import java.util.Deque;
29  import java.util.List;
30  
31  /**
32   * Aggregates {@link RedisMessage} parts into {@link ArrayRedisMessage}. This decoder
33   * should be used together with {@link RedisDecoder}.
34   */
35  @UnstableApi
36  public final class RedisArrayAggregator extends MessageToMessageDecoder<RedisMessage> {
37  
38      private static final int DEFAULT_MAX_ARRAY_LENGTH = RedisConstants.REDIS_MAX_ARRAY_LENGTH;
39      private final int maxNestedArrayDepth;
40      private final Deque<AggregateState> depths = new ArrayDeque<AggregateState>(4);
41      private final int maxElements;
42  
43      /**
44       * Create a new instance that will aggregate an {@link ArrayHeaderRedisMessage}
45       * and its subsequent elements into an {@link ArrayRedisMessage}.
46       * <p>
47       * This constructor specifies a maximum number of elements of 1.000.000,
48       * but this default can be increased with the {@value RedisConstants#PROP_REDIS_MAX_ARRAY_LENGTH} system property.
49       *
50       * @deprecated Use {@link #RedisArrayAggregator(int, int)} instead to define a max size of the array to aggregate.
51       */
52      @Deprecated
53      public RedisArrayAggregator() {
54          // Let's impose some limit at least by default.
55          this(DEFAULT_MAX_ARRAY_LENGTH, 1024);
56      }
57  
58      /**
59       * Create a new instance that will aggregate an {@link ArrayHeaderRedisMessage}
60       * and its subsequent elements into an {@link ArrayRedisMessage}.
61       * <p>
62       * A {@link CodecException} will be thrown if the array header specify a length greater than
63       * the given number of max elements.
64       * @param maxElements The maximum number of elements to aggregate in a single message.
65       * @param maxNestedArrayDepth   the maximum depth of the nested array before an exception will be thrown
66       */
67      public RedisArrayAggregator(int maxElements, int maxNestedArrayDepth) {
68          super(RedisMessage.class);
69          this.maxElements = ObjectUtil.checkPositive(maxElements, "maxElements");
70          this.maxNestedArrayDepth = ObjectUtil.checkPositive(maxNestedArrayDepth, "maxNestedArrayDepth");
71      }
72  
73      @Override
74      protected void decode(ChannelHandlerContext ctx, RedisMessage msg, List<Object> out) throws Exception {
75          if (msg instanceof ArrayHeaderRedisMessage) {
76              msg = decodeRedisArrayHeader((ArrayHeaderRedisMessage) msg);
77              if (msg == null) {
78                  return;
79              }
80          } else {
81              ReferenceCountUtil.retain(msg);
82          }
83  
84          while (!depths.isEmpty()) {
85              AggregateState current = depths.peek();
86              current.children.add(msg);
87  
88              // if current aggregation completed, go to parent aggregation.
89              if (current.children.size() == current.length) {
90                  msg = new ArrayRedisMessage(current.children);
91                  depths.pop();
92              } else {
93                  // not aggregated yet. try next time.
94                  return;
95              }
96          }
97  
98          out.add(msg);
99      }
100 
101     private RedisMessage decodeRedisArrayHeader(ArrayHeaderRedisMessage header) {
102         if (header.isNull()) {
103             return ArrayRedisMessage.NULL_INSTANCE;
104         } else if (header.length() == 0L) {
105             return ArrayRedisMessage.EMPTY_INSTANCE;
106         } else if (header.length() > 0L) {
107             // Currently, this codec doesn't support `long` length for arrays because Java's List.size() is int.
108             if (header.length() > maxElements) {
109                 throw new CodecException("this codec doesn't support longer length than " + maxElements);
110             }
111 
112             if (depths.size() >= maxNestedArrayDepth) {
113                 releaseAndClearDepths();
114                 throw new CodecException("max nested array depth exceeded: "  + maxNestedArrayDepth);
115             }
116             // start aggregating array
117             depths.push(new AggregateState((int) header.length()));
118             return null;
119         } else {
120             throw new CodecException("bad length: " + header.length());
121         }
122     }
123 
124     private static final class AggregateState {
125         private final int length;
126         private final List<RedisMessage> children;
127         AggregateState(int length) {
128             this.length = length;
129             this.children = new ArrayList<RedisMessage>(length);
130         }
131     }
132 
133     @Override
134     public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
135         super.handlerRemoved(ctx);
136         releaseAndClearDepths();
137     }
138 
139     private void releaseAndClearDepths() {
140         for (AggregateState state : depths) {
141             for (RedisMessage message : state.children) {
142                 ReferenceCountUtil.safeRelease(message);
143             }
144         }
145         depths.clear();
146     }
147 
148     @Override
149     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
150         super.channelInactive(ctx);
151 
152         if (!depths.isEmpty()) {
153             ctx.fireExceptionCaught(new PrematureChannelClosureException(
154                     "channel gone inactive with " + depths.size() +
155                             " messages still incomplete"));
156         }
157     }
158 }