1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.ssl;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.buffer.ByteBufUtil;
21 import io.netty.buffer.CompositeByteBuf;
22 import io.netty.buffer.Unpooled;
23 import io.netty.channel.Channel;
24 import io.netty.channel.ChannelConfig;
25 import io.netty.channel.ChannelException;
26 import io.netty.channel.ChannelFuture;
27 import io.netty.channel.ChannelFutureListener;
28 import io.netty.channel.ChannelHandlerContext;
29 import io.netty.channel.ChannelInboundHandler;
30 import io.netty.channel.ChannelOption;
31 import io.netty.channel.ChannelOutboundBuffer;
32 import io.netty.channel.ChannelOutboundHandler;
33 import io.netty.channel.ChannelPipeline;
34 import io.netty.channel.ChannelPromise;
35 import io.netty.channel.unix.UnixChannel;
36 import io.netty.handler.codec.ByteToMessageDecoder;
37 import io.netty.handler.codec.DecoderException;
38 import io.netty.handler.codec.UnsupportedMessageTypeException;
39 import io.netty.util.ReferenceCountUtil;
40 import io.netty.util.concurrent.DefaultPromise;
41 import io.netty.util.concurrent.EventExecutor;
42 import io.netty.util.concurrent.Future;
43 import io.netty.util.concurrent.FutureListener;
44 import io.netty.util.concurrent.ImmediateExecutor;
45 import io.netty.util.concurrent.Promise;
46 import io.netty.util.concurrent.PromiseNotifier;
47 import io.netty.util.internal.ObjectUtil;
48 import io.netty.util.internal.PlatformDependent;
49 import io.netty.util.internal.ThrowableUtil;
50 import io.netty.util.internal.UnstableApi;
51 import io.netty.util.internal.logging.InternalLogger;
52 import io.netty.util.internal.logging.InternalLoggerFactory;
53
54 import java.io.IOException;
55 import java.net.SocketAddress;
56 import java.nio.ByteBuffer;
57 import java.nio.channels.ClosedChannelException;
58 import java.nio.channels.DatagramChannel;
59 import java.nio.channels.SocketChannel;
60 import java.security.cert.CertificateException;
61 import java.util.List;
62 import java.util.concurrent.Executor;
63 import java.util.concurrent.RejectedExecutionException;
64 import java.util.concurrent.TimeUnit;
65 import java.util.regex.Pattern;
66
67 import javax.net.ssl.SSLEngine;
68 import javax.net.ssl.SSLEngineResult;
69 import javax.net.ssl.SSLEngineResult.HandshakeStatus;
70 import javax.net.ssl.SSLEngineResult.Status;
71 import javax.net.ssl.SSLException;
72 import javax.net.ssl.SSLHandshakeException;
73 import javax.net.ssl.SSLSession;
74
75 import static io.netty.handler.ssl.SslUtils.NOT_ENOUGH_DATA;
76 import static io.netty.handler.ssl.SslUtils.getEncryptedPacketLength;
77 import static io.netty.util.internal.ObjectUtil.checkNotNull;
78 import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170 public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundHandler {
171 private static final InternalLogger logger =
172 InternalLoggerFactory.getInstance(SslHandler.class);
173 private static final Pattern IGNORABLE_ERROR_MESSAGE = Pattern.compile(
174 "^.*(?:connection.*(?:reset|closed|abort|broken)|broken.*pipe).*$", Pattern.CASE_INSENSITIVE);
175 private static final int STATE_SENT_FIRST_MESSAGE = 1;
176 private static final int STATE_FLUSHED_BEFORE_HANDSHAKE = 1 << 1;
177 private static final int STATE_READ_DURING_HANDSHAKE = 1 << 2;
178 private static final int STATE_HANDSHAKE_STARTED = 1 << 3;
179
180
181
182
183 private static final int STATE_NEEDS_FLUSH = 1 << 4;
184 private static final int STATE_OUTBOUND_CLOSED = 1 << 5;
185 private static final int STATE_CLOSE_NOTIFY = 1 << 6;
186 private static final int STATE_PROCESS_TASK = 1 << 7;
187
188
189
190
191 private static final int STATE_FIRE_CHANNEL_READ = 1 << 8;
192 private static final int STATE_UNWRAP_REENTRY = 1 << 9;
193
194
195
196
197
198 private static final int MAX_PLAINTEXT_LENGTH = 16 * 1024;
199
200 private enum SslEngineType {
201 TCNATIVE(true, COMPOSITE_CUMULATOR) {
202 @Override
203 SSLEngineResult unwrap(SslHandler handler, ByteBuf in, int len, ByteBuf out) throws SSLException {
204 int nioBufferCount = in.nioBufferCount();
205 int writerIndex = out.writerIndex();
206 final SSLEngineResult result;
207 if (nioBufferCount > 1) {
208
209
210
211
212
213 ReferenceCountedOpenSslEngine opensslEngine = (ReferenceCountedOpenSslEngine) handler.engine;
214 try {
215 handler.singleBuffer[0] = toByteBuffer(out, writerIndex, out.writableBytes());
216 result = opensslEngine.unwrap(in.nioBuffers(in.readerIndex(), len), handler.singleBuffer);
217 } finally {
218 handler.singleBuffer[0] = null;
219 }
220 } else {
221 result = handler.engine.unwrap(toByteBuffer(in, in.readerIndex(), len),
222 toByteBuffer(out, writerIndex, out.writableBytes()));
223 }
224 out.writerIndex(writerIndex + result.bytesProduced());
225 return result;
226 }
227
228 @Override
229 ByteBuf allocateWrapBuffer(SslHandler handler, ByteBufAllocator allocator,
230 int pendingBytes, int numComponents) {
231 return allocator.directBuffer(((ReferenceCountedOpenSslEngine) handler.engine)
232 .calculateOutNetBufSize(pendingBytes, numComponents));
233 }
234
235 @Override
236 int calculateRequiredOutBufSpace(SslHandler handler, int pendingBytes, int numComponents) {
237 return ((ReferenceCountedOpenSslEngine) handler.engine)
238 .calculateMaxLengthForWrap(pendingBytes, numComponents);
239 }
240
241 @Override
242 int calculatePendingData(SslHandler handler, int guess) {
243 int sslPending = ((ReferenceCountedOpenSslEngine) handler.engine).sslPending();
244 return sslPending > 0 ? sslPending : guess;
245 }
246
247 @Override
248 boolean jdkCompatibilityMode(SSLEngine engine) {
249 return ((ReferenceCountedOpenSslEngine) engine).jdkCompatibilityMode;
250 }
251 },
252 CONSCRYPT(true, COMPOSITE_CUMULATOR) {
253 @Override
254 SSLEngineResult unwrap(SslHandler handler, ByteBuf in, int len, ByteBuf out) throws SSLException {
255 int nioBufferCount = in.nioBufferCount();
256 int writerIndex = out.writerIndex();
257 final SSLEngineResult result;
258 if (nioBufferCount > 1) {
259
260
261
262 try {
263 handler.singleBuffer[0] = toByteBuffer(out, writerIndex, out.writableBytes());
264 result = ((ConscryptAlpnSslEngine) handler.engine).unwrap(
265 in.nioBuffers(in.readerIndex(), len),
266 handler.singleBuffer);
267 } finally {
268 handler.singleBuffer[0] = null;
269 }
270 } else {
271 result = handler.engine.unwrap(toByteBuffer(in, in.readerIndex(), len),
272 toByteBuffer(out, writerIndex, out.writableBytes()));
273 }
274 out.writerIndex(writerIndex + result.bytesProduced());
275 return result;
276 }
277
278 @Override
279 ByteBuf allocateWrapBuffer(SslHandler handler, ByteBufAllocator allocator,
280 int pendingBytes, int numComponents) {
281 return allocator.directBuffer(
282 ((ConscryptAlpnSslEngine) handler.engine).calculateOutNetBufSize(pendingBytes, numComponents));
283 }
284
285 @Override
286 int calculateRequiredOutBufSpace(SslHandler handler, int pendingBytes, int numComponents) {
287 return ((ConscryptAlpnSslEngine) handler.engine)
288 .calculateRequiredOutBufSpace(pendingBytes, numComponents);
289 }
290
291 @Override
292 int calculatePendingData(SslHandler handler, int guess) {
293 return guess;
294 }
295
296 @Override
297 boolean jdkCompatibilityMode(SSLEngine engine) {
298 return true;
299 }
300 },
301 JDK(false, MERGE_CUMULATOR) {
302 @Override
303 SSLEngineResult unwrap(SslHandler handler, ByteBuf in, int len, ByteBuf out) throws SSLException {
304 int writerIndex = out.writerIndex();
305 ByteBuffer inNioBuffer = getUnwrapInputBuffer(handler, toByteBuffer(in, in.readerIndex(), len));
306 int position = inNioBuffer.position();
307 final SSLEngineResult result = handler.engine.unwrap(inNioBuffer,
308 toByteBuffer(out, writerIndex, out.writableBytes()));
309 out.writerIndex(writerIndex + result.bytesProduced());
310
311
312
313
314
315
316
317 if (result.bytesConsumed() == 0) {
318 int consumed = inNioBuffer.position() - position;
319 if (consumed != result.bytesConsumed()) {
320
321 return new SSLEngineResult(
322 result.getStatus(), result.getHandshakeStatus(), consumed, result.bytesProduced());
323 }
324 }
325 return result;
326 }
327
328 private ByteBuffer getUnwrapInputBuffer(SslHandler handler, ByteBuffer inNioBuffer) {
329 int javaVersion = PlatformDependent.javaVersion();
330 if (javaVersion >= 22 && javaVersion < 25 && inNioBuffer.isDirect()) {
331
332 int remaining = inNioBuffer.remaining();
333 ByteBuffer copy = handler.unwrapInputCopy;
334 if (copy == null || copy.capacity() < remaining) {
335 handler.unwrapInputCopy = copy = ByteBuffer.allocate(remaining);
336 } else {
337 copy.clear();
338 }
339 copy.put(inNioBuffer).flip();
340 return copy;
341 }
342 return inNioBuffer;
343 }
344
345 @Override
346 ByteBuf allocateWrapBuffer(SslHandler handler, ByteBufAllocator allocator,
347 int pendingBytes, int numComponents) {
348
349
350 return allocator.heapBuffer(Math.max(pendingBytes, handler.engine.getSession().getPacketBufferSize()));
351 }
352
353 @Override
354 int calculateRequiredOutBufSpace(SslHandler handler, int pendingBytes, int numComponents) {
355
356
357
358
359
360
361 return handler.engine.getSession().getPacketBufferSize();
362 }
363
364 @Override
365 int calculatePendingData(SslHandler handler, int guess) {
366 return guess;
367 }
368
369 @Override
370 boolean jdkCompatibilityMode(SSLEngine engine) {
371 return true;
372 }
373 };
374
375 static SslEngineType forEngine(SSLEngine engine) {
376 return engine instanceof ReferenceCountedOpenSslEngine ? TCNATIVE :
377 engine instanceof ConscryptAlpnSslEngine ? CONSCRYPT : JDK;
378 }
379
380 SslEngineType(boolean wantsDirectBuffer, Cumulator cumulator) {
381 this.wantsDirectBuffer = wantsDirectBuffer;
382 this.cumulator = cumulator;
383 }
384
385 abstract SSLEngineResult unwrap(SslHandler handler, ByteBuf in, int len, ByteBuf out) throws SSLException;
386
387 abstract int calculatePendingData(SslHandler handler, int guess);
388
389 abstract boolean jdkCompatibilityMode(SSLEngine engine);
390
391 abstract ByteBuf allocateWrapBuffer(SslHandler handler, ByteBufAllocator allocator,
392 int pendingBytes, int numComponents);
393
394 abstract int calculateRequiredOutBufSpace(SslHandler handler, int pendingBytes, int numComponents);
395
396
397
398
399
400
401
402 final boolean wantsDirectBuffer;
403
404
405
406
407
408
409
410
411
412
413
414 final Cumulator cumulator;
415 }
416
417 private volatile ChannelHandlerContext ctx;
418 private final SSLEngine engine;
419 private final SslEngineType engineType;
420 private final Executor delegatedTaskExecutor;
421 private final boolean jdkCompatibilityMode;
422
423
424
425
426
427
428 private final ByteBuffer[] singleBuffer = new ByteBuffer[1];
429 private ByteBuffer unwrapInputCopy;
430
431 private final boolean startTls;
432 private final ResumptionController resumptionController;
433
434 private final SslTasksRunner sslTaskRunnerForUnwrap = new SslTasksRunner(true);
435 private final SslTasksRunner sslTaskRunner = new SslTasksRunner(false);
436
437 private SslHandlerCoalescingBufferQueue pendingUnencryptedWrites;
438 private Promise<Channel> handshakePromise = new LazyChannelPromise();
439 private final LazyChannelPromise sslClosePromise = new LazyChannelPromise();
440
441 private int packetLength;
442 private short state;
443
444 private volatile long handshakeTimeoutMillis = 10000;
445 private volatile long closeNotifyFlushTimeoutMillis = 3000;
446 private volatile long closeNotifyReadTimeoutMillis;
447 volatile int wrapDataSize = MAX_PLAINTEXT_LENGTH;
448
449
450
451
452
453
454 public SslHandler(SSLEngine engine) {
455 this(engine, false);
456 }
457
458
459
460
461
462
463
464
465 public SslHandler(SSLEngine engine, boolean startTls) {
466 this(engine, startTls, ImmediateExecutor.INSTANCE);
467 }
468
469
470
471
472
473
474
475
476 public SslHandler(SSLEngine engine, Executor delegatedTaskExecutor) {
477 this(engine, false, delegatedTaskExecutor);
478 }
479
480
481
482
483
484
485
486
487
488
489 public SslHandler(SSLEngine engine, boolean startTls, Executor delegatedTaskExecutor) {
490 this(engine, startTls, delegatedTaskExecutor, null);
491 }
492
493 SslHandler(SSLEngine engine, boolean startTls, Executor delegatedTaskExecutor,
494 ResumptionController resumptionController) {
495 this.engine = ObjectUtil.checkNotNull(engine, "engine");
496 this.delegatedTaskExecutor = ObjectUtil.checkNotNull(delegatedTaskExecutor, "delegatedTaskExecutor");
497 engineType = SslEngineType.forEngine(engine);
498 this.startTls = startTls;
499 this.jdkCompatibilityMode = engineType.jdkCompatibilityMode(engine);
500 setCumulator(engineType.cumulator);
501 this.resumptionController = resumptionController;
502 }
503
504 public long getHandshakeTimeoutMillis() {
505 return handshakeTimeoutMillis;
506 }
507
508 public void setHandshakeTimeout(long handshakeTimeout, TimeUnit unit) {
509 checkNotNull(unit, "unit");
510 setHandshakeTimeoutMillis(unit.toMillis(handshakeTimeout));
511 }
512
513 public void setHandshakeTimeoutMillis(long handshakeTimeoutMillis) {
514 this.handshakeTimeoutMillis = checkPositiveOrZero(handshakeTimeoutMillis, "handshakeTimeoutMillis");
515 }
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537 @UnstableApi
538 public final void setWrapDataSize(int wrapDataSize) {
539 this.wrapDataSize = wrapDataSize;
540 }
541
542
543
544
545 @Deprecated
546 public long getCloseNotifyTimeoutMillis() {
547 return getCloseNotifyFlushTimeoutMillis();
548 }
549
550
551
552
553 @Deprecated
554 public void setCloseNotifyTimeout(long closeNotifyTimeout, TimeUnit unit) {
555 setCloseNotifyFlushTimeout(closeNotifyTimeout, unit);
556 }
557
558
559
560
561 @Deprecated
562 public void setCloseNotifyTimeoutMillis(long closeNotifyFlushTimeoutMillis) {
563 setCloseNotifyFlushTimeoutMillis(closeNotifyFlushTimeoutMillis);
564 }
565
566
567
568
569
570
571 public final long getCloseNotifyFlushTimeoutMillis() {
572 return closeNotifyFlushTimeoutMillis;
573 }
574
575
576
577
578
579
580 public final void setCloseNotifyFlushTimeout(long closeNotifyFlushTimeout, TimeUnit unit) {
581 setCloseNotifyFlushTimeoutMillis(unit.toMillis(closeNotifyFlushTimeout));
582 }
583
584
585
586
587 public final void setCloseNotifyFlushTimeoutMillis(long closeNotifyFlushTimeoutMillis) {
588 this.closeNotifyFlushTimeoutMillis = checkPositiveOrZero(closeNotifyFlushTimeoutMillis,
589 "closeNotifyFlushTimeoutMillis");
590 }
591
592
593
594
595
596
597 public final long getCloseNotifyReadTimeoutMillis() {
598 return closeNotifyReadTimeoutMillis;
599 }
600
601
602
603
604
605
606 public final void setCloseNotifyReadTimeout(long closeNotifyReadTimeout, TimeUnit unit) {
607 setCloseNotifyReadTimeoutMillis(unit.toMillis(closeNotifyReadTimeout));
608 }
609
610
611
612
613 public final void setCloseNotifyReadTimeoutMillis(long closeNotifyReadTimeoutMillis) {
614 this.closeNotifyReadTimeoutMillis = checkPositiveOrZero(closeNotifyReadTimeoutMillis,
615 "closeNotifyReadTimeoutMillis");
616 }
617
618
619
620
621 public SSLEngine engine() {
622 return engine;
623 }
624
625
626
627
628
629
630 public String applicationProtocol() {
631 SSLEngine engine = engine();
632 if (!(engine instanceof ApplicationProtocolAccessor)) {
633 return null;
634 }
635
636 return ((ApplicationProtocolAccessor) engine).getNegotiatedApplicationProtocol();
637 }
638
639
640
641
642
643
644
645 public Future<Channel> handshakeFuture() {
646 return handshakePromise;
647 }
648
649
650
651
652 @Deprecated
653 public ChannelFuture close() {
654 return closeOutbound();
655 }
656
657
658
659
660 @Deprecated
661 public ChannelFuture close(ChannelPromise promise) {
662 return closeOutbound(promise);
663 }
664
665
666
667
668
669
670
671 public ChannelFuture closeOutbound() {
672 return closeOutbound(ctx.newPromise());
673 }
674
675
676
677
678
679
680
681 public ChannelFuture closeOutbound(final ChannelPromise promise) {
682 final ChannelHandlerContext ctx = this.ctx;
683 if (ctx.executor().inEventLoop()) {
684 closeOutbound0(promise);
685 } else {
686 ctx.executor().execute(new Runnable() {
687 @Override
688 public void run() {
689 closeOutbound0(promise);
690 }
691 });
692 }
693 return promise;
694 }
695
696 private void closeOutbound0(ChannelPromise promise) {
697 setState(STATE_OUTBOUND_CLOSED);
698 engine.closeOutbound();
699 try {
700 flush(ctx, promise);
701 } catch (Exception e) {
702 if (!promise.tryFailure(e)) {
703 logger.warn("{} flush() raised a masked exception.", ctx.channel(), e);
704 }
705 }
706 }
707
708
709
710
711
712
713
714
715 public Future<Channel> sslCloseFuture() {
716 return sslClosePromise;
717 }
718
719 @Override
720 public void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
721 try {
722 if (pendingUnencryptedWrites != null && !pendingUnencryptedWrites.isEmpty()) {
723
724 pendingUnencryptedWrites.releaseAndFailAll(ctx,
725 new ChannelException("Pending write on removal of SslHandler"));
726 }
727 pendingUnencryptedWrites = null;
728
729 SSLException cause = null;
730
731
732
733
734 if (!handshakePromise.isDone()) {
735 cause = new SSLHandshakeException("SslHandler removed before handshake completed");
736 if (handshakePromise.tryFailure(cause)) {
737 ctx.fireUserEventTriggered(new SslHandshakeCompletionEvent(cause));
738 }
739 }
740 if (!sslClosePromise.isDone()) {
741 if (cause == null) {
742 cause = new SSLException("SslHandler removed before SSLEngine was closed");
743 }
744 notifyClosePromise(cause);
745 }
746 } finally {
747 ReferenceCountUtil.release(engine);
748 }
749 }
750
751 @Override
752 public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
753 ctx.bind(localAddress, promise);
754 }
755
756 @Override
757 public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
758 ChannelPromise promise) throws Exception {
759 ctx.connect(remoteAddress, localAddress, promise);
760 }
761
762 @Override
763 public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
764 ctx.deregister(promise);
765 }
766
767 @Override
768 public void disconnect(final ChannelHandlerContext ctx,
769 final ChannelPromise promise) throws Exception {
770 closeOutboundAndChannel(ctx, promise, true);
771 }
772
773 @Override
774 public void close(final ChannelHandlerContext ctx,
775 final ChannelPromise promise) throws Exception {
776 closeOutboundAndChannel(ctx, promise, false);
777 }
778
779 @Override
780 public void read(ChannelHandlerContext ctx) throws Exception {
781 if (!handshakePromise.isDone()) {
782 setState(STATE_READ_DURING_HANDSHAKE);
783 }
784
785 ctx.read();
786 }
787
788 private static IllegalStateException newPendingWritesNullException() {
789 return new IllegalStateException("pendingUnencryptedWrites is null, handlerRemoved0 called?");
790 }
791
792 @Override
793 public void write(final ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
794 if (!(msg instanceof ByteBuf)) {
795 UnsupportedMessageTypeException exception = new UnsupportedMessageTypeException(msg, ByteBuf.class);
796 ReferenceCountUtil.safeRelease(msg);
797 promise.setFailure(exception);
798 } else if (pendingUnencryptedWrites == null) {
799 ReferenceCountUtil.safeRelease(msg);
800 promise.setFailure(newPendingWritesNullException());
801 } else {
802 pendingUnencryptedWrites.add((ByteBuf) msg, promise);
803 }
804 }
805
806 @Override
807 public void flush(ChannelHandlerContext ctx) throws Exception {
808
809
810 if (startTls && !isStateSet(STATE_SENT_FIRST_MESSAGE)) {
811 setState(STATE_SENT_FIRST_MESSAGE);
812 pendingUnencryptedWrites.writeAndRemoveAll(ctx);
813 forceFlush(ctx);
814
815
816 startHandshakeProcessing(true);
817 return;
818 }
819
820 if (isStateSet(STATE_PROCESS_TASK)) {
821 return;
822 }
823
824 try {
825 wrapAndFlush(ctx);
826 } catch (Throwable cause) {
827 setHandshakeFailure(ctx, cause);
828 PlatformDependent.throwException(cause);
829 }
830 }
831
832 private void wrapAndFlush(ChannelHandlerContext ctx) throws SSLException {
833 if (pendingUnencryptedWrites.isEmpty()) {
834
835
836
837
838 pendingUnencryptedWrites.add(Unpooled.EMPTY_BUFFER, ctx.newPromise());
839 }
840 if (!handshakePromise.isDone()) {
841 setState(STATE_FLUSHED_BEFORE_HANDSHAKE);
842 }
843 try {
844 wrap(ctx, false);
845 } finally {
846
847
848 forceFlush(ctx);
849 }
850 }
851
852
853 private void wrap(ChannelHandlerContext ctx, boolean inUnwrap) throws SSLException {
854 ByteBuf out = null;
855 ByteBufAllocator alloc = ctx.alloc();
856 try {
857 final int wrapDataSize = this.wrapDataSize;
858
859
860 outer: while (!ctx.isRemoved()) {
861 ChannelPromise promise = ctx.newPromise();
862 ByteBuf buf = wrapDataSize > 0 ?
863 pendingUnencryptedWrites.remove(alloc, wrapDataSize, promise) :
864 pendingUnencryptedWrites.removeFirst(promise);
865 if (buf == null) {
866 break;
867 }
868
869 SSLEngineResult result;
870
871 try {
872 if (buf.readableBytes() > MAX_PLAINTEXT_LENGTH) {
873
874
875
876
877
878 int readableBytes = buf.readableBytes();
879 int numPackets = readableBytes / MAX_PLAINTEXT_LENGTH;
880 if (readableBytes % MAX_PLAINTEXT_LENGTH != 0) {
881 numPackets += 1;
882 }
883
884 if (out == null) {
885 out = allocateOutNetBuf(ctx, readableBytes, buf.nioBufferCount() + numPackets);
886 }
887 result = wrapMultiple(alloc, engine, buf, out);
888 } else {
889 if (out == null) {
890 out = allocateOutNetBuf(ctx, buf.readableBytes(), buf.nioBufferCount());
891 }
892 result = wrap(alloc, engine, buf, out);
893 }
894 } catch (SSLException e) {
895
896
897
898
899
900
901 buf.release();
902 promise.setFailure(e);
903 throw e;
904 }
905
906 if (buf.isReadable()) {
907 pendingUnencryptedWrites.addFirst(buf, promise);
908
909
910 promise = null;
911 } else {
912 buf.release();
913 }
914
915
916
917 if (out.isReadable()) {
918 final ByteBuf b = out;
919 out = null;
920 if (promise != null) {
921 ctx.write(b, promise);
922 } else {
923 ctx.write(b);
924 }
925 } else if (promise != null) {
926 ctx.write(Unpooled.EMPTY_BUFFER, promise);
927 }
928
929
930 if (result.getStatus() == Status.CLOSED) {
931
932
933 if (!pendingUnencryptedWrites.isEmpty()) {
934
935
936 Throwable exception = handshakePromise.cause();
937 if (exception == null) {
938 exception = sslClosePromise.cause();
939 if (exception == null) {
940 exception = new SslClosedEngineException("SSLEngine closed already");
941 }
942 }
943 pendingUnencryptedWrites.releaseAndFailAll(ctx, exception);
944 }
945
946 return;
947 } else {
948 switch (result.getHandshakeStatus()) {
949 case NEED_TASK:
950 if (!runDelegatedTasks(inUnwrap)) {
951
952
953 break outer;
954 }
955 break;
956 case FINISHED:
957 case NOT_HANDSHAKING:
958 setHandshakeSuccess();
959 break;
960 case NEED_WRAP:
961
962
963
964
965 if (result.bytesProduced() > 0 && pendingUnencryptedWrites.isEmpty()) {
966 pendingUnencryptedWrites.add(Unpooled.EMPTY_BUFFER);
967 }
968 break;
969 case NEED_UNWRAP:
970
971
972 readIfNeeded(ctx);
973 return;
974 default:
975 throw new IllegalStateException(
976 "Unknown handshake status: " + result.getHandshakeStatus());
977 }
978 }
979 }
980 } finally {
981 if (out != null) {
982 out.release();
983 }
984 if (inUnwrap) {
985 setState(STATE_NEEDS_FLUSH);
986 }
987 }
988 }
989
990
991
992
993
994
995
996 private boolean wrapNonAppData(final ChannelHandlerContext ctx, boolean inUnwrap) throws SSLException {
997 ByteBuf out = null;
998 ByteBufAllocator alloc = ctx.alloc();
999 try {
1000
1001
1002 outer: while (!ctx.isRemoved()) {
1003 if (out == null) {
1004
1005
1006
1007 out = allocateOutNetBuf(ctx, 2048, 1);
1008 }
1009 SSLEngineResult result = wrap(alloc, engine, Unpooled.EMPTY_BUFFER, out);
1010 if (result.bytesProduced() > 0) {
1011 ctx.write(out).addListener(new ChannelFutureListener() {
1012 @Override
1013 public void operationComplete(ChannelFuture future) {
1014 Throwable cause = future.cause();
1015 if (cause != null) {
1016 setHandshakeFailureTransportFailure(ctx, cause);
1017 }
1018 }
1019 });
1020 if (inUnwrap) {
1021 setState(STATE_NEEDS_FLUSH);
1022 }
1023 out = null;
1024 }
1025
1026 HandshakeStatus status = result.getHandshakeStatus();
1027 switch (status) {
1028 case FINISHED:
1029
1030
1031
1032
1033 if (setHandshakeSuccess() && inUnwrap && !pendingUnencryptedWrites.isEmpty()) {
1034 wrap(ctx, true);
1035 }
1036 return false;
1037 case NEED_TASK:
1038 if (!runDelegatedTasks(inUnwrap)) {
1039
1040
1041 break outer;
1042 }
1043 break;
1044 case NEED_UNWRAP:
1045 if (inUnwrap || unwrapNonAppData(ctx) <= 0) {
1046
1047
1048
1049 return false;
1050 }
1051 break;
1052 case NEED_WRAP:
1053 break;
1054 case NOT_HANDSHAKING:
1055 if (setHandshakeSuccess() && inUnwrap && !pendingUnencryptedWrites.isEmpty()) {
1056 wrap(ctx, true);
1057 }
1058
1059
1060 if (!inUnwrap) {
1061 unwrapNonAppData(ctx);
1062 }
1063 return true;
1064 default:
1065 throw new IllegalStateException("Unknown handshake status: " + result.getHandshakeStatus());
1066 }
1067
1068
1069
1070 if (result.bytesProduced() == 0 && status != HandshakeStatus.NEED_TASK) {
1071 break;
1072 }
1073
1074
1075
1076 if (result.bytesConsumed() == 0 && result.getHandshakeStatus() == HandshakeStatus.NOT_HANDSHAKING) {
1077 break;
1078 }
1079 }
1080 } finally {
1081 if (out != null) {
1082 out.release();
1083 }
1084 }
1085 return false;
1086 }
1087
1088 private SSLEngineResult wrapMultiple(ByteBufAllocator alloc, SSLEngine engine, ByteBuf in, ByteBuf out)
1089 throws SSLException {
1090 SSLEngineResult result = null;
1091
1092 do {
1093 int nextSliceSize = Math.min(MAX_PLAINTEXT_LENGTH, in.readableBytes());
1094
1095
1096 int nextOutSize = engineType.calculateRequiredOutBufSpace(this, nextSliceSize, in.nioBufferCount());
1097
1098 if (!out.isWritable(nextOutSize)) {
1099 if (result != null) {
1100
1101
1102 break;
1103 }
1104
1105
1106 out.ensureWritable(nextOutSize);
1107 }
1108
1109 ByteBuf wrapBuf = in.readSlice(nextSliceSize);
1110 result = wrap(alloc, engine, wrapBuf, out);
1111
1112 if (result.getStatus() == Status.CLOSED) {
1113
1114
1115 break;
1116 }
1117
1118 if (wrapBuf.isReadable()) {
1119
1120
1121 in.readerIndex(in.readerIndex() - wrapBuf.readableBytes());
1122 }
1123 } while (in.readableBytes() > 0);
1124
1125 return result;
1126 }
1127
1128 private SSLEngineResult wrap(ByteBufAllocator alloc, SSLEngine engine, ByteBuf in, ByteBuf out)
1129 throws SSLException {
1130 ByteBuf newDirectIn = null;
1131 try {
1132 int readerIndex = in.readerIndex();
1133 int readableBytes = in.readableBytes();
1134
1135
1136
1137 final ByteBuffer[] in0;
1138 if (in.isDirect() || !engineType.wantsDirectBuffer) {
1139
1140
1141
1142
1143 if (!(in instanceof CompositeByteBuf) && in.nioBufferCount() == 1) {
1144 in0 = singleBuffer;
1145
1146
1147 in0[0] = in.internalNioBuffer(readerIndex, readableBytes);
1148 } else {
1149 in0 = in.nioBuffers();
1150 }
1151 } else {
1152
1153
1154
1155 newDirectIn = alloc.directBuffer(readableBytes);
1156 newDirectIn.writeBytes(in, readerIndex, readableBytes);
1157 in0 = singleBuffer;
1158 in0[0] = newDirectIn.internalNioBuffer(newDirectIn.readerIndex(), readableBytes);
1159 }
1160
1161 for (;;) {
1162
1163
1164 ByteBuffer out0 = toByteBuffer(out, out.writerIndex(), out.writableBytes());
1165 SSLEngineResult result = engine.wrap(in0, out0);
1166 in.skipBytes(result.bytesConsumed());
1167 out.writerIndex(out.writerIndex() + result.bytesProduced());
1168
1169 if (result.getStatus() == Status.BUFFER_OVERFLOW) {
1170 out.ensureWritable(engine.getSession().getPacketBufferSize());
1171 } else {
1172 return result;
1173 }
1174 }
1175 } finally {
1176
1177 singleBuffer[0] = null;
1178
1179 if (newDirectIn != null) {
1180 newDirectIn.release();
1181 }
1182 }
1183 }
1184
1185 @Override
1186 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
1187 boolean handshakeFailed = handshakePromise.cause() != null;
1188
1189
1190 ClosedChannelException exception = new ClosedChannelException();
1191
1192
1193 if (isStateSet(STATE_HANDSHAKE_STARTED) && !handshakePromise.isDone()) {
1194 ThrowableUtil.addSuppressed(exception, StacklessSSLHandshakeException.newInstance(
1195 "Connection closed while SSL/TLS handshake was in progress",
1196 SslHandler.class, "channelInactive"));
1197 }
1198
1199
1200
1201 setHandshakeFailure(ctx, exception, !isStateSet(STATE_OUTBOUND_CLOSED), isStateSet(STATE_HANDSHAKE_STARTED),
1202 false);
1203
1204
1205 notifyClosePromise(exception);
1206
1207 try {
1208 super.channelInactive(ctx);
1209 } catch (DecoderException e) {
1210 if (!handshakeFailed || !(e.getCause() instanceof SSLException)) {
1211
1212
1213
1214
1215
1216 throw e;
1217 }
1218 }
1219 }
1220
1221 @Override
1222 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
1223 if (ignoreException(cause)) {
1224
1225
1226 if (logger.isDebugEnabled()) {
1227 logger.debug(
1228 "{} Swallowing a harmless 'connection reset by peer / broken pipe' error that occurred " +
1229 "while writing close_notify in response to the peer's close_notify", ctx.channel(), cause);
1230 }
1231
1232
1233
1234 if (ctx.channel().isActive()) {
1235 ctx.close();
1236 }
1237 } else {
1238 ctx.fireExceptionCaught(cause);
1239 }
1240 }
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251 private boolean ignoreException(Throwable t) {
1252 if (!(t instanceof SSLException) && t instanceof IOException && sslClosePromise.isDone()) {
1253 String message = t.getMessage();
1254
1255
1256
1257 if (message != null && IGNORABLE_ERROR_MESSAGE.matcher(message).matches()) {
1258 return true;
1259 }
1260
1261
1262 StackTraceElement[] elements = t.getStackTrace();
1263 for (StackTraceElement element: elements) {
1264 String classname = element.getClassName();
1265 String methodname = element.getMethodName();
1266
1267
1268 if (classname.startsWith("io.netty.")) {
1269 continue;
1270 }
1271
1272
1273 if (!"read".equals(methodname)) {
1274 continue;
1275 }
1276
1277 if (isIgnorableClassInStack(classname)) {
1278 return true;
1279 }
1280
1281 try {
1282
1283
1284
1285 Class<?> clazz = PlatformDependent.getClassLoader(getClass()).loadClass(classname);
1286
1287 if (SocketChannel.class.isAssignableFrom(clazz)
1288 || DatagramChannel.class.isAssignableFrom(clazz)) {
1289 return true;
1290 }
1291
1292
1293 if ("com.sun.nio.sctp.SctpChannel".equals(clazz.getSuperclass().getName())) {
1294 return true;
1295 }
1296 } catch (Throwable cause) {
1297 if (logger.isDebugEnabled()) {
1298 logger.debug("Unexpected exception while loading class {} classname {}",
1299 getClass(), classname, cause);
1300 }
1301 }
1302 }
1303 }
1304
1305 return false;
1306 }
1307
1308 private static boolean isIgnorableClassInStack(String classname) {
1309 return classname.contains("SocketChannel") ||
1310 classname.contains("DatagramChannel") ||
1311 classname.contains("SctpChannel") ||
1312 classname.contains("UdtChannel");
1313 }
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328 @Deprecated
1329 public static boolean isEncrypted(ByteBuf buffer) {
1330 return isEncrypted(buffer, false);
1331 }
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349 public static boolean isEncrypted(ByteBuf buffer, boolean probeSSLv2) {
1350 if (buffer.readableBytes() < SslUtils.SSL_RECORD_HEADER_LENGTH) {
1351 throw new IllegalArgumentException(
1352 "buffer must have at least " + SslUtils.SSL_RECORD_HEADER_LENGTH + " readable bytes");
1353 }
1354 return getEncryptedPacketLength(buffer, buffer.readerIndex(), probeSSLv2) != SslUtils.NOT_ENCRYPTED;
1355 }
1356
1357 private void decodeJdkCompatible(ChannelHandlerContext ctx, ByteBuf in) throws NotSslRecordException {
1358 int packetLength = this.packetLength;
1359
1360 if (packetLength > 0) {
1361 if (in.readableBytes() < packetLength) {
1362 return;
1363 }
1364 } else {
1365
1366 final int readableBytes = in.readableBytes();
1367 if (readableBytes < SslUtils.SSL_RECORD_HEADER_LENGTH) {
1368 return;
1369 }
1370 packetLength = getEncryptedPacketLength(in, in.readerIndex(), true);
1371 if (packetLength == SslUtils.NOT_ENCRYPTED) {
1372
1373 NotSslRecordException e = new NotSslRecordException(
1374 "not an SSL/TLS record: " + ByteBufUtil.hexDump(in));
1375 in.skipBytes(in.readableBytes());
1376
1377
1378
1379 setHandshakeFailure(ctx, e);
1380
1381 throw e;
1382 }
1383 if (packetLength == NOT_ENOUGH_DATA) {
1384 return;
1385 }
1386 assert packetLength > 0;
1387 if (packetLength > readableBytes) {
1388
1389 this.packetLength = packetLength;
1390 return;
1391 }
1392 }
1393
1394
1395
1396 this.packetLength = 0;
1397 try {
1398 final int bytesConsumed = unwrap(ctx, in, packetLength);
1399 if (bytesConsumed != packetLength && !engine.isInboundDone()) {
1400
1401
1402 throw new NotSslRecordException();
1403 }
1404 } catch (Throwable cause) {
1405 handleUnwrapThrowable(ctx, cause);
1406 }
1407 }
1408
1409 private void decodeNonJdkCompatible(ChannelHandlerContext ctx, ByteBuf in) {
1410 try {
1411 unwrap(ctx, in, in.readableBytes());
1412 } catch (Throwable cause) {
1413 handleUnwrapThrowable(ctx, cause);
1414 }
1415 }
1416
1417 private void handleUnwrapThrowable(ChannelHandlerContext ctx, Throwable cause) {
1418 try {
1419
1420
1421
1422
1423 if (handshakePromise.tryFailure(cause)) {
1424 ctx.fireUserEventTriggered(new SslHandshakeCompletionEvent(cause));
1425 }
1426
1427
1428 if (pendingUnencryptedWrites != null) {
1429
1430
1431 wrapAndFlush(ctx);
1432 }
1433 } catch (SSLException ex) {
1434 logger.debug("SSLException during trying to call SSLEngine.wrap(...)" +
1435 " because of an previous SSLException, ignoring...", ex);
1436 } finally {
1437
1438 setHandshakeFailure(ctx, cause, true, false, true);
1439 }
1440 PlatformDependent.throwException(cause);
1441 }
1442
1443 @Override
1444 protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws SSLException {
1445 if (isStateSet(STATE_PROCESS_TASK)) {
1446 return;
1447 }
1448 if (jdkCompatibilityMode) {
1449 decodeJdkCompatible(ctx, in);
1450 } else {
1451 decodeNonJdkCompatible(ctx, in);
1452 }
1453 }
1454
1455 @Override
1456 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
1457 channelReadComplete0(ctx);
1458 }
1459
1460 private void channelReadComplete0(ChannelHandlerContext ctx) {
1461
1462 discardSomeReadBytes();
1463
1464 flushIfNeeded(ctx);
1465 readIfNeeded(ctx);
1466
1467 clearState(STATE_FIRE_CHANNEL_READ);
1468 ctx.fireChannelReadComplete();
1469 }
1470
1471 private void readIfNeeded(ChannelHandlerContext ctx) {
1472
1473 if (!ctx.channel().config().isAutoRead() &&
1474 (!isStateSet(STATE_FIRE_CHANNEL_READ) || !handshakePromise.isDone())) {
1475
1476
1477 ctx.read();
1478 }
1479 }
1480
1481 private void flushIfNeeded(ChannelHandlerContext ctx) {
1482 if (isStateSet(STATE_NEEDS_FLUSH)) {
1483 forceFlush(ctx);
1484 }
1485 }
1486
1487
1488
1489
1490 private int unwrapNonAppData(ChannelHandlerContext ctx) throws SSLException {
1491 return unwrap(ctx, Unpooled.EMPTY_BUFFER, 0);
1492 }
1493
1494
1495
1496
1497 private int unwrap(ChannelHandlerContext ctx, ByteBuf packet, int length) throws SSLException {
1498 final int originalLength = length;
1499 boolean wrapLater = false;
1500 boolean notifyClosure = false;
1501 boolean executedRead = false;
1502 ByteBuf decodeOut = allocate(ctx, length);
1503 try {
1504
1505
1506 do {
1507 final SSLEngineResult result = engineType.unwrap(this, packet, length, decodeOut);
1508 final Status status = result.getStatus();
1509 final HandshakeStatus handshakeStatus = result.getHandshakeStatus();
1510 final int produced = result.bytesProduced();
1511 final int consumed = result.bytesConsumed();
1512
1513
1514
1515
1516 packet.skipBytes(consumed);
1517 length -= consumed;
1518
1519
1520
1521
1522 if (handshakeStatus == HandshakeStatus.FINISHED || handshakeStatus == HandshakeStatus.NOT_HANDSHAKING) {
1523 wrapLater |= (decodeOut.isReadable() ?
1524 setHandshakeSuccessUnwrapMarkReentry() : setHandshakeSuccess()) ||
1525 handshakeStatus == HandshakeStatus.FINISHED ||
1526
1527
1528 (pendingUnencryptedWrites != null && !pendingUnencryptedWrites.isEmpty());
1529 }
1530
1531
1532
1533
1534 if (decodeOut.isReadable()) {
1535 setState(STATE_FIRE_CHANNEL_READ);
1536 if (isStateSet(STATE_UNWRAP_REENTRY)) {
1537 executedRead = true;
1538 executeChannelRead(ctx, decodeOut);
1539 } else {
1540 ctx.fireChannelRead(decodeOut);
1541 }
1542 decodeOut = null;
1543 }
1544
1545 if (status == Status.CLOSED) {
1546 notifyClosure = true;
1547 } else if (status == Status.BUFFER_OVERFLOW) {
1548 if (decodeOut != null) {
1549 decodeOut.release();
1550 }
1551 final int applicationBufferSize = engine.getSession().getApplicationBufferSize();
1552
1553
1554
1555
1556 decodeOut = allocate(ctx, engineType.calculatePendingData(this, applicationBufferSize < produced ?
1557 applicationBufferSize : applicationBufferSize - produced));
1558 continue;
1559 }
1560
1561 if (handshakeStatus == HandshakeStatus.NEED_TASK) {
1562 boolean pending = runDelegatedTasks(true);
1563 if (!pending) {
1564
1565
1566
1567
1568
1569 wrapLater = false;
1570 break;
1571 }
1572 } else if (handshakeStatus == HandshakeStatus.NEED_WRAP) {
1573
1574
1575
1576 if (wrapNonAppData(ctx, true) && length == 0) {
1577 break;
1578 }
1579 }
1580
1581 if (status == Status.BUFFER_UNDERFLOW ||
1582
1583 handshakeStatus != HandshakeStatus.NEED_TASK && (consumed == 0 && produced == 0 ||
1584 (length == 0 && handshakeStatus == HandshakeStatus.NOT_HANDSHAKING))) {
1585 if (handshakeStatus == HandshakeStatus.NEED_UNWRAP) {
1586
1587
1588 readIfNeeded(ctx);
1589 }
1590
1591 break;
1592 } else if (decodeOut == null) {
1593 decodeOut = allocate(ctx, length);
1594 }
1595 } while (!ctx.isRemoved());
1596
1597 if (isStateSet(STATE_FLUSHED_BEFORE_HANDSHAKE) && handshakePromise.isDone()) {
1598
1599
1600
1601
1602 clearState(STATE_FLUSHED_BEFORE_HANDSHAKE);
1603 wrapLater = true;
1604 }
1605
1606 if (wrapLater) {
1607 wrap(ctx, true);
1608 }
1609 } finally {
1610 if (decodeOut != null) {
1611 decodeOut.release();
1612 }
1613
1614 if (notifyClosure) {
1615 if (executedRead) {
1616 executeNotifyClosePromise(ctx);
1617 } else {
1618 notifyClosePromise(null);
1619 }
1620 }
1621 }
1622 return originalLength - length;
1623 }
1624
1625 private boolean setHandshakeSuccessUnwrapMarkReentry() throws SSLException {
1626
1627
1628 final boolean setReentryState = !isStateSet(STATE_UNWRAP_REENTRY);
1629 if (setReentryState) {
1630 setState(STATE_UNWRAP_REENTRY);
1631 }
1632 try {
1633 return setHandshakeSuccess();
1634 } finally {
1635
1636
1637 if (setReentryState) {
1638 clearState(STATE_UNWRAP_REENTRY);
1639 }
1640 }
1641 }
1642
1643 private void executeNotifyClosePromise(final ChannelHandlerContext ctx) {
1644 try {
1645 ctx.executor().execute(new Runnable() {
1646 @Override
1647 public void run() {
1648 notifyClosePromise(null);
1649 }
1650 });
1651 } catch (RejectedExecutionException e) {
1652 notifyClosePromise(e);
1653 }
1654 }
1655
1656 private void executeChannelRead(final ChannelHandlerContext ctx, final ByteBuf decodedOut) {
1657 try {
1658 ctx.executor().execute(new Runnable() {
1659 @Override
1660 public void run() {
1661 ctx.fireChannelRead(decodedOut);
1662 }
1663 });
1664 } catch (RejectedExecutionException e) {
1665 decodedOut.release();
1666 throw e;
1667 }
1668 }
1669
1670 private static ByteBuffer toByteBuffer(ByteBuf out, int index, int len) {
1671 return out.nioBufferCount() == 1 ? out.internalNioBuffer(index, len) :
1672 out.nioBuffer(index, len);
1673 }
1674
1675 private static boolean inEventLoop(Executor executor) {
1676 return executor instanceof EventExecutor && ((EventExecutor) executor).inEventLoop();
1677 }
1678
1679
1680
1681
1682
1683
1684
1685
1686 private boolean runDelegatedTasks(boolean inUnwrap) {
1687 if (delegatedTaskExecutor == ImmediateExecutor.INSTANCE || inEventLoop(delegatedTaskExecutor)) {
1688
1689
1690 for (;;) {
1691 Runnable task = engine.getDelegatedTask();
1692 if (task == null) {
1693 return true;
1694 }
1695 setState(STATE_PROCESS_TASK);
1696 if (task instanceof AsyncRunnable) {
1697
1698 boolean pending = false;
1699 try {
1700 AsyncRunnable asyncTask = (AsyncRunnable) task;
1701 AsyncTaskCompletionHandler completionHandler = new AsyncTaskCompletionHandler(inUnwrap);
1702 asyncTask.run(completionHandler);
1703 pending = completionHandler.resumeLater();
1704 if (pending) {
1705 return false;
1706 }
1707 } finally {
1708 if (!pending) {
1709
1710
1711 clearState(STATE_PROCESS_TASK);
1712 }
1713 }
1714 } else {
1715 try {
1716 task.run();
1717 } finally {
1718 clearState(STATE_PROCESS_TASK);
1719 }
1720 }
1721 }
1722 } else {
1723 executeDelegatedTask(inUnwrap);
1724 return false;
1725 }
1726 }
1727
1728 private SslTasksRunner getTaskRunner(boolean inUnwrap) {
1729 return inUnwrap ? sslTaskRunnerForUnwrap : sslTaskRunner;
1730 }
1731
1732 private void executeDelegatedTask(boolean inUnwrap) {
1733 executeDelegatedTask(getTaskRunner(inUnwrap));
1734 }
1735
1736 private void executeDelegatedTask(SslTasksRunner task) {
1737 setState(STATE_PROCESS_TASK);
1738 try {
1739 delegatedTaskExecutor.execute(task);
1740 } catch (RejectedExecutionException e) {
1741 clearState(STATE_PROCESS_TASK);
1742 throw e;
1743 }
1744 }
1745
1746 private final class AsyncTaskCompletionHandler implements Runnable {
1747 private final boolean inUnwrap;
1748 boolean didRun;
1749 boolean resumeLater;
1750
1751 AsyncTaskCompletionHandler(boolean inUnwrap) {
1752 this.inUnwrap = inUnwrap;
1753 }
1754
1755 @Override
1756 public void run() {
1757 didRun = true;
1758 if (resumeLater) {
1759 getTaskRunner(inUnwrap).runComplete();
1760 }
1761 }
1762
1763 boolean resumeLater() {
1764 if (!didRun) {
1765 resumeLater = true;
1766 return true;
1767 }
1768 return false;
1769 }
1770 }
1771
1772
1773
1774
1775
1776 private final class SslTasksRunner implements Runnable {
1777 private final boolean inUnwrap;
1778 private final Runnable runCompleteTask = new Runnable() {
1779 @Override
1780 public void run() {
1781 runComplete();
1782 }
1783 };
1784
1785 SslTasksRunner(boolean inUnwrap) {
1786 this.inUnwrap = inUnwrap;
1787 }
1788
1789
1790 private void taskError(Throwable e) {
1791 if (inUnwrap) {
1792
1793
1794
1795
1796 try {
1797 handleUnwrapThrowable(ctx, e);
1798 } catch (Throwable cause) {
1799 safeExceptionCaught(cause);
1800 }
1801 } else {
1802 setHandshakeFailure(ctx, e);
1803 forceFlush(ctx);
1804 }
1805 }
1806
1807
1808 private void safeExceptionCaught(Throwable cause) {
1809 try {
1810 exceptionCaught(ctx, wrapIfNeeded(cause));
1811 } catch (Throwable error) {
1812 ctx.fireExceptionCaught(error);
1813 }
1814 }
1815
1816 private Throwable wrapIfNeeded(Throwable cause) {
1817 if (!inUnwrap) {
1818
1819 return cause;
1820 }
1821
1822
1823 return cause instanceof DecoderException ? cause : new DecoderException(cause);
1824 }
1825
1826 private void tryDecodeAgain() {
1827 try {
1828 channelRead(ctx, Unpooled.EMPTY_BUFFER);
1829 } catch (Throwable cause) {
1830 safeExceptionCaught(cause);
1831 } finally {
1832
1833
1834
1835 channelReadComplete0(ctx);
1836 }
1837 }
1838
1839
1840
1841
1842
1843 private void resumeOnEventExecutor() {
1844 assert ctx.executor().inEventLoop();
1845 clearState(STATE_PROCESS_TASK);
1846 try {
1847 HandshakeStatus status = engine.getHandshakeStatus();
1848 switch (status) {
1849
1850
1851 case NEED_TASK:
1852 executeDelegatedTask(this);
1853
1854 break;
1855
1856
1857 case FINISHED:
1858
1859 case NOT_HANDSHAKING:
1860 setHandshakeSuccess();
1861 try {
1862
1863
1864 wrap(ctx, inUnwrap);
1865 } catch (Throwable e) {
1866 taskError(e);
1867 return;
1868 }
1869 if (inUnwrap) {
1870
1871
1872 unwrapNonAppData(ctx);
1873 }
1874
1875
1876 forceFlush(ctx);
1877
1878 tryDecodeAgain();
1879 break;
1880
1881
1882
1883 case NEED_UNWRAP:
1884 try {
1885 unwrapNonAppData(ctx);
1886 } catch (SSLException e) {
1887 handleUnwrapThrowable(ctx, e);
1888 return;
1889 }
1890 tryDecodeAgain();
1891 break;
1892
1893
1894
1895 case NEED_WRAP:
1896 try {
1897 if (!wrapNonAppData(ctx, false) && inUnwrap) {
1898
1899
1900
1901
1902 unwrapNonAppData(ctx);
1903 }
1904
1905
1906 forceFlush(ctx);
1907 } catch (Throwable e) {
1908 taskError(e);
1909 return;
1910 }
1911
1912
1913 tryDecodeAgain();
1914 break;
1915
1916 default:
1917
1918 throw new AssertionError();
1919 }
1920 } catch (Throwable cause) {
1921 safeExceptionCaught(cause);
1922 }
1923 }
1924
1925 void runComplete() {
1926 EventExecutor executor = ctx.executor();
1927
1928
1929
1930
1931
1932
1933
1934 executor.execute(new Runnable() {
1935 @Override
1936 public void run() {
1937 resumeOnEventExecutor();
1938 }
1939 });
1940 }
1941
1942 @Override
1943 public void run() {
1944 try {
1945 Runnable task = engine.getDelegatedTask();
1946 if (task == null) {
1947
1948 return;
1949 }
1950 if (task instanceof AsyncRunnable) {
1951 AsyncRunnable asyncTask = (AsyncRunnable) task;
1952 asyncTask.run(runCompleteTask);
1953 } else {
1954 task.run();
1955 runComplete();
1956 }
1957 } catch (final Throwable cause) {
1958 handleException(cause);
1959 }
1960 }
1961
1962 private void handleException(final Throwable cause) {
1963 EventExecutor executor = ctx.executor();
1964 if (executor.inEventLoop()) {
1965 clearState(STATE_PROCESS_TASK);
1966 safeExceptionCaught(cause);
1967 } else {
1968 try {
1969 executor.execute(new Runnable() {
1970 @Override
1971 public void run() {
1972 clearState(STATE_PROCESS_TASK);
1973 safeExceptionCaught(cause);
1974 }
1975 });
1976 } catch (RejectedExecutionException ignore) {
1977 clearState(STATE_PROCESS_TASK);
1978
1979
1980 ctx.fireExceptionCaught(cause);
1981 }
1982 }
1983 }
1984 }
1985
1986
1987
1988
1989
1990
1991 private boolean setHandshakeSuccess() throws SSLException {
1992
1993
1994
1995 final SSLSession session = engine.getSession();
1996 if (resumptionController != null && !handshakePromise.isDone()) {
1997 try {
1998 if (resumptionController.validateResumeIfNeeded(engine) && logger.isDebugEnabled()) {
1999 logger.debug("{} Resumed and reauthenticated session", ctx.channel());
2000 }
2001 } catch (CertificateException e) {
2002 SSLHandshakeException exception = new SSLHandshakeException(e.getMessage());
2003 exception.initCause(e);
2004 throw exception;
2005 }
2006 }
2007 final boolean notified = !handshakePromise.isDone() && handshakePromise.trySuccess(ctx.channel());
2008 if (notified) {
2009 if (logger.isDebugEnabled()) {
2010 logger.debug(
2011 "{} HANDSHAKEN: protocol:{} cipher suite:{}",
2012 ctx.channel(),
2013 session.getProtocol(),
2014 session.getCipherSuite());
2015 }
2016 ctx.fireUserEventTriggered(SslHandshakeCompletionEvent.SUCCESS);
2017 }
2018 if (isStateSet(STATE_READ_DURING_HANDSHAKE)) {
2019 clearState(STATE_READ_DURING_HANDSHAKE);
2020 if (!ctx.channel().config().isAutoRead()) {
2021 ctx.read();
2022 }
2023 }
2024 return notified;
2025 }
2026
2027
2028
2029
2030 private void setHandshakeFailure(ChannelHandlerContext ctx, Throwable cause) {
2031 setHandshakeFailure(ctx, cause, true, true, false);
2032 }
2033
2034
2035
2036
2037 private void setHandshakeFailure(ChannelHandlerContext ctx, Throwable cause, boolean closeInbound,
2038 boolean notify, boolean alwaysFlushAndClose) {
2039 try {
2040
2041 setState(STATE_OUTBOUND_CLOSED);
2042 engine.closeOutbound();
2043
2044 if (closeInbound) {
2045 try {
2046 engine.closeInbound();
2047 } catch (SSLException e) {
2048 if (logger.isDebugEnabled()) {
2049
2050
2051
2052
2053 String msg = e.getMessage();
2054 if (msg == null || !(msg.contains("possible truncation attack") ||
2055 msg.contains("closing inbound before receiving peer's close_notify"))) {
2056 logger.debug("{} SSLEngine.closeInbound() raised an exception.", ctx.channel(), e);
2057 }
2058 }
2059 }
2060 }
2061 if (handshakePromise.tryFailure(cause) || alwaysFlushAndClose) {
2062 SslUtils.handleHandshakeFailure(ctx, cause, notify);
2063 }
2064 } finally {
2065
2066 releaseAndFailAll(ctx, cause);
2067 }
2068 }
2069
2070 private void setHandshakeFailureTransportFailure(ChannelHandlerContext ctx, Throwable cause) {
2071
2072
2073
2074 try {
2075 SSLException transportFailure = new SSLException("failure when writing TLS control frames", cause);
2076 releaseAndFailAll(ctx, transportFailure);
2077 if (handshakePromise.tryFailure(transportFailure)) {
2078 ctx.fireUserEventTriggered(new SslHandshakeCompletionEvent(transportFailure));
2079 }
2080 } finally {
2081 ctx.close();
2082 }
2083 }
2084
2085 private void releaseAndFailAll(ChannelHandlerContext ctx, Throwable cause) {
2086 if (resumptionController != null &&
2087 (!engine.getSession().isValid() || cause instanceof SSLHandshakeException)) {
2088 resumptionController.remove(engine());
2089 }
2090 if (pendingUnencryptedWrites != null) {
2091 pendingUnencryptedWrites.releaseAndFailAll(ctx, cause);
2092 }
2093 }
2094
2095 private void notifyClosePromise(Throwable cause) {
2096 if (cause == null) {
2097 if (sslClosePromise.trySuccess(ctx.channel())) {
2098 ctx.fireUserEventTriggered(SslCloseCompletionEvent.SUCCESS);
2099 }
2100 } else {
2101 if (sslClosePromise.tryFailure(cause)) {
2102 ctx.fireUserEventTriggered(new SslCloseCompletionEvent(cause));
2103 }
2104 }
2105 }
2106
2107 private void closeOutboundAndChannel(
2108 final ChannelHandlerContext ctx, final ChannelPromise promise, boolean disconnect) throws Exception {
2109 setState(STATE_OUTBOUND_CLOSED);
2110 engine.closeOutbound();
2111
2112 if (!ctx.channel().isActive()) {
2113 if (disconnect) {
2114 ctx.disconnect(promise);
2115 } else {
2116 ctx.close(promise);
2117 }
2118 return;
2119 }
2120
2121 ChannelPromise closeNotifyPromise = ctx.newPromise();
2122 try {
2123 flush(ctx, closeNotifyPromise);
2124 } finally {
2125 if (!isStateSet(STATE_CLOSE_NOTIFY)) {
2126 setState(STATE_CLOSE_NOTIFY);
2127
2128
2129
2130
2131
2132
2133
2134
2135 safeClose(ctx, closeNotifyPromise, PromiseNotifier.cascade(false, ctx.newPromise(), promise));
2136 } else {
2137
2138 sslClosePromise.addListener(new FutureListener<Channel>() {
2139 @Override
2140 public void operationComplete(Future<Channel> future) {
2141 promise.setSuccess();
2142 }
2143 });
2144 }
2145 }
2146 }
2147
2148 private void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
2149 if (pendingUnencryptedWrites != null) {
2150 pendingUnencryptedWrites.add(Unpooled.EMPTY_BUFFER, promise);
2151 } else {
2152 promise.setFailure(newPendingWritesNullException());
2153 }
2154 flush(ctx);
2155 }
2156
2157 @Override
2158 public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
2159 this.ctx = ctx;
2160 Channel channel = ctx.channel();
2161 pendingUnencryptedWrites = new SslHandlerCoalescingBufferQueue(channel, 16, engineType.wantsDirectBuffer) {
2162 @Override
2163 protected int wrapDataSize() {
2164 return SslHandler.this.wrapDataSize;
2165 }
2166 };
2167
2168 setOpensslEngineSocketFd(channel);
2169 boolean fastOpen = Boolean.TRUE.equals(channel.config().getOption(ChannelOption.TCP_FASTOPEN_CONNECT));
2170 boolean active = channel.isActive();
2171 if (active || fastOpen) {
2172
2173
2174
2175 startHandshakeProcessing(active);
2176
2177
2178 final ChannelOutboundBuffer outboundBuffer;
2179 if (fastOpen && ((outboundBuffer = channel.unsafe().outboundBuffer()) == null ||
2180 outboundBuffer.totalPendingWriteBytes() > 0)) {
2181 setState(STATE_NEEDS_FLUSH);
2182 }
2183 }
2184 }
2185
2186 private void startHandshakeProcessing(boolean flushAtEnd) {
2187 if (!isStateSet(STATE_HANDSHAKE_STARTED)) {
2188 setState(STATE_HANDSHAKE_STARTED);
2189 if (engine.getUseClientMode()) {
2190
2191
2192
2193 handshake(flushAtEnd);
2194 }
2195 applyHandshakeTimeout();
2196 } else if (isStateSet(STATE_NEEDS_FLUSH)) {
2197 forceFlush(ctx);
2198 }
2199 }
2200
2201
2202
2203
2204 public Future<Channel> renegotiate() {
2205 ChannelHandlerContext ctx = this.ctx;
2206 if (ctx == null) {
2207 throw new IllegalStateException();
2208 }
2209
2210 return renegotiate(ctx.executor().<Channel>newPromise());
2211 }
2212
2213
2214
2215
2216 public Future<Channel> renegotiate(final Promise<Channel> promise) {
2217 ObjectUtil.checkNotNull(promise, "promise");
2218
2219 ChannelHandlerContext ctx = this.ctx;
2220 if (ctx == null) {
2221 throw new IllegalStateException();
2222 }
2223
2224 EventExecutor executor = ctx.executor();
2225 if (!executor.inEventLoop()) {
2226 executor.execute(new Runnable() {
2227 @Override
2228 public void run() {
2229 renegotiateOnEventLoop(promise);
2230 }
2231 });
2232 return promise;
2233 }
2234
2235 renegotiateOnEventLoop(promise);
2236 return promise;
2237 }
2238
2239 private void renegotiateOnEventLoop(final Promise<Channel> newHandshakePromise) {
2240 final Promise<Channel> oldHandshakePromise = handshakePromise;
2241 if (!oldHandshakePromise.isDone()) {
2242
2243
2244 PromiseNotifier.cascade(oldHandshakePromise, newHandshakePromise);
2245 } else {
2246 handshakePromise = newHandshakePromise;
2247 handshake(true);
2248 applyHandshakeTimeout();
2249 }
2250 }
2251
2252
2253
2254
2255
2256
2257
2258 private void handshake(boolean flushAtEnd) {
2259 if (engine.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING) {
2260
2261
2262 return;
2263 }
2264 if (handshakePromise.isDone()) {
2265
2266
2267
2268
2269
2270 return;
2271 }
2272
2273
2274 final ChannelHandlerContext ctx = this.ctx;
2275 try {
2276 engine.beginHandshake();
2277 wrapNonAppData(ctx, false);
2278 } catch (Throwable e) {
2279 setHandshakeFailure(ctx, e);
2280 } finally {
2281 if (flushAtEnd) {
2282 forceFlush(ctx);
2283 }
2284 }
2285 }
2286
2287 private void applyHandshakeTimeout() {
2288 final Promise<Channel> localHandshakePromise = this.handshakePromise;
2289
2290
2291 final long handshakeTimeoutMillis = this.handshakeTimeoutMillis;
2292 if (handshakeTimeoutMillis <= 0 || localHandshakePromise.isDone()) {
2293 return;
2294 }
2295
2296 final Future<?> timeoutFuture = ctx.executor().schedule(new Runnable() {
2297 @Override
2298 public void run() {
2299 if (localHandshakePromise.isDone()) {
2300 return;
2301 }
2302 SSLException exception =
2303 new SslHandshakeTimeoutException("handshake timed out after " + handshakeTimeoutMillis + "ms");
2304 try {
2305 if (localHandshakePromise.tryFailure(exception)) {
2306 SslUtils.handleHandshakeFailure(ctx, exception, true);
2307 }
2308 } finally {
2309 releaseAndFailAll(ctx, exception);
2310 }
2311 }
2312 }, handshakeTimeoutMillis, TimeUnit.MILLISECONDS);
2313
2314
2315 localHandshakePromise.addListener(new FutureListener<Channel>() {
2316 @Override
2317 public void operationComplete(Future<Channel> f) throws Exception {
2318 timeoutFuture.cancel(false);
2319 }
2320 });
2321 }
2322
2323 private void forceFlush(ChannelHandlerContext ctx) {
2324 clearState(STATE_NEEDS_FLUSH);
2325 ctx.flush();
2326 }
2327
2328 private void setOpensslEngineSocketFd(Channel c) {
2329 if (c instanceof UnixChannel && engine instanceof ReferenceCountedOpenSslEngine) {
2330 ((ReferenceCountedOpenSslEngine) engine).bioSetFd(((UnixChannel) c).fd().intValue());
2331 }
2332 }
2333
2334
2335
2336
2337 @Override
2338 public void channelActive(final ChannelHandlerContext ctx) throws Exception {
2339 setOpensslEngineSocketFd(ctx.channel());
2340 if (!startTls) {
2341 startHandshakeProcessing(true);
2342 }
2343 ctx.fireChannelActive();
2344 }
2345
2346 private void safeClose(
2347 final ChannelHandlerContext ctx, final ChannelFuture flushFuture,
2348 final ChannelPromise promise) {
2349 if (!ctx.channel().isActive()) {
2350 ctx.close(promise);
2351 return;
2352 }
2353
2354 final Future<?> timeoutFuture;
2355 if (!flushFuture.isDone()) {
2356 long closeNotifyTimeout = closeNotifyFlushTimeoutMillis;
2357 if (closeNotifyTimeout > 0) {
2358
2359 timeoutFuture = ctx.executor().schedule(new Runnable() {
2360 @Override
2361 public void run() {
2362
2363 if (!flushFuture.isDone()) {
2364 logger.warn("{} Last write attempt timed out; force-closing the connection.",
2365 ctx.channel());
2366 addCloseListener(ctx.close(ctx.newPromise()), promise);
2367 }
2368 }
2369 }, closeNotifyTimeout, TimeUnit.MILLISECONDS);
2370 } else {
2371 timeoutFuture = null;
2372 }
2373 } else {
2374 timeoutFuture = null;
2375 }
2376
2377
2378 flushFuture.addListener(new ChannelFutureListener() {
2379 @Override
2380 public void operationComplete(ChannelFuture f) {
2381 if (timeoutFuture != null) {
2382 timeoutFuture.cancel(false);
2383 }
2384 final long closeNotifyReadTimeout = closeNotifyReadTimeoutMillis;
2385 if (closeNotifyReadTimeout <= 0) {
2386
2387
2388 addCloseListener(ctx.close(ctx.newPromise()), promise);
2389 } else {
2390 final Future<?> closeNotifyReadTimeoutFuture;
2391
2392 if (!sslClosePromise.isDone()) {
2393 closeNotifyReadTimeoutFuture = ctx.executor().schedule(new Runnable() {
2394 @Override
2395 public void run() {
2396 if (!sslClosePromise.isDone()) {
2397 logger.debug(
2398 "{} did not receive close_notify in {}ms; force-closing the connection.",
2399 ctx.channel(), closeNotifyReadTimeout);
2400
2401
2402 addCloseListener(ctx.close(ctx.newPromise()), promise);
2403 }
2404 }
2405 }, closeNotifyReadTimeout, TimeUnit.MILLISECONDS);
2406 } else {
2407 closeNotifyReadTimeoutFuture = null;
2408 }
2409
2410
2411 sslClosePromise.addListener(new FutureListener<Channel>() {
2412 @Override
2413 public void operationComplete(Future<Channel> future) throws Exception {
2414 if (closeNotifyReadTimeoutFuture != null) {
2415 closeNotifyReadTimeoutFuture.cancel(false);
2416 }
2417 addCloseListener(ctx.close(ctx.newPromise()), promise);
2418 }
2419 });
2420 }
2421 }
2422 });
2423 }
2424
2425 private static void addCloseListener(ChannelFuture future, ChannelPromise promise) {
2426
2427
2428
2429
2430
2431
2432 PromiseNotifier.cascade(false, future, promise);
2433 }
2434
2435
2436
2437
2438
2439 private ByteBuf allocate(ChannelHandlerContext ctx, int capacity) {
2440 ByteBufAllocator alloc = ctx.alloc();
2441 if (engineType.wantsDirectBuffer) {
2442 return alloc.directBuffer(capacity);
2443 } else {
2444 return alloc.buffer(capacity);
2445 }
2446 }
2447
2448
2449
2450
2451
2452 private ByteBuf allocateOutNetBuf(ChannelHandlerContext ctx, int pendingBytes, int numComponents) {
2453 return engineType.allocateWrapBuffer(this, ctx.alloc(), pendingBytes, numComponents);
2454 }
2455
2456 private boolean isStateSet(int bit) {
2457 return (state & bit) == bit;
2458 }
2459
2460 private void setState(int bit) {
2461 state |= bit;
2462 }
2463
2464 private void clearState(int bit) {
2465 state &= ~bit;
2466 }
2467
2468 private final class LazyChannelPromise extends DefaultPromise<Channel> {
2469
2470 @Override
2471 protected EventExecutor executor() {
2472 if (ctx == null) {
2473 throw new IllegalStateException();
2474 }
2475 return ctx.executor();
2476 }
2477
2478 @Override
2479 protected void checkDeadLock() {
2480 if (ctx == null) {
2481
2482
2483
2484
2485
2486
2487 return;
2488 }
2489 super.checkDeadLock();
2490 }
2491 }
2492 }