View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.example.local;
17  
18  import java.util.concurrent.TimeUnit;
19  
20  import org.jboss.netty.bootstrap.ClientBootstrap;
21  import org.jboss.netty.bootstrap.ServerBootstrap;
22  import org.jboss.netty.channel.ChannelFuture;
23  import org.jboss.netty.channel.ChannelPipeline;
24  import org.jboss.netty.channel.ChannelPipelineFactory;
25  import org.jboss.netty.channel.Channels;
26  import org.jboss.netty.channel.local.DefaultLocalClientChannelFactory;
27  import org.jboss.netty.channel.local.DefaultLocalServerChannelFactory;
28  import org.jboss.netty.channel.local.LocalAddress;
29  import org.jboss.netty.handler.codec.string.StringDecoder;
30  import org.jboss.netty.handler.codec.string.StringEncoder;
31  import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
32  import org.jboss.netty.handler.logging.LoggingHandler;
33  import org.jboss.netty.logging.InternalLogLevel;
34  
35  public class LocalExampleMultithreaded {
36  
37      private final String port;
38  
39      public LocalExampleMultithreaded(String port) {
40          this.port = port;
41      }
42  
43      public void run() {
44          LocalAddress socketAddress = new LocalAddress(port);
45  
46          OrderedMemoryAwareThreadPoolExecutor eventExecutor =
47              new OrderedMemoryAwareThreadPoolExecutor(
48                      5, 1000000, 10000000, 100,
49                      TimeUnit.MILLISECONDS);
50  
51          ServerBootstrap sb = new ServerBootstrap(
52                  new DefaultLocalServerChannelFactory());
53  
54          sb.setPipelineFactory(new LocalServerPipelineFactory(eventExecutor));
55          sb.bind(socketAddress);
56  
57          ClientBootstrap cb = new ClientBootstrap(
58                  new DefaultLocalClientChannelFactory());
59  
60          cb.setPipelineFactory(new ChannelPipelineFactory() {
61              public ChannelPipeline getPipeline() throws Exception {
62                  return Channels.pipeline(
63                          new StringDecoder(),
64                          new StringEncoder(),
65                          new LoggingHandler(InternalLogLevel.INFO));
66              }
67          });
68  
69          // Read commands from array
70          String[] commands = { "First", "Second", "Third", "quit" };
71          for (int j = 0; j < 5 ; j++) {
72              System.err.println("Start " + j);
73              ChannelFuture channelFuture = cb.connect(socketAddress);
74              channelFuture.awaitUninterruptibly();
75              if (! channelFuture.isSuccess()) {
76                  System.err.println("CANNOT CONNECT");
77                  channelFuture.getCause().printStackTrace();
78                  break;
79              }
80              ChannelFuture lastWriteFuture = null;
81              for (String line: commands) {
82                  // Sends the received line to the server.
83                  lastWriteFuture = channelFuture.getChannel().write(line);
84              }
85  
86              // Wait until all messages are flushed before closing the channel.
87              if (lastWriteFuture != null) {
88                  lastWriteFuture.awaitUninterruptibly();
89              }
90              channelFuture.getChannel().close();
91              // Wait until the connection is closed or the connection attempt fails.
92              channelFuture.getChannel().getCloseFuture().awaitUninterruptibly();
93              System.err.println("End " + j);
94          }
95  
96          // Release all resources
97          cb.releaseExternalResources();
98          sb.releaseExternalResources();
99          eventExecutor.shutdownNow();
100     }
101 
102     public static void main(String[] args) throws Exception {
103         new LocalExampleMultithreaded("1").run();
104     }
105 }