View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  /**
17   * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
18   */
19  package io.netty.util.internal;
20  
21  import java.io.IOException;
22  import java.io.ObjectInputStream;
23  import java.io.ObjectOutputStream;
24  import java.lang.reflect.Array;
25  import java.util.Arrays;
26  import java.util.Collection;
27  import java.util.Iterator;
28  import java.util.NoSuchElementException;
29  import java.util.Queue;
30  
31  /**
32   * A lock-free concurrent single-consumer multi-producer {@link Queue}.
33   * It allows multiple producer threads to perform the following operations simultaneously:
34   * <ul>
35   * <li>{@link #offer(Object)}, {@link #add(Object)}, and {@link #addAll(Collection)}</li>
36   * <li>All other read-only operations:
37   *     <ul>
38   *     <li>{@link #contains(Object)} and {@link #containsAll(Collection)}</li>
39   *     <li>{@link #element()}, {@link #peek()}</li>
40   *     <li>{@link #size()} and {@link #isEmpty()}</li>
41   *     <li>{@link #iterator()} (except {@link Iterator#remove()}</li>
42   *     <li>{@link #toArray()} and {@link #toArray(Object[])}</li>
43   *     </ul>
44   * </li>
45   * </ul>
46   * .. while only one consumer thread is allowed to perform the following operations exclusively:
47   * <ul>
48   * <li>{@link #poll()} and {@link #remove()}</li>
49   * <li>{@link #remove(Object)}, {@link #removeAll(Collection)}, and {@link #retainAll(Collection)}</li>
50   * <li>{@link #clear()}</li> {@link #}
51   * </ul>
52   *
53   * <strong>The behavior of this implementation is undefined if you perform the operations for a consumer thread only
54   * from multiple threads.</strong>
55   *
56   * The initial implementation is based on:
57   * <ul>
58   *   <li><a href="http://netty.io/s/mpsc-1024c">Non-intrusive MPSC node based queue</a> from 1024cores.net</li>
59   *   <li><a href="http://netty.io/s/mpsc-akka">AbstractNodeQueue</a> from Akka</li>
60   * </ul>
61   * and adopted padded head node changes from:
62   * <ul>
63   * <li><a href="http://netty.io/s/mpsc-rxjava">MpscPaddedQueue</a> from RxJava</li>
64   * </ul>
65   * data structure modified to avoid false sharing between head and tail Ref as per implementation of MpscLinkedQueue
66   * on <a href="https://github.com/JCTools/JCTools">JCTools project</a>.
67   */
68  final class MpscLinkedQueue<E> extends MpscLinkedQueueTailRef<E> implements Queue<E> {
69  
70      private static final long serialVersionUID = -1878402552271506449L;
71  
72      long p00, p01, p02, p03, p04, p05, p06, p07;
73      long p30, p31, p32, p33, p34, p35, p36, p37;
74  
75      // offer() occurs at the tail of the linked list.
76      // poll() occurs at the head of the linked list.
77      //
78      // Resulting layout is:
79      //
80      //   head --next--> 1st element --next--> 2nd element --next--> ... tail (last element)
81      //
82      // where the head is a dummy node whose value is null.
83      //
84      // offer() appends a new node next to the tail using AtomicReference.getAndSet()
85      // poll() removes head from the linked list and promotes the 1st element to the head,
86      // setting its value to null if possible.
87      //
88      // Also note that this class extends AtomicReference for the "tail" slot (which is the one that is appended to)
89      // since Unsafe does not expose XCHG operation intrinsically.
90      MpscLinkedQueue() {
91          MpscLinkedQueueNode<E> tombstone = new DefaultNode<E>(null);
92          setHeadRef(tombstone);
93          setTailRef(tombstone);
94      }
95  
96      /**
97       * Returns the node right next to the head, which contains the first element of this queue.
98       */
99      private MpscLinkedQueueNode<E> peekNode() {
100         MpscLinkedQueueNode<E> head = headRef();
101         MpscLinkedQueueNode<E> next = head.next();
102         if (next == null && head != tailRef()) {
103             // if tail != head this is not going to change until consumer makes progress
104             // we can avoid reading the head and just spin on next until it shows up
105             //
106             // See https://github.com/akka/akka/pull/15596
107             do {
108                 next = head.next();
109             } while (next == null);
110         }
111         return next;
112     }
113 
114     @Override
115     @SuppressWarnings("unchecked")
116     public boolean offer(E value) {
117         if (value == null) {
118             throw new NullPointerException("value");
119         }
120 
121         final MpscLinkedQueueNode<E> newTail;
122         if (value instanceof MpscLinkedQueueNode) {
123             newTail = (MpscLinkedQueueNode<E>) value;
124             newTail.setNext(null);
125         } else {
126             newTail = new DefaultNode<E>(value);
127         }
128 
129         MpscLinkedQueueNode<E> oldTail = getAndSetTailRef(newTail);
130         oldTail.setNext(newTail);
131         return true;
132     }
133 
134     @Override
135     public E poll() {
136         final MpscLinkedQueueNode<E> next = peekNode();
137         if (next == null) {
138             return null;
139         }
140 
141         // next becomes a new head.
142         MpscLinkedQueueNode<E> oldHead = headRef();
143         // Similar to 'headRef.node = next', but slightly faster (storestore vs loadstore)
144         // See: http://robsjava.blogspot.com/2013/06/a-faster-volatile.html
145         // See: http://psy-lob-saw.blogspot.com/2012/12/atomiclazyset-is-performance-win-for.html
146         lazySetHeadRef(next);
147 
148         // Break the linkage between the old head and the new head.
149         oldHead.unlink();
150 
151         return next.clearMaybe();
152     }
153 
154     @Override
155     public E peek() {
156         final MpscLinkedQueueNode<E> next = peekNode();
157         if (next == null) {
158             return null;
159         }
160         return next.value();
161     }
162 
163     @Override
164     public int size() {
165         int count = 0;
166         MpscLinkedQueueNode<E> n = peekNode();
167         for (;;) {
168             if (n == null) {
169                 break;
170             }
171             count ++;
172             n = n.next();
173         }
174         return count;
175     }
176 
177     @Override
178     public boolean isEmpty() {
179         return peekNode() == null;
180     }
181 
182     @Override
183     public boolean contains(Object o) {
184         MpscLinkedQueueNode<E> n = peekNode();
185         for (;;) {
186             if (n == null) {
187                 break;
188             }
189             if (n.value() == o) {
190                 return true;
191             }
192             n = n.next();
193         }
194         return false;
195     }
196 
197     @Override
198     public Iterator<E> iterator() {
199         return new Iterator<E>() {
200             private MpscLinkedQueueNode<E> node = peekNode();
201 
202             @Override
203             public boolean hasNext() {
204                 return node != null;
205             }
206 
207             @Override
208             public E next() {
209                 MpscLinkedQueueNode<E> node = this.node;
210                 if (node == null) {
211                     throw new NoSuchElementException();
212                 }
213                 E value = node.value();
214                 this.node = node.next();
215                 return value;
216             }
217 
218             @Override
219             public void remove() {
220                 throw new UnsupportedOperationException();
221             }
222         };
223     }
224 
225     @Override
226     public boolean add(E e) {
227         if (offer(e)) {
228             return true;
229         }
230         throw new IllegalStateException("queue full");
231     }
232 
233     @Override
234     public E remove() {
235         E e = poll();
236         if (e != null) {
237             return e;
238         }
239         throw new NoSuchElementException();
240     }
241 
242     @Override
243     public E element() {
244         E e = peek();
245         if (e != null) {
246             return e;
247         }
248         throw new NoSuchElementException();
249     }
250 
251     @Override
252     public Object[] toArray() {
253         final Object[] array = new Object[size()];
254         final Iterator<E> it = iterator();
255         for (int i = 0; i < array.length; i ++) {
256             if (it.hasNext()) {
257                 array[i] = it.next();
258             } else {
259                 return Arrays.copyOf(array, i);
260             }
261         }
262         return array;
263     }
264 
265     @Override
266     @SuppressWarnings("unchecked")
267     public <T> T[] toArray(T[] a) {
268         final int size = size();
269         final T[] array;
270         if (a.length >= size) {
271             array = a;
272         } else {
273             array = (T[]) Array.newInstance(a.getClass().getComponentType(), size);
274         }
275 
276         final Iterator<E> it = iterator();
277         for (int i = 0; i < array.length; i++) {
278             if (it.hasNext()) {
279                 array[i] = (T) it.next();
280             } else {
281                 if (a == array) {
282                     array[i] = null;
283                     return array;
284                 }
285 
286                 if (a.length < i) {
287                     return Arrays.copyOf(array, i);
288                 }
289 
290                 System.arraycopy(array, 0, a, 0, i);
291                 if (a.length > i) {
292                     a[i] = null;
293                 }
294                 return a;
295             }
296         }
297         return array;
298     }
299 
300     @Override
301     public boolean remove(Object o) {
302         throw new UnsupportedOperationException();
303     }
304 
305     @Override
306     public boolean containsAll(Collection<?> c) {
307         for (Object e: c) {
308             if (!contains(e)) {
309                 return false;
310             }
311         }
312         return true;
313     }
314 
315     @Override
316     public boolean addAll(Collection<? extends E> c) {
317         if (c == null) {
318             throw new NullPointerException("c");
319         }
320         if (c == this) {
321             throw new IllegalArgumentException("c == this");
322         }
323 
324         boolean modified = false;
325         for (E e: c) {
326             add(e);
327             modified = true;
328         }
329         return modified;
330     }
331 
332     @Override
333     public boolean removeAll(Collection<?> c) {
334         throw new UnsupportedOperationException();
335     }
336 
337     @Override
338     public boolean retainAll(Collection<?> c) {
339         throw new UnsupportedOperationException();
340     }
341 
342     @Override
343     public void clear() {
344         while (poll() != null) {
345             continue;
346         }
347     }
348 
349     private void writeObject(ObjectOutputStream out) throws IOException {
350         out.defaultWriteObject();
351         for (E e: this) {
352             out.writeObject(e);
353         }
354         out.writeObject(null);
355     }
356 
357     private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
358         in.defaultReadObject();
359 
360         final MpscLinkedQueueNode<E> tombstone = new DefaultNode<E>(null);
361         setHeadRef(tombstone);
362         setTailRef(tombstone);
363 
364         for (;;) {
365             @SuppressWarnings("unchecked")
366             E e = (E) in.readObject();
367             if (e == null) {
368                 break;
369             }
370             add(e);
371         }
372     }
373 
374     private static final class DefaultNode<T> extends MpscLinkedQueueNode<T> {
375 
376         private T value;
377 
378         DefaultNode(T value) {
379             this.value = value;
380         }
381 
382         @Override
383         public T value() {
384             return value;
385         }
386 
387         @Override
388         protected T clearMaybe() {
389             T value = this.value;
390             this.value = null;
391             return value;
392         }
393     }
394 }