blob: eecb20d200bc014c57892a5784eadbf3d71bb92b [file] [log] [blame]
Jonathan Hartdb3af892015-01-26 13:19:07 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Jonathan Hart77bdd262015-02-03 09:07:48 -080016package org.onosproject.store.ecmap;
Jonathan Hartdb3af892015-01-26 13:19:07 -080017
Jonathan Hartaaa56572015-01-28 21:56:35 -080018import org.apache.commons.lang3.RandomUtils;
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -080019import org.apache.commons.lang3.mutable.MutableBoolean;
Jonathan Hartf9108232015-02-02 16:37:35 -080020import org.apache.commons.lang3.tuple.Pair;
Jonathan Hartdb3af892015-01-26 13:19:07 -080021import org.onlab.util.KryoNamespace;
22import org.onosproject.cluster.ClusterService;
Jonathan Hartaaa56572015-01-28 21:56:35 -080023import org.onosproject.cluster.ControllerNode;
Jonathan Hartdb3af892015-01-26 13:19:07 -080024import org.onosproject.cluster.NodeId;
25import org.onosproject.store.Timestamp;
26import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
27import org.onosproject.store.cluster.messaging.ClusterMessage;
28import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
29import org.onosproject.store.cluster.messaging.MessageSubject;
Jonathan Hart77bdd262015-02-03 09:07:48 -080030import org.onosproject.store.impl.ClockService;
31import org.onosproject.store.impl.Timestamped;
32import org.onosproject.store.impl.WallClockTimestamp;
Jonathan Hartdb3af892015-01-26 13:19:07 -080033import org.onosproject.store.serializers.KryoSerializer;
34import org.slf4j.Logger;
35import org.slf4j.LoggerFactory;
36
37import java.io.IOException;
38import java.util.ArrayList;
39import java.util.Collection;
Jonathan Hartaaa56572015-01-28 21:56:35 -080040import java.util.HashMap;
41import java.util.LinkedList;
Jonathan Hartdb3af892015-01-26 13:19:07 -080042import java.util.List;
43import java.util.Map;
44import java.util.Set;
45import java.util.concurrent.ConcurrentHashMap;
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -080046import java.util.concurrent.ConcurrentMap;
Jonathan Hartdb3af892015-01-26 13:19:07 -080047import java.util.concurrent.CopyOnWriteArraySet;
48import java.util.concurrent.ExecutorService;
49import java.util.concurrent.Executors;
50import java.util.concurrent.ScheduledExecutorService;
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -080051import java.util.concurrent.TimeUnit;
Jonathan Hartdb3af892015-01-26 13:19:07 -080052import java.util.stream.Collectors;
53
54import static com.google.common.base.Preconditions.checkNotNull;
55import static com.google.common.base.Preconditions.checkState;
56import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
Brian O'Connorc6713a82015-02-24 11:55:48 -080057import static org.onlab.util.BoundedThreadPool.newFixedThreadPool;
Brian O'Connor4b2ba5f2015-02-18 20:54:00 -080058import static org.onlab.util.Tools.groupedThreads;
Jonathan Hartdb3af892015-01-26 13:19:07 -080059
60/**
61 * Distributed Map implementation which uses optimistic replication and gossip
62 * based techniques to provide an eventually consistent data store.
63 */
64public class EventuallyConsistentMapImpl<K, V>
65 implements EventuallyConsistentMap<K, V> {
66
67 private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class);
68
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -080069 private final ConcurrentMap<K, Timestamped<V>> items;
70 private final ConcurrentMap<K, Timestamp> removedItems;
Jonathan Hartdb3af892015-01-26 13:19:07 -080071
Jonathan Hartdb3af892015-01-26 13:19:07 -080072 private final ClusterService clusterService;
73 private final ClusterCommunicationService clusterCommunicator;
74 private final KryoSerializer serializer;
75
Jonathan Hart5ec32ba2015-02-05 13:33:58 -080076 private final ClockService<K, V> clockService;
Jonathan Hartdb3af892015-01-26 13:19:07 -080077
78 private final MessageSubject updateMessageSubject;
79 private final MessageSubject removeMessageSubject;
Jonathan Hartaaa56572015-01-28 21:56:35 -080080 private final MessageSubject antiEntropyAdvertisementSubject;
Jonathan Hartdb3af892015-01-26 13:19:07 -080081
Jonathan Hartaaa56572015-01-28 21:56:35 -080082 private final Set<EventuallyConsistentMapListener<K, V>> listeners
Jonathan Hartdb3af892015-01-26 13:19:07 -080083 = new CopyOnWriteArraySet<>();
84
85 private final ExecutorService executor;
86
87 private final ScheduledExecutorService backgroundExecutor;
88
Madan Jampanib28e4ad2015-02-19 12:31:37 -080089 private ExecutorService broadcastMessageExecutor;
Madan Jampani28726282015-02-19 11:40:23 -080090
Jonathan Hartdb3af892015-01-26 13:19:07 -080091 private volatile boolean destroyed = false;
Jonathan Hart539a6462015-01-27 17:05:43 -080092 private static final String ERROR_DESTROYED = " map is already destroyed";
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -080093 private final String destroyedMessage;
Jonathan Hartdb3af892015-01-26 13:19:07 -080094
Jonathan Hart4f397e82015-02-04 09:10:41 -080095 private static final String ERROR_NULL_KEY = "Key cannot be null";
96 private static final String ERROR_NULL_VALUE = "Null values are not allowed";
97
Jonathan Hartdb3af892015-01-26 13:19:07 -080098 // TODO: Make these anti-entropy params configurable
99 private long initialDelaySec = 5;
100 private long periodSec = 5;
Jonathan Hartf893be82015-02-24 17:35:51 -0800101 private boolean lightweightAntiEntropy = true;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800102
/**
     * Creates a new eventually consistent map shared amongst multiple instances.
     * <p>
     * Each map is identified by a string map name. EventuallyConsistentMapImpl
     * objects in different JVMs that use the same map name will form a
     * distributed map across JVMs (provided the cluster service is aware of
     * both nodes).
     * </p>
     * <p>
     * The client is expected to provide an
     * {@link org.onlab.util.KryoNamespace.Builder} with which all classes that
     * will be stored in this map have been registered (including referenced
     * classes). This serializer will be used to serialize both K and V for
     * inter-node notifications.
     * </p>
     * <p>
     * The client must provide an {@link org.onosproject.store.impl.ClockService}
     * which can generate timestamps for a given key. The clock service is free
     * to generate timestamps however it wishes, but these timestamps will
     * be used to serialize updates to the map, so they must be strict enough
     * to ensure updates are properly ordered for the use case (e.g. in some
     * cases wallclock time will suffice, whereas in other cases logical time
     * will be necessary).
     * </p>
     *
     * @param mapName a String identifier for the map.
     * @param clusterService the cluster service
     * @param clusterCommunicator the cluster communications service
     * @param serializerBuilder a Kryo namespace builder that can serialize
     * both K and V
     * @param clockService a clock service able to generate timestamps
     * for K
     * @throws NullPointerException if clusterService, clusterCommunicator,
     * serializerBuilder or clockService is null
     */
    public EventuallyConsistentMapImpl(String mapName,
                                       ClusterService clusterService,
                                       ClusterCommunicationService clusterCommunicator,
                                       KryoNamespace.Builder serializerBuilder,
                                       ClockService<K, V> clockService) {
        this.clusterService = checkNotNull(clusterService);
        this.clusterCommunicator = checkNotNull(clusterCommunicator);

        serializer = createSerializer(checkNotNull(serializerBuilder));
        destroyedMessage = mapName + ERROR_DESTROYED;

        this.clockService = checkNotNull(clockService);

        items = new ConcurrentHashMap<>();
        removedItems = new ConcurrentHashMap<>();

        // Receiving executor: handed to the update/remove subscriptions below.
        //TODO make # of threads configurable
        executor = Executors.newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-fg-%d"));

        // Sending executor; capped via org.onlab.util.BoundedThreadPool.
        //TODO make # of threads configurable
        broadcastMessageExecutor =
                newFixedThreadPool(4, groupedThreads("onos/ecm", mapName + "-notify"));

        // Runs the periodic anti-entropy task and the advertisement subscription.
        //FIXME anti-entropy can take >60 seconds and it blocks fg workers
        backgroundExecutor =
                newSingleThreadScheduledExecutor(groupedThreads("onos/ecm", mapName + "-bg-%d"));

        updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
        clusterCommunicator.addSubscriber(updateMessageSubject,
                new InternalPutEventListener(), executor);
        removeMessageSubject = new MessageSubject("ecm-" + mapName + "-remove");
        clusterCommunicator.addSubscriber(removeMessageSubject,
                new InternalRemoveEventListener(), executor);
        antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");
        clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
                new InternalAntiEntropyListener(), backgroundExecutor);

        // Start anti-entropy last. SendAdvertisementTask reads fields of this
        // object (e.g. antiEntropyAdvertisementSubject) on the background
        // thread. Scheduling it before those fields were assigned gave the
        // task no happens-before edge to the writes.
        backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
                                               initialDelaySec, periodSec,
                                               TimeUnit.SECONDS);
    }
183
184 private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
185 return new KryoSerializer() {
186 @Override
187 protected void setupKryoPool() {
188 // Add the map's internal helper classes to the user-supplied serializer
189 serializerPool = builder
190 .register(WallClockTimestamp.class)
191 .register(PutEntry.class)
Jonathan Hart539a6462015-01-27 17:05:43 -0800192 .register(RemoveEntry.class)
Jonathan Hartdb3af892015-01-26 13:19:07 -0800193 .register(ArrayList.class)
194 .register(InternalPutEvent.class)
195 .register(InternalRemoveEvent.class)
Jonathan Hartaaa56572015-01-28 21:56:35 -0800196 .register(AntiEntropyAdvertisement.class)
197 .register(HashMap.class)
Jonathan Hartdb3af892015-01-26 13:19:07 -0800198 .build();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800199 }
200 };
201 }
202
Madan Jampanib28e4ad2015-02-19 12:31:37 -0800203 /**
204 * Sets the executor to use for broadcasting messages and returns this
205 * instance for method chaining.
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800206 *
Madan Jampanib28e4ad2015-02-19 12:31:37 -0800207 * @param executor executor service
208 * @return this instance
209 */
210 public EventuallyConsistentMapImpl<K, V> withBroadcastMessageExecutor(ExecutorService executor) {
211 checkNotNull(executor, "Null executor");
212 broadcastMessageExecutor = executor;
213 return this;
214 }
215
Jonathan Hartdb3af892015-01-26 13:19:07 -0800216 @Override
217 public int size() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800218 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800219 return items.size();
220 }
221
222 @Override
223 public boolean isEmpty() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800224 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800225 return items.isEmpty();
226 }
227
228 @Override
229 public boolean containsKey(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800230 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800231 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800232 return items.containsKey(key);
233 }
234
235 @Override
236 public boolean containsValue(V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800237 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800238 checkNotNull(value, ERROR_NULL_VALUE);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800239
240 return items.values().stream()
241 .anyMatch(timestamped -> timestamped.value().equals(value));
242 }
243
244 @Override
245 public V get(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800246 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800247 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800248
249 Timestamped<V> value = items.get(key);
250 if (value != null) {
251 return value.value();
252 }
253 return null;
254 }
255
256 @Override
257 public void put(K key, V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800258 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800259 checkNotNull(key, ERROR_NULL_KEY);
260 checkNotNull(value, ERROR_NULL_VALUE);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800261
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800262 Timestamp timestamp = clockService.getTimestamp(key, value);
263
Jonathan Hartdb3af892015-01-26 13:19:07 -0800264 if (putInternal(key, value, timestamp)) {
265 notifyPeers(new InternalPutEvent<>(key, value, timestamp));
266 EventuallyConsistentMapEvent<K, V> externalEvent
267 = new EventuallyConsistentMapEvent<>(
268 EventuallyConsistentMapEvent.Type.PUT, key, value);
269 notifyListeners(externalEvent);
270 }
271 }
272
273 private boolean putInternal(K key, V value, Timestamp timestamp) {
Brian O'Connor2952e3b2015-02-19 21:47:57 -0800274 Timestamp removed = removedItems.get(key);
Jonathan Hart403ea932015-02-20 16:23:00 -0800275 if (removed != null && removed.isNewerThan(timestamp)) {
Brian O'Connor2952e3b2015-02-19 21:47:57 -0800276 log.debug("ecmap - removed was newer {}", value);
277 return false;
278 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800279
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800280 final MutableBoolean updated = new MutableBoolean(false);
281
282 items.compute(key, (k, existing) -> {
Jonathan Hart403ea932015-02-20 16:23:00 -0800283 if (existing != null && existing.isNewerThan(timestamp)) {
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800284 updated.setFalse();
285 return existing;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800286 } else {
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800287 updated.setTrue();
288 return new Timestamped<>(value, timestamp);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800289 }
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800290 });
291
292 boolean success = updated.booleanValue();
293 if (!success) {
294 log.debug("ecmap - existing was newer {}", value);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800295 }
Brian O'Connor2952e3b2015-02-19 21:47:57 -0800296
297 if (success && removed != null) {
298 removedItems.remove(key, removed);
299 }
300 return success;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800301 }
302
303 @Override
304 public void remove(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800305 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800306 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800307
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800308 // TODO prevent calls here if value is important for timestamp
309 Timestamp timestamp = clockService.getTimestamp(key, null);
310
Jonathan Hartdb3af892015-01-26 13:19:07 -0800311 if (removeInternal(key, timestamp)) {
312 notifyPeers(new InternalRemoveEvent<>(key, timestamp));
313 EventuallyConsistentMapEvent<K, V> externalEvent
314 = new EventuallyConsistentMapEvent<>(
315 EventuallyConsistentMapEvent.Type.REMOVE, key, null);
316 notifyListeners(externalEvent);
317 }
318 }
319
320 private boolean removeInternal(K key, Timestamp timestamp) {
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800321 final MutableBoolean updated = new MutableBoolean(false);
322
323 items.compute(key, (k, existing) -> {
324 if (existing != null && existing.isNewerThan(timestamp)) {
325 updated.setFalse();
326 return existing;
Madan Jampani7e6cfe32015-02-19 17:44:44 -0800327 } else {
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800328 updated.setTrue();
329 // remove from items map
330 return null;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800331 }
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800332 });
333
334 if (updated.isFalse()) {
335 return false;
Madan Jampani7e6cfe32015-02-19 17:44:44 -0800336 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800337
Madan Jampani7e6cfe32015-02-19 17:44:44 -0800338 Timestamp removedTimestamp = removedItems.get(key);
339 if (removedTimestamp == null) {
340 return removedItems.putIfAbsent(key, timestamp) == null;
Jonathan Hart403ea932015-02-20 16:23:00 -0800341 } else if (timestamp.isNewerThan(removedTimestamp)) {
Madan Jampani7e6cfe32015-02-19 17:44:44 -0800342 return removedItems.replace(key, removedTimestamp, timestamp);
343 } else {
344 return false;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800345 }
346 }
347
348 @Override
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800349 public void remove(K key, V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800350 checkState(!destroyed, destroyedMessage);
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800351 checkNotNull(key, ERROR_NULL_KEY);
352 checkNotNull(value, ERROR_NULL_VALUE);
353
354 Timestamp timestamp = clockService.getTimestamp(key, value);
355
356 if (removeInternal(key, timestamp)) {
357 notifyPeers(new InternalRemoveEvent<>(key, timestamp));
358 EventuallyConsistentMapEvent<K, V> externalEvent
359 = new EventuallyConsistentMapEvent<>(
360 EventuallyConsistentMapEvent.Type.REMOVE, key, value);
361 notifyListeners(externalEvent);
362 }
363 }
364
365 @Override
Jonathan Hartdb3af892015-01-26 13:19:07 -0800366 public void putAll(Map<? extends K, ? extends V> m) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800367 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800368
369 List<PutEntry<K, V>> updates = new ArrayList<>(m.size());
370
371 for (Map.Entry<? extends K, ? extends V> entry : m.entrySet()) {
372 K key = entry.getKey();
373 V value = entry.getValue();
Jonathan Hart4f397e82015-02-04 09:10:41 -0800374
375 checkNotNull(key, ERROR_NULL_KEY);
376 checkNotNull(value, ERROR_NULL_VALUE);
377
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800378 Timestamp timestamp = clockService.getTimestamp(key, value);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800379
380 if (putInternal(key, value, timestamp)) {
381 updates.add(new PutEntry<>(key, value, timestamp));
382 }
383 }
384
Jonathan Hart584d2f32015-01-27 19:46:14 -0800385 if (!updates.isEmpty()) {
386 notifyPeers(new InternalPutEvent<>(updates));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800387
Jonathan Hart584d2f32015-01-27 19:46:14 -0800388 for (PutEntry<K, V> entry : updates) {
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800389 EventuallyConsistentMapEvent<K, V> externalEvent =
390 new EventuallyConsistentMapEvent<>(
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800391 EventuallyConsistentMapEvent.Type.PUT, entry.key(),
392 entry.value());
Jonathan Hart584d2f32015-01-27 19:46:14 -0800393 notifyListeners(externalEvent);
394 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800395 }
396 }
397
398 @Override
399 public void clear() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800400 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800401
402 List<RemoveEntry<K>> removed = new ArrayList<>(items.size());
403
404 for (K key : items.keySet()) {
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800405 // TODO also this is not applicable if value is important for timestamp?
406 Timestamp timestamp = clockService.getTimestamp(key, null);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800407
408 if (removeInternal(key, timestamp)) {
409 removed.add(new RemoveEntry<>(key, timestamp));
410 }
411 }
412
Jonathan Hart584d2f32015-01-27 19:46:14 -0800413 if (!removed.isEmpty()) {
414 notifyPeers(new InternalRemoveEvent<>(removed));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800415
Jonathan Hart584d2f32015-01-27 19:46:14 -0800416 for (RemoveEntry<K> entry : removed) {
417 EventuallyConsistentMapEvent<K, V> externalEvent
418 = new EventuallyConsistentMapEvent<>(
419 EventuallyConsistentMapEvent.Type.REMOVE, entry.key(),
420 null);
421 notifyListeners(externalEvent);
422 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800423 }
424 }
425
426 @Override
427 public Set<K> keySet() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800428 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800429
430 return items.keySet();
431 }
432
433 @Override
434 public Collection<V> values() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800435 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800436
437 return items.values().stream()
438 .map(Timestamped::value)
439 .collect(Collectors.toList());
440 }
441
442 @Override
443 public Set<Map.Entry<K, V>> entrySet() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800444 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800445
446 return items.entrySet().stream()
Jonathan Hartf9108232015-02-02 16:37:35 -0800447 .map(e -> Pair.of(e.getKey(), e.getValue().value()))
Jonathan Hartdb3af892015-01-26 13:19:07 -0800448 .collect(Collectors.toSet());
449 }
450
451 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800452 public void addListener(EventuallyConsistentMapListener<K, V> listener) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800453 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800454
455 listeners.add(checkNotNull(listener));
456 }
457
458 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800459 public void removeListener(EventuallyConsistentMapListener<K, V> listener) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800460 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800461
462 listeners.remove(checkNotNull(listener));
463 }
464
465 @Override
466 public void destroy() {
467 destroyed = true;
468
469 executor.shutdown();
470 backgroundExecutor.shutdown();
Madan Jampani337bb442015-02-19 14:29:18 -0800471 broadcastMessageExecutor.shutdown();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800472
Jonathan Hart584d2f32015-01-27 19:46:14 -0800473 listeners.clear();
474
Jonathan Hartdb3af892015-01-26 13:19:07 -0800475 clusterCommunicator.removeSubscriber(updateMessageSubject);
476 clusterCommunicator.removeSubscriber(removeMessageSubject);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800477 clusterCommunicator.removeSubscriber(antiEntropyAdvertisementSubject);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800478 }
479
Jonathan Hartaaa56572015-01-28 21:56:35 -0800480 private void notifyListeners(EventuallyConsistentMapEvent<K, V> event) {
481 for (EventuallyConsistentMapListener<K, V> listener : listeners) {
Jonathan Hartdb3af892015-01-26 13:19:07 -0800482 listener.event(event);
483 }
484 }
485
486 private void notifyPeers(InternalPutEvent event) {
Brian O'Connorb2894222015-02-20 22:05:19 -0800487 // FIXME extremely memory expensive when we are overrun
488// broadcastMessageExecutor.execute(() -> broadcastMessage(updateMessageSubject, event));
489 broadcastMessage(updateMessageSubject, event);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800490 }
491
492 private void notifyPeers(InternalRemoveEvent event) {
Brian O'Connorb2894222015-02-20 22:05:19 -0800493 // FIXME extremely memory expensive when we are overrun
494// broadcastMessageExecutor.execute(() -> broadcastMessage(removeMessageSubject, event));
495 broadcastMessage(removeMessageSubject, event);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800496 }
497
Jonathan Hart7d656f42015-01-27 14:07:23 -0800498 private void broadcastMessage(MessageSubject subject, Object event) {
Brian O'Connorb2894222015-02-20 22:05:19 -0800499 // FIXME can we parallelize the serialization... use the caller???
Jonathan Hartdb3af892015-01-26 13:19:07 -0800500 ClusterMessage message = new ClusterMessage(
501 clusterService.getLocalNode().id(),
502 subject,
503 serializer.encode(event));
Brian O'Connorc6713a82015-02-24 11:55:48 -0800504 broadcastMessageExecutor.execute(() -> clusterCommunicator.broadcast(message));
505// clusterCommunicator.broadcast(message);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800506 }
507
508 private void unicastMessage(NodeId peer,
509 MessageSubject subject,
510 Object event) throws IOException {
511 ClusterMessage message = new ClusterMessage(
512 clusterService.getLocalNode().id(),
513 subject,
514 serializer.encode(event));
515 clusterCommunicator.unicast(message, peer);
516 }
517
Jonathan Hartaaa56572015-01-28 21:56:35 -0800518 private final class SendAdvertisementTask implements Runnable {
519 @Override
520 public void run() {
521 if (Thread.currentThread().isInterrupted()) {
522 log.info("Interrupted, quitting");
523 return;
524 }
525
526 try {
527 final NodeId self = clusterService.getLocalNode().id();
528 Set<ControllerNode> nodes = clusterService.getNodes();
529
530 List<NodeId> nodeIds = nodes.stream()
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800531 .map(ControllerNode::id)
Jonathan Hartaaa56572015-01-28 21:56:35 -0800532 .collect(Collectors.toList());
533
534 if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
535 log.trace("No other peers in the cluster.");
536 return;
537 }
538
539 NodeId peer;
540 do {
541 int idx = RandomUtils.nextInt(0, nodeIds.size());
542 peer = nodeIds.get(idx);
543 } while (peer.equals(self));
544
545 if (Thread.currentThread().isInterrupted()) {
546 log.info("Interrupted, quitting");
547 return;
548 }
549
550 AntiEntropyAdvertisement<K> ad = createAdvertisement();
551
552 try {
553 unicastMessage(peer, antiEntropyAdvertisementSubject, ad);
554 } catch (IOException e) {
555 log.debug("Failed to send anti-entropy advertisement to {}", peer);
556 }
557 } catch (Exception e) {
558 // Catch all exceptions to avoid scheduled task being suppressed.
559 log.error("Exception thrown while sending advertisement", e);
560 }
561 }
562 }
563
564 private AntiEntropyAdvertisement<K> createAdvertisement() {
565 final NodeId self = clusterService.getLocalNode().id();
566
567 Map<K, Timestamp> timestamps = new HashMap<>(items.size());
568
569 items.forEach((key, value) -> timestamps.put(key, value.timestamp()));
570
571 Map<K, Timestamp> tombstones = new HashMap<>(removedItems);
572
573 return new AntiEntropyAdvertisement<>(self, timestamps, tombstones);
574 }
575
576 private void handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) {
577 List<EventuallyConsistentMapEvent<K, V>> externalEvents;
578
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800579 externalEvents = antiEntropyCheckLocalItems(ad);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800580
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800581 antiEntropyCheckLocalRemoved(ad);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800582
Jonathan Hartf893be82015-02-24 17:35:51 -0800583 if (!lightweightAntiEntropy) {
584 externalEvents.addAll(antiEntropyCheckRemoteRemoved(ad));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800585
Jonathan Hartf893be82015-02-24 17:35:51 -0800586 // if remote ad has something unknown, actively sync
587 for (K key : ad.timestamps().keySet()) {
588 if (!items.containsKey(key)) {
589 // Send the advertisement back if this peer is out-of-sync
590 final NodeId sender = ad.sender();
591 AntiEntropyAdvertisement<K> myAd = createAdvertisement();
592 try {
593 unicastMessage(sender, antiEntropyAdvertisementSubject, myAd);
594 } catch (IOException e) {
595 log.debug(
596 "Failed to send reactive anti-entropy advertisement to {}",
597 sender);
598 }
599
600 break;
601 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800602 }
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800603 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800604 externalEvents.forEach(this::notifyListeners);
605 }
606
607 /**
608 * Checks if any of the remote's live items or tombstones are out of date
609 * according to our local live item list, or if our live items are out of
610 * date according to the remote's tombstone list.
611 * If the local copy is more recent, it will be pushed to the remote. If the
612 * remote has a more recent remove, we apply that to the local state.
613 *
614 * @param ad remote anti-entropy advertisement
615 * @return list of external events relating to local operations performed
616 */
Jonathan Hartaaa56572015-01-28 21:56:35 -0800617 private List<EventuallyConsistentMapEvent<K, V>> antiEntropyCheckLocalItems(
618 AntiEntropyAdvertisement<K> ad) {
619 final List<EventuallyConsistentMapEvent<K, V>> externalEvents
620 = new LinkedList<>();
621 final NodeId sender = ad.sender();
622
623 final List<PutEntry<K, V>> updatesToSend = new ArrayList<>();
624
625 for (Map.Entry<K, Timestamped<V>> item : items.entrySet()) {
626 K key = item.getKey();
627 Timestamped<V> localValue = item.getValue();
628
629 Timestamp remoteTimestamp = ad.timestamps().get(key);
630 if (remoteTimestamp == null) {
631 remoteTimestamp = ad.tombstones().get(key);
632 }
633 if (remoteTimestamp == null || localValue
Jonathan Hart403ea932015-02-20 16:23:00 -0800634 .isNewerThan(remoteTimestamp)) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800635 // local value is more recent, push to sender
636 updatesToSend
637 .add(new PutEntry<>(key, localValue.value(),
638 localValue.timestamp()));
639 }
640
641 Timestamp remoteDeadTimestamp = ad.tombstones().get(key);
642 if (remoteDeadTimestamp != null &&
Jonathan Hart403ea932015-02-20 16:23:00 -0800643 remoteDeadTimestamp.isNewerThan(localValue.timestamp())) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800644 // sender has a more recent remove
645 if (removeInternal(key, remoteDeadTimestamp)) {
646 externalEvents.add(new EventuallyConsistentMapEvent<>(
647 EventuallyConsistentMapEvent.Type.REMOVE, key, null));
648 }
649 }
650 }
651
652 // Send all updates to the peer at once
653 if (!updatesToSend.isEmpty()) {
654 try {
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800655 unicastMessage(sender, updateMessageSubject,
656 new InternalPutEvent<>(updatesToSend));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800657 } catch (IOException e) {
658 log.warn("Failed to send advertisement response", e);
659 }
660 }
661
662 return externalEvents;
663 }
664
665 /**
666 * Checks if any items in the remote live list are out of date according
667 * to our tombstone list. If we find we have a more up to date tombstone,
668 * we'll send it to the remote.
669 *
670 * @param ad remote anti-entropy advertisement
671 */
Jonathan Hartaaa56572015-01-28 21:56:35 -0800672 private void antiEntropyCheckLocalRemoved(AntiEntropyAdvertisement<K> ad) {
673 final NodeId sender = ad.sender();
674
675 final List<RemoveEntry<K>> removesToSend = new ArrayList<>();
676
677 for (Map.Entry<K, Timestamp> dead : removedItems.entrySet()) {
678 K key = dead.getKey();
679 Timestamp localDeadTimestamp = dead.getValue();
680
681 Timestamp remoteLiveTimestamp = ad.timestamps().get(key);
682 if (remoteLiveTimestamp != null
Jonathan Hart403ea932015-02-20 16:23:00 -0800683 && localDeadTimestamp.isNewerThan(remoteLiveTimestamp)) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800684 // sender has zombie, push remove
685 removesToSend
686 .add(new RemoveEntry<>(key, localDeadTimestamp));
687 }
688 }
689
690 // Send all removes to the peer at once
691 if (!removesToSend.isEmpty()) {
692 try {
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800693 unicastMessage(sender, removeMessageSubject,
694 new InternalRemoveEvent<>(removesToSend));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800695 } catch (IOException e) {
696 log.warn("Failed to send advertisement response", e);
697 }
698 }
699 }
700
701 /**
702 * Checks if any of the local live items are out of date according to the
703 * remote's tombstone advertisements. If we find a local item is out of date,
704 * we'll apply the remove operation to the local state.
705 *
706 * @param ad remote anti-entropy advertisement
707 * @return list of external events relating to local operations performed
708 */
Jonathan Hartaaa56572015-01-28 21:56:35 -0800709 private List<EventuallyConsistentMapEvent<K, V>>
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800710 antiEntropyCheckRemoteRemoved(AntiEntropyAdvertisement<K> ad) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800711 final List<EventuallyConsistentMapEvent<K, V>> externalEvents
712 = new LinkedList<>();
713
714 for (Map.Entry<K, Timestamp> remoteDead : ad.tombstones().entrySet()) {
715 K key = remoteDead.getKey();
716 Timestamp remoteDeadTimestamp = remoteDead.getValue();
717
718 Timestamped<V> local = items.get(key);
719 Timestamp localDead = removedItems.get(key);
Jonathan Hart403ea932015-02-20 16:23:00 -0800720 if (local != null && remoteDeadTimestamp.isNewerThan(
721 local.timestamp())) {
722 // If the remote has a more recent tombstone than either our local
723 // value, then do a remove with their timestamp
Jonathan Hartaaa56572015-01-28 21:56:35 -0800724 if (removeInternal(key, remoteDeadTimestamp)) {
725 externalEvents.add(new EventuallyConsistentMapEvent<>(
726 EventuallyConsistentMapEvent.Type.REMOVE, key, null));
727 }
Jonathan Hart403ea932015-02-20 16:23:00 -0800728 } else if (localDead != null && remoteDeadTimestamp.isNewerThan(
729 localDead)) {
730 // If the remote has a more recent tombstone than us, update ours
731 // to their timestamp
732 removeInternal(key, remoteDeadTimestamp);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800733 }
734 }
735
736 return externalEvents;
737 }
738
739 private final class InternalAntiEntropyListener
740 implements ClusterMessageHandler {
741
742 @Override
743 public void handle(ClusterMessage message) {
Jonathan Hart4fd4ebb2015-02-04 17:38:48 -0800744 log.trace("Received anti-entropy advertisement from peer: {}",
745 message.sender());
Jonathan Hartaaa56572015-01-28 21:56:35 -0800746 AntiEntropyAdvertisement<K> advertisement = serializer.decode(message.payload());
Madan Jampani2af244a2015-02-22 13:12:01 -0800747 try {
748 handleAntiEntropyAdvertisement(advertisement);
749 } catch (Exception e) {
750 log.warn("Exception thrown handling advertisements", e);
751 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800752 }
753 }
754
Jonathan Hartdb3af892015-01-26 13:19:07 -0800755 private final class InternalPutEventListener implements
756 ClusterMessageHandler {
757 @Override
758 public void handle(ClusterMessage message) {
759 log.debug("Received put event from peer: {}", message.sender());
760 InternalPutEvent<K, V> event = serializer.decode(message.payload());
761
Madan Jampani2af244a2015-02-22 13:12:01 -0800762 try {
763 for (PutEntry<K, V> entry : event.entries()) {
764 K key = entry.key();
765 V value = entry.value();
766 Timestamp timestamp = entry.timestamp();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800767
Madan Jampani2af244a2015-02-22 13:12:01 -0800768 if (putInternal(key, value, timestamp)) {
769 EventuallyConsistentMapEvent<K, V> externalEvent =
770 new EventuallyConsistentMapEvent<>(
771 EventuallyConsistentMapEvent.Type.PUT, key,
772 value);
773 notifyListeners(externalEvent);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800774 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800775 }
Madan Jampani2af244a2015-02-22 13:12:01 -0800776 } catch (Exception e) {
777 log.warn("Exception thrown handling put", e);
778 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800779 }
780 }
781
782 private final class InternalRemoveEventListener implements
783 ClusterMessageHandler {
784 @Override
785 public void handle(ClusterMessage message) {
786 log.debug("Received remove event from peer: {}", message.sender());
787 InternalRemoveEvent<K> event = serializer.decode(message.payload());
Madan Jampani2af244a2015-02-22 13:12:01 -0800788 try {
789 for (RemoveEntry<K> entry : event.entries()) {
790 K key = entry.key();
791 Timestamp timestamp = entry.timestamp();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800792
Madan Jampani2af244a2015-02-22 13:12:01 -0800793 if (removeInternal(key, timestamp)) {
794 EventuallyConsistentMapEvent<K, V> externalEvent
795 = new EventuallyConsistentMapEvent<>(
796 EventuallyConsistentMapEvent.Type.REMOVE,
797 key, null);
798 notifyListeners(externalEvent);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800799 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800800 }
Madan Jampani2af244a2015-02-22 13:12:01 -0800801 } catch (Exception e) {
802 log.warn("Exception thrown handling remove", e);
803 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800804 }
805 }
806
Jonathan Hartdb3af892015-01-26 13:19:07 -0800807}