blob: 8a178cb5ee2ef766c08637723176cf1129a159d7 [file] [log] [blame]
Jonathan Hartdb3af892015-01-26 13:19:07 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Jonathan Hart77bdd262015-02-03 09:07:48 -080016package org.onosproject.store.ecmap;
Jonathan Hartdb3af892015-01-26 13:19:07 -080017
Jonathan Hartaaa56572015-01-28 21:56:35 -080018import org.apache.commons.lang3.RandomUtils;
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -080019import org.apache.commons.lang3.mutable.MutableBoolean;
Jonathan Hartf9108232015-02-02 16:37:35 -080020import org.apache.commons.lang3.tuple.Pair;
Jonathan Hartdb3af892015-01-26 13:19:07 -080021import org.onlab.util.KryoNamespace;
Jonathan Hart233a18a2015-03-02 17:24:58 -080022import org.onlab.util.SlidingWindowCounter;
Jonathan Hartdb3af892015-01-26 13:19:07 -080023import org.onosproject.cluster.ClusterService;
Jonathan Hartaaa56572015-01-28 21:56:35 -080024import org.onosproject.cluster.ControllerNode;
Jonathan Hartdb3af892015-01-26 13:19:07 -080025import org.onosproject.cluster.NodeId;
26import org.onosproject.store.Timestamp;
27import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
28import org.onosproject.store.cluster.messaging.ClusterMessage;
29import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
30import org.onosproject.store.cluster.messaging.MessageSubject;
Jonathan Hart77bdd262015-02-03 09:07:48 -080031import org.onosproject.store.impl.ClockService;
32import org.onosproject.store.impl.Timestamped;
33import org.onosproject.store.impl.WallClockTimestamp;
Jonathan Hartdb3af892015-01-26 13:19:07 -080034import org.onosproject.store.serializers.KryoSerializer;
35import org.slf4j.Logger;
36import org.slf4j.LoggerFactory;
37
38import java.io.IOException;
39import java.util.ArrayList;
40import java.util.Collection;
Jonathan Hartaaa56572015-01-28 21:56:35 -080041import java.util.HashMap;
42import java.util.LinkedList;
Jonathan Hartdb3af892015-01-26 13:19:07 -080043import java.util.List;
44import java.util.Map;
45import java.util.Set;
46import java.util.concurrent.ConcurrentHashMap;
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -080047import java.util.concurrent.ConcurrentMap;
Jonathan Hartdb3af892015-01-26 13:19:07 -080048import java.util.concurrent.CopyOnWriteArraySet;
49import java.util.concurrent.ExecutorService;
50import java.util.concurrent.Executors;
51import java.util.concurrent.ScheduledExecutorService;
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -080052import java.util.concurrent.TimeUnit;
Jonathan Hart233a18a2015-03-02 17:24:58 -080053import java.util.concurrent.atomic.AtomicLong;
Jonathan Hartdb3af892015-01-26 13:19:07 -080054import java.util.stream.Collectors;
55
56import static com.google.common.base.Preconditions.checkNotNull;
57import static com.google.common.base.Preconditions.checkState;
58import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
Brian O'Connorc6713a82015-02-24 11:55:48 -080059import static org.onlab.util.BoundedThreadPool.newFixedThreadPool;
Brian O'Connor4b2ba5f2015-02-18 20:54:00 -080060import static org.onlab.util.Tools.groupedThreads;
Jonathan Hartdb3af892015-01-26 13:19:07 -080061
62/**
63 * Distributed Map implementation which uses optimistic replication and gossip
64 * based techniques to provide an eventually consistent data store.
65 */
66public class EventuallyConsistentMapImpl<K, V>
67 implements EventuallyConsistentMap<K, V> {
68
69 private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class);
70
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -080071 private final ConcurrentMap<K, Timestamped<V>> items;
72 private final ConcurrentMap<K, Timestamp> removedItems;
Jonathan Hartdb3af892015-01-26 13:19:07 -080073
Jonathan Hartdb3af892015-01-26 13:19:07 -080074 private final ClusterService clusterService;
75 private final ClusterCommunicationService clusterCommunicator;
76 private final KryoSerializer serializer;
77
Jonathan Hart5ec32ba2015-02-05 13:33:58 -080078 private final ClockService<K, V> clockService;
Jonathan Hartdb3af892015-01-26 13:19:07 -080079
80 private final MessageSubject updateMessageSubject;
81 private final MessageSubject removeMessageSubject;
Jonathan Hartaaa56572015-01-28 21:56:35 -080082 private final MessageSubject antiEntropyAdvertisementSubject;
Jonathan Hartdb3af892015-01-26 13:19:07 -080083
Jonathan Hartaaa56572015-01-28 21:56:35 -080084 private final Set<EventuallyConsistentMapListener<K, V>> listeners
Jonathan Hartdb3af892015-01-26 13:19:07 -080085 = new CopyOnWriteArraySet<>();
86
87 private final ExecutorService executor;
88
89 private final ScheduledExecutorService backgroundExecutor;
90
Madan Jampanib28e4ad2015-02-19 12:31:37 -080091 private ExecutorService broadcastMessageExecutor;
Madan Jampani28726282015-02-19 11:40:23 -080092
Jonathan Hartdb3af892015-01-26 13:19:07 -080093 private volatile boolean destroyed = false;
Jonathan Hart539a6462015-01-27 17:05:43 -080094 private static final String ERROR_DESTROYED = " map is already destroyed";
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -080095 private final String destroyedMessage;
Jonathan Hartdb3af892015-01-26 13:19:07 -080096
Jonathan Hart4f397e82015-02-04 09:10:41 -080097 private static final String ERROR_NULL_KEY = "Key cannot be null";
98 private static final String ERROR_NULL_VALUE = "Null values are not allowed";
99
Jonathan Hartdb3af892015-01-26 13:19:07 -0800100 // TODO: Make these anti-entropy params configurable
101 private long initialDelaySec = 5;
102 private long periodSec = 5;
Jonathan Hartf893be82015-02-24 17:35:51 -0800103 private boolean lightweightAntiEntropy = true;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800104
Jonathan Hart233a18a2015-03-02 17:24:58 -0800105 private static final int WINDOW_SIZE = 5;
106 private static final int HIGH_LOAD_THRESHOLD = 0;
107 private static final int LOAD_WINDOW = 2;
108 SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE);
109 AtomicLong operations = new AtomicLong();
110
Jonathan Hartdb3af892015-01-26 13:19:07 -0800111 /**
112 * Creates a new eventually consistent map shared amongst multiple instances.
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800113 * <p>
Jonathan Hartdb3af892015-01-26 13:19:07 -0800114 * Each map is identified by a string map name. EventuallyConsistentMapImpl
115 * objects in different JVMs that use the same map name will form a
116 * distributed map across JVMs (provided the cluster service is aware of
117 * both nodes).
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800118 * </p>
119 * <p>
Jonathan Hartdb3af892015-01-26 13:19:07 -0800120 * The client is expected to provide an
121 * {@link org.onlab.util.KryoNamespace.Builder} with which all classes that
122 * will be stored in this map have been registered (including referenced
123 * classes). This serializer will be used to serialize both K and V for
124 * inter-node notifications.
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800125 * </p>
126 * <p>
Jonathan Hartdb3af892015-01-26 13:19:07 -0800127 * The client must provide an {@link org.onosproject.store.impl.ClockService}
128 * which can generate timestamps for a given key. The clock service is free
129 * to generate timestamps however it wishes, however these timestamps will
130 * be used to serialize updates to the map so they must be strict enough
131 * to ensure updates are properly ordered for the use case (i.e. in some
132 * cases wallclock time will suffice, whereas in other cases logical time
133 * will be necessary).
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800134 * </p>
Jonathan Hartdb3af892015-01-26 13:19:07 -0800135 *
136 * @param mapName a String identifier for the map.
137 * @param clusterService the cluster service
138 * @param clusterCommunicator the cluster communications service
139 * @param serializerBuilder a Kryo namespace builder that can serialize
140 * both K and V
141 * @param clockService a clock service able to generate timestamps
142 * for K
143 */
144 public EventuallyConsistentMapImpl(String mapName,
145 ClusterService clusterService,
146 ClusterCommunicationService clusterCommunicator,
147 KryoNamespace.Builder serializerBuilder,
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800148 ClockService<K, V> clockService) {
Jonathan Hartdb3af892015-01-26 13:19:07 -0800149 this.clusterService = checkNotNull(clusterService);
150 this.clusterCommunicator = checkNotNull(clusterCommunicator);
151
152 serializer = createSerializer(checkNotNull(serializerBuilder));
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800153 destroyedMessage = mapName + ERROR_DESTROYED;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800154
155 this.clockService = checkNotNull(clockService);
156
157 items = new ConcurrentHashMap<>();
158 removedItems = new ConcurrentHashMap<>();
159
Brian O'Connorc6713a82015-02-24 11:55:48 -0800160 // should be a normal executor; it's used for receiving messages
161 //TODO make # of threads configurable
162 executor = Executors.newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-fg-%d"));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800163
Brian O'Connorc6713a82015-02-24 11:55:48 -0800164 // sending executor; should be capped
165 //TODO make # of threads configurable
166 broadcastMessageExecutor = //newSingleThreadExecutor(groupedThreads("onos/ecm", mapName + "-notify"));
167 newFixedThreadPool(4, groupedThreads("onos/ecm", mapName + "-notify"));
Madan Jampani28726282015-02-19 11:40:23 -0800168
Jonathan Hartdb3af892015-01-26 13:19:07 -0800169 backgroundExecutor =
Brian O'Connorc6713a82015-02-24 11:55:48 -0800170 //FIXME anti-entropy can take >60 seconds and it blocks fg workers
171 // ... dropping minPriority to try to help until this can be parallel
172 newSingleThreadScheduledExecutor(//minPriority(
Jonathan Hart233a18a2015-03-02 17:24:58 -0800173 groupedThreads("onos/ecm", mapName + "-bg-%d"))/*)*/;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800174
Jonathan Hartaaa56572015-01-28 21:56:35 -0800175 // start anti-entropy thread
Brian O'Connorc6713a82015-02-24 11:55:48 -0800176 //TODO disable anti-entropy for now in testing (it is unstable)
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800177 backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
178 initialDelaySec, periodSec,
179 TimeUnit.SECONDS);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800180
Jonathan Hartdb3af892015-01-26 13:19:07 -0800181 updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
182 clusterCommunicator.addSubscriber(updateMessageSubject,
Madan Jampani2af244a2015-02-22 13:12:01 -0800183 new InternalPutEventListener(), executor);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800184 removeMessageSubject = new MessageSubject("ecm-" + mapName + "-remove");
185 clusterCommunicator.addSubscriber(removeMessageSubject,
Madan Jampani2af244a2015-02-22 13:12:01 -0800186 new InternalRemoveEventListener(), executor);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800187 antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");
188 clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
Madan Jampani2af244a2015-02-22 13:12:01 -0800189 new InternalAntiEntropyListener(), backgroundExecutor);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800190 }
191
192 private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
193 return new KryoSerializer() {
194 @Override
195 protected void setupKryoPool() {
196 // Add the map's internal helper classes to the user-supplied serializer
197 serializerPool = builder
198 .register(WallClockTimestamp.class)
199 .register(PutEntry.class)
Jonathan Hart539a6462015-01-27 17:05:43 -0800200 .register(RemoveEntry.class)
Jonathan Hartdb3af892015-01-26 13:19:07 -0800201 .register(ArrayList.class)
202 .register(InternalPutEvent.class)
203 .register(InternalRemoveEvent.class)
Jonathan Hartaaa56572015-01-28 21:56:35 -0800204 .register(AntiEntropyAdvertisement.class)
205 .register(HashMap.class)
Jonathan Hartdb3af892015-01-26 13:19:07 -0800206 .build();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800207 }
208 };
209 }
210
Madan Jampanib28e4ad2015-02-19 12:31:37 -0800211 /**
212 * Sets the executor to use for broadcasting messages and returns this
213 * instance for method chaining.
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800214 *
Madan Jampanib28e4ad2015-02-19 12:31:37 -0800215 * @param executor executor service
216 * @return this instance
217 */
218 public EventuallyConsistentMapImpl<K, V> withBroadcastMessageExecutor(ExecutorService executor) {
219 checkNotNull(executor, "Null executor");
220 broadcastMessageExecutor = executor;
221 return this;
222 }
223
Jonathan Hartdb3af892015-01-26 13:19:07 -0800224 @Override
225 public int size() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800226 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800227 return items.size();
228 }
229
230 @Override
231 public boolean isEmpty() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800232 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800233 return items.isEmpty();
234 }
235
236 @Override
237 public boolean containsKey(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800238 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800239 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800240 return items.containsKey(key);
241 }
242
243 @Override
244 public boolean containsValue(V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800245 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800246 checkNotNull(value, ERROR_NULL_VALUE);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800247
248 return items.values().stream()
249 .anyMatch(timestamped -> timestamped.value().equals(value));
250 }
251
252 @Override
253 public V get(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800254 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800255 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800256
257 Timestamped<V> value = items.get(key);
258 if (value != null) {
259 return value.value();
260 }
261 return null;
262 }
263
264 @Override
265 public void put(K key, V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800266 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800267 checkNotNull(key, ERROR_NULL_KEY);
268 checkNotNull(value, ERROR_NULL_VALUE);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800269
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800270 Timestamp timestamp = clockService.getTimestamp(key, value);
271
Jonathan Hartdb3af892015-01-26 13:19:07 -0800272 if (putInternal(key, value, timestamp)) {
273 notifyPeers(new InternalPutEvent<>(key, value, timestamp));
274 EventuallyConsistentMapEvent<K, V> externalEvent
275 = new EventuallyConsistentMapEvent<>(
276 EventuallyConsistentMapEvent.Type.PUT, key, value);
277 notifyListeners(externalEvent);
278 }
279 }
280
281 private boolean putInternal(K key, V value, Timestamp timestamp) {
Jonathan Hart233a18a2015-03-02 17:24:58 -0800282 counter.incrementCount();
Brian O'Connor2952e3b2015-02-19 21:47:57 -0800283 Timestamp removed = removedItems.get(key);
Jonathan Hart403ea932015-02-20 16:23:00 -0800284 if (removed != null && removed.isNewerThan(timestamp)) {
Brian O'Connor2952e3b2015-02-19 21:47:57 -0800285 log.debug("ecmap - removed was newer {}", value);
286 return false;
287 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800288
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800289 final MutableBoolean updated = new MutableBoolean(false);
290
291 items.compute(key, (k, existing) -> {
Jonathan Hart403ea932015-02-20 16:23:00 -0800292 if (existing != null && existing.isNewerThan(timestamp)) {
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800293 updated.setFalse();
294 return existing;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800295 } else {
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800296 updated.setTrue();
297 return new Timestamped<>(value, timestamp);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800298 }
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800299 });
300
301 boolean success = updated.booleanValue();
302 if (!success) {
303 log.debug("ecmap - existing was newer {}", value);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800304 }
Brian O'Connor2952e3b2015-02-19 21:47:57 -0800305
306 if (success && removed != null) {
307 removedItems.remove(key, removed);
308 }
309 return success;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800310 }
311
312 @Override
313 public void remove(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800314 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800315 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800316
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800317 // TODO prevent calls here if value is important for timestamp
318 Timestamp timestamp = clockService.getTimestamp(key, null);
319
Jonathan Hartdb3af892015-01-26 13:19:07 -0800320 if (removeInternal(key, timestamp)) {
321 notifyPeers(new InternalRemoveEvent<>(key, timestamp));
322 EventuallyConsistentMapEvent<K, V> externalEvent
323 = new EventuallyConsistentMapEvent<>(
324 EventuallyConsistentMapEvent.Type.REMOVE, key, null);
325 notifyListeners(externalEvent);
326 }
327 }
328
329 private boolean removeInternal(K key, Timestamp timestamp) {
Jonathan Hart233a18a2015-03-02 17:24:58 -0800330 counter.incrementCount();
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800331 final MutableBoolean updated = new MutableBoolean(false);
332
333 items.compute(key, (k, existing) -> {
334 if (existing != null && existing.isNewerThan(timestamp)) {
335 updated.setFalse();
336 return existing;
Madan Jampani7e6cfe32015-02-19 17:44:44 -0800337 } else {
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800338 updated.setTrue();
339 // remove from items map
340 return null;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800341 }
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800342 });
343
344 if (updated.isFalse()) {
345 return false;
Madan Jampani7e6cfe32015-02-19 17:44:44 -0800346 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800347
Madan Jampani7e6cfe32015-02-19 17:44:44 -0800348 Timestamp removedTimestamp = removedItems.get(key);
349 if (removedTimestamp == null) {
350 return removedItems.putIfAbsent(key, timestamp) == null;
Jonathan Hart403ea932015-02-20 16:23:00 -0800351 } else if (timestamp.isNewerThan(removedTimestamp)) {
Madan Jampani7e6cfe32015-02-19 17:44:44 -0800352 return removedItems.replace(key, removedTimestamp, timestamp);
353 } else {
354 return false;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800355 }
356 }
357
358 @Override
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800359 public void remove(K key, V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800360 checkState(!destroyed, destroyedMessage);
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800361 checkNotNull(key, ERROR_NULL_KEY);
362 checkNotNull(value, ERROR_NULL_VALUE);
363
364 Timestamp timestamp = clockService.getTimestamp(key, value);
365
366 if (removeInternal(key, timestamp)) {
367 notifyPeers(new InternalRemoveEvent<>(key, timestamp));
368 EventuallyConsistentMapEvent<K, V> externalEvent
369 = new EventuallyConsistentMapEvent<>(
370 EventuallyConsistentMapEvent.Type.REMOVE, key, value);
371 notifyListeners(externalEvent);
372 }
373 }
374
375 @Override
Jonathan Hartdb3af892015-01-26 13:19:07 -0800376 public void putAll(Map<? extends K, ? extends V> m) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800377 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800378
379 List<PutEntry<K, V>> updates = new ArrayList<>(m.size());
380
381 for (Map.Entry<? extends K, ? extends V> entry : m.entrySet()) {
382 K key = entry.getKey();
383 V value = entry.getValue();
Jonathan Hart4f397e82015-02-04 09:10:41 -0800384
385 checkNotNull(key, ERROR_NULL_KEY);
386 checkNotNull(value, ERROR_NULL_VALUE);
387
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800388 Timestamp timestamp = clockService.getTimestamp(key, value);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800389
390 if (putInternal(key, value, timestamp)) {
391 updates.add(new PutEntry<>(key, value, timestamp));
392 }
393 }
394
Jonathan Hart584d2f32015-01-27 19:46:14 -0800395 if (!updates.isEmpty()) {
396 notifyPeers(new InternalPutEvent<>(updates));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800397
Jonathan Hart584d2f32015-01-27 19:46:14 -0800398 for (PutEntry<K, V> entry : updates) {
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800399 EventuallyConsistentMapEvent<K, V> externalEvent =
400 new EventuallyConsistentMapEvent<>(
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800401 EventuallyConsistentMapEvent.Type.PUT, entry.key(),
402 entry.value());
Jonathan Hart584d2f32015-01-27 19:46:14 -0800403 notifyListeners(externalEvent);
404 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800405 }
406 }
407
408 @Override
409 public void clear() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800410 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800411
412 List<RemoveEntry<K>> removed = new ArrayList<>(items.size());
413
414 for (K key : items.keySet()) {
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800415 // TODO also this is not applicable if value is important for timestamp?
416 Timestamp timestamp = clockService.getTimestamp(key, null);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800417
418 if (removeInternal(key, timestamp)) {
419 removed.add(new RemoveEntry<>(key, timestamp));
420 }
421 }
422
Jonathan Hart584d2f32015-01-27 19:46:14 -0800423 if (!removed.isEmpty()) {
424 notifyPeers(new InternalRemoveEvent<>(removed));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800425
Jonathan Hart584d2f32015-01-27 19:46:14 -0800426 for (RemoveEntry<K> entry : removed) {
427 EventuallyConsistentMapEvent<K, V> externalEvent
428 = new EventuallyConsistentMapEvent<>(
429 EventuallyConsistentMapEvent.Type.REMOVE, entry.key(),
430 null);
431 notifyListeners(externalEvent);
432 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800433 }
434 }
435
436 @Override
437 public Set<K> keySet() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800438 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800439
440 return items.keySet();
441 }
442
443 @Override
444 public Collection<V> values() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800445 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800446
447 return items.values().stream()
448 .map(Timestamped::value)
449 .collect(Collectors.toList());
450 }
451
452 @Override
453 public Set<Map.Entry<K, V>> entrySet() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800454 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800455
456 return items.entrySet().stream()
Jonathan Hartf9108232015-02-02 16:37:35 -0800457 .map(e -> Pair.of(e.getKey(), e.getValue().value()))
Jonathan Hartdb3af892015-01-26 13:19:07 -0800458 .collect(Collectors.toSet());
459 }
460
461 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800462 public void addListener(EventuallyConsistentMapListener<K, V> listener) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800463 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800464
465 listeners.add(checkNotNull(listener));
466 }
467
468 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800469 public void removeListener(EventuallyConsistentMapListener<K, V> listener) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800470 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800471
472 listeners.remove(checkNotNull(listener));
473 }
474
475 @Override
476 public void destroy() {
477 destroyed = true;
478
479 executor.shutdown();
480 backgroundExecutor.shutdown();
Madan Jampani337bb442015-02-19 14:29:18 -0800481 broadcastMessageExecutor.shutdown();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800482
Jonathan Hart584d2f32015-01-27 19:46:14 -0800483 listeners.clear();
484
Jonathan Hartdb3af892015-01-26 13:19:07 -0800485 clusterCommunicator.removeSubscriber(updateMessageSubject);
486 clusterCommunicator.removeSubscriber(removeMessageSubject);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800487 clusterCommunicator.removeSubscriber(antiEntropyAdvertisementSubject);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800488 }
489
Jonathan Hartaaa56572015-01-28 21:56:35 -0800490 private void notifyListeners(EventuallyConsistentMapEvent<K, V> event) {
491 for (EventuallyConsistentMapListener<K, V> listener : listeners) {
Jonathan Hartdb3af892015-01-26 13:19:07 -0800492 listener.event(event);
493 }
494 }
495
496 private void notifyPeers(InternalPutEvent event) {
Brian O'Connorb2894222015-02-20 22:05:19 -0800497 // FIXME extremely memory expensive when we are overrun
498// broadcastMessageExecutor.execute(() -> broadcastMessage(updateMessageSubject, event));
499 broadcastMessage(updateMessageSubject, event);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800500 }
501
502 private void notifyPeers(InternalRemoveEvent event) {
Brian O'Connorb2894222015-02-20 22:05:19 -0800503 // FIXME extremely memory expensive when we are overrun
504// broadcastMessageExecutor.execute(() -> broadcastMessage(removeMessageSubject, event));
505 broadcastMessage(removeMessageSubject, event);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800506 }
507
Jonathan Hart7d656f42015-01-27 14:07:23 -0800508 private void broadcastMessage(MessageSubject subject, Object event) {
Brian O'Connorb2894222015-02-20 22:05:19 -0800509 // FIXME can we parallelize the serialization... use the caller???
Jonathan Hartdb3af892015-01-26 13:19:07 -0800510 ClusterMessage message = new ClusterMessage(
511 clusterService.getLocalNode().id(),
512 subject,
513 serializer.encode(event));
Brian O'Connorc6713a82015-02-24 11:55:48 -0800514 broadcastMessageExecutor.execute(() -> clusterCommunicator.broadcast(message));
515// clusterCommunicator.broadcast(message);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800516 }
517
518 private void unicastMessage(NodeId peer,
519 MessageSubject subject,
520 Object event) throws IOException {
521 ClusterMessage message = new ClusterMessage(
522 clusterService.getLocalNode().id(),
523 subject,
524 serializer.encode(event));
525 clusterCommunicator.unicast(message, peer);
526 }
527
Jonathan Hart233a18a2015-03-02 17:24:58 -0800528 private boolean underHighLoad() {
529 return counter.get(LOAD_WINDOW) > HIGH_LOAD_THRESHOLD;
530 }
531
Jonathan Hartaaa56572015-01-28 21:56:35 -0800532 private final class SendAdvertisementTask implements Runnable {
533 @Override
534 public void run() {
535 if (Thread.currentThread().isInterrupted()) {
536 log.info("Interrupted, quitting");
537 return;
538 }
539
Jonathan Hart233a18a2015-03-02 17:24:58 -0800540 if (underHighLoad()) {
541 return;
542 }
543
Jonathan Hartaaa56572015-01-28 21:56:35 -0800544 try {
545 final NodeId self = clusterService.getLocalNode().id();
546 Set<ControllerNode> nodes = clusterService.getNodes();
547
548 List<NodeId> nodeIds = nodes.stream()
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800549 .map(ControllerNode::id)
Jonathan Hartaaa56572015-01-28 21:56:35 -0800550 .collect(Collectors.toList());
551
552 if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
553 log.trace("No other peers in the cluster.");
554 return;
555 }
556
557 NodeId peer;
558 do {
559 int idx = RandomUtils.nextInt(0, nodeIds.size());
560 peer = nodeIds.get(idx);
561 } while (peer.equals(self));
562
563 if (Thread.currentThread().isInterrupted()) {
564 log.info("Interrupted, quitting");
565 return;
566 }
567
568 AntiEntropyAdvertisement<K> ad = createAdvertisement();
569
570 try {
571 unicastMessage(peer, antiEntropyAdvertisementSubject, ad);
572 } catch (IOException e) {
573 log.debug("Failed to send anti-entropy advertisement to {}", peer);
574 }
575 } catch (Exception e) {
576 // Catch all exceptions to avoid scheduled task being suppressed.
577 log.error("Exception thrown while sending advertisement", e);
578 }
579 }
580 }
581
582 private AntiEntropyAdvertisement<K> createAdvertisement() {
583 final NodeId self = clusterService.getLocalNode().id();
584
585 Map<K, Timestamp> timestamps = new HashMap<>(items.size());
586
587 items.forEach((key, value) -> timestamps.put(key, value.timestamp()));
588
589 Map<K, Timestamp> tombstones = new HashMap<>(removedItems);
590
591 return new AntiEntropyAdvertisement<>(self, timestamps, tombstones);
592 }
593
594 private void handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) {
595 List<EventuallyConsistentMapEvent<K, V>> externalEvents;
596
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800597 externalEvents = antiEntropyCheckLocalItems(ad);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800598
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800599 antiEntropyCheckLocalRemoved(ad);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800600
Jonathan Hartf893be82015-02-24 17:35:51 -0800601 if (!lightweightAntiEntropy) {
602 externalEvents.addAll(antiEntropyCheckRemoteRemoved(ad));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800603
Jonathan Hartf893be82015-02-24 17:35:51 -0800604 // if remote ad has something unknown, actively sync
605 for (K key : ad.timestamps().keySet()) {
606 if (!items.containsKey(key)) {
607 // Send the advertisement back if this peer is out-of-sync
608 final NodeId sender = ad.sender();
609 AntiEntropyAdvertisement<K> myAd = createAdvertisement();
610 try {
611 unicastMessage(sender, antiEntropyAdvertisementSubject, myAd);
612 } catch (IOException e) {
613 log.debug(
614 "Failed to send reactive anti-entropy advertisement to {}",
615 sender);
616 }
617
618 break;
619 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800620 }
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800621 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800622 externalEvents.forEach(this::notifyListeners);
623 }
624
625 /**
626 * Checks if any of the remote's live items or tombstones are out of date
627 * according to our local live item list, or if our live items are out of
628 * date according to the remote's tombstone list.
629 * If the local copy is more recent, it will be pushed to the remote. If the
630 * remote has a more recent remove, we apply that to the local state.
631 *
632 * @param ad remote anti-entropy advertisement
633 * @return list of external events relating to local operations performed
634 */
Jonathan Hartaaa56572015-01-28 21:56:35 -0800635 private List<EventuallyConsistentMapEvent<K, V>> antiEntropyCheckLocalItems(
636 AntiEntropyAdvertisement<K> ad) {
637 final List<EventuallyConsistentMapEvent<K, V>> externalEvents
638 = new LinkedList<>();
639 final NodeId sender = ad.sender();
640
641 final List<PutEntry<K, V>> updatesToSend = new ArrayList<>();
642
643 for (Map.Entry<K, Timestamped<V>> item : items.entrySet()) {
644 K key = item.getKey();
645 Timestamped<V> localValue = item.getValue();
646
647 Timestamp remoteTimestamp = ad.timestamps().get(key);
648 if (remoteTimestamp == null) {
649 remoteTimestamp = ad.tombstones().get(key);
650 }
651 if (remoteTimestamp == null || localValue
Jonathan Hart403ea932015-02-20 16:23:00 -0800652 .isNewerThan(remoteTimestamp)) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800653 // local value is more recent, push to sender
654 updatesToSend
655 .add(new PutEntry<>(key, localValue.value(),
656 localValue.timestamp()));
657 }
658
659 Timestamp remoteDeadTimestamp = ad.tombstones().get(key);
660 if (remoteDeadTimestamp != null &&
Jonathan Hart403ea932015-02-20 16:23:00 -0800661 remoteDeadTimestamp.isNewerThan(localValue.timestamp())) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800662 // sender has a more recent remove
663 if (removeInternal(key, remoteDeadTimestamp)) {
664 externalEvents.add(new EventuallyConsistentMapEvent<>(
665 EventuallyConsistentMapEvent.Type.REMOVE, key, null));
666 }
667 }
668 }
669
670 // Send all updates to the peer at once
671 if (!updatesToSend.isEmpty()) {
672 try {
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800673 unicastMessage(sender, updateMessageSubject,
674 new InternalPutEvent<>(updatesToSend));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800675 } catch (IOException e) {
676 log.warn("Failed to send advertisement response", e);
677 }
678 }
679
680 return externalEvents;
681 }
682
683 /**
684 * Checks if any items in the remote live list are out of date according
685 * to our tombstone list. If we find we have a more up to date tombstone,
686 * we'll send it to the remote.
687 *
688 * @param ad remote anti-entropy advertisement
689 */
Jonathan Hartaaa56572015-01-28 21:56:35 -0800690 private void antiEntropyCheckLocalRemoved(AntiEntropyAdvertisement<K> ad) {
691 final NodeId sender = ad.sender();
692
693 final List<RemoveEntry<K>> removesToSend = new ArrayList<>();
694
695 for (Map.Entry<K, Timestamp> dead : removedItems.entrySet()) {
696 K key = dead.getKey();
697 Timestamp localDeadTimestamp = dead.getValue();
698
699 Timestamp remoteLiveTimestamp = ad.timestamps().get(key);
700 if (remoteLiveTimestamp != null
Jonathan Hart403ea932015-02-20 16:23:00 -0800701 && localDeadTimestamp.isNewerThan(remoteLiveTimestamp)) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800702 // sender has zombie, push remove
703 removesToSend
704 .add(new RemoveEntry<>(key, localDeadTimestamp));
705 }
706 }
707
708 // Send all removes to the peer at once
709 if (!removesToSend.isEmpty()) {
710 try {
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800711 unicastMessage(sender, removeMessageSubject,
712 new InternalRemoveEvent<>(removesToSend));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800713 } catch (IOException e) {
714 log.warn("Failed to send advertisement response", e);
715 }
716 }
717 }
718
719 /**
720 * Checks if any of the local live items are out of date according to the
721 * remote's tombstone advertisements. If we find a local item is out of date,
722 * we'll apply the remove operation to the local state.
723 *
724 * @param ad remote anti-entropy advertisement
725 * @return list of external events relating to local operations performed
726 */
Jonathan Hartaaa56572015-01-28 21:56:35 -0800727 private List<EventuallyConsistentMapEvent<K, V>>
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800728 antiEntropyCheckRemoteRemoved(AntiEntropyAdvertisement<K> ad) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800729 final List<EventuallyConsistentMapEvent<K, V>> externalEvents
730 = new LinkedList<>();
731
732 for (Map.Entry<K, Timestamp> remoteDead : ad.tombstones().entrySet()) {
733 K key = remoteDead.getKey();
734 Timestamp remoteDeadTimestamp = remoteDead.getValue();
735
736 Timestamped<V> local = items.get(key);
737 Timestamp localDead = removedItems.get(key);
Jonathan Hart403ea932015-02-20 16:23:00 -0800738 if (local != null && remoteDeadTimestamp.isNewerThan(
739 local.timestamp())) {
740 // If the remote has a more recent tombstone than either our local
741 // value, then do a remove with their timestamp
Jonathan Hartaaa56572015-01-28 21:56:35 -0800742 if (removeInternal(key, remoteDeadTimestamp)) {
743 externalEvents.add(new EventuallyConsistentMapEvent<>(
744 EventuallyConsistentMapEvent.Type.REMOVE, key, null));
745 }
Jonathan Hart403ea932015-02-20 16:23:00 -0800746 } else if (localDead != null && remoteDeadTimestamp.isNewerThan(
747 localDead)) {
748 // If the remote has a more recent tombstone than us, update ours
749 // to their timestamp
750 removeInternal(key, remoteDeadTimestamp);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800751 }
752 }
753
754 return externalEvents;
755 }
756
757 private final class InternalAntiEntropyListener
758 implements ClusterMessageHandler {
759
760 @Override
761 public void handle(ClusterMessage message) {
Jonathan Hart4fd4ebb2015-02-04 17:38:48 -0800762 log.trace("Received anti-entropy advertisement from peer: {}",
763 message.sender());
Jonathan Hartaaa56572015-01-28 21:56:35 -0800764 AntiEntropyAdvertisement<K> advertisement = serializer.decode(message.payload());
Madan Jampani2af244a2015-02-22 13:12:01 -0800765 try {
Jonathan Hart233a18a2015-03-02 17:24:58 -0800766 if (!underHighLoad()) {
767 handleAntiEntropyAdvertisement(advertisement);
768 }
Madan Jampani2af244a2015-02-22 13:12:01 -0800769 } catch (Exception e) {
770 log.warn("Exception thrown handling advertisements", e);
771 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800772 }
773 }
774
Jonathan Hartdb3af892015-01-26 13:19:07 -0800775 private final class InternalPutEventListener implements
776 ClusterMessageHandler {
777 @Override
778 public void handle(ClusterMessage message) {
779 log.debug("Received put event from peer: {}", message.sender());
780 InternalPutEvent<K, V> event = serializer.decode(message.payload());
781
Madan Jampani2af244a2015-02-22 13:12:01 -0800782 try {
783 for (PutEntry<K, V> entry : event.entries()) {
784 K key = entry.key();
785 V value = entry.value();
786 Timestamp timestamp = entry.timestamp();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800787
Madan Jampani2af244a2015-02-22 13:12:01 -0800788 if (putInternal(key, value, timestamp)) {
789 EventuallyConsistentMapEvent<K, V> externalEvent =
790 new EventuallyConsistentMapEvent<>(
791 EventuallyConsistentMapEvent.Type.PUT, key,
792 value);
793 notifyListeners(externalEvent);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800794 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800795 }
Madan Jampani2af244a2015-02-22 13:12:01 -0800796 } catch (Exception e) {
797 log.warn("Exception thrown handling put", e);
798 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800799 }
800 }
801
802 private final class InternalRemoveEventListener implements
803 ClusterMessageHandler {
804 @Override
805 public void handle(ClusterMessage message) {
806 log.debug("Received remove event from peer: {}", message.sender());
807 InternalRemoveEvent<K> event = serializer.decode(message.payload());
Madan Jampani2af244a2015-02-22 13:12:01 -0800808 try {
809 for (RemoveEntry<K> entry : event.entries()) {
810 K key = entry.key();
811 Timestamp timestamp = entry.timestamp();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800812
Madan Jampani2af244a2015-02-22 13:12:01 -0800813 if (removeInternal(key, timestamp)) {
814 EventuallyConsistentMapEvent<K, V> externalEvent
815 = new EventuallyConsistentMapEvent<>(
816 EventuallyConsistentMapEvent.Type.REMOVE,
817 key, null);
818 notifyListeners(externalEvent);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800819 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800820 }
Madan Jampani2af244a2015-02-22 13:12:01 -0800821 } catch (Exception e) {
822 log.warn("Exception thrown handling remove", e);
823 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800824 }
825 }
826
Jonathan Hartdb3af892015-01-26 13:19:07 -0800827}