1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.example.factorial;
17
18 import io.netty5.channel.ChannelHandlerContext;
19 import io.netty5.channel.SimpleChannelInboundHandler;
20 import io.netty5.util.concurrent.Future;
21 import io.netty5.util.concurrent.FutureListener;
22
23 import java.math.BigInteger;
24 import java.util.concurrent.BlockingQueue;
25 import java.util.concurrent.LinkedBlockingQueue;
26
27
28
29
30
31
32
33
34 public class FactorialClientHandler extends SimpleChannelInboundHandler<BigInteger> {
35
36 private ChannelHandlerContext ctx;
37 private int receivedMessages;
38 private int next = 1;
39 final BlockingQueue<BigInteger> answer = new LinkedBlockingQueue<>();
40
41 public BigInteger getFactorial() {
42 boolean interrupted = false;
43 try {
44 for (;;) {
45 try {
46 return answer.take();
47 } catch (InterruptedException ignore) {
48 interrupted = true;
49 }
50 }
51 } finally {
52 if (interrupted) {
53 Thread.currentThread().interrupt();
54 }
55 }
56 }
57
58 @Override
59 public void channelActive(ChannelHandlerContext ctx) {
60 this.ctx = ctx;
61 sendNumbers();
62 }
63
64 @Override
65 public void messageReceived(ChannelHandlerContext ctx, final BigInteger msg) {
66 receivedMessages ++;
67 if (receivedMessages == FactorialClient.COUNT) {
68
69 ctx.channel().close().addListener(future -> {
70 boolean offered = answer.offer(msg);
71 assert offered;
72 });
73 }
74 }
75
76 @Override
77 public void channelExceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
78 cause.printStackTrace();
79 ctx.close();
80 }
81
82 private void sendNumbers() {
83
84 Future<Void> future = null;
85 for (int i = 0; i < 4096 && next <= FactorialClient.COUNT; i++) {
86 future = ctx.write(Integer.valueOf(next));
87 next++;
88 }
89 if (next <= FactorialClient.COUNT) {
90 assert future != null;
91 future.addListener(numberSender);
92 }
93 ctx.flush();
94 }
95
96 private final FutureListener<Void> numberSender = future -> {
97 if (future.isSuccess()) {
98 sendNumbers();
99 } else {
100 future.cause().printStackTrace();
101 ctx.channel().close();
102 }
103 };
104 }