View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.handler.codec.http.websocketx.extensions.compression;
17  
18  import io.netty5.buffer.api.Buffer;
19  import io.netty5.buffer.api.CompositeBuffer;
20  import io.netty5.util.Send;
21  import io.netty5.channel.ChannelHandlerContext;
22  import io.netty5.channel.embedded.EmbeddedChannel;
23  import io.netty5.handler.codec.CodecException;
24  import io.netty5.handler.codec.compression.ZlibCodecFactory;
25  import io.netty5.handler.codec.compression.ZlibWrapper;
26  import io.netty5.handler.codec.http.websocketx.BinaryWebSocketFrame;
27  import io.netty5.handler.codec.http.websocketx.ContinuationWebSocketFrame;
28  import io.netty5.handler.codec.http.websocketx.TextWebSocketFrame;
29  import io.netty5.handler.codec.http.websocketx.WebSocketFrame;
30  import io.netty5.handler.codec.http.websocketx.extensions.WebSocketExtensionEncoder;
31  import io.netty5.handler.codec.http.websocketx.extensions.WebSocketExtensionFilter;
32  
33  import java.util.ArrayList;
34  import java.util.List;
35  import java.util.Objects;
36  import java.util.function.Supplier;
37  
38  import static io.netty5.buffer.api.DefaultBufferAllocators.preferredAllocator;
39  import static io.netty5.handler.codec.http.websocketx.extensions.compression.DeflateDecoder.FRAME_TAIL_LENGTH;
40  
41  /**
42   * Deflate implementation of a payload compressor for
43   * <tt>io.netty5.handler.codec.http.websocketx.WebSocketFrame</tt>.
44   */
45  abstract class DeflateEncoder extends WebSocketExtensionEncoder {
46      static final Supplier<Buffer> EMPTY_DEFLATE_BLOCK;
47      static final int EMPTY_DEFLATE_BLOCK_LENGTH;
48      static {
49          byte[] emptyDeflate = { 0x00 };
50          EMPTY_DEFLATE_BLOCK = preferredAllocator().constBufferSupplier(emptyDeflate);
51          EMPTY_DEFLATE_BLOCK_LENGTH = emptyDeflate.length;
52      }
53  
54      private final int compressionLevel;
55      private final int windowSize;
56      private final boolean noContext;
57      private final WebSocketExtensionFilter extensionEncoderFilter;
58  
59      private EmbeddedChannel encoder;
60  
61      /**
62       * Constructor
63       * @param compressionLevel compression level of the compressor.
64       * @param windowSize maximum size of the window compressor buffer.
65       * @param noContext true to disable context takeover.
66       * @param extensionEncoderFilter extension encoder filter.
67       */
68      DeflateEncoder(int compressionLevel, int windowSize, boolean noContext,
69                     WebSocketExtensionFilter extensionEncoderFilter) {
70          this.compressionLevel = compressionLevel;
71          this.windowSize = windowSize;
72          this.noContext = noContext;
73          this.extensionEncoderFilter = Objects.requireNonNull(extensionEncoderFilter, "extensionEncoderFilter");
74      }
75  
76      /**
77       * Returns the extension encoder filter.
78       */
79      protected WebSocketExtensionFilter extensionEncoderFilter() {
80          return extensionEncoderFilter;
81      }
82  
83      /**
84       * @param msg the current frame.
85       * @return the rsv bits to set in the compressed frame.
86       */
87      protected abstract int rsv(WebSocketFrame msg);
88  
89      /**
90       * @param msg the current frame.
91       * @return true if compressed payload tail needs to be removed.
92       */
93      protected abstract boolean removeFrameTail(WebSocketFrame msg);
94  
95      @Override
96      protected void encodeAndClose(ChannelHandlerContext ctx, WebSocketFrame msg, List<Object> out) throws Exception {
97          final Buffer compressedContent;
98          if (msg.binaryData().readableBytes() > 0) {
99              compressedContent = compressContent(ctx, msg);
100         } else if (msg.isFinalFragment()) {
101             // Set empty DEFLATE block manually for unknown buffer size
102             // https://tools.ietf.org/html/rfc7692#section-7.2.3.6
103             compressedContent = EMPTY_DEFLATE_BLOCK.get();
104         } else {
105             msg.close();
106             throw new CodecException("cannot compress content buffer");
107         }
108 
109         final WebSocketFrame outMsg;
110         if (msg instanceof TextWebSocketFrame) {
111             outMsg = new TextWebSocketFrame(msg.isFinalFragment(), rsv(msg), compressedContent);
112         } else if (msg instanceof BinaryWebSocketFrame) {
113             outMsg = new BinaryWebSocketFrame(msg.isFinalFragment(), rsv(msg), compressedContent);
114         } else if (msg instanceof ContinuationWebSocketFrame) {
115             outMsg = new ContinuationWebSocketFrame(msg.isFinalFragment(), rsv(msg), compressedContent);
116         } else {
117             compressedContent.close();
118             throw new CodecException("unexpected frame type: " + msg.getClass().getName());
119         }
120 
121         out.add(outMsg);
122     }
123 
124     @Override
125     public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
126         cleanup();
127         super.handlerRemoved(ctx);
128     }
129 
130     private Buffer compressContent(ChannelHandlerContext ctx, WebSocketFrame msg) {
131         if (encoder == null) {
132             encoder = new EmbeddedChannel(ZlibCodecFactory.newZlibEncoder(
133                     ZlibWrapper.NONE, compressionLevel, windowSize, 8));
134         }
135 
136         encoder.writeOutbound(msg.binaryData());
137 
138         List<Send<Buffer>> bufferList = new ArrayList<>();
139         for (;;) {
140             Buffer partCompressedContent = encoder.readOutbound();
141             if (partCompressedContent == null) {
142                 break;
143             }
144             if (partCompressedContent.readableBytes() == 0) {
145                 partCompressedContent.close();
146                 continue;
147             }
148             bufferList.add(partCompressedContent.send());
149         }
150 
151         if (bufferList.isEmpty()) {
152             throw new CodecException("cannot read compressed buffer");
153         }
154 
155         if (msg.isFinalFragment() && noContext) {
156             cleanup();
157         }
158 
159         Buffer compressedContent;
160         CompositeBuffer fullCompressedContent = ctx.bufferAllocator().compose(bufferList);
161         if (removeFrameTail(msg)) {
162             int realLength = fullCompressedContent.readableBytes() - FRAME_TAIL_LENGTH;
163             compressedContent = fullCompressedContent.readerOffset(0).writerOffset(realLength);
164         } else {
165             compressedContent = fullCompressedContent;
166         }
167 
168         return compressedContent;
169     }
170 
171     private void cleanup() {
172         if (encoder != null) {
173             // Clean-up the previous encoder if not cleaned up correctly.
174             encoder.finishAndReleaseAll();
175             encoder = null;
176         }
177     }
178 }