1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.handler.ssl;
17
18 import org.jboss.netty.buffer.ChannelBuffer;
19 import org.jboss.netty.buffer.ChannelBufferFactory;
20 import org.jboss.netty.buffer.ChannelBuffers;
21 import org.jboss.netty.channel.Channel;
22 import org.jboss.netty.channel.ChannelDownstreamHandler;
23 import org.jboss.netty.channel.ChannelEvent;
24 import org.jboss.netty.channel.ChannelFuture;
25 import org.jboss.netty.channel.ChannelFutureListener;
26 import org.jboss.netty.channel.ChannelHandlerContext;
27 import org.jboss.netty.channel.ChannelPipeline;
28 import org.jboss.netty.channel.ChannelStateEvent;
29 import org.jboss.netty.channel.Channels;
30 import org.jboss.netty.channel.DefaultChannelFuture;
31 import org.jboss.netty.channel.DownstreamMessageEvent;
32 import org.jboss.netty.channel.ExceptionEvent;
33 import org.jboss.netty.channel.MessageEvent;
34 import org.jboss.netty.handler.codec.frame.FrameDecoder;
35 import org.jboss.netty.logging.InternalLogger;
36 import org.jboss.netty.logging.InternalLoggerFactory;
37 import org.jboss.netty.util.Timeout;
38 import org.jboss.netty.util.Timer;
39 import org.jboss.netty.util.TimerTask;
40 import org.jboss.netty.util.internal.DetectionUtil;
41 import org.jboss.netty.util.internal.NonReentrantLock;
42
43 import javax.net.ssl.SSLEngine;
44 import javax.net.ssl.SSLEngineResult;
45 import javax.net.ssl.SSLEngineResult.HandshakeStatus;
46 import javax.net.ssl.SSLEngineResult.Status;
47 import javax.net.ssl.SSLException;
48 import java.io.IOException;
49 import java.nio.ByteBuffer;
50 import java.nio.channels.ClosedChannelException;
51 import java.nio.channels.DatagramChannel;
52 import java.nio.channels.SocketChannel;
53 import java.util.ArrayList;
54 import java.util.LinkedList;
55 import java.util.List;
56 import java.util.Queue;
57 import java.util.concurrent.ConcurrentLinkedQueue;
58 import java.util.concurrent.CountDownLatch;
59 import java.util.concurrent.Executor;
60 import java.util.concurrent.TimeUnit;
61 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
62 import java.util.regex.Pattern;
63
64 import static org.jboss.netty.channel.Channels.*;
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182 public class SslHandler extends FrameDecoder
183 implements ChannelDownstreamHandler {
184
/** Logger used for the debug/warn diagnostics emitted by this handler. */
private static final InternalLogger logger = InternalLoggerFactory.getInstance(SslHandler.class);

/** Zero-length buffer handed to the engine when only handshake/close (non-application) data is processed. */
private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);

// Matches class names containing "SocketChannel", "DatagramChannel", "SctpChannel" or "UdtChannel";
// consulted by ignoreException() while scanning a stack trace.
private static final Pattern IGNORABLE_CLASS_IN_STACK = Pattern.compile(
        "^.*(?:Socket|Datagram|Sctp|Udt)Channel.*$");
// Matches, case-insensitively, messages such as "connection reset" or "broken pipe";
// consulted by ignoreException().
private static final Pattern IGNORABLE_ERROR_MESSAGE = Pattern.compile(
        "^.*(?:connection.*(?:reset|closed|abort|broken)|broken.*pipe).*$", Pattern.CASE_INSENSITIVE);

/** Shared pool, created lazily by {@link #getDefaultBufferPool()}. */
private static SslBufferPool defaultBufferPool;
195
196
197
198
199
200 public static synchronized SslBufferPool getDefaultBufferPool() {
201 if (defaultBufferPool == null) {
202 defaultBufferPool = new SslBufferPool();
203 }
204 return defaultBufferPool;
205 }
206
// Handler context; setCloseOnSSLException() treats a non-null value as "already attached to a pipeline".
// The assignment itself is outside the visible part of the file.
private volatile ChannelHandlerContext ctx;
// Collaborators supplied through the constructor.
private final SSLEngine engine;
private final SslBufferPool bufferPool;
private final Executor delegatedTaskExecutor;
// When true, the first ChannelBuffer written downstream is passed through unencrypted (see handleDownstream).
private final boolean startTls;

// Whether handshake() may be invoked again after a completed handshake.
private volatile boolean enableRenegotiation = true;

// Guards the handshake state below and the calls into the engine.
final Object handshakeLock = new Object();
private boolean handshaking;
private volatile boolean handshaken;
private volatile ChannelFuture handshakeFuture;

// One-shot flags flipped through the AtomicIntegerFieldUpdaters declared below.
@SuppressWarnings("UnusedDeclaration")
private volatile int sentFirstMessage;
@SuppressWarnings("UnusedDeclaration")
private volatile int sentCloseNotify;
@SuppressWarnings("UnusedDeclaration")
private volatile int closedOutboundAndChannel;

private static final AtomicIntegerFieldUpdater<SslHandler> SENT_FIRST_MESSAGE_UPDATER =
        AtomicIntegerFieldUpdater.newUpdater(SslHandler.class, "sentFirstMessage");
private static final AtomicIntegerFieldUpdater<SslHandler> SENT_CLOSE_NOTIFY_UPDATER =
        AtomicIntegerFieldUpdater.newUpdater(SslHandler.class, "sentCloseNotify");
private static final AtomicIntegerFieldUpdater<SslHandler> CLOSED_OUTBOUND_AND_CHANNEL_UPDATER =
        AtomicIntegerFieldUpdater.newUpdater(SslHandler.class, "closedOutboundAndChannel");

// Number of ClosedChannelExceptions that exceptionCaught() should swallow; incremented by the
// write listener in wrapNonAppData() and guarded by ignoreClosedChannelExceptionLock.
int ignoreClosedChannelException;
final Object ignoreClosedChannelExceptionLock = new Object();
// Plaintext writes waiting to be encrypted by wrap().
private final Queue<PendingWrite> pendingUnencryptedWrites = new LinkedList<PendingWrite>();
private final NonReentrantLock pendingUnencryptedWritesLock = new NonReentrantLock();
// Encrypted writes waiting to be sent downstream by flushPendingEncryptedWrites().
private final Queue<MessageEvent> pendingEncryptedWrites = new ConcurrentLinkedQueue<MessageEvent>();
private final NonReentrantLock pendingEncryptedWritesLock = new NonReentrantLock();

private volatile boolean issueHandshake;
// Set by handleDownstream() when a write arrives before the handshake future is done;
// unwrap() uses it to trigger a wrap() once the engine reports NOT_HANDSHAKING.
private volatile boolean writeBeforeHandshakeDone;
private final SSLEngineInboundCloseFuture sslEngineCloseFuture = new SSLEngineInboundCloseFuture();

// When true, the channel is closed after an SSL failure is reported.
private boolean closeOnSslException;

// Length of a record that decode() found only partially received; 0 when none is pending.
private int packetLength;

// Optional handshake timeout support: timer may be null only when the timeout is not positive.
private final Timer timer;
private final long handshakeTimeoutInMillis;
private Timeout handshakeTimeout;
252
253
254
255
256
257
/**
 * Creates a handler that uses the default buffer pool, has startTLS disabled
 * and has no handshake timeout.
 *
 * @param engine the {@link SSLEngine} this handler will use
 */
public SslHandler(SSLEngine engine) {
    this(engine, getDefaultBufferPool(), false, null, 0);
}
261
262
263
264
265
266
267
268
/**
 * Creates a handler with startTLS disabled and no handshake timeout.
 *
 * @param engine     the {@link SSLEngine} this handler will use
 * @param bufferPool the pool from which temporary NIO buffers are acquired
 */
public SslHandler(SSLEngine engine, SslBufferPool bufferPool) {
    this(engine, bufferPool, false, null, 0);
}
272
273
274
275
276
277
278
279
/**
 * Creates a handler that uses the default buffer pool.
 *
 * @param engine   the {@link SSLEngine} this handler will use
 * @param startTls {@code true} if the first buffer written downstream must be
 *                 passed through without encryption
 */
public SslHandler(SSLEngine engine, boolean startTls) {
    this(engine, getDefaultBufferPool(), startTls);
}
283
284
285
286
287
288
289
290
291
292
/**
 * Creates a handler without a handshake timeout.
 *
 * @param engine     the {@link SSLEngine} this handler will use
 * @param bufferPool the pool from which temporary NIO buffers are acquired
 * @param startTls   {@code true} if the first buffer written downstream must be
 *                   passed through without encryption
 */
public SslHandler(SSLEngine engine, SslBufferPool bufferPool, boolean startTls) {
    this(engine, bufferPool, startTls, null, 0);
}
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
/**
 * Creates a handler that runs the engine's delegated tasks through
 * {@code ImmediateExecutor.INSTANCE}.
 *
 * @param engine                   the {@link SSLEngine} this handler will use
 * @param bufferPool               the pool from which temporary NIO buffers are acquired
 * @param startTls                 {@code true} if the first buffer written downstream must be
 *                                 passed through without encryption
 * @param timer                    the timer used to schedule the handshake timeout; may be
 *                                 {@code null} only if {@code handshakeTimeoutInMillis} is not positive
 * @param handshakeTimeoutInMillis the handshake timeout in milliseconds; a value that is not
 *                                 positive disables the timeout
 */
@SuppressWarnings("deprecation")
public SslHandler(SSLEngine engine, SslBufferPool bufferPool, boolean startTls,
                  Timer timer, long handshakeTimeoutInMillis) {
    this(engine, bufferPool, startTls, ImmediateExecutor.INSTANCE, timer, handshakeTimeoutInMillis);
}
320
321
322
323
/**
 * Creates a handler that uses the default buffer pool and the given executor
 * for the engine's delegated tasks.
 *
 * @param engine                the {@link SSLEngine} this handler will use
 * @param delegatedTaskExecutor the executor that runs the engine's delegated tasks
 * @deprecated overloads without an {@link Executor} parameter are available
 */
@Deprecated
public SslHandler(SSLEngine engine, Executor delegatedTaskExecutor) {
    this(engine, getDefaultBufferPool(), delegatedTaskExecutor);
}
328
329
330
331
/**
 * Creates a handler with startTLS disabled and the given executor for the
 * engine's delegated tasks.
 *
 * @param engine                the {@link SSLEngine} this handler will use
 * @param bufferPool            the pool from which temporary NIO buffers are acquired
 * @param delegatedTaskExecutor the executor that runs the engine's delegated tasks
 * @deprecated overloads without an {@link Executor} parameter are available
 */
@Deprecated
public SslHandler(SSLEngine engine, SslBufferPool bufferPool, Executor delegatedTaskExecutor) {
    this(engine, bufferPool, false, delegatedTaskExecutor);
}
336
337
338
339
/**
 * Creates a handler that uses the default buffer pool and the given executor
 * for the engine's delegated tasks.
 *
 * @param engine                the {@link SSLEngine} this handler will use
 * @param startTls              {@code true} if the first buffer written downstream must be
 *                              passed through without encryption
 * @param delegatedTaskExecutor the executor that runs the engine's delegated tasks
 * @deprecated overloads without an {@link Executor} parameter are available
 */
@Deprecated
public SslHandler(SSLEngine engine, boolean startTls, Executor delegatedTaskExecutor) {
    this(engine, getDefaultBufferPool(), startTls, delegatedTaskExecutor);
}
344
345
346
347
/**
 * Creates a handler without a handshake timeout, using the given executor for
 * the engine's delegated tasks.
 *
 * @param engine                the {@link SSLEngine} this handler will use
 * @param bufferPool            the pool from which temporary NIO buffers are acquired
 * @param startTls              {@code true} if the first buffer written downstream must be
 *                              passed through without encryption
 * @param delegatedTaskExecutor the executor that runs the engine's delegated tasks
 * @deprecated overloads without an {@link Executor} parameter are available
 */
@Deprecated
public SslHandler(SSLEngine engine, SslBufferPool bufferPool, boolean startTls, Executor delegatedTaskExecutor) {
    this(engine, bufferPool, startTls, delegatedTaskExecutor, null, 0);
}
352
353
354
355
356
357 @Deprecated
358 public SslHandler(SSLEngine engine, SslBufferPool bufferPool, boolean startTls, Executor delegatedTaskExecutor,
359 Timer timer, long handshakeTimeoutInMillis) {
360 if (engine == null) {
361 throw new NullPointerException("engine");
362 }
363 if (bufferPool == null) {
364 throw new NullPointerException("bufferPool");
365 }
366 if (delegatedTaskExecutor == null) {
367 throw new NullPointerException("delegatedTaskExecutor");
368 }
369 if (timer == null && handshakeTimeoutInMillis > 0) {
370 throw new IllegalArgumentException("No Timer was given but a handshakeTimeoutInMillis, need both or none");
371 }
372
373 this.engine = engine;
374 this.bufferPool = bufferPool;
375 this.delegatedTaskExecutor = delegatedTaskExecutor;
376 this.startTls = startTls;
377 this.timer = timer;
378 this.handshakeTimeoutInMillis = handshakeTimeoutInMillis;
379 }
380
381
382
383
/**
 * Returns the {@link SSLEngine} this handler was constructed with.
 */
public SSLEngine getEngine() {
    return engine;
}
387
388
389
390
391
392
393
/**
 * Starts an SSL handshake, or a renegotiation if one has already completed.
 * If a handshake is already in progress, its future is returned instead.
 *
 * @return the future that is notified when the handshake succeeds or fails
 * @throws IllegalStateException if a handshake has already completed and
 *                               renegotiation is disabled
 */
public ChannelFuture handshake() {
    synchronized (handshakeLock) {
        if (handshaken && !isEnableRenegotiation()) {
            throw new IllegalStateException("renegotiation disabled");
        }

        final ChannelHandlerContext ctx = this.ctx;
        final Channel channel = ctx.getChannel();
        ChannelFuture handshakeFuture;
        Exception exception = null;

        // A handshake is already running: hand back its future rather than starting another.
        if (handshaking) {
            return this.handshakeFuture;
        }

        handshaking = true;
        try {
            engine.beginHandshake();
            runDelegatedTasks();
            handshakeFuture = this.handshakeFuture = future(channel);
            if (handshakeTimeoutInMillis > 0) {
                // Fail the handshake if its future is still not done when the timeout fires.
                handshakeTimeout = timer.newTimeout(new TimerTask() {
                    public void run(Timeout timeout) throws Exception {
                        ChannelFuture future = SslHandler.this.handshakeFuture;
                        if (future != null && future.isDone()) {
                            return;
                        }

                        setHandshakeFailure(channel, new SSLException("Handshake did not complete within " +
                                handshakeTimeoutInMillis + "ms"));
                    }
                }, handshakeTimeoutInMillis, TimeUnit.MILLISECONDS);
            }
        } catch (Exception e) {
            // The handshake could not even be started; it is reported below.
            handshakeFuture = this.handshakeFuture = failedFuture(channel, e);
            exception = e;
        }

        if (exception == null) {
            // Send the first handshake message(s); a failed write fails the handshake future.
            try {
                final ChannelFuture hsFuture = handshakeFuture;
                wrapNonAppData(ctx, channel).addListener(new ChannelFutureListener() {
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            Throwable cause = future.getCause();
                            hsFuture.setFailure(cause);

                            fireExceptionCaught(ctx, cause);
                            if (closeOnSslException) {
                                Channels.close(ctx, future(channel));
                            }
                        }
                    }
                });
            } catch (SSLException e) {
                handshakeFuture.setFailure(e);

                fireExceptionCaught(ctx, e);
                if (closeOnSslException) {
                    Channels.close(ctx, future(channel));
                }
            }
        } else {
            fireExceptionCaught(ctx, exception);
            if (closeOnSslException) {
                Channels.close(ctx, future(channel));
            }
        }
        return handshakeFuture;
    }
}
465
466
467
468
/**
 * Delegates to {@link #handshake()}; the {@code channel} argument is ignored.
 *
 * @deprecated use {@link #handshake()} instead
 */
@Deprecated
public ChannelFuture handshake(@SuppressWarnings("unused") Channel channel) {
    return handshake();
}
473
474
475
476
477
478 public ChannelFuture close() {
479 ChannelHandlerContext ctx = this.ctx;
480 Channel channel = ctx.getChannel();
481 try {
482 engine.closeOutbound();
483 return wrapNonAppData(ctx, channel);
484 } catch (SSLException e) {
485 fireExceptionCaught(ctx, e);
486 if (closeOnSslException) {
487 Channels.close(ctx, future(channel));
488 }
489 return failedFuture(channel, e);
490 }
491 }
492
493
494
495
/**
 * Delegates to {@link #close()}; the {@code channel} argument is ignored.
 *
 * @deprecated use {@link #close()} instead
 */
@Deprecated
public ChannelFuture close(@SuppressWarnings("unused") Channel channel) {
    return close();
}
500
501
502
503
/**
 * Returns {@code true} if renegotiation is enabled.
 */
public boolean isEnableRenegotiation() {
    return enableRenegotiation;
}
507
508
509
510
/**
 * Enables or disables renegotiation.
 *
 * @param enableRenegotiation {@code true} to allow renegotiation
 */
public void setEnableRenegotiation(boolean enableRenegotiation) {
    this.enableRenegotiation = enableRenegotiation;
}
514
515
516
517
518
519
/**
 * Sets the {@code issueHandshake} flag. The code that acts on this flag is
 * outside the visible part of the file.
 *
 * @param issueHandshake the new flag value
 */
public void setIssueHandshake(boolean issueHandshake) {
    this.issueHandshake = issueHandshake;
}
523
524
525
526
/**
 * Returns the current value of the {@code issueHandshake} flag.
 */
public boolean isIssueHandshake() {
    return issueHandshake;
}
530
531
532
533
534
535
536
537
538
/**
 * Returns the future that {@code unwrap} marks as closed when the engine
 * reports {@code Status.CLOSED}.
 */
public ChannelFuture getSSLEngineInboundCloseFuture() {
    return sslEngineCloseFuture;
}
542
543
544
545
546
/**
 * Returns the handshake timeout in milliseconds, as passed to the constructor.
 */
public long getHandshakeTimeout() {
    return handshakeTimeoutInMillis;
}
550
551
552
553
554
555
556
557
558
559
560 public void setCloseOnSSLException(boolean closeOnSslException) {
561 if (ctx != null) {
562 throw new IllegalStateException("Can only get changed before attached to ChannelPipeline");
563 }
564 this.closeOnSslException = closeOnSslException;
565 }
566
/**
 * Returns {@code true} if the channel is closed after an SSL failure is reported.
 */
public boolean getCloseOnSSLException() {
    return closeOnSslException;
}
570
571 public void handleDownstream(
572 final ChannelHandlerContext context, final ChannelEvent evt) throws Exception {
573 if (evt instanceof ChannelStateEvent) {
574 ChannelStateEvent e = (ChannelStateEvent) evt;
575 switch (e.getState()) {
576 case OPEN:
577 case CONNECTED:
578 case BOUND:
579 if (Boolean.FALSE.equals(e.getValue()) || e.getValue() == null) {
580 closeOutboundAndChannel(context, e);
581 return;
582 }
583 }
584 }
585 if (!(evt instanceof MessageEvent)) {
586 context.sendDownstream(evt);
587 return;
588 }
589
590 MessageEvent e = (MessageEvent) evt;
591 if (!(e.getMessage() instanceof ChannelBuffer)) {
592 context.sendDownstream(evt);
593 return;
594 }
595
596
597
598 if (startTls && SENT_FIRST_MESSAGE_UPDATER.compareAndSet(this, 0, 1)) {
599 context.sendDownstream(evt);
600 return;
601 }
602
603
604 ChannelBuffer msg = (ChannelBuffer) e.getMessage();
605 PendingWrite pendingWrite;
606
607 if (msg.readable()) {
608 pendingWrite = new PendingWrite(evt.getFuture(), msg.toByteBuffer(msg.readerIndex(), msg.readableBytes()));
609 } else {
610 pendingWrite = new PendingWrite(evt.getFuture(), null);
611 }
612
613 pendingUnencryptedWritesLock.lock();
614 try {
615 pendingUnencryptedWrites.add(pendingWrite);
616 } finally {
617 pendingUnencryptedWritesLock.unlock();
618 }
619
620 if (handshakeFuture == null || !handshakeFuture.isDone()) {
621 writeBeforeHandshakeDone = true;
622 }
623 wrap(context, evt.getChannel());
624 }
625
626 private void cancelHandshakeTimeout() {
627 if (handshakeTimeout != null) {
628
629 handshakeTimeout.cancel();
630 }
631 }
632
/**
 * Fails a handshake that is still in progress, lets the superclass handle the
 * disconnect, and finally gives the engine one last non-application unwrap
 * before closing it.
 */
@Override
public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {

    // A handshake that is still running can no longer complete on a disconnected channel.
    synchronized (handshakeLock) {
        if (handshaking) {
            cancelHandshakeTimeout();
            handshakeFuture.setFailure(new ClosedChannelException());
        }
    }

    try {
        super.channelDisconnected(ctx, e);
    } finally {
        // Runs even if the superclass throws, so the engine is always closed.
        unwrapNonAppData(ctx, e.getChannel(), false);
        closeEngine();
    }
}
652
653 private void closeEngine() {
654 engine.closeOutbound();
655 if (sentCloseNotify == 0 && handshaken) {
656 try {
657 engine.closeInbound();
658 } catch (SSLException ex) {
659 if (logger.isDebugEnabled()) {
660 logger.debug("Failed to clean up SSLEngine.", ex);
661 }
662 }
663 }
664 }
665
666 @Override
667 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
668 throws Exception {
669
670 Throwable cause = e.getCause();
671 if (cause instanceof IOException) {
672 if (cause instanceof ClosedChannelException) {
673 synchronized (ignoreClosedChannelExceptionLock) {
674 if (ignoreClosedChannelException > 0) {
675 ignoreClosedChannelException --;
676 if (logger.isDebugEnabled()) {
677 logger.debug(
678 "Swallowing an exception raised while " +
679 "writing non-app data", cause);
680 }
681
682 return;
683 }
684 }
685 } else {
686 if (ignoreException(cause)) {
687 return;
688 }
689 }
690 }
691
692 ctx.sendUpstream(e);
693 }
694
695
696
697
698
699
700
701
702
703
704 private boolean ignoreException(Throwable t) {
705 if (!(t instanceof SSLException) && t instanceof IOException && engine.isOutboundDone()) {
706 String message = String.valueOf(t.getMessage()).toLowerCase();
707
708
709
710 if (IGNORABLE_ERROR_MESSAGE.matcher(message).matches()) {
711 return true;
712 }
713
714
715 StackTraceElement[] elements = t.getStackTrace();
716 for (StackTraceElement element: elements) {
717 String classname = element.getClassName();
718 String methodname = element.getMethodName();
719
720
721 if (classname.startsWith("org.jboss.netty.")) {
722 continue;
723 }
724
725
726 if (!"read".equals(methodname)) {
727 continue;
728 }
729
730
731
732 if (IGNORABLE_CLASS_IN_STACK.matcher(classname).matches()) {
733 return true;
734 }
735
736 try {
737
738
739
740 Class<?> clazz = getClass().getClassLoader().loadClass(classname);
741
742 if (SocketChannel.class.isAssignableFrom(clazz)
743 || DatagramChannel.class.isAssignableFrom(clazz)) {
744 return true;
745 }
746
747
748 if (DetectionUtil.javaVersion() >= 7
749 && "com.sun.nio.sctp.SctpChannel".equals(clazz.getSuperclass().getName())) {
750 return true;
751 }
752 } catch (ClassNotFoundException e) {
753
754 }
755 }
756 }
757
758 return false;
759 }
760
761
762
763
764
765
766
767
768
769
770
771
772
773 public static boolean isEncrypted(ChannelBuffer buffer) {
774 return getEncryptedPacketLength(buffer, buffer.readerIndex()) != -1;
775 }
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790 private static int getEncryptedPacketLength(ChannelBuffer buffer, int offset) {
791 int packetLength = 0;
792
793
794 boolean tls;
795 switch (buffer.getUnsignedByte(offset)) {
796 case 20:
797 case 21:
798 case 22:
799 case 23:
800 tls = true;
801 break;
802 default:
803
804 tls = false;
805 }
806
807 if (tls) {
808
809 int majorVersion = buffer.getUnsignedByte(offset + 1);
810 if (majorVersion == 3) {
811
812 packetLength = (getShort(buffer, offset + 3) & 0xFFFF) + 5;
813 if (packetLength <= 5) {
814
815 tls = false;
816 }
817 } else {
818
819 tls = false;
820 }
821 }
822
823 if (!tls) {
824
825 boolean sslv2 = true;
826 int headerLength = (buffer.getUnsignedByte(offset) & 0x80) != 0 ? 2 : 3;
827 int majorVersion = buffer.getUnsignedByte(offset + headerLength + 1);
828 if (majorVersion == 2 || majorVersion == 3) {
829
830 if (headerLength == 2) {
831 packetLength = (getShort(buffer, offset) & 0x7FFF) + 2;
832 } else {
833 packetLength = (getShort(buffer, offset) & 0x3FFF) + 3;
834 }
835 if (packetLength <= headerLength) {
836 sslv2 = false;
837 }
838 } else {
839 sslv2 = false;
840 }
841
842 if (!sslv2) {
843 return -1;
844 }
845 }
846 return packetLength;
847 }
848
/**
 * Gathers as many complete SSL/TLS records as {@code in} currently holds, up
 * to {@code OpenSslEngine.MAX_ENCRYPTED_PACKET_LENGTH} bytes in total, and
 * unwraps them with a single call.
 *
 * @return the decrypted application data, or {@code null} if nothing was produced
 * @throws NotSslRecordException if a non-SSL record is found and
 *                               {@code closeOnSslException} is {@code false}
 */
@Override
protected Object decode(
        final ChannelHandlerContext ctx, Channel channel, ChannelBuffer in) throws Exception {

    final int startOffset = in.readerIndex();
    final int endOffset = in.writerIndex();
    int offset = startOffset;
    int totalLength = 0;

    // A previous call saw the header of a record that had not fully arrived yet.
    if (packetLength > 0) {
        if (endOffset - startOffset < packetLength) {
            // Still incomplete; wait for more data.
            return null;
        } else {
            offset += packetLength;
            totalLength = packetLength;
            packetLength = 0;
        }
    }

    boolean nonSslRecord = false;

    while (totalLength < OpenSslEngine.MAX_ENCRYPTED_PACKET_LENGTH) {
        final int readableBytes = endOffset - offset;
        // Fewer than 5 bytes are not enough to inspect a record header.
        if (readableBytes < 5) {
            break;
        }

        final int packetLength = getEncryptedPacketLength(in, offset);
        if (packetLength == -1) {
            nonSslRecord = true;
            break;
        }

        assert packetLength > 0;

        if (packetLength > readableBytes) {
            // The record is incomplete; remember its length for the next call.
            this.packetLength = packetLength;
            break;
        }

        int newTotalLength = totalLength + packetLength;
        if (newTotalLength > OpenSslEngine.MAX_ENCRYPTED_PACKET_LENGTH) {
            // Adding this record would exceed the limit; leave it for the next call.
            break;
        }

        // The whole record is available; include it in this batch.
        offset += packetLength;
        totalLength = newTotalLength;
    }

    ChannelBuffer unwrapped = null;
    if (totalLength > 0) {
        // Advance the reader index first, then unwrap the batch by absolute offset.
        in.skipBytes(totalLength);
        final ByteBuffer inNetBuf = in.toByteBuffer(startOffset, totalLength);
        unwrapped = unwrap(ctx, channel, inNetBuf, totalLength, true);
    }

    if (nonSslRecord) {
        // Discard the remaining bytes and report the offending data as a hex dump.
        NotSslRecordException e = new NotSslRecordException(
                "not an SSL/TLS record: " + ChannelBuffers.hexDump(in));
        in.skipBytes(in.readableBytes());
        if (closeOnSslException) {
            // Report and close rather than throwing.
            fireExceptionCaught(ctx, e);
            Channels.close(ctx, future(channel));

            return null;
        } else {
            throw e;
        }
    }

    return unwrapped;
}
941
942
943
944
945
946 private static short getShort(ChannelBuffer buf, int offset) {
947 return (short) (buf.getByte(offset) << 8 | buf.getByte(offset + 1) & 0xFF);
948 }
949
/**
 * Encrypts the queued plaintext writes and offers the resulting ciphertext for
 * sending. If the engine turns out to be closed, every remaining pending write
 * is failed. If the engine needs inbound data, a non-application unwrap is
 * attempted after the loop.
 *
 * @throws SSLException if the engine fails while wrapping; the handshake is
 *                      marked as failed before the exception is rethrown
 */
private void wrap(ChannelHandlerContext context, Channel channel) throws SSLException {
    ChannelBuffer msg;
    ByteBuffer outNetBuf = bufferPool.acquireBuffer();
    boolean success = true;
    boolean offered = false;
    boolean needsUnwrap = false;
    PendingWrite pendingWrite = null;

    try {
        loop:
        for (;;) {
            // The queue lock is held for one whole iteration: peek, encrypt, remove.
            pendingUnencryptedWritesLock.lock();
            try {
                pendingWrite = pendingUnencryptedWrites.peek();
                if (pendingWrite == null) {
                    break;
                }

                ByteBuffer outAppBuf = pendingWrite.outAppBuf;
                if (outAppBuf == null) {
                    // A write without payload: pass it on as an empty buffer so its future is still notified.
                    pendingUnencryptedWrites.remove();
                    offerEncryptedWriteRequest(
                            new DownstreamMessageEvent(
                                    channel, pendingWrite.future,
                                    ChannelBuffers.EMPTY_BUFFER,
                                    channel.getRemoteAddress()));
                    offered = true;
                } else {
                    synchronized (handshakeLock) {
                        SSLEngineResult result = null;
                        try {
                            result = engine.wrap(outAppBuf, outNetBuf);
                        } finally {
                            // Drop the entry once fully consumed, even if wrap() threw.
                            if (!outAppBuf.hasRemaining()) {
                                pendingUnencryptedWrites.remove();
                            }
                        }

                        if (result.bytesProduced() > 0) {
                            outNetBuf.flip();
                            int remaining = outNetBuf.remaining();
                            // Note: the buffer factory comes from the ctx field, not the context parameter.
                            msg = ctx.getChannel().getConfig().getBufferFactory().getBuffer(remaining);

                            // Copy the ciphertext out so the pooled buffer can be reused.
                            msg.writeBytes(outNetBuf);
                            outNetBuf.clear();

                            ChannelFuture future;
                            if (pendingWrite.outAppBuf.hasRemaining()) {
                                // Only part of the write was encrypted; the caller's future is
                                // kept for the final chunk.
                                future = succeededFuture(channel);
                            } else {
                                future = pendingWrite.future;
                            }

                            MessageEvent encryptedWrite = new DownstreamMessageEvent(
                                    channel, future, msg, channel.getRemoteAddress());
                            offerEncryptedWriteRequest(encryptedWrite);
                            offered = true;
                        } else if (result.getStatus() == Status.CLOSED) {
                            // The engine is closed and produced nothing; the pending writes
                            // are failed in the finally block below.
                            success = false;
                            break;
                        } else {
                            final HandshakeStatus handshakeStatus = result.getHandshakeStatus();
                            handleRenegotiation(handshakeStatus);
                            switch (handshakeStatus) {
                            case NEED_WRAP:
                                if (outAppBuf.hasRemaining()) {
                                    break;
                                } else {
                                    break loop;
                                }
                            case NEED_UNWRAP:
                                needsUnwrap = true;
                                break loop;
                            case NEED_TASK:
                                runDelegatedTasks();
                                break;
                            case FINISHED:
                                setHandshakeSuccess(channel);
                                if (result.getStatus() == Status.CLOSED) {
                                    success = false;
                                }
                                break loop;
                            case NOT_HANDSHAKING:
                                setHandshakeSuccessIfStillHandshaking(channel);
                                if (result.getStatus() == Status.CLOSED) {
                                    success = false;
                                }
                                break loop;
                            default:
                                throw new IllegalStateException(
                                        "Unknown handshake status: " +
                                        handshakeStatus);
                            }
                        }
                    }
                }
            } finally {
                pendingUnencryptedWritesLock.unlock();
            }
        }
    } catch (SSLException e) {
        success = false;
        setHandshakeFailure(channel, e);
        throw e;
    } finally {
        bufferPool.releaseBuffer(outNetBuf);

        if (offered) {
            flushPendingEncryptedWrites(context);
        }

        if (!success) {
            IllegalStateException cause =
                new IllegalStateException("SSLEngine already closed");

            // Fail the write that was being processed when the engine closed.
            if (pendingWrite != null) {
                pendingWrite.future.setFailure(cause);
            }

            // Drain the queue and fail each remaining write. The lock is released
            // before each future is notified.
            for (;;) {
                pendingUnencryptedWritesLock.lock();
                try {
                    pendingWrite = pendingUnencryptedWrites.poll();
                    if (pendingWrite == null) {
                        break;
                    }
                } finally {
                    pendingUnencryptedWritesLock.unlock();
                }

                pendingWrite.future.setFailure(cause);
            }
        }
    }

    if (needsUnwrap) {
        unwrapNonAppData(ctx, channel, true);
    }
}
1107
1108 private void offerEncryptedWriteRequest(MessageEvent encryptedWrite) {
1109 final boolean locked = pendingEncryptedWritesLock.tryLock();
1110 try {
1111 pendingEncryptedWrites.add(encryptedWrite);
1112 } finally {
1113 if (locked) {
1114 pendingEncryptedWritesLock.unlock();
1115 }
1116 }
1117 }
1118
1119 private void flushPendingEncryptedWrites(ChannelHandlerContext ctx) {
1120 while (!pendingEncryptedWrites.isEmpty()) {
1121
1122
1123
1124 if (!pendingEncryptedWritesLock.tryLock()) {
1125 return;
1126 }
1127
1128 try {
1129 MessageEvent e;
1130 while ((e = pendingEncryptedWrites.poll()) != null) {
1131 ctx.sendDownstream(e);
1132 }
1133 } finally {
1134 pendingEncryptedWritesLock.unlock();
1135 }
1136
1137
1138 }
1139 }
1140
/**
 * Lets the engine produce handshake/close data (no application data is
 * supplied) and writes whatever it produces, repeating until a wrap yields no
 * bytes.
 *
 * @return the future of the last write issued, or an already-succeeded future
 *         if nothing was written
 * @throws SSLException if the engine fails; the handshake is marked as failed
 *                      before the exception is rethrown
 */
private ChannelFuture wrapNonAppData(ChannelHandlerContext ctx, Channel channel) throws SSLException {
    ChannelFuture future = null;
    ByteBuffer outNetBuf = bufferPool.acquireBuffer();

    SSLEngineResult result;
    try {
        for (;;) {
            synchronized (handshakeLock) {
                result = engine.wrap(EMPTY_BUFFER, outNetBuf);
            }

            if (result.bytesProduced() > 0) {
                outNetBuf.flip();
                ChannelBuffer msg =
                        ctx.getChannel().getConfig().getBufferFactory().getBuffer(outNetBuf.remaining());

                // Copy the produced bytes out so the pooled buffer can be reused.
                msg.writeBytes(outNetBuf);
                outNetBuf.clear();

                future = future(channel);
                future.addListener(new ChannelFutureListener() {
                    public void operationComplete(ChannelFuture future)
                            throws Exception {
                        // Count the failure so exceptionCaught() can swallow the matching
                        // ClosedChannelException.
                        if (future.getCause() instanceof ClosedChannelException) {
                            synchronized (ignoreClosedChannelExceptionLock) {
                                ignoreClosedChannelException ++;
                            }
                        }
                    }
                });

                write(ctx, future, msg);
            }

            final HandshakeStatus handshakeStatus = result.getHandshakeStatus();
            handleRenegotiation(handshakeStatus);
            switch (handshakeStatus) {
            case FINISHED:
                setHandshakeSuccess(channel);
                runDelegatedTasks();
                break;
            case NEED_TASK:
                runDelegatedTasks();
                break;
            case NEED_UNWRAP:
                if (!Thread.holdsLock(handshakeLock)) {
                    // Unwrap only when this thread does not already hold handshakeLock,
                    // i.e. when this call was not made from inside a locked section.
                    unwrapNonAppData(ctx, channel, true);
                }
                break;
            case NOT_HANDSHAKING:
                if (setHandshakeSuccessIfStillHandshaking(channel)) {
                    runDelegatedTasks();
                }
                break;
            case NEED_WRAP:
                break;
            default:
                throw new IllegalStateException(
                        "Unexpected handshake status: " + handshakeStatus);
            }

            // The engine has nothing more to send.
            if (result.bytesProduced() == 0) {
                break;
            }
        }
    } catch (SSLException e) {
        setHandshakeFailure(channel, e);
        throw e;
    } finally {
        bufferPool.releaseBuffer(outNetBuf);
    }

    if (future == null) {
        future = succeededFuture(channel);
    }

    return future;
}
1226
1227
1228
1229
/**
 * Runs {@code unwrap} with an empty input buffer and an initial output
 * capacity of {@code -1}; any value {@code unwrap} returns is discarded.
 */
private void unwrapNonAppData(
        ChannelHandlerContext ctx, Channel channel, boolean mightNeedHandshake) throws SSLException {
    unwrap(ctx, channel, EMPTY_BUFFER, -1, mightNeedHandshake);
}
1234
1235
1236
1237
/**
 * Feeds {@code nioInNetBuf} to the engine and collects the decrypted
 * application data, driving the handshake state machine along the way.
 *
 * @param nioInNetBuf                   the encrypted input
 * @param initialNettyOutAppBufCapacity the capacity requested for the output buffer
 *                                      when it is first allocated
 * @param mightNeedHandshake            whether a handshake may have to be started
 *                                      here (checked only for server-mode engines)
 * @return the decrypted data, or {@code null} if none was produced
 * @throws SSLException if the engine fails; the handshake is marked as failed
 *                      before the exception is rethrown
 */
private ChannelBuffer unwrap(
        ChannelHandlerContext ctx, Channel channel,
        ByteBuffer nioInNetBuf,
        int initialNettyOutAppBufCapacity, boolean mightNeedHandshake) throws SSLException {

    final int nioInNetBufStartOffset = nioInNetBuf.position();
    final ByteBuffer nioOutAppBuf = bufferPool.acquireBuffer();

    ChannelBuffer nettyOutAppBuf = null;

    try {
        boolean needsWrap = false;
        for (;;) {
            SSLEngineResult result;
            boolean needsHandshake = false;
            if (mightNeedHandshake) {
                synchronized (handshakeLock) {
                    // A server-mode engine that has not started handshaking yet.
                    if (!handshaken && !handshaking &&
                        !engine.getUseClientMode() &&
                        !engine.isInboundDone() && !engine.isOutboundDone()) {
                        needsHandshake = true;
                    }
                }
            }

            if (needsHandshake) {
                handshake();
            }

            synchronized (handshakeLock) {
                // Decrypt one record, retrying when the output buffer turns out to be too small.
                for (;;) {
                    final int outAppBufSize = engine.getSession().getApplicationBufferSize();
                    final ByteBuffer outAppBuf;
                    if (nioOutAppBuf.capacity() < outAppBufSize) {
                        // The pooled buffer is smaller than the session requires;
                        // use a temporary buffer of the required size instead.
                        outAppBuf = ByteBuffer.allocate(outAppBufSize);
                    } else {
                        outAppBuf = nioOutAppBuf;
                    }

                    try {
                        result = engine.unwrap(nioInNetBuf, outAppBuf);
                        switch (result.getStatus()) {
                        case CLOSED:
                            // Signal listeners of the inbound-close future.
                            sslEngineCloseFuture.setClosed();
                            break;
                        case BUFFER_OVERFLOW:
                            // Retry the inner loop; the buffer size is recomputed from the session.
                            continue;
                        }

                        break;
                    } finally {
                        outAppBuf.flip();

                        // Move whatever was decrypted into the Netty buffer, even when retrying
                        // or when unwrap() threw.
                        if (outAppBuf.hasRemaining()) {
                            if (nettyOutAppBuf == null) {
                                ChannelBufferFactory factory = ctx.getChannel().getConfig().getBufferFactory();
                                nettyOutAppBuf = factory.getBuffer(initialNettyOutAppBufCapacity);
                            }
                            nettyOutAppBuf.writeBytes(outAppBuf);
                        }
                        outAppBuf.clear();
                    }
                }

                final HandshakeStatus handshakeStatus = result.getHandshakeStatus();
                handleRenegotiation(handshakeStatus);
                switch (handshakeStatus) {
                case NEED_UNWRAP:
                    break;
                case NEED_WRAP:
                    wrapNonAppData(ctx, channel);
                    break;
                case NEED_TASK:
                    runDelegatedTasks();
                    break;
                case FINISHED:
                    setHandshakeSuccess(channel);
                    needsWrap = true;
                    continue;
                case NOT_HANDSHAKING:
                    if (setHandshakeSuccessIfStillHandshaking(channel)) {
                        needsWrap = true;
                        continue;
                    }
                    if (writeBeforeHandshakeDone) {
                        // Data was written before the handshake finished; request a wrap
                        // so that it is sent now.
                        writeBeforeHandshakeDone = false;
                        needsWrap = true;
                    }
                    break;
                default:
                    throw new IllegalStateException(
                            "Unknown handshake status: " + handshakeStatus);
                }

                // Stop when the engine needs more input or made no progress at all.
                if (result.getStatus() == Status.BUFFER_UNDERFLOW ||
                    result.bytesConsumed() == 0 && result.bytesProduced() == 0) {
                    if (nioInNetBuf.hasRemaining() && !engine.isInboundDone()) {
                        // Input is left over although the engine stopped; log it for diagnosis.
                        logger.warn("Unexpected leftover data after SSLEngine.unwrap():"
                                + " status=" + result.getStatus()
                                + " handshakeStatus=" + result.getHandshakeStatus()
                                + " consumed=" + result.bytesConsumed()
                                + " produced=" + result.bytesProduced()
                                + " remaining=" + nioInNetBuf.remaining()
                                + " data=" + ChannelBuffers.hexDump(ChannelBuffers.wrappedBuffer(nioInNetBuf)));
                    }
                    break;
                }
            }
        }

        if (needsWrap) {
            // wrap() is invoked only when this thread holds neither handshakeLock nor
            // pendingEncryptedWritesLock.
            if (!Thread.holdsLock(handshakeLock) && !pendingEncryptedWritesLock.isHeldByCurrentThread()) {
                wrap(ctx, channel);
            }
        }
    } catch (SSLException e) {
        setHandshakeFailure(channel, e);
        throw e;
    } finally {
        bufferPool.releaseBuffer(nioOutAppBuf);
    }

    if (nettyOutAppBuf != null && nettyOutAppBuf.readable()) {
        return nettyOutAppBuf;
    } else {
        return null;
    }
}
1392
1393 private void handleRenegotiation(HandshakeStatus handshakeStatus) {
1394 synchronized (handshakeLock) {
1395 if (handshakeStatus == HandshakeStatus.NOT_HANDSHAKING ||
1396 handshakeStatus == HandshakeStatus.FINISHED) {
1397
1398 return;
1399 }
1400
1401 if (!handshaken) {
1402
1403 return;
1404 }
1405
1406 final boolean renegotiate;
1407 if (handshaking) {
1408
1409
1410 return;
1411 }
1412
1413 if (engine.isInboundDone() || engine.isOutboundDone()) {
1414
1415 return;
1416 }
1417
1418 if (isEnableRenegotiation()) {
1419
1420 renegotiate = true;
1421 } else {
1422
1423 renegotiate = false;
1424
1425 handshaking = true;
1426 }
1427
1428 if (renegotiate) {
1429
1430 handshake();
1431 } else {
1432
1433 fireExceptionCaught(
1434 ctx, new SSLException(
1435 "renegotiation attempted by peer; " +
1436 "closing the connection"));
1437
1438
1439 Channels.close(ctx, succeededFuture(ctx.getChannel()));
1440 }
1441 }
1442 }
1443
1444
1445
1446
1447
1448
1449
1450 private void runDelegatedTasks() {
1451 if (delegatedTaskExecutor == ImmediateExecutor.INSTANCE) {
1452 for (;;) {
1453 final Runnable task;
1454 synchronized (handshakeLock) {
1455 task = engine.getDelegatedTask();
1456 }
1457
1458 if (task == null) {
1459 break;
1460 }
1461
1462 delegatedTaskExecutor.execute(task);
1463 }
1464 } else {
1465 final List<Runnable> tasks = new ArrayList<Runnable>(2);
1466 for (;;) {
1467 final Runnable task;
1468 synchronized (handshakeLock) {
1469 task = engine.getDelegatedTask();
1470 }
1471
1472 if (task == null) {
1473 break;
1474 }
1475
1476 tasks.add(task);
1477 }
1478
1479 if (tasks.isEmpty()) {
1480 return;
1481 }
1482
1483 final CountDownLatch latch = new CountDownLatch(1);
1484 delegatedTaskExecutor.execute(new Runnable() {
1485 public void run() {
1486 try {
1487 for (Runnable task: tasks) {
1488 task.run();
1489 }
1490 } catch (Exception e) {
1491 fireExceptionCaught(ctx, e);
1492 } finally {
1493 latch.countDown();
1494 }
1495 }
1496 });
1497
1498 boolean interrupted = false;
1499 while (latch.getCount() != 0) {
1500 try {
1501 latch.await();
1502 } catch (InterruptedException e) {
1503
1504 interrupted = true;
1505 }
1506 }
1507
1508 if (interrupted) {
1509 Thread.currentThread().interrupt();
1510 }
1511 }
1512 }
1513
1514
1515
1516
1517
1518
1519
1520
1521 private boolean setHandshakeSuccessIfStillHandshaking(Channel channel) {
1522 if (handshaking && !handshakeFuture.isDone()) {
1523 setHandshakeSuccess(channel);
1524 return true;
1525 }
1526 return false;
1527 }
1528
1529 private void setHandshakeSuccess(Channel channel) {
1530 synchronized (handshakeLock) {
1531 handshaking = false;
1532 handshaken = true;
1533
1534 if (handshakeFuture == null) {
1535 handshakeFuture = future(channel);
1536 }
1537 cancelHandshakeTimeout();
1538 }
1539
1540 if (logger.isDebugEnabled()) {
1541 logger.debug(channel + " HANDSHAKEN: " + engine.getSession().getCipherSuite());
1542 }
1543
1544 handshakeFuture.setSuccess();
1545 }
1546
1547 private void setHandshakeFailure(Channel channel, SSLException cause) {
1548 synchronized (handshakeLock) {
1549 if (!handshaking) {
1550 return;
1551 }
1552 handshaking = false;
1553 handshaken = false;
1554
1555 if (handshakeFuture == null) {
1556 handshakeFuture = future(channel);
1557 }
1558
1559
1560 cancelHandshakeTimeout();
1561
1562
1563
1564
1565 engine.closeOutbound();
1566
1567 try {
1568 engine.closeInbound();
1569 } catch (SSLException e) {
1570 if (logger.isDebugEnabled()) {
1571 logger.debug(
1572 "SSLEngine.closeInbound() raised an exception after " +
1573 "a handshake failure.", e);
1574 }
1575 }
1576 }
1577
1578 handshakeFuture.setFailure(cause);
1579 if (closeOnSslException) {
1580 Channels.close(ctx, future(channel));
1581 }
1582 }
1583
1584 private void closeOutboundAndChannel(
1585 final ChannelHandlerContext context, final ChannelStateEvent e) {
1586 if (!e.getChannel().isConnected()) {
1587 context.sendDownstream(e);
1588 return;
1589 }
1590
1591
1592 if (!CLOSED_OUTBOUND_AND_CHANNEL_UPDATER.compareAndSet(this, 0, 1)) {
1593
1594
1595
1596 e.getChannel().getCloseFuture().addListener(new ChannelFutureListener() {
1597 public void operationComplete(ChannelFuture future) throws Exception {
1598 context.sendDownstream(e);
1599 }
1600 });
1601 return;
1602 }
1603
1604 boolean passthrough = true;
1605 try {
1606 try {
1607 unwrapNonAppData(ctx, e.getChannel(), false);
1608 } catch (SSLException ex) {
1609 if (logger.isDebugEnabled()) {
1610 logger.debug("Failed to unwrap before sending a close_notify message", ex);
1611 }
1612 }
1613
1614 if (!engine.isOutboundDone()) {
1615 if (SENT_CLOSE_NOTIFY_UPDATER.compareAndSet(this, 0, 1)) {
1616 engine.closeOutbound();
1617 try {
1618 ChannelFuture closeNotifyFuture = wrapNonAppData(context, e.getChannel());
1619 closeNotifyFuture.addListener(
1620 new ClosingChannelFutureListener(context, e));
1621 passthrough = false;
1622 } catch (SSLException ex) {
1623 if (logger.isDebugEnabled()) {
1624 logger.debug("Failed to encode a close_notify message", ex);
1625 }
1626 }
1627 }
1628 }
1629 } finally {
1630 if (passthrough) {
1631 context.sendDownstream(e);
1632 }
1633 }
1634 }
1635
1636 private static final class PendingWrite {
1637 final ChannelFuture future;
1638 final ByteBuffer outAppBuf;
1639
1640 PendingWrite(ChannelFuture future, ByteBuffer outAppBuf) {
1641 this.future = future;
1642 this.outAppBuf = outAppBuf;
1643 }
1644 }
1645
1646 private static final class ClosingChannelFutureListener implements ChannelFutureListener {
1647
1648 private final ChannelHandlerContext context;
1649 private final ChannelStateEvent e;
1650
1651 ClosingChannelFutureListener(
1652 ChannelHandlerContext context, ChannelStateEvent e) {
1653 this.context = context;
1654 this.e = e;
1655 }
1656
1657 public void operationComplete(ChannelFuture closeNotifyFuture) throws Exception {
1658 if (!(closeNotifyFuture.getCause() instanceof ClosedChannelException)) {
1659 Channels.close(context, e.getFuture());
1660 } else {
1661 e.getFuture().setSuccess();
1662 }
1663 }
1664 }
1665
1666 @Override
1667 public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
1668 super.beforeAdd(ctx);
1669 this.ctx = ctx;
1670 }
1671
1672
1673
1674
1675 @Override
1676 public void afterRemove(ChannelHandlerContext ctx) throws Exception {
1677 closeEngine();
1678
1679
1680 Throwable cause = null;
1681 for (;;) {
1682 PendingWrite pw = pendingUnencryptedWrites.poll();
1683 if (pw == null) {
1684 break;
1685 }
1686 if (cause == null) {
1687 cause = new IOException("Unable to write data");
1688 }
1689 pw.future.setFailure(cause);
1690 }
1691
1692 for (;;) {
1693 MessageEvent ev = pendingEncryptedWrites.poll();
1694 if (ev == null) {
1695 break;
1696 }
1697 if (cause == null) {
1698 cause = new IOException("Unable to write data");
1699 }
1700 ev.getFuture().setFailure(cause);
1701 }
1702
1703 if (cause != null) {
1704 fireExceptionCaughtLater(ctx, cause);
1705 }
1706 }
1707
1708
1709
1710
1711 @Override
1712 public void channelConnected(final ChannelHandlerContext ctx, final ChannelStateEvent e) throws Exception {
1713 if (issueHandshake) {
1714
1715
1716 handshake().addListener(new ChannelFutureListener() {
1717
1718 public void operationComplete(ChannelFuture future) throws Exception {
1719 if (future.isSuccess()) {
1720
1721
1722
1723 ctx.sendUpstream(e);
1724 }
1725 }
1726 });
1727 } else {
1728 super.channelConnected(ctx, e);
1729 }
1730 }
1731
1732
1733
1734
1735
1736
1737 @Override
1738 public void channelClosed(final ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
1739
1740
1741 ctx.getPipeline().execute(new Runnable() {
1742 public void run() {
1743 if (!pendingUnencryptedWritesLock.tryLock()) {
1744 return;
1745 }
1746
1747 List<ChannelFuture> futures = null;
1748 try {
1749 for (;;) {
1750 PendingWrite pw = pendingUnencryptedWrites.poll();
1751 if (pw == null) {
1752 break;
1753 }
1754 if (futures == null) {
1755 futures = new ArrayList<ChannelFuture>();
1756 }
1757 futures.add(pw.future);
1758 }
1759
1760 for (;;) {
1761 MessageEvent ev = pendingEncryptedWrites.poll();
1762 if (ev == null) {
1763 break;
1764 }
1765 if (futures != null) {
1766 futures = new ArrayList<ChannelFuture>();
1767 }
1768 futures.add(ev.getFuture());
1769 }
1770 } finally {
1771 pendingUnencryptedWritesLock.unlock();
1772 }
1773
1774 if (futures != null) {
1775 final ClosedChannelException cause = new ClosedChannelException();
1776 final int size = futures.size();
1777 for (int i = 0; i < size; i ++) {
1778 futures.get(i).setFailure(cause);
1779 }
1780 fireExceptionCaught(ctx, cause);
1781 }
1782 }
1783 });
1784
1785 super.channelClosed(ctx, e);
1786 }
1787
1788 private final class SSLEngineInboundCloseFuture extends DefaultChannelFuture {
1789 SSLEngineInboundCloseFuture() {
1790 super(null, true);
1791 }
1792
1793 void setClosed() {
1794 super.setSuccess();
1795 }
1796
1797 @Override
1798 public Channel getChannel() {
1799 if (ctx == null) {
1800
1801 return null;
1802 } else {
1803 return ctx.getChannel();
1804 }
1805 }
1806
1807 @Override
1808 public boolean setSuccess() {
1809 return false;
1810 }
1811
1812 @Override
1813 public boolean setFailure(Throwable cause) {
1814 return false;
1815 }
1816 }
1817 }