1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.handler.ssl;
17
18 import org.jboss.netty.buffer.ChannelBuffer;
19 import org.jboss.netty.buffer.ChannelBuffers;
20 import org.jboss.netty.channel.Channel;
21 import org.jboss.netty.channel.ChannelDownstreamHandler;
22 import org.jboss.netty.channel.ChannelEvent;
23 import org.jboss.netty.channel.ChannelFuture;
24 import org.jboss.netty.channel.ChannelFutureListener;
25 import org.jboss.netty.channel.ChannelHandlerContext;
26 import org.jboss.netty.channel.ChannelPipeline;
27 import org.jboss.netty.channel.ChannelStateEvent;
28 import org.jboss.netty.channel.Channels;
29 import org.jboss.netty.channel.DefaultChannelFuture;
30 import org.jboss.netty.channel.DownstreamMessageEvent;
31 import org.jboss.netty.channel.ExceptionEvent;
32 import org.jboss.netty.channel.MessageEvent;
33 import org.jboss.netty.handler.codec.frame.FrameDecoder;
34 import org.jboss.netty.logging.InternalLogger;
35 import org.jboss.netty.logging.InternalLoggerFactory;
36 import org.jboss.netty.util.internal.DetectionUtil;
37 import org.jboss.netty.util.internal.NonReentrantLock;
38
39 import javax.net.ssl.SSLEngine;
40 import javax.net.ssl.SSLEngineResult;
41 import javax.net.ssl.SSLEngineResult.HandshakeStatus;
42 import javax.net.ssl.SSLEngineResult.Status;
43 import javax.net.ssl.SSLException;
44 import java.io.IOException;
45 import java.nio.ByteBuffer;
46 import java.nio.channels.ClosedChannelException;
47 import java.nio.channels.DatagramChannel;
48 import java.nio.channels.SocketChannel;
49 import java.util.LinkedList;
50 import java.util.Queue;
51 import java.util.concurrent.ConcurrentLinkedQueue;
52 import java.util.concurrent.Executor;
53 import java.util.concurrent.atomic.AtomicBoolean;
54 import java.util.regex.Pattern;
55
56 import static org.jboss.netty.channel.Channels.*;
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156 public class SslHandler extends FrameDecoder
157 implements ChannelDownstreamHandler {
158
159 private static final InternalLogger logger =
160 InternalLoggerFactory.getInstance(SslHandler.class);
161
162 private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
163
164 private static final Pattern IGNORABLE_CLASS_IN_STACK = Pattern.compile(
165 "^.*(Socket|DatagramChannel|SctpChannel).*$");
166 private static final Pattern IGNORABLE_ERROR_MESSAGE = Pattern.compile(
167 "^.*(?:connection.*reset|connection.*closed|broken.*pipe).*$",
168 Pattern.CASE_INSENSITIVE);
169
170 private static SslBufferPool defaultBufferPool;
171
172
173
174
175
176 public static synchronized SslBufferPool getDefaultBufferPool() {
177 if (defaultBufferPool == null) {
178 defaultBufferPool = new SslBufferPool();
179 }
180 return defaultBufferPool;
181 }
182
183 private volatile ChannelHandlerContext ctx;
184 private final SSLEngine engine;
185 private final SslBufferPool bufferPool;
186 private final Executor delegatedTaskExecutor;
187 private final boolean startTls;
188
189 private volatile boolean enableRenegotiation = true;
190
191 final Object handshakeLock = new Object();
192 private boolean handshaking;
193 private volatile boolean handshaken;
194 private volatile ChannelFuture handshakeFuture;
195
196 private final AtomicBoolean sentFirstMessage = new AtomicBoolean();
197 private final AtomicBoolean sentCloseNotify = new AtomicBoolean();
198 int ignoreClosedChannelException;
199 final Object ignoreClosedChannelExceptionLock = new Object();
200 private final Queue<PendingWrite> pendingUnencryptedWrites = new LinkedList<PendingWrite>();
201 private final Queue<MessageEvent> pendingEncryptedWrites = new ConcurrentLinkedQueue<MessageEvent>();
202 private final NonReentrantLock pendingEncryptedWritesLock = new NonReentrantLock();
203
204 private volatile boolean issueHandshake;
205
206 private final SSLEngineInboundCloseFuture sslEngineCloseFuture = new SSLEngineInboundCloseFuture();
207
208 private boolean closeOnSSLException;
209
210 private int packetLength = Integer.MIN_VALUE;
211
212
213
214
215
216
/**
 * Creates a handler that uses the default buffer pool, no StartTLS and
 * {@code ImmediateExecutor.INSTANCE} for delegated tasks.
 *
 * @param engine the {@link SSLEngine} this handler will use
 */
public SslHandler(SSLEngine engine) {
    this(engine, getDefaultBufferPool(), false, ImmediateExecutor.INSTANCE);
}
220
221
222
223
224
225
226
227
/**
 * Creates a handler with a caller-supplied buffer pool, no StartTLS and
 * {@code ImmediateExecutor.INSTANCE} for delegated tasks.
 *
 * @param engine     the {@link SSLEngine} this handler will use
 * @param bufferPool the pool from which wrap/unwrap buffers are acquired
 */
public SslHandler(SSLEngine engine, SslBufferPool bufferPool) {
    this(engine, bufferPool, false, ImmediateExecutor.INSTANCE);
}
231
232
233
234
235
236
237
238
/**
 * Creates a handler that uses the default buffer pool and
 * {@code ImmediateExecutor.INSTANCE} for delegated tasks.
 *
 * @param engine   the {@link SSLEngine} this handler will use
 * @param startTls {@code true} if the first write request should not be encrypted
 */
public SslHandler(SSLEngine engine, boolean startTls) {
    this(engine, getDefaultBufferPool(), startTls, ImmediateExecutor.INSTANCE);
}
242
243
244
245
246
247
248
249
250
251
/**
 * Creates a handler with a caller-supplied buffer pool that runs delegated
 * tasks on {@code ImmediateExecutor.INSTANCE}.
 *
 * @param engine     the {@link SSLEngine} this handler will use
 * @param bufferPool the pool from which wrap/unwrap buffers are acquired
 * @param startTls   {@code true} if the first write request should not be encrypted
 */
public SslHandler(SSLEngine engine, SslBufferPool bufferPool, boolean startTls) {
    this(engine, bufferPool, startTls, ImmediateExecutor.INSTANCE);
}
255
256
257
258
259
260
261
262
263
264
/**
 * Creates a handler that uses the default buffer pool and no StartTLS.
 *
 * @param engine                the {@link SSLEngine} this handler will use
 * @param delegatedTaskExecutor the executor that runs the engine's delegated tasks
 */
public SslHandler(SSLEngine engine, Executor delegatedTaskExecutor) {
    this(engine, getDefaultBufferPool(), false, delegatedTaskExecutor);
}
268
269
270
271
272
273
274
275
276
277
278
279
280
/**
 * Creates a handler with a caller-supplied buffer pool and no StartTLS.
 *
 * @param engine                the {@link SSLEngine} this handler will use
 * @param bufferPool            the pool from which wrap/unwrap buffers are acquired
 * @param delegatedTaskExecutor the executor that runs the engine's delegated tasks
 */
public SslHandler(SSLEngine engine, SslBufferPool bufferPool, Executor delegatedTaskExecutor) {
    this(engine, bufferPool, false, delegatedTaskExecutor);
}
284
285
286
287
288
289
290
291
292
293
294
295
296
/**
 * Creates a handler that uses the default buffer pool.
 *
 * @param engine                the {@link SSLEngine} this handler will use
 * @param startTls              {@code true} if the first write request should not be encrypted
 * @param delegatedTaskExecutor the executor that runs the engine's delegated tasks
 */
public SslHandler(SSLEngine engine, boolean startTls, Executor delegatedTaskExecutor) {
    this(engine, getDefaultBufferPool(), startTls, delegatedTaskExecutor);
}
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
/**
 * Creates a fully specified handler. All other constructors end up here.
 *
 * @param engine                the {@link SSLEngine} this handler will use
 * @param bufferPool            the pool from which wrap/unwrap buffers are acquired
 * @param startTls              {@code true} if the first write request should not be encrypted
 * @param delegatedTaskExecutor the executor that runs the engine's delegated tasks
 * @throws NullPointerException if {@code engine}, {@code bufferPool} or
 *                              {@code delegatedTaskExecutor} is {@code null}
 */
public SslHandler(SSLEngine engine, SslBufferPool bufferPool, boolean startTls, Executor delegatedTaskExecutor) {
    // Validate in declaration order so the reported parameter name is stable.
    if (engine == null) {
        throw new NullPointerException("engine");
    }
    if (bufferPool == null) {
        throw new NullPointerException("bufferPool");
    }
    if (delegatedTaskExecutor == null) {
        throw new NullPointerException("delegatedTaskExecutor");
    }

    this.startTls = startTls;
    this.delegatedTaskExecutor = delegatedTaskExecutor;
    this.bufferPool = bufferPool;
    this.engine = engine;
}
331
332
333
334
335 public SSLEngine getEngine() {
336 return engine;
337 }
338
339
340
341
342
343
344
345 public ChannelFuture handshake() {
346 if (handshaken && !isEnableRenegotiation()) {
347 throw new IllegalStateException("renegotiation disabled");
348 }
349
350 ChannelHandlerContext ctx = this.ctx;
351 Channel channel = ctx.getChannel();
352 ChannelFuture handshakeFuture;
353 Exception exception = null;
354
355 synchronized (handshakeLock) {
356 if (handshaking) {
357 return this.handshakeFuture;
358 } else {
359 handshaking = true;
360 try {
361 engine.beginHandshake();
362 runDelegatedTasks();
363 handshakeFuture = this.handshakeFuture = future(channel);
364 } catch (Exception e) {
365 handshakeFuture = this.handshakeFuture = failedFuture(channel, e);
366 exception = e;
367 }
368 }
369 }
370
371 if (exception == null) {
372 try {
373 wrapNonAppData(ctx, channel);
374 } catch (SSLException e) {
375 handshakeFuture.setFailure(e);
376
377 fireExceptionCaught(ctx, e);
378 if (closeOnSSLException) {
379 Channels.close(ctx, future(channel));
380 }
381 }
382 } else {
383 fireExceptionCaught(ctx, exception);
384 if (closeOnSSLException) {
385 Channels.close(ctx, future(channel));
386 }
387 }
388
389 return handshakeFuture;
390 }
391
392
393
394
395 @Deprecated
396 public ChannelFuture handshake(@SuppressWarnings("unused") Channel channel) {
397 return handshake();
398 }
399
400
401
402
403
404 public ChannelFuture close() {
405 ChannelHandlerContext ctx = this.ctx;
406 Channel channel = ctx.getChannel();
407 try {
408 engine.closeOutbound();
409 return wrapNonAppData(ctx, channel);
410 } catch (SSLException e) {
411 fireExceptionCaught(ctx, e);
412 if (closeOnSSLException) {
413 Channels.close(ctx, future(channel));
414 }
415 return failedFuture(channel, e);
416 }
417 }
418
419
420
421
422 @Deprecated
423 public ChannelFuture close(@SuppressWarnings("unused") Channel channel) {
424 return close();
425 }
426
427
428
429
430 public boolean isEnableRenegotiation() {
431 return enableRenegotiation;
432 }
433
434
435
436
437 public void setEnableRenegotiation(boolean enableRenegotiation) {
438 this.enableRenegotiation = enableRenegotiation;
439 }
440
441
442
443
444
445
446 public void setIssueHandshake(boolean issueHandshake) {
447 this.issueHandshake = issueHandshake;
448 }
449
450
451
452
453 public boolean isIssueHandshake() {
454 return issueHandshake;
455 }
456
457
458
459
460
461
462
463
464
465 public ChannelFuture getSSLEngineInboundCloseFuture() {
466 return sslEngineCloseFuture;
467
468 }
469
470
471
472
473
474
475
476
477
478
479 public void setCloseOnSSLException(boolean closeOnSslException) {
480 if (ctx != null) {
481 throw new IllegalStateException("Can only get changed before attached to ChannelPipeline");
482 }
483 closeOnSSLException = closeOnSslException;
484 }
485
486 public boolean getCloseOnSSLException() {
487 return closeOnSSLException;
488 }
489
490 public void handleDownstream(
491 final ChannelHandlerContext context, final ChannelEvent evt) throws Exception {
492 if (evt instanceof ChannelStateEvent) {
493 ChannelStateEvent e = (ChannelStateEvent) evt;
494 switch (e.getState()) {
495 case OPEN:
496 case CONNECTED:
497 case BOUND:
498 if (Boolean.FALSE.equals(e.getValue()) || e.getValue() == null) {
499 closeOutboundAndChannel(context, e);
500 return;
501 }
502 }
503 }
504 if (!(evt instanceof MessageEvent)) {
505 context.sendDownstream(evt);
506 return;
507 }
508
509 MessageEvent e = (MessageEvent) evt;
510 if (!(e.getMessage() instanceof ChannelBuffer)) {
511 context.sendDownstream(evt);
512 return;
513 }
514
515
516
517 if (startTls && sentFirstMessage.compareAndSet(false, true)) {
518 context.sendDownstream(evt);
519 return;
520 }
521
522
523 ChannelBuffer msg = (ChannelBuffer) e.getMessage();
524 PendingWrite pendingWrite;
525
526 if (msg.readable()) {
527 pendingWrite = new PendingWrite(evt.getFuture(), msg.toByteBuffer(msg.readerIndex(), msg.readableBytes()));
528 } else {
529 pendingWrite = new PendingWrite(evt.getFuture(), null);
530 }
531 synchronized (pendingUnencryptedWrites) {
532 boolean offered = pendingUnencryptedWrites.offer(pendingWrite);
533 assert offered;
534 }
535
536 wrap(context, evt.getChannel());
537 }
538
539 @Override
540 public void channelDisconnected(ChannelHandlerContext ctx,
541 ChannelStateEvent e) throws Exception {
542
543
544
545 synchronized (handshakeLock) {
546 if (handshaking) {
547 handshakeFuture.setFailure(new ClosedChannelException());
548 }
549 }
550
551 try {
552 super.channelDisconnected(ctx, e);
553 } finally {
554 unwrap(ctx, e.getChannel(), ChannelBuffers.EMPTY_BUFFER, 0, 0);
555 engine.closeOutbound();
556 if (!sentCloseNotify.get() && handshaken) {
557 try {
558 engine.closeInbound();
559 } catch (SSLException ex) {
560 if (logger.isDebugEnabled()) {
561 logger.debug("Failed to clean up SSLEngine.", ex);
562 }
563 }
564 }
565 }
566 }
567
568 @Override
569 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
570 throws Exception {
571
572 Throwable cause = e.getCause();
573 if (cause instanceof IOException) {
574 if (cause instanceof ClosedChannelException) {
575 synchronized (ignoreClosedChannelExceptionLock) {
576 if (ignoreClosedChannelException > 0) {
577 ignoreClosedChannelException --;
578 if (logger.isDebugEnabled()) {
579 logger.debug(
580 "Swallowing an exception raised while " +
581 "writing non-app data", cause);
582 }
583
584 return;
585 }
586 }
587 } else {
588 if (ignoreException(cause)) {
589 return;
590 }
591 }
592 }
593
594 ctx.sendUpstream(e);
595 }
596
597
598
599
600
601
602
603
604
605
606 private boolean ignoreException(Throwable t) {
607 if (!(t instanceof SSLException) && t instanceof IOException && engine.isOutboundDone()) {
608 String message = String.valueOf(t.getMessage()).toLowerCase();
609
610
611
612 if (IGNORABLE_ERROR_MESSAGE.matcher(message).matches()) {
613 return true;
614 }
615
616
617 StackTraceElement[] elements = t.getStackTrace();
618 for (StackTraceElement element: elements) {
619 String classname = element.getClassName();
620 String methodname = element.getMethodName();
621
622
623 if (classname.startsWith("org.jboss.netty.")) {
624 continue;
625 }
626
627
628 if (!methodname.equals("read")) {
629 continue;
630 }
631
632
633
634 if (IGNORABLE_CLASS_IN_STACK.matcher(classname).matches()) {
635 return true;
636 }
637
638 try {
639
640
641
642 Class<?> clazz = getClass().getClassLoader().loadClass(classname);
643
644 if (SocketChannel.class.isAssignableFrom(clazz)
645 || DatagramChannel.class.isAssignableFrom(clazz)) {
646 return true;
647 }
648
649
650 if (DetectionUtil.javaVersion() >= 7
651 && "com.sun.nio.sctp.SctpChannel".equals(clazz.getSuperclass().getName())) {
652 return true;
653 }
654 } catch (ClassNotFoundException e) {
655
656 }
657
658 }
659 }
660
661 return false;
662 }
663
664
665
666
667
668
669
670
671
672
673
674
675
676 public static boolean isEncrypted(ChannelBuffer buffer) {
677 return getEncryptedPacketLength(buffer) != -1;
678 }
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693 private static int getEncryptedPacketLength(ChannelBuffer buffer) {
694 if (buffer.readableBytes() < 5) {
695 throw new IllegalArgumentException("buffer must have at least 5 readable bytes");
696 }
697
698 int packetLength = 0;
699
700
701 boolean tls;
702 switch (buffer.getUnsignedByte(buffer.readerIndex())) {
703 case 20:
704 case 21:
705 case 22:
706 case 23:
707 tls = true;
708 break;
709 default:
710
711 tls = false;
712 }
713
714 if (tls) {
715
716 int majorVersion = buffer.getUnsignedByte(buffer.readerIndex() + 1);
717 if (majorVersion == 3) {
718
719 packetLength = (getShort(buffer, buffer.readerIndex() + 3) & 0xFFFF) + 5;
720 if (packetLength <= 5) {
721
722 tls = false;
723 }
724 } else {
725
726 tls = false;
727 }
728 }
729
730 if (!tls) {
731
732 boolean sslv2 = true;
733 int headerLength = (buffer.getUnsignedByte(
734 buffer.readerIndex()) & 0x80) != 0 ? 2 : 3;
735 int majorVersion = buffer.getUnsignedByte(
736 buffer.readerIndex() + headerLength + 1);
737 if (majorVersion == 2 || majorVersion == 3) {
738
739 if (headerLength == 2) {
740 packetLength = (getShort(buffer, buffer.readerIndex()) & 0x7FFF) + 2;
741 } else {
742 packetLength = (getShort(buffer, buffer.readerIndex()) & 0x3FFF) + 3;
743 }
744 if (packetLength <= headerLength) {
745 sslv2 = false;
746 }
747 } else {
748 sslv2 = false;
749 }
750
751 if (!sslv2) {
752 return -1;
753 }
754 }
755 return packetLength;
756 }
757
758 @Override
759 protected Object decode(
760 final ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
761
762
763 if (packetLength == Integer.MIN_VALUE) {
764 if (buffer.readableBytes() < 5) {
765 return null;
766 }
767 int packetLength = getEncryptedPacketLength(buffer);
768
769 if (packetLength == -1) {
770
771 NotSslRecordException e = new NotSslRecordException(
772 "not an SSL/TLS record: " + ChannelBuffers.hexDump(buffer));
773 buffer.skipBytes(buffer.readableBytes());
774
775 if (closeOnSSLException) {
776
777 fireExceptionCaught(ctx, e);
778 Channels.close(ctx, future(channel));
779
780
781
782 return null;
783 } else {
784 throw e;
785 }
786 }
787
788 assert packetLength > 0;
789 this.packetLength = packetLength;
790 }
791
792 if (buffer.readableBytes() < packetLength) {
793 return null;
794 }
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810 final int packetOffset = buffer.readerIndex();
811 buffer.skipBytes(packetLength);
812 try {
813 return unwrap(ctx, channel, buffer, packetOffset, packetLength);
814 } finally {
815
816 packetLength = Integer.MIN_VALUE;
817 }
818
819 }
820
821
822
823
824
825 private static short getShort(ChannelBuffer buf, int offset) {
826 return (short) (buf.getByte(offset) << 8 | buf.getByte(offset + 1) & 0xFF);
827 }
828
829 private void wrap(ChannelHandlerContext context, Channel channel)
830 throws SSLException {
831
832 ChannelBuffer msg;
833 ByteBuffer outNetBuf = bufferPool.acquireBuffer();
834 boolean success = true;
835 boolean offered = false;
836 boolean needsUnwrap = false;
837 PendingWrite pendingWrite = null;
838
839 try {
840 loop:
841 for (;;) {
842
843
844
845 synchronized (pendingUnencryptedWrites) {
846 pendingWrite = pendingUnencryptedWrites.peek();
847 if (pendingWrite == null) {
848 break;
849 }
850
851 ByteBuffer outAppBuf = pendingWrite.outAppBuf;
852 if (outAppBuf == null) {
853
854 pendingUnencryptedWrites.remove();
855 offerEncryptedWriteRequest(
856 new DownstreamMessageEvent(
857 channel, pendingWrite.future,
858 ChannelBuffers.EMPTY_BUFFER,
859 channel.getRemoteAddress()));
860 offered = true;
861 } else {
862 SSLEngineResult result = null;
863 try {
864 synchronized (handshakeLock) {
865 result = engine.wrap(outAppBuf, outNetBuf);
866 }
867 } finally {
868 if (!outAppBuf.hasRemaining()) {
869 pendingUnencryptedWrites.remove();
870 }
871 }
872
873 if (result.bytesProduced() > 0) {
874 outNetBuf.flip();
875 int remaining = outNetBuf.remaining();
876 msg = ctx.getChannel().getConfig().getBufferFactory().getBuffer(remaining);
877
878
879
880
881
882 msg.writeBytes(outNetBuf);
883 outNetBuf.clear();
884
885 ChannelFuture future;
886 if (pendingWrite.outAppBuf.hasRemaining()) {
887
888
889 future = succeededFuture(channel);
890 } else {
891 future = pendingWrite.future;
892 }
893
894 MessageEvent encryptedWrite = new DownstreamMessageEvent(
895 channel, future, msg, channel.getRemoteAddress());
896 offerEncryptedWriteRequest(encryptedWrite);
897 offered = true;
898 } else if (result.getStatus() == Status.CLOSED) {
899
900
901 success = false;
902 break;
903 } else {
904 final HandshakeStatus handshakeStatus = result.getHandshakeStatus();
905 handleRenegotiation(handshakeStatus);
906 switch (handshakeStatus) {
907 case NEED_WRAP:
908 if (outAppBuf.hasRemaining()) {
909 break;
910 } else {
911 break loop;
912 }
913 case NEED_UNWRAP:
914 needsUnwrap = true;
915 break loop;
916 case NEED_TASK:
917 runDelegatedTasks();
918 break;
919 case FINISHED:
920 case NOT_HANDSHAKING:
921 if (handshakeStatus == HandshakeStatus.FINISHED) {
922 setHandshakeSuccess(channel);
923 }
924 if (result.getStatus() == Status.CLOSED) {
925 success = false;
926 }
927 break loop;
928 default:
929 throw new IllegalStateException(
930 "Unknown handshake status: " +
931 handshakeStatus);
932 }
933 }
934 }
935 }
936 }
937 } catch (SSLException e) {
938 success = false;
939 setHandshakeFailure(channel, e);
940 throw e;
941 } finally {
942 bufferPool.releaseBuffer(outNetBuf);
943
944 if (offered) {
945 flushPendingEncryptedWrites(context);
946 }
947
948 if (!success) {
949 IllegalStateException cause =
950 new IllegalStateException("SSLEngine already closed");
951
952
953
954 if (pendingWrite != null) {
955 pendingWrite.future.setFailure(cause);
956 }
957
958
959
960
961
962 for (;;) {
963 synchronized (pendingUnencryptedWrites) {
964 pendingWrite = pendingUnencryptedWrites.poll();
965 if (pendingWrite == null) {
966 break;
967 }
968 }
969
970 pendingWrite.future.setFailure(cause);
971 }
972 }
973 }
974
975 if (needsUnwrap) {
976 unwrap(context, channel, ChannelBuffers.EMPTY_BUFFER, 0, 0);
977 }
978 }
979
980 private void offerEncryptedWriteRequest(MessageEvent encryptedWrite) {
981 final boolean locked = pendingEncryptedWritesLock.tryLock();
982 try {
983 pendingEncryptedWrites.offer(encryptedWrite);
984 } finally {
985 if (locked) {
986 pendingEncryptedWritesLock.unlock();
987 }
988 }
989 }
990
991 private void flushPendingEncryptedWrites(ChannelHandlerContext ctx) {
992
993
994
995 if (!pendingEncryptedWritesLock.tryLock()) {
996 return;
997 }
998
999 try {
1000 MessageEvent e;
1001 while ((e = pendingEncryptedWrites.poll()) != null) {
1002 ctx.sendDownstream(e);
1003 }
1004 } finally {
1005 pendingEncryptedWritesLock.unlock();
1006 }
1007 }
1008
1009 private ChannelFuture wrapNonAppData(ChannelHandlerContext ctx, Channel channel) throws SSLException {
1010 ChannelFuture future = null;
1011 ByteBuffer outNetBuf = bufferPool.acquireBuffer();
1012
1013 SSLEngineResult result;
1014 try {
1015 for (;;) {
1016 synchronized (handshakeLock) {
1017 result = engine.wrap(EMPTY_BUFFER, outNetBuf);
1018 }
1019
1020 if (result.bytesProduced() > 0) {
1021 outNetBuf.flip();
1022 ChannelBuffer msg =
1023 ctx.getChannel().getConfig().getBufferFactory().getBuffer(outNetBuf.remaining());
1024
1025
1026
1027
1028
1029
1030 msg.writeBytes(outNetBuf);
1031 outNetBuf.clear();
1032
1033 future = future(channel);
1034 future.addListener(new ChannelFutureListener() {
1035 public void operationComplete(ChannelFuture future)
1036 throws Exception {
1037 if (future.getCause() instanceof ClosedChannelException) {
1038 synchronized (ignoreClosedChannelExceptionLock) {
1039 ignoreClosedChannelException ++;
1040 }
1041 }
1042 }
1043 });
1044
1045 write(ctx, future, msg);
1046 }
1047
1048 final HandshakeStatus handshakeStatus = result.getHandshakeStatus();
1049 handleRenegotiation(handshakeStatus);
1050 switch (handshakeStatus) {
1051 case FINISHED:
1052 setHandshakeSuccess(channel);
1053 runDelegatedTasks();
1054 break;
1055 case NEED_TASK:
1056 runDelegatedTasks();
1057 break;
1058 case NEED_UNWRAP:
1059 if (!Thread.holdsLock(handshakeLock)) {
1060
1061
1062
1063 unwrap(ctx, channel, ChannelBuffers.EMPTY_BUFFER, 0, 0);
1064 }
1065 break;
1066 case NOT_HANDSHAKING:
1067 case NEED_WRAP:
1068 break;
1069 default:
1070 throw new IllegalStateException(
1071 "Unexpected handshake status: " + handshakeStatus);
1072 }
1073
1074 if (result.bytesProduced() == 0) {
1075 break;
1076 }
1077 }
1078 } catch (SSLException e) {
1079 setHandshakeFailure(channel, e);
1080 throw e;
1081 } finally {
1082 bufferPool.releaseBuffer(outNetBuf);
1083 }
1084
1085 if (future == null) {
1086 future = succeededFuture(channel);
1087 }
1088
1089 return future;
1090 }
1091
1092 private ChannelBuffer unwrap(
1093 ChannelHandlerContext ctx, Channel channel,
1094 ChannelBuffer buffer, int offset, int length) throws SSLException {
1095 ByteBuffer inNetBuf = buffer.toByteBuffer(offset, length);
1096 ByteBuffer outAppBuf = bufferPool.acquireBuffer();
1097
1098 try {
1099 boolean needsWrap = false;
1100 loop:
1101 for (;;) {
1102 SSLEngineResult result;
1103 boolean needsHandshake = false;
1104 synchronized (handshakeLock) {
1105 if (!handshaken && !handshaking &&
1106 !engine.getUseClientMode() &&
1107 !engine.isInboundDone() && !engine.isOutboundDone()) {
1108 needsHandshake = true;
1109
1110 }
1111 }
1112 if (needsHandshake) {
1113 handshake();
1114 }
1115
1116 synchronized (handshakeLock) {
1117 result = engine.unwrap(inNetBuf, outAppBuf);
1118 }
1119
1120
1121 if (result.getStatus() == Status.CLOSED) {
1122 sslEngineCloseFuture.setClosed();
1123 }
1124
1125
1126 final HandshakeStatus handshakeStatus = result.getHandshakeStatus();
1127 handleRenegotiation(handshakeStatus);
1128 switch (handshakeStatus) {
1129 case NEED_UNWRAP:
1130 if (inNetBuf.hasRemaining() && !engine.isInboundDone()) {
1131 break;
1132 } else {
1133 break loop;
1134 }
1135 case NEED_WRAP:
1136 wrapNonAppData(ctx, channel);
1137 break;
1138 case NEED_TASK:
1139 runDelegatedTasks();
1140 break;
1141 case FINISHED:
1142 setHandshakeSuccess(channel);
1143 needsWrap = true;
1144 break loop;
1145 case NOT_HANDSHAKING:
1146 needsWrap = true;
1147 break loop;
1148 default:
1149 throw new IllegalStateException(
1150 "Unknown handshake status: " + handshakeStatus);
1151 }
1152
1153 }
1154
1155 if (needsWrap) {
1156
1157
1158
1159
1160
1161
1162
1163
1164 if (!Thread.holdsLock(handshakeLock) &&
1165 !pendingEncryptedWritesLock.isHeldByCurrentThread()) {
1166 wrap(ctx, channel);
1167 }
1168 }
1169
1170 outAppBuf.flip();
1171
1172 if (outAppBuf.hasRemaining()) {
1173 ChannelBuffer frame = ctx.getChannel().getConfig().getBufferFactory().getBuffer(outAppBuf.remaining());
1174
1175
1176
1177
1178 frame.writeBytes(outAppBuf);
1179
1180 return frame;
1181 } else {
1182 return null;
1183 }
1184 } catch (SSLException e) {
1185 setHandshakeFailure(channel, e);
1186 throw e;
1187 } finally {
1188 bufferPool.releaseBuffer(outAppBuf);
1189 }
1190 }
1191
1192 private void handleRenegotiation(HandshakeStatus handshakeStatus) {
1193 if (handshakeStatus == HandshakeStatus.NOT_HANDSHAKING ||
1194 handshakeStatus == HandshakeStatus.FINISHED) {
1195
1196 return;
1197 }
1198
1199 if (!handshaken) {
1200
1201 return;
1202 }
1203
1204 final boolean renegotiate;
1205 synchronized (handshakeLock) {
1206 if (handshaking) {
1207
1208
1209 return;
1210 }
1211
1212 if (engine.isInboundDone() || engine.isOutboundDone()) {
1213
1214 return;
1215 }
1216
1217 if (isEnableRenegotiation()) {
1218
1219 renegotiate = true;
1220 } else {
1221
1222 renegotiate = false;
1223
1224 handshaking = true;
1225 }
1226 }
1227
1228 if (renegotiate) {
1229
1230 handshake();
1231 } else {
1232
1233 fireExceptionCaught(
1234 ctx, new SSLException(
1235 "renegotiation attempted by peer; " +
1236 "closing the connection"));
1237
1238
1239 Channels.close(ctx, succeededFuture(ctx.getChannel()));
1240 }
1241 }
1242
1243 private void runDelegatedTasks() {
1244 for (;;) {
1245 final Runnable task;
1246 synchronized (handshakeLock) {
1247 task = engine.getDelegatedTask();
1248 }
1249
1250 if (task == null) {
1251 break;
1252 }
1253
1254 delegatedTaskExecutor.execute(new Runnable() {
1255 public void run() {
1256 synchronized (handshakeLock) {
1257 task.run();
1258 }
1259 }
1260 });
1261 }
1262 }
1263
1264 private void setHandshakeSuccess(Channel channel) {
1265 synchronized (handshakeLock) {
1266 handshaking = false;
1267 handshaken = true;
1268
1269 if (handshakeFuture == null) {
1270 handshakeFuture = future(channel);
1271 }
1272 }
1273
1274 handshakeFuture.setSuccess();
1275 }
1276
1277 private void setHandshakeFailure(Channel channel, SSLException cause) {
1278 synchronized (handshakeLock) {
1279 if (!handshaking) {
1280 return;
1281 }
1282 handshaking = false;
1283 handshaken = false;
1284
1285 if (handshakeFuture == null) {
1286 handshakeFuture = future(channel);
1287 }
1288
1289
1290
1291
1292 engine.closeOutbound();
1293
1294 try {
1295 engine.closeInbound();
1296 } catch (SSLException e) {
1297 if (logger.isDebugEnabled()) {
1298 logger.debug(
1299 "SSLEngine.closeInbound() raised an exception after " +
1300 "a handshake failure.", e);
1301 }
1302
1303 }
1304 }
1305
1306 handshakeFuture.setFailure(cause);
1307 if (closeOnSSLException) {
1308 Channels.close(ctx, future(channel));
1309 }
1310 }
1311
1312 private void closeOutboundAndChannel(
1313 final ChannelHandlerContext context, final ChannelStateEvent e) {
1314 if (!e.getChannel().isConnected()) {
1315 context.sendDownstream(e);
1316 return;
1317 }
1318
1319 boolean passthrough = true;
1320 try {
1321 try {
1322 unwrap(context, e.getChannel(), ChannelBuffers.EMPTY_BUFFER, 0, 0);
1323 } catch (SSLException ex) {
1324 if (logger.isDebugEnabled()) {
1325 logger.debug("Failed to unwrap before sending a close_notify message", ex);
1326 }
1327 }
1328
1329 if (!engine.isOutboundDone()) {
1330 if (sentCloseNotify.compareAndSet(false, true)) {
1331 engine.closeOutbound();
1332 try {
1333 ChannelFuture closeNotifyFuture = wrapNonAppData(context, e.getChannel());
1334 closeNotifyFuture.addListener(
1335 new ClosingChannelFutureListener(context, e));
1336 passthrough = false;
1337 } catch (SSLException ex) {
1338 if (logger.isDebugEnabled()) {
1339 logger.debug("Failed to encode a close_notify message", ex);
1340 }
1341 }
1342 }
1343 }
1344 } finally {
1345 if (passthrough) {
1346 context.sendDownstream(e);
1347 }
1348 }
1349 }
1350
1351 private static final class PendingWrite {
1352 final ChannelFuture future;
1353 final ByteBuffer outAppBuf;
1354
1355 PendingWrite(ChannelFuture future, ByteBuffer outAppBuf) {
1356 this.future = future;
1357 this.outAppBuf = outAppBuf;
1358 }
1359 }
1360
1361 private static final class ClosingChannelFutureListener implements ChannelFutureListener {
1362
1363 private final ChannelHandlerContext context;
1364 private final ChannelStateEvent e;
1365
1366 ClosingChannelFutureListener(
1367 ChannelHandlerContext context, ChannelStateEvent e) {
1368 this.context = context;
1369 this.e = e;
1370 }
1371
1372 public void operationComplete(ChannelFuture closeNotifyFuture) throws Exception {
1373 if (!(closeNotifyFuture.getCause() instanceof ClosedChannelException)) {
1374 Channels.close(context, e.getFuture());
1375 } else {
1376 e.getFuture().setSuccess();
1377 }
1378 }
1379 }
1380
1381 @Override
1382 public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
1383 super.beforeAdd(ctx);
1384 this.ctx = ctx;
1385 }
1386
1387
1388
1389
1390 @Override
1391 public void afterRemove(ChannelHandlerContext ctx) throws Exception {
1392
1393
1394 Throwable cause = null;
1395 for (;;) {
1396 PendingWrite pw = pendingUnencryptedWrites.poll();
1397 if (pw == null) {
1398 break;
1399 }
1400 if (cause == null) {
1401 cause = new IOException("Unable to write data");
1402 }
1403 pw.future.setFailure(cause);
1404
1405 }
1406
1407 for (;;) {
1408 MessageEvent ev = pendingEncryptedWrites.poll();
1409 if (ev == null) {
1410 break;
1411 }
1412 if (cause == null) {
1413 cause = new IOException("Unable to write data");
1414 }
1415 ev.getFuture().setFailure(cause);
1416
1417 }
1418
1419 if (cause != null) {
1420 fireExceptionCaughtLater(ctx, cause);
1421 }
1422 }
1423
1424
1425
1426
1427
1428 @Override
1429 public void channelConnected(final ChannelHandlerContext ctx, final ChannelStateEvent e) throws Exception {
1430 if (issueHandshake) {
1431
1432
1433 handshake().addListener(new ChannelFutureListener() {
1434
1435 public void operationComplete(ChannelFuture future) throws Exception {
1436 if (!future.isSuccess()) {
1437 fireExceptionCaught(future.getChannel(), future.getCause());
1438 } else {
1439
1440
1441
1442 ctx.sendUpstream(e);
1443 }
1444
1445 }
1446 });
1447 } else {
1448 super.channelConnected(ctx, e);
1449 }
1450 }
1451
1452
1453
1454
1455
1456
1457 @Override
1458 public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
1459 Throwable cause = null;
1460 synchronized (pendingUnencryptedWrites) {
1461 for (;;) {
1462 PendingWrite pw = pendingUnencryptedWrites.poll();
1463 if (pw == null) {
1464 break;
1465 }
1466 if (cause == null) {
1467 cause = new ClosedChannelException();
1468 }
1469 pw.future.setFailure(cause);
1470
1471 }
1472
1473
1474 for (;;) {
1475 MessageEvent ev = pendingEncryptedWrites.poll();
1476 if (ev == null) {
1477 break;
1478 }
1479 if (cause == null) {
1480 cause = new ClosedChannelException();
1481 }
1482 ev.getFuture().setFailure(cause);
1483
1484 }
1485 }
1486
1487 if (cause != null) {
1488 fireExceptionCaught(ctx, cause);
1489 }
1490
1491 super.channelClosed(ctx, e);
1492 }
1493
1494 private final class SSLEngineInboundCloseFuture extends DefaultChannelFuture {
1495 public SSLEngineInboundCloseFuture() {
1496 super(null, true);
1497 }
1498
1499 void setClosed() {
1500 super.setSuccess();
1501 }
1502
1503 @Override
1504 public Channel getChannel() {
1505 if (ctx == null) {
1506
1507 return null;
1508 } else {
1509 return ctx.getChannel();
1510 }
1511 }
1512
1513 @Override
1514 public boolean setSuccess() {
1515 return false;
1516 }
1517
1518 @Override
1519 public boolean setFailure(Throwable cause) {
1520 return false;
1521 }
1522 }
1523 }