View Javadoc
1   /*
2    * Copyright 2021 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.handler.codec.compression;
17  
18  import io.netty5.buffer.api.Buffer;
19  import io.netty5.channel.ChannelHandlerContext;
20  import io.netty5.handler.codec.ByteToMessageDecoder;
21  
22  import java.util.Objects;
23  import java.util.function.Supplier;
24  
25  /**
26   * {@link ByteToMessageDecoder} that uses a {@link Decompressor} for decompressing incoming {@link Buffer}s.
27   */
28  public final class DecompressionHandler extends ByteToMessageDecoder {
29  
30      private final Supplier<? extends Decompressor> decompressorSupplier;
31      private final boolean discardBytesAfterFinished;
32      private Decompressor decompressor;
33  
34      /**
35       * Creates a new instance.
36       *
37       * @param decompressorSupplier  the {@link Supplier} that is used to create the {@link Decompressor}.
38       */
39      public DecompressionHandler(Supplier<? extends Decompressor> decompressorSupplier) {
40          this(decompressorSupplier, true);
41      }
42  
43      /**
44       * Creates a new instance.
45       *
46       * @param decompressorSupplier          the {@link Supplier} that is used to create the {@link Decompressor}.
47       * @param discardBytesAfterFinished     {@code true} if the bytes should be discarded after the {@link Compressor}
48       *                                      finished the compression of the whole stream.
49       */
50      public DecompressionHandler(Supplier<? extends Decompressor> decompressorSupplier,
51                                  boolean discardBytesAfterFinished) {
52          this.decompressorSupplier = Objects.requireNonNull(decompressorSupplier, "decompressorSupplier");
53          this.discardBytesAfterFinished = discardBytesAfterFinished;
54      }
55  
56      @Override
57      protected void handlerAdded0(ChannelHandlerContext ctx) throws Exception {
58          super.handlerAdded0(ctx);
59          decompressor = decompressorSupplier.get();
60      }
61  
62      @Override
63      protected void decode(ChannelHandlerContext ctx, Buffer in) throws Exception {
64          if (decompressor == null) {
65              ctx.fireChannelRead(in.split());
66              return;
67          }
68          while (!decompressor.isFinished()) {
69              int idx = in.readerOffset();
70              Buffer decompressed = decompressor.decompress(in, ctx.bufferAllocator());
71              if (decompressed != null) {
72                  ctx.fireChannelRead(decompressed);
73              } else if (idx == in.readerOffset()) {
74                  return;
75              }
76          }
77          assert decompressor.isFinished();
78          if (discardBytesAfterFinished) {
79              in.skipReadableBytes(in.readableBytes());
80          } else {
81              ctx.fireChannelRead(in.split());
82          }
83      }
84  
85      @Override
86      protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
87          try {
88              super.handlerRemoved0(ctx);
89          } finally {
90              closeDecompressor();
91          }
92      }
93  
94      @Override
95      public void channelInactive(ChannelHandlerContext ctx) throws Exception {
96          try {
97              super.channelInactive(ctx);
98          } finally {
99              closeDecompressor();
100         }
101     }
102 
103     private void closeDecompressor() {
104         if (decompressor != null) {
105             decompressor.close();
106             decompressor = null;
107         }
108     }
109 }