blob: bee4dc24c12bed47819aad6596b331617e221bd0 [file] [log] [blame]
Jonathan Hartdb3af892015-01-26 13:19:07 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Madan Jampanif4c88502016-01-21 12:35:36 -080016package org.onosproject.store.primitives.impl;
Jonathan Hartdb3af892015-01-26 13:19:07 -080017
Madan Jampanif4c88502016-01-21 12:35:36 -080018import static com.google.common.base.Preconditions.checkNotNull;
19import static com.google.common.base.Preconditions.checkState;
20import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
21import static org.onlab.util.BoundedThreadPool.newFixedThreadPool;
22import static org.onlab.util.Tools.groupedThreads;
23import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
24import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
Jonathan Hartdb3af892015-01-26 13:19:07 -080025
Jonathan Hartdb3af892015-01-26 13:19:07 -080026import java.util.Collection;
Madan Jampani3d76c942015-06-29 23:37:10 -070027import java.util.Collections;
Jonathan Hartdb3af892015-01-26 13:19:07 -080028import java.util.List;
29import java.util.Map;
Madan Jampanid13f3b82015-07-01 17:37:50 -070030import java.util.Objects;
Madan Jampani43e9c9c2015-06-26 14:16:46 -070031import java.util.Optional;
Jonathan Hartdb3af892015-01-26 13:19:07 -080032import java.util.Set;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080033import java.util.Timer;
Jonathan Hartdb3af892015-01-26 13:19:07 -080034import java.util.concurrent.ExecutorService;
35import java.util.concurrent.Executors;
36import java.util.concurrent.ScheduledExecutorService;
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -080037import java.util.concurrent.TimeUnit;
Madan Jampani3d76c942015-06-29 23:37:10 -070038import java.util.concurrent.atomic.AtomicBoolean;
Madan Jampani43e9c9c2015-06-26 14:16:46 -070039import java.util.concurrent.atomic.AtomicReference;
Brian O'Connor5eb77c82015-03-02 18:09:39 -080040import java.util.function.BiFunction;
Jonathan Hartdb3af892015-01-26 13:19:07 -080041import java.util.stream.Collectors;
42
Madan Jampanif4c88502016-01-21 12:35:36 -080043import org.apache.commons.lang3.tuple.Pair;
44import org.onlab.util.AbstractAccumulator;
45import org.onlab.util.KryoNamespace;
46import org.onlab.util.SlidingWindowCounter;
47import org.onosproject.cluster.ClusterService;
48import org.onosproject.cluster.ControllerNode;
49import org.onosproject.cluster.NodeId;
50import org.onosproject.persistence.PersistenceService;
51import org.onosproject.store.LogicalTimestamp;
52import org.onosproject.store.Timestamp;
53import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
54import org.onosproject.store.cluster.messaging.MessageSubject;
55import org.onosproject.store.serializers.KryoNamespaces;
56import org.onosproject.store.serializers.KryoSerializer;
57import org.onosproject.store.service.EventuallyConsistentMap;
58import org.onosproject.store.service.EventuallyConsistentMapEvent;
59import org.onosproject.store.service.EventuallyConsistentMapListener;
60import org.onosproject.store.service.Serializer;
61import org.onosproject.store.service.WallClockTimestamp;
62import org.slf4j.Logger;
63import org.slf4j.LoggerFactory;
64
65import com.google.common.collect.Collections2;
66import com.google.common.collect.ImmutableList;
67import com.google.common.collect.ImmutableMap;
68import com.google.common.collect.Lists;
69import com.google.common.collect.Maps;
70import com.google.common.collect.Sets;
Jonathan Hartdb3af892015-01-26 13:19:07 -080071
72/**
73 * Distributed Map implementation which uses optimistic replication and gossip
74 * based techniques to provide an eventually consistent data store.
75 */
76public class EventuallyConsistentMapImpl<K, V>
77 implements EventuallyConsistentMap<K, V> {
78
79 private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class);
80
Madan Jampani3d76c942015-06-29 23:37:10 -070081 private final Map<K, MapValue<V>> items;
Jonathan Hartdb3af892015-01-26 13:19:07 -080082
Jonathan Hartdb3af892015-01-26 13:19:07 -080083 private final ClusterService clusterService;
84 private final ClusterCommunicationService clusterCommunicator;
85 private final KryoSerializer serializer;
Madan Jampani3d76c942015-06-29 23:37:10 -070086 private final NodeId localNodeId;
Aaron Kruglikov66cf0b92015-10-26 15:46:54 -070087 private final PersistenceService persistenceService;
Jonathan Hartdb3af892015-01-26 13:19:07 -080088
Madan Jampanibcf1a482015-06-24 19:05:56 -070089 private final BiFunction<K, V, Timestamp> timestampProvider;
Jonathan Hartdb3af892015-01-26 13:19:07 -080090
91 private final MessageSubject updateMessageSubject;
Jonathan Hartaaa56572015-01-28 21:56:35 -080092 private final MessageSubject antiEntropyAdvertisementSubject;
Jonathan Hartdb3af892015-01-26 13:19:07 -080093
Jonathan Hartaaa56572015-01-28 21:56:35 -080094 private final Set<EventuallyConsistentMapListener<K, V>> listeners
Madan Jampani3d76c942015-06-29 23:37:10 -070095 = Sets.newCopyOnWriteArraySet();
Jonathan Hartdb3af892015-01-26 13:19:07 -080096
97 private final ExecutorService executor;
Jonathan Hartdb3af892015-01-26 13:19:07 -080098 private final ScheduledExecutorService backgroundExecutor;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080099 private final BiFunction<K, V, Collection<NodeId>> peerUpdateFunction;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800100
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700101 private final ExecutorService communicationExecutor;
102 private final Map<NodeId, EventAccumulator> senderPending;
Madan Jampani28726282015-02-19 11:40:23 -0800103
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700104 private final String mapName;
105
Jonathan Hartdb3af892015-01-26 13:19:07 -0800106 private volatile boolean destroyed = false;
Jonathan Hart539a6462015-01-27 17:05:43 -0800107 private static final String ERROR_DESTROYED = " map is already destroyed";
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800108 private final String destroyedMessage;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800109
Jonathan Hart4f397e82015-02-04 09:10:41 -0800110 private static final String ERROR_NULL_KEY = "Key cannot be null";
111 private static final String ERROR_NULL_VALUE = "Null values are not allowed";
112
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700113 private final long initialDelaySec = 5;
114 private final boolean lightweightAntiEntropy;
115 private final boolean tombstonesDisabled;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800116
Jonathan Hart233a18a2015-03-02 17:24:58 -0800117 private static final int WINDOW_SIZE = 5;
118 private static final int HIGH_LOAD_THRESHOLD = 0;
119 private static final int LOAD_WINDOW = 2;
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700120 private SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE);
Jonathan Hart233a18a2015-03-02 17:24:58 -0800121
Jonathan Hartca335e92015-03-05 10:34:32 -0800122 private final boolean persistent;
Aaron Kruglikov66cf0b92015-10-26 15:46:54 -0700123
124 private static final String PERSISTENT_LOCAL_MAP_NAME = "itemsMap";
125
Jonathan Hartca335e92015-03-05 10:34:32 -0800126
/**
 * Creates a new eventually consistent map shared amongst multiple instances.
 * <p>
 * See {@link org.onosproject.store.service.EventuallyConsistentMapBuilder}
 * for more description of the parameters expected by the map.
 * </p>
 * <p>
 * Every field is assigned before {@code this} is handed to other threads: the
 * anti-entropy task is scheduled and the message subscribers are registered
 * last, so neither the background task nor incoming messages can observe an
 * unassigned field.
 * </p>
 *
 * @param mapName a String identifier for the map.
 * @param clusterService the cluster service
 * @param clusterCommunicator the cluster communications service
 * @param serializerBuilder a Kryo namespace builder that can serialize
 *                          both K and V
 * @param timestampProvider provider of timestamps for K and V
 * @param peerUpdateFunction function that provides a set of nodes to immediately
 *                           update when there are writes to the map; if null,
 *                           every other node in the cluster is updated
 * @param eventExecutor executor to use for processing incoming
 *                      events from peers; if null, a fixed pool of 8 threads is created
 * @param communicationExecutor executor to use for sending events to peers;
 *                              if null, a pool of 8 threads is created
 * @param backgroundExecutor executor to use for background anti-entropy
 *                           tasks; if null, a single-threaded scheduler is created
 * @param tombstonesDisabled true if this map should not maintain
 *                           tombstones
 * @param antiEntropyPeriod period that the anti-entropy task should run
 * @param antiEntropyTimeUnit time unit for anti-entropy period
 * @param convergeFaster make anti-entropy try to converge faster
 * @param persistent persist data to disk
 * @param persistenceService persistence service
 */
EventuallyConsistentMapImpl(String mapName,
                            ClusterService clusterService,
                            ClusterCommunicationService clusterCommunicator,
                            KryoNamespace.Builder serializerBuilder,
                            BiFunction<K, V, Timestamp> timestampProvider,
                            BiFunction<K, V, Collection<NodeId>> peerUpdateFunction,
                            ExecutorService eventExecutor,
                            ExecutorService communicationExecutor,
                            ScheduledExecutorService backgroundExecutor,
                            boolean tombstonesDisabled,
                            long antiEntropyPeriod,
                            TimeUnit antiEntropyTimeUnit,
                            boolean convergeFaster,
                            boolean persistent,
                            PersistenceService persistenceService) {
    this.mapName = mapName;
    this.serializer = createSerializer(serializerBuilder);
    this.persistenceService = persistenceService;
    this.persistent = persistent;
    if (persistent) {
        // NOTE(review): the backing map name is the constant PERSISTENT_LOCAL_MAP_NAME
        // rather than something derived from mapName -- confirm instances cannot collide.
        items = this.persistenceService.<K, MapValue<V>>persistentMapBuilder()
                .withName(PERSISTENT_LOCAL_MAP_NAME)
                .withSerializer(new Serializer() {

                    @Override
                    public <T> byte[] encode(T object) {
                        return EventuallyConsistentMapImpl.this.serializer.encode(object);
                    }

                    @Override
                    public <T> T decode(byte[] bytes) {
                        return EventuallyConsistentMapImpl.this.serializer.decode(bytes);
                    }
                })
                .build();
    } else {
        items = Maps.newConcurrentMap();
    }
    senderPending = Maps.newConcurrentMap();
    destroyedMessage = mapName + ERROR_DESTROYED;

    this.clusterService = clusterService;
    this.clusterCommunicator = clusterCommunicator;
    this.localNodeId = clusterService.getLocalNode().id();

    this.timestampProvider = timestampProvider;
    // Assigned here (not after subscribing) so handlers never read the default value.
    this.tombstonesDisabled = tombstonesDisabled;
    this.lightweightAntiEntropy = !convergeFaster;

    if (peerUpdateFunction != null) {
        this.peerUpdateFunction = peerUpdateFunction;
    } else {
        this.peerUpdateFunction = (key, value) -> clusterService.getNodes().stream()
                .map(ControllerNode::id)
                .filter(nodeId -> !nodeId.equals(localNodeId))
                .collect(Collectors.toList());
    }

    if (eventExecutor != null) {
        this.executor = eventExecutor;
    } else {
        // should be a normal executor; it's used for receiving messages
        this.executor =
                Executors.newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-fg-%d"));
    }

    if (communicationExecutor != null) {
        this.communicationExecutor = communicationExecutor;
    } else {
        // sending executor; should be capped
        //TODO this probably doesn't need to be bounded anymore
        this.communicationExecutor =
                newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-publish-%d"));
    }

    if (backgroundExecutor != null) {
        this.backgroundExecutor = backgroundExecutor;
    } else {
        this.backgroundExecutor =
                newSingleThreadScheduledExecutor(groupedThreads("onos/ecm", mapName + "-bg-%d"));
    }

    updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
    antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");

    // All fields are assigned; only now may `this` escape to other threads.

    // start anti-entropy thread. initialDelaySec is in seconds while the period is in
    // antiEntropyTimeUnit; scheduleAtFixedRate takes a single unit, so both are
    // normalized to nanoseconds instead of reinterpreting the delay in the period's unit.
    this.backgroundExecutor.scheduleAtFixedRate(this::sendAdvertisement,
            TimeUnit.SECONDS.toNanos(initialDelaySec),
            antiEntropyTimeUnit.toNanos(antiEntropyPeriod),
            TimeUnit.NANOSECONDS);

    clusterCommunicator.addSubscriber(updateMessageSubject,
            serializer::decode,
            this::processUpdates,
            this.executor);

    clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
            serializer::decode,
            this::handleAntiEntropyAdvertisement,
            this.backgroundExecutor);
}
257
Jonathan Hartdb3af892015-01-26 13:19:07 -0800258 private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
259 return new KryoSerializer() {
260 @Override
261 protected void setupKryoPool() {
262 // Add the map's internal helper classes to the user-supplied serializer
263 serializerPool = builder
Madan Jampani3d76c942015-06-29 23:37:10 -0700264 .register(KryoNamespaces.BASIC)
Madan Jampanidb5d06a2015-06-30 11:16:48 -0700265 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
Madan Jampani3e033bd2015-04-08 13:03:49 -0700266 .register(LogicalTimestamp.class)
Jonathan Hartdb3af892015-01-26 13:19:07 -0800267 .register(WallClockTimestamp.class)
Jonathan Hartaaa56572015-01-28 21:56:35 -0800268 .register(AntiEntropyAdvertisement.class)
Madan Jampani3d76c942015-06-29 23:37:10 -0700269 .register(UpdateEntry.class)
270 .register(MapValue.class)
271 .register(MapValue.Digest.class)
Jonathan Hartdb3af892015-01-26 13:19:07 -0800272 .build();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800273 }
274 };
275 }
276
277 @Override
Madan Jampania090a112016-01-18 16:38:17 -0800278 public String name() {
279 return mapName;
280 }
281
282 @Override
Jonathan Hartdb3af892015-01-26 13:19:07 -0800283 public int size() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800284 checkState(!destroyed, destroyedMessage);
Madan Jampani3d76c942015-06-29 23:37:10 -0700285 // TODO: Maintain a separate counter for tracking live elements in map.
286 return Maps.filterValues(items, MapValue::isAlive).size();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800287 }
288
289 @Override
290 public boolean isEmpty() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800291 checkState(!destroyed, destroyedMessage);
Madan Jampani3d76c942015-06-29 23:37:10 -0700292 return size() == 0;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800293 }
294
295 @Override
296 public boolean containsKey(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800297 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800298 checkNotNull(key, ERROR_NULL_KEY);
Madan Jampani3d76c942015-06-29 23:37:10 -0700299 return get(key) != null;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800300 }
301
302 @Override
303 public boolean containsValue(V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800304 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800305 checkNotNull(value, ERROR_NULL_VALUE);
Madan Jampani3d76c942015-06-29 23:37:10 -0700306 return items.values()
307 .stream()
308 .filter(MapValue::isAlive)
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700309 .anyMatch(v -> value.equals(v.get()));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800310 }
311
312 @Override
313 public V get(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800314 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800315 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800316
Madan Jampani3d76c942015-06-29 23:37:10 -0700317 MapValue<V> value = items.get(key);
318 return (value == null || value.isTombstone()) ? null : value.get();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800319 }
320
321 @Override
322 public void put(K key, V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800323 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800324 checkNotNull(key, ERROR_NULL_KEY);
325 checkNotNull(value, ERROR_NULL_VALUE);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800326
Madan Jampani3d76c942015-06-29 23:37:10 -0700327 MapValue<V> newValue = new MapValue<>(value, timestampProvider.apply(key, value));
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700328 if (putInternal(key, newValue)) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700329 notifyPeers(new UpdateEntry<>(key, newValue), peerUpdateFunction.apply(key, value));
Madan Jampanicab114c2015-07-23 00:14:19 -0700330 notifyListeners(new EventuallyConsistentMapEvent<>(mapName, PUT, key, value));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800331 }
332 }
333
Jonathan Hartdb3af892015-01-26 13:19:07 -0800334 @Override
Madan Jampani43e9c9c2015-06-26 14:16:46 -0700335 public V remove(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800336 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800337 checkNotNull(key, ERROR_NULL_KEY);
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700338 return removeAndNotify(key, null);
339 }
340
341 @Override
342 public void remove(K key, V value) {
343 checkState(!destroyed, destroyedMessage);
344 checkNotNull(key, ERROR_NULL_KEY);
345 checkNotNull(value, ERROR_NULL_VALUE);
346 removeAndNotify(key, value);
347 }
348
349 private V removeAndNotify(K key, V value) {
Madan Jampani483d0a22015-08-19 17:33:00 -0700350 Timestamp timestamp = timestampProvider.apply(key, value);
351 Optional<MapValue<V>> tombstone = tombstonesDisabled || timestamp == null
352 ? Optional.empty() : Optional.of(MapValue.tombstone(timestamp));
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700353 MapValue<V> previousValue = removeInternal(key, Optional.ofNullable(value), tombstone);
Madan Jampania0ac4872015-07-02 11:23:49 -0700354 if (previousValue != null) {
Madan Jampani483d0a22015-08-19 17:33:00 -0700355 notifyPeers(new UpdateEntry<>(key, tombstone.orElse(null)),
356 peerUpdateFunction.apply(key, previousValue.get()));
Madan Jampania0ac4872015-07-02 11:23:49 -0700357 if (previousValue.isAlive()) {
Madan Jampanicab114c2015-07-23 00:14:19 -0700358 notifyListeners(new EventuallyConsistentMapEvent<>(mapName, REMOVE, key, previousValue.get()));
Madan Jampania0ac4872015-07-02 11:23:49 -0700359 }
360 }
Madan Jampanid13f3b82015-07-01 17:37:50 -0700361 return previousValue != null ? previousValue.get() : null;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800362 }
363
Madan Jampani483d0a22015-08-19 17:33:00 -0700364 private MapValue<V> removeInternal(K key, Optional<V> value, Optional<MapValue<V>> tombstone) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700365 checkState(!destroyed, destroyedMessage);
366 checkNotNull(key, ERROR_NULL_KEY);
367 checkNotNull(value, ERROR_NULL_VALUE);
Madan Jampani483d0a22015-08-19 17:33:00 -0700368 tombstone.ifPresent(v -> checkState(v.isTombstone()));
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700369
370 counter.incrementCount();
Madan Jampani3d76c942015-06-29 23:37:10 -0700371 AtomicBoolean updated = new AtomicBoolean(false);
Madan Jampanid13f3b82015-07-01 17:37:50 -0700372 AtomicReference<MapValue<V>> previousValue = new AtomicReference<>();
Madan Jampani3d76c942015-06-29 23:37:10 -0700373 items.compute(key, (k, existing) -> {
Madan Jampanid13f3b82015-07-01 17:37:50 -0700374 boolean valueMatches = true;
375 if (value.isPresent() && existing != null && existing.isAlive()) {
376 valueMatches = Objects.equals(value.get(), existing.get());
Madan Jampani3d76c942015-06-29 23:37:10 -0700377 }
Brian O'Connor6325dad2015-07-07 15:36:29 -0700378 if (existing == null) {
Jonathan Hart4a29c592015-09-23 17:55:07 -0700379 log.trace("ECMap Remove: Existing value for key {} is already null", k);
Brian O'Connor6325dad2015-07-07 15:36:29 -0700380 }
Madan Jampani483d0a22015-08-19 17:33:00 -0700381 if (valueMatches) {
382 if (existing == null) {
383 updated.set(tombstone.isPresent());
384 } else {
385 updated.set(!tombstone.isPresent() || tombstone.get().isNewerThan(existing));
386 }
Madan Jampanid13f3b82015-07-01 17:37:50 -0700387 }
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700388 if (updated.get()) {
Madan Jampani483d0a22015-08-19 17:33:00 -0700389 previousValue.set(existing);
390 return tombstone.orElse(null);
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700391 } else {
392 return existing;
393 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700394 });
Madan Jampanid13f3b82015-07-01 17:37:50 -0700395 return previousValue.get();
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800396 }
397
398 @Override
Madan Jampani4727a112015-07-16 12:12:58 -0700399 public V compute(K key, BiFunction<K, V, V> recomputeFunction) {
400 checkState(!destroyed, destroyedMessage);
401 checkNotNull(key, ERROR_NULL_KEY);
402 checkNotNull(recomputeFunction, "Recompute function cannot be null");
403
404 AtomicBoolean updated = new AtomicBoolean(false);
405 AtomicReference<MapValue<V>> previousValue = new AtomicReference<>();
406 MapValue<V> computedValue = items.compute(key, (k, mv) -> {
407 previousValue.set(mv);
408 V newRawValue = recomputeFunction.apply(key, mv == null ? null : mv.get());
409 MapValue<V> newValue = new MapValue<>(newRawValue, timestampProvider.apply(key, newRawValue));
410 if (mv == null || newValue.isNewerThan(mv)) {
411 updated.set(true);
412 return newValue;
413 } else {
414 return mv;
415 }
416 });
417 if (updated.get()) {
418 notifyPeers(new UpdateEntry<>(key, computedValue), peerUpdateFunction.apply(key, computedValue.get()));
419 EventuallyConsistentMapEvent.Type updateType = computedValue.isTombstone() ? REMOVE : PUT;
420 V value = computedValue.isTombstone()
421 ? previousValue.get() == null ? null : previousValue.get().get()
422 : computedValue.get();
423 if (value != null) {
Madan Jampanicab114c2015-07-23 00:14:19 -0700424 notifyListeners(new EventuallyConsistentMapEvent<>(mapName, updateType, key, value));
Madan Jampani4727a112015-07-16 12:12:58 -0700425 }
426 }
427 return computedValue.get();
428 }
429
430 @Override
Jonathan Hartdb3af892015-01-26 13:19:07 -0800431 public void putAll(Map<? extends K, ? extends V> m) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800432 checkState(!destroyed, destroyedMessage);
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800433 m.forEach(this::put);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800434 }
435
436 @Override
437 public void clear() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800438 checkState(!destroyed, destroyedMessage);
Madan Jampani3d76c942015-06-29 23:37:10 -0700439 Maps.filterValues(items, MapValue::isAlive)
440 .forEach((k, v) -> remove(k));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800441 }
442
443 @Override
444 public Set<K> keySet() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800445 checkState(!destroyed, destroyedMessage);
Madan Jampani3d76c942015-06-29 23:37:10 -0700446 return Maps.filterValues(items, MapValue::isAlive)
447 .keySet();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800448 }
449
450 @Override
451 public Collection<V> values() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800452 checkState(!destroyed, destroyedMessage);
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700453 return Collections2.transform(Maps.filterValues(items, MapValue::isAlive).values(), MapValue::get);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800454 }
455
456 @Override
457 public Set<Map.Entry<K, V>> entrySet() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800458 checkState(!destroyed, destroyedMessage);
Madan Jampani3d76c942015-06-29 23:37:10 -0700459 return Maps.filterValues(items, MapValue::isAlive)
460 .entrySet()
461 .stream()
462 .map(e -> Pair.of(e.getKey(), e.getValue().get()))
463 .collect(Collectors.toSet());
464 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800465
Madan Jampani3d76c942015-06-29 23:37:10 -0700466 /**
467 * Returns true if newValue was accepted i.e. map is updated.
Aaron Kruglikov66cf0b92015-10-26 15:46:54 -0700468 *
Madan Jampani3d76c942015-06-29 23:37:10 -0700469 * @param key key
470 * @param newValue proposed new value
471 * @return true if update happened; false if map already contains a more recent value for the key
472 */
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700473 private boolean putInternal(K key, MapValue<V> newValue) {
474 checkState(!destroyed, destroyedMessage);
475 checkNotNull(key, ERROR_NULL_KEY);
476 checkNotNull(newValue, ERROR_NULL_VALUE);
477 checkState(newValue.isAlive());
478 counter.incrementCount();
Madan Jampani3d76c942015-06-29 23:37:10 -0700479 AtomicBoolean updated = new AtomicBoolean(false);
480 items.compute(key, (k, existing) -> {
481 if (existing == null || newValue.isNewerThan(existing)) {
482 updated.set(true);
Madan Jampani3d76c942015-06-29 23:37:10 -0700483 return newValue;
484 }
485 return existing;
486 });
Madan Jampani3d76c942015-06-29 23:37:10 -0700487 return updated.get();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800488 }
489
490 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800491 public void addListener(EventuallyConsistentMapListener<K, V> listener) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800492 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800493
494 listeners.add(checkNotNull(listener));
495 }
496
497 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800498 public void removeListener(EventuallyConsistentMapListener<K, V> listener) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800499 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800500
501 listeners.remove(checkNotNull(listener));
502 }
503
504 @Override
505 public void destroy() {
506 destroyed = true;
507
508 executor.shutdown();
509 backgroundExecutor.shutdown();
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800510 communicationExecutor.shutdown();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800511
Jonathan Hart584d2f32015-01-27 19:46:14 -0800512 listeners.clear();
513
Jonathan Hartdb3af892015-01-26 13:19:07 -0800514 clusterCommunicator.removeSubscriber(updateMessageSubject);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800515 clusterCommunicator.removeSubscriber(antiEntropyAdvertisementSubject);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800516 }
517
Jonathan Hartaaa56572015-01-28 21:56:35 -0800518 private void notifyListeners(EventuallyConsistentMapEvent<K, V> event) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700519 listeners.forEach(listener -> listener.event(event));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800520 }
521
Madan Jampani3d76c942015-06-29 23:37:10 -0700522 private void notifyPeers(UpdateEntry<K, V> event, Collection<NodeId> peers) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800523 queueUpdate(event, peers);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800524 }
525
Madan Jampani3d76c942015-06-29 23:37:10 -0700526 private void queueUpdate(UpdateEntry<K, V> event, Collection<NodeId> peers) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800527 if (peers == null) {
528 // we have no friends :(
529 return;
530 }
531 peers.forEach(node ->
Jonathan Hart9a426f82015-09-03 15:43:13 +0200532 senderPending.computeIfAbsent(node, unusedKey -> new EventAccumulator(node)).add(event)
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800533 );
534 }
535
Jonathan Hart233a18a2015-03-02 17:24:58 -0800536 private boolean underHighLoad() {
537 return counter.get(LOAD_WINDOW) > HIGH_LOAD_THRESHOLD;
538 }
539
Madan Jampani3d76c942015-06-29 23:37:10 -0700540 private void sendAdvertisement() {
541 try {
Thomas Vachuska152f9fd2015-04-02 16:28:13 -0700542 if (underHighLoad() || destroyed) {
Jonathan Hart233a18a2015-03-02 17:24:58 -0800543 return;
544 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700545 pickRandomActivePeer().ifPresent(this::sendAdvertisementToPeer);
546 } catch (Exception e) {
547 // Catch all exceptions to avoid scheduled task being suppressed.
548 log.error("Exception thrown while sending advertisement", e);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800549 }
550 }
551
Madan Jampani3d76c942015-06-29 23:37:10 -0700552 private Optional<NodeId> pickRandomActivePeer() {
553 List<NodeId> activePeers = clusterService.getNodes()
554 .stream()
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700555 .map(ControllerNode::id)
556 .filter(id -> !localNodeId.equals(id))
Madan Jampani3d76c942015-06-29 23:37:10 -0700557 .filter(id -> clusterService.getState(id) == ControllerNode.State.ACTIVE)
558 .collect(Collectors.toList());
559 Collections.shuffle(activePeers);
560 return activePeers.isEmpty() ? Optional.empty() : Optional.of(activePeers.get(0));
561 }
562
563 private void sendAdvertisementToPeer(NodeId peer) {
564 clusterCommunicator.unicast(createAdvertisement(),
565 antiEntropyAdvertisementSubject,
566 serializer::encode,
567 peer)
568 .whenComplete((result, error) -> {
569 if (error != null) {
Madan Jampania0ac4872015-07-02 11:23:49 -0700570 log.debug("Failed to send anti-entropy advertisement to {}", peer, error);
Madan Jampani3d76c942015-06-29 23:37:10 -0700571 }
572 });
573 }
574
Jonathan Hartaaa56572015-01-28 21:56:35 -0800575 private AntiEntropyAdvertisement<K> createAdvertisement() {
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700576 return new AntiEntropyAdvertisement<K>(localNodeId,
577 ImmutableMap.copyOf(Maps.transformValues(items, MapValue::digest)));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800578 }
579
580 private void handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700581 if (destroyed || underHighLoad()) {
582 return;
583 }
584 try {
Jonathan Hart9a426f82015-09-03 15:43:13 +0200585 if (log.isTraceEnabled()) {
586 log.trace("Received anti-entropy advertisement from {} for {} with {} entries in it",
587 mapName, ad.sender(), ad.digest().size());
588 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700589 antiEntropyCheckLocalItems(ad).forEach(this::notifyListeners);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800590
Madan Jampani3d76c942015-06-29 23:37:10 -0700591 if (!lightweightAntiEntropy) {
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700592 // if remote ad has any entries that the local copy is missing, actively sync
593 // TODO: Missing keys is not the way local copy can be behind.
594 if (Sets.difference(ad.digest().keySet(), items.keySet()).size() > 0) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700595 // TODO: Send ad for missing keys and for entries that are stale
596 sendAdvertisementToPeer(ad.sender());
Jonathan Hartf893be82015-02-24 17:35:51 -0800597 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800598 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700599 } catch (Exception e) {
600 log.warn("Error handling anti-entropy advertisement", e);
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800601 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800602 }
603
604 /**
Madan Jampani3d76c942015-06-29 23:37:10 -0700605 * Processes anti-entropy ad from peer by taking following actions:
606 * 1. If peer has an old entry, updates peer.
607 * 2. If peer indicates an entry is removed and has a more recent
608 * timestamp than the local entry, update local state.
Jonathan Hartaaa56572015-01-28 21:56:35 -0800609 */
Jonathan Hartaaa56572015-01-28 21:56:35 -0800610 private List<EventuallyConsistentMapEvent<K, V>> antiEntropyCheckLocalItems(
611 AntiEntropyAdvertisement<K> ad) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700612 final List<EventuallyConsistentMapEvent<K, V>> externalEvents = Lists.newLinkedList();
Jonathan Hartaaa56572015-01-28 21:56:35 -0800613 final NodeId sender = ad.sender();
Madan Jampani3d76c942015-06-29 23:37:10 -0700614 items.forEach((key, localValue) -> {
615 MapValue.Digest remoteValueDigest = ad.digest().get(key);
616 if (remoteValueDigest == null || localValue.isNewerThan(remoteValueDigest.timestamp())) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800617 // local value is more recent, push to sender
Madan Jampani3d76c942015-06-29 23:37:10 -0700618 queueUpdate(new UpdateEntry<>(key, localValue), ImmutableList.of(sender));
Madan Jampanid13f3b82015-07-01 17:37:50 -0700619 }
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700620 if (remoteValueDigest != null
621 && remoteValueDigest.isNewerThan(localValue.digest())
622 && remoteValueDigest.isTombstone()) {
Madan Jampani483d0a22015-08-19 17:33:00 -0700623 MapValue<V> tombstone = MapValue.tombstone(remoteValueDigest.timestamp());
Madan Jampanid13f3b82015-07-01 17:37:50 -0700624 MapValue<V> previousValue = removeInternal(key,
625 Optional.empty(),
Madan Jampani483d0a22015-08-19 17:33:00 -0700626 Optional.of(tombstone));
Madan Jampanid13f3b82015-07-01 17:37:50 -0700627 if (previousValue != null && previousValue.isAlive()) {
Madan Jampanicab114c2015-07-23 00:14:19 -0700628 externalEvents.add(new EventuallyConsistentMapEvent<>(mapName, REMOVE, key, previousValue.get()));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800629 }
630 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700631 });
Jonathan Hartaaa56572015-01-28 21:56:35 -0800632 return externalEvents;
633 }
634
Madan Jampani3d76c942015-06-29 23:37:10 -0700635 private void processUpdates(Collection<UpdateEntry<K, V>> updates) {
636 if (destroyed) {
637 return;
Jonathan Hartaaa56572015-01-28 21:56:35 -0800638 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700639 updates.forEach(update -> {
640 final K key = update.key();
641 final MapValue<V> value = update.value();
Madan Jampani483d0a22015-08-19 17:33:00 -0700642 if (value == null || value.isTombstone()) {
643 MapValue<V> previousValue = removeInternal(key, Optional.empty(), Optional.ofNullable(value));
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700644 if (previousValue != null && previousValue.isAlive()) {
Madan Jampanicab114c2015-07-23 00:14:19 -0700645 notifyListeners(new EventuallyConsistentMapEvent<>(mapName, REMOVE, key, previousValue.get()));
Madan Jampanid13f3b82015-07-01 17:37:50 -0700646 }
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700647 } else if (putInternal(key, value)) {
Madan Jampanicab114c2015-07-23 00:14:19 -0700648 notifyListeners(new EventuallyConsistentMapEvent<>(mapName, PUT, key, value.get()));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800649 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700650 });
Jonathan Hartdb3af892015-01-26 13:19:07 -0800651 }
652
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800653 // TODO pull this into the class if this gets pulled out...
654 private static final int DEFAULT_MAX_EVENTS = 1000;
655 private static final int DEFAULT_MAX_IDLE_MS = 10;
656 private static final int DEFAULT_MAX_BATCH_MS = 50;
657 private static final Timer TIMER = new Timer("onos-ecm-sender-events");
Jonathan Hartdb3af892015-01-26 13:19:07 -0800658
Madan Jampani3d76c942015-06-29 23:37:10 -0700659 private final class EventAccumulator extends AbstractAccumulator<UpdateEntry<K, V>> {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800660
661 private final NodeId peer;
662
663 private EventAccumulator(NodeId peer) {
664 super(TIMER, DEFAULT_MAX_EVENTS, DEFAULT_MAX_BATCH_MS, DEFAULT_MAX_IDLE_MS);
665 this.peer = peer;
666 }
667
668 @Override
Madan Jampani3d76c942015-06-29 23:37:10 -0700669 public void processItems(List<UpdateEntry<K, V>> items) {
670 Map<K, UpdateEntry<K, V>> map = Maps.newHashMap();
671 items.forEach(item -> map.compute(item.key(), (key, existing) ->
Madan Jampani92c64eb2015-07-23 15:37:07 -0700672 item.isNewerThan(existing) ? item : existing));
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800673 communicationExecutor.submit(() -> {
Madan Jampani3d76c942015-06-29 23:37:10 -0700674 clusterCommunicator.unicast(ImmutableList.copyOf(map.values()),
Madan Jampani175e8fd2015-05-20 14:10:45 -0700675 updateMessageSubject,
676 serializer::encode,
677 peer)
678 .whenComplete((result, error) -> {
679 if (error != null) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700680 log.debug("Failed to send to {}", peer, error);
Madan Jampani175e8fd2015-05-20 14:10:45 -0700681 }
682 });
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800683 });
Jonathan Hartdb3af892015-01-26 13:19:07 -0800684 }
685 }
Jonathan Hart4a29c592015-09-23 17:55:07 -0700686}