blob: c20016b7d1aa06e0db4ce51272418fcaef625b40 [file] [log] [blame]
Jonathan Hartdb3af892015-01-26 13:19:07 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Jonathan Hart77bdd262015-02-03 09:07:48 -080016package org.onosproject.store.ecmap;
Jonathan Hartdb3af892015-01-26 13:19:07 -080017
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080018import com.google.common.collect.ImmutableList;
19import com.google.common.collect.Lists;
20import com.google.common.collect.Maps;
Madan Jampani3d76c942015-06-29 23:37:10 -070021import com.google.common.collect.Sets;
Madan Jampani3e033bd2015-04-08 13:03:49 -070022
Jonathan Hartf9108232015-02-02 16:37:35 -080023import org.apache.commons.lang3.tuple.Pair;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080024import org.onlab.util.AbstractAccumulator;
Jonathan Hartdb3af892015-01-26 13:19:07 -080025import org.onlab.util.KryoNamespace;
Jonathan Hart233a18a2015-03-02 17:24:58 -080026import org.onlab.util.SlidingWindowCounter;
Jonathan Hartdb3af892015-01-26 13:19:07 -080027import org.onosproject.cluster.ClusterService;
Jonathan Hartaaa56572015-01-28 21:56:35 -080028import org.onosproject.cluster.ControllerNode;
Jonathan Hartdb3af892015-01-26 13:19:07 -080029import org.onosproject.cluster.NodeId;
30import org.onosproject.store.Timestamp;
31import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Jonathan Hartdb3af892015-01-26 13:19:07 -080032import org.onosproject.store.cluster.messaging.MessageSubject;
Madan Jampani3e033bd2015-04-08 13:03:49 -070033import org.onosproject.store.impl.LogicalTimestamp;
Jonathan Hart63939a32015-05-08 11:57:03 -070034import org.onosproject.store.service.WallClockTimestamp;
Madan Jampani3d76c942015-06-29 23:37:10 -070035import org.onosproject.store.serializers.KryoNamespaces;
Jonathan Hartdb3af892015-01-26 13:19:07 -080036import org.onosproject.store.serializers.KryoSerializer;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070037import org.onosproject.store.service.EventuallyConsistentMap;
38import org.onosproject.store.service.EventuallyConsistentMapEvent;
39import org.onosproject.store.service.EventuallyConsistentMapListener;
Jonathan Hartdb3af892015-01-26 13:19:07 -080040import org.slf4j.Logger;
41import org.slf4j.LoggerFactory;
42
Madan Jampani3d76c942015-06-29 23:37:10 -070043import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
44import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
45
Jonathan Hartdb3af892015-01-26 13:19:07 -080046import java.util.Collection;
Madan Jampani3d76c942015-06-29 23:37:10 -070047import java.util.Collections;
Jonathan Hartdb3af892015-01-26 13:19:07 -080048import java.util.List;
49import java.util.Map;
Madan Jampanid13f3b82015-07-01 17:37:50 -070050import java.util.Objects;
Madan Jampani43e9c9c2015-06-26 14:16:46 -070051import java.util.Optional;
Jonathan Hartdb3af892015-01-26 13:19:07 -080052import java.util.Set;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080053import java.util.Timer;
Jonathan Hartdb3af892015-01-26 13:19:07 -080054import java.util.concurrent.ExecutorService;
55import java.util.concurrent.Executors;
56import java.util.concurrent.ScheduledExecutorService;
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -080057import java.util.concurrent.TimeUnit;
Madan Jampani3d76c942015-06-29 23:37:10 -070058import java.util.concurrent.atomic.AtomicBoolean;
Madan Jampani43e9c9c2015-06-26 14:16:46 -070059import java.util.concurrent.atomic.AtomicReference;
Brian O'Connor5eb77c82015-03-02 18:09:39 -080060import java.util.function.BiFunction;
Jonathan Hartdb3af892015-01-26 13:19:07 -080061import java.util.stream.Collectors;
62
63import static com.google.common.base.Preconditions.checkNotNull;
64import static com.google.common.base.Preconditions.checkState;
65import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
Brian O'Connorc6713a82015-02-24 11:55:48 -080066import static org.onlab.util.BoundedThreadPool.newFixedThreadPool;
Brian O'Connor4b2ba5f2015-02-18 20:54:00 -080067import static org.onlab.util.Tools.groupedThreads;
Jonathan Hartdb3af892015-01-26 13:19:07 -080068
69/**
70 * Distributed Map implementation which uses optimistic replication and gossip
71 * based techniques to provide an eventually consistent data store.
72 */
73public class EventuallyConsistentMapImpl<K, V>
74 implements EventuallyConsistentMap<K, V> {
75
76 private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class);
77
Madan Jampani3d76c942015-06-29 23:37:10 -070078 private final Map<K, MapValue<V>> items;
Jonathan Hartdb3af892015-01-26 13:19:07 -080079
Jonathan Hartdb3af892015-01-26 13:19:07 -080080 private final ClusterService clusterService;
81 private final ClusterCommunicationService clusterCommunicator;
82 private final KryoSerializer serializer;
Madan Jampani3d76c942015-06-29 23:37:10 -070083 private final NodeId localNodeId;
Jonathan Hartdb3af892015-01-26 13:19:07 -080084
Madan Jampanibcf1a482015-06-24 19:05:56 -070085 private final BiFunction<K, V, Timestamp> timestampProvider;
Jonathan Hartdb3af892015-01-26 13:19:07 -080086
87 private final MessageSubject updateMessageSubject;
Jonathan Hartaaa56572015-01-28 21:56:35 -080088 private final MessageSubject antiEntropyAdvertisementSubject;
Jonathan Hartdb3af892015-01-26 13:19:07 -080089
Jonathan Hartaaa56572015-01-28 21:56:35 -080090 private final Set<EventuallyConsistentMapListener<K, V>> listeners
Madan Jampani3d76c942015-06-29 23:37:10 -070091 = Sets.newCopyOnWriteArraySet();
Jonathan Hartdb3af892015-01-26 13:19:07 -080092
93 private final ExecutorService executor;
Jonathan Hartdb3af892015-01-26 13:19:07 -080094 private final ScheduledExecutorService backgroundExecutor;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080095 private final BiFunction<K, V, Collection<NodeId>> peerUpdateFunction;
Jonathan Hartdb3af892015-01-26 13:19:07 -080096
Jonathan Hart6ec029a2015-03-24 17:12:35 -070097 private final ExecutorService communicationExecutor;
98 private final Map<NodeId, EventAccumulator> senderPending;
Madan Jampani28726282015-02-19 11:40:23 -080099
Jonathan Hartdb3af892015-01-26 13:19:07 -0800100 private volatile boolean destroyed = false;
Jonathan Hart539a6462015-01-27 17:05:43 -0800101 private static final String ERROR_DESTROYED = " map is already destroyed";
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800102 private final String destroyedMessage;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800103
Jonathan Hart4f397e82015-02-04 09:10:41 -0800104 private static final String ERROR_NULL_KEY = "Key cannot be null";
105 private static final String ERROR_NULL_VALUE = "Null values are not allowed";
106
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700107 private final long initialDelaySec = 5;
108 private final boolean lightweightAntiEntropy;
109 private final boolean tombstonesDisabled;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800110
Jonathan Hart233a18a2015-03-02 17:24:58 -0800111 private static final int WINDOW_SIZE = 5;
112 private static final int HIGH_LOAD_THRESHOLD = 0;
113 private static final int LOAD_WINDOW = 2;
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700114 private SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE);
Jonathan Hart233a18a2015-03-02 17:24:58 -0800115
Jonathan Hartca335e92015-03-05 10:34:32 -0800116 private final boolean persistent;
117 private final PersistentStore<K, V> persistentStore;
118
Jonathan Hartdb3af892015-01-26 13:19:07 -0800119 /**
120 * Creates a new eventually consistent map shared amongst multiple instances.
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800121 * <p>
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700122 * See {@link org.onosproject.store.service.EventuallyConsistentMapBuilder}
123 * for more description of the parameters expected by the map.
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800124 * </p>
Jonathan Hartdb3af892015-01-26 13:19:07 -0800125 *
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700126 * @param mapName a String identifier for the map.
127 * @param clusterService the cluster service
128 * @param clusterCommunicator the cluster communications service
129 * @param serializerBuilder a Kryo namespace builder that can serialize
130 * both K and V
Madan Jampanibcf1a482015-06-24 19:05:56 -0700131 * @param timestampProvider provider of timestamps for K and V
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700132 * @param peerUpdateFunction function that provides a set of nodes to immediately
133 * update to when there writes to the map
134 * @param eventExecutor executor to use for processing incoming
135 * events from peers
136 * @param communicationExecutor executor to use for sending events to peers
137 * @param backgroundExecutor executor to use for background anti-entropy
138 * tasks
139 * @param tombstonesDisabled true if this map should not maintain
140 * tombstones
141 * @param antiEntropyPeriod period that the anti-entropy task should run
Jonathan Hartca335e92015-03-05 10:34:32 -0800142 * @param antiEntropyTimeUnit time unit for anti-entropy period
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700143 * @param convergeFaster make anti-entropy try to converge faster
Jonathan Hartca335e92015-03-05 10:34:32 -0800144 * @param persistent persist data to disk
Jonathan Hartdb3af892015-01-26 13:19:07 -0800145 */
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700146 EventuallyConsistentMapImpl(String mapName,
147 ClusterService clusterService,
148 ClusterCommunicationService clusterCommunicator,
149 KryoNamespace.Builder serializerBuilder,
Madan Jampanibcf1a482015-06-24 19:05:56 -0700150 BiFunction<K, V, Timestamp> timestampProvider,
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700151 BiFunction<K, V, Collection<NodeId>> peerUpdateFunction,
152 ExecutorService eventExecutor,
153 ExecutorService communicationExecutor,
154 ScheduledExecutorService backgroundExecutor,
155 boolean tombstonesDisabled,
156 long antiEntropyPeriod,
157 TimeUnit antiEntropyTimeUnit,
Jonathan Hartca335e92015-03-05 10:34:32 -0800158 boolean convergeFaster,
159 boolean persistent) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700160 items = Maps.newConcurrentMap();
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800161 senderPending = Maps.newConcurrentMap();
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700162 destroyedMessage = mapName + ERROR_DESTROYED;
Madan Jampani28726282015-02-19 11:40:23 -0800163
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700164 this.clusterService = clusterService;
165 this.clusterCommunicator = clusterCommunicator;
Madan Jampani3d76c942015-06-29 23:37:10 -0700166 this.localNodeId = clusterService.getLocalNode().id();
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700167
168 this.serializer = createSerializer(serializerBuilder);
169
Madan Jampanibcf1a482015-06-24 19:05:56 -0700170 this.timestampProvider = timestampProvider;
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700171
172 if (peerUpdateFunction != null) {
173 this.peerUpdateFunction = peerUpdateFunction;
174 } else {
175 this.peerUpdateFunction = (key, value) -> clusterService.getNodes().stream()
176 .map(ControllerNode::id)
Madan Jampani3d76c942015-06-29 23:37:10 -0700177 .filter(nodeId -> !nodeId.equals(localNodeId))
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700178 .collect(Collectors.toList());
179 }
180
181 if (eventExecutor != null) {
182 this.executor = eventExecutor;
183 } else {
184 // should be a normal executor; it's used for receiving messages
185 this.executor =
186 Executors.newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-fg-%d"));
187 }
188
189 if (communicationExecutor != null) {
190 this.communicationExecutor = communicationExecutor;
191 } else {
192 // sending executor; should be capped
193 //TODO this probably doesn't need to be bounded anymore
194 this.communicationExecutor =
195 newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-publish-%d"));
196 }
197
Jonathan Hartca335e92015-03-05 10:34:32 -0800198 this.persistent = persistent;
199
200 if (this.persistent) {
201 String dataDirectory = System.getProperty("karaf.data", "./data");
202 String filename = dataDirectory + "/" + "mapdb-ecm-" + mapName;
203
204 ExecutorService dbExecutor =
205 newFixedThreadPool(1, groupedThreads("onos/ecm", mapName + "-dbwriter"));
206
207 persistentStore = new MapDbPersistentStore<>(filename, dbExecutor, serializer);
Madan Jampani3d76c942015-06-29 23:37:10 -0700208 persistentStore.readInto(items);
Jonathan Hartca335e92015-03-05 10:34:32 -0800209 } else {
210 this.persistentStore = null;
211 }
212
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700213 if (backgroundExecutor != null) {
214 this.backgroundExecutor = backgroundExecutor;
215 } else {
216 this.backgroundExecutor =
217 newSingleThreadScheduledExecutor(groupedThreads("onos/ecm", mapName + "-bg-%d"));
218 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800219
Jonathan Hartaaa56572015-01-28 21:56:35 -0800220 // start anti-entropy thread
Madan Jampani3d76c942015-06-29 23:37:10 -0700221 this.backgroundExecutor.scheduleAtFixedRate(this::sendAdvertisement,
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700222 initialDelaySec, antiEntropyPeriod,
223 antiEntropyTimeUnit);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800224
Jonathan Hartdb3af892015-01-26 13:19:07 -0800225 updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
226 clusterCommunicator.addSubscriber(updateMessageSubject,
Madan Jampani3d76c942015-06-29 23:37:10 -0700227 serializer::decode,
228 this::processUpdates,
229 this.executor);
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800230
Jonathan Hartaaa56572015-01-28 21:56:35 -0800231 antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");
232 clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
Madan Jampani3d76c942015-06-29 23:37:10 -0700233 serializer::decode,
234 this::handleAntiEntropyAdvertisement,
235 this.backgroundExecutor);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800236
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700237 this.tombstonesDisabled = tombstonesDisabled;
238 this.lightweightAntiEntropy = !convergeFaster;
Madan Jampanie1356282015-03-10 19:05:36 -0700239 }
240
Jonathan Hartdb3af892015-01-26 13:19:07 -0800241 private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
242 return new KryoSerializer() {
243 @Override
244 protected void setupKryoPool() {
245 // Add the map's internal helper classes to the user-supplied serializer
246 serializerPool = builder
Madan Jampani3d76c942015-06-29 23:37:10 -0700247 .register(KryoNamespaces.BASIC)
Madan Jampanidb5d06a2015-06-30 11:16:48 -0700248 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
Madan Jampani3e033bd2015-04-08 13:03:49 -0700249 .register(LogicalTimestamp.class)
Jonathan Hartdb3af892015-01-26 13:19:07 -0800250 .register(WallClockTimestamp.class)
Jonathan Hartaaa56572015-01-28 21:56:35 -0800251 .register(AntiEntropyAdvertisement.class)
Madan Jampani3d76c942015-06-29 23:37:10 -0700252 .register(UpdateEntry.class)
253 .register(MapValue.class)
254 .register(MapValue.Digest.class)
Jonathan Hartdb3af892015-01-26 13:19:07 -0800255 .build();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800256 }
257 };
258 }
259
260 @Override
261 public int size() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800262 checkState(!destroyed, destroyedMessage);
Madan Jampani3d76c942015-06-29 23:37:10 -0700263 // TODO: Maintain a separate counter for tracking live elements in map.
264 return Maps.filterValues(items, MapValue::isAlive).size();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800265 }
266
267 @Override
268 public boolean isEmpty() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800269 checkState(!destroyed, destroyedMessage);
Madan Jampani3d76c942015-06-29 23:37:10 -0700270 return size() == 0;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800271 }
272
273 @Override
274 public boolean containsKey(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800275 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800276 checkNotNull(key, ERROR_NULL_KEY);
Madan Jampani3d76c942015-06-29 23:37:10 -0700277 return get(key) != null;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800278 }
279
280 @Override
281 public boolean containsValue(V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800282 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800283 checkNotNull(value, ERROR_NULL_VALUE);
Madan Jampani3d76c942015-06-29 23:37:10 -0700284 return items.values()
285 .stream()
286 .filter(MapValue::isAlive)
287 .anyMatch(v -> v.get().equals(value));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800288 }
289
290 @Override
291 public V get(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800292 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800293 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800294
Madan Jampani3d76c942015-06-29 23:37:10 -0700295 MapValue<V> value = items.get(key);
296 return (value == null || value.isTombstone()) ? null : value.get();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800297 }
298
299 @Override
300 public void put(K key, V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800301 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800302 checkNotNull(key, ERROR_NULL_KEY);
303 checkNotNull(value, ERROR_NULL_VALUE);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800304
Madan Jampani3d76c942015-06-29 23:37:10 -0700305 MapValue<V> newValue = new MapValue<>(value, timestampProvider.apply(key, value));
306 if (updateInternal(key, newValue)) {
307 notifyPeers(new UpdateEntry<>(key, newValue), peerUpdateFunction.apply(key, value));
308 notifyListeners(new EventuallyConsistentMapEvent<>(PUT, key, value));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800309 }
310 }
311
Jonathan Hartdb3af892015-01-26 13:19:07 -0800312 @Override
Madan Jampani43e9c9c2015-06-26 14:16:46 -0700313 public V remove(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800314 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800315 checkNotNull(key, ERROR_NULL_KEY);
Madan Jampanid13f3b82015-07-01 17:37:50 -0700316 MapValue<V> tombstone = new MapValue<>(null, timestampProvider.apply(key, null));
317 MapValue<V> previousValue = removeInternal(key, Optional.empty(), tombstone);
318 return previousValue != null ? previousValue.get() : null;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800319 }
320
321 @Override
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800322 public void remove(K key, V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800323 checkState(!destroyed, destroyedMessage);
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800324 checkNotNull(key, ERROR_NULL_KEY);
325 checkNotNull(value, ERROR_NULL_VALUE);
Madan Jampanid13f3b82015-07-01 17:37:50 -0700326 MapValue<V> tombstone = new MapValue<>(null, timestampProvider.apply(key, null));
327 removeInternal(key, Optional.of(value), tombstone);
Madan Jampani3d76c942015-06-29 23:37:10 -0700328 }
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800329
Madan Jampanid13f3b82015-07-01 17:37:50 -0700330 private MapValue<V> removeInternal(K key, Optional<V> value, MapValue<V> tombstone) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700331 checkState(!destroyed, destroyedMessage);
332 checkNotNull(key, ERROR_NULL_KEY);
333 checkNotNull(value, ERROR_NULL_VALUE);
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800334
Madan Jampanid13f3b82015-07-01 17:37:50 -0700335 checkState(tombstone.isTombstone());
Madan Jampani3d76c942015-06-29 23:37:10 -0700336 AtomicBoolean updated = new AtomicBoolean(false);
Madan Jampanid13f3b82015-07-01 17:37:50 -0700337 AtomicReference<MapValue<V>> previousValue = new AtomicReference<>();
Madan Jampani3d76c942015-06-29 23:37:10 -0700338 items.compute(key, (k, existing) -> {
Madan Jampanid13f3b82015-07-01 17:37:50 -0700339 boolean valueMatches = true;
340 if (value.isPresent() && existing != null && existing.isAlive()) {
341 valueMatches = Objects.equals(value.get(), existing.get());
Madan Jampani3d76c942015-06-29 23:37:10 -0700342 }
Madan Jampanid13f3b82015-07-01 17:37:50 -0700343 updated.set(valueMatches && (existing == null || tombstone.isNewerThan(existing)));
344 if (updated.get()) {
345 previousValue.set(existing);
346 }
347 return updated.get() ? tombstone : existing;
Madan Jampani3d76c942015-06-29 23:37:10 -0700348 });
349 if (updated.get()) {
Madan Jampanid13f3b82015-07-01 17:37:50 -0700350 notifyPeers(new UpdateEntry<>(key, tombstone), peerUpdateFunction.apply(key, null));
351 if (previousValue.get() != null && previousValue.get().isAlive()) {
352 notifyListeners(new EventuallyConsistentMapEvent<>(REMOVE, key, previousValue.get().get()));
Madan Jampani3d76c942015-06-29 23:37:10 -0700353 }
Madan Jampanid13f3b82015-07-01 17:37:50 -0700354 if (persistent) {
355 persistentStore.update(key, tombstone);
356 }
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800357 }
Madan Jampanid13f3b82015-07-01 17:37:50 -0700358 return previousValue.get();
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800359 }
360
361 @Override
Jonathan Hartdb3af892015-01-26 13:19:07 -0800362 public void putAll(Map<? extends K, ? extends V> m) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800363 checkState(!destroyed, destroyedMessage);
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800364 m.forEach(this::put);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800365 }
366
367 @Override
368 public void clear() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800369 checkState(!destroyed, destroyedMessage);
Madan Jampani3d76c942015-06-29 23:37:10 -0700370 Maps.filterValues(items, MapValue::isAlive)
371 .forEach((k, v) -> remove(k));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800372 }
373
374 @Override
375 public Set<K> keySet() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800376 checkState(!destroyed, destroyedMessage);
Madan Jampani3d76c942015-06-29 23:37:10 -0700377 return Maps.filterValues(items, MapValue::isAlive)
378 .keySet();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800379 }
380
381 @Override
382 public Collection<V> values() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800383 checkState(!destroyed, destroyedMessage);
Madan Jampani3d76c942015-06-29 23:37:10 -0700384 return Maps.filterValues(items, MapValue::isAlive)
385 .values()
386 .stream()
387 .map(MapValue::get)
388 .collect(Collectors.toList());
Jonathan Hartdb3af892015-01-26 13:19:07 -0800389 }
390
391 @Override
392 public Set<Map.Entry<K, V>> entrySet() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800393 checkState(!destroyed, destroyedMessage);
Madan Jampani3d76c942015-06-29 23:37:10 -0700394 return Maps.filterValues(items, MapValue::isAlive)
395 .entrySet()
396 .stream()
397 .map(e -> Pair.of(e.getKey(), e.getValue().get()))
398 .collect(Collectors.toSet());
399 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800400
Madan Jampani3d76c942015-06-29 23:37:10 -0700401 /**
402 * Returns true if newValue was accepted i.e. map is updated.
403 * @param key key
404 * @param newValue proposed new value
405 * @return true if update happened; false if map already contains a more recent value for the key
406 */
407 private boolean updateInternal(K key, MapValue<V> newValue) {
408 AtomicBoolean updated = new AtomicBoolean(false);
409 items.compute(key, (k, existing) -> {
410 if (existing == null || newValue.isNewerThan(existing)) {
411 updated.set(true);
412 if (newValue.isTombstone()) {
413 return tombstonesDisabled ? null : newValue;
414 }
415 return newValue;
416 }
417 return existing;
418 });
419 if (updated.get() && persistent) {
420 persistentStore.update(key, newValue);
421 }
422 return updated.get();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800423 }
424
425 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800426 public void addListener(EventuallyConsistentMapListener<K, V> listener) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800427 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800428
429 listeners.add(checkNotNull(listener));
430 }
431
432 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800433 public void removeListener(EventuallyConsistentMapListener<K, V> listener) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800434 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800435
436 listeners.remove(checkNotNull(listener));
437 }
438
439 @Override
440 public void destroy() {
441 destroyed = true;
442
443 executor.shutdown();
444 backgroundExecutor.shutdown();
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800445 communicationExecutor.shutdown();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800446
Jonathan Hart584d2f32015-01-27 19:46:14 -0800447 listeners.clear();
448
Jonathan Hartdb3af892015-01-26 13:19:07 -0800449 clusterCommunicator.removeSubscriber(updateMessageSubject);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800450 clusterCommunicator.removeSubscriber(antiEntropyAdvertisementSubject);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800451 }
452
Jonathan Hartaaa56572015-01-28 21:56:35 -0800453 private void notifyListeners(EventuallyConsistentMapEvent<K, V> event) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700454 listeners.forEach(listener -> listener.event(event));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800455 }
456
Madan Jampani3d76c942015-06-29 23:37:10 -0700457 private void notifyPeers(UpdateEntry<K, V> event, Collection<NodeId> peers) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800458 queueUpdate(event, peers);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800459 }
460
Madan Jampani3d76c942015-06-29 23:37:10 -0700461 private void queueUpdate(UpdateEntry<K, V> event, Collection<NodeId> peers) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800462 if (peers == null) {
463 // we have no friends :(
464 return;
465 }
466 peers.forEach(node ->
Madan Jampani3d76c942015-06-29 23:37:10 -0700467 senderPending.computeIfAbsent(node, unusedKey -> new EventAccumulator(node)).add(event)
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800468 );
469 }
470
Jonathan Hart233a18a2015-03-02 17:24:58 -0800471 private boolean underHighLoad() {
472 return counter.get(LOAD_WINDOW) > HIGH_LOAD_THRESHOLD;
473 }
474
Madan Jampani3d76c942015-06-29 23:37:10 -0700475 private void sendAdvertisement() {
476 try {
Thomas Vachuska152f9fd2015-04-02 16:28:13 -0700477 if (underHighLoad() || destroyed) {
Jonathan Hart233a18a2015-03-02 17:24:58 -0800478 return;
479 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700480 pickRandomActivePeer().ifPresent(this::sendAdvertisementToPeer);
481 } catch (Exception e) {
482 // Catch all exceptions to avoid scheduled task being suppressed.
483 log.error("Exception thrown while sending advertisement", e);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800484 }
485 }
486
Madan Jampani3d76c942015-06-29 23:37:10 -0700487 private Optional<NodeId> pickRandomActivePeer() {
488 List<NodeId> activePeers = clusterService.getNodes()
489 .stream()
490 .filter(node -> !localNodeId.equals(node))
491 .map(ControllerNode::id)
492 .filter(id -> clusterService.getState(id) == ControllerNode.State.ACTIVE)
493 .collect(Collectors.toList());
494 Collections.shuffle(activePeers);
495 return activePeers.isEmpty() ? Optional.empty() : Optional.of(activePeers.get(0));
496 }
497
498 private void sendAdvertisementToPeer(NodeId peer) {
499 clusterCommunicator.unicast(createAdvertisement(),
500 antiEntropyAdvertisementSubject,
501 serializer::encode,
502 peer)
503 .whenComplete((result, error) -> {
504 if (error != null) {
505 log.warn("Failed to send anti-entropy advertisement to {}", peer);
506 }
507 });
508 }
509
510
Jonathan Hartaaa56572015-01-28 21:56:35 -0800511 private AntiEntropyAdvertisement<K> createAdvertisement() {
Madan Jampani3d76c942015-06-29 23:37:10 -0700512 return new AntiEntropyAdvertisement<K>(localNodeId, Maps.transformValues(items, MapValue::digest));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800513 }
514
515 private void handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700516 if (destroyed || underHighLoad()) {
517 return;
518 }
519 try {
520 antiEntropyCheckLocalItems(ad).forEach(this::notifyListeners);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800521
Madan Jampani3d76c942015-06-29 23:37:10 -0700522 if (!lightweightAntiEntropy) {
523 Set<K> missingKeys = Sets.difference(items.keySet(), ad.digest().keySet());
524 // if remote ad has something unknown, actively sync
525 if (missingKeys.size() > 0) {
Jonathan Hartf893be82015-02-24 17:35:51 -0800526 // Send the advertisement back if this peer is out-of-sync
Madan Jampani3d76c942015-06-29 23:37:10 -0700527 // TODO: Send ad for missing keys and for entries that are stale
528 sendAdvertisementToPeer(ad.sender());
Jonathan Hartf893be82015-02-24 17:35:51 -0800529 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800530 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700531 } catch (Exception e) {
532 log.warn("Error handling anti-entropy advertisement", e);
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800533 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800534 }
535
536 /**
Madan Jampani3d76c942015-06-29 23:37:10 -0700537 * Processes anti-entropy ad from peer by taking following actions:
538 * 1. If peer has an old entry, updates peer.
539 * 2. If peer indicates an entry is removed and has a more recent
540 * timestamp than the local entry, update local state.
Jonathan Hartaaa56572015-01-28 21:56:35 -0800541 */
Jonathan Hartaaa56572015-01-28 21:56:35 -0800542 private List<EventuallyConsistentMapEvent<K, V>> antiEntropyCheckLocalItems(
543 AntiEntropyAdvertisement<K> ad) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700544 final List<EventuallyConsistentMapEvent<K, V>> externalEvents = Lists.newLinkedList();
Jonathan Hartaaa56572015-01-28 21:56:35 -0800545 final NodeId sender = ad.sender();
Madan Jampani3d76c942015-06-29 23:37:10 -0700546 items.forEach((key, localValue) -> {
547 MapValue.Digest remoteValueDigest = ad.digest().get(key);
548 if (remoteValueDigest == null || localValue.isNewerThan(remoteValueDigest.timestamp())) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800549 // local value is more recent, push to sender
Madan Jampani3d76c942015-06-29 23:37:10 -0700550 queueUpdate(new UpdateEntry<>(key, localValue), ImmutableList.of(sender));
Madan Jampanid13f3b82015-07-01 17:37:50 -0700551 }
552 if (remoteValueDigest != null && remoteValueDigest.isTombstone()) {
553 MapValue<V> previousValue = removeInternal(key,
554 Optional.empty(),
555 new MapValue<>(null, remoteValueDigest.timestamp()));
556 if (previousValue != null && previousValue.isAlive()) {
557 externalEvents.add(new EventuallyConsistentMapEvent<>(REMOVE, key, previousValue.get()));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800558 }
559 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700560 });
Jonathan Hartaaa56572015-01-28 21:56:35 -0800561 return externalEvents;
562 }
563
Madan Jampani3d76c942015-06-29 23:37:10 -0700564 private void processUpdates(Collection<UpdateEntry<K, V>> updates) {
565 if (destroyed) {
566 return;
Jonathan Hartaaa56572015-01-28 21:56:35 -0800567 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700568 updates.forEach(update -> {
569 final K key = update.key();
570 final MapValue<V> value = update.value();
Madan Jampanid13f3b82015-07-01 17:37:50 -0700571 if (value.isTombstone()) {
572 MapValue<V> previousValue = removeInternal(key, Optional.empty(), value);
573 if (previousValue != null && previousValue.get() != null) {
574 notifyListeners(new EventuallyConsistentMapEvent<>(REMOVE, key, previousValue.get()));
575 }
576 } else if (updateInternal(key, value)) {
577 notifyListeners(new EventuallyConsistentMapEvent<>(PUT, key, value.get()));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800578 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700579 });
Jonathan Hartdb3af892015-01-26 13:19:07 -0800580 }
581
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800582 // TODO pull this into the class if this gets pulled out...
583 private static final int DEFAULT_MAX_EVENTS = 1000;
584 private static final int DEFAULT_MAX_IDLE_MS = 10;
585 private static final int DEFAULT_MAX_BATCH_MS = 50;
586 private static final Timer TIMER = new Timer("onos-ecm-sender-events");
Jonathan Hartdb3af892015-01-26 13:19:07 -0800587
Madan Jampani3d76c942015-06-29 23:37:10 -0700588 private final class EventAccumulator extends AbstractAccumulator<UpdateEntry<K, V>> {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800589
590 private final NodeId peer;
591
592 private EventAccumulator(NodeId peer) {
593 super(TIMER, DEFAULT_MAX_EVENTS, DEFAULT_MAX_BATCH_MS, DEFAULT_MAX_IDLE_MS);
594 this.peer = peer;
595 }
596
597 @Override
Madan Jampani3d76c942015-06-29 23:37:10 -0700598 public void processItems(List<UpdateEntry<K, V>> items) {
599 Map<K, UpdateEntry<K, V>> map = Maps.newHashMap();
600 items.forEach(item -> map.compute(item.key(), (key, existing) ->
601 existing == null || item.compareTo(existing) > 0 ? item : existing));
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800602 communicationExecutor.submit(() -> {
Madan Jampani3d76c942015-06-29 23:37:10 -0700603 clusterCommunicator.unicast(ImmutableList.copyOf(map.values()),
Madan Jampani175e8fd2015-05-20 14:10:45 -0700604 updateMessageSubject,
605 serializer::encode,
606 peer)
607 .whenComplete((result, error) -> {
608 if (error != null) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700609 log.debug("Failed to send to {}", peer, error);
Madan Jampani175e8fd2015-05-20 14:10:45 -0700610 }
611 });
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800612 });
Jonathan Hartdb3af892015-01-26 13:19:07 -0800613 }
614 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700615}