blob: 1069f63baa0945c3c7ef6ef8c9da85ecc48e2a2a [file] [log] [blame]
Jonathan Hartdb3af892015-01-26 13:19:07 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Jonathan Hart77bdd262015-02-03 09:07:48 -080016package org.onosproject.store.ecmap;
Jonathan Hartdb3af892015-01-26 13:19:07 -080017
Jonathan Hartaaa56572015-01-28 21:56:35 -080018import org.apache.commons.lang3.RandomUtils;
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -080019import org.apache.commons.lang3.mutable.MutableBoolean;
Jonathan Hartf9108232015-02-02 16:37:35 -080020import org.apache.commons.lang3.tuple.Pair;
Jonathan Hartdb3af892015-01-26 13:19:07 -080021import org.onlab.util.KryoNamespace;
22import org.onosproject.cluster.ClusterService;
Jonathan Hartaaa56572015-01-28 21:56:35 -080023import org.onosproject.cluster.ControllerNode;
Jonathan Hartdb3af892015-01-26 13:19:07 -080024import org.onosproject.cluster.NodeId;
25import org.onosproject.store.Timestamp;
26import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
27import org.onosproject.store.cluster.messaging.ClusterMessage;
28import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
29import org.onosproject.store.cluster.messaging.MessageSubject;
Jonathan Hart77bdd262015-02-03 09:07:48 -080030import org.onosproject.store.impl.ClockService;
31import org.onosproject.store.impl.Timestamped;
32import org.onosproject.store.impl.WallClockTimestamp;
Jonathan Hartdb3af892015-01-26 13:19:07 -080033import org.onosproject.store.serializers.KryoSerializer;
34import org.slf4j.Logger;
35import org.slf4j.LoggerFactory;
36
37import java.io.IOException;
38import java.util.ArrayList;
39import java.util.Collection;
Jonathan Hartaaa56572015-01-28 21:56:35 -080040import java.util.HashMap;
41import java.util.LinkedList;
Jonathan Hartdb3af892015-01-26 13:19:07 -080042import java.util.List;
43import java.util.Map;
44import java.util.Set;
45import java.util.concurrent.ConcurrentHashMap;
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -080046import java.util.concurrent.ConcurrentMap;
Jonathan Hartdb3af892015-01-26 13:19:07 -080047import java.util.concurrent.CopyOnWriteArraySet;
48import java.util.concurrent.ExecutorService;
49import java.util.concurrent.Executors;
50import java.util.concurrent.ScheduledExecutorService;
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -080051import java.util.concurrent.TimeUnit;
Jonathan Hartdb3af892015-01-26 13:19:07 -080052import java.util.stream.Collectors;
53
54import static com.google.common.base.Preconditions.checkNotNull;
55import static com.google.common.base.Preconditions.checkState;
56import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
Brian O'Connor4b2ba5f2015-02-18 20:54:00 -080057import static org.onlab.util.Tools.groupedThreads;
Jonathan Hartdb3af892015-01-26 13:19:07 -080058import static org.onlab.util.Tools.minPriority;
Jonathan Hartdb3af892015-01-26 13:19:07 -080059
60/**
61 * Distributed Map implementation which uses optimistic replication and gossip
62 * based techniques to provide an eventually consistent data store.
63 */
64public class EventuallyConsistentMapImpl<K, V>
65 implements EventuallyConsistentMap<K, V> {
66
67 private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class);
68
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -080069 private final ConcurrentMap<K, Timestamped<V>> items;
70 private final ConcurrentMap<K, Timestamp> removedItems;
Jonathan Hartdb3af892015-01-26 13:19:07 -080071
Jonathan Hartdb3af892015-01-26 13:19:07 -080072 private final ClusterService clusterService;
73 private final ClusterCommunicationService clusterCommunicator;
74 private final KryoSerializer serializer;
75
Jonathan Hart5ec32ba2015-02-05 13:33:58 -080076 private final ClockService<K, V> clockService;
Jonathan Hartdb3af892015-01-26 13:19:07 -080077
78 private final MessageSubject updateMessageSubject;
79 private final MessageSubject removeMessageSubject;
Jonathan Hartaaa56572015-01-28 21:56:35 -080080 private final MessageSubject antiEntropyAdvertisementSubject;
Jonathan Hartdb3af892015-01-26 13:19:07 -080081
Jonathan Hartaaa56572015-01-28 21:56:35 -080082 private final Set<EventuallyConsistentMapListener<K, V>> listeners
Jonathan Hartdb3af892015-01-26 13:19:07 -080083 = new CopyOnWriteArraySet<>();
84
85 private final ExecutorService executor;
86
87 private final ScheduledExecutorService backgroundExecutor;
88
Madan Jampanib28e4ad2015-02-19 12:31:37 -080089 private ExecutorService broadcastMessageExecutor;
Madan Jampani28726282015-02-19 11:40:23 -080090
Jonathan Hartdb3af892015-01-26 13:19:07 -080091 private volatile boolean destroyed = false;
Jonathan Hart539a6462015-01-27 17:05:43 -080092 private static final String ERROR_DESTROYED = " map is already destroyed";
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -080093 private final String destroyedMessage;
Jonathan Hartdb3af892015-01-26 13:19:07 -080094
Jonathan Hart4f397e82015-02-04 09:10:41 -080095 private static final String ERROR_NULL_KEY = "Key cannot be null";
96 private static final String ERROR_NULL_VALUE = "Null values are not allowed";
97
Jonathan Hartdb3af892015-01-26 13:19:07 -080098 // TODO: Make these anti-entropy params configurable
99 private long initialDelaySec = 5;
100 private long periodSec = 5;
101
/**
 * Creates a new eventually consistent map shared amongst multiple instances.
 * <p>
 * Each map is identified by a string map name. EventuallyConsistentMapImpl
 * objects in different JVMs that use the same map name will form a
 * distributed map across JVMs (provided the cluster service is aware of
 * both nodes).
 * </p>
 * <p>
 * The client is expected to provide an
 * {@link org.onlab.util.KryoNamespace.Builder} with which all classes that
 * will be stored in this map have been registered (including referenced
 * classes). This serializer will be used to serialize both K and V for
 * inter-node notifications.
 * </p>
 * <p>
 * The client must provide an {@link org.onosproject.store.impl.ClockService}
 * which can generate timestamps for a given key. The clock service is free
 * to generate timestamps however it wishes, but these timestamps are used to
 * order updates to the map, so they must be strict enough to ensure updates
 * are properly ordered for the use case (i.e. in some cases wall-clock time
 * will suffice, whereas in other cases logical time will be necessary).
 * </p>
 * <p>
 * Every field is assigned before any message handler is registered, and the
 * periodic anti-entropy task is scheduled last. The handlers and the task run
 * on other threads, so they must not be able to observe a partially
 * constructed instance (for example, an unassigned message subject).
 * </p>
 *
 * @param mapName a String identifier for the map; it is embedded in the
 *                message subjects and executor thread names
 * @param clusterService the cluster service
 * @param clusterCommunicator the cluster communications service
 * @param serializerBuilder a Kryo namespace builder that can serialize
 *                          both K and V
 * @param clockService a clock service able to generate timestamps
 *                     for K
 * @throws NullPointerException if clusterService, clusterCommunicator,
 *                              serializerBuilder or clockService is null
 */
public EventuallyConsistentMapImpl(String mapName,
                                   ClusterService clusterService,
                                   ClusterCommunicationService clusterCommunicator,
                                   KryoNamespace.Builder serializerBuilder,
                                   ClockService<K, V> clockService) {
    this.clusterService = checkNotNull(clusterService);
    this.clusterCommunicator = checkNotNull(clusterCommunicator);

    serializer = createSerializer(checkNotNull(serializerBuilder));
    destroyedMessage = mapName + ERROR_DESTROYED;

    this.clockService = checkNotNull(clockService);

    items = new ConcurrentHashMap<>();
    removedItems = new ConcurrentHashMap<>();

    executor = Executors //FIXME
            .newFixedThreadPool(4, groupedThreads("onos/ecm", mapName + "-fg-%d"));

    broadcastMessageExecutor = Executors.newSingleThreadExecutor(
            groupedThreads("onos/ecm", mapName + "-notify"));

    backgroundExecutor =
            newSingleThreadScheduledExecutor(minPriority(
                    groupedThreads("onos/ecm", mapName + "-bg-%d")));

    // Assign all subjects before `this` escapes to handler or task threads.
    updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
    removeMessageSubject = new MessageSubject("ecm-" + mapName + "-remove");
    antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");

    clusterCommunicator.addSubscriber(updateMessageSubject,
            new InternalPutEventListener(), executor);
    clusterCommunicator.addSubscriber(removeMessageSubject,
            new InternalRemoveEventListener(), executor);
    clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
            new InternalAntiEntropyListener(), backgroundExecutor);

    // Start the anti-entropy thread last. The task reads
    // antiEntropyAdvertisementSubject and other fields from another thread.
    backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
            initialDelaySec, periodSec,
            TimeUnit.SECONDS);
}
175
176 private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
177 return new KryoSerializer() {
178 @Override
179 protected void setupKryoPool() {
180 // Add the map's internal helper classes to the user-supplied serializer
181 serializerPool = builder
182 .register(WallClockTimestamp.class)
183 .register(PutEntry.class)
Jonathan Hart539a6462015-01-27 17:05:43 -0800184 .register(RemoveEntry.class)
Jonathan Hartdb3af892015-01-26 13:19:07 -0800185 .register(ArrayList.class)
186 .register(InternalPutEvent.class)
187 .register(InternalRemoveEvent.class)
Jonathan Hartaaa56572015-01-28 21:56:35 -0800188 .register(AntiEntropyAdvertisement.class)
189 .register(HashMap.class)
Jonathan Hartdb3af892015-01-26 13:19:07 -0800190 .build();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800191 }
192 };
193 }
194
Madan Jampanib28e4ad2015-02-19 12:31:37 -0800195 /**
196 * Sets the executor to use for broadcasting messages and returns this
197 * instance for method chaining.
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800198 *
Madan Jampanib28e4ad2015-02-19 12:31:37 -0800199 * @param executor executor service
200 * @return this instance
201 */
202 public EventuallyConsistentMapImpl<K, V> withBroadcastMessageExecutor(ExecutorService executor) {
203 checkNotNull(executor, "Null executor");
204 broadcastMessageExecutor = executor;
205 return this;
206 }
207
Jonathan Hartdb3af892015-01-26 13:19:07 -0800208 @Override
209 public int size() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800210 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800211 return items.size();
212 }
213
214 @Override
215 public boolean isEmpty() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800216 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800217 return items.isEmpty();
218 }
219
220 @Override
221 public boolean containsKey(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800222 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800223 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800224 return items.containsKey(key);
225 }
226
227 @Override
228 public boolean containsValue(V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800229 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800230 checkNotNull(value, ERROR_NULL_VALUE);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800231
232 return items.values().stream()
233 .anyMatch(timestamped -> timestamped.value().equals(value));
234 }
235
236 @Override
237 public V get(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800238 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800239 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800240
241 Timestamped<V> value = items.get(key);
242 if (value != null) {
243 return value.value();
244 }
245 return null;
246 }
247
248 @Override
249 public void put(K key, V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800250 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800251 checkNotNull(key, ERROR_NULL_KEY);
252 checkNotNull(value, ERROR_NULL_VALUE);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800253
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800254 Timestamp timestamp = clockService.getTimestamp(key, value);
255
Jonathan Hartdb3af892015-01-26 13:19:07 -0800256 if (putInternal(key, value, timestamp)) {
257 notifyPeers(new InternalPutEvent<>(key, value, timestamp));
258 EventuallyConsistentMapEvent<K, V> externalEvent
259 = new EventuallyConsistentMapEvent<>(
260 EventuallyConsistentMapEvent.Type.PUT, key, value);
261 notifyListeners(externalEvent);
262 }
263 }
264
265 private boolean putInternal(K key, V value, Timestamp timestamp) {
Brian O'Connor2952e3b2015-02-19 21:47:57 -0800266 Timestamp removed = removedItems.get(key);
Jonathan Hart403ea932015-02-20 16:23:00 -0800267 if (removed != null && removed.isNewerThan(timestamp)) {
Brian O'Connor2952e3b2015-02-19 21:47:57 -0800268 log.debug("ecmap - removed was newer {}", value);
269 return false;
270 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800271
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800272 final MutableBoolean updated = new MutableBoolean(false);
273
274 items.compute(key, (k, existing) -> {
Jonathan Hart403ea932015-02-20 16:23:00 -0800275 if (existing != null && existing.isNewerThan(timestamp)) {
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800276 updated.setFalse();
277 return existing;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800278 } else {
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800279 updated.setTrue();
280 return new Timestamped<>(value, timestamp);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800281 }
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800282 });
283
284 boolean success = updated.booleanValue();
285 if (!success) {
286 log.debug("ecmap - existing was newer {}", value);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800287 }
Brian O'Connor2952e3b2015-02-19 21:47:57 -0800288
289 if (success && removed != null) {
290 removedItems.remove(key, removed);
291 }
292 return success;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800293 }
294
295 @Override
296 public void remove(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800297 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800298 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800299
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800300 // TODO prevent calls here if value is important for timestamp
301 Timestamp timestamp = clockService.getTimestamp(key, null);
302
Jonathan Hartdb3af892015-01-26 13:19:07 -0800303 if (removeInternal(key, timestamp)) {
304 notifyPeers(new InternalRemoveEvent<>(key, timestamp));
305 EventuallyConsistentMapEvent<K, V> externalEvent
306 = new EventuallyConsistentMapEvent<>(
307 EventuallyConsistentMapEvent.Type.REMOVE, key, null);
308 notifyListeners(externalEvent);
309 }
310 }
311
312 private boolean removeInternal(K key, Timestamp timestamp) {
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800313 final MutableBoolean updated = new MutableBoolean(false);
314
315 items.compute(key, (k, existing) -> {
316 if (existing != null && existing.isNewerThan(timestamp)) {
317 updated.setFalse();
318 return existing;
Madan Jampani7e6cfe32015-02-19 17:44:44 -0800319 } else {
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800320 updated.setTrue();
321 // remove from items map
322 return null;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800323 }
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800324 });
325
326 if (updated.isFalse()) {
327 return false;
Madan Jampani7e6cfe32015-02-19 17:44:44 -0800328 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800329
Madan Jampani7e6cfe32015-02-19 17:44:44 -0800330 Timestamp removedTimestamp = removedItems.get(key);
331 if (removedTimestamp == null) {
332 return removedItems.putIfAbsent(key, timestamp) == null;
Jonathan Hart403ea932015-02-20 16:23:00 -0800333 } else if (timestamp.isNewerThan(removedTimestamp)) {
Madan Jampani7e6cfe32015-02-19 17:44:44 -0800334 return removedItems.replace(key, removedTimestamp, timestamp);
335 } else {
336 return false;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800337 }
338 }
339
340 @Override
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800341 public void remove(K key, V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800342 checkState(!destroyed, destroyedMessage);
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800343 checkNotNull(key, ERROR_NULL_KEY);
344 checkNotNull(value, ERROR_NULL_VALUE);
345
346 Timestamp timestamp = clockService.getTimestamp(key, value);
347
348 if (removeInternal(key, timestamp)) {
349 notifyPeers(new InternalRemoveEvent<>(key, timestamp));
350 EventuallyConsistentMapEvent<K, V> externalEvent
351 = new EventuallyConsistentMapEvent<>(
352 EventuallyConsistentMapEvent.Type.REMOVE, key, value);
353 notifyListeners(externalEvent);
354 }
355 }
356
357 @Override
Jonathan Hartdb3af892015-01-26 13:19:07 -0800358 public void putAll(Map<? extends K, ? extends V> m) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800359 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800360
361 List<PutEntry<K, V>> updates = new ArrayList<>(m.size());
362
363 for (Map.Entry<? extends K, ? extends V> entry : m.entrySet()) {
364 K key = entry.getKey();
365 V value = entry.getValue();
Jonathan Hart4f397e82015-02-04 09:10:41 -0800366
367 checkNotNull(key, ERROR_NULL_KEY);
368 checkNotNull(value, ERROR_NULL_VALUE);
369
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800370 Timestamp timestamp = clockService.getTimestamp(key, value);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800371
372 if (putInternal(key, value, timestamp)) {
373 updates.add(new PutEntry<>(key, value, timestamp));
374 }
375 }
376
Jonathan Hart584d2f32015-01-27 19:46:14 -0800377 if (!updates.isEmpty()) {
378 notifyPeers(new InternalPutEvent<>(updates));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800379
Jonathan Hart584d2f32015-01-27 19:46:14 -0800380 for (PutEntry<K, V> entry : updates) {
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800381 EventuallyConsistentMapEvent<K, V> externalEvent =
382 new EventuallyConsistentMapEvent<>(
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800383 EventuallyConsistentMapEvent.Type.PUT, entry.key(),
384 entry.value());
Jonathan Hart584d2f32015-01-27 19:46:14 -0800385 notifyListeners(externalEvent);
386 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800387 }
388 }
389
390 @Override
391 public void clear() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800392 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800393
394 List<RemoveEntry<K>> removed = new ArrayList<>(items.size());
395
396 for (K key : items.keySet()) {
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800397 // TODO also this is not applicable if value is important for timestamp?
398 Timestamp timestamp = clockService.getTimestamp(key, null);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800399
400 if (removeInternal(key, timestamp)) {
401 removed.add(new RemoveEntry<>(key, timestamp));
402 }
403 }
404
Jonathan Hart584d2f32015-01-27 19:46:14 -0800405 if (!removed.isEmpty()) {
406 notifyPeers(new InternalRemoveEvent<>(removed));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800407
Jonathan Hart584d2f32015-01-27 19:46:14 -0800408 for (RemoveEntry<K> entry : removed) {
409 EventuallyConsistentMapEvent<K, V> externalEvent
410 = new EventuallyConsistentMapEvent<>(
411 EventuallyConsistentMapEvent.Type.REMOVE, entry.key(),
412 null);
413 notifyListeners(externalEvent);
414 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800415 }
416 }
417
418 @Override
419 public Set<K> keySet() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800420 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800421
422 return items.keySet();
423 }
424
425 @Override
426 public Collection<V> values() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800427 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800428
429 return items.values().stream()
430 .map(Timestamped::value)
431 .collect(Collectors.toList());
432 }
433
434 @Override
435 public Set<Map.Entry<K, V>> entrySet() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800436 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800437
438 return items.entrySet().stream()
Jonathan Hartf9108232015-02-02 16:37:35 -0800439 .map(e -> Pair.of(e.getKey(), e.getValue().value()))
Jonathan Hartdb3af892015-01-26 13:19:07 -0800440 .collect(Collectors.toSet());
441 }
442
443 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800444 public void addListener(EventuallyConsistentMapListener<K, V> listener) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800445 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800446
447 listeners.add(checkNotNull(listener));
448 }
449
450 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800451 public void removeListener(EventuallyConsistentMapListener<K, V> listener) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800452 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800453
454 listeners.remove(checkNotNull(listener));
455 }
456
457 @Override
458 public void destroy() {
459 destroyed = true;
460
461 executor.shutdown();
462 backgroundExecutor.shutdown();
Madan Jampani337bb442015-02-19 14:29:18 -0800463 broadcastMessageExecutor.shutdown();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800464
Jonathan Hart584d2f32015-01-27 19:46:14 -0800465 listeners.clear();
466
Jonathan Hartdb3af892015-01-26 13:19:07 -0800467 clusterCommunicator.removeSubscriber(updateMessageSubject);
468 clusterCommunicator.removeSubscriber(removeMessageSubject);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800469 clusterCommunicator.removeSubscriber(antiEntropyAdvertisementSubject);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800470 }
471
Jonathan Hartaaa56572015-01-28 21:56:35 -0800472 private void notifyListeners(EventuallyConsistentMapEvent<K, V> event) {
473 for (EventuallyConsistentMapListener<K, V> listener : listeners) {
Jonathan Hartdb3af892015-01-26 13:19:07 -0800474 listener.event(event);
475 }
476 }
477
478 private void notifyPeers(InternalPutEvent event) {
Brian O'Connorb2894222015-02-20 22:05:19 -0800479 // FIXME extremely memory expensive when we are overrun
480// broadcastMessageExecutor.execute(() -> broadcastMessage(updateMessageSubject, event));
481 broadcastMessage(updateMessageSubject, event);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800482 }
483
484 private void notifyPeers(InternalRemoveEvent event) {
Brian O'Connorb2894222015-02-20 22:05:19 -0800485 // FIXME extremely memory expensive when we are overrun
486// broadcastMessageExecutor.execute(() -> broadcastMessage(removeMessageSubject, event));
487 broadcastMessage(removeMessageSubject, event);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800488 }
489
Jonathan Hart7d656f42015-01-27 14:07:23 -0800490 private void broadcastMessage(MessageSubject subject, Object event) {
Brian O'Connorb2894222015-02-20 22:05:19 -0800491 // FIXME can we parallelize the serialization... use the caller???
Jonathan Hartdb3af892015-01-26 13:19:07 -0800492 ClusterMessage message = new ClusterMessage(
493 clusterService.getLocalNode().id(),
494 subject,
495 serializer.encode(event));
Brian O'Connorb2894222015-02-20 22:05:19 -0800496 //broadcastMessageExecutor.execute(() -> clusterCommunicator.broadcast(message));
Madan Jampani337bb442015-02-19 14:29:18 -0800497 clusterCommunicator.broadcast(message);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800498 }
499
500 private void unicastMessage(NodeId peer,
501 MessageSubject subject,
502 Object event) throws IOException {
503 ClusterMessage message = new ClusterMessage(
504 clusterService.getLocalNode().id(),
505 subject,
506 serializer.encode(event));
507 clusterCommunicator.unicast(message, peer);
508 }
509
Jonathan Hartaaa56572015-01-28 21:56:35 -0800510 private final class SendAdvertisementTask implements Runnable {
511 @Override
512 public void run() {
513 if (Thread.currentThread().isInterrupted()) {
514 log.info("Interrupted, quitting");
515 return;
516 }
517
518 try {
519 final NodeId self = clusterService.getLocalNode().id();
520 Set<ControllerNode> nodes = clusterService.getNodes();
521
522 List<NodeId> nodeIds = nodes.stream()
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800523 .map(ControllerNode::id)
Jonathan Hartaaa56572015-01-28 21:56:35 -0800524 .collect(Collectors.toList());
525
526 if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
527 log.trace("No other peers in the cluster.");
528 return;
529 }
530
531 NodeId peer;
532 do {
533 int idx = RandomUtils.nextInt(0, nodeIds.size());
534 peer = nodeIds.get(idx);
535 } while (peer.equals(self));
536
537 if (Thread.currentThread().isInterrupted()) {
538 log.info("Interrupted, quitting");
539 return;
540 }
541
542 AntiEntropyAdvertisement<K> ad = createAdvertisement();
543
544 try {
545 unicastMessage(peer, antiEntropyAdvertisementSubject, ad);
546 } catch (IOException e) {
547 log.debug("Failed to send anti-entropy advertisement to {}", peer);
548 }
549 } catch (Exception e) {
550 // Catch all exceptions to avoid scheduled task being suppressed.
551 log.error("Exception thrown while sending advertisement", e);
552 }
553 }
554 }
555
556 private AntiEntropyAdvertisement<K> createAdvertisement() {
557 final NodeId self = clusterService.getLocalNode().id();
558
559 Map<K, Timestamp> timestamps = new HashMap<>(items.size());
560
561 items.forEach((key, value) -> timestamps.put(key, value.timestamp()));
562
563 Map<K, Timestamp> tombstones = new HashMap<>(removedItems);
564
565 return new AntiEntropyAdvertisement<>(self, timestamps, tombstones);
566 }
567
568 private void handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) {
569 List<EventuallyConsistentMapEvent<K, V>> externalEvents;
Jonathan Hart3469e602015-02-19 13:50:27 -0800570 boolean sync = false;
Jonathan Hartaaa56572015-01-28 21:56:35 -0800571
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800572 externalEvents = antiEntropyCheckLocalItems(ad);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800573
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800574 antiEntropyCheckLocalRemoved(ad);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800575
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800576 externalEvents.addAll(antiEntropyCheckRemoteRemoved(ad));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800577
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800578 // if remote ad has something unknown, actively sync
579 for (K key : ad.timestamps().keySet()) {
580 if (!items.containsKey(key)) {
581 sync = true;
582 break;
Jonathan Hartaaa56572015-01-28 21:56:35 -0800583 }
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800584 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800585
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800586 // Send the advertisement back if this peer is out-of-sync
Jonathan Hart3469e602015-02-19 13:50:27 -0800587 if (sync) {
588 final NodeId sender = ad.sender();
589 AntiEntropyAdvertisement<K> myAd = createAdvertisement();
590 try {
591 unicastMessage(sender, antiEntropyAdvertisementSubject, myAd);
592 } catch (IOException e) {
593 log.debug(
594 "Failed to send reactive anti-entropy advertisement to {}",
595 sender);
596 }
597 }
598
Jonathan Hartaaa56572015-01-28 21:56:35 -0800599 externalEvents.forEach(this::notifyListeners);
600 }
601
602 /**
603 * Checks if any of the remote's live items or tombstones are out of date
604 * according to our local live item list, or if our live items are out of
605 * date according to the remote's tombstone list.
606 * If the local copy is more recent, it will be pushed to the remote. If the
607 * remote has a more recent remove, we apply that to the local state.
608 *
609 * @param ad remote anti-entropy advertisement
610 * @return list of external events relating to local operations performed
611 */
Jonathan Hartaaa56572015-01-28 21:56:35 -0800612 private List<EventuallyConsistentMapEvent<K, V>> antiEntropyCheckLocalItems(
613 AntiEntropyAdvertisement<K> ad) {
614 final List<EventuallyConsistentMapEvent<K, V>> externalEvents
615 = new LinkedList<>();
616 final NodeId sender = ad.sender();
617
618 final List<PutEntry<K, V>> updatesToSend = new ArrayList<>();
619
620 for (Map.Entry<K, Timestamped<V>> item : items.entrySet()) {
621 K key = item.getKey();
622 Timestamped<V> localValue = item.getValue();
623
624 Timestamp remoteTimestamp = ad.timestamps().get(key);
625 if (remoteTimestamp == null) {
626 remoteTimestamp = ad.tombstones().get(key);
627 }
628 if (remoteTimestamp == null || localValue
Jonathan Hart403ea932015-02-20 16:23:00 -0800629 .isNewerThan(remoteTimestamp)) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800630 // local value is more recent, push to sender
631 updatesToSend
632 .add(new PutEntry<>(key, localValue.value(),
633 localValue.timestamp()));
634 }
635
636 Timestamp remoteDeadTimestamp = ad.tombstones().get(key);
637 if (remoteDeadTimestamp != null &&
Jonathan Hart403ea932015-02-20 16:23:00 -0800638 remoteDeadTimestamp.isNewerThan(localValue.timestamp())) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800639 // sender has a more recent remove
640 if (removeInternal(key, remoteDeadTimestamp)) {
641 externalEvents.add(new EventuallyConsistentMapEvent<>(
642 EventuallyConsistentMapEvent.Type.REMOVE, key, null));
643 }
644 }
645 }
646
647 // Send all updates to the peer at once
648 if (!updatesToSend.isEmpty()) {
649 try {
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800650 unicastMessage(sender, updateMessageSubject,
651 new InternalPutEvent<>(updatesToSend));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800652 } catch (IOException e) {
653 log.warn("Failed to send advertisement response", e);
654 }
655 }
656
657 return externalEvents;
658 }
659
660 /**
661 * Checks if any items in the remote live list are out of date according
662 * to our tombstone list. If we find we have a more up to date tombstone,
663 * we'll send it to the remote.
664 *
665 * @param ad remote anti-entropy advertisement
666 */
Jonathan Hartaaa56572015-01-28 21:56:35 -0800667 private void antiEntropyCheckLocalRemoved(AntiEntropyAdvertisement<K> ad) {
668 final NodeId sender = ad.sender();
669
670 final List<RemoveEntry<K>> removesToSend = new ArrayList<>();
671
672 for (Map.Entry<K, Timestamp> dead : removedItems.entrySet()) {
673 K key = dead.getKey();
674 Timestamp localDeadTimestamp = dead.getValue();
675
676 Timestamp remoteLiveTimestamp = ad.timestamps().get(key);
677 if (remoteLiveTimestamp != null
Jonathan Hart403ea932015-02-20 16:23:00 -0800678 && localDeadTimestamp.isNewerThan(remoteLiveTimestamp)) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800679 // sender has zombie, push remove
680 removesToSend
681 .add(new RemoveEntry<>(key, localDeadTimestamp));
682 }
683 }
684
685 // Send all removes to the peer at once
686 if (!removesToSend.isEmpty()) {
687 try {
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800688 unicastMessage(sender, removeMessageSubject,
689 new InternalRemoveEvent<>(removesToSend));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800690 } catch (IOException e) {
691 log.warn("Failed to send advertisement response", e);
692 }
693 }
694 }
695
696 /**
697 * Checks if any of the local live items are out of date according to the
698 * remote's tombstone advertisements. If we find a local item is out of date,
699 * we'll apply the remove operation to the local state.
700 *
701 * @param ad remote anti-entropy advertisement
702 * @return list of external events relating to local operations performed
703 */
Jonathan Hartaaa56572015-01-28 21:56:35 -0800704 private List<EventuallyConsistentMapEvent<K, V>>
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800705 antiEntropyCheckRemoteRemoved(AntiEntropyAdvertisement<K> ad) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800706 final List<EventuallyConsistentMapEvent<K, V>> externalEvents
707 = new LinkedList<>();
708
709 for (Map.Entry<K, Timestamp> remoteDead : ad.tombstones().entrySet()) {
710 K key = remoteDead.getKey();
711 Timestamp remoteDeadTimestamp = remoteDead.getValue();
712
713 Timestamped<V> local = items.get(key);
714 Timestamp localDead = removedItems.get(key);
Jonathan Hart403ea932015-02-20 16:23:00 -0800715 if (local != null && remoteDeadTimestamp.isNewerThan(
716 local.timestamp())) {
717 // If the remote has a more recent tombstone than either our local
718 // value, then do a remove with their timestamp
Jonathan Hartaaa56572015-01-28 21:56:35 -0800719 if (removeInternal(key, remoteDeadTimestamp)) {
720 externalEvents.add(new EventuallyConsistentMapEvent<>(
721 EventuallyConsistentMapEvent.Type.REMOVE, key, null));
722 }
Jonathan Hart403ea932015-02-20 16:23:00 -0800723 } else if (localDead != null && remoteDeadTimestamp.isNewerThan(
724 localDead)) {
725 // If the remote has a more recent tombstone than us, update ours
726 // to their timestamp
727 removeInternal(key, remoteDeadTimestamp);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800728 }
729 }
730
731 return externalEvents;
732 }
733
734 private final class InternalAntiEntropyListener
735 implements ClusterMessageHandler {
736
737 @Override
738 public void handle(ClusterMessage message) {
Jonathan Hart4fd4ebb2015-02-04 17:38:48 -0800739 log.trace("Received anti-entropy advertisement from peer: {}",
740 message.sender());
Jonathan Hartaaa56572015-01-28 21:56:35 -0800741 AntiEntropyAdvertisement<K> advertisement = serializer.decode(message.payload());
Madan Jampani2af244a2015-02-22 13:12:01 -0800742 try {
743 handleAntiEntropyAdvertisement(advertisement);
744 } catch (Exception e) {
745 log.warn("Exception thrown handling advertisements", e);
746 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800747 }
748 }
749
Jonathan Hartdb3af892015-01-26 13:19:07 -0800750 private final class InternalPutEventListener implements
751 ClusterMessageHandler {
752 @Override
753 public void handle(ClusterMessage message) {
754 log.debug("Received put event from peer: {}", message.sender());
755 InternalPutEvent<K, V> event = serializer.decode(message.payload());
756
Madan Jampani2af244a2015-02-22 13:12:01 -0800757 try {
758 for (PutEntry<K, V> entry : event.entries()) {
759 K key = entry.key();
760 V value = entry.value();
761 Timestamp timestamp = entry.timestamp();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800762
Madan Jampani2af244a2015-02-22 13:12:01 -0800763 if (putInternal(key, value, timestamp)) {
764 EventuallyConsistentMapEvent<K, V> externalEvent =
765 new EventuallyConsistentMapEvent<>(
766 EventuallyConsistentMapEvent.Type.PUT, key,
767 value);
768 notifyListeners(externalEvent);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800769 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800770 }
Madan Jampani2af244a2015-02-22 13:12:01 -0800771 } catch (Exception e) {
772 log.warn("Exception thrown handling put", e);
773 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800774 }
775 }
776
777 private final class InternalRemoveEventListener implements
778 ClusterMessageHandler {
779 @Override
780 public void handle(ClusterMessage message) {
781 log.debug("Received remove event from peer: {}", message.sender());
782 InternalRemoveEvent<K> event = serializer.decode(message.payload());
Madan Jampani2af244a2015-02-22 13:12:01 -0800783 try {
784 for (RemoveEntry<K> entry : event.entries()) {
785 K key = entry.key();
786 Timestamp timestamp = entry.timestamp();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800787
Madan Jampani2af244a2015-02-22 13:12:01 -0800788 if (removeInternal(key, timestamp)) {
789 EventuallyConsistentMapEvent<K, V> externalEvent
790 = new EventuallyConsistentMapEvent<>(
791 EventuallyConsistentMapEvent.Type.REMOVE,
792 key, null);
793 notifyListeners(externalEvent);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800794 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800795 }
Madan Jampani2af244a2015-02-22 13:12:01 -0800796 } catch (Exception e) {
797 log.warn("Exception thrown handling remove", e);
798 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800799 }
800 }
801
Jonathan Hartdb3af892015-01-26 13:19:07 -0800802}