1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.ssl;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.buffer.ByteBufUtil;
21 import io.netty.buffer.CompositeByteBuf;
22 import io.netty.buffer.Unpooled;
23 import io.netty.channel.Channel;
24 import io.netty.channel.ChannelConfig;
25 import io.netty.channel.ChannelException;
26 import io.netty.channel.ChannelFuture;
27 import io.netty.channel.ChannelFutureListener;
28 import io.netty.channel.ChannelHandlerContext;
29 import io.netty.channel.ChannelInboundHandler;
30 import io.netty.channel.ChannelOption;
31 import io.netty.channel.ChannelOutboundBuffer;
32 import io.netty.channel.ChannelOutboundHandler;
33 import io.netty.channel.ChannelPipeline;
34 import io.netty.channel.ChannelPromise;
35 import io.netty.channel.unix.UnixChannel;
36 import io.netty.handler.codec.ByteToMessageDecoder;
37 import io.netty.handler.codec.DecoderException;
38 import io.netty.handler.codec.UnsupportedMessageTypeException;
39 import io.netty.util.ReferenceCountUtil;
40 import io.netty.util.concurrent.DefaultPromise;
41 import io.netty.util.concurrent.EventExecutor;
42 import io.netty.util.concurrent.Future;
43 import io.netty.util.concurrent.FutureListener;
44 import io.netty.util.concurrent.ImmediateExecutor;
45 import io.netty.util.concurrent.Promise;
46 import io.netty.util.concurrent.PromiseNotifier;
47 import io.netty.util.internal.ObjectUtil;
48 import io.netty.util.internal.PlatformDependent;
49 import io.netty.util.internal.ThrowableUtil;
50 import io.netty.util.internal.UnstableApi;
51 import io.netty.util.internal.logging.InternalLogger;
52 import io.netty.util.internal.logging.InternalLoggerFactory;
53
54 import java.io.IOException;
55 import java.net.SocketAddress;
56 import java.nio.ByteBuffer;
57 import java.nio.channels.ClosedChannelException;
58 import java.nio.channels.DatagramChannel;
59 import java.nio.channels.SocketChannel;
60 import java.security.cert.CertificateException;
61 import java.util.List;
62 import java.util.concurrent.Executor;
63 import java.util.concurrent.RejectedExecutionException;
64 import java.util.concurrent.TimeUnit;
65 import java.util.regex.Pattern;
66 import javax.net.ssl.SSLEngine;
67 import javax.net.ssl.SSLEngineResult;
68 import javax.net.ssl.SSLEngineResult.HandshakeStatus;
69 import javax.net.ssl.SSLEngineResult.Status;
70 import javax.net.ssl.SSLException;
71 import javax.net.ssl.SSLHandshakeException;
72 import javax.net.ssl.SSLSession;
73
74 import static io.netty.handler.ssl.SslUtils.NOT_ENOUGH_DATA;
75 import static io.netty.handler.ssl.SslUtils.getEncryptedPacketLength;
76 import static io.netty.util.internal.ObjectUtil.checkNotNull;
77 import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169 public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundHandler {
170 private static final InternalLogger logger =
171 InternalLoggerFactory.getInstance(SslHandler.class);
172 private static final Pattern IGNORABLE_CLASS_IN_STACK = Pattern.compile(
173 "^.*(?:Socket|Datagram|Sctp|Udt)Channel.*$");
174 private static final Pattern IGNORABLE_ERROR_MESSAGE = Pattern.compile(
175 "^.*(?:connection.*(?:reset|closed|abort|broken)|broken.*pipe).*$", Pattern.CASE_INSENSITIVE);
176 private static final int STATE_SENT_FIRST_MESSAGE = 1;
177 private static final int STATE_FLUSHED_BEFORE_HANDSHAKE = 1 << 1;
178 private static final int STATE_READ_DURING_HANDSHAKE = 1 << 2;
179 private static final int STATE_HANDSHAKE_STARTED = 1 << 3;
180
181
182
183
184 private static final int STATE_NEEDS_FLUSH = 1 << 4;
185 private static final int STATE_OUTBOUND_CLOSED = 1 << 5;
186 private static final int STATE_CLOSE_NOTIFY = 1 << 6;
187 private static final int STATE_PROCESS_TASK = 1 << 7;
188
189
190
191
192 private static final int STATE_FIRE_CHANNEL_READ = 1 << 8;
193 private static final int STATE_UNWRAP_REENTRY = 1 << 9;
194
195
196
197
198
199 private static final int MAX_PLAINTEXT_LENGTH = 16 * 1024;
200
201 private enum SslEngineType {
202 TCNATIVE(true, COMPOSITE_CUMULATOR) {
203 @Override
204 SSLEngineResult unwrap(SslHandler handler, ByteBuf in, int len, ByteBuf out) throws SSLException {
205 int nioBufferCount = in.nioBufferCount();
206 int writerIndex = out.writerIndex();
207 final SSLEngineResult result;
208 if (nioBufferCount > 1) {
209
210
211
212
213
214 ReferenceCountedOpenSslEngine opensslEngine = (ReferenceCountedOpenSslEngine) handler.engine;
215 try {
216 handler.singleBuffer[0] = toByteBuffer(out, writerIndex, out.writableBytes());
217 result = opensslEngine.unwrap(in.nioBuffers(in.readerIndex(), len), handler.singleBuffer);
218 } finally {
219 handler.singleBuffer[0] = null;
220 }
221 } else {
222 result = handler.engine.unwrap(toByteBuffer(in, in.readerIndex(), len),
223 toByteBuffer(out, writerIndex, out.writableBytes()));
224 }
225 out.writerIndex(writerIndex + result.bytesProduced());
226 return result;
227 }
228
229 @Override
230 ByteBuf allocateWrapBuffer(SslHandler handler, ByteBufAllocator allocator,
231 int pendingBytes, int numComponents) {
232 return allocator.directBuffer(((ReferenceCountedOpenSslEngine) handler.engine)
233 .calculateOutNetBufSize(pendingBytes, numComponents));
234 }
235
236 @Override
237 int calculateRequiredOutBufSpace(SslHandler handler, int pendingBytes, int numComponents) {
238 return ((ReferenceCountedOpenSslEngine) handler.engine)
239 .calculateMaxLengthForWrap(pendingBytes, numComponents);
240 }
241
242 @Override
243 int calculatePendingData(SslHandler handler, int guess) {
244 int sslPending = ((ReferenceCountedOpenSslEngine) handler.engine).sslPending();
245 return sslPending > 0 ? sslPending : guess;
246 }
247
248 @Override
249 boolean jdkCompatibilityMode(SSLEngine engine) {
250 return ((ReferenceCountedOpenSslEngine) engine).jdkCompatibilityMode;
251 }
252 },
253 CONSCRYPT(true, COMPOSITE_CUMULATOR) {
254 @Override
255 SSLEngineResult unwrap(SslHandler handler, ByteBuf in, int len, ByteBuf out) throws SSLException {
256 int nioBufferCount = in.nioBufferCount();
257 int writerIndex = out.writerIndex();
258 final SSLEngineResult result;
259 if (nioBufferCount > 1) {
260
261
262
263 try {
264 handler.singleBuffer[0] = toByteBuffer(out, writerIndex, out.writableBytes());
265 result = ((ConscryptAlpnSslEngine) handler.engine).unwrap(
266 in.nioBuffers(in.readerIndex(), len),
267 handler.singleBuffer);
268 } finally {
269 handler.singleBuffer[0] = null;
270 }
271 } else {
272 result = handler.engine.unwrap(toByteBuffer(in, in.readerIndex(), len),
273 toByteBuffer(out, writerIndex, out.writableBytes()));
274 }
275 out.writerIndex(writerIndex + result.bytesProduced());
276 return result;
277 }
278
279 @Override
280 ByteBuf allocateWrapBuffer(SslHandler handler, ByteBufAllocator allocator,
281 int pendingBytes, int numComponents) {
282 return allocator.directBuffer(
283 ((ConscryptAlpnSslEngine) handler.engine).calculateOutNetBufSize(pendingBytes, numComponents));
284 }
285
286 @Override
287 int calculateRequiredOutBufSpace(SslHandler handler, int pendingBytes, int numComponents) {
288 return ((ConscryptAlpnSslEngine) handler.engine)
289 .calculateRequiredOutBufSpace(pendingBytes, numComponents);
290 }
291
292 @Override
293 int calculatePendingData(SslHandler handler, int guess) {
294 return guess;
295 }
296
297 @Override
298 boolean jdkCompatibilityMode(SSLEngine engine) {
299 return true;
300 }
301 },
302 JDK(false, MERGE_CUMULATOR) {
303 @Override
304 SSLEngineResult unwrap(SslHandler handler, ByteBuf in, int len, ByteBuf out) throws SSLException {
305 int writerIndex = out.writerIndex();
306 ByteBuffer inNioBuffer = toByteBuffer(in, in.readerIndex(), len);
307 int position = inNioBuffer.position();
308 final SSLEngineResult result = handler.engine.unwrap(inNioBuffer,
309 toByteBuffer(out, writerIndex, out.writableBytes()));
310 out.writerIndex(writerIndex + result.bytesProduced());
311
312
313
314
315
316
317
318 if (result.bytesConsumed() == 0) {
319 int consumed = inNioBuffer.position() - position;
320 if (consumed != result.bytesConsumed()) {
321
322 return new SSLEngineResult(
323 result.getStatus(), result.getHandshakeStatus(), consumed, result.bytesProduced());
324 }
325 }
326 return result;
327 }
328
329 @Override
330 ByteBuf allocateWrapBuffer(SslHandler handler, ByteBufAllocator allocator,
331 int pendingBytes, int numComponents) {
332
333
334 return allocator.heapBuffer(Math.max(pendingBytes, handler.engine.getSession().getPacketBufferSize()));
335 }
336
337 @Override
338 int calculateRequiredOutBufSpace(SslHandler handler, int pendingBytes, int numComponents) {
339
340
341
342
343
344
345 return handler.engine.getSession().getPacketBufferSize();
346 }
347
348 @Override
349 int calculatePendingData(SslHandler handler, int guess) {
350 return guess;
351 }
352
353 @Override
354 boolean jdkCompatibilityMode(SSLEngine engine) {
355 return true;
356 }
357 };
358
359 static SslEngineType forEngine(SSLEngine engine) {
360 return engine instanceof ReferenceCountedOpenSslEngine ? TCNATIVE :
361 engine instanceof ConscryptAlpnSslEngine ? CONSCRYPT : JDK;
362 }
363
364 SslEngineType(boolean wantsDirectBuffer, Cumulator cumulator) {
365 this.wantsDirectBuffer = wantsDirectBuffer;
366 this.cumulator = cumulator;
367 }
368
369 abstract SSLEngineResult unwrap(SslHandler handler, ByteBuf in, int len, ByteBuf out) throws SSLException;
370
371 abstract int calculatePendingData(SslHandler handler, int guess);
372
373 abstract boolean jdkCompatibilityMode(SSLEngine engine);
374
375 abstract ByteBuf allocateWrapBuffer(SslHandler handler, ByteBufAllocator allocator,
376 int pendingBytes, int numComponents);
377
378 abstract int calculateRequiredOutBufSpace(SslHandler handler, int pendingBytes, int numComponents);
379
380
381
382
383
384
385
386 final boolean wantsDirectBuffer;
387
388
389
390
391
392
393
394
395
396
397
398 final Cumulator cumulator;
399 }
400
401 private volatile ChannelHandlerContext ctx;
402 private final SSLEngine engine;
403 private final SslEngineType engineType;
404 private final Executor delegatedTaskExecutor;
405 private final boolean jdkCompatibilityMode;
406
407
408
409
410
411
412 private final ByteBuffer[] singleBuffer = new ByteBuffer[1];
413
414 private final boolean startTls;
415 private final ResumptionController resumptionController;
416
417 private final SslTasksRunner sslTaskRunnerForUnwrap = new SslTasksRunner(true);
418 private final SslTasksRunner sslTaskRunner = new SslTasksRunner(false);
419
420 private SslHandlerCoalescingBufferQueue pendingUnencryptedWrites;
421 private Promise<Channel> handshakePromise = new LazyChannelPromise();
422 private final LazyChannelPromise sslClosePromise = new LazyChannelPromise();
423
424 private int packetLength;
425 private short state;
426
427 private volatile long handshakeTimeoutMillis = 10000;
428 private volatile long closeNotifyFlushTimeoutMillis = 3000;
429 private volatile long closeNotifyReadTimeoutMillis;
430 volatile int wrapDataSize = MAX_PLAINTEXT_LENGTH;
431
432
433
434
435
436
/**
 * Creates a new instance with {@code startTls} disabled.
 *
 * @param engine the {@link SSLEngine} this handler will use
 */
public SslHandler(SSLEngine engine) {
    this(engine, /* startTls */ false);
}
440
441
442
443
444
445
446
447
/**
 * Creates a new instance that hands {@link ImmediateExecutor#INSTANCE} to the main constructor.
 *
 * @param engine   the {@link SSLEngine} this handler will use
 * @param startTls {@code true} if the first flushed write must not be encrypted
 */
public SslHandler(SSLEngine engine, boolean startTls) {
    this(engine, startTls, (Executor) ImmediateExecutor.INSTANCE);
}
451
452
453
454
455
456
457
458
/**
 * Creates a new instance with {@code startTls} disabled and a caller-supplied executor.
 *
 * @param engine                the {@link SSLEngine} this handler will use
 * @param delegatedTaskExecutor the {@link Executor} stored by the handler for delegated work
 */
public SslHandler(SSLEngine engine, Executor delegatedTaskExecutor) {
    this(engine, /* startTls */ false, delegatedTaskExecutor);
}
462
463
464
465
466
467
468
469
470
471
/**
 * Creates a new instance without a resumption controller.
 *
 * @param engine                the {@link SSLEngine} this handler will use
 * @param startTls              {@code true} if the first flushed write must not be encrypted
 * @param delegatedTaskExecutor the {@link Executor} stored by the handler for delegated work
 */
public SslHandler(SSLEngine engine, boolean startTls, Executor delegatedTaskExecutor) {
    this(engine, startTls, delegatedTaskExecutor, /* resumptionController */ null);
}
475
476 SslHandler(SSLEngine engine, boolean startTls, Executor delegatedTaskExecutor,
477 ResumptionController resumptionController) {
478 this.engine = ObjectUtil.checkNotNull(engine, "engine");
479 this.delegatedTaskExecutor = ObjectUtil.checkNotNull(delegatedTaskExecutor, "delegatedTaskExecutor");
480 engineType = SslEngineType.forEngine(engine);
481 this.startTls = startTls;
482 this.jdkCompatibilityMode = engineType.jdkCompatibilityMode(engine);
483 setCumulator(engineType.cumulator);
484 this.resumptionController = resumptionController;
485 }
486
487 public long getHandshakeTimeoutMillis() {
488 return handshakeTimeoutMillis;
489 }
490
491 public void setHandshakeTimeout(long handshakeTimeout, TimeUnit unit) {
492 checkNotNull(unit, "unit");
493 setHandshakeTimeoutMillis(unit.toMillis(handshakeTimeout));
494 }
495
496 public void setHandshakeTimeoutMillis(long handshakeTimeoutMillis) {
497 this.handshakeTimeoutMillis = checkPositiveOrZero(handshakeTimeoutMillis, "handshakeTimeoutMillis");
498 }
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520 @UnstableApi
521 public final void setWrapDataSize(int wrapDataSize) {
522 this.wrapDataSize = wrapDataSize;
523 }
524
525
526
527
528 @Deprecated
529 public long getCloseNotifyTimeoutMillis() {
530 return getCloseNotifyFlushTimeoutMillis();
531 }
532
533
534
535
536 @Deprecated
537 public void setCloseNotifyTimeout(long closeNotifyTimeout, TimeUnit unit) {
538 setCloseNotifyFlushTimeout(closeNotifyTimeout, unit);
539 }
540
541
542
543
544 @Deprecated
545 public void setCloseNotifyTimeoutMillis(long closeNotifyFlushTimeoutMillis) {
546 setCloseNotifyFlushTimeoutMillis(closeNotifyFlushTimeoutMillis);
547 }
548
549
550
551
552
553
554 public final long getCloseNotifyFlushTimeoutMillis() {
555 return closeNotifyFlushTimeoutMillis;
556 }
557
558
559
560
561
562
563 public final void setCloseNotifyFlushTimeout(long closeNotifyFlushTimeout, TimeUnit unit) {
564 setCloseNotifyFlushTimeoutMillis(unit.toMillis(closeNotifyFlushTimeout));
565 }
566
567
568
569
570 public final void setCloseNotifyFlushTimeoutMillis(long closeNotifyFlushTimeoutMillis) {
571 this.closeNotifyFlushTimeoutMillis = checkPositiveOrZero(closeNotifyFlushTimeoutMillis,
572 "closeNotifyFlushTimeoutMillis");
573 }
574
575
576
577
578
579
580 public final long getCloseNotifyReadTimeoutMillis() {
581 return closeNotifyReadTimeoutMillis;
582 }
583
584
585
586
587
588
589 public final void setCloseNotifyReadTimeout(long closeNotifyReadTimeout, TimeUnit unit) {
590 setCloseNotifyReadTimeoutMillis(unit.toMillis(closeNotifyReadTimeout));
591 }
592
593
594
595
596 public final void setCloseNotifyReadTimeoutMillis(long closeNotifyReadTimeoutMillis) {
597 this.closeNotifyReadTimeoutMillis = checkPositiveOrZero(closeNotifyReadTimeoutMillis,
598 "closeNotifyReadTimeoutMillis");
599 }
600
601
602
603
604 public SSLEngine engine() {
605 return engine;
606 }
607
608
609
610
611
612
613 public String applicationProtocol() {
614 SSLEngine engine = engine();
615 if (!(engine instanceof ApplicationProtocolAccessor)) {
616 return null;
617 }
618
619 return ((ApplicationProtocolAccessor) engine).getNegotiatedApplicationProtocol();
620 }
621
622
623
624
625
626
627
628 public Future<Channel> handshakeFuture() {
629 return handshakePromise;
630 }
631
632
633
634
635 @Deprecated
636 public ChannelFuture close() {
637 return closeOutbound();
638 }
639
640
641
642
643 @Deprecated
644 public ChannelFuture close(ChannelPromise promise) {
645 return closeOutbound(promise);
646 }
647
648
649
650
651
652
653
654 public ChannelFuture closeOutbound() {
655 return closeOutbound(ctx.newPromise());
656 }
657
658
659
660
661
662
663
664 public ChannelFuture closeOutbound(final ChannelPromise promise) {
665 final ChannelHandlerContext ctx = this.ctx;
666 if (ctx.executor().inEventLoop()) {
667 closeOutbound0(promise);
668 } else {
669 ctx.executor().execute(new Runnable() {
670 @Override
671 public void run() {
672 closeOutbound0(promise);
673 }
674 });
675 }
676 return promise;
677 }
678
679 private void closeOutbound0(ChannelPromise promise) {
680 setState(STATE_OUTBOUND_CLOSED);
681 engine.closeOutbound();
682 try {
683 flush(ctx, promise);
684 } catch (Exception e) {
685 if (!promise.tryFailure(e)) {
686 logger.warn("{} flush() raised a masked exception.", ctx.channel(), e);
687 }
688 }
689 }
690
691
692
693
694
695
696
697
698 public Future<Channel> sslCloseFuture() {
699 return sslClosePromise;
700 }
701
702 @Override
703 public void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
704 try {
705 if (pendingUnencryptedWrites != null && !pendingUnencryptedWrites.isEmpty()) {
706
707 pendingUnencryptedWrites.releaseAndFailAll(ctx,
708 new ChannelException("Pending write on removal of SslHandler"));
709 }
710 pendingUnencryptedWrites = null;
711
712 SSLException cause = null;
713
714
715
716
717 if (!handshakePromise.isDone()) {
718 cause = new SSLHandshakeException("SslHandler removed before handshake completed");
719 if (handshakePromise.tryFailure(cause)) {
720 ctx.fireUserEventTriggered(new SslHandshakeCompletionEvent(cause));
721 }
722 }
723 if (!sslClosePromise.isDone()) {
724 if (cause == null) {
725 cause = new SSLException("SslHandler removed before SSLEngine was closed");
726 }
727 notifyClosePromise(cause);
728 }
729 } finally {
730 ReferenceCountUtil.release(engine);
731 }
732 }
733
734 @Override
735 public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
736 ctx.bind(localAddress, promise);
737 }
738
739 @Override
740 public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
741 ChannelPromise promise) throws Exception {
742 ctx.connect(remoteAddress, localAddress, promise);
743 }
744
745 @Override
746 public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
747 ctx.deregister(promise);
748 }
749
750 @Override
751 public void disconnect(final ChannelHandlerContext ctx,
752 final ChannelPromise promise) throws Exception {
753 closeOutboundAndChannel(ctx, promise, true);
754 }
755
756 @Override
757 public void close(final ChannelHandlerContext ctx,
758 final ChannelPromise promise) throws Exception {
759 closeOutboundAndChannel(ctx, promise, false);
760 }
761
762 @Override
763 public void read(ChannelHandlerContext ctx) throws Exception {
764 if (!handshakePromise.isDone()) {
765 setState(STATE_READ_DURING_HANDSHAKE);
766 }
767
768 ctx.read();
769 }
770
771 private static IllegalStateException newPendingWritesNullException() {
772 return new IllegalStateException("pendingUnencryptedWrites is null, handlerRemoved0 called?");
773 }
774
775 @Override
776 public void write(final ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
777 if (!(msg instanceof ByteBuf)) {
778 UnsupportedMessageTypeException exception = new UnsupportedMessageTypeException(msg, ByteBuf.class);
779 ReferenceCountUtil.safeRelease(msg);
780 promise.setFailure(exception);
781 } else if (pendingUnencryptedWrites == null) {
782 ReferenceCountUtil.safeRelease(msg);
783 promise.setFailure(newPendingWritesNullException());
784 } else {
785 pendingUnencryptedWrites.add((ByteBuf) msg, promise);
786 }
787 }
788
789 @Override
790 public void flush(ChannelHandlerContext ctx) throws Exception {
791
792
793 if (startTls && !isStateSet(STATE_SENT_FIRST_MESSAGE)) {
794 setState(STATE_SENT_FIRST_MESSAGE);
795 pendingUnencryptedWrites.writeAndRemoveAll(ctx);
796 forceFlush(ctx);
797
798
799 startHandshakeProcessing(true);
800 return;
801 }
802
803 if (isStateSet(STATE_PROCESS_TASK)) {
804 return;
805 }
806
807 try {
808 wrapAndFlush(ctx);
809 } catch (Throwable cause) {
810 setHandshakeFailure(ctx, cause);
811 PlatformDependent.throwException(cause);
812 }
813 }
814
815 private void wrapAndFlush(ChannelHandlerContext ctx) throws SSLException {
816 if (pendingUnencryptedWrites.isEmpty()) {
817
818
819
820
821 pendingUnencryptedWrites.add(Unpooled.EMPTY_BUFFER, ctx.newPromise());
822 }
823 if (!handshakePromise.isDone()) {
824 setState(STATE_FLUSHED_BEFORE_HANDSHAKE);
825 }
826 try {
827 wrap(ctx, false);
828 } finally {
829
830
831 forceFlush(ctx);
832 }
833 }
834
835
836 private void wrap(ChannelHandlerContext ctx, boolean inUnwrap) throws SSLException {
837 ByteBuf out = null;
838 ByteBufAllocator alloc = ctx.alloc();
839 try {
840 final int wrapDataSize = this.wrapDataSize;
841
842
843 outer: while (!ctx.isRemoved()) {
844 ChannelPromise promise = ctx.newPromise();
845 ByteBuf buf = wrapDataSize > 0 ?
846 pendingUnencryptedWrites.remove(alloc, wrapDataSize, promise) :
847 pendingUnencryptedWrites.removeFirst(promise);
848 if (buf == null) {
849 break;
850 }
851
852 SSLEngineResult result;
853
854 try {
855 if (buf.readableBytes() > MAX_PLAINTEXT_LENGTH) {
856
857
858
859
860
861 int readableBytes = buf.readableBytes();
862 int numPackets = readableBytes / MAX_PLAINTEXT_LENGTH;
863 if (readableBytes % MAX_PLAINTEXT_LENGTH != 0) {
864 numPackets += 1;
865 }
866
867 if (out == null) {
868 out = allocateOutNetBuf(ctx, readableBytes, buf.nioBufferCount() + numPackets);
869 }
870 result = wrapMultiple(alloc, engine, buf, out);
871 } else {
872 if (out == null) {
873 out = allocateOutNetBuf(ctx, buf.readableBytes(), buf.nioBufferCount());
874 }
875 result = wrap(alloc, engine, buf, out);
876 }
877 } catch (SSLException e) {
878
879
880
881
882
883
884 buf.release();
885 promise.setFailure(e);
886 throw e;
887 }
888
889 if (buf.isReadable()) {
890 pendingUnencryptedWrites.addFirst(buf, promise);
891
892
893 promise = null;
894 } else {
895 buf.release();
896 }
897
898
899
900 if (out.isReadable()) {
901 final ByteBuf b = out;
902 out = null;
903 if (promise != null) {
904 ctx.write(b, promise);
905 } else {
906 ctx.write(b);
907 }
908 } else if (promise != null) {
909 ctx.write(Unpooled.EMPTY_BUFFER, promise);
910 }
911
912
913 if (result.getStatus() == Status.CLOSED) {
914
915
916 if (!pendingUnencryptedWrites.isEmpty()) {
917
918
919 Throwable exception = handshakePromise.cause();
920 if (exception == null) {
921 exception = sslClosePromise.cause();
922 if (exception == null) {
923 exception = new SslClosedEngineException("SSLEngine closed already");
924 }
925 }
926 pendingUnencryptedWrites.releaseAndFailAll(ctx, exception);
927 }
928
929 return;
930 } else {
931 switch (result.getHandshakeStatus()) {
932 case NEED_TASK:
933 if (!runDelegatedTasks(inUnwrap)) {
934
935
936 break outer;
937 }
938 break;
939 case FINISHED:
940 case NOT_HANDSHAKING:
941 setHandshakeSuccess();
942 break;
943 case NEED_WRAP:
944
945
946
947
948 if (result.bytesProduced() > 0 && pendingUnencryptedWrites.isEmpty()) {
949 pendingUnencryptedWrites.add(Unpooled.EMPTY_BUFFER);
950 }
951 break;
952 case NEED_UNWRAP:
953
954
955 readIfNeeded(ctx);
956 return;
957 default:
958 throw new IllegalStateException(
959 "Unknown handshake status: " + result.getHandshakeStatus());
960 }
961 }
962 }
963 } finally {
964 if (out != null) {
965 out.release();
966 }
967 if (inUnwrap) {
968 setState(STATE_NEEDS_FLUSH);
969 }
970 }
971 }
972
973
974
975
976
977
978
979 private boolean wrapNonAppData(final ChannelHandlerContext ctx, boolean inUnwrap) throws SSLException {
980 ByteBuf out = null;
981 ByteBufAllocator alloc = ctx.alloc();
982 try {
983
984
985 outer: while (!ctx.isRemoved()) {
986 if (out == null) {
987
988
989
990 out = allocateOutNetBuf(ctx, 2048, 1);
991 }
992 SSLEngineResult result = wrap(alloc, engine, Unpooled.EMPTY_BUFFER, out);
993 if (result.bytesProduced() > 0) {
994 ctx.write(out).addListener(new ChannelFutureListener() {
995 @Override
996 public void operationComplete(ChannelFuture future) {
997 Throwable cause = future.cause();
998 if (cause != null) {
999 setHandshakeFailureTransportFailure(ctx, cause);
1000 }
1001 }
1002 });
1003 if (inUnwrap) {
1004 setState(STATE_NEEDS_FLUSH);
1005 }
1006 out = null;
1007 }
1008
1009 HandshakeStatus status = result.getHandshakeStatus();
1010 switch (status) {
1011 case FINISHED:
1012
1013
1014
1015
1016 if (setHandshakeSuccess() && inUnwrap && !pendingUnencryptedWrites.isEmpty()) {
1017 wrap(ctx, true);
1018 }
1019 return false;
1020 case NEED_TASK:
1021 if (!runDelegatedTasks(inUnwrap)) {
1022
1023
1024 break outer;
1025 }
1026 break;
1027 case NEED_UNWRAP:
1028 if (inUnwrap || unwrapNonAppData(ctx) <= 0) {
1029
1030
1031
1032 return false;
1033 }
1034 break;
1035 case NEED_WRAP:
1036 break;
1037 case NOT_HANDSHAKING:
1038 if (setHandshakeSuccess() && inUnwrap && !pendingUnencryptedWrites.isEmpty()) {
1039 wrap(ctx, true);
1040 }
1041
1042
1043 if (!inUnwrap) {
1044 unwrapNonAppData(ctx);
1045 }
1046 return true;
1047 default:
1048 throw new IllegalStateException("Unknown handshake status: " + result.getHandshakeStatus());
1049 }
1050
1051
1052
1053 if (result.bytesProduced() == 0 && status != HandshakeStatus.NEED_TASK) {
1054 break;
1055 }
1056
1057
1058
1059 if (result.bytesConsumed() == 0 && result.getHandshakeStatus() == HandshakeStatus.NOT_HANDSHAKING) {
1060 break;
1061 }
1062 }
1063 } finally {
1064 if (out != null) {
1065 out.release();
1066 }
1067 }
1068 return false;
1069 }
1070
1071 private SSLEngineResult wrapMultiple(ByteBufAllocator alloc, SSLEngine engine, ByteBuf in, ByteBuf out)
1072 throws SSLException {
1073 SSLEngineResult result = null;
1074
1075 do {
1076 int nextSliceSize = Math.min(MAX_PLAINTEXT_LENGTH, in.readableBytes());
1077
1078
1079 int nextOutSize = engineType.calculateRequiredOutBufSpace(this, nextSliceSize, in.nioBufferCount());
1080
1081 if (!out.isWritable(nextOutSize)) {
1082 if (result != null) {
1083
1084
1085 break;
1086 }
1087
1088
1089 out.ensureWritable(nextOutSize);
1090 }
1091
1092 ByteBuf wrapBuf = in.readSlice(nextSliceSize);
1093 result = wrap(alloc, engine, wrapBuf, out);
1094
1095 if (result.getStatus() == Status.CLOSED) {
1096
1097
1098 break;
1099 }
1100
1101 if (wrapBuf.isReadable()) {
1102
1103
1104 in.readerIndex(in.readerIndex() - wrapBuf.readableBytes());
1105 }
1106 } while (in.readableBytes() > 0);
1107
1108 return result;
1109 }
1110
1111 private SSLEngineResult wrap(ByteBufAllocator alloc, SSLEngine engine, ByteBuf in, ByteBuf out)
1112 throws SSLException {
1113 ByteBuf newDirectIn = null;
1114 try {
1115 int readerIndex = in.readerIndex();
1116 int readableBytes = in.readableBytes();
1117
1118
1119
1120 final ByteBuffer[] in0;
1121 if (in.isDirect() || !engineType.wantsDirectBuffer) {
1122
1123
1124
1125
1126 if (!(in instanceof CompositeByteBuf) && in.nioBufferCount() == 1) {
1127 in0 = singleBuffer;
1128
1129
1130 in0[0] = in.internalNioBuffer(readerIndex, readableBytes);
1131 } else {
1132 in0 = in.nioBuffers();
1133 }
1134 } else {
1135
1136
1137
1138 newDirectIn = alloc.directBuffer(readableBytes);
1139 newDirectIn.writeBytes(in, readerIndex, readableBytes);
1140 in0 = singleBuffer;
1141 in0[0] = newDirectIn.internalNioBuffer(newDirectIn.readerIndex(), readableBytes);
1142 }
1143
1144 for (;;) {
1145
1146
1147 ByteBuffer out0 = toByteBuffer(out, out.writerIndex(), out.writableBytes());
1148 SSLEngineResult result = engine.wrap(in0, out0);
1149 in.skipBytes(result.bytesConsumed());
1150 out.writerIndex(out.writerIndex() + result.bytesProduced());
1151
1152 if (result.getStatus() == Status.BUFFER_OVERFLOW) {
1153 out.ensureWritable(engine.getSession().getPacketBufferSize());
1154 } else {
1155 return result;
1156 }
1157 }
1158 } finally {
1159
1160 singleBuffer[0] = null;
1161
1162 if (newDirectIn != null) {
1163 newDirectIn.release();
1164 }
1165 }
1166 }
1167
1168 @Override
1169 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
1170 boolean handshakeFailed = handshakePromise.cause() != null;
1171
1172
1173 ClosedChannelException exception = new ClosedChannelException();
1174
1175
1176 if (isStateSet(STATE_HANDSHAKE_STARTED) && !handshakePromise.isDone()) {
1177 ThrowableUtil.addSuppressed(exception, StacklessSSLHandshakeException.newInstance(
1178 "Connection closed while SSL/TLS handshake was in progress",
1179 SslHandler.class, "channelInactive"));
1180 }
1181
1182
1183
1184 setHandshakeFailure(ctx, exception, !isStateSet(STATE_OUTBOUND_CLOSED), isStateSet(STATE_HANDSHAKE_STARTED),
1185 false);
1186
1187
1188 notifyClosePromise(exception);
1189
1190 try {
1191 super.channelInactive(ctx);
1192 } catch (DecoderException e) {
1193 if (!handshakeFailed || !(e.getCause() instanceof SSLException)) {
1194
1195
1196
1197
1198
1199 throw e;
1200 }
1201 }
1202 }
1203
1204 @Override
1205 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
1206 if (ignoreException(cause)) {
1207
1208
1209 if (logger.isDebugEnabled()) {
1210 logger.debug(
1211 "{} Swallowing a harmless 'connection reset by peer / broken pipe' error that occurred " +
1212 "while writing close_notify in response to the peer's close_notify", ctx.channel(), cause);
1213 }
1214
1215
1216
1217 if (ctx.channel().isActive()) {
1218 ctx.close();
1219 }
1220 } else {
1221 ctx.fireExceptionCaught(cause);
1222 }
1223 }
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234 private boolean ignoreException(Throwable t) {
1235 if (!(t instanceof SSLException) && t instanceof IOException && sslClosePromise.isDone()) {
1236 String message = t.getMessage();
1237
1238
1239
1240 if (message != null && IGNORABLE_ERROR_MESSAGE.matcher(message).matches()) {
1241 return true;
1242 }
1243
1244
1245 StackTraceElement[] elements = t.getStackTrace();
1246 for (StackTraceElement element: elements) {
1247 String classname = element.getClassName();
1248 String methodname = element.getMethodName();
1249
1250
1251 if (classname.startsWith("io.netty.")) {
1252 continue;
1253 }
1254
1255
1256 if (!"read".equals(methodname)) {
1257 continue;
1258 }
1259
1260
1261
1262 if (IGNORABLE_CLASS_IN_STACK.matcher(classname).matches()) {
1263 return true;
1264 }
1265
1266 try {
1267
1268
1269
1270 Class<?> clazz = PlatformDependent.getClassLoader(getClass()).loadClass(classname);
1271
1272 if (SocketChannel.class.isAssignableFrom(clazz)
1273 || DatagramChannel.class.isAssignableFrom(clazz)) {
1274 return true;
1275 }
1276
1277
1278 if ("com.sun.nio.sctp.SctpChannel".equals(clazz.getSuperclass().getName())) {
1279 return true;
1280 }
1281 } catch (Throwable cause) {
1282 if (logger.isDebugEnabled()) {
1283 logger.debug("Unexpected exception while loading class {} classname {}",
1284 getClass(), classname, cause);
1285 }
1286 }
1287 }
1288 }
1289
1290 return false;
1291 }
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306 @Deprecated
1307 public static boolean isEncrypted(ByteBuf buffer) {
1308 return isEncrypted(buffer, false);
1309 }
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327 public static boolean isEncrypted(ByteBuf buffer, boolean probeSSLv2) {
1328 if (buffer.readableBytes() < SslUtils.SSL_RECORD_HEADER_LENGTH) {
1329 throw new IllegalArgumentException(
1330 "buffer must have at least " + SslUtils.SSL_RECORD_HEADER_LENGTH + " readable bytes");
1331 }
1332 return getEncryptedPacketLength(buffer, buffer.readerIndex(), probeSSLv2) != SslUtils.NOT_ENCRYPTED;
1333 }
1334
1335 private void decodeJdkCompatible(ChannelHandlerContext ctx, ByteBuf in) throws NotSslRecordException {
1336 int packetLength = this.packetLength;
1337
1338 if (packetLength > 0) {
1339 if (in.readableBytes() < packetLength) {
1340 return;
1341 }
1342 } else {
1343
1344 final int readableBytes = in.readableBytes();
1345 if (readableBytes < SslUtils.SSL_RECORD_HEADER_LENGTH) {
1346 return;
1347 }
1348 packetLength = getEncryptedPacketLength(in, in.readerIndex(), true);
1349 if (packetLength == SslUtils.NOT_ENCRYPTED) {
1350
1351 NotSslRecordException e = new NotSslRecordException(
1352 "not an SSL/TLS record: " + ByteBufUtil.hexDump(in));
1353 in.skipBytes(in.readableBytes());
1354
1355
1356
1357 setHandshakeFailure(ctx, e);
1358
1359 throw e;
1360 }
1361 if (packetLength == NOT_ENOUGH_DATA) {
1362 return;
1363 }
1364 assert packetLength > 0;
1365 if (packetLength > readableBytes) {
1366
1367 this.packetLength = packetLength;
1368 return;
1369 }
1370 }
1371
1372
1373
1374 this.packetLength = 0;
1375 try {
1376 final int bytesConsumed = unwrap(ctx, in, packetLength);
1377 assert bytesConsumed == packetLength || engine.isInboundDone() :
1378 "we feed the SSLEngine a packets worth of data: " + packetLength + " but it only consumed: " +
1379 bytesConsumed;
1380 } catch (Throwable cause) {
1381 handleUnwrapThrowable(ctx, cause);
1382 }
1383 }
1384
1385 private void decodeNonJdkCompatible(ChannelHandlerContext ctx, ByteBuf in) {
1386 try {
1387 unwrap(ctx, in, in.readableBytes());
1388 } catch (Throwable cause) {
1389 handleUnwrapThrowable(ctx, cause);
1390 }
1391 }
1392
1393 private void handleUnwrapThrowable(ChannelHandlerContext ctx, Throwable cause) {
1394 try {
1395
1396
1397
1398
1399 if (handshakePromise.tryFailure(cause)) {
1400 ctx.fireUserEventTriggered(new SslHandshakeCompletionEvent(cause));
1401 }
1402
1403
1404 if (pendingUnencryptedWrites != null) {
1405
1406
1407 wrapAndFlush(ctx);
1408 }
1409 } catch (SSLException ex) {
1410 logger.debug("SSLException during trying to call SSLEngine.wrap(...)" +
1411 " because of an previous SSLException, ignoring...", ex);
1412 } finally {
1413
1414 setHandshakeFailure(ctx, cause, true, false, true);
1415 }
1416 PlatformDependent.throwException(cause);
1417 }
1418
1419 @Override
1420 protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws SSLException {
1421 if (isStateSet(STATE_PROCESS_TASK)) {
1422 return;
1423 }
1424 if (jdkCompatibilityMode) {
1425 decodeJdkCompatible(ctx, in);
1426 } else {
1427 decodeNonJdkCompatible(ctx, in);
1428 }
1429 }
1430
1431 @Override
1432 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
1433 channelReadComplete0(ctx);
1434 }
1435
1436 private void channelReadComplete0(ChannelHandlerContext ctx) {
1437
1438 discardSomeReadBytes();
1439
1440 flushIfNeeded(ctx);
1441 readIfNeeded(ctx);
1442
1443 clearState(STATE_FIRE_CHANNEL_READ);
1444 ctx.fireChannelReadComplete();
1445 }
1446
1447 private void readIfNeeded(ChannelHandlerContext ctx) {
1448
1449 if (!ctx.channel().config().isAutoRead() &&
1450 (!isStateSet(STATE_FIRE_CHANNEL_READ) || !handshakePromise.isDone())) {
1451
1452
1453 ctx.read();
1454 }
1455 }
1456
1457 private void flushIfNeeded(ChannelHandlerContext ctx) {
1458 if (isStateSet(STATE_NEEDS_FLUSH)) {
1459 forceFlush(ctx);
1460 }
1461 }
1462
1463
1464
1465
1466 private int unwrapNonAppData(ChannelHandlerContext ctx) throws SSLException {
1467 return unwrap(ctx, Unpooled.EMPTY_BUFFER, 0);
1468 }
1469
1470
1471
1472
1473 private int unwrap(ChannelHandlerContext ctx, ByteBuf packet, int length) throws SSLException {
1474 final int originalLength = length;
1475 boolean wrapLater = false;
1476 boolean notifyClosure = false;
1477 boolean executedRead = false;
1478 ByteBuf decodeOut = allocate(ctx, length);
1479 try {
1480
1481
1482 do {
1483 final SSLEngineResult result = engineType.unwrap(this, packet, length, decodeOut);
1484 final Status status = result.getStatus();
1485 final HandshakeStatus handshakeStatus = result.getHandshakeStatus();
1486 final int produced = result.bytesProduced();
1487 final int consumed = result.bytesConsumed();
1488
1489
1490
1491
1492 packet.skipBytes(consumed);
1493 length -= consumed;
1494
1495
1496
1497
1498 if (handshakeStatus == HandshakeStatus.FINISHED || handshakeStatus == HandshakeStatus.NOT_HANDSHAKING) {
1499 wrapLater |= (decodeOut.isReadable() ?
1500 setHandshakeSuccessUnwrapMarkReentry() : setHandshakeSuccess()) ||
1501 handshakeStatus == HandshakeStatus.FINISHED || !pendingUnencryptedWrites.isEmpty();
1502 }
1503
1504
1505
1506
1507 if (decodeOut.isReadable()) {
1508 setState(STATE_FIRE_CHANNEL_READ);
1509 if (isStateSet(STATE_UNWRAP_REENTRY)) {
1510 executedRead = true;
1511 executeChannelRead(ctx, decodeOut);
1512 } else {
1513 ctx.fireChannelRead(decodeOut);
1514 }
1515 decodeOut = null;
1516 }
1517
1518 if (status == Status.CLOSED) {
1519 notifyClosure = true;
1520 } else if (status == Status.BUFFER_OVERFLOW) {
1521 if (decodeOut != null) {
1522 decodeOut.release();
1523 }
1524 final int applicationBufferSize = engine.getSession().getApplicationBufferSize();
1525
1526
1527
1528
1529 decodeOut = allocate(ctx, engineType.calculatePendingData(this, applicationBufferSize < produced ?
1530 applicationBufferSize : applicationBufferSize - produced));
1531 continue;
1532 }
1533
1534 if (handshakeStatus == HandshakeStatus.NEED_TASK) {
1535 boolean pending = runDelegatedTasks(true);
1536 if (!pending) {
1537
1538
1539
1540
1541
1542 wrapLater = false;
1543 break;
1544 }
1545 } else if (handshakeStatus == HandshakeStatus.NEED_WRAP) {
1546
1547
1548
1549 if (wrapNonAppData(ctx, true) && length == 0) {
1550 break;
1551 }
1552 }
1553
1554 if (status == Status.BUFFER_UNDERFLOW ||
1555
1556 handshakeStatus != HandshakeStatus.NEED_TASK && (consumed == 0 && produced == 0 ||
1557 (length == 0 && handshakeStatus == HandshakeStatus.NOT_HANDSHAKING))) {
1558 if (handshakeStatus == HandshakeStatus.NEED_UNWRAP) {
1559
1560
1561 readIfNeeded(ctx);
1562 }
1563
1564 break;
1565 } else if (decodeOut == null) {
1566 decodeOut = allocate(ctx, length);
1567 }
1568 } while (!ctx.isRemoved());
1569
1570 if (isStateSet(STATE_FLUSHED_BEFORE_HANDSHAKE) && handshakePromise.isDone()) {
1571
1572
1573
1574
1575 clearState(STATE_FLUSHED_BEFORE_HANDSHAKE);
1576 wrapLater = true;
1577 }
1578
1579 if (wrapLater) {
1580 wrap(ctx, true);
1581 }
1582 } finally {
1583 if (decodeOut != null) {
1584 decodeOut.release();
1585 }
1586
1587 if (notifyClosure) {
1588 if (executedRead) {
1589 executeNotifyClosePromise(ctx);
1590 } else {
1591 notifyClosePromise(null);
1592 }
1593 }
1594 }
1595 return originalLength - length;
1596 }
1597
1598 private boolean setHandshakeSuccessUnwrapMarkReentry() throws SSLException {
1599
1600
1601 final boolean setReentryState = !isStateSet(STATE_UNWRAP_REENTRY);
1602 if (setReentryState) {
1603 setState(STATE_UNWRAP_REENTRY);
1604 }
1605 try {
1606 return setHandshakeSuccess();
1607 } finally {
1608
1609
1610 if (setReentryState) {
1611 clearState(STATE_UNWRAP_REENTRY);
1612 }
1613 }
1614 }
1615
1616 private void executeNotifyClosePromise(final ChannelHandlerContext ctx) {
1617 try {
1618 ctx.executor().execute(new Runnable() {
1619 @Override
1620 public void run() {
1621 notifyClosePromise(null);
1622 }
1623 });
1624 } catch (RejectedExecutionException e) {
1625 notifyClosePromise(e);
1626 }
1627 }
1628
1629 private void executeChannelRead(final ChannelHandlerContext ctx, final ByteBuf decodedOut) {
1630 try {
1631 ctx.executor().execute(new Runnable() {
1632 @Override
1633 public void run() {
1634 ctx.fireChannelRead(decodedOut);
1635 }
1636 });
1637 } catch (RejectedExecutionException e) {
1638 decodedOut.release();
1639 throw e;
1640 }
1641 }
1642
1643 private static ByteBuffer toByteBuffer(ByteBuf out, int index, int len) {
1644 return out.nioBufferCount() == 1 ? out.internalNioBuffer(index, len) :
1645 out.nioBuffer(index, len);
1646 }
1647
1648 private static boolean inEventLoop(Executor executor) {
1649 return executor instanceof EventExecutor && ((EventExecutor) executor).inEventLoop();
1650 }
1651
1652
1653
1654
1655
1656
1657
1658
1659 private boolean runDelegatedTasks(boolean inUnwrap) {
1660 if (delegatedTaskExecutor == ImmediateExecutor.INSTANCE || inEventLoop(delegatedTaskExecutor)) {
1661
1662
1663 for (;;) {
1664 Runnable task = engine.getDelegatedTask();
1665 if (task == null) {
1666 return true;
1667 }
1668 setState(STATE_PROCESS_TASK);
1669 if (task instanceof AsyncRunnable) {
1670
1671 boolean pending = false;
1672 try {
1673 AsyncRunnable asyncTask = (AsyncRunnable) task;
1674 AsyncTaskCompletionHandler completionHandler = new AsyncTaskCompletionHandler(inUnwrap);
1675 asyncTask.run(completionHandler);
1676 pending = completionHandler.resumeLater();
1677 if (pending) {
1678 return false;
1679 }
1680 } finally {
1681 if (!pending) {
1682
1683
1684 clearState(STATE_PROCESS_TASK);
1685 }
1686 }
1687 } else {
1688 try {
1689 task.run();
1690 } finally {
1691 clearState(STATE_PROCESS_TASK);
1692 }
1693 }
1694 }
1695 } else {
1696 executeDelegatedTask(inUnwrap);
1697 return false;
1698 }
1699 }
1700
1701 private SslTasksRunner getTaskRunner(boolean inUnwrap) {
1702 return inUnwrap ? sslTaskRunnerForUnwrap : sslTaskRunner;
1703 }
1704
1705 private void executeDelegatedTask(boolean inUnwrap) {
1706 executeDelegatedTask(getTaskRunner(inUnwrap));
1707 }
1708
1709 private void executeDelegatedTask(SslTasksRunner task) {
1710 setState(STATE_PROCESS_TASK);
1711 try {
1712 delegatedTaskExecutor.execute(task);
1713 } catch (RejectedExecutionException e) {
1714 clearState(STATE_PROCESS_TASK);
1715 throw e;
1716 }
1717 }
1718
1719 private final class AsyncTaskCompletionHandler implements Runnable {
1720 private final boolean inUnwrap;
1721 boolean didRun;
1722 boolean resumeLater;
1723
1724 AsyncTaskCompletionHandler(boolean inUnwrap) {
1725 this.inUnwrap = inUnwrap;
1726 }
1727
1728 @Override
1729 public void run() {
1730 didRun = true;
1731 if (resumeLater) {
1732 getTaskRunner(inUnwrap).runComplete();
1733 }
1734 }
1735
1736 boolean resumeLater() {
1737 if (!didRun) {
1738 resumeLater = true;
1739 return true;
1740 }
1741 return false;
1742 }
1743 }
1744
1745
1746
1747
1748
1749 private final class SslTasksRunner implements Runnable {
1750 private final boolean inUnwrap;
1751 private final Runnable runCompleteTask = new Runnable() {
1752 @Override
1753 public void run() {
1754 runComplete();
1755 }
1756 };
1757
1758 SslTasksRunner(boolean inUnwrap) {
1759 this.inUnwrap = inUnwrap;
1760 }
1761
1762
1763 private void taskError(Throwable e) {
1764 if (inUnwrap) {
1765
1766
1767
1768
1769 try {
1770 handleUnwrapThrowable(ctx, e);
1771 } catch (Throwable cause) {
1772 safeExceptionCaught(cause);
1773 }
1774 } else {
1775 setHandshakeFailure(ctx, e);
1776 forceFlush(ctx);
1777 }
1778 }
1779
1780
1781 private void safeExceptionCaught(Throwable cause) {
1782 try {
1783 exceptionCaught(ctx, wrapIfNeeded(cause));
1784 } catch (Throwable error) {
1785 ctx.fireExceptionCaught(error);
1786 }
1787 }
1788
1789 private Throwable wrapIfNeeded(Throwable cause) {
1790 if (!inUnwrap) {
1791
1792 return cause;
1793 }
1794
1795
1796 return cause instanceof DecoderException ? cause : new DecoderException(cause);
1797 }
1798
1799 private void tryDecodeAgain() {
1800 try {
1801 channelRead(ctx, Unpooled.EMPTY_BUFFER);
1802 } catch (Throwable cause) {
1803 safeExceptionCaught(cause);
1804 } finally {
1805
1806
1807
1808 channelReadComplete0(ctx);
1809 }
1810 }
1811
1812
1813
1814
1815
1816 private void resumeOnEventExecutor() {
1817 assert ctx.executor().inEventLoop();
1818 clearState(STATE_PROCESS_TASK);
1819 try {
1820 HandshakeStatus status = engine.getHandshakeStatus();
1821 switch (status) {
1822
1823
1824 case NEED_TASK:
1825 executeDelegatedTask(this);
1826
1827 break;
1828
1829
1830 case FINISHED:
1831
1832 case NOT_HANDSHAKING:
1833 setHandshakeSuccess();
1834 try {
1835
1836
1837 wrap(ctx, inUnwrap);
1838 } catch (Throwable e) {
1839 taskError(e);
1840 return;
1841 }
1842 if (inUnwrap) {
1843
1844
1845 unwrapNonAppData(ctx);
1846 }
1847
1848
1849 forceFlush(ctx);
1850
1851 tryDecodeAgain();
1852 break;
1853
1854
1855
1856 case NEED_UNWRAP:
1857 try {
1858 unwrapNonAppData(ctx);
1859 } catch (SSLException e) {
1860 handleUnwrapThrowable(ctx, e);
1861 return;
1862 }
1863 tryDecodeAgain();
1864 break;
1865
1866
1867
1868 case NEED_WRAP:
1869 try {
1870 if (!wrapNonAppData(ctx, false) && inUnwrap) {
1871
1872
1873
1874
1875 unwrapNonAppData(ctx);
1876 }
1877
1878
1879 forceFlush(ctx);
1880 } catch (Throwable e) {
1881 taskError(e);
1882 return;
1883 }
1884
1885
1886 tryDecodeAgain();
1887 break;
1888
1889 default:
1890
1891 throw new AssertionError();
1892 }
1893 } catch (Throwable cause) {
1894 safeExceptionCaught(cause);
1895 }
1896 }
1897
1898 void runComplete() {
1899 EventExecutor executor = ctx.executor();
1900
1901
1902
1903
1904
1905
1906
1907 executor.execute(new Runnable() {
1908 @Override
1909 public void run() {
1910 resumeOnEventExecutor();
1911 }
1912 });
1913 }
1914
1915 @Override
1916 public void run() {
1917 try {
1918 Runnable task = engine.getDelegatedTask();
1919 if (task == null) {
1920
1921 return;
1922 }
1923 if (task instanceof AsyncRunnable) {
1924 AsyncRunnable asyncTask = (AsyncRunnable) task;
1925 asyncTask.run(runCompleteTask);
1926 } else {
1927 task.run();
1928 runComplete();
1929 }
1930 } catch (final Throwable cause) {
1931 handleException(cause);
1932 }
1933 }
1934
1935 private void handleException(final Throwable cause) {
1936 EventExecutor executor = ctx.executor();
1937 if (executor.inEventLoop()) {
1938 clearState(STATE_PROCESS_TASK);
1939 safeExceptionCaught(cause);
1940 } else {
1941 try {
1942 executor.execute(new Runnable() {
1943 @Override
1944 public void run() {
1945 clearState(STATE_PROCESS_TASK);
1946 safeExceptionCaught(cause);
1947 }
1948 });
1949 } catch (RejectedExecutionException ignore) {
1950 clearState(STATE_PROCESS_TASK);
1951
1952
1953 ctx.fireExceptionCaught(cause);
1954 }
1955 }
1956 }
1957 }
1958
1959
1960
1961
1962
1963
1964 private boolean setHandshakeSuccess() throws SSLException {
1965
1966
1967
1968 final SSLSession session = engine.getSession();
1969 if (resumptionController != null && !handshakePromise.isDone()) {
1970 try {
1971 if (resumptionController.validateResumeIfNeeded(engine) && logger.isDebugEnabled()) {
1972 logger.debug("{} Resumed and reauthenticated session", ctx.channel());
1973 }
1974 } catch (CertificateException e) {
1975 SSLHandshakeException exception = new SSLHandshakeException(e.getMessage());
1976 exception.initCause(e);
1977 throw exception;
1978 }
1979 }
1980 final boolean notified;
1981 if (notified = !handshakePromise.isDone() && handshakePromise.trySuccess(ctx.channel())) {
1982 if (logger.isDebugEnabled()) {
1983 logger.debug(
1984 "{} HANDSHAKEN: protocol:{} cipher suite:{}",
1985 ctx.channel(),
1986 session.getProtocol(),
1987 session.getCipherSuite());
1988 }
1989 ctx.fireUserEventTriggered(SslHandshakeCompletionEvent.SUCCESS);
1990 }
1991 if (isStateSet(STATE_READ_DURING_HANDSHAKE)) {
1992 clearState(STATE_READ_DURING_HANDSHAKE);
1993 if (!ctx.channel().config().isAutoRead()) {
1994 ctx.read();
1995 }
1996 }
1997 return notified;
1998 }
1999
2000
2001
2002
2003 private void setHandshakeFailure(ChannelHandlerContext ctx, Throwable cause) {
2004 setHandshakeFailure(ctx, cause, true, true, false);
2005 }
2006
2007
2008
2009
2010 private void setHandshakeFailure(ChannelHandlerContext ctx, Throwable cause, boolean closeInbound,
2011 boolean notify, boolean alwaysFlushAndClose) {
2012 try {
2013
2014 setState(STATE_OUTBOUND_CLOSED);
2015 engine.closeOutbound();
2016
2017 if (closeInbound) {
2018 try {
2019 engine.closeInbound();
2020 } catch (SSLException e) {
2021 if (logger.isDebugEnabled()) {
2022
2023
2024
2025
2026 String msg = e.getMessage();
2027 if (msg == null || !(msg.contains("possible truncation attack") ||
2028 msg.contains("closing inbound before receiving peer's close_notify"))) {
2029 logger.debug("{} SSLEngine.closeInbound() raised an exception.", ctx.channel(), e);
2030 }
2031 }
2032 }
2033 }
2034 if (handshakePromise.tryFailure(cause) || alwaysFlushAndClose) {
2035 SslUtils.handleHandshakeFailure(ctx, cause, notify);
2036 }
2037 } finally {
2038
2039 releaseAndFailAll(ctx, cause);
2040 }
2041 }
2042
2043 private void setHandshakeFailureTransportFailure(ChannelHandlerContext ctx, Throwable cause) {
2044
2045
2046
2047 try {
2048 SSLException transportFailure = new SSLException("failure when writing TLS control frames", cause);
2049 releaseAndFailAll(ctx, transportFailure);
2050 if (handshakePromise.tryFailure(transportFailure)) {
2051 ctx.fireUserEventTriggered(new SslHandshakeCompletionEvent(transportFailure));
2052 }
2053 } finally {
2054 ctx.close();
2055 }
2056 }
2057
2058 private void releaseAndFailAll(ChannelHandlerContext ctx, Throwable cause) {
2059 if (resumptionController != null &&
2060 (!engine.getSession().isValid() || cause instanceof SSLHandshakeException)) {
2061 resumptionController.remove(engine());
2062 }
2063 if (pendingUnencryptedWrites != null) {
2064 pendingUnencryptedWrites.releaseAndFailAll(ctx, cause);
2065 }
2066 }
2067
2068 private void notifyClosePromise(Throwable cause) {
2069 if (cause == null) {
2070 if (sslClosePromise.trySuccess(ctx.channel())) {
2071 ctx.fireUserEventTriggered(SslCloseCompletionEvent.SUCCESS);
2072 }
2073 } else {
2074 if (sslClosePromise.tryFailure(cause)) {
2075 ctx.fireUserEventTriggered(new SslCloseCompletionEvent(cause));
2076 }
2077 }
2078 }
2079
2080 private void closeOutboundAndChannel(
2081 final ChannelHandlerContext ctx, final ChannelPromise promise, boolean disconnect) throws Exception {
2082 setState(STATE_OUTBOUND_CLOSED);
2083 engine.closeOutbound();
2084
2085 if (!ctx.channel().isActive()) {
2086 if (disconnect) {
2087 ctx.disconnect(promise);
2088 } else {
2089 ctx.close(promise);
2090 }
2091 return;
2092 }
2093
2094 ChannelPromise closeNotifyPromise = ctx.newPromise();
2095 try {
2096 flush(ctx, closeNotifyPromise);
2097 } finally {
2098 if (!isStateSet(STATE_CLOSE_NOTIFY)) {
2099 setState(STATE_CLOSE_NOTIFY);
2100
2101
2102
2103
2104
2105
2106
2107
2108 safeClose(ctx, closeNotifyPromise, PromiseNotifier.cascade(false, ctx.newPromise(), promise));
2109 } else {
2110
2111 sslClosePromise.addListener(new FutureListener<Channel>() {
2112 @Override
2113 public void operationComplete(Future<Channel> future) {
2114 promise.setSuccess();
2115 }
2116 });
2117 }
2118 }
2119 }
2120
2121 private void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
2122 if (pendingUnencryptedWrites != null) {
2123 pendingUnencryptedWrites.add(Unpooled.EMPTY_BUFFER, promise);
2124 } else {
2125 promise.setFailure(newPendingWritesNullException());
2126 }
2127 flush(ctx);
2128 }
2129
2130 @Override
2131 public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
2132 this.ctx = ctx;
2133 Channel channel = ctx.channel();
2134 pendingUnencryptedWrites = new SslHandlerCoalescingBufferQueue(channel, 16, engineType.wantsDirectBuffer) {
2135 @Override
2136 protected int wrapDataSize() {
2137 return SslHandler.this.wrapDataSize;
2138 }
2139 };
2140
2141 setOpensslEngineSocketFd(channel);
2142 boolean fastOpen = Boolean.TRUE.equals(channel.config().getOption(ChannelOption.TCP_FASTOPEN_CONNECT));
2143 boolean active = channel.isActive();
2144 if (active || fastOpen) {
2145
2146
2147
2148 startHandshakeProcessing(active);
2149
2150
2151 final ChannelOutboundBuffer outboundBuffer;
2152 if (fastOpen && ((outboundBuffer = channel.unsafe().outboundBuffer()) == null ||
2153 outboundBuffer.totalPendingWriteBytes() > 0)) {
2154 setState(STATE_NEEDS_FLUSH);
2155 }
2156 }
2157 }
2158
2159 private void startHandshakeProcessing(boolean flushAtEnd) {
2160 if (!isStateSet(STATE_HANDSHAKE_STARTED)) {
2161 setState(STATE_HANDSHAKE_STARTED);
2162 if (engine.getUseClientMode()) {
2163
2164
2165
2166 handshake(flushAtEnd);
2167 }
2168 applyHandshakeTimeout();
2169 } else if (isStateSet(STATE_NEEDS_FLUSH)) {
2170 forceFlush(ctx);
2171 }
2172 }
2173
2174
2175
2176
2177 public Future<Channel> renegotiate() {
2178 ChannelHandlerContext ctx = this.ctx;
2179 if (ctx == null) {
2180 throw new IllegalStateException();
2181 }
2182
2183 return renegotiate(ctx.executor().<Channel>newPromise());
2184 }
2185
2186
2187
2188
2189 public Future<Channel> renegotiate(final Promise<Channel> promise) {
2190 ObjectUtil.checkNotNull(promise, "promise");
2191
2192 ChannelHandlerContext ctx = this.ctx;
2193 if (ctx == null) {
2194 throw new IllegalStateException();
2195 }
2196
2197 EventExecutor executor = ctx.executor();
2198 if (!executor.inEventLoop()) {
2199 executor.execute(new Runnable() {
2200 @Override
2201 public void run() {
2202 renegotiateOnEventLoop(promise);
2203 }
2204 });
2205 return promise;
2206 }
2207
2208 renegotiateOnEventLoop(promise);
2209 return promise;
2210 }
2211
2212 private void renegotiateOnEventLoop(final Promise<Channel> newHandshakePromise) {
2213 final Promise<Channel> oldHandshakePromise = handshakePromise;
2214 if (!oldHandshakePromise.isDone()) {
2215
2216
2217 PromiseNotifier.cascade(oldHandshakePromise, newHandshakePromise);
2218 } else {
2219 handshakePromise = newHandshakePromise;
2220 handshake(true);
2221 applyHandshakeTimeout();
2222 }
2223 }
2224
2225
2226
2227
2228
2229
2230
2231 private void handshake(boolean flushAtEnd) {
2232 if (engine.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING) {
2233
2234
2235 return;
2236 }
2237 if (handshakePromise.isDone()) {
2238
2239
2240
2241
2242
2243 return;
2244 }
2245
2246
2247 final ChannelHandlerContext ctx = this.ctx;
2248 try {
2249 engine.beginHandshake();
2250 wrapNonAppData(ctx, false);
2251 } catch (Throwable e) {
2252 setHandshakeFailure(ctx, e);
2253 } finally {
2254 if (flushAtEnd) {
2255 forceFlush(ctx);
2256 }
2257 }
2258 }
2259
2260 private void applyHandshakeTimeout() {
2261 final Promise<Channel> localHandshakePromise = this.handshakePromise;
2262
2263
2264 final long handshakeTimeoutMillis = this.handshakeTimeoutMillis;
2265 if (handshakeTimeoutMillis <= 0 || localHandshakePromise.isDone()) {
2266 return;
2267 }
2268
2269 final Future<?> timeoutFuture = ctx.executor().schedule(new Runnable() {
2270 @Override
2271 public void run() {
2272 if (localHandshakePromise.isDone()) {
2273 return;
2274 }
2275 SSLException exception =
2276 new SslHandshakeTimeoutException("handshake timed out after " + handshakeTimeoutMillis + "ms");
2277 try {
2278 if (localHandshakePromise.tryFailure(exception)) {
2279 SslUtils.handleHandshakeFailure(ctx, exception, true);
2280 }
2281 } finally {
2282 releaseAndFailAll(ctx, exception);
2283 }
2284 }
2285 }, handshakeTimeoutMillis, TimeUnit.MILLISECONDS);
2286
2287
2288 localHandshakePromise.addListener(new FutureListener<Channel>() {
2289 @Override
2290 public void operationComplete(Future<Channel> f) throws Exception {
2291 timeoutFuture.cancel(false);
2292 }
2293 });
2294 }
2295
2296 private void forceFlush(ChannelHandlerContext ctx) {
2297 clearState(STATE_NEEDS_FLUSH);
2298 ctx.flush();
2299 }
2300
2301 private void setOpensslEngineSocketFd(Channel c) {
2302 if (c instanceof UnixChannel && engine instanceof ReferenceCountedOpenSslEngine) {
2303 ((ReferenceCountedOpenSslEngine) engine).bioSetFd(((UnixChannel) c).fd().intValue());
2304 }
2305 }
2306
2307
2308
2309
2310 @Override
2311 public void channelActive(final ChannelHandlerContext ctx) throws Exception {
2312 setOpensslEngineSocketFd(ctx.channel());
2313 if (!startTls) {
2314 startHandshakeProcessing(true);
2315 }
2316 ctx.fireChannelActive();
2317 }
2318
2319 private void safeClose(
2320 final ChannelHandlerContext ctx, final ChannelFuture flushFuture,
2321 final ChannelPromise promise) {
2322 if (!ctx.channel().isActive()) {
2323 ctx.close(promise);
2324 return;
2325 }
2326
2327 final Future<?> timeoutFuture;
2328 if (!flushFuture.isDone()) {
2329 long closeNotifyTimeout = closeNotifyFlushTimeoutMillis;
2330 if (closeNotifyTimeout > 0) {
2331
2332 timeoutFuture = ctx.executor().schedule(new Runnable() {
2333 @Override
2334 public void run() {
2335
2336 if (!flushFuture.isDone()) {
2337 logger.warn("{} Last write attempt timed out; force-closing the connection.",
2338 ctx.channel());
2339 addCloseListener(ctx.close(ctx.newPromise()), promise);
2340 }
2341 }
2342 }, closeNotifyTimeout, TimeUnit.MILLISECONDS);
2343 } else {
2344 timeoutFuture = null;
2345 }
2346 } else {
2347 timeoutFuture = null;
2348 }
2349
2350
2351 flushFuture.addListener(new ChannelFutureListener() {
2352 @Override
2353 public void operationComplete(ChannelFuture f) {
2354 if (timeoutFuture != null) {
2355 timeoutFuture.cancel(false);
2356 }
2357 final long closeNotifyReadTimeout = closeNotifyReadTimeoutMillis;
2358 if (closeNotifyReadTimeout <= 0) {
2359
2360
2361 addCloseListener(ctx.close(ctx.newPromise()), promise);
2362 } else {
2363 final Future<?> closeNotifyReadTimeoutFuture;
2364
2365 if (!sslClosePromise.isDone()) {
2366 closeNotifyReadTimeoutFuture = ctx.executor().schedule(new Runnable() {
2367 @Override
2368 public void run() {
2369 if (!sslClosePromise.isDone()) {
2370 logger.debug(
2371 "{} did not receive close_notify in {}ms; force-closing the connection.",
2372 ctx.channel(), closeNotifyReadTimeout);
2373
2374
2375 addCloseListener(ctx.close(ctx.newPromise()), promise);
2376 }
2377 }
2378 }, closeNotifyReadTimeout, TimeUnit.MILLISECONDS);
2379 } else {
2380 closeNotifyReadTimeoutFuture = null;
2381 }
2382
2383
2384 sslClosePromise.addListener(new FutureListener<Channel>() {
2385 @Override
2386 public void operationComplete(Future<Channel> future) throws Exception {
2387 if (closeNotifyReadTimeoutFuture != null) {
2388 closeNotifyReadTimeoutFuture.cancel(false);
2389 }
2390 addCloseListener(ctx.close(ctx.newPromise()), promise);
2391 }
2392 });
2393 }
2394 }
2395 });
2396 }
2397
2398 private static void addCloseListener(ChannelFuture future, ChannelPromise promise) {
2399
2400
2401
2402
2403
2404
2405 PromiseNotifier.cascade(false, future, promise);
2406 }
2407
2408
2409
2410
2411
2412 private ByteBuf allocate(ChannelHandlerContext ctx, int capacity) {
2413 ByteBufAllocator alloc = ctx.alloc();
2414 if (engineType.wantsDirectBuffer) {
2415 return alloc.directBuffer(capacity);
2416 } else {
2417 return alloc.buffer(capacity);
2418 }
2419 }
2420
2421
2422
2423
2424
2425 private ByteBuf allocateOutNetBuf(ChannelHandlerContext ctx, int pendingBytes, int numComponents) {
2426 return engineType.allocateWrapBuffer(this, ctx.alloc(), pendingBytes, numComponents);
2427 }
2428
2429 private boolean isStateSet(int bit) {
2430 return (state & bit) == bit;
2431 }
2432
2433 private void setState(int bit) {
2434 state |= bit;
2435 }
2436
2437 private void clearState(int bit) {
2438 state &= ~bit;
2439 }
2440
2441 private final class LazyChannelPromise extends DefaultPromise<Channel> {
2442
2443 @Override
2444 protected EventExecutor executor() {
2445 if (ctx == null) {
2446 throw new IllegalStateException();
2447 }
2448 return ctx.executor();
2449 }
2450
2451 @Override
2452 protected void checkDeadLock() {
2453 if (ctx == null) {
2454
2455
2456
2457
2458
2459
2460 return;
2461 }
2462 super.checkDeadLock();
2463 }
2464 }
2465 }