blob: b56d74b6d873d151198dfd3734ae6bdb68960ba2 [file] [log] [blame]
Jonathan Hartdb3af892015-01-26 13:19:07 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Jonathan Hart77bdd262015-02-03 09:07:48 -080016package org.onosproject.store.ecmap;
Jonathan Hartdb3af892015-01-26 13:19:07 -080017
Madan Jampani4f1f4cd2015-07-08 23:05:35 -070018import com.google.common.collect.Collections2;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080019import com.google.common.collect.ImmutableList;
Madan Jampani4f1f4cd2015-07-08 23:05:35 -070020import com.google.common.collect.ImmutableMap;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080021import com.google.common.collect.Lists;
22import com.google.common.collect.Maps;
Madan Jampani3d76c942015-06-29 23:37:10 -070023import com.google.common.collect.Sets;
Madan Jampani3e033bd2015-04-08 13:03:49 -070024
Jonathan Hartf9108232015-02-02 16:37:35 -080025import org.apache.commons.lang3.tuple.Pair;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080026import org.onlab.util.AbstractAccumulator;
Jonathan Hartdb3af892015-01-26 13:19:07 -080027import org.onlab.util.KryoNamespace;
Jonathan Hart233a18a2015-03-02 17:24:58 -080028import org.onlab.util.SlidingWindowCounter;
Jonathan Hartdb3af892015-01-26 13:19:07 -080029import org.onosproject.cluster.ClusterService;
Jonathan Hartaaa56572015-01-28 21:56:35 -080030import org.onosproject.cluster.ControllerNode;
Jonathan Hartdb3af892015-01-26 13:19:07 -080031import org.onosproject.cluster.NodeId;
32import org.onosproject.store.Timestamp;
33import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Jonathan Hartdb3af892015-01-26 13:19:07 -080034import org.onosproject.store.cluster.messaging.MessageSubject;
Madan Jampani3e033bd2015-04-08 13:03:49 -070035import org.onosproject.store.impl.LogicalTimestamp;
Jonathan Hart63939a32015-05-08 11:57:03 -070036import org.onosproject.store.service.WallClockTimestamp;
Madan Jampani3d76c942015-06-29 23:37:10 -070037import org.onosproject.store.serializers.KryoNamespaces;
Jonathan Hartdb3af892015-01-26 13:19:07 -080038import org.onosproject.store.serializers.KryoSerializer;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070039import org.onosproject.store.service.EventuallyConsistentMap;
40import org.onosproject.store.service.EventuallyConsistentMapEvent;
41import org.onosproject.store.service.EventuallyConsistentMapListener;
Jonathan Hartdb3af892015-01-26 13:19:07 -080042import org.slf4j.Logger;
43import org.slf4j.LoggerFactory;
44
Madan Jampani3d76c942015-06-29 23:37:10 -070045import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
46import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
47
Jonathan Hartdb3af892015-01-26 13:19:07 -080048import java.util.Collection;
Madan Jampani3d76c942015-06-29 23:37:10 -070049import java.util.Collections;
Jonathan Hartdb3af892015-01-26 13:19:07 -080050import java.util.List;
51import java.util.Map;
Madan Jampanid13f3b82015-07-01 17:37:50 -070052import java.util.Objects;
Madan Jampani43e9c9c2015-06-26 14:16:46 -070053import java.util.Optional;
Jonathan Hartdb3af892015-01-26 13:19:07 -080054import java.util.Set;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080055import java.util.Timer;
Jonathan Hartdb3af892015-01-26 13:19:07 -080056import java.util.concurrent.ExecutorService;
57import java.util.concurrent.Executors;
58import java.util.concurrent.ScheduledExecutorService;
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -080059import java.util.concurrent.TimeUnit;
Madan Jampani3d76c942015-06-29 23:37:10 -070060import java.util.concurrent.atomic.AtomicBoolean;
Madan Jampani43e9c9c2015-06-26 14:16:46 -070061import java.util.concurrent.atomic.AtomicReference;
Brian O'Connor5eb77c82015-03-02 18:09:39 -080062import java.util.function.BiFunction;
Jonathan Hartdb3af892015-01-26 13:19:07 -080063import java.util.stream.Collectors;
64
65import static com.google.common.base.Preconditions.checkNotNull;
66import static com.google.common.base.Preconditions.checkState;
67import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
Brian O'Connorc6713a82015-02-24 11:55:48 -080068import static org.onlab.util.BoundedThreadPool.newFixedThreadPool;
Brian O'Connor4b2ba5f2015-02-18 20:54:00 -080069import static org.onlab.util.Tools.groupedThreads;
Jonathan Hartdb3af892015-01-26 13:19:07 -080070
71/**
72 * Distributed Map implementation which uses optimistic replication and gossip
73 * based techniques to provide an eventually consistent data store.
74 */
75public class EventuallyConsistentMapImpl<K, V>
76 implements EventuallyConsistentMap<K, V> {
77
78 private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class);
79
Madan Jampani3d76c942015-06-29 23:37:10 -070080 private final Map<K, MapValue<V>> items;
Jonathan Hartdb3af892015-01-26 13:19:07 -080081
Jonathan Hartdb3af892015-01-26 13:19:07 -080082 private final ClusterService clusterService;
83 private final ClusterCommunicationService clusterCommunicator;
84 private final KryoSerializer serializer;
Madan Jampani3d76c942015-06-29 23:37:10 -070085 private final NodeId localNodeId;
Jonathan Hartdb3af892015-01-26 13:19:07 -080086
Madan Jampanibcf1a482015-06-24 19:05:56 -070087 private final BiFunction<K, V, Timestamp> timestampProvider;
Jonathan Hartdb3af892015-01-26 13:19:07 -080088
89 private final MessageSubject updateMessageSubject;
Jonathan Hartaaa56572015-01-28 21:56:35 -080090 private final MessageSubject antiEntropyAdvertisementSubject;
Jonathan Hartdb3af892015-01-26 13:19:07 -080091
Jonathan Hartaaa56572015-01-28 21:56:35 -080092 private final Set<EventuallyConsistentMapListener<K, V>> listeners
Madan Jampani3d76c942015-06-29 23:37:10 -070093 = Sets.newCopyOnWriteArraySet();
Jonathan Hartdb3af892015-01-26 13:19:07 -080094
95 private final ExecutorService executor;
Jonathan Hartdb3af892015-01-26 13:19:07 -080096 private final ScheduledExecutorService backgroundExecutor;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080097 private final BiFunction<K, V, Collection<NodeId>> peerUpdateFunction;
Jonathan Hartdb3af892015-01-26 13:19:07 -080098
Jonathan Hart6ec029a2015-03-24 17:12:35 -070099 private final ExecutorService communicationExecutor;
100 private final Map<NodeId, EventAccumulator> senderPending;
Madan Jampani28726282015-02-19 11:40:23 -0800101
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700102 private final String mapName;
103
Jonathan Hartdb3af892015-01-26 13:19:07 -0800104 private volatile boolean destroyed = false;
Jonathan Hart539a6462015-01-27 17:05:43 -0800105 private static final String ERROR_DESTROYED = " map is already destroyed";
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800106 private final String destroyedMessage;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800107
Jonathan Hart4f397e82015-02-04 09:10:41 -0800108 private static final String ERROR_NULL_KEY = "Key cannot be null";
109 private static final String ERROR_NULL_VALUE = "Null values are not allowed";
110
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700111 private final long initialDelaySec = 5;
112 private final boolean lightweightAntiEntropy;
113 private final boolean tombstonesDisabled;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800114
Jonathan Hart233a18a2015-03-02 17:24:58 -0800115 private static final int WINDOW_SIZE = 5;
116 private static final int HIGH_LOAD_THRESHOLD = 0;
117 private static final int LOAD_WINDOW = 2;
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700118 private SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE);
Jonathan Hart233a18a2015-03-02 17:24:58 -0800119
Jonathan Hartca335e92015-03-05 10:34:32 -0800120 private final boolean persistent;
121 private final PersistentStore<K, V> persistentStore;
122
Jonathan Hartdb3af892015-01-26 13:19:07 -0800123 /**
124 * Creates a new eventually consistent map shared amongst multiple instances.
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800125 * <p>
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700126 * See {@link org.onosproject.store.service.EventuallyConsistentMapBuilder}
127 * for more description of the parameters expected by the map.
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800128 * </p>
Jonathan Hartdb3af892015-01-26 13:19:07 -0800129 *
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700130 * @param mapName a String identifier for the map.
131 * @param clusterService the cluster service
132 * @param clusterCommunicator the cluster communications service
133 * @param serializerBuilder a Kryo namespace builder that can serialize
134 * both K and V
Madan Jampanibcf1a482015-06-24 19:05:56 -0700135 * @param timestampProvider provider of timestamps for K and V
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700136 * @param peerUpdateFunction function that provides a set of nodes to immediately
137 * update to when there writes to the map
138 * @param eventExecutor executor to use for processing incoming
139 * events from peers
140 * @param communicationExecutor executor to use for sending events to peers
141 * @param backgroundExecutor executor to use for background anti-entropy
142 * tasks
143 * @param tombstonesDisabled true if this map should not maintain
144 * tombstones
145 * @param antiEntropyPeriod period that the anti-entropy task should run
Jonathan Hartca335e92015-03-05 10:34:32 -0800146 * @param antiEntropyTimeUnit time unit for anti-entropy period
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700147 * @param convergeFaster make anti-entropy try to converge faster
Jonathan Hartca335e92015-03-05 10:34:32 -0800148 * @param persistent persist data to disk
Jonathan Hartdb3af892015-01-26 13:19:07 -0800149 */
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700150 EventuallyConsistentMapImpl(String mapName,
151 ClusterService clusterService,
152 ClusterCommunicationService clusterCommunicator,
153 KryoNamespace.Builder serializerBuilder,
Madan Jampanibcf1a482015-06-24 19:05:56 -0700154 BiFunction<K, V, Timestamp> timestampProvider,
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700155 BiFunction<K, V, Collection<NodeId>> peerUpdateFunction,
156 ExecutorService eventExecutor,
157 ExecutorService communicationExecutor,
158 ScheduledExecutorService backgroundExecutor,
159 boolean tombstonesDisabled,
160 long antiEntropyPeriod,
161 TimeUnit antiEntropyTimeUnit,
Jonathan Hartca335e92015-03-05 10:34:32 -0800162 boolean convergeFaster,
163 boolean persistent) {
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700164 this.mapName = mapName;
Madan Jampani3d76c942015-06-29 23:37:10 -0700165 items = Maps.newConcurrentMap();
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800166 senderPending = Maps.newConcurrentMap();
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700167 destroyedMessage = mapName + ERROR_DESTROYED;
Madan Jampani28726282015-02-19 11:40:23 -0800168
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700169 this.clusterService = clusterService;
170 this.clusterCommunicator = clusterCommunicator;
Madan Jampani3d76c942015-06-29 23:37:10 -0700171 this.localNodeId = clusterService.getLocalNode().id();
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700172
173 this.serializer = createSerializer(serializerBuilder);
174
Madan Jampanibcf1a482015-06-24 19:05:56 -0700175 this.timestampProvider = timestampProvider;
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700176
177 if (peerUpdateFunction != null) {
178 this.peerUpdateFunction = peerUpdateFunction;
179 } else {
180 this.peerUpdateFunction = (key, value) -> clusterService.getNodes().stream()
181 .map(ControllerNode::id)
Madan Jampani3d76c942015-06-29 23:37:10 -0700182 .filter(nodeId -> !nodeId.equals(localNodeId))
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700183 .collect(Collectors.toList());
184 }
185
186 if (eventExecutor != null) {
187 this.executor = eventExecutor;
188 } else {
189 // should be a normal executor; it's used for receiving messages
190 this.executor =
191 Executors.newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-fg-%d"));
192 }
193
194 if (communicationExecutor != null) {
195 this.communicationExecutor = communicationExecutor;
196 } else {
197 // sending executor; should be capped
198 //TODO this probably doesn't need to be bounded anymore
199 this.communicationExecutor =
200 newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-publish-%d"));
201 }
202
Jonathan Hartca335e92015-03-05 10:34:32 -0800203 this.persistent = persistent;
204
205 if (this.persistent) {
206 String dataDirectory = System.getProperty("karaf.data", "./data");
207 String filename = dataDirectory + "/" + "mapdb-ecm-" + mapName;
208
209 ExecutorService dbExecutor =
210 newFixedThreadPool(1, groupedThreads("onos/ecm", mapName + "-dbwriter"));
211
212 persistentStore = new MapDbPersistentStore<>(filename, dbExecutor, serializer);
Madan Jampani3d76c942015-06-29 23:37:10 -0700213 persistentStore.readInto(items);
Jonathan Hartca335e92015-03-05 10:34:32 -0800214 } else {
215 this.persistentStore = null;
216 }
217
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700218 if (backgroundExecutor != null) {
219 this.backgroundExecutor = backgroundExecutor;
220 } else {
221 this.backgroundExecutor =
222 newSingleThreadScheduledExecutor(groupedThreads("onos/ecm", mapName + "-bg-%d"));
223 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800224
Jonathan Hartaaa56572015-01-28 21:56:35 -0800225 // start anti-entropy thread
Madan Jampani3d76c942015-06-29 23:37:10 -0700226 this.backgroundExecutor.scheduleAtFixedRate(this::sendAdvertisement,
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700227 initialDelaySec, antiEntropyPeriod,
228 antiEntropyTimeUnit);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800229
Jonathan Hartdb3af892015-01-26 13:19:07 -0800230 updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
231 clusterCommunicator.addSubscriber(updateMessageSubject,
Madan Jampani3d76c942015-06-29 23:37:10 -0700232 serializer::decode,
233 this::processUpdates,
234 this.executor);
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800235
Jonathan Hartaaa56572015-01-28 21:56:35 -0800236 antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");
237 clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
Madan Jampani3d76c942015-06-29 23:37:10 -0700238 serializer::decode,
239 this::handleAntiEntropyAdvertisement,
240 this.backgroundExecutor);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800241
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700242 this.tombstonesDisabled = tombstonesDisabled;
243 this.lightweightAntiEntropy = !convergeFaster;
Madan Jampanie1356282015-03-10 19:05:36 -0700244 }
245
Jonathan Hartdb3af892015-01-26 13:19:07 -0800246 private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
247 return new KryoSerializer() {
248 @Override
249 protected void setupKryoPool() {
250 // Add the map's internal helper classes to the user-supplied serializer
251 serializerPool = builder
Madan Jampani3d76c942015-06-29 23:37:10 -0700252 .register(KryoNamespaces.BASIC)
Madan Jampanidb5d06a2015-06-30 11:16:48 -0700253 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
Madan Jampani3e033bd2015-04-08 13:03:49 -0700254 .register(LogicalTimestamp.class)
Jonathan Hartdb3af892015-01-26 13:19:07 -0800255 .register(WallClockTimestamp.class)
Jonathan Hartaaa56572015-01-28 21:56:35 -0800256 .register(AntiEntropyAdvertisement.class)
Madan Jampani3d76c942015-06-29 23:37:10 -0700257 .register(UpdateEntry.class)
258 .register(MapValue.class)
259 .register(MapValue.Digest.class)
Jonathan Hartdb3af892015-01-26 13:19:07 -0800260 .build();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800261 }
262 };
263 }
264
265 @Override
266 public int size() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800267 checkState(!destroyed, destroyedMessage);
Madan Jampani3d76c942015-06-29 23:37:10 -0700268 // TODO: Maintain a separate counter for tracking live elements in map.
269 return Maps.filterValues(items, MapValue::isAlive).size();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800270 }
271
272 @Override
273 public boolean isEmpty() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800274 checkState(!destroyed, destroyedMessage);
Madan Jampani3d76c942015-06-29 23:37:10 -0700275 return size() == 0;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800276 }
277
278 @Override
279 public boolean containsKey(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800280 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800281 checkNotNull(key, ERROR_NULL_KEY);
Madan Jampani3d76c942015-06-29 23:37:10 -0700282 return get(key) != null;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800283 }
284
285 @Override
286 public boolean containsValue(V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800287 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800288 checkNotNull(value, ERROR_NULL_VALUE);
Madan Jampani3d76c942015-06-29 23:37:10 -0700289 return items.values()
290 .stream()
291 .filter(MapValue::isAlive)
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700292 .anyMatch(v -> value.equals(v.get()));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800293 }
294
295 @Override
296 public V get(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800297 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800298 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800299
Madan Jampani3d76c942015-06-29 23:37:10 -0700300 MapValue<V> value = items.get(key);
301 return (value == null || value.isTombstone()) ? null : value.get();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800302 }
303
304 @Override
305 public void put(K key, V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800306 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800307 checkNotNull(key, ERROR_NULL_KEY);
308 checkNotNull(value, ERROR_NULL_VALUE);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800309
Madan Jampani3d76c942015-06-29 23:37:10 -0700310 MapValue<V> newValue = new MapValue<>(value, timestampProvider.apply(key, value));
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700311 if (putInternal(key, newValue)) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700312 notifyPeers(new UpdateEntry<>(key, newValue), peerUpdateFunction.apply(key, value));
Madan Jampanicab114c2015-07-23 00:14:19 -0700313 notifyListeners(new EventuallyConsistentMapEvent<>(mapName, PUT, key, value));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800314 }
315 }
316
Jonathan Hartdb3af892015-01-26 13:19:07 -0800317 @Override
Madan Jampani43e9c9c2015-06-26 14:16:46 -0700318 public V remove(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800319 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800320 checkNotNull(key, ERROR_NULL_KEY);
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700321 return removeAndNotify(key, null);
322 }
323
324 @Override
325 public void remove(K key, V value) {
326 checkState(!destroyed, destroyedMessage);
327 checkNotNull(key, ERROR_NULL_KEY);
328 checkNotNull(value, ERROR_NULL_VALUE);
329 removeAndNotify(key, value);
330 }
331
332 private V removeAndNotify(K key, V value) {
333 MapValue<V> tombstone = MapValue.tombstone(timestampProvider.apply(key, value));
334 MapValue<V> previousValue = removeInternal(key, Optional.ofNullable(value), tombstone);
Madan Jampania0ac4872015-07-02 11:23:49 -0700335 if (previousValue != null) {
336 notifyPeers(new UpdateEntry<>(key, tombstone), peerUpdateFunction.apply(key, previousValue.get()));
337 if (previousValue.isAlive()) {
Madan Jampanicab114c2015-07-23 00:14:19 -0700338 notifyListeners(new EventuallyConsistentMapEvent<>(mapName, REMOVE, key, previousValue.get()));
Madan Jampania0ac4872015-07-02 11:23:49 -0700339 }
340 }
Madan Jampanid13f3b82015-07-01 17:37:50 -0700341 return previousValue != null ? previousValue.get() : null;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800342 }
343
Madan Jampanid13f3b82015-07-01 17:37:50 -0700344 private MapValue<V> removeInternal(K key, Optional<V> value, MapValue<V> tombstone) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700345 checkState(!destroyed, destroyedMessage);
346 checkNotNull(key, ERROR_NULL_KEY);
347 checkNotNull(value, ERROR_NULL_VALUE);
Madan Jampanid13f3b82015-07-01 17:37:50 -0700348 checkState(tombstone.isTombstone());
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700349
350 counter.incrementCount();
Madan Jampani3d76c942015-06-29 23:37:10 -0700351 AtomicBoolean updated = new AtomicBoolean(false);
Madan Jampanid13f3b82015-07-01 17:37:50 -0700352 AtomicReference<MapValue<V>> previousValue = new AtomicReference<>();
Madan Jampani3d76c942015-06-29 23:37:10 -0700353 items.compute(key, (k, existing) -> {
Madan Jampanid13f3b82015-07-01 17:37:50 -0700354 boolean valueMatches = true;
355 if (value.isPresent() && existing != null && existing.isAlive()) {
356 valueMatches = Objects.equals(value.get(), existing.get());
Madan Jampani3d76c942015-06-29 23:37:10 -0700357 }
Brian O'Connor6325dad2015-07-07 15:36:29 -0700358 if (existing == null) {
359 log.debug("ECMap Remove: Existing value for key {} is already null", k);
360 }
Madan Jampanid13f3b82015-07-01 17:37:50 -0700361 updated.set(valueMatches && (existing == null || tombstone.isNewerThan(existing)));
362 if (updated.get()) {
363 previousValue.set(existing);
364 }
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700365 if (updated.get()) {
366 return tombstonesDisabled ? null : tombstone;
367 } else {
368 return existing;
369 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700370 });
371 if (updated.get()) {
Madan Jampanid13f3b82015-07-01 17:37:50 -0700372 if (persistent) {
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700373 if (tombstonesDisabled) {
374 persistentStore.remove(key);
375 } else {
376 persistentStore.update(key, tombstone);
377 }
Madan Jampanid13f3b82015-07-01 17:37:50 -0700378 }
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800379 }
Madan Jampanid13f3b82015-07-01 17:37:50 -0700380 return previousValue.get();
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800381 }
382
383 @Override
Madan Jampani4727a112015-07-16 12:12:58 -0700384 public V compute(K key, BiFunction<K, V, V> recomputeFunction) {
385 checkState(!destroyed, destroyedMessage);
386 checkNotNull(key, ERROR_NULL_KEY);
387 checkNotNull(recomputeFunction, "Recompute function cannot be null");
388
389 AtomicBoolean updated = new AtomicBoolean(false);
390 AtomicReference<MapValue<V>> previousValue = new AtomicReference<>();
391 MapValue<V> computedValue = items.compute(key, (k, mv) -> {
392 previousValue.set(mv);
393 V newRawValue = recomputeFunction.apply(key, mv == null ? null : mv.get());
394 MapValue<V> newValue = new MapValue<>(newRawValue, timestampProvider.apply(key, newRawValue));
395 if (mv == null || newValue.isNewerThan(mv)) {
396 updated.set(true);
397 return newValue;
398 } else {
399 return mv;
400 }
401 });
402 if (updated.get()) {
403 notifyPeers(new UpdateEntry<>(key, computedValue), peerUpdateFunction.apply(key, computedValue.get()));
404 EventuallyConsistentMapEvent.Type updateType = computedValue.isTombstone() ? REMOVE : PUT;
405 V value = computedValue.isTombstone()
406 ? previousValue.get() == null ? null : previousValue.get().get()
407 : computedValue.get();
408 if (value != null) {
Madan Jampanicab114c2015-07-23 00:14:19 -0700409 notifyListeners(new EventuallyConsistentMapEvent<>(mapName, updateType, key, value));
Madan Jampani4727a112015-07-16 12:12:58 -0700410 }
411 }
412 return computedValue.get();
413 }
414
415 @Override
Jonathan Hartdb3af892015-01-26 13:19:07 -0800416 public void putAll(Map<? extends K, ? extends V> m) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800417 checkState(!destroyed, destroyedMessage);
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800418 m.forEach(this::put);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800419 }
420
421 @Override
422 public void clear() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800423 checkState(!destroyed, destroyedMessage);
Madan Jampani3d76c942015-06-29 23:37:10 -0700424 Maps.filterValues(items, MapValue::isAlive)
425 .forEach((k, v) -> remove(k));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800426 }
427
428 @Override
429 public Set<K> keySet() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800430 checkState(!destroyed, destroyedMessage);
Madan Jampani3d76c942015-06-29 23:37:10 -0700431 return Maps.filterValues(items, MapValue::isAlive)
432 .keySet();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800433 }
434
435 @Override
436 public Collection<V> values() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800437 checkState(!destroyed, destroyedMessage);
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700438 return Collections2.transform(Maps.filterValues(items, MapValue::isAlive).values(), MapValue::get);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800439 }
440
441 @Override
442 public Set<Map.Entry<K, V>> entrySet() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800443 checkState(!destroyed, destroyedMessage);
Madan Jampani3d76c942015-06-29 23:37:10 -0700444 return Maps.filterValues(items, MapValue::isAlive)
445 .entrySet()
446 .stream()
447 .map(e -> Pair.of(e.getKey(), e.getValue().get()))
448 .collect(Collectors.toSet());
449 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800450
Madan Jampani3d76c942015-06-29 23:37:10 -0700451 /**
452 * Returns true if newValue was accepted i.e. map is updated.
453 * @param key key
454 * @param newValue proposed new value
455 * @return true if update happened; false if map already contains a more recent value for the key
456 */
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700457 private boolean putInternal(K key, MapValue<V> newValue) {
458 checkState(!destroyed, destroyedMessage);
459 checkNotNull(key, ERROR_NULL_KEY);
460 checkNotNull(newValue, ERROR_NULL_VALUE);
461 checkState(newValue.isAlive());
462 counter.incrementCount();
Madan Jampani3d76c942015-06-29 23:37:10 -0700463 AtomicBoolean updated = new AtomicBoolean(false);
464 items.compute(key, (k, existing) -> {
465 if (existing == null || newValue.isNewerThan(existing)) {
466 updated.set(true);
Madan Jampani3d76c942015-06-29 23:37:10 -0700467 return newValue;
468 }
469 return existing;
470 });
471 if (updated.get() && persistent) {
472 persistentStore.update(key, newValue);
473 }
474 return updated.get();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800475 }
476
477 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800478 public void addListener(EventuallyConsistentMapListener<K, V> listener) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800479 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800480
481 listeners.add(checkNotNull(listener));
482 }
483
484 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800485 public void removeListener(EventuallyConsistentMapListener<K, V> listener) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800486 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800487
488 listeners.remove(checkNotNull(listener));
489 }
490
491 @Override
492 public void destroy() {
493 destroyed = true;
494
495 executor.shutdown();
496 backgroundExecutor.shutdown();
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800497 communicationExecutor.shutdown();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800498
Jonathan Hart584d2f32015-01-27 19:46:14 -0800499 listeners.clear();
500
Jonathan Hartdb3af892015-01-26 13:19:07 -0800501 clusterCommunicator.removeSubscriber(updateMessageSubject);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800502 clusterCommunicator.removeSubscriber(antiEntropyAdvertisementSubject);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800503 }
504
Jonathan Hartaaa56572015-01-28 21:56:35 -0800505 private void notifyListeners(EventuallyConsistentMapEvent<K, V> event) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700506 listeners.forEach(listener -> listener.event(event));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800507 }
508
Madan Jampani3d76c942015-06-29 23:37:10 -0700509 private void notifyPeers(UpdateEntry<K, V> event, Collection<NodeId> peers) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800510 queueUpdate(event, peers);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800511 }
512
Madan Jampani3d76c942015-06-29 23:37:10 -0700513 private void queueUpdate(UpdateEntry<K, V> event, Collection<NodeId> peers) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800514 if (peers == null) {
515 // we have no friends :(
516 return;
517 }
518 peers.forEach(node ->
Madan Jampani3d76c942015-06-29 23:37:10 -0700519 senderPending.computeIfAbsent(node, unusedKey -> new EventAccumulator(node)).add(event)
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800520 );
521 }
522
Jonathan Hart233a18a2015-03-02 17:24:58 -0800523 private boolean underHighLoad() {
524 return counter.get(LOAD_WINDOW) > HIGH_LOAD_THRESHOLD;
525 }
526
Madan Jampani3d76c942015-06-29 23:37:10 -0700527 private void sendAdvertisement() {
528 try {
Thomas Vachuska152f9fd2015-04-02 16:28:13 -0700529 if (underHighLoad() || destroyed) {
Jonathan Hart233a18a2015-03-02 17:24:58 -0800530 return;
531 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700532 pickRandomActivePeer().ifPresent(this::sendAdvertisementToPeer);
533 } catch (Exception e) {
534 // Catch all exceptions to avoid scheduled task being suppressed.
535 log.error("Exception thrown while sending advertisement", e);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800536 }
537 }
538
Madan Jampani3d76c942015-06-29 23:37:10 -0700539 private Optional<NodeId> pickRandomActivePeer() {
540 List<NodeId> activePeers = clusterService.getNodes()
541 .stream()
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700542 .map(ControllerNode::id)
543 .filter(id -> !localNodeId.equals(id))
Madan Jampani3d76c942015-06-29 23:37:10 -0700544 .filter(id -> clusterService.getState(id) == ControllerNode.State.ACTIVE)
545 .collect(Collectors.toList());
546 Collections.shuffle(activePeers);
547 return activePeers.isEmpty() ? Optional.empty() : Optional.of(activePeers.get(0));
548 }
549
550 private void sendAdvertisementToPeer(NodeId peer) {
551 clusterCommunicator.unicast(createAdvertisement(),
552 antiEntropyAdvertisementSubject,
553 serializer::encode,
554 peer)
555 .whenComplete((result, error) -> {
556 if (error != null) {
Madan Jampania0ac4872015-07-02 11:23:49 -0700557 log.debug("Failed to send anti-entropy advertisement to {}", peer, error);
Madan Jampani3d76c942015-06-29 23:37:10 -0700558 }
559 });
560 }
561
Jonathan Hartaaa56572015-01-28 21:56:35 -0800562 private AntiEntropyAdvertisement<K> createAdvertisement() {
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700563 return new AntiEntropyAdvertisement<K>(localNodeId,
564 ImmutableMap.copyOf(Maps.transformValues(items, MapValue::digest)));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800565 }
566
567 private void handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700568 if (destroyed || underHighLoad()) {
569 return;
570 }
571 try {
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700572 log.debug("Received anti-entropy advertisement from {} for {} with {} entries in it",
573 mapName, ad.sender(), ad.digest().size());
Madan Jampani3d76c942015-06-29 23:37:10 -0700574 antiEntropyCheckLocalItems(ad).forEach(this::notifyListeners);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800575
Madan Jampani3d76c942015-06-29 23:37:10 -0700576 if (!lightweightAntiEntropy) {
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700577 // if remote ad has any entries that the local copy is missing, actively sync
578 // TODO: Missing keys is not the way local copy can be behind.
579 if (Sets.difference(ad.digest().keySet(), items.keySet()).size() > 0) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700580 // TODO: Send ad for missing keys and for entries that are stale
581 sendAdvertisementToPeer(ad.sender());
Jonathan Hartf893be82015-02-24 17:35:51 -0800582 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800583 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700584 } catch (Exception e) {
585 log.warn("Error handling anti-entropy advertisement", e);
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800586 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800587 }
588
589 /**
Madan Jampani3d76c942015-06-29 23:37:10 -0700590 * Processes anti-entropy ad from peer by taking following actions:
591 * 1. If peer has an old entry, updates peer.
592 * 2. If peer indicates an entry is removed and has a more recent
593 * timestamp than the local entry, update local state.
Jonathan Hartaaa56572015-01-28 21:56:35 -0800594 */
Jonathan Hartaaa56572015-01-28 21:56:35 -0800595 private List<EventuallyConsistentMapEvent<K, V>> antiEntropyCheckLocalItems(
596 AntiEntropyAdvertisement<K> ad) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700597 final List<EventuallyConsistentMapEvent<K, V>> externalEvents = Lists.newLinkedList();
Jonathan Hartaaa56572015-01-28 21:56:35 -0800598 final NodeId sender = ad.sender();
Madan Jampani3d76c942015-06-29 23:37:10 -0700599 items.forEach((key, localValue) -> {
600 MapValue.Digest remoteValueDigest = ad.digest().get(key);
601 if (remoteValueDigest == null || localValue.isNewerThan(remoteValueDigest.timestamp())) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800602 // local value is more recent, push to sender
Madan Jampani3d76c942015-06-29 23:37:10 -0700603 queueUpdate(new UpdateEntry<>(key, localValue), ImmutableList.of(sender));
Madan Jampanid13f3b82015-07-01 17:37:50 -0700604 }
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700605 if (remoteValueDigest != null
606 && remoteValueDigest.isNewerThan(localValue.digest())
607 && remoteValueDigest.isTombstone()) {
Madan Jampanid13f3b82015-07-01 17:37:50 -0700608 MapValue<V> previousValue = removeInternal(key,
609 Optional.empty(),
Madan Jampani43f37952015-07-02 12:54:08 -0700610 MapValue.tombstone(remoteValueDigest.timestamp()));
Madan Jampanid13f3b82015-07-01 17:37:50 -0700611 if (previousValue != null && previousValue.isAlive()) {
Madan Jampanicab114c2015-07-23 00:14:19 -0700612 externalEvents.add(new EventuallyConsistentMapEvent<>(mapName, REMOVE, key, previousValue.get()));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800613 }
614 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700615 });
Jonathan Hartaaa56572015-01-28 21:56:35 -0800616 return externalEvents;
617 }
618
Madan Jampani3d76c942015-06-29 23:37:10 -0700619 private void processUpdates(Collection<UpdateEntry<K, V>> updates) {
620 if (destroyed) {
621 return;
Jonathan Hartaaa56572015-01-28 21:56:35 -0800622 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700623 updates.forEach(update -> {
624 final K key = update.key();
625 final MapValue<V> value = update.value();
Madan Jampanid13f3b82015-07-01 17:37:50 -0700626 if (value.isTombstone()) {
627 MapValue<V> previousValue = removeInternal(key, Optional.empty(), value);
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700628 if (previousValue != null && previousValue.isAlive()) {
Madan Jampanicab114c2015-07-23 00:14:19 -0700629 notifyListeners(new EventuallyConsistentMapEvent<>(mapName, REMOVE, key, previousValue.get()));
Madan Jampanid13f3b82015-07-01 17:37:50 -0700630 }
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700631 } else if (putInternal(key, value)) {
Madan Jampanicab114c2015-07-23 00:14:19 -0700632 notifyListeners(new EventuallyConsistentMapEvent<>(mapName, PUT, key, value.get()));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800633 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700634 });
Jonathan Hartdb3af892015-01-26 13:19:07 -0800635 }
636
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800637 // TODO pull this into the class if this gets pulled out...
638 private static final int DEFAULT_MAX_EVENTS = 1000;
639 private static final int DEFAULT_MAX_IDLE_MS = 10;
640 private static final int DEFAULT_MAX_BATCH_MS = 50;
641 private static final Timer TIMER = new Timer("onos-ecm-sender-events");
Jonathan Hartdb3af892015-01-26 13:19:07 -0800642
Madan Jampani3d76c942015-06-29 23:37:10 -0700643 private final class EventAccumulator extends AbstractAccumulator<UpdateEntry<K, V>> {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800644
645 private final NodeId peer;
646
647 private EventAccumulator(NodeId peer) {
648 super(TIMER, DEFAULT_MAX_EVENTS, DEFAULT_MAX_BATCH_MS, DEFAULT_MAX_IDLE_MS);
649 this.peer = peer;
650 }
651
652 @Override
Madan Jampani3d76c942015-06-29 23:37:10 -0700653 public void processItems(List<UpdateEntry<K, V>> items) {
654 Map<K, UpdateEntry<K, V>> map = Maps.newHashMap();
655 items.forEach(item -> map.compute(item.key(), (key, existing) ->
Madan Jampani92c64eb2015-07-23 15:37:07 -0700656 item.isNewerThan(existing) ? item : existing));
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800657 communicationExecutor.submit(() -> {
Madan Jampani3d76c942015-06-29 23:37:10 -0700658 clusterCommunicator.unicast(ImmutableList.copyOf(map.values()),
Madan Jampani175e8fd2015-05-20 14:10:45 -0700659 updateMessageSubject,
660 serializer::encode,
661 peer)
662 .whenComplete((result, error) -> {
663 if (error != null) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700664 log.debug("Failed to send to {}", peer, error);
Madan Jampani175e8fd2015-05-20 14:10:45 -0700665 }
666 });
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800667 });
Jonathan Hartdb3af892015-01-26 13:19:07 -0800668 }
669 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700670}