/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Jonathan Hart77bdd262015-02-03 09:07:48 -080016package org.onosproject.store.ecmap;
Jonathan Hartdb3af892015-01-26 13:19:07 -080017
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080018import com.google.common.collect.ImmutableList;
19import com.google.common.collect.Lists;
20import com.google.common.collect.Maps;
Madan Jampani3d76c942015-06-29 23:37:10 -070021import com.google.common.collect.Sets;
Madan Jampani3e033bd2015-04-08 13:03:49 -070022
Jonathan Hartf9108232015-02-02 16:37:35 -080023import org.apache.commons.lang3.tuple.Pair;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080024import org.onlab.util.AbstractAccumulator;
Jonathan Hartdb3af892015-01-26 13:19:07 -080025import org.onlab.util.KryoNamespace;
Jonathan Hart233a18a2015-03-02 17:24:58 -080026import org.onlab.util.SlidingWindowCounter;
Jonathan Hartdb3af892015-01-26 13:19:07 -080027import org.onosproject.cluster.ClusterService;
Jonathan Hartaaa56572015-01-28 21:56:35 -080028import org.onosproject.cluster.ControllerNode;
Jonathan Hartdb3af892015-01-26 13:19:07 -080029import org.onosproject.cluster.NodeId;
30import org.onosproject.store.Timestamp;
31import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Jonathan Hartdb3af892015-01-26 13:19:07 -080032import org.onosproject.store.cluster.messaging.MessageSubject;
Madan Jampani3e033bd2015-04-08 13:03:49 -070033import org.onosproject.store.impl.LogicalTimestamp;
Jonathan Hart63939a32015-05-08 11:57:03 -070034import org.onosproject.store.service.WallClockTimestamp;
Madan Jampani3d76c942015-06-29 23:37:10 -070035import org.onosproject.store.serializers.KryoNamespaces;
Jonathan Hartdb3af892015-01-26 13:19:07 -080036import org.onosproject.store.serializers.KryoSerializer;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070037import org.onosproject.store.service.EventuallyConsistentMap;
38import org.onosproject.store.service.EventuallyConsistentMapEvent;
39import org.onosproject.store.service.EventuallyConsistentMapListener;
Jonathan Hartdb3af892015-01-26 13:19:07 -080040import org.slf4j.Logger;
41import org.slf4j.LoggerFactory;
42
Madan Jampani3d76c942015-06-29 23:37:10 -070043import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
44import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
45
Jonathan Hartdb3af892015-01-26 13:19:07 -080046import java.util.Collection;
Madan Jampani3d76c942015-06-29 23:37:10 -070047import java.util.Collections;
Jonathan Hartdb3af892015-01-26 13:19:07 -080048import java.util.List;
49import java.util.Map;
Madan Jampani43e9c9c2015-06-26 14:16:46 -070050import java.util.Optional;
Jonathan Hartdb3af892015-01-26 13:19:07 -080051import java.util.Set;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080052import java.util.Timer;
Jonathan Hartdb3af892015-01-26 13:19:07 -080053import java.util.concurrent.ExecutorService;
54import java.util.concurrent.Executors;
55import java.util.concurrent.ScheduledExecutorService;
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -080056import java.util.concurrent.TimeUnit;
Madan Jampani3d76c942015-06-29 23:37:10 -070057import java.util.concurrent.atomic.AtomicBoolean;
Madan Jampani43e9c9c2015-06-26 14:16:46 -070058import java.util.concurrent.atomic.AtomicReference;
Brian O'Connor5eb77c82015-03-02 18:09:39 -080059import java.util.function.BiFunction;
Jonathan Hartdb3af892015-01-26 13:19:07 -080060import java.util.stream.Collectors;
61
62import static com.google.common.base.Preconditions.checkNotNull;
63import static com.google.common.base.Preconditions.checkState;
64import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
Brian O'Connorc6713a82015-02-24 11:55:48 -080065import static org.onlab.util.BoundedThreadPool.newFixedThreadPool;
Brian O'Connor4b2ba5f2015-02-18 20:54:00 -080066import static org.onlab.util.Tools.groupedThreads;
Jonathan Hartdb3af892015-01-26 13:19:07 -080067
68/**
69 * Distributed Map implementation which uses optimistic replication and gossip
70 * based techniques to provide an eventually consistent data store.
71 */
72public class EventuallyConsistentMapImpl<K, V>
73 implements EventuallyConsistentMap<K, V> {
74
75 private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class);
76
Madan Jampani3d76c942015-06-29 23:37:10 -070077 private final Map<K, MapValue<V>> items;
Jonathan Hartdb3af892015-01-26 13:19:07 -080078
Jonathan Hartdb3af892015-01-26 13:19:07 -080079 private final ClusterService clusterService;
80 private final ClusterCommunicationService clusterCommunicator;
81 private final KryoSerializer serializer;
Madan Jampani3d76c942015-06-29 23:37:10 -070082 private final NodeId localNodeId;
Jonathan Hartdb3af892015-01-26 13:19:07 -080083
Madan Jampanibcf1a482015-06-24 19:05:56 -070084 private final BiFunction<K, V, Timestamp> timestampProvider;
Jonathan Hartdb3af892015-01-26 13:19:07 -080085
86 private final MessageSubject updateMessageSubject;
Jonathan Hartaaa56572015-01-28 21:56:35 -080087 private final MessageSubject antiEntropyAdvertisementSubject;
Jonathan Hartdb3af892015-01-26 13:19:07 -080088
Jonathan Hartaaa56572015-01-28 21:56:35 -080089 private final Set<EventuallyConsistentMapListener<K, V>> listeners
Madan Jampani3d76c942015-06-29 23:37:10 -070090 = Sets.newCopyOnWriteArraySet();
Jonathan Hartdb3af892015-01-26 13:19:07 -080091
92 private final ExecutorService executor;
Jonathan Hartdb3af892015-01-26 13:19:07 -080093 private final ScheduledExecutorService backgroundExecutor;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080094 private final BiFunction<K, V, Collection<NodeId>> peerUpdateFunction;
Jonathan Hartdb3af892015-01-26 13:19:07 -080095
Jonathan Hart6ec029a2015-03-24 17:12:35 -070096 private final ExecutorService communicationExecutor;
97 private final Map<NodeId, EventAccumulator> senderPending;
Madan Jampani28726282015-02-19 11:40:23 -080098
Jonathan Hartdb3af892015-01-26 13:19:07 -080099 private volatile boolean destroyed = false;
Jonathan Hart539a6462015-01-27 17:05:43 -0800100 private static final String ERROR_DESTROYED = " map is already destroyed";
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800101 private final String destroyedMessage;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800102
Jonathan Hart4f397e82015-02-04 09:10:41 -0800103 private static final String ERROR_NULL_KEY = "Key cannot be null";
104 private static final String ERROR_NULL_VALUE = "Null values are not allowed";
105
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700106 private final long initialDelaySec = 5;
107 private final boolean lightweightAntiEntropy;
108 private final boolean tombstonesDisabled;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800109
Jonathan Hart233a18a2015-03-02 17:24:58 -0800110 private static final int WINDOW_SIZE = 5;
111 private static final int HIGH_LOAD_THRESHOLD = 0;
112 private static final int LOAD_WINDOW = 2;
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700113 private SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE);
Jonathan Hart233a18a2015-03-02 17:24:58 -0800114
Jonathan Hartca335e92015-03-05 10:34:32 -0800115 private final boolean persistent;
116 private final PersistentStore<K, V> persistentStore;
117
Jonathan Hartdb3af892015-01-26 13:19:07 -0800118 /**
119 * Creates a new eventually consistent map shared amongst multiple instances.
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800120 * <p>
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700121 * See {@link org.onosproject.store.service.EventuallyConsistentMapBuilder}
122 * for more description of the parameters expected by the map.
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800123 * </p>
Jonathan Hartdb3af892015-01-26 13:19:07 -0800124 *
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700125 * @param mapName a String identifier for the map.
126 * @param clusterService the cluster service
127 * @param clusterCommunicator the cluster communications service
128 * @param serializerBuilder a Kryo namespace builder that can serialize
129 * both K and V
Madan Jampanibcf1a482015-06-24 19:05:56 -0700130 * @param timestampProvider provider of timestamps for K and V
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700131 * @param peerUpdateFunction function that provides a set of nodes to immediately
132 * update to when there writes to the map
133 * @param eventExecutor executor to use for processing incoming
134 * events from peers
135 * @param communicationExecutor executor to use for sending events to peers
136 * @param backgroundExecutor executor to use for background anti-entropy
137 * tasks
138 * @param tombstonesDisabled true if this map should not maintain
139 * tombstones
140 * @param antiEntropyPeriod period that the anti-entropy task should run
Jonathan Hartca335e92015-03-05 10:34:32 -0800141 * @param antiEntropyTimeUnit time unit for anti-entropy period
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700142 * @param convergeFaster make anti-entropy try to converge faster
Jonathan Hartca335e92015-03-05 10:34:32 -0800143 * @param persistent persist data to disk
Jonathan Hartdb3af892015-01-26 13:19:07 -0800144 */
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700145 EventuallyConsistentMapImpl(String mapName,
146 ClusterService clusterService,
147 ClusterCommunicationService clusterCommunicator,
148 KryoNamespace.Builder serializerBuilder,
Madan Jampanibcf1a482015-06-24 19:05:56 -0700149 BiFunction<K, V, Timestamp> timestampProvider,
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700150 BiFunction<K, V, Collection<NodeId>> peerUpdateFunction,
151 ExecutorService eventExecutor,
152 ExecutorService communicationExecutor,
153 ScheduledExecutorService backgroundExecutor,
154 boolean tombstonesDisabled,
155 long antiEntropyPeriod,
156 TimeUnit antiEntropyTimeUnit,
Jonathan Hartca335e92015-03-05 10:34:32 -0800157 boolean convergeFaster,
158 boolean persistent) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700159 items = Maps.newConcurrentMap();
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800160 senderPending = Maps.newConcurrentMap();
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700161 destroyedMessage = mapName + ERROR_DESTROYED;
Madan Jampani28726282015-02-19 11:40:23 -0800162
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700163 this.clusterService = clusterService;
164 this.clusterCommunicator = clusterCommunicator;
Madan Jampani3d76c942015-06-29 23:37:10 -0700165 this.localNodeId = clusterService.getLocalNode().id();
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700166
167 this.serializer = createSerializer(serializerBuilder);
168
Madan Jampanibcf1a482015-06-24 19:05:56 -0700169 this.timestampProvider = timestampProvider;
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700170
171 if (peerUpdateFunction != null) {
172 this.peerUpdateFunction = peerUpdateFunction;
173 } else {
174 this.peerUpdateFunction = (key, value) -> clusterService.getNodes().stream()
175 .map(ControllerNode::id)
Madan Jampani3d76c942015-06-29 23:37:10 -0700176 .filter(nodeId -> !nodeId.equals(localNodeId))
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700177 .collect(Collectors.toList());
178 }
179
180 if (eventExecutor != null) {
181 this.executor = eventExecutor;
182 } else {
183 // should be a normal executor; it's used for receiving messages
184 this.executor =
185 Executors.newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-fg-%d"));
186 }
187
188 if (communicationExecutor != null) {
189 this.communicationExecutor = communicationExecutor;
190 } else {
191 // sending executor; should be capped
192 //TODO this probably doesn't need to be bounded anymore
193 this.communicationExecutor =
194 newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-publish-%d"));
195 }
196
Jonathan Hartca335e92015-03-05 10:34:32 -0800197 this.persistent = persistent;
198
199 if (this.persistent) {
200 String dataDirectory = System.getProperty("karaf.data", "./data");
201 String filename = dataDirectory + "/" + "mapdb-ecm-" + mapName;
202
203 ExecutorService dbExecutor =
204 newFixedThreadPool(1, groupedThreads("onos/ecm", mapName + "-dbwriter"));
205
206 persistentStore = new MapDbPersistentStore<>(filename, dbExecutor, serializer);
Madan Jampani3d76c942015-06-29 23:37:10 -0700207 persistentStore.readInto(items);
Jonathan Hartca335e92015-03-05 10:34:32 -0800208 } else {
209 this.persistentStore = null;
210 }
211
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700212 if (backgroundExecutor != null) {
213 this.backgroundExecutor = backgroundExecutor;
214 } else {
215 this.backgroundExecutor =
216 newSingleThreadScheduledExecutor(groupedThreads("onos/ecm", mapName + "-bg-%d"));
217 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800218
Jonathan Hartaaa56572015-01-28 21:56:35 -0800219 // start anti-entropy thread
Madan Jampani3d76c942015-06-29 23:37:10 -0700220 this.backgroundExecutor.scheduleAtFixedRate(this::sendAdvertisement,
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700221 initialDelaySec, antiEntropyPeriod,
222 antiEntropyTimeUnit);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800223
Jonathan Hartdb3af892015-01-26 13:19:07 -0800224 updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
225 clusterCommunicator.addSubscriber(updateMessageSubject,
Madan Jampani3d76c942015-06-29 23:37:10 -0700226 serializer::decode,
227 this::processUpdates,
228 this.executor);
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800229
Jonathan Hartaaa56572015-01-28 21:56:35 -0800230 antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");
231 clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
Madan Jampani3d76c942015-06-29 23:37:10 -0700232 serializer::decode,
233 this::handleAntiEntropyAdvertisement,
234 this.backgroundExecutor);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800235
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700236 this.tombstonesDisabled = tombstonesDisabled;
237 this.lightweightAntiEntropy = !convergeFaster;
Madan Jampanie1356282015-03-10 19:05:36 -0700238 }
239
Jonathan Hartdb3af892015-01-26 13:19:07 -0800240 private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
241 return new KryoSerializer() {
242 @Override
243 protected void setupKryoPool() {
244 // Add the map's internal helper classes to the user-supplied serializer
245 serializerPool = builder
Madan Jampani3d76c942015-06-29 23:37:10 -0700246 .register(KryoNamespaces.BASIC)
Madan Jampani3e033bd2015-04-08 13:03:49 -0700247 .register(LogicalTimestamp.class)
Jonathan Hartdb3af892015-01-26 13:19:07 -0800248 .register(WallClockTimestamp.class)
Jonathan Hartaaa56572015-01-28 21:56:35 -0800249 .register(AntiEntropyAdvertisement.class)
Madan Jampani3d76c942015-06-29 23:37:10 -0700250 .register(UpdateEntry.class)
251 .register(MapValue.class)
252 .register(MapValue.Digest.class)
Jonathan Hartdb3af892015-01-26 13:19:07 -0800253 .build();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800254 }
255 };
256 }
257
258 @Override
259 public int size() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800260 checkState(!destroyed, destroyedMessage);
Madan Jampani3d76c942015-06-29 23:37:10 -0700261 // TODO: Maintain a separate counter for tracking live elements in map.
262 return Maps.filterValues(items, MapValue::isAlive).size();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800263 }
264
265 @Override
266 public boolean isEmpty() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800267 checkState(!destroyed, destroyedMessage);
Madan Jampani3d76c942015-06-29 23:37:10 -0700268 return size() == 0;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800269 }
270
271 @Override
272 public boolean containsKey(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800273 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800274 checkNotNull(key, ERROR_NULL_KEY);
Madan Jampani3d76c942015-06-29 23:37:10 -0700275 return get(key) != null;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800276 }
277
278 @Override
279 public boolean containsValue(V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800280 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800281 checkNotNull(value, ERROR_NULL_VALUE);
Madan Jampani3d76c942015-06-29 23:37:10 -0700282 return items.values()
283 .stream()
284 .filter(MapValue::isAlive)
285 .anyMatch(v -> v.get().equals(value));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800286 }
287
288 @Override
289 public V get(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800290 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800291 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800292
Madan Jampani3d76c942015-06-29 23:37:10 -0700293 MapValue<V> value = items.get(key);
294 return (value == null || value.isTombstone()) ? null : value.get();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800295 }
296
297 @Override
298 public void put(K key, V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800299 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800300 checkNotNull(key, ERROR_NULL_KEY);
301 checkNotNull(value, ERROR_NULL_VALUE);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800302
Madan Jampani3d76c942015-06-29 23:37:10 -0700303 MapValue<V> newValue = new MapValue<>(value, timestampProvider.apply(key, value));
304 if (updateInternal(key, newValue)) {
305 notifyPeers(new UpdateEntry<>(key, newValue), peerUpdateFunction.apply(key, value));
306 notifyListeners(new EventuallyConsistentMapEvent<>(PUT, key, value));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800307 }
308 }
309
Jonathan Hartdb3af892015-01-26 13:19:07 -0800310 @Override
Madan Jampani43e9c9c2015-06-26 14:16:46 -0700311 public V remove(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800312 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800313 checkNotNull(key, ERROR_NULL_KEY);
Madan Jampani3d76c942015-06-29 23:37:10 -0700314 return removeInternal(key, Optional.empty());
Jonathan Hartdb3af892015-01-26 13:19:07 -0800315 }
316
317 @Override
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800318 public void remove(K key, V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800319 checkState(!destroyed, destroyedMessage);
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800320 checkNotNull(key, ERROR_NULL_KEY);
321 checkNotNull(value, ERROR_NULL_VALUE);
Madan Jampani3d76c942015-06-29 23:37:10 -0700322 removeInternal(key, Optional.of(value));
323 }
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800324
Madan Jampani3d76c942015-06-29 23:37:10 -0700325 private V removeInternal(K key, Optional<V> value) {
326 checkState(!destroyed, destroyedMessage);
327 checkNotNull(key, ERROR_NULL_KEY);
328 checkNotNull(value, ERROR_NULL_VALUE);
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800329
Madan Jampani3d76c942015-06-29 23:37:10 -0700330 MapValue<V> newValue = new MapValue<>(null, timestampProvider.apply(key, value.orElse(null)));
331 AtomicBoolean updated = new AtomicBoolean(false);
332 AtomicReference<V> previousValue = new AtomicReference<>();
333 items.compute(key, (k, existing) -> {
334 if (existing != null && existing.isAlive()) {
335 updated.set(!value.isPresent() || value.get().equals(existing.get()));
336 previousValue.set(existing.get());
337 }
338 updated.set(existing == null || newValue.isNewerThan(existing));
339 return updated.get() ? newValue : existing;
340 });
341 if (updated.get()) {
342 notifyPeers(new UpdateEntry<>(key, newValue), peerUpdateFunction.apply(key, previousValue.get()));
343 notifyListeners(new EventuallyConsistentMapEvent<>(REMOVE, key, previousValue.get()));
344 if (persistent) {
345 persistentStore.update(key, newValue);
346 }
347 return previousValue.get();
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800348 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700349 return null;
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800350 }
351
352 @Override
Jonathan Hartdb3af892015-01-26 13:19:07 -0800353 public void putAll(Map<? extends K, ? extends V> m) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800354 checkState(!destroyed, destroyedMessage);
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800355 m.forEach(this::put);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800356 }
357
358 @Override
359 public void clear() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800360 checkState(!destroyed, destroyedMessage);
Madan Jampani3d76c942015-06-29 23:37:10 -0700361 Maps.filterValues(items, MapValue::isAlive)
362 .forEach((k, v) -> remove(k));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800363 }
364
365 @Override
366 public Set<K> keySet() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800367 checkState(!destroyed, destroyedMessage);
Madan Jampani3d76c942015-06-29 23:37:10 -0700368 return Maps.filterValues(items, MapValue::isAlive)
369 .keySet();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800370 }
371
372 @Override
373 public Collection<V> values() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800374 checkState(!destroyed, destroyedMessage);
Madan Jampani3d76c942015-06-29 23:37:10 -0700375 return Maps.filterValues(items, MapValue::isAlive)
376 .values()
377 .stream()
378 .map(MapValue::get)
379 .collect(Collectors.toList());
Jonathan Hartdb3af892015-01-26 13:19:07 -0800380 }
381
382 @Override
383 public Set<Map.Entry<K, V>> entrySet() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800384 checkState(!destroyed, destroyedMessage);
Madan Jampani3d76c942015-06-29 23:37:10 -0700385 return Maps.filterValues(items, MapValue::isAlive)
386 .entrySet()
387 .stream()
388 .map(e -> Pair.of(e.getKey(), e.getValue().get()))
389 .collect(Collectors.toSet());
390 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800391
Madan Jampani3d76c942015-06-29 23:37:10 -0700392 /**
393 * Returns true if newValue was accepted i.e. map is updated.
394 * @param key key
395 * @param newValue proposed new value
396 * @return true if update happened; false if map already contains a more recent value for the key
397 */
398 private boolean updateInternal(K key, MapValue<V> newValue) {
399 AtomicBoolean updated = new AtomicBoolean(false);
400 items.compute(key, (k, existing) -> {
401 if (existing == null || newValue.isNewerThan(existing)) {
402 updated.set(true);
403 if (newValue.isTombstone()) {
404 return tombstonesDisabled ? null : newValue;
405 }
406 return newValue;
407 }
408 return existing;
409 });
410 if (updated.get() && persistent) {
411 persistentStore.update(key, newValue);
412 }
413 return updated.get();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800414 }
415
416 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800417 public void addListener(EventuallyConsistentMapListener<K, V> listener) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800418 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800419
420 listeners.add(checkNotNull(listener));
421 }
422
423 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800424 public void removeListener(EventuallyConsistentMapListener<K, V> listener) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800425 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800426
427 listeners.remove(checkNotNull(listener));
428 }
429
430 @Override
431 public void destroy() {
432 destroyed = true;
433
434 executor.shutdown();
435 backgroundExecutor.shutdown();
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800436 communicationExecutor.shutdown();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800437
Jonathan Hart584d2f32015-01-27 19:46:14 -0800438 listeners.clear();
439
Jonathan Hartdb3af892015-01-26 13:19:07 -0800440 clusterCommunicator.removeSubscriber(updateMessageSubject);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800441 clusterCommunicator.removeSubscriber(antiEntropyAdvertisementSubject);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800442 }
443
Jonathan Hartaaa56572015-01-28 21:56:35 -0800444 private void notifyListeners(EventuallyConsistentMapEvent<K, V> event) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700445 listeners.forEach(listener -> listener.event(event));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800446 }
447
Madan Jampani3d76c942015-06-29 23:37:10 -0700448 private void notifyPeers(UpdateEntry<K, V> event, Collection<NodeId> peers) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800449 queueUpdate(event, peers);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800450 }
451
Madan Jampani3d76c942015-06-29 23:37:10 -0700452 private void queueUpdate(UpdateEntry<K, V> event, Collection<NodeId> peers) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800453 if (peers == null) {
454 // we have no friends :(
455 return;
456 }
457 peers.forEach(node ->
Madan Jampani3d76c942015-06-29 23:37:10 -0700458 senderPending.computeIfAbsent(node, unusedKey -> new EventAccumulator(node)).add(event)
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800459 );
460 }
461
Jonathan Hart233a18a2015-03-02 17:24:58 -0800462 private boolean underHighLoad() {
463 return counter.get(LOAD_WINDOW) > HIGH_LOAD_THRESHOLD;
464 }
465
Madan Jampani3d76c942015-06-29 23:37:10 -0700466 private void sendAdvertisement() {
467 try {
Thomas Vachuska152f9fd2015-04-02 16:28:13 -0700468 if (underHighLoad() || destroyed) {
Jonathan Hart233a18a2015-03-02 17:24:58 -0800469 return;
470 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700471 pickRandomActivePeer().ifPresent(this::sendAdvertisementToPeer);
472 } catch (Exception e) {
473 // Catch all exceptions to avoid scheduled task being suppressed.
474 log.error("Exception thrown while sending advertisement", e);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800475 }
476 }
477
Madan Jampani3d76c942015-06-29 23:37:10 -0700478 private Optional<NodeId> pickRandomActivePeer() {
479 List<NodeId> activePeers = clusterService.getNodes()
480 .stream()
481 .filter(node -> !localNodeId.equals(node))
482 .map(ControllerNode::id)
483 .filter(id -> clusterService.getState(id) == ControllerNode.State.ACTIVE)
484 .collect(Collectors.toList());
485 Collections.shuffle(activePeers);
486 return activePeers.isEmpty() ? Optional.empty() : Optional.of(activePeers.get(0));
487 }
488
489 private void sendAdvertisementToPeer(NodeId peer) {
490 clusterCommunicator.unicast(createAdvertisement(),
491 antiEntropyAdvertisementSubject,
492 serializer::encode,
493 peer)
494 .whenComplete((result, error) -> {
495 if (error != null) {
496 log.warn("Failed to send anti-entropy advertisement to {}", peer);
497 }
498 });
499 }
500
501
Jonathan Hartaaa56572015-01-28 21:56:35 -0800502 private AntiEntropyAdvertisement<K> createAdvertisement() {
Madan Jampani3d76c942015-06-29 23:37:10 -0700503 return new AntiEntropyAdvertisement<K>(localNodeId, Maps.transformValues(items, MapValue::digest));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800504 }
505
506 private void handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700507 if (destroyed || underHighLoad()) {
508 return;
509 }
510 try {
511 antiEntropyCheckLocalItems(ad).forEach(this::notifyListeners);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800512
Madan Jampani3d76c942015-06-29 23:37:10 -0700513 if (!lightweightAntiEntropy) {
514 Set<K> missingKeys = Sets.difference(items.keySet(), ad.digest().keySet());
515 // if remote ad has something unknown, actively sync
516 if (missingKeys.size() > 0) {
Jonathan Hartf893be82015-02-24 17:35:51 -0800517 // Send the advertisement back if this peer is out-of-sync
Madan Jampani3d76c942015-06-29 23:37:10 -0700518 // TODO: Send ad for missing keys and for entries that are stale
519 sendAdvertisementToPeer(ad.sender());
Jonathan Hartf893be82015-02-24 17:35:51 -0800520 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800521 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700522 } catch (Exception e) {
523 log.warn("Error handling anti-entropy advertisement", e);
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800524 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800525 }
526
527 /**
Madan Jampani3d76c942015-06-29 23:37:10 -0700528 * Processes anti-entropy ad from peer by taking following actions:
529 * 1. If peer has an old entry, updates peer.
530 * 2. If peer indicates an entry is removed and has a more recent
531 * timestamp than the local entry, update local state.
Jonathan Hartaaa56572015-01-28 21:56:35 -0800532 */
Jonathan Hartaaa56572015-01-28 21:56:35 -0800533 private List<EventuallyConsistentMapEvent<K, V>> antiEntropyCheckLocalItems(
534 AntiEntropyAdvertisement<K> ad) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700535 final List<EventuallyConsistentMapEvent<K, V>> externalEvents = Lists.newLinkedList();
Jonathan Hartaaa56572015-01-28 21:56:35 -0800536 final NodeId sender = ad.sender();
Madan Jampani3d76c942015-06-29 23:37:10 -0700537 items.forEach((key, localValue) -> {
538 MapValue.Digest remoteValueDigest = ad.digest().get(key);
539 if (remoteValueDigest == null || localValue.isNewerThan(remoteValueDigest.timestamp())) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800540 // local value is more recent, push to sender
Madan Jampani3d76c942015-06-29 23:37:10 -0700541 queueUpdate(new UpdateEntry<>(key, localValue), ImmutableList.of(sender));
542 } else {
543 if (remoteValueDigest.isTombstone()
544 && remoteValueDigest.timestamp().isNewerThan(localValue.timestamp())) {
545 if (updateInternal(key, new MapValue<>(null, remoteValueDigest.timestamp()))) {
546 externalEvents.add(new EventuallyConsistentMapEvent<>(REMOVE, key, null));
547 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800548 }
549 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700550 });
Jonathan Hartaaa56572015-01-28 21:56:35 -0800551 return externalEvents;
552 }
553
Madan Jampani3d76c942015-06-29 23:37:10 -0700554 private void processUpdates(Collection<UpdateEntry<K, V>> updates) {
555 if (destroyed) {
556 return;
Jonathan Hartaaa56572015-01-28 21:56:35 -0800557 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700558 updates.forEach(update -> {
559 final K key = update.key();
560 final MapValue<V> value = update.value();
Jonathan Hartaaa56572015-01-28 21:56:35 -0800561
Madan Jampani3d76c942015-06-29 23:37:10 -0700562 if (updateInternal(key, value)) {
563 final EventuallyConsistentMapEvent.Type type = value.isTombstone() ? REMOVE : PUT;
564 notifyListeners(new EventuallyConsistentMapEvent<>(type, key, value.get()));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800565 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700566 });
Jonathan Hartdb3af892015-01-26 13:19:07 -0800567 }
568
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800569 // TODO pull this into the class if this gets pulled out...
570 private static final int DEFAULT_MAX_EVENTS = 1000;
571 private static final int DEFAULT_MAX_IDLE_MS = 10;
572 private static final int DEFAULT_MAX_BATCH_MS = 50;
573 private static final Timer TIMER = new Timer("onos-ecm-sender-events");
Jonathan Hartdb3af892015-01-26 13:19:07 -0800574
Madan Jampani3d76c942015-06-29 23:37:10 -0700575 private final class EventAccumulator extends AbstractAccumulator<UpdateEntry<K, V>> {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800576
577 private final NodeId peer;
578
579 private EventAccumulator(NodeId peer) {
580 super(TIMER, DEFAULT_MAX_EVENTS, DEFAULT_MAX_BATCH_MS, DEFAULT_MAX_IDLE_MS);
581 this.peer = peer;
582 }
583
584 @Override
Madan Jampani3d76c942015-06-29 23:37:10 -0700585 public void processItems(List<UpdateEntry<K, V>> items) {
586 Map<K, UpdateEntry<K, V>> map = Maps.newHashMap();
587 items.forEach(item -> map.compute(item.key(), (key, existing) ->
588 existing == null || item.compareTo(existing) > 0 ? item : existing));
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800589 communicationExecutor.submit(() -> {
Madan Jampani3d76c942015-06-29 23:37:10 -0700590 clusterCommunicator.unicast(ImmutableList.copyOf(map.values()),
Madan Jampani175e8fd2015-05-20 14:10:45 -0700591 updateMessageSubject,
592 serializer::encode,
593 peer)
594 .whenComplete((result, error) -> {
595 if (error != null) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700596 log.debug("Failed to send to {}", peer, error);
Madan Jampani175e8fd2015-05-20 14:10:45 -0700597 }
598 });
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800599 });
Jonathan Hartdb3af892015-01-26 13:19:07 -0800600 }
601 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700602}