blob: 8de348a7715ceab29d39a7dd5014e54638f35a0a [file] [log] [blame]
Jonathan Hartdb3af892015-01-26 13:19:07 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Jonathan Hart77bdd262015-02-03 09:07:48 -080016package org.onosproject.store.ecmap;
Jonathan Hartdb3af892015-01-26 13:19:07 -080017
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.tuple.Pair;
import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.Timestamp;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.impl.ClockService;
import org.onosproject.store.impl.Timestamped;
import org.onosproject.store.impl.WallClockTimestamp;
import org.onosproject.store.serializers.KryoSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.util.Tools.groupedThreads;
import static org.onlab.util.Tools.minPriority;
Jonathan Hartdb3af892015-01-26 13:19:07 -080059
60/**
61 * Distributed Map implementation which uses optimistic replication and gossip
62 * based techniques to provide an eventually consistent data store.
63 */
64public class EventuallyConsistentMapImpl<K, V>
65 implements EventuallyConsistentMap<K, V> {
66
67 private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class);
68
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -080069 private final ConcurrentMap<K, Timestamped<V>> items;
70 private final ConcurrentMap<K, Timestamp> removedItems;
Jonathan Hartdb3af892015-01-26 13:19:07 -080071
Jonathan Hartdb3af892015-01-26 13:19:07 -080072 private final ClusterService clusterService;
73 private final ClusterCommunicationService clusterCommunicator;
74 private final KryoSerializer serializer;
75
Jonathan Hart5ec32ba2015-02-05 13:33:58 -080076 private final ClockService<K, V> clockService;
Jonathan Hartdb3af892015-01-26 13:19:07 -080077
78 private final MessageSubject updateMessageSubject;
79 private final MessageSubject removeMessageSubject;
Jonathan Hartaaa56572015-01-28 21:56:35 -080080 private final MessageSubject antiEntropyAdvertisementSubject;
Jonathan Hartdb3af892015-01-26 13:19:07 -080081
Jonathan Hartaaa56572015-01-28 21:56:35 -080082 private final Set<EventuallyConsistentMapListener<K, V>> listeners
Jonathan Hartdb3af892015-01-26 13:19:07 -080083 = new CopyOnWriteArraySet<>();
84
85 private final ExecutorService executor;
86
87 private final ScheduledExecutorService backgroundExecutor;
88
Madan Jampanib28e4ad2015-02-19 12:31:37 -080089 private ExecutorService broadcastMessageExecutor;
Madan Jampani28726282015-02-19 11:40:23 -080090
Jonathan Hartdb3af892015-01-26 13:19:07 -080091 private volatile boolean destroyed = false;
Jonathan Hart539a6462015-01-27 17:05:43 -080092 private static final String ERROR_DESTROYED = " map is already destroyed";
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -080093 private final String destroyedMessage;
Jonathan Hartdb3af892015-01-26 13:19:07 -080094
Jonathan Hart4f397e82015-02-04 09:10:41 -080095 private static final String ERROR_NULL_KEY = "Key cannot be null";
96 private static final String ERROR_NULL_VALUE = "Null values are not allowed";
97
Jonathan Hartdb3af892015-01-26 13:19:07 -080098 // TODO: Make these anti-entropy params configurable
99 private long initialDelaySec = 5;
100 private long periodSec = 5;
Jonathan Hartf893be82015-02-24 17:35:51 -0800101 private boolean lightweightAntiEntropy = true;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800102
103 /**
104 * Creates a new eventually consistent map shared amongst multiple instances.
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800105 * <p>
Jonathan Hartdb3af892015-01-26 13:19:07 -0800106 * Each map is identified by a string map name. EventuallyConsistentMapImpl
107 * objects in different JVMs that use the same map name will form a
108 * distributed map across JVMs (provided the cluster service is aware of
109 * both nodes).
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800110 * </p>
111 * <p>
Jonathan Hartdb3af892015-01-26 13:19:07 -0800112 * The client is expected to provide an
113 * {@link org.onlab.util.KryoNamespace.Builder} with which all classes that
114 * will be stored in this map have been registered (including referenced
115 * classes). This serializer will be used to serialize both K and V for
116 * inter-node notifications.
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800117 * </p>
118 * <p>
Jonathan Hartdb3af892015-01-26 13:19:07 -0800119 * The client must provide an {@link org.onosproject.store.impl.ClockService}
120 * which can generate timestamps for a given key. The clock service is free
121 * to generate timestamps however it wishes, however these timestamps will
122 * be used to serialize updates to the map so they must be strict enough
123 * to ensure updates are properly ordered for the use case (i.e. in some
124 * cases wallclock time will suffice, whereas in other cases logical time
125 * will be necessary).
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800126 * </p>
Jonathan Hartdb3af892015-01-26 13:19:07 -0800127 *
128 * @param mapName a String identifier for the map.
129 * @param clusterService the cluster service
130 * @param clusterCommunicator the cluster communications service
131 * @param serializerBuilder a Kryo namespace builder that can serialize
132 * both K and V
133 * @param clockService a clock service able to generate timestamps
134 * for K
135 */
136 public EventuallyConsistentMapImpl(String mapName,
137 ClusterService clusterService,
138 ClusterCommunicationService clusterCommunicator,
139 KryoNamespace.Builder serializerBuilder,
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800140 ClockService<K, V> clockService) {
Jonathan Hartdb3af892015-01-26 13:19:07 -0800141 this.clusterService = checkNotNull(clusterService);
142 this.clusterCommunicator = checkNotNull(clusterCommunicator);
143
144 serializer = createSerializer(checkNotNull(serializerBuilder));
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800145 destroyedMessage = mapName + ERROR_DESTROYED;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800146
147 this.clockService = checkNotNull(clockService);
148
149 items = new ConcurrentHashMap<>();
150 removedItems = new ConcurrentHashMap<>();
151
Brian O'Connor4b2ba5f2015-02-18 20:54:00 -0800152 executor = Executors //FIXME
153 .newFixedThreadPool(4, groupedThreads("onos/ecm", mapName + "-fg-%d"));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800154
Madan Jampani28726282015-02-19 11:40:23 -0800155 broadcastMessageExecutor = Executors.newSingleThreadExecutor(groupedThreads("onos/ecm", mapName + "-notify"));
156
Jonathan Hartdb3af892015-01-26 13:19:07 -0800157 backgroundExecutor =
158 newSingleThreadScheduledExecutor(minPriority(
Brian O'Connor4b2ba5f2015-02-18 20:54:00 -0800159 groupedThreads("onos/ecm", mapName + "-bg-%d")));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800160
Jonathan Hartaaa56572015-01-28 21:56:35 -0800161 // start anti-entropy thread
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800162 backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
163 initialDelaySec, periodSec,
164 TimeUnit.SECONDS);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800165
Jonathan Hartdb3af892015-01-26 13:19:07 -0800166 updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
167 clusterCommunicator.addSubscriber(updateMessageSubject,
Madan Jampani2af244a2015-02-22 13:12:01 -0800168 new InternalPutEventListener(), executor);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800169 removeMessageSubject = new MessageSubject("ecm-" + mapName + "-remove");
170 clusterCommunicator.addSubscriber(removeMessageSubject,
Madan Jampani2af244a2015-02-22 13:12:01 -0800171 new InternalRemoveEventListener(), executor);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800172 antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");
173 clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
Madan Jampani2af244a2015-02-22 13:12:01 -0800174 new InternalAntiEntropyListener(), backgroundExecutor);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800175 }
176
177 private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
178 return new KryoSerializer() {
179 @Override
180 protected void setupKryoPool() {
181 // Add the map's internal helper classes to the user-supplied serializer
182 serializerPool = builder
183 .register(WallClockTimestamp.class)
184 .register(PutEntry.class)
Jonathan Hart539a6462015-01-27 17:05:43 -0800185 .register(RemoveEntry.class)
Jonathan Hartdb3af892015-01-26 13:19:07 -0800186 .register(ArrayList.class)
187 .register(InternalPutEvent.class)
188 .register(InternalRemoveEvent.class)
Jonathan Hartaaa56572015-01-28 21:56:35 -0800189 .register(AntiEntropyAdvertisement.class)
190 .register(HashMap.class)
Jonathan Hartdb3af892015-01-26 13:19:07 -0800191 .build();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800192 }
193 };
194 }
195
Madan Jampanib28e4ad2015-02-19 12:31:37 -0800196 /**
197 * Sets the executor to use for broadcasting messages and returns this
198 * instance for method chaining.
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800199 *
Madan Jampanib28e4ad2015-02-19 12:31:37 -0800200 * @param executor executor service
201 * @return this instance
202 */
203 public EventuallyConsistentMapImpl<K, V> withBroadcastMessageExecutor(ExecutorService executor) {
204 checkNotNull(executor, "Null executor");
205 broadcastMessageExecutor = executor;
206 return this;
207 }
208
Jonathan Hartdb3af892015-01-26 13:19:07 -0800209 @Override
210 public int size() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800211 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800212 return items.size();
213 }
214
215 @Override
216 public boolean isEmpty() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800217 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800218 return items.isEmpty();
219 }
220
221 @Override
222 public boolean containsKey(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800223 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800224 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800225 return items.containsKey(key);
226 }
227
228 @Override
229 public boolean containsValue(V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800230 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800231 checkNotNull(value, ERROR_NULL_VALUE);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800232
233 return items.values().stream()
234 .anyMatch(timestamped -> timestamped.value().equals(value));
235 }
236
237 @Override
238 public V get(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800239 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800240 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800241
242 Timestamped<V> value = items.get(key);
243 if (value != null) {
244 return value.value();
245 }
246 return null;
247 }
248
249 @Override
250 public void put(K key, V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800251 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800252 checkNotNull(key, ERROR_NULL_KEY);
253 checkNotNull(value, ERROR_NULL_VALUE);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800254
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800255 Timestamp timestamp = clockService.getTimestamp(key, value);
256
Jonathan Hartdb3af892015-01-26 13:19:07 -0800257 if (putInternal(key, value, timestamp)) {
258 notifyPeers(new InternalPutEvent<>(key, value, timestamp));
259 EventuallyConsistentMapEvent<K, V> externalEvent
260 = new EventuallyConsistentMapEvent<>(
261 EventuallyConsistentMapEvent.Type.PUT, key, value);
262 notifyListeners(externalEvent);
263 }
264 }
265
266 private boolean putInternal(K key, V value, Timestamp timestamp) {
Brian O'Connor2952e3b2015-02-19 21:47:57 -0800267 Timestamp removed = removedItems.get(key);
Jonathan Hart403ea932015-02-20 16:23:00 -0800268 if (removed != null && removed.isNewerThan(timestamp)) {
Brian O'Connor2952e3b2015-02-19 21:47:57 -0800269 log.debug("ecmap - removed was newer {}", value);
270 return false;
271 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800272
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800273 final MutableBoolean updated = new MutableBoolean(false);
274
275 items.compute(key, (k, existing) -> {
Jonathan Hart403ea932015-02-20 16:23:00 -0800276 if (existing != null && existing.isNewerThan(timestamp)) {
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800277 updated.setFalse();
278 return existing;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800279 } else {
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800280 updated.setTrue();
281 return new Timestamped<>(value, timestamp);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800282 }
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800283 });
284
285 boolean success = updated.booleanValue();
286 if (!success) {
287 log.debug("ecmap - existing was newer {}", value);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800288 }
Brian O'Connor2952e3b2015-02-19 21:47:57 -0800289
290 if (success && removed != null) {
291 removedItems.remove(key, removed);
292 }
293 return success;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800294 }
295
296 @Override
297 public void remove(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800298 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800299 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800300
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800301 // TODO prevent calls here if value is important for timestamp
302 Timestamp timestamp = clockService.getTimestamp(key, null);
303
Jonathan Hartdb3af892015-01-26 13:19:07 -0800304 if (removeInternal(key, timestamp)) {
305 notifyPeers(new InternalRemoveEvent<>(key, timestamp));
306 EventuallyConsistentMapEvent<K, V> externalEvent
307 = new EventuallyConsistentMapEvent<>(
308 EventuallyConsistentMapEvent.Type.REMOVE, key, null);
309 notifyListeners(externalEvent);
310 }
311 }
312
313 private boolean removeInternal(K key, Timestamp timestamp) {
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800314 final MutableBoolean updated = new MutableBoolean(false);
315
316 items.compute(key, (k, existing) -> {
317 if (existing != null && existing.isNewerThan(timestamp)) {
318 updated.setFalse();
319 return existing;
Madan Jampani7e6cfe32015-02-19 17:44:44 -0800320 } else {
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800321 updated.setTrue();
322 // remove from items map
323 return null;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800324 }
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800325 });
326
327 if (updated.isFalse()) {
328 return false;
Madan Jampani7e6cfe32015-02-19 17:44:44 -0800329 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800330
Madan Jampani7e6cfe32015-02-19 17:44:44 -0800331 Timestamp removedTimestamp = removedItems.get(key);
332 if (removedTimestamp == null) {
333 return removedItems.putIfAbsent(key, timestamp) == null;
Jonathan Hart403ea932015-02-20 16:23:00 -0800334 } else if (timestamp.isNewerThan(removedTimestamp)) {
Madan Jampani7e6cfe32015-02-19 17:44:44 -0800335 return removedItems.replace(key, removedTimestamp, timestamp);
336 } else {
337 return false;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800338 }
339 }
340
341 @Override
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800342 public void remove(K key, V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800343 checkState(!destroyed, destroyedMessage);
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800344 checkNotNull(key, ERROR_NULL_KEY);
345 checkNotNull(value, ERROR_NULL_VALUE);
346
347 Timestamp timestamp = clockService.getTimestamp(key, value);
348
349 if (removeInternal(key, timestamp)) {
350 notifyPeers(new InternalRemoveEvent<>(key, timestamp));
351 EventuallyConsistentMapEvent<K, V> externalEvent
352 = new EventuallyConsistentMapEvent<>(
353 EventuallyConsistentMapEvent.Type.REMOVE, key, value);
354 notifyListeners(externalEvent);
355 }
356 }
357
358 @Override
Jonathan Hartdb3af892015-01-26 13:19:07 -0800359 public void putAll(Map<? extends K, ? extends V> m) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800360 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800361
362 List<PutEntry<K, V>> updates = new ArrayList<>(m.size());
363
364 for (Map.Entry<? extends K, ? extends V> entry : m.entrySet()) {
365 K key = entry.getKey();
366 V value = entry.getValue();
Jonathan Hart4f397e82015-02-04 09:10:41 -0800367
368 checkNotNull(key, ERROR_NULL_KEY);
369 checkNotNull(value, ERROR_NULL_VALUE);
370
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800371 Timestamp timestamp = clockService.getTimestamp(key, value);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800372
373 if (putInternal(key, value, timestamp)) {
374 updates.add(new PutEntry<>(key, value, timestamp));
375 }
376 }
377
Jonathan Hart584d2f32015-01-27 19:46:14 -0800378 if (!updates.isEmpty()) {
379 notifyPeers(new InternalPutEvent<>(updates));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800380
Jonathan Hart584d2f32015-01-27 19:46:14 -0800381 for (PutEntry<K, V> entry : updates) {
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800382 EventuallyConsistentMapEvent<K, V> externalEvent =
383 new EventuallyConsistentMapEvent<>(
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800384 EventuallyConsistentMapEvent.Type.PUT, entry.key(),
385 entry.value());
Jonathan Hart584d2f32015-01-27 19:46:14 -0800386 notifyListeners(externalEvent);
387 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800388 }
389 }
390
391 @Override
392 public void clear() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800393 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800394
395 List<RemoveEntry<K>> removed = new ArrayList<>(items.size());
396
397 for (K key : items.keySet()) {
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800398 // TODO also this is not applicable if value is important for timestamp?
399 Timestamp timestamp = clockService.getTimestamp(key, null);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800400
401 if (removeInternal(key, timestamp)) {
402 removed.add(new RemoveEntry<>(key, timestamp));
403 }
404 }
405
Jonathan Hart584d2f32015-01-27 19:46:14 -0800406 if (!removed.isEmpty()) {
407 notifyPeers(new InternalRemoveEvent<>(removed));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800408
Jonathan Hart584d2f32015-01-27 19:46:14 -0800409 for (RemoveEntry<K> entry : removed) {
410 EventuallyConsistentMapEvent<K, V> externalEvent
411 = new EventuallyConsistentMapEvent<>(
412 EventuallyConsistentMapEvent.Type.REMOVE, entry.key(),
413 null);
414 notifyListeners(externalEvent);
415 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800416 }
417 }
418
419 @Override
420 public Set<K> keySet() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800421 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800422
423 return items.keySet();
424 }
425
426 @Override
427 public Collection<V> values() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800428 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800429
430 return items.values().stream()
431 .map(Timestamped::value)
432 .collect(Collectors.toList());
433 }
434
435 @Override
436 public Set<Map.Entry<K, V>> entrySet() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800437 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800438
439 return items.entrySet().stream()
Jonathan Hartf9108232015-02-02 16:37:35 -0800440 .map(e -> Pair.of(e.getKey(), e.getValue().value()))
Jonathan Hartdb3af892015-01-26 13:19:07 -0800441 .collect(Collectors.toSet());
442 }
443
444 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800445 public void addListener(EventuallyConsistentMapListener<K, V> listener) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800446 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800447
448 listeners.add(checkNotNull(listener));
449 }
450
451 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800452 public void removeListener(EventuallyConsistentMapListener<K, V> listener) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800453 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800454
455 listeners.remove(checkNotNull(listener));
456 }
457
458 @Override
459 public void destroy() {
460 destroyed = true;
461
462 executor.shutdown();
463 backgroundExecutor.shutdown();
Madan Jampani337bb442015-02-19 14:29:18 -0800464 broadcastMessageExecutor.shutdown();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800465
Jonathan Hart584d2f32015-01-27 19:46:14 -0800466 listeners.clear();
467
Jonathan Hartdb3af892015-01-26 13:19:07 -0800468 clusterCommunicator.removeSubscriber(updateMessageSubject);
469 clusterCommunicator.removeSubscriber(removeMessageSubject);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800470 clusterCommunicator.removeSubscriber(antiEntropyAdvertisementSubject);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800471 }
472
Jonathan Hartaaa56572015-01-28 21:56:35 -0800473 private void notifyListeners(EventuallyConsistentMapEvent<K, V> event) {
474 for (EventuallyConsistentMapListener<K, V> listener : listeners) {
Jonathan Hartdb3af892015-01-26 13:19:07 -0800475 listener.event(event);
476 }
477 }
478
479 private void notifyPeers(InternalPutEvent event) {
Brian O'Connorb2894222015-02-20 22:05:19 -0800480 // FIXME extremely memory expensive when we are overrun
481// broadcastMessageExecutor.execute(() -> broadcastMessage(updateMessageSubject, event));
482 broadcastMessage(updateMessageSubject, event);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800483 }
484
485 private void notifyPeers(InternalRemoveEvent event) {
Brian O'Connorb2894222015-02-20 22:05:19 -0800486 // FIXME extremely memory expensive when we are overrun
487// broadcastMessageExecutor.execute(() -> broadcastMessage(removeMessageSubject, event));
488 broadcastMessage(removeMessageSubject, event);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800489 }
490
Jonathan Hart7d656f42015-01-27 14:07:23 -0800491 private void broadcastMessage(MessageSubject subject, Object event) {
Brian O'Connorb2894222015-02-20 22:05:19 -0800492 // FIXME can we parallelize the serialization... use the caller???
Jonathan Hartdb3af892015-01-26 13:19:07 -0800493 ClusterMessage message = new ClusterMessage(
494 clusterService.getLocalNode().id(),
495 subject,
496 serializer.encode(event));
Brian O'Connorb2894222015-02-20 22:05:19 -0800497 //broadcastMessageExecutor.execute(() -> clusterCommunicator.broadcast(message));
Madan Jampani337bb442015-02-19 14:29:18 -0800498 clusterCommunicator.broadcast(message);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800499 }
500
501 private void unicastMessage(NodeId peer,
502 MessageSubject subject,
503 Object event) throws IOException {
504 ClusterMessage message = new ClusterMessage(
505 clusterService.getLocalNode().id(),
506 subject,
507 serializer.encode(event));
508 clusterCommunicator.unicast(message, peer);
509 }
510
Jonathan Hartaaa56572015-01-28 21:56:35 -0800511 private final class SendAdvertisementTask implements Runnable {
512 @Override
513 public void run() {
514 if (Thread.currentThread().isInterrupted()) {
515 log.info("Interrupted, quitting");
516 return;
517 }
518
519 try {
520 final NodeId self = clusterService.getLocalNode().id();
521 Set<ControllerNode> nodes = clusterService.getNodes();
522
523 List<NodeId> nodeIds = nodes.stream()
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800524 .map(ControllerNode::id)
Jonathan Hartaaa56572015-01-28 21:56:35 -0800525 .collect(Collectors.toList());
526
527 if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
528 log.trace("No other peers in the cluster.");
529 return;
530 }
531
532 NodeId peer;
533 do {
534 int idx = RandomUtils.nextInt(0, nodeIds.size());
535 peer = nodeIds.get(idx);
536 } while (peer.equals(self));
537
538 if (Thread.currentThread().isInterrupted()) {
539 log.info("Interrupted, quitting");
540 return;
541 }
542
543 AntiEntropyAdvertisement<K> ad = createAdvertisement();
544
545 try {
546 unicastMessage(peer, antiEntropyAdvertisementSubject, ad);
547 } catch (IOException e) {
548 log.debug("Failed to send anti-entropy advertisement to {}", peer);
549 }
550 } catch (Exception e) {
551 // Catch all exceptions to avoid scheduled task being suppressed.
552 log.error("Exception thrown while sending advertisement", e);
553 }
554 }
555 }
556
557 private AntiEntropyAdvertisement<K> createAdvertisement() {
558 final NodeId self = clusterService.getLocalNode().id();
559
560 Map<K, Timestamp> timestamps = new HashMap<>(items.size());
561
562 items.forEach((key, value) -> timestamps.put(key, value.timestamp()));
563
564 Map<K, Timestamp> tombstones = new HashMap<>(removedItems);
565
566 return new AntiEntropyAdvertisement<>(self, timestamps, tombstones);
567 }
568
569 private void handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) {
570 List<EventuallyConsistentMapEvent<K, V>> externalEvents;
571
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800572 externalEvents = antiEntropyCheckLocalItems(ad);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800573
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800574 antiEntropyCheckLocalRemoved(ad);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800575
Jonathan Hartf893be82015-02-24 17:35:51 -0800576 if (!lightweightAntiEntropy) {
577 externalEvents.addAll(antiEntropyCheckRemoteRemoved(ad));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800578
Jonathan Hartf893be82015-02-24 17:35:51 -0800579 // if remote ad has something unknown, actively sync
580 for (K key : ad.timestamps().keySet()) {
581 if (!items.containsKey(key)) {
582 // Send the advertisement back if this peer is out-of-sync
583 final NodeId sender = ad.sender();
584 AntiEntropyAdvertisement<K> myAd = createAdvertisement();
585 try {
586 unicastMessage(sender, antiEntropyAdvertisementSubject, myAd);
587 } catch (IOException e) {
588 log.debug(
589 "Failed to send reactive anti-entropy advertisement to {}",
590 sender);
591 }
592
593 break;
594 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800595 }
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800596 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800597 externalEvents.forEach(this::notifyListeners);
598 }
599
600 /**
601 * Checks if any of the remote's live items or tombstones are out of date
602 * according to our local live item list, or if our live items are out of
603 * date according to the remote's tombstone list.
604 * If the local copy is more recent, it will be pushed to the remote. If the
605 * remote has a more recent remove, we apply that to the local state.
606 *
607 * @param ad remote anti-entropy advertisement
608 * @return list of external events relating to local operations performed
609 */
Jonathan Hartaaa56572015-01-28 21:56:35 -0800610 private List<EventuallyConsistentMapEvent<K, V>> antiEntropyCheckLocalItems(
611 AntiEntropyAdvertisement<K> ad) {
612 final List<EventuallyConsistentMapEvent<K, V>> externalEvents
613 = new LinkedList<>();
614 final NodeId sender = ad.sender();
615
616 final List<PutEntry<K, V>> updatesToSend = new ArrayList<>();
617
618 for (Map.Entry<K, Timestamped<V>> item : items.entrySet()) {
619 K key = item.getKey();
620 Timestamped<V> localValue = item.getValue();
621
622 Timestamp remoteTimestamp = ad.timestamps().get(key);
623 if (remoteTimestamp == null) {
624 remoteTimestamp = ad.tombstones().get(key);
625 }
626 if (remoteTimestamp == null || localValue
Jonathan Hart403ea932015-02-20 16:23:00 -0800627 .isNewerThan(remoteTimestamp)) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800628 // local value is more recent, push to sender
629 updatesToSend
630 .add(new PutEntry<>(key, localValue.value(),
631 localValue.timestamp()));
632 }
633
634 Timestamp remoteDeadTimestamp = ad.tombstones().get(key);
635 if (remoteDeadTimestamp != null &&
Jonathan Hart403ea932015-02-20 16:23:00 -0800636 remoteDeadTimestamp.isNewerThan(localValue.timestamp())) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800637 // sender has a more recent remove
638 if (removeInternal(key, remoteDeadTimestamp)) {
639 externalEvents.add(new EventuallyConsistentMapEvent<>(
640 EventuallyConsistentMapEvent.Type.REMOVE, key, null));
641 }
642 }
643 }
644
645 // Send all updates to the peer at once
646 if (!updatesToSend.isEmpty()) {
647 try {
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800648 unicastMessage(sender, updateMessageSubject,
649 new InternalPutEvent<>(updatesToSend));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800650 } catch (IOException e) {
651 log.warn("Failed to send advertisement response", e);
652 }
653 }
654
655 return externalEvents;
656 }
657
658 /**
659 * Checks if any items in the remote live list are out of date according
660 * to our tombstone list. If we find we have a more up to date tombstone,
661 * we'll send it to the remote.
662 *
663 * @param ad remote anti-entropy advertisement
664 */
Jonathan Hartaaa56572015-01-28 21:56:35 -0800665 private void antiEntropyCheckLocalRemoved(AntiEntropyAdvertisement<K> ad) {
666 final NodeId sender = ad.sender();
667
668 final List<RemoveEntry<K>> removesToSend = new ArrayList<>();
669
670 for (Map.Entry<K, Timestamp> dead : removedItems.entrySet()) {
671 K key = dead.getKey();
672 Timestamp localDeadTimestamp = dead.getValue();
673
674 Timestamp remoteLiveTimestamp = ad.timestamps().get(key);
675 if (remoteLiveTimestamp != null
Jonathan Hart403ea932015-02-20 16:23:00 -0800676 && localDeadTimestamp.isNewerThan(remoteLiveTimestamp)) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800677 // sender has zombie, push remove
678 removesToSend
679 .add(new RemoveEntry<>(key, localDeadTimestamp));
680 }
681 }
682
683 // Send all removes to the peer at once
684 if (!removesToSend.isEmpty()) {
685 try {
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800686 unicastMessage(sender, removeMessageSubject,
687 new InternalRemoveEvent<>(removesToSend));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800688 } catch (IOException e) {
689 log.warn("Failed to send advertisement response", e);
690 }
691 }
692 }
693
694 /**
695 * Checks if any of the local live items are out of date according to the
696 * remote's tombstone advertisements. If we find a local item is out of date,
697 * we'll apply the remove operation to the local state.
698 *
699 * @param ad remote anti-entropy advertisement
700 * @return list of external events relating to local operations performed
701 */
Jonathan Hartaaa56572015-01-28 21:56:35 -0800702 private List<EventuallyConsistentMapEvent<K, V>>
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800703 antiEntropyCheckRemoteRemoved(AntiEntropyAdvertisement<K> ad) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800704 final List<EventuallyConsistentMapEvent<K, V>> externalEvents
705 = new LinkedList<>();
706
707 for (Map.Entry<K, Timestamp> remoteDead : ad.tombstones().entrySet()) {
708 K key = remoteDead.getKey();
709 Timestamp remoteDeadTimestamp = remoteDead.getValue();
710
711 Timestamped<V> local = items.get(key);
712 Timestamp localDead = removedItems.get(key);
Jonathan Hart403ea932015-02-20 16:23:00 -0800713 if (local != null && remoteDeadTimestamp.isNewerThan(
714 local.timestamp())) {
715 // If the remote has a more recent tombstone than either our local
716 // value, then do a remove with their timestamp
Jonathan Hartaaa56572015-01-28 21:56:35 -0800717 if (removeInternal(key, remoteDeadTimestamp)) {
718 externalEvents.add(new EventuallyConsistentMapEvent<>(
719 EventuallyConsistentMapEvent.Type.REMOVE, key, null));
720 }
Jonathan Hart403ea932015-02-20 16:23:00 -0800721 } else if (localDead != null && remoteDeadTimestamp.isNewerThan(
722 localDead)) {
723 // If the remote has a more recent tombstone than us, update ours
724 // to their timestamp
725 removeInternal(key, remoteDeadTimestamp);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800726 }
727 }
728
729 return externalEvents;
730 }
731
732 private final class InternalAntiEntropyListener
733 implements ClusterMessageHandler {
734
735 @Override
736 public void handle(ClusterMessage message) {
Jonathan Hart4fd4ebb2015-02-04 17:38:48 -0800737 log.trace("Received anti-entropy advertisement from peer: {}",
738 message.sender());
Jonathan Hartaaa56572015-01-28 21:56:35 -0800739 AntiEntropyAdvertisement<K> advertisement = serializer.decode(message.payload());
Madan Jampani2af244a2015-02-22 13:12:01 -0800740 try {
741 handleAntiEntropyAdvertisement(advertisement);
742 } catch (Exception e) {
743 log.warn("Exception thrown handling advertisements", e);
744 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800745 }
746 }
747
Jonathan Hartdb3af892015-01-26 13:19:07 -0800748 private final class InternalPutEventListener implements
749 ClusterMessageHandler {
750 @Override
751 public void handle(ClusterMessage message) {
752 log.debug("Received put event from peer: {}", message.sender());
753 InternalPutEvent<K, V> event = serializer.decode(message.payload());
754
Madan Jampani2af244a2015-02-22 13:12:01 -0800755 try {
756 for (PutEntry<K, V> entry : event.entries()) {
757 K key = entry.key();
758 V value = entry.value();
759 Timestamp timestamp = entry.timestamp();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800760
Madan Jampani2af244a2015-02-22 13:12:01 -0800761 if (putInternal(key, value, timestamp)) {
762 EventuallyConsistentMapEvent<K, V> externalEvent =
763 new EventuallyConsistentMapEvent<>(
764 EventuallyConsistentMapEvent.Type.PUT, key,
765 value);
766 notifyListeners(externalEvent);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800767 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800768 }
Madan Jampani2af244a2015-02-22 13:12:01 -0800769 } catch (Exception e) {
770 log.warn("Exception thrown handling put", e);
771 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800772 }
773 }
774
775 private final class InternalRemoveEventListener implements
776 ClusterMessageHandler {
777 @Override
778 public void handle(ClusterMessage message) {
779 log.debug("Received remove event from peer: {}", message.sender());
780 InternalRemoveEvent<K> event = serializer.decode(message.payload());
Madan Jampani2af244a2015-02-22 13:12:01 -0800781 try {
782 for (RemoveEntry<K> entry : event.entries()) {
783 K key = entry.key();
784 Timestamp timestamp = entry.timestamp();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800785
Madan Jampani2af244a2015-02-22 13:12:01 -0800786 if (removeInternal(key, timestamp)) {
787 EventuallyConsistentMapEvent<K, V> externalEvent
788 = new EventuallyConsistentMapEvent<>(
789 EventuallyConsistentMapEvent.Type.REMOVE,
790 key, null);
791 notifyListeners(externalEvent);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800792 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800793 }
Madan Jampani2af244a2015-02-22 13:12:01 -0800794 } catch (Exception e) {
795 log.warn("Exception thrown handling remove", e);
796 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800797 }
798 }
799
Jonathan Hartdb3af892015-01-26 13:19:07 -0800800}