View Javadoc
1   /*
2    * Copyright 2021 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.handler.codec.compression;
17  
18  import io.netty5.buffer.api.Buffer;
19  import io.netty5.channel.ChannelHandler;
20  import io.netty5.channel.ChannelHandlerContext;
21  import io.netty5.util.concurrent.Future;
22  import io.netty5.util.concurrent.Promise;
23  
24  import java.util.concurrent.TimeUnit;
25  import java.util.function.Supplier;
26  
27  import static io.netty5.util.internal.ObjectUtil.checkPositive;
28  import static java.util.Objects.requireNonNull;
29  
30  /**
31   * {@link ChannelHandler} which uses a {@link Compressor} for compressing the written {@link Buffer}s.
32   */
33  public final class CompressionHandler implements ChannelHandler {
34  
35      private final Supplier<? extends Compressor> compressorSupplier;
36      private final long closeWriteTimeout;
37      private final TimeUnit closeWriteTimeoutUnit;
38      private final boolean discardBytesAfterFinished;
39      private Compressor compressor;
40  
41      /**
42       * Creates a new instance.
43       *
44       * @param compressorSupplier  the {@link Supplier} that is used to create the {@link Compressor}.
45       */
46      public CompressionHandler(Supplier<? extends Compressor> compressorSupplier) {
47          this(compressorSupplier, 10, TimeUnit.SECONDS, true);
48      }
49  
50      /**
51       * Creates a new instance.
52       *
53       * @param compressorSupplier        the {@link Supplier} that is used to create the {@link Compressor}.
54       * @param closeWriteTimeout         the amount to wait before we will close even tho the write of the trailer was
55       *                                  not finished yet.
56       * @param closeWriteTimeoutUnit     the unit of the timeout.
57       * @param discardBytesAfterFinished {@code true} if the bytes should be discarded after the {@link Compressor}
58       *                                  finished the compression of the whole stream.
59       */
60      public CompressionHandler(Supplier<? extends Compressor> compressorSupplier,
61                                long closeWriteTimeout, TimeUnit closeWriteTimeoutUnit,
62                                boolean discardBytesAfterFinished) {
63          this.compressorSupplier = requireNonNull(compressorSupplier, "compressorSupplier");
64          this.closeWriteTimeout = checkPositive(closeWriteTimeout, "closeWriteTimeout");
65          this.closeWriteTimeoutUnit = requireNonNull(closeWriteTimeoutUnit, "closeWriteTimeoutUnit");
66          this.discardBytesAfterFinished = discardBytesAfterFinished;
67      }
68  
69      @Override
70      public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
71          compressor = compressorSupplier.get();
72      }
73  
74      @Override
75      public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
76          if (compressor != null) {
77              try {
78                  finish(ctx, false);
79              } finally {
80                  closeCompressor();
81              }
82          }
83      }
84  
85      @Override
86      public void channelInactive(ChannelHandlerContext ctx) throws Exception {
87          if (compressor != null) {
88              closeCompressor();
89          }
90          ctx.fireChannelInactive();
91      }
92  
93      @Override
94      public Future<Void> write(ChannelHandlerContext ctx, Object msg) {
95          if (compressor == null || !(msg instanceof Buffer)) {
96              return ctx.write(msg);
97          }
98          Buffer input = (Buffer) msg;
99          if (compressor.isFinished()) {
100             if (discardBytesAfterFinished) {
101                 input.close();
102                 return ctx.newSucceededFuture();
103             }
104             return ctx.write(msg);
105         }
106         try (input) {
107             Buffer buffer = compressor.compress(input, ctx.bufferAllocator());
108             return ctx.write(buffer);
109         }
110     }
111 
112     @Override
113     public Future<Void> close(ChannelHandlerContext ctx) {
114         return finish(ctx, true);
115     }
116 
117     private Future<Void> finish(ChannelHandlerContext ctx, boolean closeCtx) {
118         if (compressor == null || compressor.isFinished()) {
119             return closeCtx ? ctx.close() : ctx.newSucceededFuture();
120         }
121         Buffer buffer = compressor.finish(ctx.bufferAllocator());
122         if (buffer.readableBytes() == 0) {
123             buffer.close();
124             return closeCtx ? ctx.close() : ctx.newSucceededFuture();
125         }
126         if (closeCtx) {
127             Promise<Void> promise = ctx.newPromise();
128             Future<Void> f = ctx.writeAndFlush(buffer).addListener(ctx, (c, ignore) -> c.close().cascadeTo(promise));
129             if (!f.isDone()) {
130                 // Ensure the channel is closed even if the write operation completes in time.
131                 Future<?> sF =  ctx.executor().schedule(() -> ctx.close().cascadeTo(promise),
132                         closeWriteTimeout, closeWriteTimeoutUnit);
133                 f.addListener(sF, (scheduledFuture, ignore) -> scheduledFuture.cancel());
134             }
135             return promise.asFuture();
136         }
137         return ctx.write(buffer);
138     }
139 
140     private void closeCompressor() {
141         compressor.close();
142         compressor = null;
143     }
144 }