blob: 118ef7805a0492951f23b0af0a49c5c3c9278349 [file] [log] [blame]
Jonathan Hartdb3af892015-01-26 13:19:07 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Jonathan Hart77bdd262015-02-03 09:07:48 -080016package org.onosproject.store.ecmap;
Jonathan Hartdb3af892015-01-26 13:19:07 -080017
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080018import com.google.common.collect.ImmutableList;
19import com.google.common.collect.Lists;
20import com.google.common.collect.Maps;
Madan Jampani3e033bd2015-04-08 13:03:49 -070021
Jonathan Hartaaa56572015-01-28 21:56:35 -080022import org.apache.commons.lang3.RandomUtils;
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -080023import org.apache.commons.lang3.mutable.MutableBoolean;
Jonathan Hartf9108232015-02-02 16:37:35 -080024import org.apache.commons.lang3.tuple.Pair;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080025import org.onlab.util.AbstractAccumulator;
Jonathan Hartdb3af892015-01-26 13:19:07 -080026import org.onlab.util.KryoNamespace;
Jonathan Hart233a18a2015-03-02 17:24:58 -080027import org.onlab.util.SlidingWindowCounter;
Jonathan Hartdb3af892015-01-26 13:19:07 -080028import org.onosproject.cluster.ClusterService;
Jonathan Hartaaa56572015-01-28 21:56:35 -080029import org.onosproject.cluster.ControllerNode;
Jonathan Hartdb3af892015-01-26 13:19:07 -080030import org.onosproject.cluster.NodeId;
31import org.onosproject.store.Timestamp;
32import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
33import org.onosproject.store.cluster.messaging.ClusterMessage;
34import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
35import org.onosproject.store.cluster.messaging.MessageSubject;
Madan Jampani3e033bd2015-04-08 13:03:49 -070036import org.onosproject.store.impl.LogicalTimestamp;
Jonathan Hart77bdd262015-02-03 09:07:48 -080037import org.onosproject.store.impl.Timestamped;
Jonathan Hart63939a32015-05-08 11:57:03 -070038import org.onosproject.store.service.WallClockTimestamp;
Jonathan Hartdb3af892015-01-26 13:19:07 -080039import org.onosproject.store.serializers.KryoSerializer;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070040import org.onosproject.store.service.EventuallyConsistentMap;
41import org.onosproject.store.service.EventuallyConsistentMapEvent;
42import org.onosproject.store.service.EventuallyConsistentMapListener;
Jonathan Hartdb3af892015-01-26 13:19:07 -080043import org.slf4j.Logger;
44import org.slf4j.LoggerFactory;
45
Jonathan Hartdb3af892015-01-26 13:19:07 -080046import java.util.ArrayList;
47import java.util.Collection;
Jonathan Hartaaa56572015-01-28 21:56:35 -080048import java.util.HashMap;
49import java.util.LinkedList;
Jonathan Hartdb3af892015-01-26 13:19:07 -080050import java.util.List;
51import java.util.Map;
Madan Jampani43e9c9c2015-06-26 14:16:46 -070052import java.util.Optional;
Jonathan Hartdb3af892015-01-26 13:19:07 -080053import java.util.Set;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080054import java.util.Timer;
Jonathan Hartdb3af892015-01-26 13:19:07 -080055import java.util.concurrent.ConcurrentHashMap;
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -080056import java.util.concurrent.ConcurrentMap;
Jonathan Hartdb3af892015-01-26 13:19:07 -080057import java.util.concurrent.CopyOnWriteArraySet;
58import java.util.concurrent.ExecutorService;
59import java.util.concurrent.Executors;
60import java.util.concurrent.ScheduledExecutorService;
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -080061import java.util.concurrent.TimeUnit;
Madan Jampani43e9c9c2015-06-26 14:16:46 -070062import java.util.concurrent.atomic.AtomicReference;
Brian O'Connor5eb77c82015-03-02 18:09:39 -080063import java.util.function.BiFunction;
Jonathan Hartdb3af892015-01-26 13:19:07 -080064import java.util.stream.Collectors;
65
66import static com.google.common.base.Preconditions.checkNotNull;
67import static com.google.common.base.Preconditions.checkState;
Madan Jampani43e9c9c2015-06-26 14:16:46 -070068import static java.util.Objects.isNull;
69import static java.util.Objects.nonNull;
Jonathan Hartdb3af892015-01-26 13:19:07 -080070import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
Brian O'Connorc6713a82015-02-24 11:55:48 -080071import static org.onlab.util.BoundedThreadPool.newFixedThreadPool;
Brian O'Connor4b2ba5f2015-02-18 20:54:00 -080072import static org.onlab.util.Tools.groupedThreads;
Jonathan Hartdb3af892015-01-26 13:19:07 -080073
74/**
75 * Distributed Map implementation which uses optimistic replication and gossip
76 * based techniques to provide an eventually consistent data store.
77 */
78public class EventuallyConsistentMapImpl<K, V>
79 implements EventuallyConsistentMap<K, V> {
80
81 private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class);
82
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -080083 private final ConcurrentMap<K, Timestamped<V>> items;
84 private final ConcurrentMap<K, Timestamp> removedItems;
Jonathan Hartdb3af892015-01-26 13:19:07 -080085
Jonathan Hartdb3af892015-01-26 13:19:07 -080086 private final ClusterService clusterService;
87 private final ClusterCommunicationService clusterCommunicator;
88 private final KryoSerializer serializer;
89
Madan Jampanibcf1a482015-06-24 19:05:56 -070090 private final BiFunction<K, V, Timestamp> timestampProvider;
Jonathan Hartdb3af892015-01-26 13:19:07 -080091
92 private final MessageSubject updateMessageSubject;
Jonathan Hartaaa56572015-01-28 21:56:35 -080093 private final MessageSubject antiEntropyAdvertisementSubject;
Jonathan Hartdb3af892015-01-26 13:19:07 -080094
Jonathan Hartaaa56572015-01-28 21:56:35 -080095 private final Set<EventuallyConsistentMapListener<K, V>> listeners
Jonathan Hartdb3af892015-01-26 13:19:07 -080096 = new CopyOnWriteArraySet<>();
97
98 private final ExecutorService executor;
Jonathan Hartdb3af892015-01-26 13:19:07 -080099 private final ScheduledExecutorService backgroundExecutor;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800100 private final BiFunction<K, V, Collection<NodeId>> peerUpdateFunction;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800101
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700102 private final ExecutorService communicationExecutor;
103 private final Map<NodeId, EventAccumulator> senderPending;
Madan Jampani28726282015-02-19 11:40:23 -0800104
Jonathan Hartdb3af892015-01-26 13:19:07 -0800105 private volatile boolean destroyed = false;
Jonathan Hart539a6462015-01-27 17:05:43 -0800106 private static final String ERROR_DESTROYED = " map is already destroyed";
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800107 private final String destroyedMessage;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800108
Jonathan Hart4f397e82015-02-04 09:10:41 -0800109 private static final String ERROR_NULL_KEY = "Key cannot be null";
110 private static final String ERROR_NULL_VALUE = "Null values are not allowed";
111
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700112 private final long initialDelaySec = 5;
113 private final boolean lightweightAntiEntropy;
114 private final boolean tombstonesDisabled;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800115
Jonathan Hart233a18a2015-03-02 17:24:58 -0800116 private static final int WINDOW_SIZE = 5;
117 private static final int HIGH_LOAD_THRESHOLD = 0;
118 private static final int LOAD_WINDOW = 2;
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700119 private SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE);
Jonathan Hart233a18a2015-03-02 17:24:58 -0800120
Jonathan Hartca335e92015-03-05 10:34:32 -0800121 private final boolean persistent;
122 private final PersistentStore<K, V> persistentStore;
123
Jonathan Hartdb3af892015-01-26 13:19:07 -0800124 /**
125 * Creates a new eventually consistent map shared amongst multiple instances.
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800126 * <p>
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700127 * See {@link org.onosproject.store.service.EventuallyConsistentMapBuilder}
128 * for more description of the parameters expected by the map.
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800129 * </p>
Jonathan Hartdb3af892015-01-26 13:19:07 -0800130 *
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700131 * @param mapName a String identifier for the map.
132 * @param clusterService the cluster service
133 * @param clusterCommunicator the cluster communications service
134 * @param serializerBuilder a Kryo namespace builder that can serialize
135 * both K and V
Madan Jampanibcf1a482015-06-24 19:05:56 -0700136 * @param timestampProvider provider of timestamps for K and V
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700137 * @param peerUpdateFunction function that provides a set of nodes to immediately
138 * update to when there writes to the map
139 * @param eventExecutor executor to use for processing incoming
140 * events from peers
141 * @param communicationExecutor executor to use for sending events to peers
142 * @param backgroundExecutor executor to use for background anti-entropy
143 * tasks
144 * @param tombstonesDisabled true if this map should not maintain
145 * tombstones
146 * @param antiEntropyPeriod period that the anti-entropy task should run
Jonathan Hartca335e92015-03-05 10:34:32 -0800147 * @param antiEntropyTimeUnit time unit for anti-entropy period
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700148 * @param convergeFaster make anti-entropy try to converge faster
Jonathan Hartca335e92015-03-05 10:34:32 -0800149 * @param persistent persist data to disk
Jonathan Hartdb3af892015-01-26 13:19:07 -0800150 */
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700151 EventuallyConsistentMapImpl(String mapName,
152 ClusterService clusterService,
153 ClusterCommunicationService clusterCommunicator,
154 KryoNamespace.Builder serializerBuilder,
Madan Jampanibcf1a482015-06-24 19:05:56 -0700155 BiFunction<K, V, Timestamp> timestampProvider,
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700156 BiFunction<K, V, Collection<NodeId>> peerUpdateFunction,
157 ExecutorService eventExecutor,
158 ExecutorService communicationExecutor,
159 ScheduledExecutorService backgroundExecutor,
160 boolean tombstonesDisabled,
161 long antiEntropyPeriod,
162 TimeUnit antiEntropyTimeUnit,
Jonathan Hartca335e92015-03-05 10:34:32 -0800163 boolean convergeFaster,
164 boolean persistent) {
Jonathan Hartdb3af892015-01-26 13:19:07 -0800165 items = new ConcurrentHashMap<>();
166 removedItems = new ConcurrentHashMap<>();
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800167 senderPending = Maps.newConcurrentMap();
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700168 destroyedMessage = mapName + ERROR_DESTROYED;
Madan Jampani28726282015-02-19 11:40:23 -0800169
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700170 this.clusterService = clusterService;
171 this.clusterCommunicator = clusterCommunicator;
172
173 this.serializer = createSerializer(serializerBuilder);
174
Madan Jampanibcf1a482015-06-24 19:05:56 -0700175 this.timestampProvider = timestampProvider;
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700176
177 if (peerUpdateFunction != null) {
178 this.peerUpdateFunction = peerUpdateFunction;
179 } else {
180 this.peerUpdateFunction = (key, value) -> clusterService.getNodes().stream()
181 .map(ControllerNode::id)
182 .filter(nodeId -> !nodeId.equals(clusterService.getLocalNode().id()))
183 .collect(Collectors.toList());
184 }
185
186 if (eventExecutor != null) {
187 this.executor = eventExecutor;
188 } else {
189 // should be a normal executor; it's used for receiving messages
190 this.executor =
191 Executors.newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-fg-%d"));
192 }
193
194 if (communicationExecutor != null) {
195 this.communicationExecutor = communicationExecutor;
196 } else {
197 // sending executor; should be capped
198 //TODO this probably doesn't need to be bounded anymore
199 this.communicationExecutor =
200 newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-publish-%d"));
201 }
202
Jonathan Hartca335e92015-03-05 10:34:32 -0800203 this.persistent = persistent;
204
205 if (this.persistent) {
206 String dataDirectory = System.getProperty("karaf.data", "./data");
207 String filename = dataDirectory + "/" + "mapdb-ecm-" + mapName;
208
209 ExecutorService dbExecutor =
210 newFixedThreadPool(1, groupedThreads("onos/ecm", mapName + "-dbwriter"));
211
212 persistentStore = new MapDbPersistentStore<>(filename, dbExecutor, serializer);
213 persistentStore.readInto(items, removedItems);
214 } else {
215 this.persistentStore = null;
216 }
217
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700218 if (backgroundExecutor != null) {
219 this.backgroundExecutor = backgroundExecutor;
220 } else {
221 this.backgroundExecutor =
222 newSingleThreadScheduledExecutor(groupedThreads("onos/ecm", mapName + "-bg-%d"));
223 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800224
Jonathan Hartaaa56572015-01-28 21:56:35 -0800225 // start anti-entropy thread
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700226 this.backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
227 initialDelaySec, antiEntropyPeriod,
228 antiEntropyTimeUnit);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800229
Jonathan Hartdb3af892015-01-26 13:19:07 -0800230 updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
231 clusterCommunicator.addSubscriber(updateMessageSubject,
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700232 new InternalEventListener(), this.executor);
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800233
Jonathan Hartaaa56572015-01-28 21:56:35 -0800234 antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");
235 clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700236 new InternalAntiEntropyListener(), this.backgroundExecutor);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800237
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700238 this.tombstonesDisabled = tombstonesDisabled;
239 this.lightweightAntiEntropy = !convergeFaster;
Madan Jampanie1356282015-03-10 19:05:36 -0700240 }
241
Jonathan Hartdb3af892015-01-26 13:19:07 -0800242 private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
243 return new KryoSerializer() {
244 @Override
245 protected void setupKryoPool() {
246 // Add the map's internal helper classes to the user-supplied serializer
247 serializerPool = builder
Madan Jampani3e033bd2015-04-08 13:03:49 -0700248 .register(LogicalTimestamp.class)
Jonathan Hartdb3af892015-01-26 13:19:07 -0800249 .register(WallClockTimestamp.class)
250 .register(PutEntry.class)
Jonathan Hart539a6462015-01-27 17:05:43 -0800251 .register(RemoveEntry.class)
Jonathan Hartdb3af892015-01-26 13:19:07 -0800252 .register(ArrayList.class)
Jonathan Hartaaa56572015-01-28 21:56:35 -0800253 .register(AntiEntropyAdvertisement.class)
254 .register(HashMap.class)
Jonathan Hartca335e92015-03-05 10:34:32 -0800255 .register(Timestamped.class)
Jonathan Hartdb3af892015-01-26 13:19:07 -0800256 .build();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800257 }
258 };
259 }
260
261 @Override
262 public int size() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800263 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800264 return items.size();
265 }
266
267 @Override
268 public boolean isEmpty() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800269 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800270 return items.isEmpty();
271 }
272
273 @Override
274 public boolean containsKey(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800275 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800276 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800277 return items.containsKey(key);
278 }
279
280 @Override
281 public boolean containsValue(V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800282 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800283 checkNotNull(value, ERROR_NULL_VALUE);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800284
285 return items.values().stream()
286 .anyMatch(timestamped -> timestamped.value().equals(value));
287 }
288
289 @Override
290 public V get(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800291 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800292 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800293
294 Timestamped<V> value = items.get(key);
295 if (value != null) {
296 return value.value();
297 }
298 return null;
299 }
300
301 @Override
302 public void put(K key, V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800303 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800304 checkNotNull(key, ERROR_NULL_KEY);
305 checkNotNull(value, ERROR_NULL_VALUE);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800306
Madan Jampanibcf1a482015-06-24 19:05:56 -0700307 Timestamp timestamp = timestampProvider.apply(key, value);
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800308
Jonathan Hartdb3af892015-01-26 13:19:07 -0800309 if (putInternal(key, value, timestamp)) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800310 notifyPeers(new PutEntry<>(key, value, timestamp),
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800311 peerUpdateFunction.apply(key, value));
312 notifyListeners(new EventuallyConsistentMapEvent<>(
313 EventuallyConsistentMapEvent.Type.PUT, key, value));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800314 }
315 }
316
317 private boolean putInternal(K key, V value, Timestamp timestamp) {
Jonathan Hart233a18a2015-03-02 17:24:58 -0800318 counter.incrementCount();
Brian O'Connor2952e3b2015-02-19 21:47:57 -0800319 Timestamp removed = removedItems.get(key);
Jonathan Hart403ea932015-02-20 16:23:00 -0800320 if (removed != null && removed.isNewerThan(timestamp)) {
Brian O'Connor2952e3b2015-02-19 21:47:57 -0800321 log.debug("ecmap - removed was newer {}", value);
322 return false;
323 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800324
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800325 final MutableBoolean updated = new MutableBoolean(false);
326
327 items.compute(key, (k, existing) -> {
Jonathan Hart403ea932015-02-20 16:23:00 -0800328 if (existing != null && existing.isNewerThan(timestamp)) {
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800329 updated.setFalse();
330 return existing;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800331 } else {
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800332 updated.setTrue();
333 return new Timestamped<>(value, timestamp);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800334 }
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800335 });
336
337 boolean success = updated.booleanValue();
338 if (!success) {
339 log.debug("ecmap - existing was newer {}", value);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800340 }
Brian O'Connor2952e3b2015-02-19 21:47:57 -0800341
342 if (success && removed != null) {
343 removedItems.remove(key, removed);
344 }
Jonathan Hartca335e92015-03-05 10:34:32 -0800345
346 if (success && persistent) {
347 persistentStore.put(key, value, timestamp);
348 }
349
Brian O'Connor2952e3b2015-02-19 21:47:57 -0800350 return success;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800351 }
352
353 @Override
Madan Jampani43e9c9c2015-06-26 14:16:46 -0700354 public V remove(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800355 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800356 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800357
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800358 // TODO prevent calls here if value is important for timestamp
Madan Jampanibcf1a482015-06-24 19:05:56 -0700359 Timestamp timestamp = timestampProvider.apply(key, null);
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800360
Madan Jampani43e9c9c2015-06-26 14:16:46 -0700361 Optional<V> removedValue = removeInternal(key, timestamp);
362 if (removedValue == null) {
363 return null;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800364 }
Madan Jampani43e9c9c2015-06-26 14:16:46 -0700365 notifyPeers(new RemoveEntry<>(key, timestamp),
366 peerUpdateFunction.apply(key, null));
367 notifyListeners(new EventuallyConsistentMapEvent<>(
368 EventuallyConsistentMapEvent.Type.REMOVE, key, removedValue.orElse(null)));
369
370 return removedValue.orElse(null);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800371 }
372
Madan Jampani43e9c9c2015-06-26 14:16:46 -0700373 /**
374 * Returns null if the timestamp is for a outdated request i.e.
375 * the value is the map is more recent or a tombstone exists with a
376 * more recent timestamp.
377 * Returns non-empty optional if a value was indeed removed from the map.
378 * Returns empty optional if map did not contain a value for the key but the existing
379 * tombstone is older than this timestamp.
380 * @param key key
381 * @param timestamp timestamp for remove request
382 * @return Optional value.
383 */
384 private Optional<V> removeInternal(K key, Timestamp timestamp) {
Madan Jampani54d34992015-03-06 17:27:52 -0800385 if (timestamp == null) {
Madan Jampani43e9c9c2015-06-26 14:16:46 -0700386 return null;
Madan Jampani54d34992015-03-06 17:27:52 -0800387 }
388
Jonathan Hart233a18a2015-03-02 17:24:58 -0800389 counter.incrementCount();
Madan Jampani43e9c9c2015-06-26 14:16:46 -0700390 final AtomicReference<Optional<V>> removedValue = new AtomicReference<>(null);
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800391 items.compute(key, (k, existing) -> {
392 if (existing != null && existing.isNewerThan(timestamp)) {
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800393 return existing;
Madan Jampani7e6cfe32015-02-19 17:44:44 -0800394 } else {
Madan Jampani43e9c9c2015-06-26 14:16:46 -0700395 removedValue.set(existing == null ? Optional.empty() : Optional.of(existing.value()));
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800396 return null;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800397 }
Jonathan Hartca335e92015-03-05 10:34:32 -0800398 });
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800399
Madan Jampani43e9c9c2015-06-26 14:16:46 -0700400 if (isNull(removedValue.get())) {
401 return null;
Madan Jampani7e6cfe32015-02-19 17:44:44 -0800402 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800403
Jonathan Hartca335e92015-03-05 10:34:32 -0800404 boolean updatedTombstone = false;
405
Madan Jampanie1356282015-03-10 19:05:36 -0700406 if (!tombstonesDisabled) {
407 Timestamp removedTimestamp = removedItems.get(key);
408 if (removedTimestamp == null) {
Jonathan Hartca335e92015-03-05 10:34:32 -0800409 //Timestamp removed = removedItems.putIfAbsent(key, timestamp);
410 updatedTombstone = (removedItems.putIfAbsent(key, timestamp) == null);
Madan Jampanie1356282015-03-10 19:05:36 -0700411 } else if (timestamp.isNewerThan(removedTimestamp)) {
Jonathan Hartca335e92015-03-05 10:34:32 -0800412 updatedTombstone = removedItems.replace(key, removedTimestamp, timestamp);
Madan Jampanie1356282015-03-10 19:05:36 -0700413 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800414 }
Madan Jampanie1356282015-03-10 19:05:36 -0700415
Madan Jampani43e9c9c2015-06-26 14:16:46 -0700416 if (persistent) {
Jonathan Hartca335e92015-03-05 10:34:32 -0800417 persistentStore.remove(key, timestamp);
418 }
419
Madan Jampani43e9c9c2015-06-26 14:16:46 -0700420 if (tombstonesDisabled || updatedTombstone) {
421 return removedValue.get();
422 }
423 return null;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800424 }
425
426 @Override
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800427 public void remove(K key, V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800428 checkState(!destroyed, destroyedMessage);
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800429 checkNotNull(key, ERROR_NULL_KEY);
430 checkNotNull(value, ERROR_NULL_VALUE);
431
Madan Jampanibcf1a482015-06-24 19:05:56 -0700432 Timestamp timestamp = timestampProvider.apply(key, value);
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800433
Madan Jampani43e9c9c2015-06-26 14:16:46 -0700434 if (nonNull(removeInternal(key, timestamp))) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800435 notifyPeers(new RemoveEntry<>(key, timestamp),
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800436 peerUpdateFunction.apply(key, value));
437 notifyListeners(new EventuallyConsistentMapEvent<>(
438 EventuallyConsistentMapEvent.Type.REMOVE, key, value));
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800439 }
440 }
441
442 @Override
Jonathan Hartdb3af892015-01-26 13:19:07 -0800443 public void putAll(Map<? extends K, ? extends V> m) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800444 checkState(!destroyed, destroyedMessage);
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800445 m.forEach(this::put);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800446 }
447
448 @Override
449 public void clear() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800450 checkState(!destroyed, destroyedMessage);
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800451 items.forEach((key, value) -> remove(key));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800452 }
453
454 @Override
455 public Set<K> keySet() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800456 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800457 return items.keySet();
458 }
459
460 @Override
461 public Collection<V> values() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800462 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800463 return items.values().stream()
464 .map(Timestamped::value)
465 .collect(Collectors.toList());
466 }
467
468 @Override
469 public Set<Map.Entry<K, V>> entrySet() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800470 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800471
472 return items.entrySet().stream()
Jonathan Hartf9108232015-02-02 16:37:35 -0800473 .map(e -> Pair.of(e.getKey(), e.getValue().value()))
Jonathan Hartdb3af892015-01-26 13:19:07 -0800474 .collect(Collectors.toSet());
475 }
476
477 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800478 public void addListener(EventuallyConsistentMapListener<K, V> listener) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800479 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800480
481 listeners.add(checkNotNull(listener));
482 }
483
484 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800485 public void removeListener(EventuallyConsistentMapListener<K, V> listener) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800486 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800487
488 listeners.remove(checkNotNull(listener));
489 }
490
491 @Override
492 public void destroy() {
493 destroyed = true;
494
495 executor.shutdown();
496 backgroundExecutor.shutdown();
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800497 communicationExecutor.shutdown();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800498
Jonathan Hart584d2f32015-01-27 19:46:14 -0800499 listeners.clear();
500
Jonathan Hartdb3af892015-01-26 13:19:07 -0800501 clusterCommunicator.removeSubscriber(updateMessageSubject);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800502 clusterCommunicator.removeSubscriber(antiEntropyAdvertisementSubject);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800503 }
504
Jonathan Hartaaa56572015-01-28 21:56:35 -0800505 private void notifyListeners(EventuallyConsistentMapEvent<K, V> event) {
506 for (EventuallyConsistentMapListener<K, V> listener : listeners) {
Jonathan Hartdb3af892015-01-26 13:19:07 -0800507 listener.event(event);
508 }
509 }
510
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800511 private void notifyPeers(PutEntry<K, V> event, Collection<NodeId> peers) {
512 queueUpdate(event, peers);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800513 }
514
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800515 private void notifyPeers(RemoveEntry<K, V> event, Collection<NodeId> peers) {
516 queueUpdate(event, peers);
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800517 }
518
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800519 private void queueUpdate(AbstractEntry<K, V> event, Collection<NodeId> peers) {
520 if (peers == null) {
521 // we have no friends :(
522 return;
523 }
524 peers.forEach(node ->
525 senderPending.computeIfAbsent(node, unusedKey -> new EventAccumulator(node)).add(event)
526 );
527 }
528
Jonathan Hart233a18a2015-03-02 17:24:58 -0800529 private boolean underHighLoad() {
530 return counter.get(LOAD_WINDOW) > HIGH_LOAD_THRESHOLD;
531 }
532
Jonathan Hartaaa56572015-01-28 21:56:35 -0800533 private final class SendAdvertisementTask implements Runnable {
534 @Override
535 public void run() {
536 if (Thread.currentThread().isInterrupted()) {
537 log.info("Interrupted, quitting");
538 return;
539 }
540
Thomas Vachuska152f9fd2015-04-02 16:28:13 -0700541 if (underHighLoad() || destroyed) {
Jonathan Hart233a18a2015-03-02 17:24:58 -0800542 return;
543 }
544
Jonathan Hartaaa56572015-01-28 21:56:35 -0800545 try {
546 final NodeId self = clusterService.getLocalNode().id();
547 Set<ControllerNode> nodes = clusterService.getNodes();
548
549 List<NodeId> nodeIds = nodes.stream()
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800550 .map(ControllerNode::id)
Jonathan Hartaaa56572015-01-28 21:56:35 -0800551 .collect(Collectors.toList());
552
553 if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
554 log.trace("No other peers in the cluster.");
555 return;
556 }
557
558 NodeId peer;
559 do {
560 int idx = RandomUtils.nextInt(0, nodeIds.size());
561 peer = nodeIds.get(idx);
562 } while (peer.equals(self));
563
564 if (Thread.currentThread().isInterrupted()) {
565 log.info("Interrupted, quitting");
566 return;
567 }
568
569 AntiEntropyAdvertisement<K> ad = createAdvertisement();
Madan Jampani175e8fd2015-05-20 14:10:45 -0700570 NodeId destination = peer;
571 clusterCommunicator.unicast(ad, antiEntropyAdvertisementSubject, serializer::encode, peer)
572 .whenComplete((result, error) -> {
573 if (error != null) {
574 log.debug("Failed to send anti-entropy advertisement to {}", destination);
575 }
576 });
Jonathan Hartaaa56572015-01-28 21:56:35 -0800577
Jonathan Hartaaa56572015-01-28 21:56:35 -0800578 } catch (Exception e) {
579 // Catch all exceptions to avoid scheduled task being suppressed.
580 log.error("Exception thrown while sending advertisement", e);
581 }
582 }
583 }
584
585 private AntiEntropyAdvertisement<K> createAdvertisement() {
586 final NodeId self = clusterService.getLocalNode().id();
587
588 Map<K, Timestamp> timestamps = new HashMap<>(items.size());
589
590 items.forEach((key, value) -> timestamps.put(key, value.timestamp()));
591
592 Map<K, Timestamp> tombstones = new HashMap<>(removedItems);
593
594 return new AntiEntropyAdvertisement<>(self, timestamps, tombstones);
595 }
596
597 private void handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) {
598 List<EventuallyConsistentMapEvent<K, V>> externalEvents;
599
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800600 externalEvents = antiEntropyCheckLocalItems(ad);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800601
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800602 antiEntropyCheckLocalRemoved(ad);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800603
Jonathan Hartf893be82015-02-24 17:35:51 -0800604 if (!lightweightAntiEntropy) {
605 externalEvents.addAll(antiEntropyCheckRemoteRemoved(ad));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800606
Jonathan Hartf893be82015-02-24 17:35:51 -0800607 // if remote ad has something unknown, actively sync
608 for (K key : ad.timestamps().keySet()) {
609 if (!items.containsKey(key)) {
610 // Send the advertisement back if this peer is out-of-sync
611 final NodeId sender = ad.sender();
612 AntiEntropyAdvertisement<K> myAd = createAdvertisement();
Madan Jampani175e8fd2015-05-20 14:10:45 -0700613
614 clusterCommunicator.unicast(myAd, antiEntropyAdvertisementSubject, serializer::encode, sender)
615 .whenComplete((result, error) -> {
616 if (error != null) {
617 log.debug("Failed to send reactive "
618 + "anti-entropy advertisement to {}", sender);
619 }
620 });
Jonathan Hartf893be82015-02-24 17:35:51 -0800621 break;
622 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800623 }
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800624 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800625 externalEvents.forEach(this::notifyListeners);
626 }
627
628 /**
629 * Checks if any of the remote's live items or tombstones are out of date
630 * according to our local live item list, or if our live items are out of
631 * date according to the remote's tombstone list.
632 * If the local copy is more recent, it will be pushed to the remote. If the
633 * remote has a more recent remove, we apply that to the local state.
634 *
635 * @param ad remote anti-entropy advertisement
636 * @return list of external events relating to local operations performed
637 */
Jonathan Hartaaa56572015-01-28 21:56:35 -0800638 private List<EventuallyConsistentMapEvent<K, V>> antiEntropyCheckLocalItems(
639 AntiEntropyAdvertisement<K> ad) {
640 final List<EventuallyConsistentMapEvent<K, V>> externalEvents
641 = new LinkedList<>();
642 final NodeId sender = ad.sender();
643
Jonathan Hartaaa56572015-01-28 21:56:35 -0800644 for (Map.Entry<K, Timestamped<V>> item : items.entrySet()) {
645 K key = item.getKey();
646 Timestamped<V> localValue = item.getValue();
647
648 Timestamp remoteTimestamp = ad.timestamps().get(key);
649 if (remoteTimestamp == null) {
650 remoteTimestamp = ad.tombstones().get(key);
651 }
652 if (remoteTimestamp == null || localValue
Jonathan Hart403ea932015-02-20 16:23:00 -0800653 .isNewerThan(remoteTimestamp)) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800654 // local value is more recent, push to sender
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800655 queueUpdate(new PutEntry<>(key, localValue.value(),
656 localValue.timestamp()), ImmutableList.of(sender));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800657 }
658
659 Timestamp remoteDeadTimestamp = ad.tombstones().get(key);
660 if (remoteDeadTimestamp != null &&
Jonathan Hart403ea932015-02-20 16:23:00 -0800661 remoteDeadTimestamp.isNewerThan(localValue.timestamp())) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800662 // sender has a more recent remove
Madan Jampani43e9c9c2015-06-26 14:16:46 -0700663 if (nonNull(removeInternal(key, remoteDeadTimestamp))) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800664 externalEvents.add(new EventuallyConsistentMapEvent<>(
665 EventuallyConsistentMapEvent.Type.REMOVE, key, null));
666 }
667 }
668 }
669
Jonathan Hartaaa56572015-01-28 21:56:35 -0800670 return externalEvents;
671 }
672
673 /**
674 * Checks if any items in the remote live list are out of date according
675 * to our tombstone list. If we find we have a more up to date tombstone,
676 * we'll send it to the remote.
677 *
678 * @param ad remote anti-entropy advertisement
679 */
Jonathan Hartaaa56572015-01-28 21:56:35 -0800680 private void antiEntropyCheckLocalRemoved(AntiEntropyAdvertisement<K> ad) {
681 final NodeId sender = ad.sender();
682
Jonathan Hartaaa56572015-01-28 21:56:35 -0800683 for (Map.Entry<K, Timestamp> dead : removedItems.entrySet()) {
684 K key = dead.getKey();
685 Timestamp localDeadTimestamp = dead.getValue();
686
687 Timestamp remoteLiveTimestamp = ad.timestamps().get(key);
688 if (remoteLiveTimestamp != null
Jonathan Hart403ea932015-02-20 16:23:00 -0800689 && localDeadTimestamp.isNewerThan(remoteLiveTimestamp)) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800690 // sender has zombie, push remove
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800691 queueUpdate(new RemoveEntry<>(key, localDeadTimestamp), ImmutableList.of(sender));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800692 }
693 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800694 }
695
696 /**
697 * Checks if any of the local live items are out of date according to the
698 * remote's tombstone advertisements. If we find a local item is out of date,
699 * we'll apply the remove operation to the local state.
700 *
701 * @param ad remote anti-entropy advertisement
702 * @return list of external events relating to local operations performed
703 */
Jonathan Hartaaa56572015-01-28 21:56:35 -0800704 private List<EventuallyConsistentMapEvent<K, V>>
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800705 antiEntropyCheckRemoteRemoved(AntiEntropyAdvertisement<K> ad) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800706 final List<EventuallyConsistentMapEvent<K, V>> externalEvents
707 = new LinkedList<>();
708
709 for (Map.Entry<K, Timestamp> remoteDead : ad.tombstones().entrySet()) {
710 K key = remoteDead.getKey();
711 Timestamp remoteDeadTimestamp = remoteDead.getValue();
712
713 Timestamped<V> local = items.get(key);
714 Timestamp localDead = removedItems.get(key);
Jonathan Hart403ea932015-02-20 16:23:00 -0800715 if (local != null && remoteDeadTimestamp.isNewerThan(
716 local.timestamp())) {
717 // If the remote has a more recent tombstone than either our local
718 // value, then do a remove with their timestamp
Madan Jampani43e9c9c2015-06-26 14:16:46 -0700719 if (nonNull(removeInternal(key, remoteDeadTimestamp))) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800720 externalEvents.add(new EventuallyConsistentMapEvent<>(
721 EventuallyConsistentMapEvent.Type.REMOVE, key, null));
722 }
Jonathan Hart403ea932015-02-20 16:23:00 -0800723 } else if (localDead != null && remoteDeadTimestamp.isNewerThan(
724 localDead)) {
725 // If the remote has a more recent tombstone than us, update ours
726 // to their timestamp
727 removeInternal(key, remoteDeadTimestamp);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800728 }
729 }
730
731 return externalEvents;
732 }
733
734 private final class InternalAntiEntropyListener
735 implements ClusterMessageHandler {
736
737 @Override
738 public void handle(ClusterMessage message) {
Jonathan Hart4fd4ebb2015-02-04 17:38:48 -0800739 log.trace("Received anti-entropy advertisement from peer: {}",
740 message.sender());
Jonathan Hartaaa56572015-01-28 21:56:35 -0800741 AntiEntropyAdvertisement<K> advertisement = serializer.decode(message.payload());
Madan Jampani2af244a2015-02-22 13:12:01 -0800742 try {
Jonathan Hart233a18a2015-03-02 17:24:58 -0800743 if (!underHighLoad()) {
744 handleAntiEntropyAdvertisement(advertisement);
745 }
Madan Jampani2af244a2015-02-22 13:12:01 -0800746 } catch (Exception e) {
747 log.warn("Exception thrown handling advertisements", e);
748 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800749 }
750 }
751
Thomas Vachuska152f9fd2015-04-02 16:28:13 -0700752 private final class InternalEventListener implements ClusterMessageHandler {
Jonathan Hartdb3af892015-01-26 13:19:07 -0800753 @Override
754 public void handle(ClusterMessage message) {
Thomas Vachuska152f9fd2015-04-02 16:28:13 -0700755 if (destroyed) {
756 return;
757 }
758
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800759 log.debug("Received update event from peer: {}", message.sender());
760 Collection<AbstractEntry<K, V>> events = serializer.decode(message.payload());
Jonathan Hartdb3af892015-01-26 13:19:07 -0800761
Madan Jampani2af244a2015-02-22 13:12:01 -0800762 try {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800763 // TODO clean this for loop up
764 for (AbstractEntry<K, V> entry : events) {
765 final K key = entry.key();
Madan Jampani43e9c9c2015-06-26 14:16:46 -0700766 V value;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800767 final Timestamp timestamp = entry.timestamp();
768 final EventuallyConsistentMapEvent.Type type;
769 if (entry instanceof PutEntry) {
770 PutEntry<K, V> putEntry = (PutEntry<K, V>) entry;
771 value = putEntry.value();
772 type = EventuallyConsistentMapEvent.Type.PUT;
773 } else if (entry instanceof RemoveEntry) {
774 type = EventuallyConsistentMapEvent.Type.REMOVE;
775 value = null;
776 } else {
777 throw new IllegalStateException("Unknown entry type " + entry.getClass());
778 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800779
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800780 boolean success;
781 switch (type) {
782 case PUT:
783 success = putInternal(key, value, timestamp);
784 break;
785 case REMOVE:
Madan Jampani43e9c9c2015-06-26 14:16:46 -0700786 Optional<V> removedValue = removeInternal(key, timestamp);
787 success = removedValue != null;
788 if (success) {
789 value = removedValue.orElse(null);
790 }
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800791 break;
792 default:
793 success = false;
794 }
795 if (success) {
796 notifyListeners(new EventuallyConsistentMapEvent<>(type, key, value));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800797 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800798 }
Madan Jampani2af244a2015-02-22 13:12:01 -0800799 } catch (Exception e) {
800 log.warn("Exception thrown handling put", e);
801 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800802 }
803 }
804
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800805 // TODO pull this into the class if this gets pulled out...
806 private static final int DEFAULT_MAX_EVENTS = 1000;
807 private static final int DEFAULT_MAX_IDLE_MS = 10;
808 private static final int DEFAULT_MAX_BATCH_MS = 50;
809 private static final Timer TIMER = new Timer("onos-ecm-sender-events");
Jonathan Hartdb3af892015-01-26 13:19:07 -0800810
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800811 private final class EventAccumulator extends AbstractAccumulator<AbstractEntry<K, V>> {
812
813 private final NodeId peer;
814
815 private EventAccumulator(NodeId peer) {
816 super(TIMER, DEFAULT_MAX_EVENTS, DEFAULT_MAX_BATCH_MS, DEFAULT_MAX_IDLE_MS);
817 this.peer = peer;
818 }
819
820 @Override
821 public void processItems(List<AbstractEntry<K, V>> items) {
822 Map<K, AbstractEntry<K, V>> map = Maps.newHashMap();
823 items.forEach(item -> map.compute(item.key(), (key, oldValue) ->
824 oldValue == null || item.compareTo(oldValue) > 0 ? item : oldValue
825 )
826 );
827 communicationExecutor.submit(() -> {
Madan Jampani175e8fd2015-05-20 14:10:45 -0700828 clusterCommunicator.unicast(Lists.newArrayList(map.values()),
829 updateMessageSubject,
830 serializer::encode,
831 peer)
832 .whenComplete((result, error) -> {
833 if (error != null) {
834 log.debug("Failed to send to {}", peer);
835 }
836 });
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800837 });
Jonathan Hartdb3af892015-01-26 13:19:07 -0800838 }
839 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800840}