1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.example.factorial;
17
18 import java.math.BigInteger;
19 import java.util.concurrent.BlockingQueue;
20 import java.util.concurrent.LinkedBlockingQueue;
21 import java.util.logging.Level;
22 import java.util.logging.Logger;
23
24 import org.jboss.netty.channel.Channel;
25 import org.jboss.netty.channel.ChannelEvent;
26 import org.jboss.netty.channel.ChannelFuture;
27 import org.jboss.netty.channel.ChannelFutureListener;
28 import org.jboss.netty.channel.ChannelHandlerContext;
29 import org.jboss.netty.channel.ChannelStateEvent;
30 import org.jboss.netty.channel.ExceptionEvent;
31 import org.jboss.netty.channel.MessageEvent;
32 import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
33
34
35
36
37
38
39
40
41 public class FactorialClientHandler extends SimpleChannelUpstreamHandler {
42
43 private static final Logger logger = Logger.getLogger(
44 FactorialClientHandler.class.getName());
45
46
47 private int i = 1;
48 private int receivedMessages;
49 private final int count;
50 final BlockingQueue<BigInteger> answer = new LinkedBlockingQueue<BigInteger>();
51
52 public FactorialClientHandler(int count) {
53 this.count = count;
54 }
55
56 public BigInteger getFactorial() {
57 boolean interrupted = false;
58 for (;;) {
59 try {
60 BigInteger factorial = answer.take();
61 if (interrupted) {
62 Thread.currentThread().interrupt();
63 }
64 return factorial;
65 } catch (InterruptedException e) {
66 interrupted = true;
67 }
68 }
69 }
70
71 @Override
72 public void handleUpstream(
73 ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
74 if (e instanceof ChannelStateEvent) {
75 logger.info(e.toString());
76 }
77 super.handleUpstream(ctx, e);
78 }
79
80 @Override
81 public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
82 sendNumbers(e);
83 }
84
85 @Override
86 public void channelInterestChanged(ChannelHandlerContext ctx, ChannelStateEvent e) {
87 sendNumbers(e);
88 }
89
90 @Override
91 public void messageReceived(
92 ChannelHandlerContext ctx, final MessageEvent e) {
93 receivedMessages ++;
94 if (receivedMessages == count) {
95
96 e.getChannel().close().addListener(new ChannelFutureListener() {
97 public void operationComplete(ChannelFuture future) {
98 boolean offered = answer.offer((BigInteger) e.getMessage());
99 assert offered;
100 }
101 });
102 }
103 }
104
105 @Override
106 public void exceptionCaught(
107 ChannelHandlerContext ctx, ExceptionEvent e) {
108 logger.log(
109 Level.WARNING,
110 "Unexpected exception from downstream.",
111 e.getCause());
112 e.getChannel().close();
113 }
114
115 private void sendNumbers(ChannelStateEvent e) {
116 Channel channel = e.getChannel();
117 while (channel.isWritable()) {
118 if (i <= count) {
119 channel.write(i);
120 i ++;
121 } else {
122 break;
123 }
124 }
125 }
126 }