1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.handler.codec.compression;
17
18 import io.netty5.buffer.api.Buffer;
19 import io.netty5.channel.ChannelHandlerContext;
20 import io.netty5.handler.codec.ByteToMessageDecoder;
21
22 import java.util.Objects;
23 import java.util.function.Supplier;
24
25
26
27
28 public final class DecompressionHandler extends ByteToMessageDecoder {
29
30 private final Supplier<? extends Decompressor> decompressorSupplier;
31 private final boolean discardBytesAfterFinished;
32 private Decompressor decompressor;
33
34
35
36
37
38
39 public DecompressionHandler(Supplier<? extends Decompressor> decompressorSupplier) {
40 this(decompressorSupplier, true);
41 }
42
43
44
45
46
47
48
49
50 public DecompressionHandler(Supplier<? extends Decompressor> decompressorSupplier,
51 boolean discardBytesAfterFinished) {
52 this.decompressorSupplier = Objects.requireNonNull(decompressorSupplier, "decompressorSupplier");
53 this.discardBytesAfterFinished = discardBytesAfterFinished;
54 }
55
56 @Override
57 protected void handlerAdded0(ChannelHandlerContext ctx) throws Exception {
58 super.handlerAdded0(ctx);
59 decompressor = decompressorSupplier.get();
60 }
61
62 @Override
63 protected void decode(ChannelHandlerContext ctx, Buffer in) throws Exception {
64 if (decompressor == null) {
65 ctx.fireChannelRead(in.split());
66 return;
67 }
68 while (!decompressor.isFinished()) {
69 int idx = in.readerOffset();
70 Buffer decompressed = decompressor.decompress(in, ctx.bufferAllocator());
71 if (decompressed != null) {
72 ctx.fireChannelRead(decompressed);
73 } else if (idx == in.readerOffset()) {
74 return;
75 }
76 }
77 assert decompressor.isFinished();
78 if (discardBytesAfterFinished) {
79 in.skipReadableBytes(in.readableBytes());
80 } else {
81 ctx.fireChannelRead(in.split());
82 }
83 }
84
85 @Override
86 protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
87 try {
88 super.handlerRemoved0(ctx);
89 } finally {
90 closeDecompressor();
91 }
92 }
93
94 @Override
95 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
96 try {
97 super.channelInactive(ctx);
98 } finally {
99 closeDecompressor();
100 }
101 }
102
103 private void closeDecompressor() {
104 if (decompressor != null) {
105 decompressor.close();
106 decompressor = null;
107 }
108 }
109 }