1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.handler.codec.compression;
17
18 import io.netty5.buffer.api.Buffer;
19 import io.netty5.channel.ChannelHandler;
20 import io.netty5.channel.ChannelHandlerContext;
21 import io.netty5.util.concurrent.Future;
22 import io.netty5.util.concurrent.Promise;
23
24 import java.util.concurrent.TimeUnit;
25 import java.util.function.Supplier;
26
27 import static io.netty5.util.internal.ObjectUtil.checkPositive;
28 import static java.util.Objects.requireNonNull;
29
30
31
32
33 public final class CompressionHandler implements ChannelHandler {
34
35 private final Supplier<? extends Compressor> compressorSupplier;
36 private final long closeWriteTimeout;
37 private final TimeUnit closeWriteTimeoutUnit;
38 private final boolean discardBytesAfterFinished;
39 private Compressor compressor;
40
41
42
43
44
45
46 public CompressionHandler(Supplier<? extends Compressor> compressorSupplier) {
47 this(compressorSupplier, 10, TimeUnit.SECONDS, true);
48 }
49
50
51
52
53
54
55
56
57
58
59
60 public CompressionHandler(Supplier<? extends Compressor> compressorSupplier,
61 long closeWriteTimeout, TimeUnit closeWriteTimeoutUnit,
62 boolean discardBytesAfterFinished) {
63 this.compressorSupplier = requireNonNull(compressorSupplier, "compressorSupplier");
64 this.closeWriteTimeout = checkPositive(closeWriteTimeout, "closeWriteTimeout");
65 this.closeWriteTimeoutUnit = requireNonNull(closeWriteTimeoutUnit, "closeWriteTimeoutUnit");
66 this.discardBytesAfterFinished = discardBytesAfterFinished;
67 }
68
69 @Override
70 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
71 compressor = compressorSupplier.get();
72 }
73
74 @Override
75 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
76 if (compressor != null) {
77 try {
78 finish(ctx, false);
79 } finally {
80 closeCompressor();
81 }
82 }
83 }
84
85 @Override
86 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
87 if (compressor != null) {
88 closeCompressor();
89 }
90 ctx.fireChannelInactive();
91 }
92
93 @Override
94 public Future<Void> write(ChannelHandlerContext ctx, Object msg) {
95 if (compressor == null || !(msg instanceof Buffer)) {
96 return ctx.write(msg);
97 }
98 Buffer input = (Buffer) msg;
99 if (compressor.isFinished()) {
100 if (discardBytesAfterFinished) {
101 input.close();
102 return ctx.newSucceededFuture();
103 }
104 return ctx.write(msg);
105 }
106 try (input) {
107 Buffer buffer = compressor.compress(input, ctx.bufferAllocator());
108 return ctx.write(buffer);
109 }
110 }
111
112 @Override
113 public Future<Void> close(ChannelHandlerContext ctx) {
114 return finish(ctx, true);
115 }
116
117 private Future<Void> finish(ChannelHandlerContext ctx, boolean closeCtx) {
118 if (compressor == null || compressor.isFinished()) {
119 return closeCtx ? ctx.close() : ctx.newSucceededFuture();
120 }
121 Buffer buffer = compressor.finish(ctx.bufferAllocator());
122 if (buffer.readableBytes() == 0) {
123 buffer.close();
124 return closeCtx ? ctx.close() : ctx.newSucceededFuture();
125 }
126 if (closeCtx) {
127 Promise<Void> promise = ctx.newPromise();
128 Future<Void> f = ctx.writeAndFlush(buffer).addListener(ctx, (c, ignore) -> c.close().cascadeTo(promise));
129 if (!f.isDone()) {
130
131 Future<?> sF = ctx.executor().schedule(() -> ctx.close().cascadeTo(promise),
132 closeWriteTimeout, closeWriteTimeoutUnit);
133 f.addListener(sF, (scheduledFuture, ignore) -> scheduledFuture.cancel());
134 }
135 return promise.asFuture();
136 }
137 return ctx.write(buffer);
138 }
139
140 private void closeCompressor() {
141 compressor.close();
142 compressor = null;
143 }
144 }