blob: 18cb3bc3cacc8a71b734688c71183f87b249b8c6 [file] [log] [blame]
/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Jonathan Hart77bdd262015-02-03 09:07:48 -080016package org.onosproject.store.ecmap;
Jonathan Hartdb3af892015-01-26 13:19:07 -080017
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080018import com.google.common.collect.ImmutableList;
19import com.google.common.collect.Lists;
20import com.google.common.collect.Maps;
Madan Jampani3d76c942015-06-29 23:37:10 -070021import com.google.common.collect.Sets;
Madan Jampani3e033bd2015-04-08 13:03:49 -070022
Jonathan Hartf9108232015-02-02 16:37:35 -080023import org.apache.commons.lang3.tuple.Pair;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080024import org.onlab.util.AbstractAccumulator;
Jonathan Hartdb3af892015-01-26 13:19:07 -080025import org.onlab.util.KryoNamespace;
Jonathan Hart233a18a2015-03-02 17:24:58 -080026import org.onlab.util.SlidingWindowCounter;
Jonathan Hartdb3af892015-01-26 13:19:07 -080027import org.onosproject.cluster.ClusterService;
Jonathan Hartaaa56572015-01-28 21:56:35 -080028import org.onosproject.cluster.ControllerNode;
Jonathan Hartdb3af892015-01-26 13:19:07 -080029import org.onosproject.cluster.NodeId;
30import org.onosproject.store.Timestamp;
31import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Jonathan Hartdb3af892015-01-26 13:19:07 -080032import org.onosproject.store.cluster.messaging.MessageSubject;
Madan Jampani3e033bd2015-04-08 13:03:49 -070033import org.onosproject.store.impl.LogicalTimestamp;
Jonathan Hart63939a32015-05-08 11:57:03 -070034import org.onosproject.store.service.WallClockTimestamp;
Madan Jampani3d76c942015-06-29 23:37:10 -070035import org.onosproject.store.serializers.KryoNamespaces;
Jonathan Hartdb3af892015-01-26 13:19:07 -080036import org.onosproject.store.serializers.KryoSerializer;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070037import org.onosproject.store.service.EventuallyConsistentMap;
38import org.onosproject.store.service.EventuallyConsistentMapEvent;
39import org.onosproject.store.service.EventuallyConsistentMapListener;
Jonathan Hartdb3af892015-01-26 13:19:07 -080040import org.slf4j.Logger;
41import org.slf4j.LoggerFactory;
42
Madan Jampani3d76c942015-06-29 23:37:10 -070043import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
44import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
45
Jonathan Hartdb3af892015-01-26 13:19:07 -080046import java.util.Collection;
Madan Jampani3d76c942015-06-29 23:37:10 -070047import java.util.Collections;
Jonathan Hartdb3af892015-01-26 13:19:07 -080048import java.util.List;
49import java.util.Map;
Madan Jampanid13f3b82015-07-01 17:37:50 -070050import java.util.Objects;
Madan Jampani43e9c9c2015-06-26 14:16:46 -070051import java.util.Optional;
Jonathan Hartdb3af892015-01-26 13:19:07 -080052import java.util.Set;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080053import java.util.Timer;
Jonathan Hartdb3af892015-01-26 13:19:07 -080054import java.util.concurrent.ExecutorService;
55import java.util.concurrent.Executors;
56import java.util.concurrent.ScheduledExecutorService;
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -080057import java.util.concurrent.TimeUnit;
Madan Jampani3d76c942015-06-29 23:37:10 -070058import java.util.concurrent.atomic.AtomicBoolean;
Madan Jampani43e9c9c2015-06-26 14:16:46 -070059import java.util.concurrent.atomic.AtomicReference;
Brian O'Connor5eb77c82015-03-02 18:09:39 -080060import java.util.function.BiFunction;
Jonathan Hartdb3af892015-01-26 13:19:07 -080061import java.util.stream.Collectors;
62
63import static com.google.common.base.Preconditions.checkNotNull;
64import static com.google.common.base.Preconditions.checkState;
65import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
Brian O'Connorc6713a82015-02-24 11:55:48 -080066import static org.onlab.util.BoundedThreadPool.newFixedThreadPool;
Brian O'Connor4b2ba5f2015-02-18 20:54:00 -080067import static org.onlab.util.Tools.groupedThreads;
Jonathan Hartdb3af892015-01-26 13:19:07 -080068
69/**
70 * Distributed Map implementation which uses optimistic replication and gossip
71 * based techniques to provide an eventually consistent data store.
72 */
73public class EventuallyConsistentMapImpl<K, V>
74 implements EventuallyConsistentMap<K, V> {
75
76 private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class);
77
Madan Jampani3d76c942015-06-29 23:37:10 -070078 private final Map<K, MapValue<V>> items;
Jonathan Hartdb3af892015-01-26 13:19:07 -080079
Jonathan Hartdb3af892015-01-26 13:19:07 -080080 private final ClusterService clusterService;
81 private final ClusterCommunicationService clusterCommunicator;
82 private final KryoSerializer serializer;
Madan Jampani3d76c942015-06-29 23:37:10 -070083 private final NodeId localNodeId;
Jonathan Hartdb3af892015-01-26 13:19:07 -080084
Madan Jampanibcf1a482015-06-24 19:05:56 -070085 private final BiFunction<K, V, Timestamp> timestampProvider;
Jonathan Hartdb3af892015-01-26 13:19:07 -080086
87 private final MessageSubject updateMessageSubject;
Jonathan Hartaaa56572015-01-28 21:56:35 -080088 private final MessageSubject antiEntropyAdvertisementSubject;
Jonathan Hartdb3af892015-01-26 13:19:07 -080089
Jonathan Hartaaa56572015-01-28 21:56:35 -080090 private final Set<EventuallyConsistentMapListener<K, V>> listeners
Madan Jampani3d76c942015-06-29 23:37:10 -070091 = Sets.newCopyOnWriteArraySet();
Jonathan Hartdb3af892015-01-26 13:19:07 -080092
93 private final ExecutorService executor;
Jonathan Hartdb3af892015-01-26 13:19:07 -080094 private final ScheduledExecutorService backgroundExecutor;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080095 private final BiFunction<K, V, Collection<NodeId>> peerUpdateFunction;
Jonathan Hartdb3af892015-01-26 13:19:07 -080096
Jonathan Hart6ec029a2015-03-24 17:12:35 -070097 private final ExecutorService communicationExecutor;
98 private final Map<NodeId, EventAccumulator> senderPending;
Madan Jampani28726282015-02-19 11:40:23 -080099
Jonathan Hartdb3af892015-01-26 13:19:07 -0800100 private volatile boolean destroyed = false;
Jonathan Hart539a6462015-01-27 17:05:43 -0800101 private static final String ERROR_DESTROYED = " map is already destroyed";
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800102 private final String destroyedMessage;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800103
Jonathan Hart4f397e82015-02-04 09:10:41 -0800104 private static final String ERROR_NULL_KEY = "Key cannot be null";
105 private static final String ERROR_NULL_VALUE = "Null values are not allowed";
106
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700107 private final long initialDelaySec = 5;
108 private final boolean lightweightAntiEntropy;
109 private final boolean tombstonesDisabled;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800110
Jonathan Hart233a18a2015-03-02 17:24:58 -0800111 private static final int WINDOW_SIZE = 5;
112 private static final int HIGH_LOAD_THRESHOLD = 0;
113 private static final int LOAD_WINDOW = 2;
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700114 private SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE);
Jonathan Hart233a18a2015-03-02 17:24:58 -0800115
Jonathan Hartca335e92015-03-05 10:34:32 -0800116 private final boolean persistent;
117 private final PersistentStore<K, V> persistentStore;
118
Jonathan Hartdb3af892015-01-26 13:19:07 -0800119 /**
120 * Creates a new eventually consistent map shared amongst multiple instances.
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800121 * <p>
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700122 * See {@link org.onosproject.store.service.EventuallyConsistentMapBuilder}
123 * for more description of the parameters expected by the map.
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800124 * </p>
Jonathan Hartdb3af892015-01-26 13:19:07 -0800125 *
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700126 * @param mapName a String identifier for the map.
127 * @param clusterService the cluster service
128 * @param clusterCommunicator the cluster communications service
129 * @param serializerBuilder a Kryo namespace builder that can serialize
130 * both K and V
Madan Jampanibcf1a482015-06-24 19:05:56 -0700131 * @param timestampProvider provider of timestamps for K and V
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700132 * @param peerUpdateFunction function that provides a set of nodes to immediately
133 * update to when there writes to the map
134 * @param eventExecutor executor to use for processing incoming
135 * events from peers
136 * @param communicationExecutor executor to use for sending events to peers
137 * @param backgroundExecutor executor to use for background anti-entropy
138 * tasks
139 * @param tombstonesDisabled true if this map should not maintain
140 * tombstones
141 * @param antiEntropyPeriod period that the anti-entropy task should run
Jonathan Hartca335e92015-03-05 10:34:32 -0800142 * @param antiEntropyTimeUnit time unit for anti-entropy period
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700143 * @param convergeFaster make anti-entropy try to converge faster
Jonathan Hartca335e92015-03-05 10:34:32 -0800144 * @param persistent persist data to disk
Jonathan Hartdb3af892015-01-26 13:19:07 -0800145 */
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700146 EventuallyConsistentMapImpl(String mapName,
147 ClusterService clusterService,
148 ClusterCommunicationService clusterCommunicator,
149 KryoNamespace.Builder serializerBuilder,
Madan Jampanibcf1a482015-06-24 19:05:56 -0700150 BiFunction<K, V, Timestamp> timestampProvider,
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700151 BiFunction<K, V, Collection<NodeId>> peerUpdateFunction,
152 ExecutorService eventExecutor,
153 ExecutorService communicationExecutor,
154 ScheduledExecutorService backgroundExecutor,
155 boolean tombstonesDisabled,
156 long antiEntropyPeriod,
157 TimeUnit antiEntropyTimeUnit,
Jonathan Hartca335e92015-03-05 10:34:32 -0800158 boolean convergeFaster,
159 boolean persistent) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700160 items = Maps.newConcurrentMap();
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800161 senderPending = Maps.newConcurrentMap();
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700162 destroyedMessage = mapName + ERROR_DESTROYED;
Madan Jampani28726282015-02-19 11:40:23 -0800163
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700164 this.clusterService = clusterService;
165 this.clusterCommunicator = clusterCommunicator;
Madan Jampani3d76c942015-06-29 23:37:10 -0700166 this.localNodeId = clusterService.getLocalNode().id();
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700167
168 this.serializer = createSerializer(serializerBuilder);
169
Madan Jampanibcf1a482015-06-24 19:05:56 -0700170 this.timestampProvider = timestampProvider;
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700171
172 if (peerUpdateFunction != null) {
173 this.peerUpdateFunction = peerUpdateFunction;
174 } else {
175 this.peerUpdateFunction = (key, value) -> clusterService.getNodes().stream()
176 .map(ControllerNode::id)
Madan Jampani3d76c942015-06-29 23:37:10 -0700177 .filter(nodeId -> !nodeId.equals(localNodeId))
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700178 .collect(Collectors.toList());
179 }
180
181 if (eventExecutor != null) {
182 this.executor = eventExecutor;
183 } else {
184 // should be a normal executor; it's used for receiving messages
185 this.executor =
186 Executors.newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-fg-%d"));
187 }
188
189 if (communicationExecutor != null) {
190 this.communicationExecutor = communicationExecutor;
191 } else {
192 // sending executor; should be capped
193 //TODO this probably doesn't need to be bounded anymore
194 this.communicationExecutor =
195 newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-publish-%d"));
196 }
197
Jonathan Hartca335e92015-03-05 10:34:32 -0800198 this.persistent = persistent;
199
200 if (this.persistent) {
201 String dataDirectory = System.getProperty("karaf.data", "./data");
202 String filename = dataDirectory + "/" + "mapdb-ecm-" + mapName;
203
204 ExecutorService dbExecutor =
205 newFixedThreadPool(1, groupedThreads("onos/ecm", mapName + "-dbwriter"));
206
207 persistentStore = new MapDbPersistentStore<>(filename, dbExecutor, serializer);
Madan Jampani3d76c942015-06-29 23:37:10 -0700208 persistentStore.readInto(items);
Jonathan Hartca335e92015-03-05 10:34:32 -0800209 } else {
210 this.persistentStore = null;
211 }
212
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700213 if (backgroundExecutor != null) {
214 this.backgroundExecutor = backgroundExecutor;
215 } else {
216 this.backgroundExecutor =
217 newSingleThreadScheduledExecutor(groupedThreads("onos/ecm", mapName + "-bg-%d"));
218 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800219
Jonathan Hartaaa56572015-01-28 21:56:35 -0800220 // start anti-entropy thread
Madan Jampani3d76c942015-06-29 23:37:10 -0700221 this.backgroundExecutor.scheduleAtFixedRate(this::sendAdvertisement,
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700222 initialDelaySec, antiEntropyPeriod,
223 antiEntropyTimeUnit);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800224
Jonathan Hartdb3af892015-01-26 13:19:07 -0800225 updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
226 clusterCommunicator.addSubscriber(updateMessageSubject,
Madan Jampani3d76c942015-06-29 23:37:10 -0700227 serializer::decode,
228 this::processUpdates,
229 this.executor);
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800230
Jonathan Hartaaa56572015-01-28 21:56:35 -0800231 antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");
232 clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
Madan Jampani3d76c942015-06-29 23:37:10 -0700233 serializer::decode,
234 this::handleAntiEntropyAdvertisement,
235 this.backgroundExecutor);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800236
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700237 this.tombstonesDisabled = tombstonesDisabled;
238 this.lightweightAntiEntropy = !convergeFaster;
Madan Jampanie1356282015-03-10 19:05:36 -0700239 }
240
Jonathan Hartdb3af892015-01-26 13:19:07 -0800241 private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
242 return new KryoSerializer() {
243 @Override
244 protected void setupKryoPool() {
245 // Add the map's internal helper classes to the user-supplied serializer
246 serializerPool = builder
Madan Jampani3d76c942015-06-29 23:37:10 -0700247 .register(KryoNamespaces.BASIC)
Madan Jampanidb5d06a2015-06-30 11:16:48 -0700248 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
Madan Jampani3e033bd2015-04-08 13:03:49 -0700249 .register(LogicalTimestamp.class)
Jonathan Hartdb3af892015-01-26 13:19:07 -0800250 .register(WallClockTimestamp.class)
Jonathan Hartaaa56572015-01-28 21:56:35 -0800251 .register(AntiEntropyAdvertisement.class)
Madan Jampani3d76c942015-06-29 23:37:10 -0700252 .register(UpdateEntry.class)
253 .register(MapValue.class)
254 .register(MapValue.Digest.class)
Jonathan Hartdb3af892015-01-26 13:19:07 -0800255 .build();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800256 }
257 };
258 }
259
260 @Override
261 public int size() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800262 checkState(!destroyed, destroyedMessage);
Madan Jampani3d76c942015-06-29 23:37:10 -0700263 // TODO: Maintain a separate counter for tracking live elements in map.
264 return Maps.filterValues(items, MapValue::isAlive).size();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800265 }
266
267 @Override
268 public boolean isEmpty() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800269 checkState(!destroyed, destroyedMessage);
Madan Jampani3d76c942015-06-29 23:37:10 -0700270 return size() == 0;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800271 }
272
273 @Override
274 public boolean containsKey(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800275 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800276 checkNotNull(key, ERROR_NULL_KEY);
Madan Jampani3d76c942015-06-29 23:37:10 -0700277 return get(key) != null;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800278 }
279
280 @Override
281 public boolean containsValue(V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800282 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800283 checkNotNull(value, ERROR_NULL_VALUE);
Madan Jampani3d76c942015-06-29 23:37:10 -0700284 return items.values()
285 .stream()
286 .filter(MapValue::isAlive)
287 .anyMatch(v -> v.get().equals(value));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800288 }
289
290 @Override
291 public V get(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800292 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800293 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800294
Madan Jampani3d76c942015-06-29 23:37:10 -0700295 MapValue<V> value = items.get(key);
296 return (value == null || value.isTombstone()) ? null : value.get();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800297 }
298
299 @Override
300 public void put(K key, V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800301 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800302 checkNotNull(key, ERROR_NULL_KEY);
303 checkNotNull(value, ERROR_NULL_VALUE);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800304
Madan Jampani3d76c942015-06-29 23:37:10 -0700305 MapValue<V> newValue = new MapValue<>(value, timestampProvider.apply(key, value));
306 if (updateInternal(key, newValue)) {
307 notifyPeers(new UpdateEntry<>(key, newValue), peerUpdateFunction.apply(key, value));
308 notifyListeners(new EventuallyConsistentMapEvent<>(PUT, key, value));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800309 }
310 }
311
Jonathan Hartdb3af892015-01-26 13:19:07 -0800312 @Override
Madan Jampani43e9c9c2015-06-26 14:16:46 -0700313 public V remove(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800314 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800315 checkNotNull(key, ERROR_NULL_KEY);
Madan Jampani5300bb82015-07-02 10:16:16 -0700316 // TODO prevent calls here if value is important for timestamp
Madan Jampani43f37952015-07-02 12:54:08 -0700317 MapValue<V> tombstone = MapValue.tombstone(timestampProvider.apply(key, null));
Madan Jampanid13f3b82015-07-01 17:37:50 -0700318 MapValue<V> previousValue = removeInternal(key, Optional.empty(), tombstone);
Madan Jampania0ac4872015-07-02 11:23:49 -0700319 if (previousValue != null) {
320 notifyPeers(new UpdateEntry<>(key, tombstone), peerUpdateFunction.apply(key, previousValue.get()));
321 if (previousValue.isAlive()) {
322 notifyListeners(new EventuallyConsistentMapEvent<>(REMOVE, key, previousValue.get()));
323 }
324 }
Madan Jampanid13f3b82015-07-01 17:37:50 -0700325 return previousValue != null ? previousValue.get() : null;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800326 }
327
328 @Override
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800329 public void remove(K key, V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800330 checkState(!destroyed, destroyedMessage);
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800331 checkNotNull(key, ERROR_NULL_KEY);
332 checkNotNull(value, ERROR_NULL_VALUE);
Madan Jampani43f37952015-07-02 12:54:08 -0700333 MapValue<V> tombstone = MapValue.tombstone(timestampProvider.apply(key, value));
Madan Jampania0ac4872015-07-02 11:23:49 -0700334 MapValue<V> previousValue = removeInternal(key, Optional.of(value), tombstone);
335 if (previousValue != null) {
336 notifyPeers(new UpdateEntry<>(key, tombstone), peerUpdateFunction.apply(key, previousValue.get()));
337 if (previousValue.isAlive()) {
338 notifyListeners(new EventuallyConsistentMapEvent<>(REMOVE, key, previousValue.get()));
339 }
340 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700341 }
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800342
Madan Jampanid13f3b82015-07-01 17:37:50 -0700343 private MapValue<V> removeInternal(K key, Optional<V> value, MapValue<V> tombstone) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700344 checkState(!destroyed, destroyedMessage);
345 checkNotNull(key, ERROR_NULL_KEY);
346 checkNotNull(value, ERROR_NULL_VALUE);
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800347
Madan Jampanid13f3b82015-07-01 17:37:50 -0700348 checkState(tombstone.isTombstone());
Madan Jampani3d76c942015-06-29 23:37:10 -0700349 AtomicBoolean updated = new AtomicBoolean(false);
Madan Jampanid13f3b82015-07-01 17:37:50 -0700350 AtomicReference<MapValue<V>> previousValue = new AtomicReference<>();
Madan Jampani3d76c942015-06-29 23:37:10 -0700351 items.compute(key, (k, existing) -> {
Madan Jampanid13f3b82015-07-01 17:37:50 -0700352 boolean valueMatches = true;
353 if (value.isPresent() && existing != null && existing.isAlive()) {
354 valueMatches = Objects.equals(value.get(), existing.get());
Madan Jampani3d76c942015-06-29 23:37:10 -0700355 }
Brian O'Connor6325dad2015-07-07 15:36:29 -0700356 if (existing == null) {
357 log.debug("ECMap Remove: Existing value for key {} is already null", k);
358 }
Madan Jampanid13f3b82015-07-01 17:37:50 -0700359 updated.set(valueMatches && (existing == null || tombstone.isNewerThan(existing)));
360 if (updated.get()) {
361 previousValue.set(existing);
362 }
363 return updated.get() ? tombstone : existing;
Madan Jampani3d76c942015-06-29 23:37:10 -0700364 });
365 if (updated.get()) {
Madan Jampanid13f3b82015-07-01 17:37:50 -0700366 if (persistent) {
367 persistentStore.update(key, tombstone);
368 }
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800369 }
Madan Jampanid13f3b82015-07-01 17:37:50 -0700370 return previousValue.get();
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800371 }
372
373 @Override
Jonathan Hartdb3af892015-01-26 13:19:07 -0800374 public void putAll(Map<? extends K, ? extends V> m) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800375 checkState(!destroyed, destroyedMessage);
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800376 m.forEach(this::put);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800377 }
378
379 @Override
380 public void clear() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800381 checkState(!destroyed, destroyedMessage);
Madan Jampani3d76c942015-06-29 23:37:10 -0700382 Maps.filterValues(items, MapValue::isAlive)
383 .forEach((k, v) -> remove(k));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800384 }
385
386 @Override
387 public Set<K> keySet() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800388 checkState(!destroyed, destroyedMessage);
Madan Jampani3d76c942015-06-29 23:37:10 -0700389 return Maps.filterValues(items, MapValue::isAlive)
390 .keySet();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800391 }
392
393 @Override
394 public Collection<V> values() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800395 checkState(!destroyed, destroyedMessage);
Madan Jampani3d76c942015-06-29 23:37:10 -0700396 return Maps.filterValues(items, MapValue::isAlive)
397 .values()
398 .stream()
399 .map(MapValue::get)
400 .collect(Collectors.toList());
Jonathan Hartdb3af892015-01-26 13:19:07 -0800401 }
402
403 @Override
404 public Set<Map.Entry<K, V>> entrySet() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800405 checkState(!destroyed, destroyedMessage);
Madan Jampani3d76c942015-06-29 23:37:10 -0700406 return Maps.filterValues(items, MapValue::isAlive)
407 .entrySet()
408 .stream()
409 .map(e -> Pair.of(e.getKey(), e.getValue().get()))
410 .collect(Collectors.toSet());
411 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800412
Madan Jampani3d76c942015-06-29 23:37:10 -0700413 /**
414 * Returns true if newValue was accepted i.e. map is updated.
415 * @param key key
416 * @param newValue proposed new value
417 * @return true if update happened; false if map already contains a more recent value for the key
418 */
419 private boolean updateInternal(K key, MapValue<V> newValue) {
420 AtomicBoolean updated = new AtomicBoolean(false);
421 items.compute(key, (k, existing) -> {
422 if (existing == null || newValue.isNewerThan(existing)) {
423 updated.set(true);
424 if (newValue.isTombstone()) {
425 return tombstonesDisabled ? null : newValue;
426 }
427 return newValue;
428 }
429 return existing;
430 });
431 if (updated.get() && persistent) {
432 persistentStore.update(key, newValue);
433 }
434 return updated.get();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800435 }
436
437 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800438 public void addListener(EventuallyConsistentMapListener<K, V> listener) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800439 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800440
441 listeners.add(checkNotNull(listener));
442 }
443
444 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800445 public void removeListener(EventuallyConsistentMapListener<K, V> listener) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800446 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800447
448 listeners.remove(checkNotNull(listener));
449 }
450
451 @Override
452 public void destroy() {
453 destroyed = true;
454
455 executor.shutdown();
456 backgroundExecutor.shutdown();
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800457 communicationExecutor.shutdown();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800458
Jonathan Hart584d2f32015-01-27 19:46:14 -0800459 listeners.clear();
460
Jonathan Hartdb3af892015-01-26 13:19:07 -0800461 clusterCommunicator.removeSubscriber(updateMessageSubject);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800462 clusterCommunicator.removeSubscriber(antiEntropyAdvertisementSubject);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800463 }
464
Jonathan Hartaaa56572015-01-28 21:56:35 -0800465 private void notifyListeners(EventuallyConsistentMapEvent<K, V> event) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700466 listeners.forEach(listener -> listener.event(event));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800467 }
468
Madan Jampani3d76c942015-06-29 23:37:10 -0700469 private void notifyPeers(UpdateEntry<K, V> event, Collection<NodeId> peers) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800470 queueUpdate(event, peers);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800471 }
472
Madan Jampani3d76c942015-06-29 23:37:10 -0700473 private void queueUpdate(UpdateEntry<K, V> event, Collection<NodeId> peers) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800474 if (peers == null) {
475 // we have no friends :(
476 return;
477 }
478 peers.forEach(node ->
Madan Jampani3d76c942015-06-29 23:37:10 -0700479 senderPending.computeIfAbsent(node, unusedKey -> new EventAccumulator(node)).add(event)
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800480 );
481 }
482
Jonathan Hart233a18a2015-03-02 17:24:58 -0800483 private boolean underHighLoad() {
484 return counter.get(LOAD_WINDOW) > HIGH_LOAD_THRESHOLD;
485 }
486
Madan Jampani3d76c942015-06-29 23:37:10 -0700487 private void sendAdvertisement() {
488 try {
Thomas Vachuska152f9fd2015-04-02 16:28:13 -0700489 if (underHighLoad() || destroyed) {
Jonathan Hart233a18a2015-03-02 17:24:58 -0800490 return;
491 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700492 pickRandomActivePeer().ifPresent(this::sendAdvertisementToPeer);
493 } catch (Exception e) {
494 // Catch all exceptions to avoid scheduled task being suppressed.
495 log.error("Exception thrown while sending advertisement", e);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800496 }
497 }
498
Madan Jampani3d76c942015-06-29 23:37:10 -0700499 private Optional<NodeId> pickRandomActivePeer() {
500 List<NodeId> activePeers = clusterService.getNodes()
501 .stream()
502 .filter(node -> !localNodeId.equals(node))
503 .map(ControllerNode::id)
504 .filter(id -> clusterService.getState(id) == ControllerNode.State.ACTIVE)
505 .collect(Collectors.toList());
506 Collections.shuffle(activePeers);
507 return activePeers.isEmpty() ? Optional.empty() : Optional.of(activePeers.get(0));
508 }
509
510 private void sendAdvertisementToPeer(NodeId peer) {
511 clusterCommunicator.unicast(createAdvertisement(),
512 antiEntropyAdvertisementSubject,
513 serializer::encode,
514 peer)
515 .whenComplete((result, error) -> {
516 if (error != null) {
Madan Jampania0ac4872015-07-02 11:23:49 -0700517 log.debug("Failed to send anti-entropy advertisement to {}", peer, error);
Madan Jampani3d76c942015-06-29 23:37:10 -0700518 }
519 });
520 }
521
522
Jonathan Hartaaa56572015-01-28 21:56:35 -0800523 private AntiEntropyAdvertisement<K> createAdvertisement() {
Madan Jampani3d76c942015-06-29 23:37:10 -0700524 return new AntiEntropyAdvertisement<K>(localNodeId, Maps.transformValues(items, MapValue::digest));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800525 }
526
527 private void handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700528 if (destroyed || underHighLoad()) {
529 return;
530 }
531 try {
532 antiEntropyCheckLocalItems(ad).forEach(this::notifyListeners);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800533
Madan Jampani3d76c942015-06-29 23:37:10 -0700534 if (!lightweightAntiEntropy) {
535 Set<K> missingKeys = Sets.difference(items.keySet(), ad.digest().keySet());
536 // if remote ad has something unknown, actively sync
537 if (missingKeys.size() > 0) {
Jonathan Hartf893be82015-02-24 17:35:51 -0800538 // Send the advertisement back if this peer is out-of-sync
Madan Jampani3d76c942015-06-29 23:37:10 -0700539 // TODO: Send ad for missing keys and for entries that are stale
540 sendAdvertisementToPeer(ad.sender());
Jonathan Hartf893be82015-02-24 17:35:51 -0800541 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800542 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700543 } catch (Exception e) {
544 log.warn("Error handling anti-entropy advertisement", e);
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800545 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800546 }
547
548 /**
Madan Jampani3d76c942015-06-29 23:37:10 -0700549 * Processes anti-entropy ad from peer by taking following actions:
550 * 1. If peer has an old entry, updates peer.
551 * 2. If peer indicates an entry is removed and has a more recent
552 * timestamp than the local entry, update local state.
Jonathan Hartaaa56572015-01-28 21:56:35 -0800553 */
Jonathan Hartaaa56572015-01-28 21:56:35 -0800554 private List<EventuallyConsistentMapEvent<K, V>> antiEntropyCheckLocalItems(
555 AntiEntropyAdvertisement<K> ad) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700556 final List<EventuallyConsistentMapEvent<K, V>> externalEvents = Lists.newLinkedList();
Jonathan Hartaaa56572015-01-28 21:56:35 -0800557 final NodeId sender = ad.sender();
Madan Jampani3d76c942015-06-29 23:37:10 -0700558 items.forEach((key, localValue) -> {
559 MapValue.Digest remoteValueDigest = ad.digest().get(key);
560 if (remoteValueDigest == null || localValue.isNewerThan(remoteValueDigest.timestamp())) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800561 // local value is more recent, push to sender
Madan Jampani3d76c942015-06-29 23:37:10 -0700562 queueUpdate(new UpdateEntry<>(key, localValue), ImmutableList.of(sender));
Madan Jampanid13f3b82015-07-01 17:37:50 -0700563 }
564 if (remoteValueDigest != null && remoteValueDigest.isTombstone()) {
565 MapValue<V> previousValue = removeInternal(key,
566 Optional.empty(),
Madan Jampani43f37952015-07-02 12:54:08 -0700567 MapValue.tombstone(remoteValueDigest.timestamp()));
Madan Jampanid13f3b82015-07-01 17:37:50 -0700568 if (previousValue != null && previousValue.isAlive()) {
569 externalEvents.add(new EventuallyConsistentMapEvent<>(REMOVE, key, previousValue.get()));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800570 }
571 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700572 });
Jonathan Hartaaa56572015-01-28 21:56:35 -0800573 return externalEvents;
574 }
575
Madan Jampani3d76c942015-06-29 23:37:10 -0700576 private void processUpdates(Collection<UpdateEntry<K, V>> updates) {
577 if (destroyed) {
578 return;
Jonathan Hartaaa56572015-01-28 21:56:35 -0800579 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700580 updates.forEach(update -> {
581 final K key = update.key();
582 final MapValue<V> value = update.value();
Madan Jampanid13f3b82015-07-01 17:37:50 -0700583 if (value.isTombstone()) {
584 MapValue<V> previousValue = removeInternal(key, Optional.empty(), value);
585 if (previousValue != null && previousValue.get() != null) {
586 notifyListeners(new EventuallyConsistentMapEvent<>(REMOVE, key, previousValue.get()));
587 }
588 } else if (updateInternal(key, value)) {
589 notifyListeners(new EventuallyConsistentMapEvent<>(PUT, key, value.get()));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800590 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700591 });
Jonathan Hartdb3af892015-01-26 13:19:07 -0800592 }
593
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800594 // TODO pull this into the class if this gets pulled out...
595 private static final int DEFAULT_MAX_EVENTS = 1000;
596 private static final int DEFAULT_MAX_IDLE_MS = 10;
597 private static final int DEFAULT_MAX_BATCH_MS = 50;
598 private static final Timer TIMER = new Timer("onos-ecm-sender-events");
Jonathan Hartdb3af892015-01-26 13:19:07 -0800599
Madan Jampani3d76c942015-06-29 23:37:10 -0700600 private final class EventAccumulator extends AbstractAccumulator<UpdateEntry<K, V>> {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800601
602 private final NodeId peer;
603
604 private EventAccumulator(NodeId peer) {
605 super(TIMER, DEFAULT_MAX_EVENTS, DEFAULT_MAX_BATCH_MS, DEFAULT_MAX_IDLE_MS);
606 this.peer = peer;
607 }
608
609 @Override
Madan Jampani3d76c942015-06-29 23:37:10 -0700610 public void processItems(List<UpdateEntry<K, V>> items) {
611 Map<K, UpdateEntry<K, V>> map = Maps.newHashMap();
612 items.forEach(item -> map.compute(item.key(), (key, existing) ->
613 existing == null || item.compareTo(existing) > 0 ? item : existing));
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800614 communicationExecutor.submit(() -> {
Madan Jampani3d76c942015-06-29 23:37:10 -0700615 clusterCommunicator.unicast(ImmutableList.copyOf(map.values()),
Madan Jampani175e8fd2015-05-20 14:10:45 -0700616 updateMessageSubject,
617 serializer::encode,
618 peer)
619 .whenComplete((result, error) -> {
620 if (error != null) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700621 log.debug("Failed to send to {}", peer, error);
Madan Jampani175e8fd2015-05-20 14:10:45 -0700622 }
623 });
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800624 });
Jonathan Hartdb3af892015-01-26 13:19:07 -0800625 }
626 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700627}