1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.example.factorial;
17
18 import org.jboss.netty.channel.Channel;
19 import org.jboss.netty.channel.ChannelEvent;
20 import org.jboss.netty.channel.ChannelFuture;
21 import org.jboss.netty.channel.ChannelFutureListener;
22 import org.jboss.netty.channel.ChannelHandlerContext;
23 import org.jboss.netty.channel.ChannelStateEvent;
24 import org.jboss.netty.channel.ExceptionEvent;
25 import org.jboss.netty.channel.MessageEvent;
26 import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
27
28 import java.math.BigInteger;
29 import java.util.concurrent.BlockingQueue;
30 import java.util.concurrent.LinkedBlockingQueue;
31
32
33
34
35
36
37
38
39 public class FactorialClientHandler extends SimpleChannelUpstreamHandler {
40
41
42 private int i = 1;
43 private int receivedMessages;
44 final BlockingQueue<BigInteger> answer = new LinkedBlockingQueue<BigInteger>();
45
46 public BigInteger getFactorial() {
47 boolean interrupted = false;
48 try {
49 for (;;) {
50 try {
51 return answer.take();
52 } catch (InterruptedException ignore) {
53 interrupted = true;
54 }
55 }
56 } finally {
57 if (interrupted) {
58 Thread.currentThread().interrupt();
59 }
60 }
61 }
62
63 @Override
64 public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
65 if (e instanceof ChannelStateEvent) {
66 System.err.println(e);
67 }
68 super.handleUpstream(ctx, e);
69 }
70
71 @Override
72 public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
73 sendNumbers(e);
74 }
75
76 @Override
77 public void channelInterestChanged(ChannelHandlerContext ctx, ChannelStateEvent e) {
78 sendNumbers(e);
79 }
80
81 @Override
82 public void messageReceived(
83 ChannelHandlerContext ctx, final MessageEvent e) {
84 receivedMessages ++;
85 if (receivedMessages == FactorialClient.COUNT) {
86
87 e.getChannel().close().addListener(new ChannelFutureListener() {
88 public void operationComplete(ChannelFuture future) {
89 boolean offered = answer.offer((BigInteger) e.getMessage());
90 assert offered;
91 }
92 });
93 }
94 }
95
96 @Override
97 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
98 e.getCause().printStackTrace();
99 e.getChannel().close();
100 }
101
102 private void sendNumbers(ChannelStateEvent e) {
103 Channel channel = e.getChannel();
104 while (channel.isWritable()) {
105 if (i <= FactorialClient.COUNT) {
106 channel.write(i);
107 i ++;
108 } else {
109 break;
110 }
111 }
112 }
113 }