View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.example.factorial;
17  
18  import java.math.BigInteger;
19  import java.util.concurrent.BlockingQueue;
20  import java.util.concurrent.LinkedBlockingQueue;
21  import java.util.logging.Level;
22  import java.util.logging.Logger;
23  
24  import org.jboss.netty.channel.Channel;
25  import org.jboss.netty.channel.ChannelEvent;
26  import org.jboss.netty.channel.ChannelFuture;
27  import org.jboss.netty.channel.ChannelFutureListener;
28  import org.jboss.netty.channel.ChannelHandlerContext;
29  import org.jboss.netty.channel.ChannelStateEvent;
30  import org.jboss.netty.channel.ExceptionEvent;
31  import org.jboss.netty.channel.MessageEvent;
32  import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
33  
34  /**
35   * Handler for a client-side channel.  This handler maintains stateful
36   * information which is specific to a certain channel using member variables.
37   * Therefore, an instance of this handler can cover only one channel.  You have
38   * to create a new handler instance whenever you create a new channel and insert
39   * this handler to avoid a race condition.
40   */
41  public class FactorialClientHandler extends SimpleChannelUpstreamHandler {
42  
43      private static final Logger logger = Logger.getLogger(
44              FactorialClientHandler.class.getName());
45  
46      // Stateful properties
47      private int i = 1;
48      private int receivedMessages;
49      private final int count;
50      final BlockingQueue<BigInteger> answer = new LinkedBlockingQueue<BigInteger>();
51  
52      public FactorialClientHandler(int count) {
53          this.count = count;
54      }
55  
56      public BigInteger getFactorial() {
57          boolean interrupted = false;
58          for (;;) {
59              try {
60                  BigInteger factorial = answer.take();
61                  if (interrupted) {
62                      Thread.currentThread().interrupt();
63                  }
64                  return factorial;
65              } catch (InterruptedException e) {
66                  interrupted = true;
67              }
68          }
69      }
70  
71      @Override
72      public void handleUpstream(
73              ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
74          if (e instanceof ChannelStateEvent) {
75              logger.info(e.toString());
76          }
77          super.handleUpstream(ctx, e);
78      }
79  
80      @Override
81      public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
82          sendNumbers(e);
83      }
84  
85      @Override
86      public void channelInterestChanged(ChannelHandlerContext ctx, ChannelStateEvent e) {
87          sendNumbers(e);
88      }
89  
90      @Override
91      public void messageReceived(
92              ChannelHandlerContext ctx, final MessageEvent e) {
93          receivedMessages ++;
94          if (receivedMessages == count) {
95              // Offer the answer after closing the connection.
96              e.getChannel().close().addListener(new ChannelFutureListener() {
97                  public void operationComplete(ChannelFuture future) {
98                      boolean offered = answer.offer((BigInteger) e.getMessage());
99                      assert offered;
100                 }
101             });
102         }
103     }
104 
105     @Override
106     public void exceptionCaught(
107             ChannelHandlerContext ctx, ExceptionEvent e) {
108         logger.log(
109                 Level.WARNING,
110                 "Unexpected exception from downstream.",
111                 e.getCause());
112         e.getChannel().close();
113     }
114 
115     private void sendNumbers(ChannelStateEvent e) {
116         Channel channel = e.getChannel();
117         while (channel.isWritable()) {
118             if (i <= count) {
119                 channel.write(i);
120                 i ++;
121             } else {
122                 break;
123             }
124         }
125     }
126 }