blob: 7835f3e442a0a5adde4ed1a1e5394dd5a83ea3b8 [file] [log] [blame]
Jonathan Hartdb3af892015-01-26 13:19:07 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Jonathan Hart77bdd262015-02-03 09:07:48 -080016package org.onosproject.store.ecmap;
Jonathan Hartdb3af892015-01-26 13:19:07 -080017
Madan Jampani4f1f4cd2015-07-08 23:05:35 -070018import com.google.common.collect.Collections2;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080019import com.google.common.collect.ImmutableList;
Madan Jampani4f1f4cd2015-07-08 23:05:35 -070020import com.google.common.collect.ImmutableMap;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080021import com.google.common.collect.Lists;
22import com.google.common.collect.Maps;
Madan Jampani3d76c942015-06-29 23:37:10 -070023import com.google.common.collect.Sets;
Jonathan Hartf9108232015-02-02 16:37:35 -080024import org.apache.commons.lang3.tuple.Pair;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080025import org.onlab.util.AbstractAccumulator;
Jonathan Hartdb3af892015-01-26 13:19:07 -080026import org.onlab.util.KryoNamespace;
Jonathan Hart233a18a2015-03-02 17:24:58 -080027import org.onlab.util.SlidingWindowCounter;
Jonathan Hartdb3af892015-01-26 13:19:07 -080028import org.onosproject.cluster.ClusterService;
Jonathan Hartaaa56572015-01-28 21:56:35 -080029import org.onosproject.cluster.ControllerNode;
Jonathan Hartdb3af892015-01-26 13:19:07 -080030import org.onosproject.cluster.NodeId;
31import org.onosproject.store.Timestamp;
32import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Jonathan Hartdb3af892015-01-26 13:19:07 -080033import org.onosproject.store.cluster.messaging.MessageSubject;
Madan Jampani3e033bd2015-04-08 13:03:49 -070034import org.onosproject.store.impl.LogicalTimestamp;
Madan Jampani3d76c942015-06-29 23:37:10 -070035import org.onosproject.store.serializers.KryoNamespaces;
Jonathan Hartdb3af892015-01-26 13:19:07 -080036import org.onosproject.store.serializers.KryoSerializer;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070037import org.onosproject.store.service.EventuallyConsistentMap;
38import org.onosproject.store.service.EventuallyConsistentMapEvent;
39import org.onosproject.store.service.EventuallyConsistentMapListener;
Jonathan Hart4a29c592015-09-23 17:55:07 -070040import org.onosproject.store.service.WallClockTimestamp;
Jonathan Hartdb3af892015-01-26 13:19:07 -080041import org.slf4j.Logger;
42import org.slf4j.LoggerFactory;
43
Jonathan Hartdb3af892015-01-26 13:19:07 -080044import java.util.Collection;
Madan Jampani3d76c942015-06-29 23:37:10 -070045import java.util.Collections;
Jonathan Hartdb3af892015-01-26 13:19:07 -080046import java.util.List;
47import java.util.Map;
Madan Jampanid13f3b82015-07-01 17:37:50 -070048import java.util.Objects;
Madan Jampani43e9c9c2015-06-26 14:16:46 -070049import java.util.Optional;
Jonathan Hartdb3af892015-01-26 13:19:07 -080050import java.util.Set;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080051import java.util.Timer;
Jonathan Hartdb3af892015-01-26 13:19:07 -080052import java.util.concurrent.ExecutorService;
53import java.util.concurrent.Executors;
54import java.util.concurrent.ScheduledExecutorService;
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -080055import java.util.concurrent.TimeUnit;
Madan Jampani3d76c942015-06-29 23:37:10 -070056import java.util.concurrent.atomic.AtomicBoolean;
Madan Jampani43e9c9c2015-06-26 14:16:46 -070057import java.util.concurrent.atomic.AtomicReference;
Brian O'Connor5eb77c82015-03-02 18:09:39 -080058import java.util.function.BiFunction;
Jonathan Hartdb3af892015-01-26 13:19:07 -080059import java.util.stream.Collectors;
60
61import static com.google.common.base.Preconditions.checkNotNull;
62import static com.google.common.base.Preconditions.checkState;
63import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
Brian O'Connorc6713a82015-02-24 11:55:48 -080064import static org.onlab.util.BoundedThreadPool.newFixedThreadPool;
Brian O'Connor4b2ba5f2015-02-18 20:54:00 -080065import static org.onlab.util.Tools.groupedThreads;
Jonathan Hart4a29c592015-09-23 17:55:07 -070066import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
67import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
Jonathan Hartdb3af892015-01-26 13:19:07 -080068
69/**
70 * Distributed Map implementation which uses optimistic replication and gossip
71 * based techniques to provide an eventually consistent data store.
72 */
73public class EventuallyConsistentMapImpl<K, V>
74 implements EventuallyConsistentMap<K, V> {
75
76 private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class);
77
Madan Jampani3d76c942015-06-29 23:37:10 -070078 private final Map<K, MapValue<V>> items;
Jonathan Hartdb3af892015-01-26 13:19:07 -080079
Jonathan Hartdb3af892015-01-26 13:19:07 -080080 private final ClusterService clusterService;
81 private final ClusterCommunicationService clusterCommunicator;
82 private final KryoSerializer serializer;
Madan Jampani3d76c942015-06-29 23:37:10 -070083 private final NodeId localNodeId;
Jonathan Hartdb3af892015-01-26 13:19:07 -080084
Madan Jampanibcf1a482015-06-24 19:05:56 -070085 private final BiFunction<K, V, Timestamp> timestampProvider;
Jonathan Hartdb3af892015-01-26 13:19:07 -080086
87 private final MessageSubject updateMessageSubject;
Jonathan Hartaaa56572015-01-28 21:56:35 -080088 private final MessageSubject antiEntropyAdvertisementSubject;
Jonathan Hartdb3af892015-01-26 13:19:07 -080089
Jonathan Hartaaa56572015-01-28 21:56:35 -080090 private final Set<EventuallyConsistentMapListener<K, V>> listeners
Madan Jampani3d76c942015-06-29 23:37:10 -070091 = Sets.newCopyOnWriteArraySet();
Jonathan Hartdb3af892015-01-26 13:19:07 -080092
93 private final ExecutorService executor;
Jonathan Hartdb3af892015-01-26 13:19:07 -080094 private final ScheduledExecutorService backgroundExecutor;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080095 private final BiFunction<K, V, Collection<NodeId>> peerUpdateFunction;
Jonathan Hartdb3af892015-01-26 13:19:07 -080096
Jonathan Hart6ec029a2015-03-24 17:12:35 -070097 private final ExecutorService communicationExecutor;
98 private final Map<NodeId, EventAccumulator> senderPending;
Madan Jampani28726282015-02-19 11:40:23 -080099
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700100 private final String mapName;
101
Jonathan Hartdb3af892015-01-26 13:19:07 -0800102 private volatile boolean destroyed = false;
Jonathan Hart539a6462015-01-27 17:05:43 -0800103 private static final String ERROR_DESTROYED = " map is already destroyed";
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800104 private final String destroyedMessage;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800105
Jonathan Hart4f397e82015-02-04 09:10:41 -0800106 private static final String ERROR_NULL_KEY = "Key cannot be null";
107 private static final String ERROR_NULL_VALUE = "Null values are not allowed";
108
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700109 private final long initialDelaySec = 5;
110 private final boolean lightweightAntiEntropy;
111 private final boolean tombstonesDisabled;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800112
Jonathan Hart233a18a2015-03-02 17:24:58 -0800113 private static final int WINDOW_SIZE = 5;
114 private static final int HIGH_LOAD_THRESHOLD = 0;
115 private static final int LOAD_WINDOW = 2;
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700116 private SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE);
Jonathan Hart233a18a2015-03-02 17:24:58 -0800117
Jonathan Hartca335e92015-03-05 10:34:32 -0800118 private final boolean persistent;
119 private final PersistentStore<K, V> persistentStore;
120
Jonathan Hartdb3af892015-01-26 13:19:07 -0800121 /**
122 * Creates a new eventually consistent map shared amongst multiple instances.
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800123 * <p>
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700124 * See {@link org.onosproject.store.service.EventuallyConsistentMapBuilder}
125 * for more description of the parameters expected by the map.
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800126 * </p>
Jonathan Hartdb3af892015-01-26 13:19:07 -0800127 *
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700128 * @param mapName a String identifier for the map.
129 * @param clusterService the cluster service
130 * @param clusterCommunicator the cluster communications service
131 * @param serializerBuilder a Kryo namespace builder that can serialize
132 * both K and V
Madan Jampanibcf1a482015-06-24 19:05:56 -0700133 * @param timestampProvider provider of timestamps for K and V
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700134 * @param peerUpdateFunction function that provides a set of nodes to immediately
135 * update to when there writes to the map
136 * @param eventExecutor executor to use for processing incoming
137 * events from peers
138 * @param communicationExecutor executor to use for sending events to peers
139 * @param backgroundExecutor executor to use for background anti-entropy
140 * tasks
141 * @param tombstonesDisabled true if this map should not maintain
142 * tombstones
143 * @param antiEntropyPeriod period that the anti-entropy task should run
Jonathan Hartca335e92015-03-05 10:34:32 -0800144 * @param antiEntropyTimeUnit time unit for anti-entropy period
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700145 * @param convergeFaster make anti-entropy try to converge faster
Jonathan Hartca335e92015-03-05 10:34:32 -0800146 * @param persistent persist data to disk
Jonathan Hartdb3af892015-01-26 13:19:07 -0800147 */
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700148 EventuallyConsistentMapImpl(String mapName,
149 ClusterService clusterService,
150 ClusterCommunicationService clusterCommunicator,
151 KryoNamespace.Builder serializerBuilder,
Madan Jampanibcf1a482015-06-24 19:05:56 -0700152 BiFunction<K, V, Timestamp> timestampProvider,
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700153 BiFunction<K, V, Collection<NodeId>> peerUpdateFunction,
154 ExecutorService eventExecutor,
155 ExecutorService communicationExecutor,
156 ScheduledExecutorService backgroundExecutor,
157 boolean tombstonesDisabled,
158 long antiEntropyPeriod,
159 TimeUnit antiEntropyTimeUnit,
Jonathan Hartca335e92015-03-05 10:34:32 -0800160 boolean convergeFaster,
161 boolean persistent) {
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700162 this.mapName = mapName;
Madan Jampani3d76c942015-06-29 23:37:10 -0700163 items = Maps.newConcurrentMap();
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800164 senderPending = Maps.newConcurrentMap();
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700165 destroyedMessage = mapName + ERROR_DESTROYED;
Madan Jampani28726282015-02-19 11:40:23 -0800166
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700167 this.clusterService = clusterService;
168 this.clusterCommunicator = clusterCommunicator;
Madan Jampani3d76c942015-06-29 23:37:10 -0700169 this.localNodeId = clusterService.getLocalNode().id();
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700170
171 this.serializer = createSerializer(serializerBuilder);
172
Madan Jampanibcf1a482015-06-24 19:05:56 -0700173 this.timestampProvider = timestampProvider;
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700174
175 if (peerUpdateFunction != null) {
176 this.peerUpdateFunction = peerUpdateFunction;
177 } else {
178 this.peerUpdateFunction = (key, value) -> clusterService.getNodes().stream()
179 .map(ControllerNode::id)
Madan Jampani3d76c942015-06-29 23:37:10 -0700180 .filter(nodeId -> !nodeId.equals(localNodeId))
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700181 .collect(Collectors.toList());
182 }
183
184 if (eventExecutor != null) {
185 this.executor = eventExecutor;
186 } else {
187 // should be a normal executor; it's used for receiving messages
188 this.executor =
189 Executors.newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-fg-%d"));
190 }
191
192 if (communicationExecutor != null) {
193 this.communicationExecutor = communicationExecutor;
194 } else {
195 // sending executor; should be capped
196 //TODO this probably doesn't need to be bounded anymore
197 this.communicationExecutor =
198 newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-publish-%d"));
199 }
200
Jonathan Hartca335e92015-03-05 10:34:32 -0800201 this.persistent = persistent;
202
203 if (this.persistent) {
204 String dataDirectory = System.getProperty("karaf.data", "./data");
205 String filename = dataDirectory + "/" + "mapdb-ecm-" + mapName;
206
207 ExecutorService dbExecutor =
208 newFixedThreadPool(1, groupedThreads("onos/ecm", mapName + "-dbwriter"));
209
210 persistentStore = new MapDbPersistentStore<>(filename, dbExecutor, serializer);
Madan Jampani3d76c942015-06-29 23:37:10 -0700211 persistentStore.readInto(items);
Jonathan Hartca335e92015-03-05 10:34:32 -0800212 } else {
213 this.persistentStore = null;
214 }
215
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700216 if (backgroundExecutor != null) {
217 this.backgroundExecutor = backgroundExecutor;
218 } else {
219 this.backgroundExecutor =
220 newSingleThreadScheduledExecutor(groupedThreads("onos/ecm", mapName + "-bg-%d"));
221 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800222
Jonathan Hartaaa56572015-01-28 21:56:35 -0800223 // start anti-entropy thread
Madan Jampani3d76c942015-06-29 23:37:10 -0700224 this.backgroundExecutor.scheduleAtFixedRate(this::sendAdvertisement,
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700225 initialDelaySec, antiEntropyPeriod,
226 antiEntropyTimeUnit);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800227
Jonathan Hartdb3af892015-01-26 13:19:07 -0800228 updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
229 clusterCommunicator.addSubscriber(updateMessageSubject,
Madan Jampani3d76c942015-06-29 23:37:10 -0700230 serializer::decode,
231 this::processUpdates,
232 this.executor);
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800233
Jonathan Hartaaa56572015-01-28 21:56:35 -0800234 antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");
235 clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
Madan Jampani3d76c942015-06-29 23:37:10 -0700236 serializer::decode,
237 this::handleAntiEntropyAdvertisement,
238 this.backgroundExecutor);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800239
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700240 this.tombstonesDisabled = tombstonesDisabled;
241 this.lightweightAntiEntropy = !convergeFaster;
Madan Jampanie1356282015-03-10 19:05:36 -0700242 }
243
Jonathan Hartdb3af892015-01-26 13:19:07 -0800244 private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
245 return new KryoSerializer() {
246 @Override
247 protected void setupKryoPool() {
248 // Add the map's internal helper classes to the user-supplied serializer
249 serializerPool = builder
Madan Jampani3d76c942015-06-29 23:37:10 -0700250 .register(KryoNamespaces.BASIC)
Madan Jampanidb5d06a2015-06-30 11:16:48 -0700251 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
Madan Jampani3e033bd2015-04-08 13:03:49 -0700252 .register(LogicalTimestamp.class)
Jonathan Hartdb3af892015-01-26 13:19:07 -0800253 .register(WallClockTimestamp.class)
Jonathan Hartaaa56572015-01-28 21:56:35 -0800254 .register(AntiEntropyAdvertisement.class)
Madan Jampani3d76c942015-06-29 23:37:10 -0700255 .register(UpdateEntry.class)
256 .register(MapValue.class)
257 .register(MapValue.Digest.class)
Jonathan Hartdb3af892015-01-26 13:19:07 -0800258 .build();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800259 }
260 };
261 }
262
263 @Override
264 public int size() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800265 checkState(!destroyed, destroyedMessage);
Madan Jampani3d76c942015-06-29 23:37:10 -0700266 // TODO: Maintain a separate counter for tracking live elements in map.
267 return Maps.filterValues(items, MapValue::isAlive).size();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800268 }
269
270 @Override
271 public boolean isEmpty() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800272 checkState(!destroyed, destroyedMessage);
Madan Jampani3d76c942015-06-29 23:37:10 -0700273 return size() == 0;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800274 }
275
276 @Override
277 public boolean containsKey(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800278 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800279 checkNotNull(key, ERROR_NULL_KEY);
Madan Jampani3d76c942015-06-29 23:37:10 -0700280 return get(key) != null;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800281 }
282
283 @Override
284 public boolean containsValue(V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800285 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800286 checkNotNull(value, ERROR_NULL_VALUE);
Madan Jampani3d76c942015-06-29 23:37:10 -0700287 return items.values()
288 .stream()
289 .filter(MapValue::isAlive)
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700290 .anyMatch(v -> value.equals(v.get()));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800291 }
292
293 @Override
294 public V get(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800295 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800296 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800297
Madan Jampani3d76c942015-06-29 23:37:10 -0700298 MapValue<V> value = items.get(key);
299 return (value == null || value.isTombstone()) ? null : value.get();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800300 }
301
302 @Override
303 public void put(K key, V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800304 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800305 checkNotNull(key, ERROR_NULL_KEY);
306 checkNotNull(value, ERROR_NULL_VALUE);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800307
Madan Jampani3d76c942015-06-29 23:37:10 -0700308 MapValue<V> newValue = new MapValue<>(value, timestampProvider.apply(key, value));
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700309 if (putInternal(key, newValue)) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700310 notifyPeers(new UpdateEntry<>(key, newValue), peerUpdateFunction.apply(key, value));
Madan Jampanicab114c2015-07-23 00:14:19 -0700311 notifyListeners(new EventuallyConsistentMapEvent<>(mapName, PUT, key, value));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800312 }
313 }
314
Jonathan Hartdb3af892015-01-26 13:19:07 -0800315 @Override
Madan Jampani43e9c9c2015-06-26 14:16:46 -0700316 public V remove(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800317 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800318 checkNotNull(key, ERROR_NULL_KEY);
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700319 return removeAndNotify(key, null);
320 }
321
322 @Override
323 public void remove(K key, V value) {
324 checkState(!destroyed, destroyedMessage);
325 checkNotNull(key, ERROR_NULL_KEY);
326 checkNotNull(value, ERROR_NULL_VALUE);
327 removeAndNotify(key, value);
328 }
329
330 private V removeAndNotify(K key, V value) {
Madan Jampani483d0a22015-08-19 17:33:00 -0700331 Timestamp timestamp = timestampProvider.apply(key, value);
332 Optional<MapValue<V>> tombstone = tombstonesDisabled || timestamp == null
333 ? Optional.empty() : Optional.of(MapValue.tombstone(timestamp));
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700334 MapValue<V> previousValue = removeInternal(key, Optional.ofNullable(value), tombstone);
Madan Jampania0ac4872015-07-02 11:23:49 -0700335 if (previousValue != null) {
Madan Jampani483d0a22015-08-19 17:33:00 -0700336 notifyPeers(new UpdateEntry<>(key, tombstone.orElse(null)),
337 peerUpdateFunction.apply(key, previousValue.get()));
Madan Jampania0ac4872015-07-02 11:23:49 -0700338 if (previousValue.isAlive()) {
Madan Jampanicab114c2015-07-23 00:14:19 -0700339 notifyListeners(new EventuallyConsistentMapEvent<>(mapName, REMOVE, key, previousValue.get()));
Madan Jampania0ac4872015-07-02 11:23:49 -0700340 }
341 }
Madan Jampanid13f3b82015-07-01 17:37:50 -0700342 return previousValue != null ? previousValue.get() : null;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800343 }
344
Madan Jampani483d0a22015-08-19 17:33:00 -0700345 private MapValue<V> removeInternal(K key, Optional<V> value, Optional<MapValue<V>> tombstone) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700346 checkState(!destroyed, destroyedMessage);
347 checkNotNull(key, ERROR_NULL_KEY);
348 checkNotNull(value, ERROR_NULL_VALUE);
Madan Jampani483d0a22015-08-19 17:33:00 -0700349 tombstone.ifPresent(v -> checkState(v.isTombstone()));
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700350
351 counter.incrementCount();
Madan Jampani3d76c942015-06-29 23:37:10 -0700352 AtomicBoolean updated = new AtomicBoolean(false);
Madan Jampanid13f3b82015-07-01 17:37:50 -0700353 AtomicReference<MapValue<V>> previousValue = new AtomicReference<>();
Madan Jampani3d76c942015-06-29 23:37:10 -0700354 items.compute(key, (k, existing) -> {
Madan Jampanid13f3b82015-07-01 17:37:50 -0700355 boolean valueMatches = true;
356 if (value.isPresent() && existing != null && existing.isAlive()) {
357 valueMatches = Objects.equals(value.get(), existing.get());
Madan Jampani3d76c942015-06-29 23:37:10 -0700358 }
Brian O'Connor6325dad2015-07-07 15:36:29 -0700359 if (existing == null) {
Jonathan Hart4a29c592015-09-23 17:55:07 -0700360 log.trace("ECMap Remove: Existing value for key {} is already null", k);
Brian O'Connor6325dad2015-07-07 15:36:29 -0700361 }
Madan Jampani483d0a22015-08-19 17:33:00 -0700362 if (valueMatches) {
363 if (existing == null) {
364 updated.set(tombstone.isPresent());
365 } else {
366 updated.set(!tombstone.isPresent() || tombstone.get().isNewerThan(existing));
367 }
Madan Jampanid13f3b82015-07-01 17:37:50 -0700368 }
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700369 if (updated.get()) {
Madan Jampani483d0a22015-08-19 17:33:00 -0700370 previousValue.set(existing);
371 return tombstone.orElse(null);
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700372 } else {
373 return existing;
374 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700375 });
376 if (updated.get()) {
Madan Jampanid13f3b82015-07-01 17:37:50 -0700377 if (persistent) {
Madan Jampani483d0a22015-08-19 17:33:00 -0700378 if (tombstone.isPresent()) {
379 persistentStore.update(key, tombstone.get());
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700380 } else {
Madan Jampani483d0a22015-08-19 17:33:00 -0700381 persistentStore.remove(key);
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700382 }
Madan Jampanid13f3b82015-07-01 17:37:50 -0700383 }
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800384 }
Madan Jampanid13f3b82015-07-01 17:37:50 -0700385 return previousValue.get();
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800386 }
387
388 @Override
Madan Jampani4727a112015-07-16 12:12:58 -0700389 public V compute(K key, BiFunction<K, V, V> recomputeFunction) {
390 checkState(!destroyed, destroyedMessage);
391 checkNotNull(key, ERROR_NULL_KEY);
392 checkNotNull(recomputeFunction, "Recompute function cannot be null");
393
394 AtomicBoolean updated = new AtomicBoolean(false);
395 AtomicReference<MapValue<V>> previousValue = new AtomicReference<>();
396 MapValue<V> computedValue = items.compute(key, (k, mv) -> {
397 previousValue.set(mv);
398 V newRawValue = recomputeFunction.apply(key, mv == null ? null : mv.get());
399 MapValue<V> newValue = new MapValue<>(newRawValue, timestampProvider.apply(key, newRawValue));
400 if (mv == null || newValue.isNewerThan(mv)) {
401 updated.set(true);
402 return newValue;
403 } else {
404 return mv;
405 }
406 });
407 if (updated.get()) {
408 notifyPeers(new UpdateEntry<>(key, computedValue), peerUpdateFunction.apply(key, computedValue.get()));
409 EventuallyConsistentMapEvent.Type updateType = computedValue.isTombstone() ? REMOVE : PUT;
410 V value = computedValue.isTombstone()
411 ? previousValue.get() == null ? null : previousValue.get().get()
412 : computedValue.get();
413 if (value != null) {
Madan Jampanicab114c2015-07-23 00:14:19 -0700414 notifyListeners(new EventuallyConsistentMapEvent<>(mapName, updateType, key, value));
Madan Jampani4727a112015-07-16 12:12:58 -0700415 }
416 }
417 return computedValue.get();
418 }
419
420 @Override
Jonathan Hartdb3af892015-01-26 13:19:07 -0800421 public void putAll(Map<? extends K, ? extends V> m) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800422 checkState(!destroyed, destroyedMessage);
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800423 m.forEach(this::put);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800424 }
425
426 @Override
427 public void clear() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800428 checkState(!destroyed, destroyedMessage);
Madan Jampani3d76c942015-06-29 23:37:10 -0700429 Maps.filterValues(items, MapValue::isAlive)
430 .forEach((k, v) -> remove(k));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800431 }
432
433 @Override
434 public Set<K> keySet() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800435 checkState(!destroyed, destroyedMessage);
Madan Jampani3d76c942015-06-29 23:37:10 -0700436 return Maps.filterValues(items, MapValue::isAlive)
437 .keySet();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800438 }
439
440 @Override
441 public Collection<V> values() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800442 checkState(!destroyed, destroyedMessage);
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700443 return Collections2.transform(Maps.filterValues(items, MapValue::isAlive).values(), MapValue::get);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800444 }
445
446 @Override
447 public Set<Map.Entry<K, V>> entrySet() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800448 checkState(!destroyed, destroyedMessage);
Madan Jampani3d76c942015-06-29 23:37:10 -0700449 return Maps.filterValues(items, MapValue::isAlive)
450 .entrySet()
451 .stream()
452 .map(e -> Pair.of(e.getKey(), e.getValue().get()))
453 .collect(Collectors.toSet());
454 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800455
Madan Jampani3d76c942015-06-29 23:37:10 -0700456 /**
457 * Returns true if newValue was accepted i.e. map is updated.
458 * @param key key
459 * @param newValue proposed new value
460 * @return true if update happened; false if map already contains a more recent value for the key
461 */
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700462 private boolean putInternal(K key, MapValue<V> newValue) {
463 checkState(!destroyed, destroyedMessage);
464 checkNotNull(key, ERROR_NULL_KEY);
465 checkNotNull(newValue, ERROR_NULL_VALUE);
466 checkState(newValue.isAlive());
467 counter.incrementCount();
Madan Jampani3d76c942015-06-29 23:37:10 -0700468 AtomicBoolean updated = new AtomicBoolean(false);
469 items.compute(key, (k, existing) -> {
470 if (existing == null || newValue.isNewerThan(existing)) {
471 updated.set(true);
Madan Jampani3d76c942015-06-29 23:37:10 -0700472 return newValue;
473 }
474 return existing;
475 });
476 if (updated.get() && persistent) {
477 persistentStore.update(key, newValue);
478 }
479 return updated.get();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800480 }
481
482 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800483 public void addListener(EventuallyConsistentMapListener<K, V> listener) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800484 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800485
486 listeners.add(checkNotNull(listener));
487 }
488
489 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800490 public void removeListener(EventuallyConsistentMapListener<K, V> listener) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800491 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800492
493 listeners.remove(checkNotNull(listener));
494 }
495
496 @Override
497 public void destroy() {
498 destroyed = true;
499
500 executor.shutdown();
501 backgroundExecutor.shutdown();
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800502 communicationExecutor.shutdown();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800503
Jonathan Hart584d2f32015-01-27 19:46:14 -0800504 listeners.clear();
505
Jonathan Hartdb3af892015-01-26 13:19:07 -0800506 clusterCommunicator.removeSubscriber(updateMessageSubject);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800507 clusterCommunicator.removeSubscriber(antiEntropyAdvertisementSubject);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800508 }
509
Jonathan Hartaaa56572015-01-28 21:56:35 -0800510 private void notifyListeners(EventuallyConsistentMapEvent<K, V> event) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700511 listeners.forEach(listener -> listener.event(event));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800512 }
513
Madan Jampani3d76c942015-06-29 23:37:10 -0700514 private void notifyPeers(UpdateEntry<K, V> event, Collection<NodeId> peers) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800515 queueUpdate(event, peers);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800516 }
517
Madan Jampani3d76c942015-06-29 23:37:10 -0700518 private void queueUpdate(UpdateEntry<K, V> event, Collection<NodeId> peers) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800519 if (peers == null) {
520 // we have no friends :(
521 return;
522 }
523 peers.forEach(node ->
Madan Jampani3d76c942015-06-29 23:37:10 -0700524 senderPending.computeIfAbsent(node, unusedKey -> new EventAccumulator(node)).add(event)
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800525 );
526 }
527
Jonathan Hart233a18a2015-03-02 17:24:58 -0800528 private boolean underHighLoad() {
529 return counter.get(LOAD_WINDOW) > HIGH_LOAD_THRESHOLD;
530 }
531
Madan Jampani3d76c942015-06-29 23:37:10 -0700532 private void sendAdvertisement() {
533 try {
Thomas Vachuska152f9fd2015-04-02 16:28:13 -0700534 if (underHighLoad() || destroyed) {
Jonathan Hart233a18a2015-03-02 17:24:58 -0800535 return;
536 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700537 pickRandomActivePeer().ifPresent(this::sendAdvertisementToPeer);
538 } catch (Exception e) {
539 // Catch all exceptions to avoid scheduled task being suppressed.
540 log.error("Exception thrown while sending advertisement", e);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800541 }
542 }
543
Madan Jampani3d76c942015-06-29 23:37:10 -0700544 private Optional<NodeId> pickRandomActivePeer() {
545 List<NodeId> activePeers = clusterService.getNodes()
546 .stream()
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700547 .map(ControllerNode::id)
548 .filter(id -> !localNodeId.equals(id))
Madan Jampani3d76c942015-06-29 23:37:10 -0700549 .filter(id -> clusterService.getState(id) == ControllerNode.State.ACTIVE)
550 .collect(Collectors.toList());
551 Collections.shuffle(activePeers);
552 return activePeers.isEmpty() ? Optional.empty() : Optional.of(activePeers.get(0));
553 }
554
555 private void sendAdvertisementToPeer(NodeId peer) {
556 clusterCommunicator.unicast(createAdvertisement(),
557 antiEntropyAdvertisementSubject,
558 serializer::encode,
559 peer)
560 .whenComplete((result, error) -> {
561 if (error != null) {
Madan Jampania0ac4872015-07-02 11:23:49 -0700562 log.debug("Failed to send anti-entropy advertisement to {}", peer, error);
Madan Jampani3d76c942015-06-29 23:37:10 -0700563 }
564 });
565 }
566
Jonathan Hartaaa56572015-01-28 21:56:35 -0800567 private AntiEntropyAdvertisement<K> createAdvertisement() {
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700568 return new AntiEntropyAdvertisement<K>(localNodeId,
569 ImmutableMap.copyOf(Maps.transformValues(items, MapValue::digest)));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800570 }
571
572 private void handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700573 if (destroyed || underHighLoad()) {
574 return;
575 }
576 try {
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700577 log.debug("Received anti-entropy advertisement from {} for {} with {} entries in it",
578 mapName, ad.sender(), ad.digest().size());
Madan Jampani3d76c942015-06-29 23:37:10 -0700579 antiEntropyCheckLocalItems(ad).forEach(this::notifyListeners);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800580
Madan Jampani3d76c942015-06-29 23:37:10 -0700581 if (!lightweightAntiEntropy) {
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700582 // if remote ad has any entries that the local copy is missing, actively sync
583 // TODO: Missing keys is not the way local copy can be behind.
584 if (Sets.difference(ad.digest().keySet(), items.keySet()).size() > 0) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700585 // TODO: Send ad for missing keys and for entries that are stale
586 sendAdvertisementToPeer(ad.sender());
Jonathan Hartf893be82015-02-24 17:35:51 -0800587 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800588 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700589 } catch (Exception e) {
590 log.warn("Error handling anti-entropy advertisement", e);
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800591 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800592 }
593
594 /**
Madan Jampani3d76c942015-06-29 23:37:10 -0700595 * Processes anti-entropy ad from peer by taking following actions:
596 * 1. If peer has an old entry, updates peer.
597 * 2. If peer indicates an entry is removed and has a more recent
598 * timestamp than the local entry, update local state.
Jonathan Hartaaa56572015-01-28 21:56:35 -0800599 */
Jonathan Hartaaa56572015-01-28 21:56:35 -0800600 private List<EventuallyConsistentMapEvent<K, V>> antiEntropyCheckLocalItems(
601 AntiEntropyAdvertisement<K> ad) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700602 final List<EventuallyConsistentMapEvent<K, V>> externalEvents = Lists.newLinkedList();
Jonathan Hartaaa56572015-01-28 21:56:35 -0800603 final NodeId sender = ad.sender();
Madan Jampani3d76c942015-06-29 23:37:10 -0700604 items.forEach((key, localValue) -> {
605 MapValue.Digest remoteValueDigest = ad.digest().get(key);
606 if (remoteValueDigest == null || localValue.isNewerThan(remoteValueDigest.timestamp())) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800607 // local value is more recent, push to sender
Madan Jampani3d76c942015-06-29 23:37:10 -0700608 queueUpdate(new UpdateEntry<>(key, localValue), ImmutableList.of(sender));
Madan Jampanid13f3b82015-07-01 17:37:50 -0700609 }
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700610 if (remoteValueDigest != null
611 && remoteValueDigest.isNewerThan(localValue.digest())
612 && remoteValueDigest.isTombstone()) {
Madan Jampani483d0a22015-08-19 17:33:00 -0700613 MapValue<V> tombstone = MapValue.tombstone(remoteValueDigest.timestamp());
Madan Jampanid13f3b82015-07-01 17:37:50 -0700614 MapValue<V> previousValue = removeInternal(key,
615 Optional.empty(),
Madan Jampani483d0a22015-08-19 17:33:00 -0700616 Optional.of(tombstone));
Madan Jampanid13f3b82015-07-01 17:37:50 -0700617 if (previousValue != null && previousValue.isAlive()) {
Madan Jampanicab114c2015-07-23 00:14:19 -0700618 externalEvents.add(new EventuallyConsistentMapEvent<>(mapName, REMOVE, key, previousValue.get()));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800619 }
620 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700621 });
Jonathan Hartaaa56572015-01-28 21:56:35 -0800622 return externalEvents;
623 }
624
Madan Jampani3d76c942015-06-29 23:37:10 -0700625 private void processUpdates(Collection<UpdateEntry<K, V>> updates) {
626 if (destroyed) {
627 return;
Jonathan Hartaaa56572015-01-28 21:56:35 -0800628 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700629 updates.forEach(update -> {
630 final K key = update.key();
631 final MapValue<V> value = update.value();
Madan Jampani483d0a22015-08-19 17:33:00 -0700632 if (value == null || value.isTombstone()) {
633 MapValue<V> previousValue = removeInternal(key, Optional.empty(), Optional.ofNullable(value));
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700634 if (previousValue != null && previousValue.isAlive()) {
Madan Jampanicab114c2015-07-23 00:14:19 -0700635 notifyListeners(new EventuallyConsistentMapEvent<>(mapName, REMOVE, key, previousValue.get()));
Madan Jampanid13f3b82015-07-01 17:37:50 -0700636 }
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700637 } else if (putInternal(key, value)) {
Madan Jampanicab114c2015-07-23 00:14:19 -0700638 notifyListeners(new EventuallyConsistentMapEvent<>(mapName, PUT, key, value.get()));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800639 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700640 });
Jonathan Hartdb3af892015-01-26 13:19:07 -0800641 }
642
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800643 // TODO pull this into the class if this gets pulled out...
644 private static final int DEFAULT_MAX_EVENTS = 1000;
645 private static final int DEFAULT_MAX_IDLE_MS = 10;
646 private static final int DEFAULT_MAX_BATCH_MS = 50;
647 private static final Timer TIMER = new Timer("onos-ecm-sender-events");
Jonathan Hartdb3af892015-01-26 13:19:07 -0800648
Madan Jampani3d76c942015-06-29 23:37:10 -0700649 private final class EventAccumulator extends AbstractAccumulator<UpdateEntry<K, V>> {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800650
651 private final NodeId peer;
652
653 private EventAccumulator(NodeId peer) {
654 super(TIMER, DEFAULT_MAX_EVENTS, DEFAULT_MAX_BATCH_MS, DEFAULT_MAX_IDLE_MS);
655 this.peer = peer;
656 }
657
658 @Override
Madan Jampani3d76c942015-06-29 23:37:10 -0700659 public void processItems(List<UpdateEntry<K, V>> items) {
660 Map<K, UpdateEntry<K, V>> map = Maps.newHashMap();
661 items.forEach(item -> map.compute(item.key(), (key, existing) ->
Madan Jampani92c64eb2015-07-23 15:37:07 -0700662 item.isNewerThan(existing) ? item : existing));
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800663 communicationExecutor.submit(() -> {
Madan Jampani3d76c942015-06-29 23:37:10 -0700664 clusterCommunicator.unicast(ImmutableList.copyOf(map.values()),
Madan Jampani175e8fd2015-05-20 14:10:45 -0700665 updateMessageSubject,
666 serializer::encode,
667 peer)
668 .whenComplete((result, error) -> {
669 if (error != null) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700670 log.debug("Failed to send to {}", peer, error);
Madan Jampani175e8fd2015-05-20 14:10:45 -0700671 }
672 });
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800673 });
Jonathan Hartdb3af892015-01-26 13:19:07 -0800674 }
675 }
Jonathan Hart4a29c592015-09-23 17:55:07 -0700676}