View Javadoc
1   /*
2    * Copyright 2018 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.resolver.dns;
17  
18  import io.netty5.channel.EventLoop;
19  import io.netty5.util.concurrent.EventExecutor;
20  import io.netty5.util.concurrent.Future;
21  import io.netty5.util.concurrent.FutureCompletionStage;
22  import io.netty5.util.concurrent.FutureContextListener;
23  import io.netty5.util.concurrent.FutureListener;
24  import io.netty5.util.concurrent.Promise;
25  
26  import java.util.ArrayList;
27  import java.util.Collections;
28  import java.util.Iterator;
29  import java.util.List;
30  import java.util.Map;
31  import java.util.Map.Entry;
32  import java.util.Objects;
33  import java.util.concurrent.CancellationException;
34  import java.util.concurrent.ConcurrentHashMap;
35  import java.util.concurrent.ConcurrentMap;
36  import java.util.concurrent.Delayed;
37  import java.util.concurrent.TimeUnit;
38  import java.util.concurrent.atomic.AtomicReference;
39  import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
40  import java.util.function.Function;
41  
42  import static java.util.Collections.singletonList;
43  
44  /**
45   * Abstract cache that automatically removes entries for a hostname once the TTL for an entry is reached.
46   *
47   * @param <E>
48   */
49  abstract class Cache<E> {
50      private static final AtomicReferenceFieldUpdater<Cache.Entries, FutureAndDelay> FUTURE_UPDATER =
51              AtomicReferenceFieldUpdater.newUpdater(Cache.Entries.class, FutureAndDelay.class, "expirationFuture");
52  
53      private static final Future<?> CANCELLED_FUTURE = new Future<>() {
54          @Override
55          public boolean cancel() {
56              return false;
57          }
58  
59          @Override
60          public boolean isSuccess() {
61              return false;
62          }
63  
64          @Override
65          public boolean isFailed() {
66              return true;
67          }
68  
69          @Override
70          public boolean isCancellable() {
71              return false;
72          }
73  
74          @Override
75          public Throwable cause() {
76              return new CancellationException();
77          }
78  
79          @Override
80          public Future<Object> addListener(FutureListener<? super Object> listener) {
81              throw new UnsupportedOperationException();
82          }
83  
84          @Override
85          public <C> Future<Object> addListener(C context, FutureContextListener<? super C, ? super Object> listener) {
86              throw new UnsupportedOperationException();
87          }
88  
89          @Override
90          public Object getNow() {
91              return null;
92          }
93  
94          @Override
95          public EventExecutor executor() {
96              throw new UnsupportedOperationException();
97          }
98  
99          @Override
100         public boolean isCancelled() {
101             return true;
102         }
103 
104         @Override
105         public boolean isDone() {
106             return true;
107         }
108 
109         @Override
110         public FutureCompletionStage<Object> asStage() {
111             throw new UnsupportedOperationException();
112         }
113 
114         @Override
115         public <R> Future<R> map(Function<Object, R> mapper) {
116             throw new UnsupportedOperationException();
117         }
118 
119         @Override
120         public <R> Future<R> flatMap(Function<Object, Future<R>> mapper) {
121             throw new UnsupportedOperationException();
122         }
123 
124         @Override
125         public Future<Object> cascadeTo(Promise<? super Object> promise) {
126             throw new UnsupportedOperationException();
127         }
128     };
129     private static final FutureAndDelay CANCELLED = new FutureAndDelay(CANCELLED_FUTURE, Integer.MIN_VALUE);
130 
131     // Two years are supported by all our EventLoop implementations and so safe to use as maximum.
132     // See also: https://github.com/netty/netty/commit/b47fb817991b42ec8808c7d26538f3f2464e1fa6
133     static final int MAX_SUPPORTED_TTL_SECS = (int) TimeUnit.DAYS.toSeconds(365 * 2);
134 
135     private final ConcurrentMap<String, Entries> resolveCache = new ConcurrentHashMap<>();
136 
137     /**
138      * Remove everything from the cache.
139      */
140     final void clear() {
141         while (!resolveCache.isEmpty()) {
142             for (Iterator<Entry<String, Entries>> i = resolveCache.entrySet().iterator(); i.hasNext();) {
143                 Map.Entry<String, Entries> e = i.next();
144                 i.remove();
145 
146                 e.getValue().clearAndCancel();
147             }
148         }
149     }
150 
151     /**
152      * Clear all entries (if anything exists) for the given hostname and return {@code true} if anything was removed.
153      */
154     final boolean clear(String hostname) {
155         Entries entries = resolveCache.remove(hostname);
156         return entries != null && entries.clearAndCancel();
157     }
158 
159     /**
160      * Returns all caches entries for the given hostname.
161      */
162     final List<? extends E> get(String hostname) {
163         Entries entries = resolveCache.get(hostname);
164         return entries == null ? null : entries.get();
165     }
166 
167     /**
168      * Cache a value for the given hostname that will automatically expire once the TTL is reached.
169      */
170     final void cache(String hostname, E value, int ttl, EventLoop loop) {
171         Entries entries = resolveCache.get(hostname);
172         if (entries == null) {
173             entries = new Entries(hostname);
174             Entries oldEntries = resolveCache.putIfAbsent(hostname, entries);
175             if (oldEntries != null) {
176                 entries = oldEntries;
177             }
178         }
179         entries.add(value, ttl, loop);
180     }
181 
182     /**
183      * Return the number of hostnames for which we have cached something.
184      */
185     final int size() {
186         return resolveCache.size();
187     }
188 
189     /**
190      * Returns {@code true} if this entry should replace all other entries that are already cached for the hostname.
191      */
192     protected abstract boolean shouldReplaceAll(E entry);
193 
194     /**
195      * Sort the {@link List} for a {@code hostname} before caching these.
196      */
197     protected void sortEntries(
198             @SuppressWarnings("unused") String hostname, @SuppressWarnings("unused") List<E> entries) {
199         // NOOP.
200     }
201 
202     /**
203      * Returns {@code true} if both entries are equal.
204      */
205     protected abstract boolean equals(E entry, E otherEntry);
206 
207     // Directly extend AtomicReference for intrinsics and also to keep memory overhead low.
208     private final class Entries extends AtomicReference<List<E>> implements Runnable {
209 
210         private final String hostname;
211         // Needs to be package-private to be able to access it via the AtomicReferenceFieldUpdater
212         volatile FutureAndDelay expirationFuture;
213 
214         Entries(String hostname) {
215             super(Collections.emptyList());
216             this.hostname = hostname;
217         }
218 
219         void add(E e, int ttl, EventLoop loop) {
220             if (!shouldReplaceAll(e)) {
221                 for (;;) {
222                     List<E> entries = get();
223                     if (!entries.isEmpty()) {
224                         final E firstEntry = entries.get(0);
225                         if (shouldReplaceAll(firstEntry)) {
226                             assert entries.size() == 1;
227 
228                             if (compareAndSet(entries, singletonList(e))) {
229                                 scheduleCacheExpirationIfNeeded(ttl, loop);
230                                 return;
231                             } else {
232                                 // Need to try again as CAS failed
233                                 continue;
234                             }
235                         }
236 
237                         // Create a new List for COW semantics
238                         List<E> newEntries = new ArrayList<>(entries.size() + 1);
239                         int i = 0;
240                         E replacedEntry = null;
241                         do {
242                             E entry = entries.get(i);
243                             // Only add old entry if the address is not the same as the one we try to add as well.
244                             // In this case we will skip it and just add the new entry as this may have
245                             // more up-to-date data and cancel the old after we were able to update the cache.
246                             if (!Cache.this.equals(e, entry)) {
247                                 newEntries.add(entry);
248                             } else {
249                                 replacedEntry = entry;
250                                 newEntries.add(e);
251 
252                                 ++i;
253                                 for (; i < entries.size(); ++i) {
254                                     newEntries.add(entries.get(i));
255                                 }
256                                 break;
257                             }
258                         } while (++i < entries.size());
259                         if (replacedEntry == null) {
260                             newEntries.add(e);
261                         }
262                         sortEntries(hostname, newEntries);
263 
264                         if (compareAndSet(entries, Collections.unmodifiableList(newEntries))) {
265                             scheduleCacheExpirationIfNeeded(ttl, loop);
266                             return;
267                         }
268                     } else if (compareAndSet(entries, singletonList(e))) {
269                         scheduleCacheExpirationIfNeeded(ttl, loop);
270                         return;
271                     }
272                 }
273             } else {
274                 set(singletonList(e));
275                 scheduleCacheExpirationIfNeeded(ttl, loop);
276             }
277         }
278 
279         private void scheduleCacheExpirationIfNeeded(int ttl, EventLoop loop) {
280             for (;;) {
281                 // We currently don't calculate a new TTL when we need to retry the CAS as we don't expect this to
282                 // be invoked very concurrently and also we use SECONDS anyway. If this ever becomes a problem
283                 // we can reconsider.
284                 FutureAndDelay oldFuture = FUTURE_UPDATER.get(this);
285                 if (oldFuture == null || oldFuture.getDelay(TimeUnit.SECONDS) > ttl) {
286                     Future<?> newFuture = loop.schedule(this, ttl, TimeUnit.SECONDS);
287                     // It is possible that
288                     // 1. task will fire in between this line, or
289                     // 2. multiple timers may be set if there is concurrency
290                     // (1) Shouldn't be a problem because we will fail the CAS and then the next loop will see CANCELLED
291                     //     so the ttl will not be less, and we will bail out of the loop.
292                     // (2) This is a trade-off to avoid concurrency resulting in contention on a synchronized block.
293                     if (FUTURE_UPDATER.compareAndSet(this, oldFuture, new FutureAndDelay(newFuture, ttl))) {
294                         if (oldFuture != null) {
295                             oldFuture.cancel();
296                         }
297                         break;
298                     } else {
299                         // There was something else scheduled in the meantime... Cancel and try again.
300                         newFuture.cancel();
301                     }
302                 } else {
303                     break;
304                 }
305             }
306         }
307 
308         boolean clearAndCancel() {
309             List<E> entries = getAndSet(Collections.emptyList());
310             if (entries.isEmpty()) {
311                 return false;
312             }
313 
314             FutureAndDelay expirationFuture = FUTURE_UPDATER.getAndSet(this, CANCELLED);
315             if (expirationFuture != null) {
316                 expirationFuture.cancel();
317             }
318 
319             return true;
320         }
321 
322         @Override
323         public void run() {
324             // We always remove all entries for a hostname once one entry expire. This is not the
325             // most efficient to do but this way we can guarantee that if a DnsResolver
326             // be configured to prefer one ip family over the other we will not return unexpected
327             // results to the enduser if one of the A or AAAA records has different TTL settings.
328             //
329             // As a TTL is just a hint of the maximum time a cache is allowed to cache stuff it's
330             // completely fine to remove the entry even if the TTL is not reached yet.
331             //
332             // See https://github.com/netty/netty/issues/7329
333             resolveCache.remove(hostname, this);
334 
335             clearAndCancel();
336         }
337     }
338 
339     private static final class FutureAndDelay implements Delayed {
340         final Future<?> future;
341         final long deadlineNanos;
342 
343         private FutureAndDelay(Future<?> future, int ttl) {
344             this.future = Objects.requireNonNull(future, "future");
345             deadlineNanos = System.nanoTime() + TimeUnit.SECONDS.toNanos(ttl);
346         }
347 
348         void cancel() {
349             future.cancel();
350         }
351 
352         @Override
353         public long getDelay(TimeUnit unit) {
354             return unit.convert(deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
355         }
356 
357         @Override
358         public int compareTo(Delayed other) {
359             return Long.compare(deadlineNanos, other.getDelay(TimeUnit.NANOSECONDS));
360         }
361 
362         @Override
363         public boolean equals(Object o) {
364             return o instanceof FutureAndDelay && compareTo((FutureAndDelay) o) == 0;
365         }
366 
367         @Override
368         public int hashCode() {
369             int result = future.hashCode();
370             result = 31 * result + (int) (deadlineNanos ^ deadlineNanos >>> 32);
371             return result;
372         }
373     }
374 }