1 /*
2 * Copyright 2012 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * https://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package io.netty.handler.ssl;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.buffer.ByteBufUtil;
21 import io.netty.buffer.CompositeByteBuf;
22 import io.netty.buffer.Unpooled;
23 import io.netty.channel.Channel;
24 import io.netty.channel.ChannelConfig;
25 import io.netty.channel.ChannelException;
26 import io.netty.channel.ChannelFuture;
27 import io.netty.channel.ChannelFutureListener;
28 import io.netty.channel.ChannelHandlerContext;
29 import io.netty.channel.ChannelInboundHandler;
30 import io.netty.channel.ChannelOption;
31 import io.netty.channel.ChannelOutboundBuffer;
32 import io.netty.channel.ChannelOutboundHandler;
33 import io.netty.channel.ChannelPipeline;
34 import io.netty.channel.ChannelPromise;
35 import io.netty.channel.unix.UnixChannel;
36 import io.netty.handler.codec.ByteToMessageDecoder;
37 import io.netty.handler.codec.DecoderException;
38 import io.netty.handler.codec.UnsupportedMessageTypeException;
39 import io.netty.util.ReferenceCountUtil;
40 import io.netty.util.concurrent.DefaultPromise;
41 import io.netty.util.concurrent.EventExecutor;
42 import io.netty.util.concurrent.Future;
43 import io.netty.util.concurrent.FutureListener;
44 import io.netty.util.concurrent.ImmediateExecutor;
45 import io.netty.util.concurrent.Promise;
46 import io.netty.util.concurrent.PromiseNotifier;
47 import io.netty.util.internal.ObjectUtil;
48 import io.netty.util.internal.PlatformDependent;
49 import io.netty.util.internal.ThrowableUtil;
50 import io.netty.util.internal.UnstableApi;
51 import io.netty.util.internal.logging.InternalLogger;
52 import io.netty.util.internal.logging.InternalLoggerFactory;
53
54 import java.io.IOException;
55 import java.net.SocketAddress;
56 import java.nio.ByteBuffer;
57 import java.nio.channels.ClosedChannelException;
58 import java.nio.channels.DatagramChannel;
59 import java.nio.channels.SocketChannel;
60 import java.security.cert.CertificateException;
61 import java.util.List;
62 import java.util.concurrent.Executor;
63 import java.util.concurrent.RejectedExecutionException;
64 import java.util.concurrent.TimeUnit;
65 import java.util.regex.Pattern;
66
67 import javax.net.ssl.SSLEngine;
68 import javax.net.ssl.SSLEngineResult;
69 import javax.net.ssl.SSLEngineResult.HandshakeStatus;
70 import javax.net.ssl.SSLEngineResult.Status;
71 import javax.net.ssl.SSLException;
72 import javax.net.ssl.SSLHandshakeException;
73 import javax.net.ssl.SSLSession;
74
75 import static io.netty.handler.ssl.SslUtils.NOT_ENOUGH_DATA;
76 import static io.netty.handler.ssl.SslUtils.getEncryptedPacketLength;
77 import static io.netty.util.internal.ObjectUtil.checkNotNull;
78 import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
79
80 /**
81 * Adds <a href="https://en.wikipedia.org/wiki/Transport_Layer_Security">SSL
82 * · TLS</a> and StartTLS support to a {@link Channel}. Please refer
83 * to the <strong>"SecureChat"</strong> example in the distribution or the web
84 * site for the detailed usage.
85 *
86 * <h3>Beginning the handshake</h3>
87 * <p>
88 * Beside using the handshake {@link ChannelFuture} to get notified about the completion of the handshake it's
89 * also possible to detect it by implement the
90 * {@link ChannelInboundHandler#userEventTriggered(ChannelHandlerContext, Object)}
91 * method and check for a {@link SslHandshakeCompletionEvent}.
92 *
93 * <h3>Handshake</h3>
94 * <p>
95 * The handshake will be automatically issued for you once the {@link Channel} is active and
96 * {@link SSLEngine#getUseClientMode()} returns {@code true}.
97 * So no need to bother with it by your self.
98 *
99 * <h3>Closing the session</h3>
100 * <p>
101 * To close the SSL session, the {@link #closeOutbound()} method should be
102 * called to send the {@code close_notify} message to the remote peer. One
103 * exception is when you close the {@link Channel} - {@link SslHandler}
104 * intercepts the close request and send the {@code close_notify} message
105 * before the channel closure automatically. Once the SSL session is closed,
106 * it is not reusable, and consequently you should create a new
107 * {@link SslHandler} with a new {@link SSLEngine} as explained in the
108 * following section.
109 *
110 * <h3>Restarting the session</h3>
111 * <p>
112 * To restart the SSL session, you must remove the existing closed
113 * {@link SslHandler} from the {@link ChannelPipeline}, insert a new
114 * {@link SslHandler} with a new {@link SSLEngine} into the pipeline,
115 * and start the handshake process as described in the first section.
116 *
117 * <h3>Implementing StartTLS</h3>
118 * <p>
119 * <a href="https://en.wikipedia.org/wiki/STARTTLS">StartTLS</a> is the
120 * communication pattern that secures the wire in the middle of the plaintext
121 * connection. Please note that it is different from SSL · TLS, that
122 * secures the wire from the beginning of the connection. Typically, StartTLS
123 * is composed of three steps:
124 * <ol>
125 * <li>Client sends a StartTLS request to server.</li>
126 * <li>Server sends a StartTLS response to client.</li>
127 * <li>Client begins SSL handshake.</li>
128 * </ol>
129 * If you implement a server, you need to:
130 * <ol>
131 * <li>create a new {@link SslHandler} instance with {@code startTls} flag set
132 * to {@code true},</li>
133 * <li>insert the {@link SslHandler} to the {@link ChannelPipeline}, and</li>
134 * <li>write a StartTLS response.</li>
135 * </ol>
136 * Please note that you must insert {@link SslHandler} <em>before</em> sending
137 * the StartTLS response. Otherwise the client can send begin SSL handshake
138 * before {@link SslHandler} is inserted to the {@link ChannelPipeline}, causing
139 * data corruption.
140 * <p>
141 * The client-side implementation is much simpler.
142 * <ol>
143 * <li>Write a StartTLS request,</li>
144 * <li>wait for the StartTLS response,</li>
145 * <li>create a new {@link SslHandler} instance with {@code startTls} flag set
146 * to {@code false},</li>
147 * <li>insert the {@link SslHandler} to the {@link ChannelPipeline}, and</li>
148 * <li>Initiate SSL handshake.</li>
149 * </ol>
150 *
151 * <h3>Known issues</h3>
152 * <p>
153 * Because of a known issue with the current implementation of the SslEngine that comes
154 * with Java it may be possible that you see blocked IO-Threads while a full GC is done.
155 * <p>
156 * So if you are affected you can workaround this problem by adjust the cache settings
157 * like shown below:
158 *
159 * <pre>
160 * SslContext context = ...;
161 * context.getServerSessionContext().setSessionCacheSize(someSaneSize);
162 * context.getServerSessionContext().setSessionTime(someSameTimeout);
163 * </pre>
164 * <p>
165 * What values to use here depends on the nature of your application and should be set
166 * based on monitoring and debugging of it.
167 * For more details see
168 * <a href="https://github.com/netty/netty/issues/832">#832</a> in our issue tracker.
169 */
170 public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundHandler {
171 private static final InternalLogger logger =
172 InternalLoggerFactory.getInstance(SslHandler.class);
173 private static final Pattern IGNORABLE_ERROR_MESSAGE = Pattern.compile(
174 "^.*(?:connection.*(?:reset|closed|abort|broken)|broken.*pipe).*$", Pattern.CASE_INSENSITIVE);
175 private static final int STATE_SENT_FIRST_MESSAGE = 1;
176 private static final int STATE_FLUSHED_BEFORE_HANDSHAKE = 1 << 1;
177 private static final int STATE_READ_DURING_HANDSHAKE = 1 << 2;
178 private static final int STATE_HANDSHAKE_STARTED = 1 << 3;
179 /**
180 * Set by wrap*() methods when something is produced.
181 * {@link #channelReadComplete(ChannelHandlerContext)} will check this flag, clear it, and call ctx.flush().
182 */
183 private static final int STATE_NEEDS_FLUSH = 1 << 4;
184 private static final int STATE_OUTBOUND_CLOSED = 1 << 5;
185 private static final int STATE_CLOSE_NOTIFY = 1 << 6;
186 private static final int STATE_PROCESS_TASK = 1 << 7;
187 /**
188 * This flag is used to determine if we need to call {@link ChannelHandlerContext#read()} to consume more data
189 * when {@link ChannelConfig#isAutoRead()} is {@code false}.
190 */
191 private static final int STATE_FIRE_CHANNEL_READ = 1 << 8;
192 private static final int STATE_UNWRAP_REENTRY = 1 << 9;
193
194 /**
195 * <a href="https://tools.ietf.org/html/rfc5246#section-6.2">2^14</a> which is the maximum sized plaintext chunk
196 * allowed by the TLS RFC.
197 */
198 private static final int MAX_PLAINTEXT_LENGTH = 16 * 1024;
199
200 private enum SslEngineType {
201 TCNATIVE(true, COMPOSITE_CUMULATOR) {
202 @Override
203 SSLEngineResult unwrap(SslHandler handler, ByteBuf in, int len, ByteBuf out) throws SSLException {
204 int nioBufferCount = in.nioBufferCount();
205 int writerIndex = out.writerIndex();
206 final SSLEngineResult result;
207 if (nioBufferCount > 1) {
208 /*
209 * If {@link OpenSslEngine} is in use,
210 * we can use a special {@link OpenSslEngine#unwrap(ByteBuffer[], ByteBuffer[])} method
211 * that accepts multiple {@link ByteBuffer}s without additional memory copies.
212 */
213 ReferenceCountedOpenSslEngine opensslEngine = (ReferenceCountedOpenSslEngine) handler.engine;
214 try {
215 handler.singleBuffer[0] = toByteBuffer(out, writerIndex, out.writableBytes());
216 result = opensslEngine.unwrap(in.nioBuffers(in.readerIndex(), len), handler.singleBuffer);
217 } finally {
218 handler.singleBuffer[0] = null;
219 }
220 } else {
221 result = handler.engine.unwrap(toByteBuffer(in, in.readerIndex(), len),
222 toByteBuffer(out, writerIndex, out.writableBytes()));
223 }
224 out.writerIndex(writerIndex + result.bytesProduced());
225 return result;
226 }
227
228 @Override
229 ByteBuf allocateWrapBuffer(SslHandler handler, ByteBufAllocator allocator,
230 int pendingBytes, int numComponents) {
231 return allocator.directBuffer(((ReferenceCountedOpenSslEngine) handler.engine)
232 .calculateOutNetBufSize(pendingBytes, numComponents));
233 }
234
235 @Override
236 int calculateRequiredOutBufSpace(SslHandler handler, int pendingBytes, int numComponents) {
237 return ((ReferenceCountedOpenSslEngine) handler.engine)
238 .calculateMaxLengthForWrap(pendingBytes, numComponents);
239 }
240
241 @Override
242 int calculatePendingData(SslHandler handler, int guess) {
243 int sslPending = ((ReferenceCountedOpenSslEngine) handler.engine).sslPending();
244 return sslPending > 0 ? sslPending : guess;
245 }
246
247 @Override
248 boolean jdkCompatibilityMode(SSLEngine engine) {
249 return ((ReferenceCountedOpenSslEngine) engine).jdkCompatibilityMode;
250 }
251 },
252 CONSCRYPT(true, COMPOSITE_CUMULATOR) {
253 @Override
254 SSLEngineResult unwrap(SslHandler handler, ByteBuf in, int len, ByteBuf out) throws SSLException {
255 int nioBufferCount = in.nioBufferCount();
256 int writerIndex = out.writerIndex();
257 final SSLEngineResult result;
258 if (nioBufferCount > 1) {
259 /*
260 * Use a special unwrap method without additional memory copies.
261 */
262 try {
263 handler.singleBuffer[0] = toByteBuffer(out, writerIndex, out.writableBytes());
264 result = ((ConscryptAlpnSslEngine) handler.engine).unwrap(
265 in.nioBuffers(in.readerIndex(), len),
266 handler.singleBuffer);
267 } finally {
268 handler.singleBuffer[0] = null;
269 }
270 } else {
271 result = handler.engine.unwrap(toByteBuffer(in, in.readerIndex(), len),
272 toByteBuffer(out, writerIndex, out.writableBytes()));
273 }
274 out.writerIndex(writerIndex + result.bytesProduced());
275 return result;
276 }
277
278 @Override
279 ByteBuf allocateWrapBuffer(SslHandler handler, ByteBufAllocator allocator,
280 int pendingBytes, int numComponents) {
281 return allocator.directBuffer(
282 ((ConscryptAlpnSslEngine) handler.engine).calculateOutNetBufSize(pendingBytes, numComponents));
283 }
284
285 @Override
286 int calculateRequiredOutBufSpace(SslHandler handler, int pendingBytes, int numComponents) {
287 return ((ConscryptAlpnSslEngine) handler.engine)
288 .calculateRequiredOutBufSpace(pendingBytes, numComponents);
289 }
290
291 @Override
292 int calculatePendingData(SslHandler handler, int guess) {
293 return guess;
294 }
295
296 @Override
297 boolean jdkCompatibilityMode(SSLEngine engine) {
298 return true;
299 }
300 },
301 JDK(false, MERGE_CUMULATOR) {
302 @Override
303 SSLEngineResult unwrap(SslHandler handler, ByteBuf in, int len, ByteBuf out) throws SSLException {
304 int writerIndex = out.writerIndex();
305 ByteBuffer inNioBuffer = getUnwrapInputBuffer(handler, toByteBuffer(in, in.readerIndex(), len));
306 int position = inNioBuffer.position();
307 final SSLEngineResult result = handler.engine.unwrap(inNioBuffer,
308 toByteBuffer(out, writerIndex, out.writableBytes()));
309 out.writerIndex(writerIndex + result.bytesProduced());
310
311 // This is a workaround for a bug in Android 5.0. Android 5.0 does not correctly update the
312 // SSLEngineResult.bytesConsumed() in some cases and just return 0.
313 //
314 // See:
315 // - https://android-review.googlesource.com/c/platform/external/conscrypt/+/122080
316 // - https://github.com/netty/netty/issues/7758
317 if (result.bytesConsumed() == 0) {
318 int consumed = inNioBuffer.position() - position;
319 if (consumed != result.bytesConsumed()) {
320 // Create a new SSLEngineResult with the correct bytesConsumed().
321 return new SSLEngineResult(
322 result.getStatus(), result.getHandshakeStatus(), consumed, result.bytesProduced());
323 }
324 }
325 return result;
326 }
327
328 private ByteBuffer getUnwrapInputBuffer(SslHandler handler, ByteBuffer inNioBuffer) {
329 int javaVersion = PlatformDependent.javaVersion();
330 if (javaVersion >= 22 && javaVersion < 25 && inNioBuffer.isDirect()) {
331 // Work-around for https://bugs.openjdk.org/browse/JDK-8357268
332 int remaining = inNioBuffer.remaining();
333 ByteBuffer copy = handler.unwrapInputCopy;
334 if (copy == null || copy.capacity() < remaining) {
335 handler.unwrapInputCopy = copy = ByteBuffer.allocate(remaining);
336 } else {
337 copy.clear();
338 }
339 copy.put(inNioBuffer).flip();
340 return copy;
341 }
342 return inNioBuffer;
343 }
344
345 @Override
346 ByteBuf allocateWrapBuffer(SslHandler handler, ByteBufAllocator allocator,
347 int pendingBytes, int numComponents) {
348 // For JDK we don't have a good source for the max wrap overhead. We need at least one packet buffer
349 // size, but may be able to fit more in based on the total requested.
350 return allocator.heapBuffer(Math.max(pendingBytes, handler.engine.getSession().getPacketBufferSize()));
351 }
352
353 @Override
354 int calculateRequiredOutBufSpace(SslHandler handler, int pendingBytes, int numComponents) {
355 // As for the JDK SSLEngine we always need to operate on buffer space required by the SSLEngine
356 // (normally ~16KB). This is required even if the amount of data to encrypt is very small. Use heap
357 // buffers to reduce the native memory usage.
358 //
359 // Beside this the JDK SSLEngine also (as of today) will do an extra heap to direct buffer copy
360 // if a direct buffer is used as its internals operate on byte[].
361 return handler.engine.getSession().getPacketBufferSize();
362 }
363
364 @Override
365 int calculatePendingData(SslHandler handler, int guess) {
366 return guess;
367 }
368
369 @Override
370 boolean jdkCompatibilityMode(SSLEngine engine) {
371 return true;
372 }
373 };
374
375 static SslEngineType forEngine(SSLEngine engine) {
376 return engine instanceof ReferenceCountedOpenSslEngine ? TCNATIVE :
377 engine instanceof ConscryptAlpnSslEngine ? CONSCRYPT : JDK;
378 }
379
380 SslEngineType(boolean wantsDirectBuffer, Cumulator cumulator) {
381 this.wantsDirectBuffer = wantsDirectBuffer;
382 this.cumulator = cumulator;
383 }
384
385 abstract SSLEngineResult unwrap(SslHandler handler, ByteBuf in, int len, ByteBuf out) throws SSLException;
386
387 abstract int calculatePendingData(SslHandler handler, int guess);
388
389 abstract boolean jdkCompatibilityMode(SSLEngine engine);
390
391 abstract ByteBuf allocateWrapBuffer(SslHandler handler, ByteBufAllocator allocator,
392 int pendingBytes, int numComponents);
393
394 abstract int calculateRequiredOutBufSpace(SslHandler handler, int pendingBytes, int numComponents);
395
396 // BEGIN Platform-dependent flags
397
398 /**
399 * {@code true} if and only if {@link SSLEngine} expects a direct buffer and so if a heap buffer
400 * is given will make an extra memory copy.
401 */
402 final boolean wantsDirectBuffer;
403
404 // END Platform-dependent flags
405
406 /**
407 * When using JDK {@link SSLEngine}, we use {@link #MERGE_CUMULATOR} because it works only with
408 * one {@link ByteBuffer}.
409 *
410 * When using {@link OpenSslEngine}, we can use {@link #COMPOSITE_CUMULATOR} because it has
411 * {@link OpenSslEngine#unwrap(ByteBuffer[], ByteBuffer[])} which works with multiple {@link ByteBuffer}s
412 * and which does not need to do extra memory copies.
413 */
414 final Cumulator cumulator;
415 }
416
417 private volatile ChannelHandlerContext ctx;
418 private final SSLEngine engine;
419 private final SslEngineType engineType;
420 private final Executor delegatedTaskExecutor;
421 private final boolean jdkCompatibilityMode;
422
423 /**
424 * Used if {@link SSLEngine#wrap(ByteBuffer[], ByteBuffer)} and {@link SSLEngine#unwrap(ByteBuffer, ByteBuffer[])}
425 * should be called with a {@link ByteBuf} that is only backed by one {@link ByteBuffer} to reduce the object
426 * creation.
427 */
428 private final ByteBuffer[] singleBuffer = new ByteBuffer[1];
429 private ByteBuffer unwrapInputCopy;
430
431 private final boolean startTls;
432 private final ResumptionController resumptionController;
433
434 private final SslTasksRunner sslTaskRunnerForUnwrap = new SslTasksRunner(true);
435 private final SslTasksRunner sslTaskRunner = new SslTasksRunner(false);
436
437 private SslHandlerCoalescingBufferQueue pendingUnencryptedWrites;
438 private Promise<Channel> handshakePromise = new LazyChannelPromise();
439 private final LazyChannelPromise sslClosePromise = new LazyChannelPromise();
440
441 private int packetLength;
442 private short state;
443
444 private volatile long handshakeTimeoutMillis = 10000;
445 private volatile long closeNotifyFlushTimeoutMillis = 3000;
446 private volatile long closeNotifyReadTimeoutMillis;
447 volatile int wrapDataSize = MAX_PLAINTEXT_LENGTH;
448
449 /**
450 * Creates a new instance which runs all delegated tasks directly on the {@link EventExecutor}.
451 *
452 * @param engine the {@link SSLEngine} this handler will use
453 */
454 public SslHandler(SSLEngine engine) {
455 this(engine, false);
456 }
457
458 /**
459 * Creates a new instance which runs all delegated tasks directly on the {@link EventExecutor}.
460 *
461 * @param engine the {@link SSLEngine} this handler will use
462 * @param startTls {@code true} if the first write request shouldn't be
463 * encrypted by the {@link SSLEngine}
464 */
465 public SslHandler(SSLEngine engine, boolean startTls) {
466 this(engine, startTls, ImmediateExecutor.INSTANCE);
467 }
468
469 /**
470 * Creates a new instance.
471 *
472 * @param engine the {@link SSLEngine} this handler will use
473 * @param delegatedTaskExecutor the {@link Executor} that will be used to execute tasks that are returned by
474 * {@link SSLEngine#getDelegatedTask()}.
475 */
476 public SslHandler(SSLEngine engine, Executor delegatedTaskExecutor) {
477 this(engine, false, delegatedTaskExecutor);
478 }
479
480 /**
481 * Creates a new instance.
482 *
483 * @param engine the {@link SSLEngine} this handler will use
484 * @param startTls {@code true} if the first write request shouldn't be
485 * encrypted by the {@link SSLEngine}
486 * @param delegatedTaskExecutor the {@link Executor} that will be used to execute tasks that are returned by
487 * {@link SSLEngine#getDelegatedTask()}.
488 */
489 public SslHandler(SSLEngine engine, boolean startTls, Executor delegatedTaskExecutor) {
490 this(engine, startTls, delegatedTaskExecutor, null);
491 }
492
493 SslHandler(SSLEngine engine, boolean startTls, Executor delegatedTaskExecutor,
494 ResumptionController resumptionController) {
495 this.engine = ObjectUtil.checkNotNull(engine, "engine");
496 this.delegatedTaskExecutor = ObjectUtil.checkNotNull(delegatedTaskExecutor, "delegatedTaskExecutor");
497 engineType = SslEngineType.forEngine(engine);
498 this.startTls = startTls;
499 this.jdkCompatibilityMode = engineType.jdkCompatibilityMode(engine);
500 setCumulator(engineType.cumulator);
501 this.resumptionController = resumptionController;
502 }
503
504 public long getHandshakeTimeoutMillis() {
505 return handshakeTimeoutMillis;
506 }
507
508 public void setHandshakeTimeout(long handshakeTimeout, TimeUnit unit) {
509 checkNotNull(unit, "unit");
510 setHandshakeTimeoutMillis(unit.toMillis(handshakeTimeout));
511 }
512
513 public void setHandshakeTimeoutMillis(long handshakeTimeoutMillis) {
514 this.handshakeTimeoutMillis = checkPositiveOrZero(handshakeTimeoutMillis, "handshakeTimeoutMillis");
515 }
516
517 /**
518 * Sets the number of bytes to pass to each {@link SSLEngine#wrap(ByteBuffer[], int, int, ByteBuffer)} call.
519 * <p>
520 * This value will partition data which is passed to write
521 * {@link #write(ChannelHandlerContext, Object, ChannelPromise)}. The partitioning will work as follows:
522 * <ul>
523 * <li>If {@code wrapDataSize <= 0} then we will write each data chunk as is.</li>
524 * <li>If {@code wrapDataSize > data size} then we will attempt to aggregate multiple data chunks together.</li>
525 * <li>If {@code wrapDataSize > data size} Else if {@code wrapDataSize <= data size} then we will divide the data
526 * into chunks of {@code wrapDataSize} when writing.</li>
527 * </ul>
528 * <p>
529 * If the {@link SSLEngine} doesn't support a gather wrap operation (e.g. {@link SslProvider#OPENSSL}) then
530 * aggregating data before wrapping can help reduce the ratio between TLS overhead vs data payload which will lead
531 * to better goodput. Writing fixed chunks of data can also help target the underlying transport's (e.g. TCP)
532 * frame size. Under lossy/congested network conditions this may help the peer get full TLS packets earlier and
533 * be able to do work sooner, as opposed to waiting for the all the pieces of the TLS packet to arrive.
534 * @param wrapDataSize the number of bytes which will be passed to each
535 * {@link SSLEngine#wrap(ByteBuffer[], int, int, ByteBuffer)} call.
536 */
537 @UnstableApi
538 public final void setWrapDataSize(int wrapDataSize) {
539 this.wrapDataSize = wrapDataSize;
540 }
541
542 /**
543 * @deprecated use {@link #getCloseNotifyFlushTimeoutMillis()}
544 */
545 @Deprecated
546 public long getCloseNotifyTimeoutMillis() {
547 return getCloseNotifyFlushTimeoutMillis();
548 }
549
550 /**
551 * @deprecated use {@link #setCloseNotifyFlushTimeout(long, TimeUnit)}
552 */
553 @Deprecated
554 public void setCloseNotifyTimeout(long closeNotifyTimeout, TimeUnit unit) {
555 setCloseNotifyFlushTimeout(closeNotifyTimeout, unit);
556 }
557
558 /**
559 * @deprecated use {@link #setCloseNotifyFlushTimeoutMillis(long)}
560 */
561 @Deprecated
562 public void setCloseNotifyTimeoutMillis(long closeNotifyFlushTimeoutMillis) {
563 setCloseNotifyFlushTimeoutMillis(closeNotifyFlushTimeoutMillis);
564 }
565
566 /**
567 * Gets the timeout for flushing the close_notify that was triggered by closing the
568 * {@link Channel}. If the close_notify was not flushed in the given timeout the {@link Channel} will be closed
569 * forcibly.
570 */
571 public final long getCloseNotifyFlushTimeoutMillis() {
572 return closeNotifyFlushTimeoutMillis;
573 }
574
575 /**
576 * Sets the timeout for flushing the close_notify that was triggered by closing the
577 * {@link Channel}. If the close_notify was not flushed in the given timeout the {@link Channel} will be closed
578 * forcibly.
579 */
580 public final void setCloseNotifyFlushTimeout(long closeNotifyFlushTimeout, TimeUnit unit) {
581 setCloseNotifyFlushTimeoutMillis(unit.toMillis(closeNotifyFlushTimeout));
582 }
583
584 /**
585 * See {@link #setCloseNotifyFlushTimeout(long, TimeUnit)}.
586 */
587 public final void setCloseNotifyFlushTimeoutMillis(long closeNotifyFlushTimeoutMillis) {
588 this.closeNotifyFlushTimeoutMillis = checkPositiveOrZero(closeNotifyFlushTimeoutMillis,
589 "closeNotifyFlushTimeoutMillis");
590 }
591
592 /**
593 * Gets the timeout (in ms) for receiving the response for the close_notify that was triggered by closing the
594 * {@link Channel}. This timeout starts after the close_notify message was successfully written to the
595 * remote peer. Use {@code 0} to directly close the {@link Channel} and not wait for the response.
596 */
597 public final long getCloseNotifyReadTimeoutMillis() {
598 return closeNotifyReadTimeoutMillis;
599 }
600
601 /**
602 * Sets the timeout for receiving the response for the close_notify that was triggered by closing the
603 * {@link Channel}. This timeout starts after the close_notify message was successfully written to the
604 * remote peer. Use {@code 0} to directly close the {@link Channel} and not wait for the response.
605 */
606 public final void setCloseNotifyReadTimeout(long closeNotifyReadTimeout, TimeUnit unit) {
607 setCloseNotifyReadTimeoutMillis(unit.toMillis(closeNotifyReadTimeout));
608 }
609
610 /**
611 * See {@link #setCloseNotifyReadTimeout(long, TimeUnit)}.
612 */
613 public final void setCloseNotifyReadTimeoutMillis(long closeNotifyReadTimeoutMillis) {
614 this.closeNotifyReadTimeoutMillis = checkPositiveOrZero(closeNotifyReadTimeoutMillis,
615 "closeNotifyReadTimeoutMillis");
616 }
617
618 /**
619 * Returns the {@link SSLEngine} which is used by this handler.
620 */
621 public SSLEngine engine() {
622 return engine;
623 }
624
625 /**
626 * Returns the name of the current application-level protocol.
627 *
628 * @return the protocol name or {@code null} if application-level protocol has not been negotiated
629 */
630 public String applicationProtocol() {
631 SSLEngine engine = engine();
632 if (!(engine instanceof ApplicationProtocolAccessor)) {
633 return null;
634 }
635
636 return ((ApplicationProtocolAccessor) engine).getNegotiatedApplicationProtocol();
637 }
638
639 /**
640 * Returns a {@link Future} that will get notified once the current TLS handshake completes.
641 *
642 * @return the {@link Future} for the initial TLS handshake if {@link #renegotiate()} was not invoked.
643 * The {@link Future} for the most recent {@linkplain #renegotiate() TLS renegotiation} otherwise.
644 */
645 public Future<Channel> handshakeFuture() {
646 return handshakePromise;
647 }
648
649 /**
650 * Use {@link #closeOutbound()}
651 */
652 @Deprecated
653 public ChannelFuture close() {
654 return closeOutbound();
655 }
656
657 /**
658 * Use {@link #closeOutbound(ChannelPromise)}
659 */
660 @Deprecated
661 public ChannelFuture close(ChannelPromise promise) {
662 return closeOutbound(promise);
663 }
664
665 /**
666 * Sends an SSL {@code close_notify} message to the specified channel and
667 * destroys the underlying {@link SSLEngine}. This will <strong>not</strong> close the underlying
668 * {@link Channel}. If you want to also close the {@link Channel} use {@link Channel#close()} or
669 * {@link ChannelHandlerContext#close()}
670 */
671 public ChannelFuture closeOutbound() {
672 return closeOutbound(ctx.newPromise());
673 }
674
675 /**
676 * Sends an SSL {@code close_notify} message to the specified channel and
677 * destroys the underlying {@link SSLEngine}. This will <strong>not</strong> close the underlying
678 * {@link Channel}. If you want to also close the {@link Channel} use {@link Channel#close()} or
679 * {@link ChannelHandlerContext#close()}
680 */
681 public ChannelFuture closeOutbound(final ChannelPromise promise) {
682 final ChannelHandlerContext ctx = this.ctx;
683 if (ctx.executor().inEventLoop()) {
684 closeOutbound0(promise);
685 } else {
686 ctx.executor().execute(new Runnable() {
687 @Override
688 public void run() {
689 closeOutbound0(promise);
690 }
691 });
692 }
693 return promise;
694 }
695
696 private void closeOutbound0(ChannelPromise promise) {
697 setState(STATE_OUTBOUND_CLOSED);
698 engine.closeOutbound();
699 try {
700 flush(ctx, promise);
701 } catch (Exception e) {
702 if (!promise.tryFailure(e)) {
703 logger.warn("{} flush() raised a masked exception.", ctx.channel(), e);
704 }
705 }
706 }
707
708 /**
709 * Return the {@link Future} that will get notified if the inbound of the {@link SSLEngine} is closed.
710 *
711 * This method will return the same {@link Future} all the time.
712 *
713 * @see SSLEngine
714 */
715 public Future<Channel> sslCloseFuture() {
716 return sslClosePromise;
717 }
718
719 @Override
720 public void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
721 try {
722 if (pendingUnencryptedWrites != null && !pendingUnencryptedWrites.isEmpty()) {
723 // Check if queue is not empty first because create a new ChannelException is expensive
724 pendingUnencryptedWrites.releaseAndFailAll(ctx,
725 new ChannelException("Pending write on removal of SslHandler"));
726 }
727 pendingUnencryptedWrites = null;
728
729 SSLException cause = null;
730
731 // If the handshake or SSLEngine closure is not done yet we should fail corresponding promise and
732 // notify the rest of the
733 // pipeline.
734 if (!handshakePromise.isDone()) {
735 cause = new SSLHandshakeException("SslHandler removed before handshake completed");
736 if (handshakePromise.tryFailure(cause)) {
737 ctx.fireUserEventTriggered(new SslHandshakeCompletionEvent(cause));
738 }
739 }
740 if (!sslClosePromise.isDone()) {
741 if (cause == null) {
742 cause = new SSLException("SslHandler removed before SSLEngine was closed");
743 }
744 notifyClosePromise(cause);
745 }
746 } finally {
747 ReferenceCountUtil.release(engine);
748 }
749 }
750
751 @Override
752 public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
753 ctx.bind(localAddress, promise);
754 }
755
756 @Override
757 public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
758 ChannelPromise promise) throws Exception {
759 ctx.connect(remoteAddress, localAddress, promise);
760 }
761
762 @Override
763 public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
764 ctx.deregister(promise);
765 }
766
767 @Override
768 public void disconnect(final ChannelHandlerContext ctx,
769 final ChannelPromise promise) throws Exception {
770 closeOutboundAndChannel(ctx, promise, true);
771 }
772
773 @Override
774 public void close(final ChannelHandlerContext ctx,
775 final ChannelPromise promise) throws Exception {
776 closeOutboundAndChannel(ctx, promise, false);
777 }
778
779 @Override
780 public void read(ChannelHandlerContext ctx) throws Exception {
781 if (!handshakePromise.isDone()) {
782 setState(STATE_READ_DURING_HANDSHAKE);
783 }
784
785 ctx.read();
786 }
787
788 private static IllegalStateException newPendingWritesNullException() {
789 return new IllegalStateException("pendingUnencryptedWrites is null, handlerRemoved0 called?");
790 }
791
792 @Override
793 public void write(final ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
794 if (!(msg instanceof ByteBuf)) {
795 UnsupportedMessageTypeException exception = new UnsupportedMessageTypeException(msg, ByteBuf.class);
796 ReferenceCountUtil.safeRelease(msg);
797 promise.setFailure(exception);
798 } else if (pendingUnencryptedWrites == null) {
799 ReferenceCountUtil.safeRelease(msg);
800 promise.setFailure(newPendingWritesNullException());
801 } else {
802 pendingUnencryptedWrites.add((ByteBuf) msg, promise);
803 }
804 }
805
806 @Override
807 public void flush(ChannelHandlerContext ctx) throws Exception {
808 // Do not encrypt the first write request if this handler is
809 // created with startTLS flag turned on.
810 if (startTls && !isStateSet(STATE_SENT_FIRST_MESSAGE)) {
811 setState(STATE_SENT_FIRST_MESSAGE);
812 pendingUnencryptedWrites.writeAndRemoveAll(ctx);
813 forceFlush(ctx);
814 // Explicit start handshake processing once we send the first message. This will also ensure
815 // we will schedule the timeout if needed.
816 startHandshakeProcessing(true);
817 return;
818 }
819
820 if (isStateSet(STATE_PROCESS_TASK)) {
821 return;
822 }
823
824 try {
825 wrapAndFlush(ctx);
826 } catch (Throwable cause) {
827 setHandshakeFailure(ctx, cause);
828 PlatformDependent.throwException(cause);
829 }
830 }
831
832 private void wrapAndFlush(ChannelHandlerContext ctx) throws SSLException {
833 if (pendingUnencryptedWrites.isEmpty()) {
834 // It's important to NOT use a voidPromise here as the user
835 // may want to add a ChannelFutureListener to the ChannelPromise later.
836 //
837 // See https://github.com/netty/netty/issues/3364
838 pendingUnencryptedWrites.add(Unpooled.EMPTY_BUFFER, ctx.newPromise());
839 }
840 if (!handshakePromise.isDone()) {
841 setState(STATE_FLUSHED_BEFORE_HANDSHAKE);
842 }
843 try {
844 wrap(ctx, false);
845 } finally {
846 // We may have written some parts of data before an exception was thrown so ensure we always flush.
847 // See https://github.com/netty/netty/issues/3900#issuecomment-172481830
848 forceFlush(ctx);
849 }
850 }
851
852 // This method will not call setHandshakeFailure(...) !
853 private void wrap(ChannelHandlerContext ctx, boolean inUnwrap) throws SSLException {
854 ByteBuf out = null;
855 ByteBufAllocator alloc = ctx.alloc();
856 try {
857 final int wrapDataSize = this.wrapDataSize;
858 // Only continue to loop if the handler was not removed in the meantime.
859 // See https://github.com/netty/netty/issues/5860
860 outer: while (!ctx.isRemoved()) {
861 ChannelPromise promise = ctx.newPromise();
862 ByteBuf buf = wrapDataSize > 0 ?
863 pendingUnencryptedWrites.remove(alloc, wrapDataSize, promise) :
864 pendingUnencryptedWrites.removeFirst(promise);
865 if (buf == null) {
866 break;
867 }
868
869 SSLEngineResult result;
870
871 try {
872 if (buf.readableBytes() > MAX_PLAINTEXT_LENGTH) {
873 // If we pulled a buffer larger than the supported packet size, we can slice it up and
874 // iteratively, encrypting multiple packets into a single larger buffer. This substantially
875 // saves on allocations for large responses. Here we estimate how large of a buffer we need.
876 // If we overestimate a bit, that's fine. If we underestimate, we'll simply re-enqueue the
877 // remaining buffer and get it on the next outer loop.
878 int readableBytes = buf.readableBytes();
879 int numPackets = readableBytes / MAX_PLAINTEXT_LENGTH;
880 if (readableBytes % MAX_PLAINTEXT_LENGTH != 0) {
881 numPackets += 1;
882 }
883
884 if (out == null) {
885 out = allocateOutNetBuf(ctx, readableBytes, buf.nioBufferCount() + numPackets);
886 }
887 result = wrapMultiple(alloc, engine, buf, out);
888 } else {
889 if (out == null) {
890 out = allocateOutNetBuf(ctx, buf.readableBytes(), buf.nioBufferCount());
891 }
892 result = wrap(alloc, engine, buf, out);
893 }
894 } catch (SSLException e) {
895 // Either wrapMultiple(...) or wrap(...) did throw. In this case we need to release the buffer
896 // that we removed from pendingUnencryptedWrites before failing the promise and rethrowing it.
897 // Failing to do so would result in a buffer leak.
898 // See https://github.com/netty/netty/issues/14644
899 //
900 // We don't need to release out here as this is done in a finally block already.
901 buf.release();
902 promise.setFailure(e);
903 throw e;
904 }
905
906 if (buf.isReadable()) {
907 pendingUnencryptedWrites.addFirst(buf, promise);
908 // When we add the buffer/promise pair back we need to be sure we don't complete the promise
909 // later. We only complete the promise if the buffer is completely consumed.
910 promise = null;
911 } else {
912 buf.release();
913 }
914
915 // We need to write any data before we invoke any methods which may trigger re-entry, otherwise
916 // writes may occur out of order and TLS sequencing may be off (e.g. SSLV3_ALERT_BAD_RECORD_MAC).
917 if (out.isReadable()) {
918 final ByteBuf b = out;
919 out = null;
920 if (promise != null) {
921 ctx.write(b, promise);
922 } else {
923 ctx.write(b);
924 }
925 } else if (promise != null) {
926 ctx.write(Unpooled.EMPTY_BUFFER, promise);
927 }
928 // else out is not readable we can re-use it and so save an extra allocation
929
930 if (result.getStatus() == Status.CLOSED) {
931 // First check if there is any write left that needs to be failed, if there is none we don't need
932 // to create a new exception or obtain an existing one.
933 if (!pendingUnencryptedWrites.isEmpty()) {
934 // Make a best effort to preserve any exception that way previously encountered from the
935 // handshake or the transport, else fallback to a general error.
936 Throwable exception = handshakePromise.cause();
937 if (exception == null) {
938 exception = sslClosePromise.cause();
939 if (exception == null) {
940 exception = new SslClosedEngineException("SSLEngine closed already");
941 }
942 }
943 pendingUnencryptedWrites.releaseAndFailAll(ctx, exception);
944 }
945
946 return;
947 } else {
948 switch (result.getHandshakeStatus()) {
949 case NEED_TASK:
950 if (!runDelegatedTasks(inUnwrap)) {
951 // We scheduled a task on the delegatingTaskExecutor, so stop processing as we will
952 // resume once the task completes.
953 break outer;
954 }
955 break;
956 case FINISHED:
957 case NOT_HANDSHAKING: // work around for android bug that skips the FINISHED state.
958 setHandshakeSuccess();
959 break;
960 case NEED_WRAP:
961 // If we are expected to wrap again and we produced some data we need to ensure there
962 // is something in the queue to process as otherwise we will not try again before there
963 // was more added. Failing to do so may fail to produce an alert that can be
964 // consumed by the remote peer.
965 if (result.bytesProduced() > 0 && pendingUnencryptedWrites.isEmpty()) {
966 pendingUnencryptedWrites.add(Unpooled.EMPTY_BUFFER);
967 }
968 break;
969 case NEED_UNWRAP:
970 // The underlying engine is starving so we need to feed it with more data.
971 // See https://github.com/netty/netty/pull/5039
972 readIfNeeded(ctx);
973 return;
974 default:
975 throw new IllegalStateException(
976 "Unknown handshake status: " + result.getHandshakeStatus());
977 }
978 }
979 }
980 } finally {
981 if (out != null) {
982 out.release();
983 }
984 if (inUnwrap) {
985 setState(STATE_NEEDS_FLUSH);
986 }
987 }
988 }
989
990 /**
991 * This method will not call
992 * {@link #setHandshakeFailure(ChannelHandlerContext, Throwable, boolean, boolean, boolean)} or
993 * {@link #setHandshakeFailure(ChannelHandlerContext, Throwable)}.
994 * @return {@code true} if this method ends on {@link SSLEngineResult.HandshakeStatus#NOT_HANDSHAKING}.
995 */
996 private boolean wrapNonAppData(final ChannelHandlerContext ctx, boolean inUnwrap) throws SSLException {
997 ByteBuf out = null;
998 ByteBufAllocator alloc = ctx.alloc();
999 try {
1000 // Only continue to loop if the handler was not removed in the meantime.
1001 // See https://github.com/netty/netty/issues/5860
1002 outer: while (!ctx.isRemoved()) {
1003 if (out == null) {
1004 // As this is called for the handshake we have no real idea how big the buffer needs to be.
1005 // That said 2048 should give us enough room to include everything like ALPN / NPN data.
1006 // If this is not enough we will increase the buffer in wrap(...).
1007 out = allocateOutNetBuf(ctx, 2048, 1);
1008 }
1009 SSLEngineResult result = wrap(alloc, engine, Unpooled.EMPTY_BUFFER, out);
1010 if (result.bytesProduced() > 0) {
1011 ctx.write(out).addListener(new ChannelFutureListener() {
1012 @Override
1013 public void operationComplete(ChannelFuture future) {
1014 Throwable cause = future.cause();
1015 if (cause != null) {
1016 setHandshakeFailureTransportFailure(ctx, cause);
1017 }
1018 }
1019 });
1020 if (inUnwrap) {
1021 setState(STATE_NEEDS_FLUSH);
1022 }
1023 out = null;
1024 }
1025
1026 HandshakeStatus status = result.getHandshakeStatus();
1027 switch (status) {
1028 case FINISHED:
1029 // We may be here because we read data and discovered the remote peer initiated a renegotiation
1030 // and this write is to complete the new handshake. The user may have previously done a
1031 // writeAndFlush which wasn't able to wrap data due to needing the pending handshake, so we
1032 // attempt to wrap application data here if any is pending.
1033 if (setHandshakeSuccess() && inUnwrap && !pendingUnencryptedWrites.isEmpty()) {
1034 wrap(ctx, true);
1035 }
1036 return false;
1037 case NEED_TASK:
1038 if (!runDelegatedTasks(inUnwrap)) {
1039 // We scheduled a task on the delegatingTaskExecutor, so stop processing as we will
1040 // resume once the task completes.
1041 break outer;
1042 }
1043 break;
1044 case NEED_UNWRAP:
1045 if (inUnwrap || unwrapNonAppData(ctx) <= 0) {
1046 // If we asked for a wrap, the engine requested an unwrap, and we are in unwrap there is
1047 // no use in trying to call wrap again because we have already attempted (or will after we
1048 // return) to feed more data to the engine.
1049 return false;
1050 }
1051 break;
1052 case NEED_WRAP:
1053 break;
1054 case NOT_HANDSHAKING:
1055 if (setHandshakeSuccess() && inUnwrap && !pendingUnencryptedWrites.isEmpty()) {
1056 wrap(ctx, true);
1057 }
1058 // Workaround for TLS False Start problem reported at:
1059 // https://github.com/netty/netty/issues/1108#issuecomment-14266970
1060 if (!inUnwrap) {
1061 unwrapNonAppData(ctx);
1062 }
1063 return true;
1064 default:
1065 throw new IllegalStateException("Unknown handshake status: " + result.getHandshakeStatus());
1066 }
1067
1068 // Check if did not produce any bytes and if so break out of the loop, but only if we did not process
1069 // a task as last action. It's fine to not produce any data as part of executing a task.
1070 if (result.bytesProduced() == 0 && status != HandshakeStatus.NEED_TASK) {
1071 break;
1072 }
1073
1074 // It should not consume empty buffers when it is not handshaking
1075 // Fix for Android, where it was encrypting empty buffers even when not handshaking
1076 if (result.bytesConsumed() == 0 && result.getHandshakeStatus() == HandshakeStatus.NOT_HANDSHAKING) {
1077 break;
1078 }
1079 }
1080 } finally {
1081 if (out != null) {
1082 out.release();
1083 }
1084 }
1085 return false;
1086 }
1087
1088 private SSLEngineResult wrapMultiple(ByteBufAllocator alloc, SSLEngine engine, ByteBuf in, ByteBuf out)
1089 throws SSLException {
1090 SSLEngineResult result = null;
1091
1092 do {
1093 int nextSliceSize = Math.min(MAX_PLAINTEXT_LENGTH, in.readableBytes());
1094 // This call over-estimates, because we are slicing and not every nioBuffer will be part of
1095 // every slice. We could improve the estimate by having an nioBufferCount(offset, length).
1096 int nextOutSize = engineType.calculateRequiredOutBufSpace(this, nextSliceSize, in.nioBufferCount());
1097
1098 if (!out.isWritable(nextOutSize)) {
1099 if (result != null) {
1100 // We underestimated the space needed to encrypt the entire in buf. Break out, and
1101 // upstream will re-enqueue the buffer for later.
1102 break;
1103 }
1104 // This shouldn't happen, as the out buf was properly sized for at least packetLength
1105 // prior to calling wrap.
1106 out.ensureWritable(nextOutSize);
1107 }
1108
1109 ByteBuf wrapBuf = in.readSlice(nextSliceSize);
1110 result = wrap(alloc, engine, wrapBuf, out);
1111
1112 if (result.getStatus() == Status.CLOSED) {
1113 // If the engine gets closed, we can exit out early. Otherwise, we'll do a full handling of
1114 // possible results once finished.
1115 break;
1116 }
1117
1118 if (wrapBuf.isReadable()) {
1119 // There may be some left-over, in which case we can just pick it up next loop, so reset the original
1120 // reader index so its included again in the next slice.
1121 in.readerIndex(in.readerIndex() - wrapBuf.readableBytes());
1122 }
1123 } while (in.readableBytes() > 0);
1124
1125 return result;
1126 }
1127
1128 private SSLEngineResult wrap(ByteBufAllocator alloc, SSLEngine engine, ByteBuf in, ByteBuf out)
1129 throws SSLException {
1130 ByteBuf newDirectIn = null;
1131 try {
1132 int readerIndex = in.readerIndex();
1133 int readableBytes = in.readableBytes();
1134
1135 // We will call SslEngine.wrap(ByteBuffer[], ByteBuffer) to allow efficient handling of
1136 // CompositeByteBuf without force an extra memory copy when CompositeByteBuffer.nioBuffer() is called.
1137 final ByteBuffer[] in0;
1138 if (in.isDirect() || !engineType.wantsDirectBuffer) {
1139 // As CompositeByteBuf.nioBufferCount() can be expensive (as it needs to check all composed ByteBuf
1140 // to calculate the count) we will just assume a CompositeByteBuf contains more then 1 ByteBuf.
1141 // The worst that can happen is that we allocate an extra ByteBuffer[] in CompositeByteBuf.nioBuffers()
1142 // which is better then walking the composed ByteBuf in most cases.
1143 if (!(in instanceof CompositeByteBuf) && in.nioBufferCount() == 1) {
1144 in0 = singleBuffer;
1145 // We know its only backed by 1 ByteBuffer so use internalNioBuffer to keep object allocation
1146 // to a minimum.
1147 in0[0] = in.internalNioBuffer(readerIndex, readableBytes);
1148 } else {
1149 in0 = in.nioBuffers();
1150 }
1151 } else {
1152 // We could even go further here and check if its a CompositeByteBuf and if so try to decompose it and
1153 // only replace the ByteBuffer that are not direct. At the moment we just will replace the whole
1154 // CompositeByteBuf to keep the complexity to a minimum
1155 newDirectIn = alloc.directBuffer(readableBytes);
1156 newDirectIn.writeBytes(in, readerIndex, readableBytes);
1157 in0 = singleBuffer;
1158 in0[0] = newDirectIn.internalNioBuffer(newDirectIn.readerIndex(), readableBytes);
1159 }
1160
1161 for (;;) {
1162 // Use toByteBuffer(...) which might be able to return the internal ByteBuffer and so reduce
1163 // allocations.
1164 ByteBuffer out0 = toByteBuffer(out, out.writerIndex(), out.writableBytes());
1165 SSLEngineResult result = engine.wrap(in0, out0);
1166 in.skipBytes(result.bytesConsumed());
1167 out.writerIndex(out.writerIndex() + result.bytesProduced());
1168
1169 if (result.getStatus() == Status.BUFFER_OVERFLOW) {
1170 out.ensureWritable(engine.getSession().getPacketBufferSize());
1171 } else {
1172 return result;
1173 }
1174 }
1175 } finally {
1176 // Null out to allow GC of ByteBuffer
1177 singleBuffer[0] = null;
1178
1179 if (newDirectIn != null) {
1180 newDirectIn.release();
1181 }
1182 }
1183 }
1184
1185 @Override
1186 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
1187 boolean handshakeFailed = handshakePromise.cause() != null;
1188
1189 // Channel closed, we will generate 'ClosedChannelException' now.
1190 ClosedChannelException exception = new ClosedChannelException();
1191
1192 // Add a supressed exception if the handshake was not completed yet.
1193 if (isStateSet(STATE_HANDSHAKE_STARTED) && !handshakePromise.isDone()) {
1194 ThrowableUtil.addSuppressed(exception, StacklessSSLHandshakeException.newInstance(
1195 "Connection closed while SSL/TLS handshake was in progress",
1196 SslHandler.class, "channelInactive"));
1197 }
1198
1199 // Make sure to release SSLEngine,
1200 // and notify the handshake future if the connection has been closed during handshake.
1201 setHandshakeFailure(ctx, exception, !isStateSet(STATE_OUTBOUND_CLOSED), isStateSet(STATE_HANDSHAKE_STARTED),
1202 false);
1203
1204 // Ensure we always notify the sslClosePromise as well
1205 notifyClosePromise(exception);
1206
1207 try {
1208 super.channelInactive(ctx);
1209 } catch (DecoderException e) {
1210 if (!handshakeFailed || !(e.getCause() instanceof SSLException)) {
1211 // We only rethrow the exception if the handshake did not fail before channelInactive(...) was called
1212 // as otherwise this may produce duplicated failures as super.channelInactive(...) will also call
1213 // channelRead(...).
1214 //
1215 // See https://github.com/netty/netty/issues/10119
1216 throw e;
1217 }
1218 }
1219 }
1220
1221 @Override
1222 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
1223 if (ignoreException(cause)) {
1224 // It is safe to ignore the 'connection reset by peer' or
1225 // 'broken pipe' error after sending close_notify.
1226 if (logger.isDebugEnabled()) {
1227 logger.debug(
1228 "{} Swallowing a harmless 'connection reset by peer / broken pipe' error that occurred " +
1229 "while writing close_notify in response to the peer's close_notify", ctx.channel(), cause);
1230 }
1231
1232 // Close the connection explicitly just in case the transport
1233 // did not close the connection automatically.
1234 if (ctx.channel().isActive()) {
1235 ctx.close();
1236 }
1237 } else {
1238 ctx.fireExceptionCaught(cause);
1239 }
1240 }
1241
1242 /**
1243 * Checks if the given {@link Throwable} can be ignore and just "swallowed"
1244 *
1245 * When an ssl connection is closed a close_notify message is sent.
1246 * After that the peer also sends close_notify however, it's not mandatory to receive
1247 * the close_notify. The party who sent the initial close_notify can close the connection immediately
1248 * then the peer will get connection reset error.
1249 *
1250 */
1251 private boolean ignoreException(Throwable t) {
1252 if (!(t instanceof SSLException) && t instanceof IOException && sslClosePromise.isDone()) {
1253 String message = t.getMessage();
1254
1255 // first try to match connection reset / broke peer based on the regex. This is the fastest way
1256 // but may fail on different jdk impls or OS's
1257 if (message != null && IGNORABLE_ERROR_MESSAGE.matcher(message).matches()) {
1258 return true;
1259 }
1260
1261 // Inspect the StackTraceElements to see if it was a connection reset / broken pipe or not
1262 StackTraceElement[] elements = t.getStackTrace();
1263 for (StackTraceElement element: elements) {
1264 String classname = element.getClassName();
1265 String methodname = element.getMethodName();
1266
1267 // skip all classes that belong to the io.netty package
1268 if (classname.startsWith("io.netty.")) {
1269 continue;
1270 }
1271
1272 // check if the method name is read if not skip it
1273 if (!"read".equals(methodname)) {
1274 continue;
1275 }
1276
1277 if (isIgnorableClassInStack(classname)) {
1278 return true;
1279 }
1280
1281 try {
1282 // No match by now. Try to load the class via classloader and inspect it.
1283 // This is mainly done as other JDK implementations may differ in name of
1284 // the impl.
1285 Class<?> clazz = PlatformDependent.getClassLoader(getClass()).loadClass(classname);
1286
1287 if (SocketChannel.class.isAssignableFrom(clazz)
1288 || DatagramChannel.class.isAssignableFrom(clazz)) {
1289 return true;
1290 }
1291
1292 // also match against SctpChannel via String matching as it may not present.
1293 if ("com.sun.nio.sctp.SctpChannel".equals(clazz.getSuperclass().getName())) {
1294 return true;
1295 }
1296 } catch (Throwable cause) {
1297 if (logger.isDebugEnabled()) {
1298 logger.debug("Unexpected exception while loading class {} classname {}",
1299 getClass(), classname, cause);
1300 }
1301 }
1302 }
1303 }
1304
1305 return false;
1306 }
1307
1308 private static boolean isIgnorableClassInStack(String classname) {
1309 return classname.contains("SocketChannel") ||
1310 classname.contains("DatagramChannel") ||
1311 classname.contains("SctpChannel") ||
1312 classname.contains("UdtChannel");
1313 }
1314
1315 /**
1316 * Returns {@code true} if the given {@link ByteBuf} is encrypted. Be aware that this method
1317 * will not increase the readerIndex of the given {@link ByteBuf}.
1318 *
1319 * @param buffer
1320 * The {@link ByteBuf} to read from. Be aware that it must have at least 5 bytes to read,
1321 * otherwise it will throw an {@link IllegalArgumentException}.
1322 * @return encrypted
1323 * {@code true} if the {@link ByteBuf} is encrypted, {@code false} otherwise.
1324 * @throws IllegalArgumentException
1325 * Is thrown if the given {@link ByteBuf} has not at least 5 bytes to read.
1326 * @deprecated use {@link #isEncrypted(ByteBuf, boolean)}.
1327 */
1328 @Deprecated
1329 public static boolean isEncrypted(ByteBuf buffer) {
1330 return isEncrypted(buffer, false);
1331 }
1332
1333 /**
1334 * Returns {@code true} if the given {@link ByteBuf} is encrypted. Be aware that this method
1335 * will not increase the readerIndex of the given {@link ByteBuf}.
1336 *
1337 * @param buffer
1338 * The {@link ByteBuf} to read from. Be aware that it must have at least 5 bytes to read,
1339 * otherwise it will throw an {@link IllegalArgumentException}.
1340 * @return encrypted
1341 * {@code true} if the {@link ByteBuf} is encrypted, {@code false} otherwise.
1342 * @param probeSSLv2
1343 * {@code true} if the input {@code buffer} might be SSLv2. If {@code true} is used this
1344 * methods might produce false-positives in some cases so it's strongly suggested to
1345 * use {@code false}.
1346 * @throws IllegalArgumentException
1347 * Is thrown if the given {@link ByteBuf} has not at least 5 bytes to read.
1348 */
1349 public static boolean isEncrypted(ByteBuf buffer, boolean probeSSLv2) {
1350 if (buffer.readableBytes() < SslUtils.SSL_RECORD_HEADER_LENGTH) {
1351 throw new IllegalArgumentException(
1352 "buffer must have at least " + SslUtils.SSL_RECORD_HEADER_LENGTH + " readable bytes");
1353 }
1354 return getEncryptedPacketLength(buffer, buffer.readerIndex(), probeSSLv2) != SslUtils.NOT_ENCRYPTED;
1355 }
1356
1357 private void decodeJdkCompatible(ChannelHandlerContext ctx, ByteBuf in) throws NotSslRecordException {
1358 int packetLength = this.packetLength;
1359 // If we calculated the length of the current SSL record before, use that information.
1360 if (packetLength > 0) {
1361 if (in.readableBytes() < packetLength) {
1362 return;
1363 }
1364 } else {
1365 // Get the packet length and wait until we get a packets worth of data to unwrap.
1366 final int readableBytes = in.readableBytes();
1367 if (readableBytes < SslUtils.SSL_RECORD_HEADER_LENGTH) {
1368 return;
1369 }
1370 packetLength = getEncryptedPacketLength(in, in.readerIndex(), true);
1371 if (packetLength == SslUtils.NOT_ENCRYPTED) {
1372 // Not an SSL/TLS packet
1373 NotSslRecordException e = new NotSslRecordException(
1374 "not an SSL/TLS record: " + ByteBufUtil.hexDump(in));
1375 in.skipBytes(in.readableBytes());
1376
1377 // First fail the handshake promise as we may need to have access to the SSLEngine which may
1378 // be released because the user will remove the SslHandler in an exceptionCaught(...) implementation.
1379 setHandshakeFailure(ctx, e);
1380
1381 throw e;
1382 }
1383 if (packetLength == NOT_ENOUGH_DATA) {
1384 return;
1385 }
1386 assert packetLength > 0;
1387 if (packetLength > readableBytes) {
1388 // wait until the whole packet can be read
1389 this.packetLength = packetLength;
1390 return;
1391 }
1392 }
1393
1394 // Reset the state of this class so we can get the length of the next packet. We assume the entire packet will
1395 // be consumed by the SSLEngine.
1396 this.packetLength = 0;
1397 try {
1398 final int bytesConsumed = unwrap(ctx, in, packetLength);
1399 if (bytesConsumed != packetLength && !engine.isInboundDone()) {
1400 // The JDK equivalent of getEncryptedPacketLength has some optimizations and can behave slightly
1401 // differently to ours, but this should always be a sign of bad input data.
1402 throw new NotSslRecordException();
1403 }
1404 } catch (Throwable cause) {
1405 handleUnwrapThrowable(ctx, cause);
1406 }
1407 }
1408
1409 private void decodeNonJdkCompatible(ChannelHandlerContext ctx, ByteBuf in) {
1410 try {
1411 unwrap(ctx, in, in.readableBytes());
1412 } catch (Throwable cause) {
1413 handleUnwrapThrowable(ctx, cause);
1414 }
1415 }
1416
1417 private void handleUnwrapThrowable(ChannelHandlerContext ctx, Throwable cause) {
1418 try {
1419 // We should attempt to notify the handshake failure before writing any pending data. If we are in unwrap
1420 // and failed during the handshake process, and we attempt to wrap, then promises will fail, and if
1421 // listeners immediately close the Channel then we may end up firing the handshake event after the Channel
1422 // has been closed.
1423 if (handshakePromise.tryFailure(cause)) {
1424 ctx.fireUserEventTriggered(new SslHandshakeCompletionEvent(cause));
1425 }
1426
1427 // Let's check if the handler was removed in the meantime and so pendingUnencryptedWrites is null.
1428 if (pendingUnencryptedWrites != null) {
1429 // We need to flush one time as there may be an alert that we should send to the remote peer because
1430 // of the SSLException reported here.
1431 wrapAndFlush(ctx);
1432 }
1433 } catch (SSLException ex) {
1434 logger.debug("SSLException during trying to call SSLEngine.wrap(...)" +
1435 " because of an previous SSLException, ignoring...", ex);
1436 } finally {
1437 // ensure we always flush and close the channel.
1438 setHandshakeFailure(ctx, cause, true, false, true);
1439 }
1440 PlatformDependent.throwException(cause);
1441 }
1442
1443 @Override
1444 protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws SSLException {
1445 if (isStateSet(STATE_PROCESS_TASK)) {
1446 return;
1447 }
1448 if (jdkCompatibilityMode) {
1449 decodeJdkCompatible(ctx, in);
1450 } else {
1451 decodeNonJdkCompatible(ctx, in);
1452 }
1453 }
1454
1455 @Override
1456 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
1457 channelReadComplete0(ctx);
1458 }
1459
1460 private void channelReadComplete0(ChannelHandlerContext ctx) {
1461 // Discard bytes of the cumulation buffer if needed.
1462 discardSomeReadBytes();
1463
1464 flushIfNeeded(ctx);
1465 readIfNeeded(ctx);
1466
1467 clearState(STATE_FIRE_CHANNEL_READ);
1468 ctx.fireChannelReadComplete();
1469 }
1470
1471 private void readIfNeeded(ChannelHandlerContext ctx) {
1472 // If handshake is not finished yet, we need more data.
1473 if (!ctx.channel().config().isAutoRead() &&
1474 (!isStateSet(STATE_FIRE_CHANNEL_READ) || !handshakePromise.isDone())) {
1475 // No auto-read used and no message passed through the ChannelPipeline or the handshake was not complete
1476 // yet, which means we need to trigger the read to ensure we not encounter any stalls.
1477 ctx.read();
1478 }
1479 }
1480
1481 private void flushIfNeeded(ChannelHandlerContext ctx) {
1482 if (isStateSet(STATE_NEEDS_FLUSH)) {
1483 forceFlush(ctx);
1484 }
1485 }
1486
1487 /**
1488 * Calls {@link SSLEngine#unwrap(ByteBuffer, ByteBuffer)} with an empty buffer to handle handshakes, etc.
1489 */
1490 private int unwrapNonAppData(ChannelHandlerContext ctx) throws SSLException {
1491 return unwrap(ctx, Unpooled.EMPTY_BUFFER, 0);
1492 }
1493
1494 /**
1495 * Unwraps inbound SSL records.
1496 */
1497 private int unwrap(ChannelHandlerContext ctx, ByteBuf packet, int length) throws SSLException {
1498 final int originalLength = length;
1499 boolean wrapLater = false;
1500 boolean notifyClosure = false;
1501 boolean executedRead = false;
1502 ByteBuf decodeOut = allocate(ctx, length);
1503 try {
1504 // Only continue to loop if the handler was not removed in the meantime.
1505 // See https://github.com/netty/netty/issues/5860
1506 do {
1507 final SSLEngineResult result = engineType.unwrap(this, packet, length, decodeOut);
1508 final Status status = result.getStatus();
1509 final HandshakeStatus handshakeStatus = result.getHandshakeStatus();
1510 final int produced = result.bytesProduced();
1511 final int consumed = result.bytesConsumed();
1512
1513 // Skip bytes now in case unwrap is called in a re-entry scenario. For example LocalChannel.read()
1514 // may entry this method in a re-entry fashion and if the peer is writing into a shared buffer we may
1515 // unwrap the same data multiple times.
1516 packet.skipBytes(consumed);
1517 length -= consumed;
1518
1519 // The expected sequence of events is:
1520 // 1. Notify of handshake success
1521 // 2. fireChannelRead for unwrapped data
1522 if (handshakeStatus == HandshakeStatus.FINISHED || handshakeStatus == HandshakeStatus.NOT_HANDSHAKING) {
1523 wrapLater |= (decodeOut.isReadable() ?
1524 setHandshakeSuccessUnwrapMarkReentry() : setHandshakeSuccess()) ||
1525 handshakeStatus == HandshakeStatus.FINISHED ||
1526 // We need to check if pendingUnecryptedWrites is null as the SslHandler
1527 // might have been removed in the meantime.
1528 (pendingUnencryptedWrites != null && !pendingUnencryptedWrites.isEmpty());
1529 }
1530
1531 // Dispatch decoded data after we have notified of handshake success. If this method has been invoked
1532 // in a re-entry fashion we execute a task on the executor queue to process after the stack unwinds
1533 // to preserve order of events.
1534 if (decodeOut.isReadable()) {
1535 setState(STATE_FIRE_CHANNEL_READ);
1536 if (isStateSet(STATE_UNWRAP_REENTRY)) {
1537 executedRead = true;
1538 executeChannelRead(ctx, decodeOut);
1539 } else {
1540 ctx.fireChannelRead(decodeOut);
1541 }
1542 decodeOut = null;
1543 }
1544
1545 if (status == Status.CLOSED) {
1546 notifyClosure = true; // notify about the CLOSED state of the SSLEngine. See #137
1547 } else if (status == Status.BUFFER_OVERFLOW) {
1548 if (decodeOut != null) {
1549 decodeOut.release();
1550 }
1551 final int applicationBufferSize = engine.getSession().getApplicationBufferSize();
1552 // Allocate a new buffer which can hold all the rest data and loop again.
1553 // It may happen that applicationBufferSize < produced while there is still more to unwrap, in this
1554 // case we will just allocate a new buffer with the capacity of applicationBufferSize and call
1555 // unwrap again.
1556 decodeOut = allocate(ctx, engineType.calculatePendingData(this, applicationBufferSize < produced ?
1557 applicationBufferSize : applicationBufferSize - produced));
1558 continue;
1559 }
1560
1561 if (handshakeStatus == HandshakeStatus.NEED_TASK) {
1562 boolean pending = runDelegatedTasks(true);
1563 if (!pending) {
1564 // We scheduled a task on the delegatingTaskExecutor, so stop processing as we will
1565 // resume once the task completes.
1566 //
1567 // We break out of the loop only and do NOT return here as we still may need to notify
1568 // about the closure of the SSLEngine.
1569 wrapLater = false;
1570 break;
1571 }
1572 } else if (handshakeStatus == HandshakeStatus.NEED_WRAP) {
1573 // If the wrap operation transitions the status to NOT_HANDSHAKING and there is no more data to
1574 // unwrap then the next call to unwrap will not produce any data. We can avoid the potentially
1575 // costly unwrap operation and break out of the loop.
1576 if (wrapNonAppData(ctx, true) && length == 0) {
1577 break;
1578 }
1579 }
1580
1581 if (status == Status.BUFFER_UNDERFLOW ||
1582 // If we processed NEED_TASK we should try again even we did not consume or produce anything.
1583 handshakeStatus != HandshakeStatus.NEED_TASK && (consumed == 0 && produced == 0 ||
1584 (length == 0 && handshakeStatus == HandshakeStatus.NOT_HANDSHAKING))) {
1585 if (handshakeStatus == HandshakeStatus.NEED_UNWRAP) {
1586 // The underlying engine is starving so we need to feed it with more data.
1587 // See https://github.com/netty/netty/pull/5039
1588 readIfNeeded(ctx);
1589 }
1590
1591 break;
1592 } else if (decodeOut == null) {
1593 decodeOut = allocate(ctx, length);
1594 }
1595 } while (!ctx.isRemoved());
1596
1597 if (isStateSet(STATE_FLUSHED_BEFORE_HANDSHAKE) && handshakePromise.isDone()) {
1598 // We need to call wrap(...) in case there was a flush done before the handshake completed to ensure
1599 // we do not stale.
1600 //
1601 // See https://github.com/netty/netty/pull/2437
1602 clearState(STATE_FLUSHED_BEFORE_HANDSHAKE);
1603 wrapLater = true;
1604 }
1605
1606 if (wrapLater) {
1607 wrap(ctx, true);
1608 }
1609 } finally {
1610 if (decodeOut != null) {
1611 decodeOut.release();
1612 }
1613
1614 if (notifyClosure) {
1615 if (executedRead) {
1616 executeNotifyClosePromise(ctx);
1617 } else {
1618 notifyClosePromise(null);
1619 }
1620 }
1621 }
1622 return originalLength - length;
1623 }
1624
1625 private boolean setHandshakeSuccessUnwrapMarkReentry() throws SSLException {
1626 // setHandshakeSuccess calls out to external methods which may trigger re-entry. We need to preserve ordering of
1627 // fireChannelRead for decodeOut relative to re-entry data.
1628 final boolean setReentryState = !isStateSet(STATE_UNWRAP_REENTRY);
1629 if (setReentryState) {
1630 setState(STATE_UNWRAP_REENTRY);
1631 }
1632 try {
1633 return setHandshakeSuccess();
1634 } finally {
1635 // It is unlikely this specific method will be re-entry because handshake completion is infrequent, but just
1636 // in case we only clear the state if we set it in the first place.
1637 if (setReentryState) {
1638 clearState(STATE_UNWRAP_REENTRY);
1639 }
1640 }
1641 }
1642
1643 private void executeNotifyClosePromise(final ChannelHandlerContext ctx) {
1644 try {
1645 ctx.executor().execute(new Runnable() {
1646 @Override
1647 public void run() {
1648 notifyClosePromise(null);
1649 }
1650 });
1651 } catch (RejectedExecutionException e) {
1652 notifyClosePromise(e);
1653 }
1654 }
1655
1656 private void executeChannelRead(final ChannelHandlerContext ctx, final ByteBuf decodedOut) {
1657 try {
1658 ctx.executor().execute(new Runnable() {
1659 @Override
1660 public void run() {
1661 ctx.fireChannelRead(decodedOut);
1662 }
1663 });
1664 } catch (RejectedExecutionException e) {
1665 decodedOut.release();
1666 throw e;
1667 }
1668 }
1669
1670 private static ByteBuffer toByteBuffer(ByteBuf out, int index, int len) {
1671 return out.nioBufferCount() == 1 ? out.internalNioBuffer(index, len) :
1672 out.nioBuffer(index, len);
1673 }
1674
1675 private static boolean inEventLoop(Executor executor) {
1676 return executor instanceof EventExecutor && ((EventExecutor) executor).inEventLoop();
1677 }
1678
1679 /**
1680 * Will either run the delegated task directly calling {@link Runnable#run()} and return {@code true} or will
1681 * offload the delegated task using {@link Executor#execute(Runnable)} and return {@code false}.
1682 *
1683 * If the task is offloaded it will take care to resume its work on the {@link EventExecutor} once there are no
1684 * more tasks to process.
1685 */
1686 private boolean runDelegatedTasks(boolean inUnwrap) {
1687 if (delegatedTaskExecutor == ImmediateExecutor.INSTANCE || inEventLoop(delegatedTaskExecutor)) {
1688 // We should run the task directly in the EventExecutor thread and not offload at all. As we are on the
1689 // EventLoop we can just run all tasks at once.
1690 for (;;) {
1691 Runnable task = engine.getDelegatedTask();
1692 if (task == null) {
1693 return true;
1694 }
1695 setState(STATE_PROCESS_TASK);
1696 if (task instanceof AsyncRunnable) {
1697 // Let's set the task to processing task before we try to execute it.
1698 boolean pending = false;
1699 try {
1700 AsyncRunnable asyncTask = (AsyncRunnable) task;
1701 AsyncTaskCompletionHandler completionHandler = new AsyncTaskCompletionHandler(inUnwrap);
1702 asyncTask.run(completionHandler);
1703 pending = completionHandler.resumeLater();
1704 if (pending) {
1705 return false;
1706 }
1707 } finally {
1708 if (!pending) {
1709 // The task has completed, lets clear the state. If it is not completed we will clear the
1710 // state once it is.
1711 clearState(STATE_PROCESS_TASK);
1712 }
1713 }
1714 } else {
1715 try {
1716 task.run();
1717 } finally {
1718 clearState(STATE_PROCESS_TASK);
1719 }
1720 }
1721 }
1722 } else {
1723 executeDelegatedTask(inUnwrap);
1724 return false;
1725 }
1726 }
1727
1728 private SslTasksRunner getTaskRunner(boolean inUnwrap) {
1729 return inUnwrap ? sslTaskRunnerForUnwrap : sslTaskRunner;
1730 }
1731
1732 private void executeDelegatedTask(boolean inUnwrap) {
1733 executeDelegatedTask(getTaskRunner(inUnwrap));
1734 }
1735
1736 private void executeDelegatedTask(SslTasksRunner task) {
1737 setState(STATE_PROCESS_TASK);
1738 try {
1739 delegatedTaskExecutor.execute(task);
1740 } catch (RejectedExecutionException e) {
1741 clearState(STATE_PROCESS_TASK);
1742 throw e;
1743 }
1744 }
1745
1746 private final class AsyncTaskCompletionHandler implements Runnable {
1747 private final boolean inUnwrap;
1748 boolean didRun;
1749 boolean resumeLater;
1750
1751 AsyncTaskCompletionHandler(boolean inUnwrap) {
1752 this.inUnwrap = inUnwrap;
1753 }
1754
1755 @Override
1756 public void run() {
1757 didRun = true;
1758 if (resumeLater) {
1759 getTaskRunner(inUnwrap).runComplete();
1760 }
1761 }
1762
1763 boolean resumeLater() {
1764 if (!didRun) {
1765 resumeLater = true;
1766 return true;
1767 }
1768 return false;
1769 }
1770 }
1771
1772 /**
1773 * {@link Runnable} that will be scheduled on the {@code delegatedTaskExecutor} and will take care
1774 * of resume work on the {@link EventExecutor} once the task was executed.
1775 */
1776 private final class SslTasksRunner implements Runnable {
1777 private final boolean inUnwrap;
1778 private final Runnable runCompleteTask = new Runnable() {
1779 @Override
1780 public void run() {
1781 runComplete();
1782 }
1783 };
1784
1785 SslTasksRunner(boolean inUnwrap) {
1786 this.inUnwrap = inUnwrap;
1787 }
1788
1789 // Handle errors which happened during task processing.
1790 private void taskError(Throwable e) {
1791 if (inUnwrap) {
1792 // As the error happened while the task was scheduled as part of unwrap(...) we also need to ensure
1793 // we fire it through the pipeline as inbound error to be consistent with what we do in decode(...).
1794 //
1795 // This will also ensure we fail the handshake future and flush all produced data.
1796 try {
1797 handleUnwrapThrowable(ctx, e);
1798 } catch (Throwable cause) {
1799 safeExceptionCaught(cause);
1800 }
1801 } else {
1802 setHandshakeFailure(ctx, e);
1803 forceFlush(ctx);
1804 }
1805 }
1806
1807 // Try to call exceptionCaught(...)
1808 private void safeExceptionCaught(Throwable cause) {
1809 try {
1810 exceptionCaught(ctx, wrapIfNeeded(cause));
1811 } catch (Throwable error) {
1812 ctx.fireExceptionCaught(error);
1813 }
1814 }
1815
1816 private Throwable wrapIfNeeded(Throwable cause) {
1817 if (!inUnwrap) {
1818 // If we are not in unwrap(...) we can just rethrow without wrapping at all.
1819 return cause;
1820 }
1821 // As the exception would have been triggered by an inbound operation we will need to wrap it in a
1822 // DecoderException to mimic what a decoder would do when decode(...) throws.
1823 return cause instanceof DecoderException ? cause : new DecoderException(cause);
1824 }
1825
1826 private void tryDecodeAgain() {
1827 try {
1828 channelRead(ctx, Unpooled.EMPTY_BUFFER);
1829 } catch (Throwable cause) {
1830 safeExceptionCaught(cause);
1831 } finally {
1832 // As we called channelRead(...) we also need to call channelReadComplete(...) which
1833 // will ensure we either call ctx.fireChannelReadComplete() or will trigger a ctx.read() if
1834 // more data is needed.
1835 channelReadComplete0(ctx);
1836 }
1837 }
1838
1839 /**
1840 * Executed after the wrapped {@code task} was executed via {@code delegatedTaskExecutor} to resume work
1841 * on the {@link EventExecutor}.
1842 */
1843 private void resumeOnEventExecutor() {
1844 assert ctx.executor().inEventLoop();
1845 clearState(STATE_PROCESS_TASK);
1846 try {
1847 HandshakeStatus status = engine.getHandshakeStatus();
1848 switch (status) {
1849 // There is another task that needs to be executed and offloaded to the delegatingTaskExecutor as
1850 // a result of this. Let's reschedule....
1851 case NEED_TASK:
1852 executeDelegatedTask(this);
1853
1854 break;
1855
1856 // The handshake finished, lets notify about the completion of it and resume processing.
1857 case FINISHED:
1858 // Not handshaking anymore, lets notify about the completion if not done yet and resume processing.
1859 case NOT_HANDSHAKING:
1860 setHandshakeSuccess(); // NOT_HANDSHAKING -> workaround for android skipping FINISHED state.
1861 try {
1862 // Lets call wrap to ensure we produce the alert if there is any pending and also to
1863 // ensure we flush any queued data..
1864 wrap(ctx, inUnwrap);
1865 } catch (Throwable e) {
1866 taskError(e);
1867 return;
1868 }
1869 if (inUnwrap) {
1870 // If we were in the unwrap call when the task was processed we should also try to unwrap
1871 // non app data first as there may not anything left in the inbound buffer to process.
1872 unwrapNonAppData(ctx);
1873 }
1874
1875 // Flush now as we may have written some data as part of the wrap call.
1876 forceFlush(ctx);
1877
1878 tryDecodeAgain();
1879 break;
1880
1881 // We need more data so lets try to unwrap first and then call decode again which will feed us
1882 // with buffered data (if there is any).
1883 case NEED_UNWRAP:
1884 try {
1885 unwrapNonAppData(ctx);
1886 } catch (SSLException e) {
1887 handleUnwrapThrowable(ctx, e);
1888 return;
1889 }
1890 tryDecodeAgain();
1891 break;
1892
1893 // To make progress we need to call SSLEngine.wrap(...) which may produce more output data
1894 // that will be written to the Channel.
1895 case NEED_WRAP:
1896 try {
1897 if (!wrapNonAppData(ctx, false) && inUnwrap) {
1898 // The handshake finished in wrapNonAppData(...), we need to try call
1899 // unwrapNonAppData(...) as we may have some alert that we should read.
1900 //
1901 // This mimics what we would do when we are calling this method while in unwrap(...).
1902 unwrapNonAppData(ctx);
1903 }
1904
1905 // Flush now as we may have written some data as part of the wrap call.
1906 forceFlush(ctx);
1907 } catch (Throwable e) {
1908 taskError(e);
1909 return;
1910 }
1911
1912 // Now try to feed in more data that we have buffered.
1913 tryDecodeAgain();
1914 break;
1915
1916 default:
1917 // Should never reach here as we handle all cases.
1918 throw new AssertionError();
1919 }
1920 } catch (Throwable cause) {
1921 safeExceptionCaught(cause);
1922 }
1923 }
1924
1925 void runComplete() {
1926 EventExecutor executor = ctx.executor();
1927 // Jump back on the EventExecutor. We do this even if we are already on the EventLoop to guard against
1928 // reentrancy issues. Failing to do so could lead to the situation of tryDecode(...) be called and so
1929 // channelRead(...) while still in the decode loop. In this case channelRead(...) might release the input
1930 // buffer if its empty which would then result in an IllegalReferenceCountException when we try to continue
1931 // decoding.
1932 //
1933 // See https://github.com/netty/netty-tcnative/issues/680
1934 executor.execute(new Runnable() {
1935 @Override
1936 public void run() {
1937 resumeOnEventExecutor();
1938 }
1939 });
1940 }
1941
1942 @Override
1943 public void run() {
1944 try {
1945 Runnable task = engine.getDelegatedTask();
1946 if (task == null) {
1947 // The task was processed in the meantime. Let's just return.
1948 return;
1949 }
1950 if (task instanceof AsyncRunnable) {
1951 AsyncRunnable asyncTask = (AsyncRunnable) task;
1952 asyncTask.run(runCompleteTask);
1953 } else {
1954 task.run();
1955 runComplete();
1956 }
1957 } catch (final Throwable cause) {
1958 handleException(cause);
1959 }
1960 }
1961
1962 private void handleException(final Throwable cause) {
1963 EventExecutor executor = ctx.executor();
1964 if (executor.inEventLoop()) {
1965 clearState(STATE_PROCESS_TASK);
1966 safeExceptionCaught(cause);
1967 } else {
1968 try {
1969 executor.execute(new Runnable() {
1970 @Override
1971 public void run() {
1972 clearState(STATE_PROCESS_TASK);
1973 safeExceptionCaught(cause);
1974 }
1975 });
1976 } catch (RejectedExecutionException ignore) {
1977 clearState(STATE_PROCESS_TASK);
1978 // the context itself will handle the rejected exception when try to schedule the operation so
1979 // ignore the RejectedExecutionException
1980 ctx.fireExceptionCaught(cause);
1981 }
1982 }
1983 }
1984 }
1985
1986 /**
1987 * Notify all the handshake futures about the successfully handshake
1988 * @return {@code true} if {@link #handshakePromise} was set successfully and a {@link SslHandshakeCompletionEvent}
1989 * was fired. {@code false} otherwise.
1990 */
1991 private boolean setHandshakeSuccess() throws SSLException {
1992 // Our control flow may invoke this method multiple times for a single FINISHED event. For example
1993 // wrapNonAppData may drain pendingUnencryptedWrites in wrap which transitions to handshake from FINISHED to
1994 // NOT_HANDSHAKING which invokes setHandshakeSuccess, and then wrapNonAppData also directly invokes this method.
1995 final SSLSession session = engine.getSession();
1996 if (resumptionController != null && !handshakePromise.isDone()) {
1997 try {
1998 if (resumptionController.validateResumeIfNeeded(engine) && logger.isDebugEnabled()) {
1999 logger.debug("{} Resumed and reauthenticated session", ctx.channel());
2000 }
2001 } catch (CertificateException e) {
2002 SSLHandshakeException exception = new SSLHandshakeException(e.getMessage());
2003 exception.initCause(e);
2004 throw exception;
2005 }
2006 }
2007 final boolean notified = !handshakePromise.isDone() && handshakePromise.trySuccess(ctx.channel());
2008 if (notified) {
2009 if (logger.isDebugEnabled()) {
2010 logger.debug(
2011 "{} HANDSHAKEN: protocol:{} cipher suite:{}",
2012 ctx.channel(),
2013 session.getProtocol(),
2014 session.getCipherSuite());
2015 }
2016 ctx.fireUserEventTriggered(SslHandshakeCompletionEvent.SUCCESS);
2017 }
2018 if (isStateSet(STATE_READ_DURING_HANDSHAKE)) {
2019 clearState(STATE_READ_DURING_HANDSHAKE);
2020 if (!ctx.channel().config().isAutoRead()) {
2021 ctx.read();
2022 }
2023 }
2024 return notified;
2025 }
2026
2027 /**
2028 * Notify all the handshake futures about the failure during the handshake.
2029 */
2030 private void setHandshakeFailure(ChannelHandlerContext ctx, Throwable cause) {
2031 setHandshakeFailure(ctx, cause, true, true, false);
2032 }
2033
2034 /**
2035 * Notify all the handshake futures about the failure during the handshake.
2036 */
2037 private void setHandshakeFailure(ChannelHandlerContext ctx, Throwable cause, boolean closeInbound,
2038 boolean notify, boolean alwaysFlushAndClose) {
2039 try {
2040 // Release all resources such as internal buffers that SSLEngine is managing.
2041 setState(STATE_OUTBOUND_CLOSED);
2042 engine.closeOutbound();
2043
2044 if (closeInbound) {
2045 try {
2046 engine.closeInbound();
2047 } catch (SSLException e) {
2048 if (logger.isDebugEnabled()) {
2049 // only log in debug mode as it most likely harmless and latest chrome still trigger
2050 // this all the time.
2051 //
2052 // See https://github.com/netty/netty/issues/1340
2053 String msg = e.getMessage();
2054 if (msg == null || !(msg.contains("possible truncation attack") ||
2055 msg.contains("closing inbound before receiving peer's close_notify"))) {
2056 logger.debug("{} SSLEngine.closeInbound() raised an exception.", ctx.channel(), e);
2057 }
2058 }
2059 }
2060 }
2061 if (handshakePromise.tryFailure(cause) || alwaysFlushAndClose) {
2062 SslUtils.handleHandshakeFailure(ctx, cause, notify);
2063 }
2064 } finally {
2065 // Ensure we remove and fail all pending writes in all cases and so release memory quickly.
2066 releaseAndFailAll(ctx, cause);
2067 }
2068 }
2069
2070 private void setHandshakeFailureTransportFailure(ChannelHandlerContext ctx, Throwable cause) {
2071 // If TLS control frames fail to write we are in an unknown state and may become out of
2072 // sync with our peer. We give up and close the channel. This will also take care of
2073 // cleaning up any outstanding state (e.g. handshake promise, queued unencrypted data).
2074 try {
2075 SSLException transportFailure = new SSLException("failure when writing TLS control frames", cause);
2076 releaseAndFailAll(ctx, transportFailure);
2077 if (handshakePromise.tryFailure(transportFailure)) {
2078 ctx.fireUserEventTriggered(new SslHandshakeCompletionEvent(transportFailure));
2079 }
2080 } finally {
2081 ctx.close();
2082 }
2083 }
2084
2085 private void releaseAndFailAll(ChannelHandlerContext ctx, Throwable cause) {
2086 if (resumptionController != null &&
2087 (!engine.getSession().isValid() || cause instanceof SSLHandshakeException)) {
2088 resumptionController.remove(engine());
2089 }
2090 if (pendingUnencryptedWrites != null) {
2091 pendingUnencryptedWrites.releaseAndFailAll(ctx, cause);
2092 }
2093 }
2094
2095 private void notifyClosePromise(Throwable cause) {
2096 if (cause == null) {
2097 if (sslClosePromise.trySuccess(ctx.channel())) {
2098 ctx.fireUserEventTriggered(SslCloseCompletionEvent.SUCCESS);
2099 }
2100 } else {
2101 if (sslClosePromise.tryFailure(cause)) {
2102 ctx.fireUserEventTriggered(new SslCloseCompletionEvent(cause));
2103 }
2104 }
2105 }
2106
2107 private void closeOutboundAndChannel(
2108 final ChannelHandlerContext ctx, final ChannelPromise promise, boolean disconnect) throws Exception {
2109 setState(STATE_OUTBOUND_CLOSED);
2110 engine.closeOutbound();
2111
2112 if (!ctx.channel().isActive()) {
2113 if (disconnect) {
2114 ctx.disconnect(promise);
2115 } else {
2116 ctx.close(promise);
2117 }
2118 return;
2119 }
2120
2121 ChannelPromise closeNotifyPromise = ctx.newPromise();
2122 try {
2123 flush(ctx, closeNotifyPromise);
2124 } finally {
2125 if (!isStateSet(STATE_CLOSE_NOTIFY)) {
2126 setState(STATE_CLOSE_NOTIFY);
2127 // It's important that we do not pass the original ChannelPromise to safeClose(...) as when flush(....)
2128 // throws an Exception it will be propagated to the AbstractChannelHandlerContext which will try
2129 // to fail the promise because of this. This will then fail as it was already completed by
2130 // safeClose(...). We create a new ChannelPromise and try to notify the original ChannelPromise
2131 // once it is complete. If we fail to do so we just ignore it as in this case it was failed already
2132 // because of a propagated Exception.
2133 //
2134 // See https://github.com/netty/netty/issues/5931
2135 safeClose(ctx, closeNotifyPromise, PromiseNotifier.cascade(false, ctx.newPromise(), promise));
2136 } else {
2137 /// We already handling the close_notify so just attach the promise to the sslClosePromise.
2138 sslClosePromise.addListener(new FutureListener<Channel>() {
2139 @Override
2140 public void operationComplete(Future<Channel> future) {
2141 promise.setSuccess();
2142 }
2143 });
2144 }
2145 }
2146 }
2147
2148 private void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
2149 if (pendingUnencryptedWrites != null) {
2150 pendingUnencryptedWrites.add(Unpooled.EMPTY_BUFFER, promise);
2151 } else {
2152 promise.setFailure(newPendingWritesNullException());
2153 }
2154 flush(ctx);
2155 }
2156
2157 @Override
2158 public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
2159 this.ctx = ctx;
2160 Channel channel = ctx.channel();
2161 pendingUnencryptedWrites = new SslHandlerCoalescingBufferQueue(channel, 16, engineType.wantsDirectBuffer) {
2162 @Override
2163 protected int wrapDataSize() {
2164 return SslHandler.this.wrapDataSize;
2165 }
2166 };
2167
2168 setOpensslEngineSocketFd(channel);
2169 boolean fastOpen = Boolean.TRUE.equals(channel.config().getOption(ChannelOption.TCP_FASTOPEN_CONNECT));
2170 boolean active = channel.isActive();
2171 if (active || fastOpen) {
2172 // Explicitly flush the handshake only if the channel is already active.
2173 // With TCP Fast Open, we write to the outbound buffer before the TCP connect is established.
2174 // The buffer will then be flushed as part of establishing the connection, saving us a round-trip.
2175 startHandshakeProcessing(active);
2176 // If we weren't able to include client_hello in the TCP SYN (e.g. no token, disabled at the OS) we have to
2177 // flush pending data in the outbound buffer later in channelActive().
2178 final ChannelOutboundBuffer outboundBuffer;
2179 if (fastOpen && ((outboundBuffer = channel.unsafe().outboundBuffer()) == null ||
2180 outboundBuffer.totalPendingWriteBytes() > 0)) {
2181 setState(STATE_NEEDS_FLUSH);
2182 }
2183 }
2184 }
2185
2186 private void startHandshakeProcessing(boolean flushAtEnd) {
2187 if (!isStateSet(STATE_HANDSHAKE_STARTED)) {
2188 setState(STATE_HANDSHAKE_STARTED);
2189 if (engine.getUseClientMode()) {
2190 // Begin the initial handshake.
2191 // channelActive() event has been fired already, which means this.channelActive() will
2192 // not be invoked. We have to initialize here instead.
2193 handshake(flushAtEnd);
2194 }
2195 applyHandshakeTimeout();
2196 } else if (isStateSet(STATE_NEEDS_FLUSH)) {
2197 forceFlush(ctx);
2198 }
2199 }
2200
2201 /**
2202 * Performs TLS renegotiation.
2203 */
2204 public Future<Channel> renegotiate() {
2205 ChannelHandlerContext ctx = this.ctx;
2206 if (ctx == null) {
2207 throw new IllegalStateException();
2208 }
2209
2210 return renegotiate(ctx.executor().<Channel>newPromise());
2211 }
2212
2213 /**
2214 * Performs TLS renegotiation.
2215 */
2216 public Future<Channel> renegotiate(final Promise<Channel> promise) {
2217 ObjectUtil.checkNotNull(promise, "promise");
2218
2219 ChannelHandlerContext ctx = this.ctx;
2220 if (ctx == null) {
2221 throw new IllegalStateException();
2222 }
2223
2224 EventExecutor executor = ctx.executor();
2225 if (!executor.inEventLoop()) {
2226 executor.execute(new Runnable() {
2227 @Override
2228 public void run() {
2229 renegotiateOnEventLoop(promise);
2230 }
2231 });
2232 return promise;
2233 }
2234
2235 renegotiateOnEventLoop(promise);
2236 return promise;
2237 }
2238
2239 private void renegotiateOnEventLoop(final Promise<Channel> newHandshakePromise) {
2240 final Promise<Channel> oldHandshakePromise = handshakePromise;
2241 if (!oldHandshakePromise.isDone()) {
2242 // There's no need to handshake because handshake is in progress already.
2243 // Merge the new promise into the old one.
2244 PromiseNotifier.cascade(oldHandshakePromise, newHandshakePromise);
2245 } else {
2246 handshakePromise = newHandshakePromise;
2247 handshake(true);
2248 applyHandshakeTimeout();
2249 }
2250 }
2251
2252 /**
2253 * Performs TLS (re)negotiation.
2254 * @param flushAtEnd Set to {@code true} if the outbound buffer should be flushed (written to the network) at the
2255 * end. Set to {@code false} if the handshake will be flushed later, e.g. as part of TCP Fast Open
2256 * connect.
2257 */
2258 private void handshake(boolean flushAtEnd) {
2259 if (engine.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING) {
2260 // Not all SSLEngine implementations support calling beginHandshake multiple times while a handshake
2261 // is in progress. See https://github.com/netty/netty/issues/4718.
2262 return;
2263 }
2264 if (handshakePromise.isDone()) {
2265 // If the handshake is done already lets just return directly as there is no need to trigger it again.
2266 // This can happen if the handshake(...) was triggered before we called channelActive(...) by a
2267 // flush() that was triggered by a ChannelFutureListener that was added to the ChannelFuture returned
2268 // from the connect(...) method. In this case we will see the flush() happen before we had a chance to
2269 // call fireChannelActive() on the pipeline.
2270 return;
2271 }
2272
2273 // Begin handshake.
2274 final ChannelHandlerContext ctx = this.ctx;
2275 try {
2276 engine.beginHandshake();
2277 wrapNonAppData(ctx, false);
2278 } catch (Throwable e) {
2279 setHandshakeFailure(ctx, e);
2280 } finally {
2281 if (flushAtEnd) {
2282 forceFlush(ctx);
2283 }
2284 }
2285 }
2286
2287 private void applyHandshakeTimeout() {
2288 final Promise<Channel> localHandshakePromise = this.handshakePromise;
2289
2290 // Set timeout if necessary.
2291 final long handshakeTimeoutMillis = this.handshakeTimeoutMillis;
2292 if (handshakeTimeoutMillis <= 0 || localHandshakePromise.isDone()) {
2293 return;
2294 }
2295
2296 final Future<?> timeoutFuture = ctx.executor().schedule(new Runnable() {
2297 @Override
2298 public void run() {
2299 if (localHandshakePromise.isDone()) {
2300 return;
2301 }
2302 SSLException exception =
2303 new SslHandshakeTimeoutException("handshake timed out after " + handshakeTimeoutMillis + "ms");
2304 try {
2305 if (localHandshakePromise.tryFailure(exception)) {
2306 SslUtils.handleHandshakeFailure(ctx, exception, true);
2307 }
2308 } finally {
2309 releaseAndFailAll(ctx, exception);
2310 }
2311 }
2312 }, handshakeTimeoutMillis, TimeUnit.MILLISECONDS);
2313
2314 // Cancel the handshake timeout when handshake is finished.
2315 localHandshakePromise.addListener(new FutureListener<Channel>() {
2316 @Override
2317 public void operationComplete(Future<Channel> f) throws Exception {
2318 timeoutFuture.cancel(false);
2319 }
2320 });
2321 }
2322
2323 private void forceFlush(ChannelHandlerContext ctx) {
2324 clearState(STATE_NEEDS_FLUSH);
2325 ctx.flush();
2326 }
2327
2328 private void setOpensslEngineSocketFd(Channel c) {
2329 if (c instanceof UnixChannel && engine instanceof ReferenceCountedOpenSslEngine) {
2330 ((ReferenceCountedOpenSslEngine) engine).bioSetFd(((UnixChannel) c).fd().intValue());
2331 }
2332 }
2333
2334 /**
2335 * Issues an initial TLS handshake once connected when used in client-mode
2336 */
2337 @Override
2338 public void channelActive(final ChannelHandlerContext ctx) throws Exception {
2339 setOpensslEngineSocketFd(ctx.channel());
2340 if (!startTls) {
2341 startHandshakeProcessing(true);
2342 }
2343 ctx.fireChannelActive();
2344 }
2345
2346 private void safeClose(
2347 final ChannelHandlerContext ctx, final ChannelFuture flushFuture,
2348 final ChannelPromise promise) {
2349 if (!ctx.channel().isActive()) {
2350 ctx.close(promise);
2351 return;
2352 }
2353
2354 final Future<?> timeoutFuture;
2355 if (!flushFuture.isDone()) {
2356 long closeNotifyTimeout = closeNotifyFlushTimeoutMillis;
2357 if (closeNotifyTimeout > 0) {
2358 // Force-close the connection if close_notify is not fully sent in time.
2359 timeoutFuture = ctx.executor().schedule(new Runnable() {
2360 @Override
2361 public void run() {
2362 // May be done in the meantime as cancel(...) is only best effort.
2363 if (!flushFuture.isDone()) {
2364 logger.warn("{} Last write attempt timed out; force-closing the connection.",
2365 ctx.channel());
2366 addCloseListener(ctx.close(ctx.newPromise()), promise);
2367 }
2368 }
2369 }, closeNotifyTimeout, TimeUnit.MILLISECONDS);
2370 } else {
2371 timeoutFuture = null;
2372 }
2373 } else {
2374 timeoutFuture = null;
2375 }
2376
2377 // Close the connection if close_notify is sent in time.
2378 flushFuture.addListener(new ChannelFutureListener() {
2379 @Override
2380 public void operationComplete(ChannelFuture f) {
2381 if (timeoutFuture != null) {
2382 timeoutFuture.cancel(false);
2383 }
2384 final long closeNotifyReadTimeout = closeNotifyReadTimeoutMillis;
2385 if (closeNotifyReadTimeout <= 0) {
2386 // Trigger the close in all cases to make sure the promise is notified
2387 // See https://github.com/netty/netty/issues/2358
2388 addCloseListener(ctx.close(ctx.newPromise()), promise);
2389 } else {
2390 final Future<?> closeNotifyReadTimeoutFuture;
2391
2392 if (!sslClosePromise.isDone()) {
2393 closeNotifyReadTimeoutFuture = ctx.executor().schedule(new Runnable() {
2394 @Override
2395 public void run() {
2396 if (!sslClosePromise.isDone()) {
2397 logger.debug(
2398 "{} did not receive close_notify in {}ms; force-closing the connection.",
2399 ctx.channel(), closeNotifyReadTimeout);
2400
2401 // Do the close now...
2402 addCloseListener(ctx.close(ctx.newPromise()), promise);
2403 }
2404 }
2405 }, closeNotifyReadTimeout, TimeUnit.MILLISECONDS);
2406 } else {
2407 closeNotifyReadTimeoutFuture = null;
2408 }
2409
2410 // Do the close once the we received the close_notify.
2411 sslClosePromise.addListener(new FutureListener<Channel>() {
2412 @Override
2413 public void operationComplete(Future<Channel> future) throws Exception {
2414 if (closeNotifyReadTimeoutFuture != null) {
2415 closeNotifyReadTimeoutFuture.cancel(false);
2416 }
2417 addCloseListener(ctx.close(ctx.newPromise()), promise);
2418 }
2419 });
2420 }
2421 }
2422 });
2423 }
2424
2425 private static void addCloseListener(ChannelFuture future, ChannelPromise promise) {
2426 // We notify the promise in the ChannelPromiseNotifier as there is a "race" where the close(...) call
2427 // by the timeoutFuture and the close call in the flushFuture listener will be called. Because of
2428 // this we need to use trySuccess() and tryFailure(...) as otherwise we can cause an
2429 // IllegalStateException.
2430 // Also we not want to log if the notification happens as this is expected in some cases.
2431 // See https://github.com/netty/netty/issues/5598
2432 PromiseNotifier.cascade(false, future, promise);
2433 }
2434
2435 /**
2436 * Always prefer a direct buffer when it's pooled, so that we reduce the number of memory copies
2437 * in {@link OpenSslEngine}.
2438 */
2439 private ByteBuf allocate(ChannelHandlerContext ctx, int capacity) {
2440 ByteBufAllocator alloc = ctx.alloc();
2441 if (engineType.wantsDirectBuffer) {
2442 return alloc.directBuffer(capacity);
2443 } else {
2444 return alloc.buffer(capacity);
2445 }
2446 }
2447
2448 /**
2449 * Allocates an outbound network buffer for {@link SSLEngine#wrap(ByteBuffer, ByteBuffer)} which can encrypt
2450 * the specified amount of pending bytes.
2451 */
2452 private ByteBuf allocateOutNetBuf(ChannelHandlerContext ctx, int pendingBytes, int numComponents) {
2453 return engineType.allocateWrapBuffer(this, ctx.alloc(), pendingBytes, numComponents);
2454 }
2455
2456 private boolean isStateSet(int bit) {
2457 return (state & bit) == bit;
2458 }
2459
2460 private void setState(int bit) {
2461 state |= bit;
2462 }
2463
2464 private void clearState(int bit) {
2465 state &= ~bit;
2466 }
2467
2468 private final class LazyChannelPromise extends DefaultPromise<Channel> {
2469
2470 @Override
2471 protected EventExecutor executor() {
2472 if (ctx == null) {
2473 throw new IllegalStateException();
2474 }
2475 return ctx.executor();
2476 }
2477
2478 @Override
2479 protected void checkDeadLock() {
2480 if (ctx == null) {
2481 // If ctx is null the handlerAdded(...) callback was not called, in this case the checkDeadLock()
2482 // method was called from another Thread then the one that is used by ctx.executor(). We need to
2483 // guard against this as a user can see a race if handshakeFuture().sync() is called but the
2484 // handlerAdded(..) method was not yet as it is called from the EventExecutor of the
2485 // ChannelHandlerContext. If we not guard against this super.checkDeadLock() would cause an
2486 // IllegalStateException when trying to call executor().
2487 return;
2488 }
2489 super.checkDeadLock();
2490 }
2491 }
2492 }