/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
16  package org.jboss.netty.channel;
17  
18  import java.net.SocketAddress;
19  import java.util.Random;
20  import java.util.concurrent.ConcurrentHashMap;
21  import java.util.concurrent.ConcurrentMap;
22  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
23  
24  /**
25   * A skeletal {@link Channel} implementation.
26   */
27  public abstract class AbstractChannel implements Channel {
28  
29      static final ConcurrentMap<Integer, Channel> allChannels = new ConcurrentHashMap<Integer, Channel>();
30  
31      private static final Random random = new Random();
32  
33      private static Integer allocateId(Channel channel) {
34          Integer id = random.nextInt();
35          for (;;) {
36              // Loop until a unique ID is acquired.
37              // It should be found in one loop practically.
38              if (allChannels.putIfAbsent(id, channel) == null) {
39                  // Successfully acquired.
40                  return id;
41              } else {
42                  // Taken by other channel at almost the same moment.
43                  id = id.intValue() + 1;
44              }
45          }
46      }
47  
48      private final Integer id;
49      private final Channel parent;
50      private final ChannelFactory factory;
51      private final ChannelPipeline pipeline;
52      private final ChannelFuture succeededFuture = new SucceededChannelFuture(this);
53      private final ChannelCloseFuture closeFuture = new ChannelCloseFuture();
54      private volatile int interestOps = OP_READ;
55  
56      /** Cache for the string representation of this channel */
57      private boolean strValConnected;
58      private String strVal;
59      private volatile Object attachment;
60  
61      private static final AtomicIntegerFieldUpdater<AbstractChannel> UNWRITABLE_UPDATER;
62      @SuppressWarnings("UnusedDeclaration")
63      private volatile int unwritable;
64  
65      static {
66          UNWRITABLE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(AbstractChannel.class, "unwritable");
67      }
68  
69      /**
70       * Creates a new instance.
71       *
72       * @param parent
73       *        the parent of this channel. {@code null} if there's no parent.
74       * @param factory
75       *        the factory which created this channel
76       * @param pipeline
77       *        the pipeline which is going to be attached to this channel
78       * @param sink
79       *        the sink which will receive downstream events from the pipeline
80       *        and send upstream events to the pipeline
81       */
82      protected AbstractChannel(
83              Channel parent, ChannelFactory factory,
84              ChannelPipeline pipeline, ChannelSink sink) {
85  
86          this.parent = parent;
87          this.factory = factory;
88          this.pipeline = pipeline;
89  
90          id = allocateId(this);
91  
92          pipeline.attach(this, sink);
93      }
94  
95      /**
96       * (Internal use only) Creates a new temporary instance with the specified
97       * ID.
98       *
99       * @param parent
100      *        the parent of this channel. {@code null} if there's no parent.
101      * @param factory
102      *        the factory which created this channel
103      * @param pipeline
104      *        the pipeline which is going to be attached to this channel
105      * @param sink
106      *        the sink which will receive downstream events from the pipeline
107      *        and send upstream events to the pipeline
108      */
109     protected AbstractChannel(
110             Integer id,
111             Channel parent, ChannelFactory factory,
112             ChannelPipeline pipeline, ChannelSink sink) {
113 
114         this.id = id;
115         this.parent = parent;
116         this.factory = factory;
117         this.pipeline = pipeline;
118         pipeline.attach(this, sink);
119     }
120 
121     public final Integer getId() {
122         return id;
123     }
124 
125     public Channel getParent() {
126         return parent;
127     }
128 
129     public ChannelFactory getFactory() {
130         return factory;
131     }
132 
133     public ChannelPipeline getPipeline() {
134         return pipeline;
135     }
136 
137     /**
138      * Returns the cached {@link SucceededChannelFuture} instance.
139      */
140     protected ChannelFuture getSucceededFuture() {
141         return succeededFuture;
142     }
143 
144     /**
145      * Returns the {@link FailedChannelFuture} whose cause is an
146      * {@link UnsupportedOperationException}.
147      */
148     protected ChannelFuture getUnsupportedOperationFuture() {
149         return new FailedChannelFuture(this, new UnsupportedOperationException());
150     }
151 
152     /**
153      * Returns the ID of this channel.
154      */
155     @Override
156     public final int hashCode() {
157         return id;
158     }
159 
160     /**
161      * Returns {@code true} if and only if the specified object is identical
162      * with this channel (i.e: {@code this == o}).
163      */
164     @Override
165     public final boolean equals(Object o) {
166         return this == o;
167     }
168 
169     /**
170      * Compares the {@linkplain #getId() ID} of the two channels.
171      */
172     public final int compareTo(Channel o) {
173         return getId().compareTo(o.getId());
174     }
175 
176     public boolean isOpen() {
177         return !closeFuture.isDone();
178     }
179 
180     /**
181      * Marks this channel as closed.  This method is intended to be called by
182      * an internal component - please do not call it unless you know what you
183      * are doing.
184      *
185      * @return {@code true} if and only if this channel was not marked as
186      *                      closed yet
187      */
188     protected boolean setClosed() {
189         // Deallocate the current channel's ID from allChannels so that other
190         // new channels can use it.
191         allChannels.remove(id);
192 
193         return closeFuture.setClosed();
194     }
195 
196     public ChannelFuture bind(SocketAddress localAddress) {
197         return Channels.bind(this, localAddress);
198     }
199 
200     public ChannelFuture unbind() {
201         return Channels.unbind(this);
202     }
203 
204     public ChannelFuture close() {
205         ChannelFuture returnedCloseFuture = Channels.close(this);
206         assert closeFuture == returnedCloseFuture;
207         return closeFuture;
208     }
209 
210     public ChannelFuture getCloseFuture() {
211         return closeFuture;
212     }
213 
214     public ChannelFuture connect(SocketAddress remoteAddress) {
215         return Channels.connect(this, remoteAddress);
216     }
217 
218     public ChannelFuture disconnect() {
219         return Channels.disconnect(this);
220     }
221 
222     public int getInterestOps() {
223         if (!isOpen()) {
224             return Channel.OP_WRITE;
225         }
226 
227         int interestOps = getInternalInterestOps() & ~OP_WRITE;
228         if (!isWritable()) {
229             interestOps |= OP_WRITE;
230         }
231         return interestOps;
232     }
233 
234     public ChannelFuture setInterestOps(int interestOps) {
235         return Channels.setInterestOps(this, interestOps);
236     }
237 
238     protected int getInternalInterestOps() {
239         return interestOps;
240     }
241 
242     /**
243      * Sets the {@link #getInterestOps() interestOps} property of this channel
244      * immediately.  This method is intended to be called by an internal
245      * component - please do not call it unless you know what you are doing.
246      */
247     protected void setInternalInterestOps(int interestOps) {
248         this.interestOps = interestOps;
249     }
250 
251     public boolean isReadable() {
252         return (getInternalInterestOps() & OP_READ) != 0;
253     }
254 
255     public boolean isWritable() {
256         return unwritable == 0;
257     }
258 
259     public final boolean getUserDefinedWritability(int index) {
260         return (unwritable & writabilityMask(index)) == 0;
261     }
262 
263     public final void setUserDefinedWritability(int index, boolean writable) {
264         if (writable) {
265             setUserDefinedWritability(index);
266         } else {
267             clearUserDefinedWritability(index);
268         }
269     }
270 
271     private void setUserDefinedWritability(int index) {
272         final int mask = ~writabilityMask(index);
273         for (;;) {
274             final int oldValue = unwritable;
275             final int newValue = oldValue & mask;
276             if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
277                 if (oldValue != 0 && newValue == 0) {
278                     getPipeline().sendUpstream(
279                             new UpstreamChannelStateEvent(
280                                     this, ChannelState.INTEREST_OPS, getInterestOps()));
281                 }
282                 break;
283             }
284         }
285     }
286 
287     private void clearUserDefinedWritability(int index) {
288         final int mask = writabilityMask(index);
289         for (;;) {
290             final int oldValue = unwritable;
291             final int newValue = oldValue | mask;
292             if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
293                 if (oldValue == 0 && newValue != 0) {
294                     getPipeline().sendUpstream(
295                             new UpstreamChannelStateEvent(
296                                     this, ChannelState.INTEREST_OPS, getInterestOps()));
297                 }
298                 break;
299             }
300         }
301     }
302 
303     private static int writabilityMask(int index) {
304         if (index < 1 || index > 31) {
305             throw new IllegalArgumentException("index: " + index + " (expected: 1~31)");
306         }
307         return 1 << index;
308     }
309 
310     protected boolean setWritable() {
311         for (;;) {
312             final int oldValue = unwritable;
313             final int newValue = oldValue & ~1;
314             if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
315                 if (oldValue != 0 && newValue == 0) {
316                     return true;
317                 }
318                 break;
319             }
320         }
321         return false;
322     }
323 
324     protected boolean setUnwritable() {
325         for (;;) {
326             final int oldValue = unwritable;
327             final int newValue = oldValue | 1;
328             if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
329                 if (oldValue == 0 && newValue != 0) {
330                     return true;
331                 }
332                 break;
333             }
334         }
335         return false;
336     }
337 
338     public ChannelFuture setReadable(boolean readable) {
339         if (readable) {
340             return setInterestOps(getInterestOps() | OP_READ);
341         } else {
342             return setInterestOps(getInterestOps() & ~OP_READ);
343         }
344     }
345 
346     public ChannelFuture write(Object message) {
347         return Channels.write(this, message);
348     }
349 
350     public ChannelFuture write(Object message, SocketAddress remoteAddress) {
351         return Channels.write(this, message, remoteAddress);
352     }
353 
354     public Object getAttachment() {
355         return attachment;
356     }
357 
358     public void setAttachment(Object attachment) {
359         this.attachment = attachment;
360     }
361     /**
362      * Returns the {@link String} representation of this channel.  The returned
363      * string contains the {@linkplain #getId() ID}, {@linkplain #getLocalAddress() local address},
364      * and {@linkplain #getRemoteAddress() remote address} of this channel for
365      * easier identification.
366      */
367     @Override
368     public String toString() {
369         boolean connected = isConnected();
370         if (strValConnected == connected && strVal != null) {
371             return strVal;
372         }
373 
374         StringBuilder buf = new StringBuilder(128);
375         buf.append("[id: 0x");
376         buf.append(getIdString());
377 
378         SocketAddress localAddress = getLocalAddress();
379         SocketAddress remoteAddress = getRemoteAddress();
380         if (remoteAddress != null) {
381             buf.append(", ");
382             if (getParent() == null) {
383                 buf.append(localAddress);
384                 buf.append(connected? " => " : " :> ");
385                 buf.append(remoteAddress);
386             } else {
387                 buf.append(remoteAddress);
388                 buf.append(connected? " => " : " :> ");
389                 buf.append(localAddress);
390             }
391         } else if (localAddress != null) {
392             buf.append(", ");
393             buf.append(localAddress);
394         }
395 
396         buf.append(']');
397 
398         String strVal = buf.toString();
399         this.strVal = strVal;
400         strValConnected = connected;
401         return strVal;
402     }
403 
404     private String getIdString() {
405         String answer = Integer.toHexString(id.intValue());
406         switch (answer.length()) {
407         case 0:
408             answer = "00000000";
409             break;
410         case 1:
411             answer = "0000000" + answer;
412             break;
413         case 2:
414             answer = "000000" + answer;
415             break;
416         case 3:
417             answer = "00000" + answer;
418             break;
419         case 4:
420             answer = "0000" + answer;
421             break;
422         case 5:
423             answer = "000" + answer;
424             break;
425         case 6:
426             answer = "00" + answer;
427             break;
428         case 7:
429             answer = '0' + answer;
430             break;
431         }
432         return answer;
433     }
434 
435     private final class ChannelCloseFuture extends DefaultChannelFuture {
436 
437         ChannelCloseFuture() {
438             super(AbstractChannel.this, false);
439         }
440 
441         @Override
442         public boolean setSuccess() {
443             // User is not supposed to call this method - ignore silently.
444             return false;
445         }
446 
447         @Override
448         public boolean setFailure(Throwable cause) {
449             // User is not supposed to call this method - ignore silently.
450             return false;
451         }
452 
453         boolean setClosed() {
454             return super.setSuccess();
455         }
456     }
457 }