1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.handler.ssl;
17
18 import io.netty5.buffer.BufferUtil;
19 import io.netty5.buffer.api.Buffer;
20 import io.netty5.buffer.api.BufferAllocator;
21 import io.netty5.buffer.api.CompositeBuffer;
22 import io.netty5.buffer.api.DefaultBufferAllocators;
23 import io.netty5.util.Resource;
24 import io.netty5.buffer.api.StandardAllocationTypes;
25 import io.netty5.channel.AbstractCoalescingBufferQueue;
26 import io.netty5.channel.Channel;
27 import io.netty5.channel.ChannelException;
28 import io.netty5.channel.ChannelHandler;
29 import io.netty5.channel.ChannelHandlerContext;
30 import io.netty5.channel.ChannelOption;
31 import io.netty5.channel.ChannelPipeline;
32 import io.netty5.channel.unix.UnixChannel;
33 import io.netty5.handler.codec.ByteToMessageDecoder;
34 import io.netty5.handler.codec.DecoderException;
35 import io.netty5.handler.codec.UnsupportedMessageTypeException;
36 import io.netty5.util.concurrent.DefaultPromise;
37 import io.netty5.util.concurrent.EventExecutor;
38 import io.netty5.util.concurrent.Future;
39 import io.netty5.util.concurrent.ImmediateEventExecutor;
40 import io.netty5.util.concurrent.ImmediateExecutor;
41 import io.netty5.util.concurrent.Promise;
42 import io.netty5.util.internal.PlatformDependent;
43 import io.netty5.util.internal.SilentDispose;
44 import io.netty5.util.internal.UnstableApi;
45 import io.netty5.util.internal.logging.InternalLogger;
46 import io.netty5.util.internal.logging.InternalLoggerFactory;
47
48 import javax.net.ssl.SSLEngine;
49 import javax.net.ssl.SSLEngineResult;
50 import javax.net.ssl.SSLEngineResult.HandshakeStatus;
51 import javax.net.ssl.SSLEngineResult.Status;
52 import javax.net.ssl.SSLException;
53 import javax.net.ssl.SSLHandshakeException;
54 import javax.net.ssl.SSLSession;
55 import java.io.IOException;
56 import java.nio.ByteBuffer;
57 import java.nio.channels.ClosedChannelException;
58 import java.nio.channels.DatagramChannel;
59 import java.nio.channels.SocketChannel;
60 import java.util.concurrent.Executor;
61 import java.util.concurrent.RejectedExecutionException;
62 import java.util.concurrent.TimeUnit;
63 import java.util.regex.Pattern;
64
65 import static io.netty5.handler.ssl.SslUtils.getEncryptedPacketLength;
66 import static io.netty5.util.internal.ObjectUtil.checkPositiveOrZero;
67 import static io.netty5.util.internal.SilentDispose.autoClosing;
68 import static java.util.Objects.requireNonNull;
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160 public class SslHandler extends ByteToMessageDecoder {
161 private static final InternalLogger logger =
162 InternalLoggerFactory.getInstance(SslHandler.class);
163 private static final Pattern IGNORABLE_CLASS_IN_STACK = Pattern.compile(
164 "^.*(?:Socket|Datagram)Channel.*$");
165 private static final Pattern IGNORABLE_ERROR_MESSAGE = Pattern.compile(
166 "^.*(?:connection.*(?:reset|closed|abort|broken)|broken.*pipe).*$", Pattern.CASE_INSENSITIVE);
167 private static final int STATE_SENT_FIRST_MESSAGE = 1;
168 private static final int STATE_FLUSHED_BEFORE_HANDSHAKE = 1 << 1;
169 private static final int STATE_READ_DURING_HANDSHAKE = 1 << 2;
170 private static final int STATE_HANDSHAKE_STARTED = 1 << 3;
171
172
173
174
175 private static final int STATE_NEEDS_FLUSH = 1 << 4;
176 private static final int STATE_OUTBOUND_CLOSED = 1 << 5;
177 private static final int STATE_CLOSE_NOTIFY = 1 << 6;
178 private static final int STATE_PROCESS_TASK = 1 << 7;
179
180
181
182
183 private static final int STATE_FIRE_CHANNEL_READ = 1 << 8;
184 private static final int STATE_UNWRAP_REENTRY = 1 << 9;
185
186
187
188
189
190 private static final int MAX_PLAINTEXT_LENGTH = 16 * 1024;
191
192 private enum SslEngineType {
193 TCNATIVE(true, COMPOSITE_CUMULATOR) {
194 @Override
195 Buffer allocateWrapBuffer(SslHandler handler, BufferAllocator allocator,
196 int pendingBytes, int numComponents) {
197 ReferenceCountedOpenSslEngine engine = (ReferenceCountedOpenSslEngine) handler.engine;
198 int maxWrapLength = engine.calculateMaxLengthForWrap(pendingBytes, numComponents);
199 if (allocator.getAllocationType() != StandardAllocationTypes.OFF_HEAP) {
200 allocator = DefaultBufferAllocators.offHeapAllocator();
201 }
202 return allocator.allocate(maxWrapLength);
203 }
204
205 @Override
206 int calculatePendingData(SslHandler handler, int guess) {
207 int sslPending = ((ReferenceCountedOpenSslEngine) handler.engine).sslPending();
208 return sslPending > 0 ? sslPending : guess;
209 }
210
211 @Override
212 boolean jdkCompatibilityMode(SSLEngine engine) {
213 return ((ReferenceCountedOpenSslEngine) engine).jdkCompatibilityMode;
214 }
215 },
216 CONSCRYPT(true, COMPOSITE_CUMULATOR) {
217 @Override
218 Buffer allocateWrapBuffer(SslHandler handler, BufferAllocator allocator,
219 int pendingBytes, int numComponents) {
220 ConscryptAlpnSslEngine engine = (ConscryptAlpnSslEngine) handler.engine;
221 int size = engine.calculateOutNetBufSize(pendingBytes, numComponents);
222 if (allocator.getAllocationType() != StandardAllocationTypes.OFF_HEAP) {
223 allocator = DefaultBufferAllocators.offHeapAllocator();
224 }
225 return allocator.allocate(size);
226 }
227
228 @Override
229 int calculatePendingData(SslHandler handler, int guess) {
230 return guess;
231 }
232
233 @Override
234 boolean jdkCompatibilityMode(SSLEngine engine) {
235 return true;
236 }
237 },
238 JDK(false, MERGE_CUMULATOR) {
239 @Override
240 Buffer allocateWrapBuffer(SslHandler handler, BufferAllocator allocator,
241 int pendingBytes, int numComponents) {
242
243
244
245
246
247
248 if (allocator.getAllocationType() == StandardAllocationTypes.OFF_HEAP) {
249 allocator = DefaultBufferAllocators.onHeapAllocator();
250 }
251 return allocator.allocate(handler.engine.getSession().getPacketBufferSize());
252 }
253
254 @Override
255 int calculatePendingData(SslHandler handler, int guess) {
256 return guess;
257 }
258
259 @Override
260 boolean jdkCompatibilityMode(SSLEngine engine) {
261 return true;
262 }
263 };
264
265 static SslEngineType forEngine(SSLEngine engine) {
266 return engine instanceof ReferenceCountedOpenSslEngine ? TCNATIVE :
267 engine instanceof ConscryptAlpnSslEngine ? CONSCRYPT : JDK;
268 }
269
270 SslEngineType(boolean wantsDirectBuffer, Cumulator cumulator) {
271 this.wantsDirectBuffer = wantsDirectBuffer;
272 this.cumulator = cumulator;
273 }
274
275 abstract int calculatePendingData(SslHandler handler, int guess);
276
277 abstract boolean jdkCompatibilityMode(SSLEngine engine);
278
279 abstract Buffer allocateWrapBuffer(SslHandler handler, BufferAllocator allocator,
280 int pendingBytes, int numComponents);
281
282
283
284
285
286
287
288 final boolean wantsDirectBuffer;
289
290
291
292
293
294
295
296
297
298
299
300 final Cumulator cumulator;
301 }
302
303 private volatile ChannelHandlerContext ctx;
304 private final SSLEngine engine;
305 private final SslEngineType engineType;
306 private final Executor delegatedTaskExecutor;
307 private final boolean jdkCompatibilityMode;
308
309
310
311
312
313 private final EngineWrapper engineWrapper;
314
315 private final boolean startTls;
316
317 private final SslTasksRunner sslTaskRunnerForUnwrap = new SslTasksRunner(true);
318 private final SslTasksRunner sslTaskRunner = new SslTasksRunner(false);
319
320 private SslHandlerCoalescingBufferQueue pendingUnencryptedWrites;
321 private Promise<Channel> handshakePromise = new LazyPromise();
322 private final Promise<Channel> sslClosePromise = new LazyPromise();
323
324 private int packetLength;
325 private short state;
326
327 private volatile long handshakeTimeoutMillis = 10000;
328 private volatile long closeNotifyFlushTimeoutMillis = 3000;
329 private volatile long closeNotifyReadTimeoutMillis;
330 volatile int wrapDataSize = MAX_PLAINTEXT_LENGTH;
331
332
333
334
335
336
/**
 * Creates a handler without STARTTLS, delegating to {@code SslHandler(SSLEngine, boolean)}.
 *
 * @param engine the {@link SSLEngine} this handler will use
 */
public SslHandler(SSLEngine engine) {
    this(engine, /* startTls */ false);
}
340
341
342
343
344
345
346
347
/**
 * Creates a handler whose delegated tasks run on {@code ImmediateExecutor.INSTANCE}.
 *
 * @param engine   the {@link SSLEngine} this handler will use
 * @param startTls {@code true} if the first flush must be passed through unencrypted
 */
public SslHandler(SSLEngine engine, boolean startTls) {
    this(engine, startTls, /* delegatedTaskExecutor */ ImmediateExecutor.INSTANCE);
}
351
352
353
354
355
356
357
358
/**
 * Creates a handler without STARTTLS that runs delegated tasks on the given executor.
 *
 * @param engine                the {@link SSLEngine} this handler will use
 * @param delegatedTaskExecutor the {@link Executor} used for the engine's delegated tasks
 */
public SslHandler(SSLEngine engine, Executor delegatedTaskExecutor) {
    this(engine, /* startTls */ false, delegatedTaskExecutor);
}
362
363
364
365
366
367
368
369
370
371
372 public SslHandler(SSLEngine engine, boolean startTls, Executor delegatedTaskExecutor) {
373 super(SslEngineType.forEngine(engine).cumulator);
374 requireNonNull(engine, "engine");
375 requireNonNull(delegatedTaskExecutor, "delegatedTaskExecutor");
376 this.engine = engine;
377 engineType = SslEngineType.forEngine(engine);
378 this.delegatedTaskExecutor = delegatedTaskExecutor;
379 this.startTls = startTls;
380 engineWrapper = new EngineWrapper(engine, engineType.wantsDirectBuffer);
381 jdkCompatibilityMode = engineType.jdkCompatibilityMode(engine);
382 }
383
384 public long getHandshakeTimeoutMillis() {
385 return handshakeTimeoutMillis;
386 }
387
388 public void setHandshakeTimeout(long handshakeTimeout, TimeUnit unit) {
389 requireNonNull(unit, "unit");
390 setHandshakeTimeoutMillis(unit.toMillis(handshakeTimeout));
391 }
392
393 public void setHandshakeTimeoutMillis(long handshakeTimeoutMillis) {
394 this.handshakeTimeoutMillis = checkPositiveOrZero(handshakeTimeoutMillis, "handshakeTimeoutMillis");
395 }
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417 @UnstableApi
418 public final void setWrapDataSize(int wrapDataSize) {
419 this.wrapDataSize = wrapDataSize;
420 }
421
422
423
424
425 @Deprecated
426 public long getCloseNotifyTimeoutMillis() {
427 return getCloseNotifyFlushTimeoutMillis();
428 }
429
430
431
432
433 @Deprecated
434 public void setCloseNotifyTimeout(long closeNotifyTimeout, TimeUnit unit) {
435 setCloseNotifyFlushTimeout(closeNotifyTimeout, unit);
436 }
437
438
439
440
441 @Deprecated
442 public void setCloseNotifyTimeoutMillis(long closeNotifyFlushTimeoutMillis) {
443 setCloseNotifyFlushTimeoutMillis(closeNotifyFlushTimeoutMillis);
444 }
445
446
447
448
449
450
451 public final long getCloseNotifyFlushTimeoutMillis() {
452 return closeNotifyFlushTimeoutMillis;
453 }
454
455
456
457
458
459
460 public final void setCloseNotifyFlushTimeout(long closeNotifyFlushTimeout, TimeUnit unit) {
461 setCloseNotifyFlushTimeoutMillis(unit.toMillis(closeNotifyFlushTimeout));
462 }
463
464
465
466
467 public final void setCloseNotifyFlushTimeoutMillis(long closeNotifyFlushTimeoutMillis) {
468 this.closeNotifyFlushTimeoutMillis = checkPositiveOrZero(closeNotifyFlushTimeoutMillis,
469 "closeNotifyFlushTimeoutMillis");
470 }
471
472
473
474
475
476
477 public final long getCloseNotifyReadTimeoutMillis() {
478 return closeNotifyReadTimeoutMillis;
479 }
480
481
482
483
484
485
486 public final void setCloseNotifyReadTimeout(long closeNotifyReadTimeout, TimeUnit unit) {
487 setCloseNotifyReadTimeoutMillis(unit.toMillis(closeNotifyReadTimeout));
488 }
489
490
491
492
493 public final void setCloseNotifyReadTimeoutMillis(long closeNotifyReadTimeoutMillis) {
494 this.closeNotifyReadTimeoutMillis = checkPositiveOrZero(closeNotifyReadTimeoutMillis,
495 "closeNotifyReadTimeoutMillis");
496 }
497
498
499
500
501 public SSLEngine engine() {
502 return engine;
503 }
504
505
506
507
508
509
510 public String applicationProtocol() {
511 SSLEngine engine = engine();
512 if (!(engine instanceof ApplicationProtocolAccessor)) {
513 return null;
514 }
515
516 return ((ApplicationProtocolAccessor) engine).getNegotiatedApplicationProtocol();
517 }
518
519
520
521
522
523
524
525 public Future<Channel> handshakeFuture() {
526 return handshakePromise.asFuture();
527 }
528
529
530
531
532
533
534
535 public Future<Void> closeOutbound() {
536 final ChannelHandlerContext ctx = this.ctx;
537 Promise<Void> promise = ctx.newPromise();
538 if (ctx.executor().inEventLoop()) {
539 closeOutbound0(promise);
540 } else {
541 ctx.executor().execute(() -> closeOutbound0(promise));
542 }
543 return promise.asFuture();
544 }
545
546 private void closeOutbound0(Promise<Void> promise) {
547 setState(STATE_OUTBOUND_CLOSED);
548 engine.closeOutbound();
549 try {
550 flush(ctx, promise);
551 } catch (Exception e) {
552 if (!promise.tryFailure(e)) {
553 logger.warn("{} flush() raised a masked exception.", ctx.channel(), e);
554 }
555 }
556 }
557
558
559
560
561
562
563
564
565 public Future<Channel> sslCloseFuture() {
566 return sslClosePromise.asFuture();
567 }
568
569 @Override
570 public void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
571 try (AutoCloseable ignore = autoClosing(engine)) {
572 if (pendingUnencryptedWrites != null && !pendingUnencryptedWrites.isEmpty()) {
573
574 pendingUnencryptedWrites.releaseAndFailAll(ctx,
575 new ChannelException("Pending write on removal of SslHandler"));
576 }
577 pendingUnencryptedWrites = null;
578
579 SSLException cause = null;
580
581
582
583
584 if (!handshakePromise.isDone()) {
585 cause = new SSLHandshakeException("SslHandler removed before handshake completed");
586 if (handshakePromise.tryFailure(cause)) {
587 ctx.fireChannelInboundEvent(new SslHandshakeCompletionEvent(
588 engine().getSession(), applicationProtocol(), cause));
589 }
590 }
591 if (!sslClosePromise.isDone()) {
592 if (cause == null) {
593 cause = new SSLException("SslHandler removed before SSLEngine was closed");
594 }
595 notifyClosePromise(cause);
596 }
597 }
598 }
599
600 @Override
601 public Future<Void> disconnect(final ChannelHandlerContext ctx) {
602 return closeOutboundAndChannel(ctx, true);
603 }
604
605 @Override
606 public Future<Void> close(final ChannelHandlerContext ctx) {
607 return closeOutboundAndChannel(ctx, false);
608 }
609
610 @Override
611 public void read(ChannelHandlerContext ctx) {
612 if (!handshakePromise.isDone()) {
613 setState(STATE_READ_DURING_HANDSHAKE);
614 }
615
616 ctx.read();
617 }
618
619 private static IllegalStateException newPendingWritesNullException() {
620 return new IllegalStateException("pendingUnencryptedWrites is null, handlerRemoved0 called?");
621 }
622
623 @Override
624 public Future<Void> write(final ChannelHandlerContext ctx, Object msg) {
625 if (!(msg instanceof Buffer)) {
626 UnsupportedMessageTypeException exception = new UnsupportedMessageTypeException(msg, Buffer.class);
627 logger.warn(exception);
628 SilentDispose.dispose(msg, logger);
629 return ctx.newFailedFuture(exception);
630 }
631 if (pendingUnencryptedWrites == null) {
632 IllegalStateException exception = newPendingWritesNullException();
633 try {
634 Resource.dispose(msg);
635 } catch (Exception e) {
636 exception.addSuppressed(e);
637 }
638 return ctx.newFailedFuture(exception);
639 } else {
640 Promise<Void> promise = ctx.newPromise();
641 pendingUnencryptedWrites.add((Buffer) msg, promise);
642 return promise.asFuture();
643 }
644 }
645
646 @Override
647 public void flush(ChannelHandlerContext ctx) {
648
649
650 if (startTls && !isStateSet(STATE_SENT_FIRST_MESSAGE)) {
651 setState(STATE_SENT_FIRST_MESSAGE);
652 pendingUnencryptedWrites.writeAndRemoveAll(ctx);
653 forceFlush(ctx);
654
655
656 startHandshakeProcessing(true);
657 return;
658 }
659
660 if (isStateSet(STATE_PROCESS_TASK)) {
661 return;
662 }
663
664 try {
665 wrapAndFlush(ctx);
666 } catch (Throwable cause) {
667 setHandshakeFailure(ctx, cause);
668 }
669 }
670
671 private void wrapAndFlush(ChannelHandlerContext ctx) throws SSLException {
672 if (pendingUnencryptedWrites.isEmpty()) {
673 pendingUnencryptedWrites.add(ctx.bufferAllocator().allocate(0), ctx.newPromise());
674 }
675 if (!handshakePromise.isDone()) {
676 setState(STATE_FLUSHED_BEFORE_HANDSHAKE);
677 }
678 try {
679 wrap(ctx, false);
680 } finally {
681
682
683 forceFlush(ctx);
684 }
685 }
686
687
688 private void wrap(ChannelHandlerContext ctx, boolean inUnwrap) throws SSLException {
689 Buffer out = null;
690 BufferAllocator alloc = ctx.bufferAllocator();
691 try {
692 final int wrapDataSize = this.wrapDataSize;
693
694
695 outer: while (!ctx.isRemoved()) {
696 Promise<Void> promise = ctx.newPromise();
697 Buffer buf = wrapDataSize > 0 ?
698 pendingUnencryptedWrites.remove(alloc, wrapDataSize, promise) :
699 pendingUnencryptedWrites.removeFirst(promise);
700 if (buf == null) {
701 break;
702 }
703
704 if (out == null) {
705 out = allocateOutNetBuf(ctx, buf.readableBytes(), buf.countReadableComponents());
706 }
707
708 SSLEngineResult result = wrap(engine, buf, out);
709 if (buf.readableBytes() > 0) {
710 pendingUnencryptedWrites.addFirst(buf, promise);
711
712
713 promise = null;
714 } else {
715 buf.close();
716 }
717
718
719
720 if (out.readableBytes() > 0) {
721 final Buffer b = out;
722 out = null;
723 if (promise != null) {
724 ctx.write(b).cascadeTo(promise);
725 } else {
726 ctx.write(b);
727 }
728 } else if (promise != null) {
729 ctx.write(alloc.allocate(0)).cascadeTo(promise);
730 }
731
732
733 if (result.getStatus() == Status.CLOSED) {
734
735
736 Throwable exception = handshakePromise.isDone() ? handshakePromise.cause() : null;
737 if (exception == null) {
738 exception = sslClosePromise.isDone() ? sslClosePromise.cause() : null;
739 if (exception == null) {
740 exception = new SslClosedEngineException("SSLEngine closed already");
741 }
742 }
743 pendingUnencryptedWrites.releaseAndFailAll(ctx, exception);
744 return;
745 } else {
746 switch (result.getHandshakeStatus()) {
747 case NEED_TASK:
748 if (!runDelegatedTasks(inUnwrap)) {
749
750
751 break outer;
752 }
753 break;
754 case FINISHED:
755 case NOT_HANDSHAKING:
756 setHandshakeSuccess();
757 break;
758 case NEED_WRAP:
759
760
761
762
763 if (result.bytesProduced() > 0 && pendingUnencryptedWrites.isEmpty()) {
764 pendingUnencryptedWrites.add(alloc.allocate(0));
765 }
766 break;
767 case NEED_UNWRAP:
768
769
770 readIfNeeded(ctx);
771 return;
772 default:
773 throw new IllegalStateException(
774 "Unknown handshake status: " + result.getHandshakeStatus());
775 }
776 }
777 }
778 } finally {
779 if (out != null) {
780 out.close();
781 }
782 if (inUnwrap) {
783 setState(STATE_NEEDS_FLUSH);
784 }
785 }
786 }
787
788
789
790
791
792
793
794 private boolean wrapNonAppData(final ChannelHandlerContext ctx, boolean inUnwrap) throws SSLException {
795 Buffer out = null;
796 try {
797
798
799 outer: while (!ctx.isRemoved()) {
800 if (out == null) {
801
802
803
804 out = allocateOutNetBuf(ctx, 2048, 1);
805 }
806 SSLEngineResult result = wrap(engine, null, out);
807 if (result.bytesProduced() > 0) {
808 ctx.write(out).addListener(future -> {
809 Throwable cause = future.cause();
810 if (cause != null) {
811 setHandshakeFailureTransportFailure(ctx, cause);
812 }
813 });
814 if (inUnwrap) {
815 setState(STATE_NEEDS_FLUSH);
816 }
817 out = null;
818 }
819
820 HandshakeStatus status = result.getHandshakeStatus();
821 switch (status) {
822 case FINISHED:
823
824
825
826
827 if (setHandshakeSuccess() && inUnwrap && !pendingUnencryptedWrites.isEmpty()) {
828 wrap(ctx, true);
829 }
830 return false;
831 case NEED_TASK:
832 if (!runDelegatedTasks(inUnwrap)) {
833
834
835 break outer;
836 }
837 break;
838 case NEED_UNWRAP:
839 if (inUnwrap || unwrapNonAppData(ctx) <= 0) {
840
841
842
843 return false;
844 }
845 break;
846 case NEED_WRAP:
847 break;
848 case NOT_HANDSHAKING:
849 if (setHandshakeSuccess() && inUnwrap && !pendingUnencryptedWrites.isEmpty()) {
850 wrap(ctx, true);
851 }
852
853
854 if (!inUnwrap) {
855 unwrapNonAppData(ctx);
856 }
857 return true;
858 default:
859 throw new IllegalStateException("Unknown handshake status: " + result.getHandshakeStatus());
860 }
861
862
863
864 if (result.bytesProduced() == 0 && status != HandshakeStatus.NEED_TASK) {
865 break;
866 }
867
868
869
870 if (result.bytesConsumed() == 0 && result.getHandshakeStatus() == HandshakeStatus.NOT_HANDSHAKING) {
871 break;
872 }
873 }
874 } finally {
875 if (out != null) {
876 out.close();
877 }
878 }
879 return false;
880 }
881
882 private SSLEngineResult wrap(SSLEngine engine, Buffer in, Buffer out)
883 throws SSLException {
884 for (;;) {
885 SSLEngineResult result = engineWrapper.wrap(in, out);
886
887 if (result.getStatus() == Status.BUFFER_OVERFLOW) {
888 out.ensureWritable(engine.getSession().getPacketBufferSize());
889 } else {
890 return result;
891 }
892 }
893 }
894
895 @Override
896 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
897 boolean handshakeFailed = handshakePromise.isFailed();
898
899 ClosedChannelException exception = new ClosedChannelException();
900
901
902 setHandshakeFailure(ctx, exception, !isStateSet(STATE_OUTBOUND_CLOSED), isStateSet(STATE_HANDSHAKE_STARTED),
903 false);
904
905
906 notifyClosePromise(exception);
907
908 try {
909 super.channelInactive(ctx);
910 } catch (DecoderException e) {
911 if (!handshakeFailed || !(e.getCause() instanceof SSLException)) {
912
913
914
915
916
917 throw e;
918 }
919 }
920 }
921
922 @Override
923 public void channelExceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
924 if (ignoreException(cause)) {
925
926
927 if (logger.isDebugEnabled()) {
928 logger.debug(
929 "{} Swallowing a harmless 'connection reset by peer / broken pipe' error that occurred " +
930 "while writing close_notify in response to the peer's close_notify",
931 ctx.channel(), cause);
932 }
933
934
935
936 if (ctx.channel().isActive()) {
937 ctx.close();
938 }
939 } else {
940 ctx.fireChannelExceptionCaught(cause);
941
942 if (cause instanceof SSLException ||
943 cause instanceof DecoderException && cause.getCause() instanceof SSLException) {
944 ctx.close();
945 }
946 }
947 }
948
949
950
951
952
953
954
955
956
957
958 private boolean ignoreException(Throwable t) {
959 if (!(t instanceof SSLException) && t instanceof IOException && sslClosePromise.isDone()) {
960 String message = t.getMessage();
961
962
963
964 if (message != null && IGNORABLE_ERROR_MESSAGE.matcher(message).matches()) {
965 return true;
966 }
967
968
969 StackTraceElement[] elements = t.getStackTrace();
970 for (StackTraceElement element: elements) {
971 String classname = element.getClassName();
972 String methodname = element.getMethodName();
973
974
975 if (classname.startsWith("io.netty5.")) {
976 continue;
977 }
978
979
980 if (!"read".equals(methodname)) {
981 continue;
982 }
983
984
985
986 if (IGNORABLE_CLASS_IN_STACK.matcher(classname).matches()) {
987 return true;
988 }
989
990 try {
991
992
993
994 Class<?> clazz = PlatformDependent.getClassLoader(getClass()).loadClass(classname);
995
996 if (SocketChannel.class.isAssignableFrom(clazz)
997 || DatagramChannel.class.isAssignableFrom(clazz)) {
998 return true;
999 }
1000 } catch (Throwable cause) {
1001 if (logger.isDebugEnabled()) {
1002 logger.debug("Unexpected exception while loading class {} classname {}",
1003 getClass(), classname, cause);
1004 }
1005 }
1006 }
1007 }
1008
1009 return false;
1010 }
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024 public static boolean isEncrypted(Buffer buffer) {
1025 if (buffer.readableBytes() < SslUtils.SSL_RECORD_HEADER_LENGTH) {
1026 throw new IllegalArgumentException(
1027 "buffer must have at least " + SslUtils.SSL_RECORD_HEADER_LENGTH + " readable bytes");
1028 }
1029 return getEncryptedPacketLength(buffer, buffer.readerOffset()) != SslUtils.NOT_ENCRYPTED;
1030 }
1031
1032 private void decodeJdkCompatible(ChannelHandlerContext ctx, Buffer in) throws NotSslRecordException {
1033 int packetLength = this.packetLength;
1034
1035 if (packetLength > 0) {
1036 if (in.readableBytes() < packetLength) {
1037 return;
1038 }
1039 } else {
1040
1041 final int readableBytes = in.readableBytes();
1042 if (readableBytes < SslUtils.SSL_RECORD_HEADER_LENGTH) {
1043 return;
1044 }
1045 packetLength = getEncryptedPacketLength(in, in.readerOffset());
1046 if (packetLength == SslUtils.NOT_ENCRYPTED) {
1047
1048 NotSslRecordException e = new NotSslRecordException(
1049 "not an SSL/TLS record: " + BufferUtil.hexDump(in));
1050 in.skipReadableBytes(in.readableBytes());
1051
1052
1053
1054 setHandshakeFailure(ctx, e);
1055
1056 throw e;
1057 }
1058 assert packetLength > 0;
1059 if (packetLength > readableBytes) {
1060
1061 this.packetLength = packetLength;
1062 return;
1063 }
1064 }
1065
1066
1067
1068 this.packetLength = 0;
1069 try {
1070 final int bytesConsumed = unwrap(ctx, in, packetLength);
1071 assert bytesConsumed == packetLength || engine.isInboundDone() :
1072 "we feed the SSLEngine a packets worth of data: " + packetLength + " but it only consumed: " +
1073 bytesConsumed;
1074 } catch (Throwable cause) {
1075 handleUnwrapThrowable(ctx, cause);
1076 }
1077 }
1078
1079 private void decodeNonJdkCompatible(ChannelHandlerContext ctx, Buffer in) {
1080 try {
1081 unwrap(ctx, in, in.readableBytes());
1082 } catch (Throwable cause) {
1083 handleUnwrapThrowable(ctx, cause);
1084 }
1085 }
1086
1087 private void handleUnwrapThrowable(ChannelHandlerContext ctx, Throwable cause) {
1088 try {
1089
1090
1091
1092
1093 if (handshakePromise.tryFailure(cause)) {
1094 ctx.fireChannelInboundEvent(new SslHandshakeCompletionEvent(
1095 engine().getSession(), applicationProtocol(), cause));
1096 }
1097
1098
1099
1100 wrapAndFlush(ctx);
1101 } catch (SSLException ex) {
1102 logger.debug("SSLException during trying to call SSLEngine.wrap(...)" +
1103 " because of an previous SSLException, ignoring...", ex);
1104 } finally {
1105
1106 setHandshakeFailure(ctx, cause, true, false, true);
1107 }
1108 PlatformDependent.throwException(cause);
1109 }
1110
1111 @Override
1112 protected void decode(ChannelHandlerContext ctx, Buffer in) throws SSLException {
1113 if (isStateSet(STATE_PROCESS_TASK)) {
1114 return;
1115 }
1116 if (jdkCompatibilityMode) {
1117 decodeJdkCompatible(ctx, in);
1118 } else {
1119 decodeNonJdkCompatible(ctx, in);
1120 }
1121 }
1122
1123 @Override
1124 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
1125 channelReadComplete0(ctx);
1126 }
1127
1128 private void channelReadComplete0(ChannelHandlerContext ctx) {
1129
1130 discardSomeReadBytes();
1131
1132 flushIfNeeded(ctx);
1133 readIfNeeded(ctx);
1134
1135 clearState(STATE_FIRE_CHANNEL_READ);
1136 ctx.fireChannelReadComplete();
1137 }
1138
1139 private void readIfNeeded(ChannelHandlerContext ctx) {
1140
1141 if (!ctx.channel().getOption(ChannelOption.AUTO_READ) &&
1142 (!isStateSet(STATE_FIRE_CHANNEL_READ) || !handshakePromise.isDone())) {
1143
1144
1145 ctx.read();
1146 }
1147 }
1148
1149 private void flushIfNeeded(ChannelHandlerContext ctx) {
1150 if (isStateSet(STATE_NEEDS_FLUSH)) {
1151 forceFlush(ctx);
1152 }
1153 }
1154
1155
1156
1157
1158 private int unwrapNonAppData(ChannelHandlerContext ctx) throws SSLException {
1159 return unwrap(ctx, null, 0);
1160 }
1161
1162
1163
1164
1165 private int unwrap(ChannelHandlerContext ctx, Buffer packet, int length) throws SSLException {
1166 final int originalLength = length;
1167 boolean wrapLater = false;
1168 boolean notifyClosure = false;
1169 boolean executedRead = false;
1170 Buffer decodeOut = allocate(ctx, length);
1171 try {
1172
1173
1174 do {
1175
1176
1177
1178
1179 final SSLEngineResult result = engineWrapper.unwrap(packet, length, decodeOut);
1180 final Status status = result.getStatus();
1181 final HandshakeStatus handshakeStatus = result.getHandshakeStatus();
1182 final int produced = result.bytesProduced();
1183 final int consumed = result.bytesConsumed();
1184 length -= consumed;
1185
1186
1187
1188
1189 if (handshakeStatus == HandshakeStatus.FINISHED || handshakeStatus == HandshakeStatus.NOT_HANDSHAKING) {
1190 wrapLater |= (decodeOut.readableBytes() > 0 ?
1191 setHandshakeSuccessUnwrapMarkReentry() : setHandshakeSuccess()) ||
1192 handshakeStatus == HandshakeStatus.FINISHED;
1193 }
1194
1195
1196
1197
1198 if (decodeOut.readableBytes() > 0) {
1199 setState(STATE_FIRE_CHANNEL_READ);
1200 if (isStateSet(STATE_UNWRAP_REENTRY)) {
1201 executedRead = true;
1202 executeChannelRead(ctx, decodeOut);
1203 } else {
1204 ctx.fireChannelRead(decodeOut);
1205 }
1206 decodeOut = null;
1207 }
1208
1209 if (status == Status.CLOSED) {
1210 notifyClosure = true;
1211 } else if (status == Status.BUFFER_OVERFLOW) {
1212 if (decodeOut != null) {
1213 decodeOut.close();
1214 }
1215 final int applicationBufferSize = engine.getSession().getApplicationBufferSize();
1216
1217
1218
1219
1220 decodeOut = allocate(ctx, engineType.calculatePendingData(this, applicationBufferSize < produced ?
1221 applicationBufferSize : applicationBufferSize - produced));
1222 continue;
1223 }
1224
1225 if (handshakeStatus == HandshakeStatus.NEED_TASK) {
1226 boolean pending = runDelegatedTasks(true);
1227 if (!pending) {
1228
1229
1230
1231
1232
1233 wrapLater = false;
1234 break;
1235 }
1236 } else if (handshakeStatus == HandshakeStatus.NEED_WRAP) {
1237
1238
1239
1240 if (wrapNonAppData(ctx, true) && length == 0) {
1241 break;
1242 }
1243 }
1244
1245 if (status == Status.BUFFER_UNDERFLOW ||
1246
1247 handshakeStatus != HandshakeStatus.NEED_TASK &&
1248 (consumed == 0 && produced == 0 ||
1249 length == 0 && handshakeStatus == HandshakeStatus.NOT_HANDSHAKING)) {
1250 if (handshakeStatus == HandshakeStatus.NEED_UNWRAP) {
1251
1252
1253 readIfNeeded(ctx);
1254 }
1255
1256 break;
1257 } else if (decodeOut == null) {
1258 decodeOut = allocate(ctx, length);
1259 }
1260 } while (!ctx.isRemoved());
1261
1262 if (isStateSet(STATE_FLUSHED_BEFORE_HANDSHAKE) && handshakePromise.isDone()) {
1263
1264
1265
1266
1267 clearState(STATE_FLUSHED_BEFORE_HANDSHAKE);
1268 wrapLater = true;
1269 }
1270
1271 if (wrapLater) {
1272 wrap(ctx, true);
1273 }
1274 } finally {
1275 if (decodeOut != null) {
1276 decodeOut.close();
1277 }
1278
1279 if (notifyClosure) {
1280 if (executedRead) {
1281 executeNotifyClosePromise(ctx);
1282 } else {
1283 notifyClosePromise(null);
1284 }
1285 }
1286 }
1287 return originalLength - length;
1288 }
1289
1290 private boolean setHandshakeSuccessUnwrapMarkReentry() {
1291
1292
1293 final boolean setReentryState = !isStateSet(STATE_UNWRAP_REENTRY);
1294 if (setReentryState) {
1295 setState(STATE_UNWRAP_REENTRY);
1296 }
1297 try {
1298 return setHandshakeSuccess();
1299 } finally {
1300
1301
1302 if (setReentryState) {
1303 clearState(STATE_UNWRAP_REENTRY);
1304 }
1305 }
1306 }
1307
1308 private void executeNotifyClosePromise(final ChannelHandlerContext ctx) {
1309 try {
1310 ctx.executor().execute(() -> notifyClosePromise(null));
1311 } catch (RejectedExecutionException e) {
1312 notifyClosePromise(e);
1313 }
1314 }
1315
1316 private static void executeChannelRead(final ChannelHandlerContext ctx, final Buffer decodedOut) {
1317 try {
1318 ctx.executor().execute(() -> ctx.fireChannelRead(decodedOut));
1319 } catch (RejectedExecutionException e) {
1320 decodedOut.close();
1321 throw e;
1322 }
1323 }
1324
1325 private static boolean inEventLoop(Executor executor) {
1326 return executor instanceof EventExecutor && ((EventExecutor) executor).inEventLoop();
1327 }
1328
1329
1330
1331
1332
1333
1334
1335
1336 private boolean runDelegatedTasks(boolean inUnwrap) {
1337 if (delegatedTaskExecutor == ImmediateExecutor.INSTANCE || inEventLoop(delegatedTaskExecutor)) {
1338
1339
1340 for (;;) {
1341 Runnable task = engine.getDelegatedTask();
1342 if (task == null) {
1343 return true;
1344 }
1345 setState(STATE_PROCESS_TASK);
1346 if (task instanceof AsyncRunnable) {
1347
1348 boolean pending = false;
1349 try {
1350 AsyncRunnable asyncTask = (AsyncRunnable) task;
1351 AsyncTaskCompletionHandler completionHandler = new AsyncTaskCompletionHandler(inUnwrap);
1352 asyncTask.run(completionHandler);
1353 pending = completionHandler.resumeLater();
1354 if (pending) {
1355 return false;
1356 }
1357 } finally {
1358 if (!pending) {
1359
1360
1361 clearState(STATE_PROCESS_TASK);
1362 }
1363 }
1364 } else {
1365 try {
1366 task.run();
1367 } finally {
1368 clearState(STATE_PROCESS_TASK);
1369 }
1370 }
1371 }
1372 } else {
1373 executeDelegatedTask(inUnwrap);
1374 return false;
1375 }
1376 }
1377
1378 private SslTasksRunner getTaskRunner(boolean inUnwrap) {
1379 return inUnwrap ? sslTaskRunnerForUnwrap : sslTaskRunner;
1380 }
1381
1382 private void executeDelegatedTask(boolean inUnwrap) {
1383 executeDelegatedTask(getTaskRunner(inUnwrap));
1384 }
1385
1386 private void executeDelegatedTask(SslTasksRunner task) {
1387 setState(STATE_PROCESS_TASK);
1388 try {
1389 delegatedTaskExecutor.execute(task);
1390 } catch (RejectedExecutionException e) {
1391 clearState(STATE_PROCESS_TASK);
1392 throw e;
1393 }
1394 }
1395
1396 private final class AsyncTaskCompletionHandler implements Runnable {
1397 private final boolean inUnwrap;
1398 boolean didRun;
1399 boolean resumeLater;
1400
1401 AsyncTaskCompletionHandler(boolean inUnwrap) {
1402 this.inUnwrap = inUnwrap;
1403 }
1404
1405 @Override
1406 public void run() {
1407 didRun = true;
1408 if (resumeLater) {
1409 getTaskRunner(inUnwrap).runComplete();
1410 }
1411 }
1412
1413 boolean resumeLater() {
1414 if (!didRun) {
1415 resumeLater = true;
1416 return true;
1417 }
1418 return false;
1419 }
1420 }
1421
1422
1423
1424
1425
1426 private final class SslTasksRunner implements Runnable {
1427 private final boolean inUnwrap;
1428 private final Runnable runCompleteTask = this::runComplete;
1429
1430 SslTasksRunner(boolean inUnwrap) {
1431 this.inUnwrap = inUnwrap;
1432 }
1433
1434
1435 private void taskError(Throwable e) {
1436 if (inUnwrap) {
1437
1438
1439
1440
1441 try {
1442 handleUnwrapThrowable(ctx, e);
1443 } catch (Throwable cause) {
1444 safeExceptionCaught(cause);
1445 }
1446 } else {
1447 setHandshakeFailure(ctx, e);
1448 forceFlush(ctx);
1449 }
1450 }
1451
1452
1453 private void safeExceptionCaught(Throwable cause) {
1454 try {
1455 channelExceptionCaught(ctx, wrapIfNeeded(cause));
1456 } catch (Throwable error) {
1457 ctx.fireChannelExceptionCaught(error);
1458 }
1459 }
1460
1461 private Throwable wrapIfNeeded(Throwable cause) {
1462 if (!inUnwrap) {
1463
1464 return cause;
1465 }
1466
1467
1468 return cause instanceof DecoderException ? cause : new DecoderException(cause);
1469 }
1470
1471 private void tryDecodeAgain() {
1472 try {
1473 channelRead(ctx, ctx.bufferAllocator().allocate(0));
1474 } catch (Throwable cause) {
1475 safeExceptionCaught(cause);
1476 } finally {
1477
1478
1479
1480 channelReadComplete0(ctx);
1481 }
1482 }
1483
1484
1485
1486
1487
1488 private void resumeOnEventExecutor() {
1489 assert ctx.executor().inEventLoop();
1490 clearState(STATE_PROCESS_TASK);
1491 try {
1492 HandshakeStatus status = engine.getHandshakeStatus();
1493 switch (status) {
1494
1495
1496 case NEED_TASK:
1497 executeDelegatedTask(this);
1498
1499 break;
1500
1501
1502 case FINISHED:
1503
1504 case NOT_HANDSHAKING:
1505 setHandshakeSuccess();
1506 try {
1507
1508
1509 wrap(ctx, inUnwrap);
1510 } catch (Throwable e) {
1511 taskError(e);
1512 return;
1513 }
1514 if (inUnwrap) {
1515
1516
1517 unwrapNonAppData(ctx);
1518 }
1519
1520
1521 forceFlush(ctx);
1522
1523 tryDecodeAgain();
1524 break;
1525
1526
1527
1528 case NEED_UNWRAP:
1529 try {
1530 unwrapNonAppData(ctx);
1531 } catch (SSLException e) {
1532 handleUnwrapThrowable(ctx, e);
1533 return;
1534 }
1535 tryDecodeAgain();
1536 break;
1537
1538
1539
1540 case NEED_WRAP:
1541 try {
1542 if (!wrapNonAppData(ctx, false) && inUnwrap) {
1543
1544
1545
1546
1547 unwrapNonAppData(ctx);
1548 }
1549
1550
1551 forceFlush(ctx);
1552 } catch (Throwable e) {
1553 taskError(e);
1554 return;
1555 }
1556
1557
1558 tryDecodeAgain();
1559 break;
1560
1561 default:
1562
1563 throw new AssertionError();
1564 }
1565 } catch (Throwable cause) {
1566 safeExceptionCaught(cause);
1567 }
1568 }
1569
1570 void runComplete() {
1571 EventExecutor executor = ctx.executor();
1572
1573
1574
1575
1576
1577
1578
1579 executor.execute(this::resumeOnEventExecutor);
1580 }
1581
1582 @Override
1583 public void run() {
1584 try {
1585 Runnable task = engine.getDelegatedTask();
1586 if (task == null) {
1587
1588 return;
1589 }
1590 if (task instanceof AsyncRunnable) {
1591 AsyncRunnable asyncTask = (AsyncRunnable) task;
1592 asyncTask.run(runCompleteTask);
1593 } else {
1594 task.run();
1595 runComplete();
1596 }
1597 } catch (Throwable cause) {
1598 handleException(cause);
1599 }
1600 }
1601
1602 private void handleException(final Throwable cause) {
1603 EventExecutor executor = ctx.executor();
1604 if (executor.inEventLoop()) {
1605 clearState(STATE_PROCESS_TASK);
1606 safeExceptionCaught(cause);
1607 } else {
1608 try {
1609 executor.execute(() -> {
1610 clearState(STATE_PROCESS_TASK);
1611 safeExceptionCaught(cause);
1612 });
1613 } catch (RejectedExecutionException ignore) {
1614 clearState(STATE_PROCESS_TASK);
1615
1616
1617 ctx.fireChannelExceptionCaught(cause);
1618 }
1619 }
1620 }
1621 }
1622
1623
1624
1625
1626
1627
1628 private boolean setHandshakeSuccess() {
1629
1630
1631
1632 final boolean notified = !handshakePromise.isDone() && handshakePromise.trySuccess(ctx.channel());
1633 if (notified) {
1634 if (logger.isDebugEnabled()) {
1635 SSLSession session = engine.getSession();
1636 logger.debug(
1637 "{} HANDSHAKEN: protocol:{} cipher suite:{}",
1638 ctx.channel(),
1639 session.getProtocol(),
1640 session.getCipherSuite());
1641 }
1642 ctx.fireChannelInboundEvent(
1643 new SslHandshakeCompletionEvent(engine().getSession(), applicationProtocol()));
1644 }
1645 if (isStateSet(STATE_READ_DURING_HANDSHAKE)) {
1646 clearState(STATE_READ_DURING_HANDSHAKE);
1647 if (!ctx.channel().getOption(ChannelOption.AUTO_READ)) {
1648 ctx.read();
1649 }
1650 }
1651 return notified;
1652 }
1653
1654
1655
1656
1657 private void setHandshakeFailure(ChannelHandlerContext ctx, Throwable cause) {
1658 setHandshakeFailure(ctx, cause, true, true, false);
1659 }
1660
1661
1662
1663
1664 private void setHandshakeFailure(ChannelHandlerContext ctx, Throwable cause, boolean closeInbound,
1665 boolean notify, boolean alwaysFlushAndClose) {
1666 try {
1667
1668 setState(STATE_OUTBOUND_CLOSED);
1669 engine.closeOutbound();
1670
1671 if (closeInbound) {
1672 try {
1673 engine.closeInbound();
1674 } catch (SSLException e) {
1675 if (logger.isDebugEnabled()) {
1676
1677
1678
1679
1680 String msg = e.getMessage();
1681 if (msg == null || !(msg.contains("possible truncation attack") ||
1682 msg.contains("closing inbound before receiving peer's close_notify"))) {
1683 logger.debug("{} SSLEngine.closeInbound() raised an exception.", ctx.channel(), e);
1684 }
1685 }
1686 }
1687 }
1688 if (handshakePromise.tryFailure(cause) || alwaysFlushAndClose) {
1689 SslUtils.handleHandshakeFailure(
1690 ctx, engine().getSession(), applicationProtocol(), cause, notify);
1691 }
1692 } finally {
1693
1694 releaseAndFailAll(ctx, cause);
1695 }
1696 }
1697
1698 private void setHandshakeFailureTransportFailure(ChannelHandlerContext ctx, Throwable cause) {
1699
1700
1701
1702 try {
1703 SSLException transportFailure = new SSLException("failure when writing TLS control frames", cause);
1704 releaseAndFailAll(ctx, transportFailure);
1705 if (handshakePromise.tryFailure(transportFailure)) {
1706 ctx.fireChannelInboundEvent(new SslHandshakeCompletionEvent(
1707 engine().getSession(), applicationProtocol(), transportFailure));
1708 }
1709 } finally {
1710 ctx.close();
1711 }
1712 }
1713
1714 private void releaseAndFailAll(ChannelHandlerContext ctx, Throwable cause) {
1715 if (pendingUnencryptedWrites != null) {
1716 pendingUnencryptedWrites.releaseAndFailAll(ctx, cause);
1717 }
1718 }
1719
1720 private void notifyClosePromise(Throwable cause) {
1721 if (cause == null) {
1722 if (sslClosePromise.trySuccess(ctx.channel())) {
1723 ctx.fireChannelInboundEvent(new SslCloseCompletionEvent(engine().getSession()));
1724 }
1725 } else {
1726 if (sslClosePromise.tryFailure(cause)) {
1727 ctx.fireChannelInboundEvent(new SslCloseCompletionEvent(engine().getSession(), cause));
1728 }
1729 }
1730 }
1731
1732 private Future<Void> closeOutboundAndChannel(
1733 final ChannelHandlerContext ctx, boolean disconnect) {
1734 setState(STATE_OUTBOUND_CLOSED);
1735 engine.closeOutbound();
1736
1737 if (!ctx.channel().isActive()) {
1738 if (disconnect) {
1739 return ctx.disconnect();
1740 }
1741 return ctx.close();
1742 }
1743 Promise<Void> promise = ctx.newPromise();
1744 Promise<Void> closeNotifyPromise = ctx.newPromise();
1745 try {
1746 flush(ctx, closeNotifyPromise);
1747 } finally {
1748 if (!isStateSet(STATE_CLOSE_NOTIFY)) {
1749 setState(STATE_CLOSE_NOTIFY);
1750
1751
1752
1753
1754
1755
1756
1757
1758 Promise<Void> cascade = ctx.newPromise();
1759 cascade.asFuture().cascadeTo(promise);
1760 safeClose(ctx, closeNotifyPromise.asFuture(), cascade);
1761 } else {
1762
1763 sslCloseFuture().addListener(future -> promise.setSuccess(null));
1764 }
1765 }
1766 return promise.asFuture();
1767 }
1768
1769 private void flush(ChannelHandlerContext ctx, Promise<Void> promise) {
1770 if (pendingUnencryptedWrites != null) {
1771 pendingUnencryptedWrites.add(ctx.bufferAllocator().allocate(0), promise);
1772 } else {
1773 promise.setFailure(newPendingWritesNullException());
1774 }
1775 flush(ctx);
1776 }
1777
1778 @Override
1779 public void handlerAdded0(final ChannelHandlerContext ctx) throws Exception {
1780 this.ctx = ctx;
1781
1782 Channel channel = ctx.channel();
1783 pendingUnencryptedWrites = new SslHandlerCoalescingBufferQueue(16);
1784
1785 setOpensslEngineSocketFd(channel);
1786 boolean fastOpen = channel.isOptionSupported(ChannelOption.TCP_FASTOPEN_CONNECT) &&
1787 Boolean.TRUE.equals(channel.getOption(ChannelOption.TCP_FASTOPEN_CONNECT));
1788 boolean active = channel.isActive();
1789 if (active || fastOpen) {
1790
1791
1792
1793 startHandshakeProcessing(active);
1794
1795
1796 if (fastOpen) {
1797 setState(STATE_NEEDS_FLUSH);
1798 }
1799 }
1800 }
1801
1802 private void startHandshakeProcessing(boolean flushAtEnd) {
1803 if (!isStateSet(STATE_HANDSHAKE_STARTED)) {
1804 setState(STATE_HANDSHAKE_STARTED);
1805 if (engine.getUseClientMode()) {
1806
1807
1808
1809 handshake(flushAtEnd);
1810 }
1811 applyHandshakeTimeout();
1812 } else if (isStateSet(STATE_NEEDS_FLUSH)) {
1813 forceFlush(ctx);
1814 }
1815 }
1816
1817
1818
1819
1820 public Future<Channel> renegotiate() {
1821 ChannelHandlerContext ctx = this.ctx;
1822 if (ctx == null) {
1823 throw new IllegalStateException();
1824 }
1825
1826 return renegotiate(ctx.executor().newPromise());
1827 }
1828
1829
1830
1831
1832 public Future<Channel> renegotiate(final Promise<Channel> promise) {
1833 requireNonNull(promise, "promise");
1834
1835 ChannelHandlerContext ctx = this.ctx;
1836 if (ctx == null) {
1837 throw new IllegalStateException();
1838 }
1839
1840 EventExecutor executor = ctx.executor();
1841 if (!executor.inEventLoop()) {
1842 executor.execute(() -> renegotiateOnEventLoop(promise));
1843 return promise.asFuture();
1844 }
1845
1846 renegotiateOnEventLoop(promise);
1847 return promise.asFuture();
1848 }
1849
1850 private void renegotiateOnEventLoop(final Promise<Channel> newHandshakePromise) {
1851 Future<Channel> oldHandshakePromise = handshakeFuture();
1852 if (!oldHandshakePromise.isDone()) {
1853
1854
1855 oldHandshakePromise.cascadeTo(newHandshakePromise);
1856 } else {
1857 handshakePromise = newHandshakePromise;
1858 handshake(true);
1859 applyHandshakeTimeout();
1860 }
1861 }
1862
1863
1864
1865
1866
1867
1868
1869 private void handshake(boolean flushAtEnd) {
1870 if (engine.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING) {
1871
1872
1873 return;
1874 }
1875 if (handshakePromise.isDone()) {
1876
1877
1878
1879
1880
1881 return;
1882 }
1883
1884
1885 final ChannelHandlerContext ctx = this.ctx;
1886 try {
1887 engine.beginHandshake();
1888 wrapNonAppData(ctx, false);
1889 } catch (Throwable e) {
1890 setHandshakeFailure(ctx, e);
1891 } finally {
1892 if (flushAtEnd) {
1893 forceFlush(ctx);
1894 }
1895 }
1896 }
1897
1898 private void applyHandshakeTimeout() {
1899 final Promise<Channel> localHandshakePromise = handshakePromise;
1900
1901
1902 final long handshakeTimeoutMillis = this.handshakeTimeoutMillis;
1903 if (handshakeTimeoutMillis <= 0 || localHandshakePromise.isDone()) {
1904 return;
1905 }
1906
1907 Future<?> timeoutFuture = ctx.executor().schedule(() -> {
1908 if (localHandshakePromise.isDone()) {
1909 return;
1910 }
1911
1912 SSLException exception =
1913 new SslHandshakeTimeoutException("handshake timed out after " + handshakeTimeoutMillis + "ms");
1914 try {
1915 if (localHandshakePromise.tryFailure(exception)) {
1916 SslUtils.handleHandshakeFailure(
1917 ctx, engine().getSession(), applicationProtocol(), exception, true);
1918 }
1919 } finally {
1920 releaseAndFailAll(ctx, exception);
1921 }
1922 }, handshakeTimeoutMillis, TimeUnit.MILLISECONDS);
1923
1924
1925 localHandshakePromise.asFuture().addListener(f -> timeoutFuture.cancel());
1926 }
1927
1928 private void forceFlush(ChannelHandlerContext ctx) {
1929 clearState(STATE_NEEDS_FLUSH);
1930 ctx.flush();
1931 }
1932
1933 private void setOpensslEngineSocketFd(Channel c) {
1934 if (c instanceof UnixChannel && engine instanceof ReferenceCountedOpenSslEngine) {
1935 ((ReferenceCountedOpenSslEngine) engine).bioSetFd(((UnixChannel) c).fd().intValue());
1936 }
1937 }
1938
1939
1940
1941
1942 @Override
1943 public void channelActive(final ChannelHandlerContext ctx) throws Exception {
1944 setOpensslEngineSocketFd(ctx.channel());
1945 if (!startTls) {
1946 startHandshakeProcessing(true);
1947 }
1948 ctx.fireChannelActive();
1949 }
1950
1951 private void safeClose(
1952 final ChannelHandlerContext ctx, final Future<Void> flushFuture,
1953 final Promise<Void> promise) {
1954 if (!ctx.channel().isActive()) {
1955 ctx.close().cascadeTo(promise);
1956 return;
1957 }
1958
1959 Future<?> timeoutFuture;
1960 if (!flushFuture.isDone()) {
1961 long closeNotifyTimeout = closeNotifyFlushTimeoutMillis;
1962 if (closeNotifyTimeout > 0) {
1963
1964 timeoutFuture = ctx.executor().schedule(() -> {
1965
1966 if (!flushFuture.isDone()) {
1967 logger.warn("{} Last write attempt timed out; force-closing the connection.",
1968 ctx.channel());
1969 addCloseListener(ctx.close(), promise);
1970 }
1971 }, closeNotifyTimeout, TimeUnit.MILLISECONDS);
1972 } else {
1973 timeoutFuture = null;
1974 }
1975 } else {
1976 timeoutFuture = null;
1977 }
1978
1979
1980 flushFuture.addListener(f -> {
1981 if (timeoutFuture != null) {
1982 timeoutFuture.cancel();
1983 }
1984 final long closeNotifyReadTimeout = closeNotifyReadTimeoutMillis;
1985 if (closeNotifyReadTimeout <= 0) {
1986 if (ctx.channel().isActive()) {
1987
1988
1989 addCloseListener(ctx.close(), promise);
1990 } else {
1991 promise.trySuccess(null);
1992 }
1993 } else {
1994 Future<?> closeNotifyReadTimeoutFuture;
1995
1996 Future<Channel> closeFuture = sslCloseFuture();
1997
1998 if (!closeFuture.isDone()) {
1999 closeNotifyReadTimeoutFuture = ctx.executor().schedule(() -> {
2000 if (!closeFuture.isDone()) {
2001 logger.debug(
2002 "{} did not receive close_notify in {}ms; force-closing the connection.",
2003 ctx.channel(), closeNotifyReadTimeout);
2004
2005
2006 addCloseListener(ctx.close(), promise);
2007 }
2008 }, closeNotifyReadTimeout, TimeUnit.MILLISECONDS);
2009 } else {
2010 closeNotifyReadTimeoutFuture = null;
2011 }
2012
2013
2014 closeFuture.addListener(future -> {
2015 if (closeNotifyReadTimeoutFuture != null) {
2016 closeNotifyReadTimeoutFuture.cancel();
2017 }
2018 if (ctx.channel().isActive()) {
2019 addCloseListener(ctx.close(), promise);
2020 } else {
2021 promise.trySuccess(null);
2022 }
2023 });
2024 }
2025 });
2026 }
2027
2028 private static void addCloseListener(Future<Void> future, Promise<Void> promise) {
2029
2030
2031
2032
2033
2034
2035 future.cascadeTo(promise);
2036 }
2037
2038
2039
2040
2041 private Buffer allocate(ChannelHandlerContext ctx, int capacity) {
2042 BufferAllocator alloc = ctx.bufferAllocator();
2043
2044 boolean hasDirect = alloc.getAllocationType().isDirect();
2045 boolean wantsDirect = engineType.wantsDirectBuffer;
2046 if (hasDirect && !wantsDirect) {
2047 alloc = DefaultBufferAllocators.onHeapAllocator();
2048 } else if (!hasDirect && wantsDirect) {
2049 alloc = DefaultBufferAllocators.offHeapAllocator();
2050 }
2051
2052 return alloc.allocate(capacity);
2053 }
2054
2055
2056
2057
2058
2059 private Buffer allocateOutNetBuf(ChannelHandlerContext ctx, int pendingBytes, int numComponents) {
2060 return engineType.allocateWrapBuffer(this, ctx.bufferAllocator(), pendingBytes, numComponents);
2061 }
2062
2063 private boolean isStateSet(int bit) {
2064 return (state & bit) == bit;
2065 }
2066
2067 private void setState(int bit) {
2068 state |= bit;
2069 }
2070
2071 private void clearState(int bit) {
2072 state &= ~bit;
2073 }
2074
2075 @Override
2076 public long pendingOutboundBytes(ChannelHandlerContext ctx) {
2077 return pendingUnencryptedWrites == null ? 0 : pendingUnencryptedWrites.readableBytes();
2078 }
2079
2080
2081
2082
2083
2084
2085 private final class SslHandlerCoalescingBufferQueue extends AbstractCoalescingBufferQueue {
2086
2087 SslHandlerCoalescingBufferQueue(int initSize) {
2088 super(initSize);
2089 }
2090
2091 @Override
2092 protected Buffer compose(BufferAllocator alloc, Buffer cumulation, Buffer next) {
2093 if (cumulation.writableBytes() >= next.readableBytes()) {
2094 cumulation.writeBytes(next);
2095 next.close();
2096 return cumulation;
2097 }
2098 if (cumulation instanceof CompositeBuffer) {
2099 CompositeBuffer composite = (CompositeBuffer) cumulation;
2100 if (next.readableBytes() < wrapDataSize / 2) {
2101 composite.ensureWritable(wrapDataSize);
2102 composite.writeBytes(next);
2103 next.close();
2104 } else {
2105 composite.extendWith(next.send());
2106 }
2107 return composite;
2108 }
2109 int minIncrement = 2 * wrapDataSize;
2110 cumulation = copyAndCompose(alloc, cumulation, next, minIncrement);
2111 return cumulation;
2112 }
2113
2114 @Override
2115 protected Buffer composeFirst(BufferAllocator allocator, Buffer first) {
2116 if (first instanceof CompositeBuffer) {
2117 CompositeBuffer composite = (CompositeBuffer) first;
2118 if (engineType.wantsDirectBuffer && !allocator.getAllocationType().isDirect()) {
2119 first = DefaultBufferAllocators.offHeapAllocator().allocate(composite.readableBytes());
2120 } else {
2121 first = allocator.allocate(composite.readableBytes());
2122 }
2123 try {
2124 first.writeBytes(composite);
2125 } catch (Throwable cause) {
2126 first.close();
2127 throw cause;
2128 }
2129 composite.close();
2130 }
2131 return first;
2132 }
2133
2134 @Override
2135 protected Buffer removeEmptyValue() {
2136 return null;
2137 }
2138 }
2139
2140 private final class LazyPromise extends DefaultPromise<Channel> {
2141
2142 LazyPromise() {
2143 super(ImmediateEventExecutor.INSTANCE);
2144 }
2145
2146 @Override
2147 protected void checkDeadLock() {
2148 if (ctx == null) {
2149
2150
2151
2152
2153
2154 return;
2155 }
2156 checkDeadLock(ctx.executor());
2157 }
2158 }
2159 }