View Javadoc
1   /*
2    * Copyright 2013 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.memcache.binary;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.ChannelHandlerAppender;
20  import io.netty.channel.ChannelHandlerContext;
21  import io.netty.handler.codec.PrematureChannelClosureException;
22  import io.netty.handler.codec.memcache.LastMemcacheContent;
23  
24  import java.util.List;
25  import java.util.concurrent.atomic.AtomicLong;
26  
27  /**
28   * The client codec that combines the proper encoder and decoder.
29   * <p/>
30   * Use this codec if you want to implement a memcache client that speaks the binary protocol. It
31   * combines both the {@link BinaryMemcacheResponseDecoder} and the {@link BinaryMemcacheRequestEncoder}.
32   * <p/>
33   * Optionally, it counts the number of outstanding responses and raises an exception if - on connection
34   * close - the list is not 0 (this is turned off by default). You can also define a chunk size for the
35   * content, which defaults to 8192. This chunk size is the maximum, so if smaller chunks arrive they
36   * will be passed up the pipeline and not queued up to the chunk size.
37   */
38  public final class BinaryMemcacheClientCodec extends ChannelHandlerAppender {
39  
40      private final boolean failOnMissingResponse;
41      private final AtomicLong requestResponseCounter = new AtomicLong();
42  
43      /**
44       * Create a new {@link BinaryMemcacheClientCodec} with the default settings applied.
45       */
46      public BinaryMemcacheClientCodec() {
47          this(AbstractBinaryMemcacheDecoder.DEFAULT_MAX_CHUNK_SIZE);
48      }
49  
50      /**
51       * Create a new {@link BinaryMemcacheClientCodec} and set a custom chunk size.
52       *
53       * @param decodeChunkSize the maximum chunk size.
54       */
55      public BinaryMemcacheClientCodec(int decodeChunkSize) {
56          this(decodeChunkSize, false);
57      }
58  
59      /**
60       * Create a new {@link BinaryMemcacheClientCodec} with custom settings.
61       *
62       * @param decodeChunkSize       the maximum chunk size.
63       * @param failOnMissingResponse report if after close there are outstanding requests.
64       */
65      public BinaryMemcacheClientCodec(int decodeChunkSize, boolean failOnMissingResponse) {
66          this.failOnMissingResponse = failOnMissingResponse;
67          add(new Decoder(decodeChunkSize));
68          add(new Encoder());
69      }
70  
71      private final class Encoder extends BinaryMemcacheRequestEncoder {
72  
73          @Override
74          protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
75              super.encode(ctx, msg, out);
76  
77              if (failOnMissingResponse && msg instanceof LastMemcacheContent) {
78                  requestResponseCounter.incrementAndGet();
79              }
80          }
81      }
82  
83      private final class Decoder extends BinaryMemcacheResponseDecoder {
84  
85          Decoder(int chunkSize) {
86              super(chunkSize);
87          }
88  
89          @Override
90          protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
91              int oldSize = out.size();
92              super.decode(ctx, in, out);
93  
94              if (failOnMissingResponse) {
95                  final int size = out.size();
96                  for (int i = oldSize; i < size; i ++) {
97                      Object msg = out.get(i);
98                      if (msg instanceof LastMemcacheContent) {
99                          requestResponseCounter.decrementAndGet();
100                     }
101                 }
102             }
103         }
104 
105         @Override
106         public void channelInactive(ChannelHandlerContext ctx) throws Exception {
107             super.channelInactive(ctx);
108 
109             if (failOnMissingResponse) {
110                 long missingResponses = requestResponseCounter.get();
111                 if (missingResponses > 0) {
112                     ctx.fireExceptionCaught(new PrematureChannelClosureException(
113                         "channel gone inactive with " + missingResponses +
114                             " missing response(s)"));
115                 }
116             }
117         }
118     }
119 }