blob: 649e5a82448b4fdd11fb18d87370fcd915318985 [file] [log] [blame]
Jonathan Hartdb3af892015-01-26 13:19:07 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Jonathan Hart77bdd262015-02-03 09:07:48 -080016package org.onosproject.store.ecmap;
Jonathan Hartdb3af892015-01-26 13:19:07 -080017
Madan Jampani4f1f4cd2015-07-08 23:05:35 -070018import com.google.common.collect.Collections2;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080019import com.google.common.collect.ImmutableList;
Madan Jampani4f1f4cd2015-07-08 23:05:35 -070020import com.google.common.collect.ImmutableMap;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080021import com.google.common.collect.Lists;
22import com.google.common.collect.Maps;
Madan Jampani3d76c942015-06-29 23:37:10 -070023import com.google.common.collect.Sets;
Jonathan Hartf9108232015-02-02 16:37:35 -080024import org.apache.commons.lang3.tuple.Pair;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080025import org.onlab.util.AbstractAccumulator;
Jonathan Hartdb3af892015-01-26 13:19:07 -080026import org.onlab.util.KryoNamespace;
Jonathan Hart233a18a2015-03-02 17:24:58 -080027import org.onlab.util.SlidingWindowCounter;
Jonathan Hartdb3af892015-01-26 13:19:07 -080028import org.onosproject.cluster.ClusterService;
Jonathan Hartaaa56572015-01-28 21:56:35 -080029import org.onosproject.cluster.ControllerNode;
Jonathan Hartdb3af892015-01-26 13:19:07 -080030import org.onosproject.cluster.NodeId;
Aaron Kruglikov66cf0b92015-10-26 15:46:54 -070031import org.onosproject.persistence.PersistenceService;
Jonathan Hartdb3af892015-01-26 13:19:07 -080032import org.onosproject.store.Timestamp;
33import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Jonathan Hartdb3af892015-01-26 13:19:07 -080034import org.onosproject.store.cluster.messaging.MessageSubject;
Madan Jampani3e033bd2015-04-08 13:03:49 -070035import org.onosproject.store.impl.LogicalTimestamp;
Madan Jampani3d76c942015-06-29 23:37:10 -070036import org.onosproject.store.serializers.KryoNamespaces;
Jonathan Hartdb3af892015-01-26 13:19:07 -080037import org.onosproject.store.serializers.KryoSerializer;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070038import org.onosproject.store.service.EventuallyConsistentMap;
39import org.onosproject.store.service.EventuallyConsistentMapEvent;
40import org.onosproject.store.service.EventuallyConsistentMapListener;
Aaron Kruglikov66cf0b92015-10-26 15:46:54 -070041import org.onosproject.store.service.Serializer;
Jonathan Hart4a29c592015-09-23 17:55:07 -070042import org.onosproject.store.service.WallClockTimestamp;
Jonathan Hartdb3af892015-01-26 13:19:07 -080043import org.slf4j.Logger;
44import org.slf4j.LoggerFactory;
45
Jonathan Hartdb3af892015-01-26 13:19:07 -080046import java.util.Collection;
Madan Jampani3d76c942015-06-29 23:37:10 -070047import java.util.Collections;
Jonathan Hartdb3af892015-01-26 13:19:07 -080048import java.util.List;
49import java.util.Map;
Madan Jampanid13f3b82015-07-01 17:37:50 -070050import java.util.Objects;
Madan Jampani43e9c9c2015-06-26 14:16:46 -070051import java.util.Optional;
Jonathan Hartdb3af892015-01-26 13:19:07 -080052import java.util.Set;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080053import java.util.Timer;
Jonathan Hartdb3af892015-01-26 13:19:07 -080054import java.util.concurrent.ExecutorService;
55import java.util.concurrent.Executors;
56import java.util.concurrent.ScheduledExecutorService;
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -080057import java.util.concurrent.TimeUnit;
Madan Jampani3d76c942015-06-29 23:37:10 -070058import java.util.concurrent.atomic.AtomicBoolean;
Madan Jampani43e9c9c2015-06-26 14:16:46 -070059import java.util.concurrent.atomic.AtomicReference;
Brian O'Connor5eb77c82015-03-02 18:09:39 -080060import java.util.function.BiFunction;
Jonathan Hartdb3af892015-01-26 13:19:07 -080061import java.util.stream.Collectors;
62
63import static com.google.common.base.Preconditions.checkNotNull;
64import static com.google.common.base.Preconditions.checkState;
65import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
Brian O'Connorc6713a82015-02-24 11:55:48 -080066import static org.onlab.util.BoundedThreadPool.newFixedThreadPool;
Brian O'Connor4b2ba5f2015-02-18 20:54:00 -080067import static org.onlab.util.Tools.groupedThreads;
Jonathan Hart4a29c592015-09-23 17:55:07 -070068import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
69import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
Jonathan Hartdb3af892015-01-26 13:19:07 -080070
71/**
72 * Distributed Map implementation which uses optimistic replication and gossip
73 * based techniques to provide an eventually consistent data store.
74 */
75public class EventuallyConsistentMapImpl<K, V>
76 implements EventuallyConsistentMap<K, V> {
77
78 private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class);
79
Madan Jampani3d76c942015-06-29 23:37:10 -070080 private final Map<K, MapValue<V>> items;
Jonathan Hartdb3af892015-01-26 13:19:07 -080081
Jonathan Hartdb3af892015-01-26 13:19:07 -080082 private final ClusterService clusterService;
83 private final ClusterCommunicationService clusterCommunicator;
84 private final KryoSerializer serializer;
Madan Jampani3d76c942015-06-29 23:37:10 -070085 private final NodeId localNodeId;
Aaron Kruglikov66cf0b92015-10-26 15:46:54 -070086 private final PersistenceService persistenceService;
Jonathan Hartdb3af892015-01-26 13:19:07 -080087
Madan Jampanibcf1a482015-06-24 19:05:56 -070088 private final BiFunction<K, V, Timestamp> timestampProvider;
Jonathan Hartdb3af892015-01-26 13:19:07 -080089
90 private final MessageSubject updateMessageSubject;
Jonathan Hartaaa56572015-01-28 21:56:35 -080091 private final MessageSubject antiEntropyAdvertisementSubject;
Jonathan Hartdb3af892015-01-26 13:19:07 -080092
Jonathan Hartaaa56572015-01-28 21:56:35 -080093 private final Set<EventuallyConsistentMapListener<K, V>> listeners
Madan Jampani3d76c942015-06-29 23:37:10 -070094 = Sets.newCopyOnWriteArraySet();
Jonathan Hartdb3af892015-01-26 13:19:07 -080095
96 private final ExecutorService executor;
Jonathan Hartdb3af892015-01-26 13:19:07 -080097 private final ScheduledExecutorService backgroundExecutor;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080098 private final BiFunction<K, V, Collection<NodeId>> peerUpdateFunction;
Jonathan Hartdb3af892015-01-26 13:19:07 -080099
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700100 private final ExecutorService communicationExecutor;
101 private final Map<NodeId, EventAccumulator> senderPending;
Madan Jampani28726282015-02-19 11:40:23 -0800102
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700103 private final String mapName;
104
Jonathan Hartdb3af892015-01-26 13:19:07 -0800105 private volatile boolean destroyed = false;
Jonathan Hart539a6462015-01-27 17:05:43 -0800106 private static final String ERROR_DESTROYED = " map is already destroyed";
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800107 private final String destroyedMessage;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800108
Jonathan Hart4f397e82015-02-04 09:10:41 -0800109 private static final String ERROR_NULL_KEY = "Key cannot be null";
110 private static final String ERROR_NULL_VALUE = "Null values are not allowed";
111
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700112 private final long initialDelaySec = 5;
113 private final boolean lightweightAntiEntropy;
114 private final boolean tombstonesDisabled;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800115
Jonathan Hart233a18a2015-03-02 17:24:58 -0800116 private static final int WINDOW_SIZE = 5;
117 private static final int HIGH_LOAD_THRESHOLD = 0;
118 private static final int LOAD_WINDOW = 2;
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700119 private SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE);
Jonathan Hart233a18a2015-03-02 17:24:58 -0800120
Jonathan Hartca335e92015-03-05 10:34:32 -0800121 private final boolean persistent;
Aaron Kruglikov66cf0b92015-10-26 15:46:54 -0700122
123 private static final String PERSISTENT_LOCAL_MAP_NAME = "itemsMap";
124
Jonathan Hartca335e92015-03-05 10:34:32 -0800125
/**
 * Creates a new eventually consistent map shared amongst multiple instances.
 * <p>
 * See {@link org.onosproject.store.service.EventuallyConsistentMapBuilder}
 * for more description of the parameters expected by the map.
 * </p>
 *
 * @param mapName               a String identifier for the map.
 * @param clusterService        the cluster service
 * @param clusterCommunicator   the cluster communications service
 * @param serializerBuilder     a Kryo namespace builder that can serialize
 *                              both K and V
 * @param timestampProvider     provider of timestamps for K and V
 * @param peerUpdateFunction    function that provides a set of nodes to immediately
 *                              update when there are writes to the map; when null,
 *                              all other cluster nodes are used
 * @param eventExecutor         executor to use for processing incoming
 *                              events from peers; when null a fixed pool is created
 * @param communicationExecutor executor to use for sending events to peers;
 *                              when null a bounded pool is created
 * @param backgroundExecutor    executor to use for background anti-entropy
 *                              tasks; when null a single-thread scheduler is created
 * @param tombstonesDisabled    true if this map should not maintain
 *                              tombstones
 * @param antiEntropyPeriod     period that the anti-entropy task should run
 * @param antiEntropyTimeUnit   time unit for anti-entropy period (also applied
 *                              to the initial delay)
 * @param convergeFaster        make anti-entropy try to converge faster
 * @param persistent            persist data to disk
 * @param persistenceService    service used to build the backing map when
 *                              {@code persistent} is true
 */
EventuallyConsistentMapImpl(String mapName,
                            ClusterService clusterService,
                            ClusterCommunicationService clusterCommunicator,
                            KryoNamespace.Builder serializerBuilder,
                            BiFunction<K, V, Timestamp> timestampProvider,
                            BiFunction<K, V, Collection<NodeId>> peerUpdateFunction,
                            ExecutorService eventExecutor,
                            ExecutorService communicationExecutor,
                            ScheduledExecutorService backgroundExecutor,
                            boolean tombstonesDisabled,
                            long antiEntropyPeriod,
                            TimeUnit antiEntropyTimeUnit,
                            boolean convergeFaster,
                            boolean persistent,
                            PersistenceService persistenceService) {
    this.mapName = mapName;
    this.serializer = createSerializer(serializerBuilder);
    this.persistenceService = persistenceService;
    this.persistent = persistent;
    if (persistent) {
        // NOTE(review): every persistent map is built under the same name
        // (PERSISTENT_LOCAL_MAP_NAME) rather than one derived from mapName;
        // confirm the persistence service keeps separate maps from sharing storage.
        items = this.persistenceService.<K, MapValue<V>>persistentMapBuilder()
                .withName(PERSISTENT_LOCAL_MAP_NAME)
                .withSerializer(new Serializer() {

                    @Override
                    public <T> byte[] encode(T object) {
                        return EventuallyConsistentMapImpl.this.serializer.encode(object);
                    }

                    @Override
                    public <T> T decode(byte[] bytes) {
                        return EventuallyConsistentMapImpl.this.serializer.decode(bytes);
                    }
                })
                .build();
    } else {
        items = Maps.newConcurrentMap();
    }
    senderPending = Maps.newConcurrentMap();
    destroyedMessage = mapName + ERROR_DESTROYED;

    this.clusterService = clusterService;
    this.clusterCommunicator = clusterCommunicator;
    this.localNodeId = clusterService.getLocalNode().id();

    this.timestampProvider = timestampProvider;

    // Assign the behavioural flags before the message handlers are registered and
    // the anti-entropy task is scheduled below. Those steps publish `this` to other
    // threads, and handleAntiEntropyAdvertisement reads lightweightAntiEntropy.
    // Assigning the flags after that point would let an early message observe the
    // field default (false).
    this.tombstonesDisabled = tombstonesDisabled;
    this.lightweightAntiEntropy = !convergeFaster;

    if (peerUpdateFunction != null) {
        this.peerUpdateFunction = peerUpdateFunction;
    } else {
        this.peerUpdateFunction = (key, value) -> clusterService.getNodes().stream()
                .map(ControllerNode::id)
                .filter(nodeId -> !nodeId.equals(localNodeId))
                .collect(Collectors.toList());
    }

    if (eventExecutor != null) {
        this.executor = eventExecutor;
    } else {
        // should be a normal executor; it's used for receiving messages
        this.executor =
                Executors.newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-fg-%d"));
    }

    if (communicationExecutor != null) {
        this.communicationExecutor = communicationExecutor;
    } else {
        // sending executor; should be capped
        //TODO this probably doesn't need to be bounded anymore
        this.communicationExecutor =
                newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-publish-%d"));
    }

    if (backgroundExecutor != null) {
        this.backgroundExecutor = backgroundExecutor;
    } else {
        this.backgroundExecutor =
                newSingleThreadScheduledExecutor(groupedThreads("onos/ecm", mapName + "-bg-%d"));
    }

    // start anti-entropy thread
    this.backgroundExecutor.scheduleAtFixedRate(this::sendAdvertisement,
                                                initialDelaySec, antiEntropyPeriod,
                                                antiEntropyTimeUnit);

    updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
    clusterCommunicator.addSubscriber(updateMessageSubject,
                                      serializer::decode,
                                      this::processUpdates,
                                      this.executor);

    antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");
    clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
                                      serializer::decode,
                                      this::handleAntiEntropyAdvertisement,
                                      this.backgroundExecutor);
}
255
Jonathan Hartdb3af892015-01-26 13:19:07 -0800256 private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
257 return new KryoSerializer() {
258 @Override
259 protected void setupKryoPool() {
260 // Add the map's internal helper classes to the user-supplied serializer
261 serializerPool = builder
Madan Jampani3d76c942015-06-29 23:37:10 -0700262 .register(KryoNamespaces.BASIC)
Madan Jampanidb5d06a2015-06-30 11:16:48 -0700263 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
Madan Jampani3e033bd2015-04-08 13:03:49 -0700264 .register(LogicalTimestamp.class)
Jonathan Hartdb3af892015-01-26 13:19:07 -0800265 .register(WallClockTimestamp.class)
Jonathan Hartaaa56572015-01-28 21:56:35 -0800266 .register(AntiEntropyAdvertisement.class)
Madan Jampani3d76c942015-06-29 23:37:10 -0700267 .register(UpdateEntry.class)
268 .register(MapValue.class)
269 .register(MapValue.Digest.class)
Jonathan Hartdb3af892015-01-26 13:19:07 -0800270 .build();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800271 }
272 };
273 }
274
275 @Override
Madan Jampania090a112016-01-18 16:38:17 -0800276 public String name() {
277 return mapName;
278 }
279
280 @Override
Jonathan Hartdb3af892015-01-26 13:19:07 -0800281 public int size() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800282 checkState(!destroyed, destroyedMessage);
Madan Jampani3d76c942015-06-29 23:37:10 -0700283 // TODO: Maintain a separate counter for tracking live elements in map.
284 return Maps.filterValues(items, MapValue::isAlive).size();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800285 }
286
287 @Override
288 public boolean isEmpty() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800289 checkState(!destroyed, destroyedMessage);
Madan Jampani3d76c942015-06-29 23:37:10 -0700290 return size() == 0;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800291 }
292
293 @Override
294 public boolean containsKey(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800295 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800296 checkNotNull(key, ERROR_NULL_KEY);
Madan Jampani3d76c942015-06-29 23:37:10 -0700297 return get(key) != null;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800298 }
299
300 @Override
301 public boolean containsValue(V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800302 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800303 checkNotNull(value, ERROR_NULL_VALUE);
Madan Jampani3d76c942015-06-29 23:37:10 -0700304 return items.values()
305 .stream()
306 .filter(MapValue::isAlive)
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700307 .anyMatch(v -> value.equals(v.get()));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800308 }
309
310 @Override
311 public V get(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800312 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800313 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800314
Madan Jampani3d76c942015-06-29 23:37:10 -0700315 MapValue<V> value = items.get(key);
316 return (value == null || value.isTombstone()) ? null : value.get();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800317 }
318
319 @Override
320 public void put(K key, V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800321 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800322 checkNotNull(key, ERROR_NULL_KEY);
323 checkNotNull(value, ERROR_NULL_VALUE);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800324
Madan Jampani3d76c942015-06-29 23:37:10 -0700325 MapValue<V> newValue = new MapValue<>(value, timestampProvider.apply(key, value));
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700326 if (putInternal(key, newValue)) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700327 notifyPeers(new UpdateEntry<>(key, newValue), peerUpdateFunction.apply(key, value));
Madan Jampanicab114c2015-07-23 00:14:19 -0700328 notifyListeners(new EventuallyConsistentMapEvent<>(mapName, PUT, key, value));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800329 }
330 }
331
Jonathan Hartdb3af892015-01-26 13:19:07 -0800332 @Override
Madan Jampani43e9c9c2015-06-26 14:16:46 -0700333 public V remove(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800334 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800335 checkNotNull(key, ERROR_NULL_KEY);
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700336 return removeAndNotify(key, null);
337 }
338
339 @Override
340 public void remove(K key, V value) {
341 checkState(!destroyed, destroyedMessage);
342 checkNotNull(key, ERROR_NULL_KEY);
343 checkNotNull(value, ERROR_NULL_VALUE);
344 removeAndNotify(key, value);
345 }
346
347 private V removeAndNotify(K key, V value) {
Madan Jampani483d0a22015-08-19 17:33:00 -0700348 Timestamp timestamp = timestampProvider.apply(key, value);
349 Optional<MapValue<V>> tombstone = tombstonesDisabled || timestamp == null
350 ? Optional.empty() : Optional.of(MapValue.tombstone(timestamp));
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700351 MapValue<V> previousValue = removeInternal(key, Optional.ofNullable(value), tombstone);
Madan Jampania0ac4872015-07-02 11:23:49 -0700352 if (previousValue != null) {
Madan Jampani483d0a22015-08-19 17:33:00 -0700353 notifyPeers(new UpdateEntry<>(key, tombstone.orElse(null)),
354 peerUpdateFunction.apply(key, previousValue.get()));
Madan Jampania0ac4872015-07-02 11:23:49 -0700355 if (previousValue.isAlive()) {
Madan Jampanicab114c2015-07-23 00:14:19 -0700356 notifyListeners(new EventuallyConsistentMapEvent<>(mapName, REMOVE, key, previousValue.get()));
Madan Jampania0ac4872015-07-02 11:23:49 -0700357 }
358 }
Madan Jampanid13f3b82015-07-01 17:37:50 -0700359 return previousValue != null ? previousValue.get() : null;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800360 }
361
Madan Jampani483d0a22015-08-19 17:33:00 -0700362 private MapValue<V> removeInternal(K key, Optional<V> value, Optional<MapValue<V>> tombstone) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700363 checkState(!destroyed, destroyedMessage);
364 checkNotNull(key, ERROR_NULL_KEY);
365 checkNotNull(value, ERROR_NULL_VALUE);
Madan Jampani483d0a22015-08-19 17:33:00 -0700366 tombstone.ifPresent(v -> checkState(v.isTombstone()));
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700367
368 counter.incrementCount();
Madan Jampani3d76c942015-06-29 23:37:10 -0700369 AtomicBoolean updated = new AtomicBoolean(false);
Madan Jampanid13f3b82015-07-01 17:37:50 -0700370 AtomicReference<MapValue<V>> previousValue = new AtomicReference<>();
Madan Jampani3d76c942015-06-29 23:37:10 -0700371 items.compute(key, (k, existing) -> {
Madan Jampanid13f3b82015-07-01 17:37:50 -0700372 boolean valueMatches = true;
373 if (value.isPresent() && existing != null && existing.isAlive()) {
374 valueMatches = Objects.equals(value.get(), existing.get());
Madan Jampani3d76c942015-06-29 23:37:10 -0700375 }
Brian O'Connor6325dad2015-07-07 15:36:29 -0700376 if (existing == null) {
Jonathan Hart4a29c592015-09-23 17:55:07 -0700377 log.trace("ECMap Remove: Existing value for key {} is already null", k);
Brian O'Connor6325dad2015-07-07 15:36:29 -0700378 }
Madan Jampani483d0a22015-08-19 17:33:00 -0700379 if (valueMatches) {
380 if (existing == null) {
381 updated.set(tombstone.isPresent());
382 } else {
383 updated.set(!tombstone.isPresent() || tombstone.get().isNewerThan(existing));
384 }
Madan Jampanid13f3b82015-07-01 17:37:50 -0700385 }
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700386 if (updated.get()) {
Madan Jampani483d0a22015-08-19 17:33:00 -0700387 previousValue.set(existing);
388 return tombstone.orElse(null);
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700389 } else {
390 return existing;
391 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700392 });
Madan Jampanid13f3b82015-07-01 17:37:50 -0700393 return previousValue.get();
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800394 }
395
396 @Override
Madan Jampani4727a112015-07-16 12:12:58 -0700397 public V compute(K key, BiFunction<K, V, V> recomputeFunction) {
398 checkState(!destroyed, destroyedMessage);
399 checkNotNull(key, ERROR_NULL_KEY);
400 checkNotNull(recomputeFunction, "Recompute function cannot be null");
401
402 AtomicBoolean updated = new AtomicBoolean(false);
403 AtomicReference<MapValue<V>> previousValue = new AtomicReference<>();
404 MapValue<V> computedValue = items.compute(key, (k, mv) -> {
405 previousValue.set(mv);
406 V newRawValue = recomputeFunction.apply(key, mv == null ? null : mv.get());
407 MapValue<V> newValue = new MapValue<>(newRawValue, timestampProvider.apply(key, newRawValue));
408 if (mv == null || newValue.isNewerThan(mv)) {
409 updated.set(true);
410 return newValue;
411 } else {
412 return mv;
413 }
414 });
415 if (updated.get()) {
416 notifyPeers(new UpdateEntry<>(key, computedValue), peerUpdateFunction.apply(key, computedValue.get()));
417 EventuallyConsistentMapEvent.Type updateType = computedValue.isTombstone() ? REMOVE : PUT;
418 V value = computedValue.isTombstone()
419 ? previousValue.get() == null ? null : previousValue.get().get()
420 : computedValue.get();
421 if (value != null) {
Madan Jampanicab114c2015-07-23 00:14:19 -0700422 notifyListeners(new EventuallyConsistentMapEvent<>(mapName, updateType, key, value));
Madan Jampani4727a112015-07-16 12:12:58 -0700423 }
424 }
425 return computedValue.get();
426 }
427
428 @Override
Jonathan Hartdb3af892015-01-26 13:19:07 -0800429 public void putAll(Map<? extends K, ? extends V> m) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800430 checkState(!destroyed, destroyedMessage);
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800431 m.forEach(this::put);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800432 }
433
434 @Override
435 public void clear() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800436 checkState(!destroyed, destroyedMessage);
Madan Jampani3d76c942015-06-29 23:37:10 -0700437 Maps.filterValues(items, MapValue::isAlive)
438 .forEach((k, v) -> remove(k));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800439 }
440
441 @Override
442 public Set<K> keySet() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800443 checkState(!destroyed, destroyedMessage);
Madan Jampani3d76c942015-06-29 23:37:10 -0700444 return Maps.filterValues(items, MapValue::isAlive)
445 .keySet();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800446 }
447
448 @Override
449 public Collection<V> values() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800450 checkState(!destroyed, destroyedMessage);
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700451 return Collections2.transform(Maps.filterValues(items, MapValue::isAlive).values(), MapValue::get);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800452 }
453
454 @Override
455 public Set<Map.Entry<K, V>> entrySet() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800456 checkState(!destroyed, destroyedMessage);
Madan Jampani3d76c942015-06-29 23:37:10 -0700457 return Maps.filterValues(items, MapValue::isAlive)
458 .entrySet()
459 .stream()
460 .map(e -> Pair.of(e.getKey(), e.getValue().get()))
461 .collect(Collectors.toSet());
462 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800463
Madan Jampani3d76c942015-06-29 23:37:10 -0700464 /**
465 * Returns true if newValue was accepted i.e. map is updated.
Aaron Kruglikov66cf0b92015-10-26 15:46:54 -0700466 *
Madan Jampani3d76c942015-06-29 23:37:10 -0700467 * @param key key
468 * @param newValue proposed new value
469 * @return true if update happened; false if map already contains a more recent value for the key
470 */
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700471 private boolean putInternal(K key, MapValue<V> newValue) {
472 checkState(!destroyed, destroyedMessage);
473 checkNotNull(key, ERROR_NULL_KEY);
474 checkNotNull(newValue, ERROR_NULL_VALUE);
475 checkState(newValue.isAlive());
476 counter.incrementCount();
Madan Jampani3d76c942015-06-29 23:37:10 -0700477 AtomicBoolean updated = new AtomicBoolean(false);
478 items.compute(key, (k, existing) -> {
479 if (existing == null || newValue.isNewerThan(existing)) {
480 updated.set(true);
Madan Jampani3d76c942015-06-29 23:37:10 -0700481 return newValue;
482 }
483 return existing;
484 });
Madan Jampani3d76c942015-06-29 23:37:10 -0700485 return updated.get();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800486 }
487
488 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800489 public void addListener(EventuallyConsistentMapListener<K, V> listener) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800490 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800491
492 listeners.add(checkNotNull(listener));
493 }
494
495 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800496 public void removeListener(EventuallyConsistentMapListener<K, V> listener) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800497 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800498
499 listeners.remove(checkNotNull(listener));
500 }
501
502 @Override
503 public void destroy() {
504 destroyed = true;
505
506 executor.shutdown();
507 backgroundExecutor.shutdown();
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800508 communicationExecutor.shutdown();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800509
Jonathan Hart584d2f32015-01-27 19:46:14 -0800510 listeners.clear();
511
Jonathan Hartdb3af892015-01-26 13:19:07 -0800512 clusterCommunicator.removeSubscriber(updateMessageSubject);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800513 clusterCommunicator.removeSubscriber(antiEntropyAdvertisementSubject);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800514 }
515
Jonathan Hartaaa56572015-01-28 21:56:35 -0800516 private void notifyListeners(EventuallyConsistentMapEvent<K, V> event) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700517 listeners.forEach(listener -> listener.event(event));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800518 }
519
Madan Jampani3d76c942015-06-29 23:37:10 -0700520 private void notifyPeers(UpdateEntry<K, V> event, Collection<NodeId> peers) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800521 queueUpdate(event, peers);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800522 }
523
Madan Jampani3d76c942015-06-29 23:37:10 -0700524 private void queueUpdate(UpdateEntry<K, V> event, Collection<NodeId> peers) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800525 if (peers == null) {
526 // we have no friends :(
527 return;
528 }
529 peers.forEach(node ->
Jonathan Hart9a426f82015-09-03 15:43:13 +0200530 senderPending.computeIfAbsent(node, unusedKey -> new EventAccumulator(node)).add(event)
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800531 );
532 }
533
Jonathan Hart233a18a2015-03-02 17:24:58 -0800534 private boolean underHighLoad() {
535 return counter.get(LOAD_WINDOW) > HIGH_LOAD_THRESHOLD;
536 }
537
Madan Jampani3d76c942015-06-29 23:37:10 -0700538 private void sendAdvertisement() {
539 try {
Thomas Vachuska152f9fd2015-04-02 16:28:13 -0700540 if (underHighLoad() || destroyed) {
Jonathan Hart233a18a2015-03-02 17:24:58 -0800541 return;
542 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700543 pickRandomActivePeer().ifPresent(this::sendAdvertisementToPeer);
544 } catch (Exception e) {
545 // Catch all exceptions to avoid scheduled task being suppressed.
546 log.error("Exception thrown while sending advertisement", e);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800547 }
548 }
549
Madan Jampani3d76c942015-06-29 23:37:10 -0700550 private Optional<NodeId> pickRandomActivePeer() {
551 List<NodeId> activePeers = clusterService.getNodes()
552 .stream()
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700553 .map(ControllerNode::id)
554 .filter(id -> !localNodeId.equals(id))
Madan Jampani3d76c942015-06-29 23:37:10 -0700555 .filter(id -> clusterService.getState(id) == ControllerNode.State.ACTIVE)
556 .collect(Collectors.toList());
557 Collections.shuffle(activePeers);
558 return activePeers.isEmpty() ? Optional.empty() : Optional.of(activePeers.get(0));
559 }
560
561 private void sendAdvertisementToPeer(NodeId peer) {
562 clusterCommunicator.unicast(createAdvertisement(),
563 antiEntropyAdvertisementSubject,
564 serializer::encode,
565 peer)
566 .whenComplete((result, error) -> {
567 if (error != null) {
Madan Jampania0ac4872015-07-02 11:23:49 -0700568 log.debug("Failed to send anti-entropy advertisement to {}", peer, error);
Madan Jampani3d76c942015-06-29 23:37:10 -0700569 }
570 });
571 }
572
Jonathan Hartaaa56572015-01-28 21:56:35 -0800573 private AntiEntropyAdvertisement<K> createAdvertisement() {
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700574 return new AntiEntropyAdvertisement<K>(localNodeId,
575 ImmutableMap.copyOf(Maps.transformValues(items, MapValue::digest)));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800576 }
577
578 private void handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700579 if (destroyed || underHighLoad()) {
580 return;
581 }
582 try {
Jonathan Hart9a426f82015-09-03 15:43:13 +0200583 if (log.isTraceEnabled()) {
584 log.trace("Received anti-entropy advertisement from {} for {} with {} entries in it",
585 mapName, ad.sender(), ad.digest().size());
586 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700587 antiEntropyCheckLocalItems(ad).forEach(this::notifyListeners);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800588
Madan Jampani3d76c942015-06-29 23:37:10 -0700589 if (!lightweightAntiEntropy) {
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700590 // if remote ad has any entries that the local copy is missing, actively sync
591 // TODO: Missing keys is not the way local copy can be behind.
592 if (Sets.difference(ad.digest().keySet(), items.keySet()).size() > 0) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700593 // TODO: Send ad for missing keys and for entries that are stale
594 sendAdvertisementToPeer(ad.sender());
Jonathan Hartf893be82015-02-24 17:35:51 -0800595 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800596 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700597 } catch (Exception e) {
598 log.warn("Error handling anti-entropy advertisement", e);
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800599 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800600 }
601
602 /**
Madan Jampani3d76c942015-06-29 23:37:10 -0700603 * Processes anti-entropy ad from peer by taking following actions:
604 * 1. If peer has an old entry, updates peer.
605 * 2. If peer indicates an entry is removed and has a more recent
606 * timestamp than the local entry, update local state.
Jonathan Hartaaa56572015-01-28 21:56:35 -0800607 */
Jonathan Hartaaa56572015-01-28 21:56:35 -0800608 private List<EventuallyConsistentMapEvent<K, V>> antiEntropyCheckLocalItems(
609 AntiEntropyAdvertisement<K> ad) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700610 final List<EventuallyConsistentMapEvent<K, V>> externalEvents = Lists.newLinkedList();
Jonathan Hartaaa56572015-01-28 21:56:35 -0800611 final NodeId sender = ad.sender();
Madan Jampani3d76c942015-06-29 23:37:10 -0700612 items.forEach((key, localValue) -> {
613 MapValue.Digest remoteValueDigest = ad.digest().get(key);
614 if (remoteValueDigest == null || localValue.isNewerThan(remoteValueDigest.timestamp())) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800615 // local value is more recent, push to sender
Madan Jampani3d76c942015-06-29 23:37:10 -0700616 queueUpdate(new UpdateEntry<>(key, localValue), ImmutableList.of(sender));
Madan Jampanid13f3b82015-07-01 17:37:50 -0700617 }
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700618 if (remoteValueDigest != null
619 && remoteValueDigest.isNewerThan(localValue.digest())
620 && remoteValueDigest.isTombstone()) {
Madan Jampani483d0a22015-08-19 17:33:00 -0700621 MapValue<V> tombstone = MapValue.tombstone(remoteValueDigest.timestamp());
Madan Jampanid13f3b82015-07-01 17:37:50 -0700622 MapValue<V> previousValue = removeInternal(key,
623 Optional.empty(),
Madan Jampani483d0a22015-08-19 17:33:00 -0700624 Optional.of(tombstone));
Madan Jampanid13f3b82015-07-01 17:37:50 -0700625 if (previousValue != null && previousValue.isAlive()) {
Madan Jampanicab114c2015-07-23 00:14:19 -0700626 externalEvents.add(new EventuallyConsistentMapEvent<>(mapName, REMOVE, key, previousValue.get()));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800627 }
628 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700629 });
Jonathan Hartaaa56572015-01-28 21:56:35 -0800630 return externalEvents;
631 }
632
Madan Jampani3d76c942015-06-29 23:37:10 -0700633 private void processUpdates(Collection<UpdateEntry<K, V>> updates) {
634 if (destroyed) {
635 return;
Jonathan Hartaaa56572015-01-28 21:56:35 -0800636 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700637 updates.forEach(update -> {
638 final K key = update.key();
639 final MapValue<V> value = update.value();
Madan Jampani483d0a22015-08-19 17:33:00 -0700640 if (value == null || value.isTombstone()) {
641 MapValue<V> previousValue = removeInternal(key, Optional.empty(), Optional.ofNullable(value));
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700642 if (previousValue != null && previousValue.isAlive()) {
Madan Jampanicab114c2015-07-23 00:14:19 -0700643 notifyListeners(new EventuallyConsistentMapEvent<>(mapName, REMOVE, key, previousValue.get()));
Madan Jampanid13f3b82015-07-01 17:37:50 -0700644 }
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700645 } else if (putInternal(key, value)) {
Madan Jampanicab114c2015-07-23 00:14:19 -0700646 notifyListeners(new EventuallyConsistentMapEvent<>(mapName, PUT, key, value.get()));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800647 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700648 });
Jonathan Hartdb3af892015-01-26 13:19:07 -0800649 }
650
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800651 // TODO pull this into the class if this gets pulled out...
652 private static final int DEFAULT_MAX_EVENTS = 1000;
653 private static final int DEFAULT_MAX_IDLE_MS = 10;
654 private static final int DEFAULT_MAX_BATCH_MS = 50;
655 private static final Timer TIMER = new Timer("onos-ecm-sender-events");
Jonathan Hartdb3af892015-01-26 13:19:07 -0800656
Madan Jampani3d76c942015-06-29 23:37:10 -0700657 private final class EventAccumulator extends AbstractAccumulator<UpdateEntry<K, V>> {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800658
659 private final NodeId peer;
660
661 private EventAccumulator(NodeId peer) {
662 super(TIMER, DEFAULT_MAX_EVENTS, DEFAULT_MAX_BATCH_MS, DEFAULT_MAX_IDLE_MS);
663 this.peer = peer;
664 }
665
666 @Override
Madan Jampani3d76c942015-06-29 23:37:10 -0700667 public void processItems(List<UpdateEntry<K, V>> items) {
668 Map<K, UpdateEntry<K, V>> map = Maps.newHashMap();
669 items.forEach(item -> map.compute(item.key(), (key, existing) ->
Madan Jampani92c64eb2015-07-23 15:37:07 -0700670 item.isNewerThan(existing) ? item : existing));
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800671 communicationExecutor.submit(() -> {
Madan Jampani3d76c942015-06-29 23:37:10 -0700672 clusterCommunicator.unicast(ImmutableList.copyOf(map.values()),
Madan Jampani175e8fd2015-05-20 14:10:45 -0700673 updateMessageSubject,
674 serializer::encode,
675 peer)
676 .whenComplete((result, error) -> {
677 if (error != null) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700678 log.debug("Failed to send to {}", peer, error);
Madan Jampani175e8fd2015-05-20 14:10:45 -0700679 }
680 });
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800681 });
Jonathan Hartdb3af892015-01-26 13:19:07 -0800682 }
683 }
Jonathan Hart4a29c592015-09-23 17:55:07 -0700684}