blob: 8f99d0ee7284d5deac999533ab1320e2a34e9289 [file] [log] [blame]
Jonathan Hartdb3af892015-01-26 13:19:07 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Jonathan Hart77bdd262015-02-03 09:07:48 -080016package org.onosproject.store.ecmap;
Jonathan Hartdb3af892015-01-26 13:19:07 -080017
Jonathan Hartaaa56572015-01-28 21:56:35 -080018import org.apache.commons.lang3.RandomUtils;
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -080019import org.apache.commons.lang3.mutable.MutableBoolean;
Jonathan Hartf9108232015-02-02 16:37:35 -080020import org.apache.commons.lang3.tuple.Pair;
Jonathan Hartdb3af892015-01-26 13:19:07 -080021import org.onlab.util.KryoNamespace;
Jonathan Hart233a18a2015-03-02 17:24:58 -080022import org.onlab.util.SlidingWindowCounter;
Jonathan Hartdb3af892015-01-26 13:19:07 -080023import org.onosproject.cluster.ClusterService;
Jonathan Hartaaa56572015-01-28 21:56:35 -080024import org.onosproject.cluster.ControllerNode;
Jonathan Hartdb3af892015-01-26 13:19:07 -080025import org.onosproject.cluster.NodeId;
26import org.onosproject.store.Timestamp;
27import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
28import org.onosproject.store.cluster.messaging.ClusterMessage;
29import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
30import org.onosproject.store.cluster.messaging.MessageSubject;
Jonathan Hart77bdd262015-02-03 09:07:48 -080031import org.onosproject.store.impl.ClockService;
32import org.onosproject.store.impl.Timestamped;
33import org.onosproject.store.impl.WallClockTimestamp;
Jonathan Hartdb3af892015-01-26 13:19:07 -080034import org.onosproject.store.serializers.KryoSerializer;
35import org.slf4j.Logger;
36import org.slf4j.LoggerFactory;
37
Jonathan Hartdb3af892015-01-26 13:19:07 -080038import java.util.ArrayList;
39import java.util.Collection;
Jonathan Hartaaa56572015-01-28 21:56:35 -080040import java.util.HashMap;
41import java.util.LinkedList;
Jonathan Hartdb3af892015-01-26 13:19:07 -080042import java.util.List;
43import java.util.Map;
44import java.util.Set;
45import java.util.concurrent.ConcurrentHashMap;
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -080046import java.util.concurrent.ConcurrentMap;
Jonathan Hartdb3af892015-01-26 13:19:07 -080047import java.util.concurrent.CopyOnWriteArraySet;
48import java.util.concurrent.ExecutorService;
49import java.util.concurrent.Executors;
50import java.util.concurrent.ScheduledExecutorService;
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -080051import java.util.concurrent.TimeUnit;
Jonathan Hart233a18a2015-03-02 17:24:58 -080052import java.util.concurrent.atomic.AtomicLong;
Brian O'Connor5eb77c82015-03-02 18:09:39 -080053import java.util.function.BiFunction;
Jonathan Hartdb3af892015-01-26 13:19:07 -080054import java.util.stream.Collectors;
55
56import static com.google.common.base.Preconditions.checkNotNull;
57import static com.google.common.base.Preconditions.checkState;
58import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
Brian O'Connorc6713a82015-02-24 11:55:48 -080059import static org.onlab.util.BoundedThreadPool.newFixedThreadPool;
Brian O'Connor4b2ba5f2015-02-18 20:54:00 -080060import static org.onlab.util.Tools.groupedThreads;
Jonathan Hartdb3af892015-01-26 13:19:07 -080061
62/**
63 * Distributed Map implementation which uses optimistic replication and gossip
64 * based techniques to provide an eventually consistent data store.
65 */
66public class EventuallyConsistentMapImpl<K, V>
67 implements EventuallyConsistentMap<K, V> {
68
69 private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class);
70
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -080071 private final ConcurrentMap<K, Timestamped<V>> items;
72 private final ConcurrentMap<K, Timestamp> removedItems;
Jonathan Hartdb3af892015-01-26 13:19:07 -080073
Jonathan Hartdb3af892015-01-26 13:19:07 -080074 private final ClusterService clusterService;
75 private final ClusterCommunicationService clusterCommunicator;
76 private final KryoSerializer serializer;
77
Jonathan Hart5ec32ba2015-02-05 13:33:58 -080078 private final ClockService<K, V> clockService;
Jonathan Hartdb3af892015-01-26 13:19:07 -080079
80 private final MessageSubject updateMessageSubject;
81 private final MessageSubject removeMessageSubject;
Jonathan Hartaaa56572015-01-28 21:56:35 -080082 private final MessageSubject antiEntropyAdvertisementSubject;
Jonathan Hartdb3af892015-01-26 13:19:07 -080083
Jonathan Hartaaa56572015-01-28 21:56:35 -080084 private final Set<EventuallyConsistentMapListener<K, V>> listeners
Jonathan Hartdb3af892015-01-26 13:19:07 -080085 = new CopyOnWriteArraySet<>();
86
87 private final ExecutorService executor;
88
89 private final ScheduledExecutorService backgroundExecutor;
Brian O'Connor5eb77c82015-03-02 18:09:39 -080090 private final BiFunction<K, V, Iterable<NodeId>> peerUpdateFunction;
Jonathan Hartdb3af892015-01-26 13:19:07 -080091
Madan Jampanib28e4ad2015-02-19 12:31:37 -080092 private ExecutorService broadcastMessageExecutor;
Madan Jampani28726282015-02-19 11:40:23 -080093
Jonathan Hartdb3af892015-01-26 13:19:07 -080094 private volatile boolean destroyed = false;
Jonathan Hart539a6462015-01-27 17:05:43 -080095 private static final String ERROR_DESTROYED = " map is already destroyed";
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -080096 private final String destroyedMessage;
Jonathan Hartdb3af892015-01-26 13:19:07 -080097
Jonathan Hart4f397e82015-02-04 09:10:41 -080098 private static final String ERROR_NULL_KEY = "Key cannot be null";
99 private static final String ERROR_NULL_VALUE = "Null values are not allowed";
100
Jonathan Hartdb3af892015-01-26 13:19:07 -0800101 // TODO: Make these anti-entropy params configurable
102 private long initialDelaySec = 5;
103 private long periodSec = 5;
Jonathan Hartf893be82015-02-24 17:35:51 -0800104 private boolean lightweightAntiEntropy = true;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800105
Jonathan Hart233a18a2015-03-02 17:24:58 -0800106 private static final int WINDOW_SIZE = 5;
107 private static final int HIGH_LOAD_THRESHOLD = 0;
108 private static final int LOAD_WINDOW = 2;
109 SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE);
110 AtomicLong operations = new AtomicLong();
111
Jonathan Hartdb3af892015-01-26 13:19:07 -0800112 /**
113 * Creates a new eventually consistent map shared amongst multiple instances.
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800114 * <p>
Jonathan Hartdb3af892015-01-26 13:19:07 -0800115 * Each map is identified by a string map name. EventuallyConsistentMapImpl
116 * objects in different JVMs that use the same map name will form a
117 * distributed map across JVMs (provided the cluster service is aware of
118 * both nodes).
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800119 * </p>
120 * <p>
Jonathan Hartdb3af892015-01-26 13:19:07 -0800121 * The client is expected to provide an
122 * {@link org.onlab.util.KryoNamespace.Builder} with which all classes that
123 * will be stored in this map have been registered (including referenced
124 * classes). This serializer will be used to serialize both K and V for
125 * inter-node notifications.
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800126 * </p>
127 * <p>
Jonathan Hartdb3af892015-01-26 13:19:07 -0800128 * The client must provide an {@link org.onosproject.store.impl.ClockService}
129 * which can generate timestamps for a given key. The clock service is free
130 * to generate timestamps however it wishes, however these timestamps will
131 * be used to serialize updates to the map so they must be strict enough
132 * to ensure updates are properly ordered for the use case (i.e. in some
133 * cases wallclock time will suffice, whereas in other cases logical time
134 * will be necessary).
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800135 * </p>
Jonathan Hartdb3af892015-01-26 13:19:07 -0800136 *
137 * @param mapName a String identifier for the map.
138 * @param clusterService the cluster service
139 * @param clusterCommunicator the cluster communications service
140 * @param serializerBuilder a Kryo namespace builder that can serialize
141 * both K and V
142 * @param clockService a clock service able to generate timestamps
143 * for K
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800144 * @param peerUpdateFunction function that provides a set of nodes to immediately
145 * update to when there writes to the map
Jonathan Hartdb3af892015-01-26 13:19:07 -0800146 */
147 public EventuallyConsistentMapImpl(String mapName,
148 ClusterService clusterService,
149 ClusterCommunicationService clusterCommunicator,
150 KryoNamespace.Builder serializerBuilder,
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800151 ClockService<K, V> clockService,
152 BiFunction<K, V, Iterable<NodeId>> peerUpdateFunction) {
Jonathan Hartdb3af892015-01-26 13:19:07 -0800153 this.clusterService = checkNotNull(clusterService);
154 this.clusterCommunicator = checkNotNull(clusterCommunicator);
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800155 this.peerUpdateFunction = checkNotNull(peerUpdateFunction);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800156
157 serializer = createSerializer(checkNotNull(serializerBuilder));
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800158 destroyedMessage = mapName + ERROR_DESTROYED;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800159
160 this.clockService = checkNotNull(clockService);
161
162 items = new ConcurrentHashMap<>();
163 removedItems = new ConcurrentHashMap<>();
164
Brian O'Connorc6713a82015-02-24 11:55:48 -0800165 // should be a normal executor; it's used for receiving messages
166 //TODO make # of threads configurable
167 executor = Executors.newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-fg-%d"));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800168
Brian O'Connorc6713a82015-02-24 11:55:48 -0800169 // sending executor; should be capped
170 //TODO make # of threads configurable
171 broadcastMessageExecutor = //newSingleThreadExecutor(groupedThreads("onos/ecm", mapName + "-notify"));
172 newFixedThreadPool(4, groupedThreads("onos/ecm", mapName + "-notify"));
Madan Jampani28726282015-02-19 11:40:23 -0800173
Jonathan Hartdb3af892015-01-26 13:19:07 -0800174 backgroundExecutor =
Brian O'Connorc6713a82015-02-24 11:55:48 -0800175 //FIXME anti-entropy can take >60 seconds and it blocks fg workers
176 // ... dropping minPriority to try to help until this can be parallel
177 newSingleThreadScheduledExecutor(//minPriority(
Jonathan Hart233a18a2015-03-02 17:24:58 -0800178 groupedThreads("onos/ecm", mapName + "-bg-%d"))/*)*/;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800179
Jonathan Hartaaa56572015-01-28 21:56:35 -0800180 // start anti-entropy thread
Brian O'Connorc6713a82015-02-24 11:55:48 -0800181 //TODO disable anti-entropy for now in testing (it is unstable)
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800182 backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
183 initialDelaySec, periodSec,
184 TimeUnit.SECONDS);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800185
Jonathan Hartdb3af892015-01-26 13:19:07 -0800186 updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
187 clusterCommunicator.addSubscriber(updateMessageSubject,
Madan Jampani2af244a2015-02-22 13:12:01 -0800188 new InternalPutEventListener(), executor);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800189 removeMessageSubject = new MessageSubject("ecm-" + mapName + "-remove");
190 clusterCommunicator.addSubscriber(removeMessageSubject,
Madan Jampani2af244a2015-02-22 13:12:01 -0800191 new InternalRemoveEventListener(), executor);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800192 antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");
193 clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
Madan Jampani2af244a2015-02-22 13:12:01 -0800194 new InternalAntiEntropyListener(), backgroundExecutor);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800195 }
196
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800197 /**
198 * Creates a new eventually consistent map shared amongst multiple instances.
199 * <p>
200 * Take a look at the other constructor for usage information. The only difference
201 * is that a BiFunction is provided that returns all nodes in the cluster, so
202 * all nodes will be sent write updates immediately.
203 * </p>
204 *
205 * @param mapName a String identifier for the map.
206 * @param clusterService the cluster service
207 * @param clusterCommunicator the cluster communications service
208 * @param serializerBuilder a Kryo namespace builder that can serialize
209 * both K and V
210 * @param clockService a clock service able to generate timestamps
211 * for K
212 */
213 public EventuallyConsistentMapImpl(String mapName,
214 ClusterService clusterService,
215 ClusterCommunicationService clusterCommunicator,
216 KryoNamespace.Builder serializerBuilder,
217 ClockService<K, V> clockService) {
218 this(mapName, clusterService, clusterCommunicator, serializerBuilder, clockService,
219 (key, value) -> clusterService.getNodes().stream()
220 .map(ControllerNode::id)
221 .filter(nodeId -> !nodeId.equals(clusterService.getLocalNode().id()))
222 .collect(Collectors.toList()));
223 }
224
Jonathan Hartdb3af892015-01-26 13:19:07 -0800225 private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
226 return new KryoSerializer() {
227 @Override
228 protected void setupKryoPool() {
229 // Add the map's internal helper classes to the user-supplied serializer
230 serializerPool = builder
231 .register(WallClockTimestamp.class)
232 .register(PutEntry.class)
Jonathan Hart539a6462015-01-27 17:05:43 -0800233 .register(RemoveEntry.class)
Jonathan Hartdb3af892015-01-26 13:19:07 -0800234 .register(ArrayList.class)
235 .register(InternalPutEvent.class)
236 .register(InternalRemoveEvent.class)
Jonathan Hartaaa56572015-01-28 21:56:35 -0800237 .register(AntiEntropyAdvertisement.class)
238 .register(HashMap.class)
Jonathan Hartdb3af892015-01-26 13:19:07 -0800239 .build();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800240 }
241 };
242 }
243
Madan Jampanib28e4ad2015-02-19 12:31:37 -0800244 /**
245 * Sets the executor to use for broadcasting messages and returns this
246 * instance for method chaining.
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800247 *
Madan Jampanib28e4ad2015-02-19 12:31:37 -0800248 * @param executor executor service
249 * @return this instance
250 */
251 public EventuallyConsistentMapImpl<K, V> withBroadcastMessageExecutor(ExecutorService executor) {
252 checkNotNull(executor, "Null executor");
253 broadcastMessageExecutor = executor;
254 return this;
255 }
256
Jonathan Hartdb3af892015-01-26 13:19:07 -0800257 @Override
258 public int size() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800259 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800260 return items.size();
261 }
262
263 @Override
264 public boolean isEmpty() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800265 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800266 return items.isEmpty();
267 }
268
269 @Override
270 public boolean containsKey(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800271 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800272 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800273 return items.containsKey(key);
274 }
275
276 @Override
277 public boolean containsValue(V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800278 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800279 checkNotNull(value, ERROR_NULL_VALUE);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800280
281 return items.values().stream()
282 .anyMatch(timestamped -> timestamped.value().equals(value));
283 }
284
285 @Override
286 public V get(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800287 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800288 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800289
290 Timestamped<V> value = items.get(key);
291 if (value != null) {
292 return value.value();
293 }
294 return null;
295 }
296
297 @Override
298 public void put(K key, V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800299 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800300 checkNotNull(key, ERROR_NULL_KEY);
301 checkNotNull(value, ERROR_NULL_VALUE);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800302
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800303 Timestamp timestamp = clockService.getTimestamp(key, value);
304
Jonathan Hartdb3af892015-01-26 13:19:07 -0800305 if (putInternal(key, value, timestamp)) {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800306 notifyPeers(new InternalPutEvent<>(key, value, timestamp),
307 peerUpdateFunction.apply(key, value));
308 notifyListeners(new EventuallyConsistentMapEvent<>(
309 EventuallyConsistentMapEvent.Type.PUT, key, value));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800310 }
311 }
312
313 private boolean putInternal(K key, V value, Timestamp timestamp) {
Jonathan Hart233a18a2015-03-02 17:24:58 -0800314 counter.incrementCount();
Brian O'Connor2952e3b2015-02-19 21:47:57 -0800315 Timestamp removed = removedItems.get(key);
Jonathan Hart403ea932015-02-20 16:23:00 -0800316 if (removed != null && removed.isNewerThan(timestamp)) {
Brian O'Connor2952e3b2015-02-19 21:47:57 -0800317 log.debug("ecmap - removed was newer {}", value);
318 return false;
319 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800320
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800321 final MutableBoolean updated = new MutableBoolean(false);
322
323 items.compute(key, (k, existing) -> {
Jonathan Hart403ea932015-02-20 16:23:00 -0800324 if (existing != null && existing.isNewerThan(timestamp)) {
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800325 updated.setFalse();
326 return existing;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800327 } else {
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800328 updated.setTrue();
329 return new Timestamped<>(value, timestamp);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800330 }
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800331 });
332
333 boolean success = updated.booleanValue();
334 if (!success) {
335 log.debug("ecmap - existing was newer {}", value);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800336 }
Brian O'Connor2952e3b2015-02-19 21:47:57 -0800337
338 if (success && removed != null) {
339 removedItems.remove(key, removed);
340 }
341 return success;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800342 }
343
344 @Override
345 public void remove(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800346 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800347 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800348
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800349 // TODO prevent calls here if value is important for timestamp
350 Timestamp timestamp = clockService.getTimestamp(key, null);
351
Jonathan Hartdb3af892015-01-26 13:19:07 -0800352 if (removeInternal(key, timestamp)) {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800353 notifyPeers(new InternalRemoveEvent<>(key, timestamp),
354 peerUpdateFunction.apply(key, null));
355 notifyListeners(new EventuallyConsistentMapEvent<>(
356 EventuallyConsistentMapEvent.Type.REMOVE, key, null));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800357 }
358 }
359
360 private boolean removeInternal(K key, Timestamp timestamp) {
Jonathan Hart233a18a2015-03-02 17:24:58 -0800361 counter.incrementCount();
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800362 final MutableBoolean updated = new MutableBoolean(false);
363
364 items.compute(key, (k, existing) -> {
365 if (existing != null && existing.isNewerThan(timestamp)) {
366 updated.setFalse();
367 return existing;
Madan Jampani7e6cfe32015-02-19 17:44:44 -0800368 } else {
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800369 updated.setTrue();
370 // remove from items map
371 return null;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800372 }
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800373 });
374
375 if (updated.isFalse()) {
376 return false;
Madan Jampani7e6cfe32015-02-19 17:44:44 -0800377 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800378
Madan Jampani7e6cfe32015-02-19 17:44:44 -0800379 Timestamp removedTimestamp = removedItems.get(key);
380 if (removedTimestamp == null) {
381 return removedItems.putIfAbsent(key, timestamp) == null;
Jonathan Hart403ea932015-02-20 16:23:00 -0800382 } else if (timestamp.isNewerThan(removedTimestamp)) {
Madan Jampani7e6cfe32015-02-19 17:44:44 -0800383 return removedItems.replace(key, removedTimestamp, timestamp);
384 } else {
385 return false;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800386 }
387 }
388
389 @Override
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800390 public void remove(K key, V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800391 checkState(!destroyed, destroyedMessage);
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800392 checkNotNull(key, ERROR_NULL_KEY);
393 checkNotNull(value, ERROR_NULL_VALUE);
394
395 Timestamp timestamp = clockService.getTimestamp(key, value);
396
397 if (removeInternal(key, timestamp)) {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800398 notifyPeers(new InternalRemoveEvent<>(key, timestamp),
399 peerUpdateFunction.apply(key, value));
400 notifyListeners(new EventuallyConsistentMapEvent<>(
401 EventuallyConsistentMapEvent.Type.REMOVE, key, value));
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800402 }
403 }
404
405 @Override
Jonathan Hartdb3af892015-01-26 13:19:07 -0800406 public void putAll(Map<? extends K, ? extends V> m) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800407 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800408
409 List<PutEntry<K, V>> updates = new ArrayList<>(m.size());
410
411 for (Map.Entry<? extends K, ? extends V> entry : m.entrySet()) {
412 K key = entry.getKey();
413 V value = entry.getValue();
Jonathan Hart4f397e82015-02-04 09:10:41 -0800414
415 checkNotNull(key, ERROR_NULL_KEY);
416 checkNotNull(value, ERROR_NULL_VALUE);
417
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800418 Timestamp timestamp = clockService.getTimestamp(key, value);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800419
420 if (putInternal(key, value, timestamp)) {
421 updates.add(new PutEntry<>(key, value, timestamp));
422 }
423 }
424
Jonathan Hart584d2f32015-01-27 19:46:14 -0800425 if (!updates.isEmpty()) {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800426 broadcastMessage(updateMessageSubject, new InternalPutEvent<>(updates));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800427
Jonathan Hart584d2f32015-01-27 19:46:14 -0800428 for (PutEntry<K, V> entry : updates) {
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800429 EventuallyConsistentMapEvent<K, V> externalEvent =
430 new EventuallyConsistentMapEvent<>(
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800431 EventuallyConsistentMapEvent.Type.PUT, entry.key(),
432 entry.value());
Jonathan Hart584d2f32015-01-27 19:46:14 -0800433 notifyListeners(externalEvent);
434 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800435 }
436 }
437
438 @Override
439 public void clear() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800440 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800441
442 List<RemoveEntry<K>> removed = new ArrayList<>(items.size());
443
444 for (K key : items.keySet()) {
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800445 // TODO also this is not applicable if value is important for timestamp?
446 Timestamp timestamp = clockService.getTimestamp(key, null);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800447
448 if (removeInternal(key, timestamp)) {
449 removed.add(new RemoveEntry<>(key, timestamp));
450 }
451 }
452
Jonathan Hart584d2f32015-01-27 19:46:14 -0800453 if (!removed.isEmpty()) {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800454 broadcastMessage(removeMessageSubject, new InternalRemoveEvent<>(removed));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800455
Jonathan Hart584d2f32015-01-27 19:46:14 -0800456 for (RemoveEntry<K> entry : removed) {
457 EventuallyConsistentMapEvent<K, V> externalEvent
458 = new EventuallyConsistentMapEvent<>(
459 EventuallyConsistentMapEvent.Type.REMOVE, entry.key(),
460 null);
461 notifyListeners(externalEvent);
462 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800463 }
464 }
465
466 @Override
467 public Set<K> keySet() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800468 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800469
470 return items.keySet();
471 }
472
473 @Override
474 public Collection<V> values() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800475 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800476
477 return items.values().stream()
478 .map(Timestamped::value)
479 .collect(Collectors.toList());
480 }
481
482 @Override
483 public Set<Map.Entry<K, V>> entrySet() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800484 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800485
486 return items.entrySet().stream()
Jonathan Hartf9108232015-02-02 16:37:35 -0800487 .map(e -> Pair.of(e.getKey(), e.getValue().value()))
Jonathan Hartdb3af892015-01-26 13:19:07 -0800488 .collect(Collectors.toSet());
489 }
490
491 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800492 public void addListener(EventuallyConsistentMapListener<K, V> listener) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800493 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800494
495 listeners.add(checkNotNull(listener));
496 }
497
498 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800499 public void removeListener(EventuallyConsistentMapListener<K, V> listener) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800500 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800501
502 listeners.remove(checkNotNull(listener));
503 }
504
505 @Override
506 public void destroy() {
507 destroyed = true;
508
509 executor.shutdown();
510 backgroundExecutor.shutdown();
Madan Jampani337bb442015-02-19 14:29:18 -0800511 broadcastMessageExecutor.shutdown();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800512
Jonathan Hart584d2f32015-01-27 19:46:14 -0800513 listeners.clear();
514
Jonathan Hartdb3af892015-01-26 13:19:07 -0800515 clusterCommunicator.removeSubscriber(updateMessageSubject);
516 clusterCommunicator.removeSubscriber(removeMessageSubject);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800517 clusterCommunicator.removeSubscriber(antiEntropyAdvertisementSubject);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800518 }
519
Jonathan Hartaaa56572015-01-28 21:56:35 -0800520 private void notifyListeners(EventuallyConsistentMapEvent<K, V> event) {
521 for (EventuallyConsistentMapListener<K, V> listener : listeners) {
Jonathan Hartdb3af892015-01-26 13:19:07 -0800522 listener.event(event);
523 }
524 }
525
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800526 private void notifyPeers(InternalPutEvent event, Iterable<NodeId> peers) {
Brian O'Connorb2894222015-02-20 22:05:19 -0800527 // FIXME extremely memory expensive when we are overrun
528// broadcastMessageExecutor.execute(() -> broadcastMessage(updateMessageSubject, event));
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800529 multicastMessage(updateMessageSubject, event, peers);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800530 }
531
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800532 private void notifyPeers(InternalRemoveEvent event, Iterable<NodeId> peers) {
Brian O'Connorb2894222015-02-20 22:05:19 -0800533 // FIXME extremely memory expensive when we are overrun
534// broadcastMessageExecutor.execute(() -> broadcastMessage(removeMessageSubject, event));
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800535 multicastMessage(removeMessageSubject, event, peers);
536 }
537
538 private void multicastMessage(MessageSubject subject, Object event, Iterable<NodeId> peers) {
539 // FIXME can we parallelize the serialization... use the caller???
540 ClusterMessage message = new ClusterMessage(
541 clusterService.getLocalNode().id(),
542 subject,
543 serializer.encode(event));
544 broadcastMessageExecutor.execute(() -> clusterCommunicator.multicast(message, peers));
545// clusterCommunicator.broadcast(message);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800546 }
547
Jonathan Hart7d656f42015-01-27 14:07:23 -0800548 private void broadcastMessage(MessageSubject subject, Object event) {
Brian O'Connorb2894222015-02-20 22:05:19 -0800549 // FIXME can we parallelize the serialization... use the caller???
Jonathan Hartdb3af892015-01-26 13:19:07 -0800550 ClusterMessage message = new ClusterMessage(
551 clusterService.getLocalNode().id(),
552 subject,
553 serializer.encode(event));
Brian O'Connorc6713a82015-02-24 11:55:48 -0800554 broadcastMessageExecutor.execute(() -> clusterCommunicator.broadcast(message));
555// clusterCommunicator.broadcast(message);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800556 }
557
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800558 private void unicastMessage(NodeId peer, MessageSubject subject, Object event) {
Jonathan Hartdb3af892015-01-26 13:19:07 -0800559 ClusterMessage message = new ClusterMessage(
560 clusterService.getLocalNode().id(),
561 subject,
562 serializer.encode(event));
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800563// clusterCommunicator.unicast(message, peer);
564 broadcastMessageExecutor.execute(() -> clusterCommunicator.unicast(message, peer));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800565 }
566
Jonathan Hart233a18a2015-03-02 17:24:58 -0800567 private boolean underHighLoad() {
568 return counter.get(LOAD_WINDOW) > HIGH_LOAD_THRESHOLD;
569 }
570
Jonathan Hartaaa56572015-01-28 21:56:35 -0800571 private final class SendAdvertisementTask implements Runnable {
572 @Override
573 public void run() {
574 if (Thread.currentThread().isInterrupted()) {
575 log.info("Interrupted, quitting");
576 return;
577 }
578
Jonathan Hart233a18a2015-03-02 17:24:58 -0800579 if (underHighLoad()) {
580 return;
581 }
582
Jonathan Hartaaa56572015-01-28 21:56:35 -0800583 try {
584 final NodeId self = clusterService.getLocalNode().id();
585 Set<ControllerNode> nodes = clusterService.getNodes();
586
587 List<NodeId> nodeIds = nodes.stream()
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800588 .map(ControllerNode::id)
Jonathan Hartaaa56572015-01-28 21:56:35 -0800589 .collect(Collectors.toList());
590
591 if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
592 log.trace("No other peers in the cluster.");
593 return;
594 }
595
596 NodeId peer;
597 do {
598 int idx = RandomUtils.nextInt(0, nodeIds.size());
599 peer = nodeIds.get(idx);
600 } while (peer.equals(self));
601
602 if (Thread.currentThread().isInterrupted()) {
603 log.info("Interrupted, quitting");
604 return;
605 }
606
607 AntiEntropyAdvertisement<K> ad = createAdvertisement();
608
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800609 // TODO check the return value?
610 unicastMessage(peer, antiEntropyAdvertisementSubject, ad);
611 // error log: log.debug("Failed to send anti-entropy advertisement to {}", peer);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800612 } catch (Exception e) {
613 // Catch all exceptions to avoid scheduled task being suppressed.
614 log.error("Exception thrown while sending advertisement", e);
615 }
616 }
617 }
618
619 private AntiEntropyAdvertisement<K> createAdvertisement() {
620 final NodeId self = clusterService.getLocalNode().id();
621
622 Map<K, Timestamp> timestamps = new HashMap<>(items.size());
623
624 items.forEach((key, value) -> timestamps.put(key, value.timestamp()));
625
626 Map<K, Timestamp> tombstones = new HashMap<>(removedItems);
627
628 return new AntiEntropyAdvertisement<>(self, timestamps, tombstones);
629 }
630
631 private void handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) {
632 List<EventuallyConsistentMapEvent<K, V>> externalEvents;
633
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800634 externalEvents = antiEntropyCheckLocalItems(ad);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800635
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800636 antiEntropyCheckLocalRemoved(ad);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800637
Jonathan Hartf893be82015-02-24 17:35:51 -0800638 if (!lightweightAntiEntropy) {
639 externalEvents.addAll(antiEntropyCheckRemoteRemoved(ad));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800640
Jonathan Hartf893be82015-02-24 17:35:51 -0800641 // if remote ad has something unknown, actively sync
642 for (K key : ad.timestamps().keySet()) {
643 if (!items.containsKey(key)) {
644 // Send the advertisement back if this peer is out-of-sync
645 final NodeId sender = ad.sender();
646 AntiEntropyAdvertisement<K> myAd = createAdvertisement();
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800647 // TODO check the return value?
648 unicastMessage(sender, antiEntropyAdvertisementSubject, myAd);
649 // error log: log.debug("Failed to send reactive anti-entropy advertisement to {}", sender);
Jonathan Hartf893be82015-02-24 17:35:51 -0800650 break;
651 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800652 }
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800653 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800654 externalEvents.forEach(this::notifyListeners);
655 }
656
657 /**
658 * Checks if any of the remote's live items or tombstones are out of date
659 * according to our local live item list, or if our live items are out of
660 * date according to the remote's tombstone list.
661 * If the local copy is more recent, it will be pushed to the remote. If the
662 * remote has a more recent remove, we apply that to the local state.
663 *
664 * @param ad remote anti-entropy advertisement
665 * @return list of external events relating to local operations performed
666 */
Jonathan Hartaaa56572015-01-28 21:56:35 -0800667 private List<EventuallyConsistentMapEvent<K, V>> antiEntropyCheckLocalItems(
668 AntiEntropyAdvertisement<K> ad) {
669 final List<EventuallyConsistentMapEvent<K, V>> externalEvents
670 = new LinkedList<>();
671 final NodeId sender = ad.sender();
672
673 final List<PutEntry<K, V>> updatesToSend = new ArrayList<>();
674
675 for (Map.Entry<K, Timestamped<V>> item : items.entrySet()) {
676 K key = item.getKey();
677 Timestamped<V> localValue = item.getValue();
678
679 Timestamp remoteTimestamp = ad.timestamps().get(key);
680 if (remoteTimestamp == null) {
681 remoteTimestamp = ad.tombstones().get(key);
682 }
683 if (remoteTimestamp == null || localValue
Jonathan Hart403ea932015-02-20 16:23:00 -0800684 .isNewerThan(remoteTimestamp)) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800685 // local value is more recent, push to sender
686 updatesToSend
687 .add(new PutEntry<>(key, localValue.value(),
688 localValue.timestamp()));
689 }
690
691 Timestamp remoteDeadTimestamp = ad.tombstones().get(key);
692 if (remoteDeadTimestamp != null &&
Jonathan Hart403ea932015-02-20 16:23:00 -0800693 remoteDeadTimestamp.isNewerThan(localValue.timestamp())) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800694 // sender has a more recent remove
695 if (removeInternal(key, remoteDeadTimestamp)) {
696 externalEvents.add(new EventuallyConsistentMapEvent<>(
697 EventuallyConsistentMapEvent.Type.REMOVE, key, null));
698 }
699 }
700 }
701
702 // Send all updates to the peer at once
703 if (!updatesToSend.isEmpty()) {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800704 // TODO check the return value?
705 unicastMessage(sender, updateMessageSubject,
706 new InternalPutEvent<>(updatesToSend));
707 //error log: log.warn("Failed to send advertisement response", e);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800708 }
709
710 return externalEvents;
711 }
712
713 /**
714 * Checks if any items in the remote live list are out of date according
715 * to our tombstone list. If we find we have a more up to date tombstone,
716 * we'll send it to the remote.
717 *
718 * @param ad remote anti-entropy advertisement
719 */
Jonathan Hartaaa56572015-01-28 21:56:35 -0800720 private void antiEntropyCheckLocalRemoved(AntiEntropyAdvertisement<K> ad) {
721 final NodeId sender = ad.sender();
722
723 final List<RemoveEntry<K>> removesToSend = new ArrayList<>();
724
725 for (Map.Entry<K, Timestamp> dead : removedItems.entrySet()) {
726 K key = dead.getKey();
727 Timestamp localDeadTimestamp = dead.getValue();
728
729 Timestamp remoteLiveTimestamp = ad.timestamps().get(key);
730 if (remoteLiveTimestamp != null
Jonathan Hart403ea932015-02-20 16:23:00 -0800731 && localDeadTimestamp.isNewerThan(remoteLiveTimestamp)) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800732 // sender has zombie, push remove
733 removesToSend
734 .add(new RemoveEntry<>(key, localDeadTimestamp));
735 }
736 }
737
738 // Send all removes to the peer at once
739 if (!removesToSend.isEmpty()) {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800740 // TODO check the return value
741 unicastMessage(sender, removeMessageSubject,
742 new InternalRemoveEvent<>(removesToSend));
743 // error log: log.warn("Failed to send advertisement response", e);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800744 }
745 }
746
747 /**
748 * Checks if any of the local live items are out of date according to the
749 * remote's tombstone advertisements. If we find a local item is out of date,
750 * we'll apply the remove operation to the local state.
751 *
752 * @param ad remote anti-entropy advertisement
753 * @return list of external events relating to local operations performed
754 */
Jonathan Hartaaa56572015-01-28 21:56:35 -0800755 private List<EventuallyConsistentMapEvent<K, V>>
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800756 antiEntropyCheckRemoteRemoved(AntiEntropyAdvertisement<K> ad) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800757 final List<EventuallyConsistentMapEvent<K, V>> externalEvents
758 = new LinkedList<>();
759
760 for (Map.Entry<K, Timestamp> remoteDead : ad.tombstones().entrySet()) {
761 K key = remoteDead.getKey();
762 Timestamp remoteDeadTimestamp = remoteDead.getValue();
763
764 Timestamped<V> local = items.get(key);
765 Timestamp localDead = removedItems.get(key);
Jonathan Hart403ea932015-02-20 16:23:00 -0800766 if (local != null && remoteDeadTimestamp.isNewerThan(
767 local.timestamp())) {
768 // If the remote has a more recent tombstone than either our local
769 // value, then do a remove with their timestamp
Jonathan Hartaaa56572015-01-28 21:56:35 -0800770 if (removeInternal(key, remoteDeadTimestamp)) {
771 externalEvents.add(new EventuallyConsistentMapEvent<>(
772 EventuallyConsistentMapEvent.Type.REMOVE, key, null));
773 }
Jonathan Hart403ea932015-02-20 16:23:00 -0800774 } else if (localDead != null && remoteDeadTimestamp.isNewerThan(
775 localDead)) {
776 // If the remote has a more recent tombstone than us, update ours
777 // to their timestamp
778 removeInternal(key, remoteDeadTimestamp);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800779 }
780 }
781
782 return externalEvents;
783 }
784
785 private final class InternalAntiEntropyListener
786 implements ClusterMessageHandler {
787
788 @Override
789 public void handle(ClusterMessage message) {
Jonathan Hart4fd4ebb2015-02-04 17:38:48 -0800790 log.trace("Received anti-entropy advertisement from peer: {}",
791 message.sender());
Jonathan Hartaaa56572015-01-28 21:56:35 -0800792 AntiEntropyAdvertisement<K> advertisement = serializer.decode(message.payload());
Madan Jampani2af244a2015-02-22 13:12:01 -0800793 try {
Jonathan Hart233a18a2015-03-02 17:24:58 -0800794 if (!underHighLoad()) {
795 handleAntiEntropyAdvertisement(advertisement);
796 }
Madan Jampani2af244a2015-02-22 13:12:01 -0800797 } catch (Exception e) {
798 log.warn("Exception thrown handling advertisements", e);
799 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800800 }
801 }
802
Jonathan Hartdb3af892015-01-26 13:19:07 -0800803 private final class InternalPutEventListener implements
804 ClusterMessageHandler {
805 @Override
806 public void handle(ClusterMessage message) {
807 log.debug("Received put event from peer: {}", message.sender());
808 InternalPutEvent<K, V> event = serializer.decode(message.payload());
809
Madan Jampani2af244a2015-02-22 13:12:01 -0800810 try {
811 for (PutEntry<K, V> entry : event.entries()) {
812 K key = entry.key();
813 V value = entry.value();
814 Timestamp timestamp = entry.timestamp();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800815
Madan Jampani2af244a2015-02-22 13:12:01 -0800816 if (putInternal(key, value, timestamp)) {
817 EventuallyConsistentMapEvent<K, V> externalEvent =
818 new EventuallyConsistentMapEvent<>(
819 EventuallyConsistentMapEvent.Type.PUT, key,
820 value);
821 notifyListeners(externalEvent);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800822 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800823 }
Madan Jampani2af244a2015-02-22 13:12:01 -0800824 } catch (Exception e) {
825 log.warn("Exception thrown handling put", e);
826 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800827 }
828 }
829
830 private final class InternalRemoveEventListener implements
831 ClusterMessageHandler {
832 @Override
833 public void handle(ClusterMessage message) {
834 log.debug("Received remove event from peer: {}", message.sender());
835 InternalRemoveEvent<K> event = serializer.decode(message.payload());
Madan Jampani2af244a2015-02-22 13:12:01 -0800836 try {
837 for (RemoveEntry<K> entry : event.entries()) {
838 K key = entry.key();
839 Timestamp timestamp = entry.timestamp();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800840
Madan Jampani2af244a2015-02-22 13:12:01 -0800841 if (removeInternal(key, timestamp)) {
842 EventuallyConsistentMapEvent<K, V> externalEvent
843 = new EventuallyConsistentMapEvent<>(
844 EventuallyConsistentMapEvent.Type.REMOVE,
845 key, null);
846 notifyListeners(externalEvent);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800847 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800848 }
Madan Jampani2af244a2015-02-22 13:12:01 -0800849 } catch (Exception e) {
850 log.warn("Exception thrown handling remove", e);
851 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800852 }
853 }
854
Jonathan Hartdb3af892015-01-26 13:19:07 -0800855}