View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.local;
17  
18  import static org.jboss.netty.channel.Channels.*;
19  
20  import java.nio.channels.ClosedChannelException;
21  import java.nio.channels.NotYetConnectedException;
22  import java.util.Queue;
23  import java.util.concurrent.ConcurrentLinkedQueue;
24  import java.util.concurrent.atomic.AtomicInteger;
25  
26  import org.jboss.netty.channel.AbstractChannel;
27  import org.jboss.netty.channel.ChannelConfig;
28  import org.jboss.netty.channel.ChannelException;
29  import org.jboss.netty.channel.ChannelFactory;
30  import org.jboss.netty.channel.ChannelFuture;
31  import org.jboss.netty.channel.ChannelFutureListener;
32  import org.jboss.netty.channel.ChannelPipeline;
33  import org.jboss.netty.channel.ChannelSink;
34  import org.jboss.netty.channel.DefaultChannelConfig;
35  import org.jboss.netty.channel.MessageEvent;
36  import org.jboss.netty.util.internal.ThreadLocalBoolean;
37  
38  /**
39   */
40  final class DefaultLocalChannel extends AbstractChannel implements LocalChannel {
41  
42      // TODO Move the state management up to AbstractChannel to remove duplication.
43      private static final int ST_OPEN = 0;
44      private static final int ST_BOUND = 1;
45      private static final int ST_CONNECTED = 2;
46      private static final int ST_CLOSED = -1;
47      final AtomicInteger state = new AtomicInteger(ST_OPEN);
48  
49      private final ChannelConfig config;
50      private final ThreadLocalBoolean delivering = new ThreadLocalBoolean();
51  
52      final Queue<MessageEvent> writeBuffer = new ConcurrentLinkedQueue<MessageEvent>();
53  
54      volatile DefaultLocalChannel pairedChannel;
55      volatile LocalAddress localAddress;
56      volatile LocalAddress remoteAddress;
57  
58      DefaultLocalChannel(
59              LocalServerChannel parent, ChannelFactory factory, ChannelPipeline pipeline,
60              ChannelSink sink, DefaultLocalChannel pairedChannel) {
61          super(parent, factory, pipeline, sink);
62          this.pairedChannel = pairedChannel;
63          config = new DefaultChannelConfig();
64  
65          // TODO Move the state variable to AbstractChannel so that we don't need
66          //      to add many listeners.
67          getCloseFuture().addListener(new ChannelFutureListener() {
68              public void operationComplete(ChannelFuture future) throws Exception {
69                  state.set(ST_CLOSED);
70              }
71          });
72  
73          fireChannelOpen(this);
74      }
75  
76      public ChannelConfig getConfig() {
77          return config;
78      }
79  
80      @Override
81      public boolean isOpen() {
82          return state.get() >= ST_OPEN;
83      }
84  
85      public boolean isBound() {
86          return state.get() >= ST_BOUND;
87      }
88  
89      public boolean isConnected() {
90          return state.get() == ST_CONNECTED;
91      }
92  
93      void setBound() throws ClosedChannelException {
94          if (!state.compareAndSet(ST_OPEN, ST_BOUND)) {
95              switch (state.get()) {
96              case ST_CLOSED:
97                  throw new ClosedChannelException();
98              default:
99                  throw new ChannelException("already bound");
100             }
101         }
102     }
103 
104     void setConnected() {
105         if (state.get() != ST_CLOSED) {
106             state.set(ST_CONNECTED);
107         }
108     }
109 
110     @Override
111     protected boolean setClosed() {
112         return super.setClosed();
113     }
114 
115     public LocalAddress getLocalAddress() {
116         return localAddress;
117     }
118 
119     public LocalAddress getRemoteAddress() {
120         return remoteAddress;
121     }
122 
123     void closeNow(ChannelFuture future) {
124         LocalAddress localAddress = this.localAddress;
125         try {
126             // Close the self.
127             if (!setClosed()) {
128                 return;
129             }
130 
131             DefaultLocalChannel pairedChannel = this.pairedChannel;
132             if (pairedChannel != null) {
133                 this.pairedChannel = null;
134                 fireChannelDisconnected(this);
135                 fireChannelUnbound(this);
136             }
137             fireChannelClosed(this);
138 
139             // Close the peer.
140             if (pairedChannel == null || !pairedChannel.setClosed()) {
141                 return;
142             }
143 
144             DefaultLocalChannel me = pairedChannel.pairedChannel;
145             if (me != null) {
146                 pairedChannel.pairedChannel = null;
147                 fireChannelDisconnected(pairedChannel);
148                 fireChannelUnbound(pairedChannel);
149             }
150             fireChannelClosed(pairedChannel);
151         } finally {
152             future.setSuccess();
153             if (localAddress != null && getParent() == null) {
154                 LocalChannelRegistry.unregister(localAddress);
155             }
156         }
157     }
158 
159     void flushWriteBuffer() {
160         DefaultLocalChannel pairedChannel = this.pairedChannel;
161         if (pairedChannel != null) {
162             if (pairedChannel.isConnected()) {
163                 // Channel is open and connected and channelConnected event has
164                 // been fired.
165                 if (!delivering.get()) {
166                     delivering.set(true);
167                     try {
168                         for (;;) {
169                             MessageEvent e = writeBuffer.poll();
170                             if (e == null) {
171                                 break;
172                             }
173 
174                             fireMessageReceived(pairedChannel, e.getMessage());
175                             e.getFuture().setSuccess();
176                             fireWriteComplete(this, 1);
177                         }
178                     } finally {
179                         delivering.set(false);
180                     }
181                 }
182             } else {
183                 // Channel is open and connected but channelConnected event has
184                 // not been fired yet.
185             }
186         } else {
187             // Channel is closed or not connected yet - notify as failures.
188             Exception cause;
189             if (isOpen()) {
190                 cause = new NotYetConnectedException();
191             } else {
192                 cause = new ClosedChannelException();
193             }
194 
195             for (;;) {
196                 MessageEvent e = writeBuffer.poll();
197                 if (e == null) {
198                     break;
199                 }
200 
201                 e.getFuture().setFailure(cause);
202                 fireExceptionCaught(this, cause);
203             }
204         }
205     }
206 }