blob: 6d8daaa31b0a0d37dc605a36f8ebe2516d1b5eb9 [file] [log] [blame]
Jonathan Hartdb3af892015-01-26 13:19:07 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Jonathan Hart77bdd262015-02-03 09:07:48 -080016package org.onosproject.store.ecmap;
Jonathan Hartdb3af892015-01-26 13:19:07 -080017
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080018import com.google.common.collect.ImmutableList;
19import com.google.common.collect.Lists;
20import com.google.common.collect.Maps;
Madan Jampani3d76c942015-06-29 23:37:10 -070021import com.google.common.collect.Sets;
Madan Jampani3e033bd2015-04-08 13:03:49 -070022
Jonathan Hartf9108232015-02-02 16:37:35 -080023import org.apache.commons.lang3.tuple.Pair;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080024import org.onlab.util.AbstractAccumulator;
Jonathan Hartdb3af892015-01-26 13:19:07 -080025import org.onlab.util.KryoNamespace;
Jonathan Hart233a18a2015-03-02 17:24:58 -080026import org.onlab.util.SlidingWindowCounter;
Jonathan Hartdb3af892015-01-26 13:19:07 -080027import org.onosproject.cluster.ClusterService;
Jonathan Hartaaa56572015-01-28 21:56:35 -080028import org.onosproject.cluster.ControllerNode;
Jonathan Hartdb3af892015-01-26 13:19:07 -080029import org.onosproject.cluster.NodeId;
30import org.onosproject.store.Timestamp;
31import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Jonathan Hartdb3af892015-01-26 13:19:07 -080032import org.onosproject.store.cluster.messaging.MessageSubject;
Madan Jampani3e033bd2015-04-08 13:03:49 -070033import org.onosproject.store.impl.LogicalTimestamp;
Jonathan Hart63939a32015-05-08 11:57:03 -070034import org.onosproject.store.service.WallClockTimestamp;
Madan Jampani3d76c942015-06-29 23:37:10 -070035import org.onosproject.store.serializers.KryoNamespaces;
Jonathan Hartdb3af892015-01-26 13:19:07 -080036import org.onosproject.store.serializers.KryoSerializer;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070037import org.onosproject.store.service.EventuallyConsistentMap;
38import org.onosproject.store.service.EventuallyConsistentMapEvent;
39import org.onosproject.store.service.EventuallyConsistentMapListener;
Jonathan Hartdb3af892015-01-26 13:19:07 -080040import org.slf4j.Logger;
41import org.slf4j.LoggerFactory;
42
Madan Jampani3d76c942015-06-29 23:37:10 -070043import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
44import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
45
Jonathan Hartdb3af892015-01-26 13:19:07 -080046import java.util.Collection;
Madan Jampani3d76c942015-06-29 23:37:10 -070047import java.util.Collections;
Jonathan Hartdb3af892015-01-26 13:19:07 -080048import java.util.List;
49import java.util.Map;
Madan Jampani43e9c9c2015-06-26 14:16:46 -070050import java.util.Optional;
Jonathan Hartdb3af892015-01-26 13:19:07 -080051import java.util.Set;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080052import java.util.Timer;
Jonathan Hartdb3af892015-01-26 13:19:07 -080053import java.util.concurrent.ExecutorService;
54import java.util.concurrent.Executors;
55import java.util.concurrent.ScheduledExecutorService;
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -080056import java.util.concurrent.TimeUnit;
Madan Jampani3d76c942015-06-29 23:37:10 -070057import java.util.concurrent.atomic.AtomicBoolean;
Madan Jampani43e9c9c2015-06-26 14:16:46 -070058import java.util.concurrent.atomic.AtomicReference;
Brian O'Connor5eb77c82015-03-02 18:09:39 -080059import java.util.function.BiFunction;
Jonathan Hartdb3af892015-01-26 13:19:07 -080060import java.util.stream.Collectors;
61
62import static com.google.common.base.Preconditions.checkNotNull;
63import static com.google.common.base.Preconditions.checkState;
64import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
Brian O'Connorc6713a82015-02-24 11:55:48 -080065import static org.onlab.util.BoundedThreadPool.newFixedThreadPool;
Brian O'Connor4b2ba5f2015-02-18 20:54:00 -080066import static org.onlab.util.Tools.groupedThreads;
Jonathan Hartdb3af892015-01-26 13:19:07 -080067
68/**
69 * Distributed Map implementation which uses optimistic replication and gossip
70 * based techniques to provide an eventually consistent data store.
71 */
72public class EventuallyConsistentMapImpl<K, V>
73 implements EventuallyConsistentMap<K, V> {
74
75 private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class);
76
Madan Jampani3d76c942015-06-29 23:37:10 -070077 private final Map<K, MapValue<V>> items;
Jonathan Hartdb3af892015-01-26 13:19:07 -080078
Jonathan Hartdb3af892015-01-26 13:19:07 -080079 private final ClusterService clusterService;
80 private final ClusterCommunicationService clusterCommunicator;
81 private final KryoSerializer serializer;
Madan Jampani3d76c942015-06-29 23:37:10 -070082 private final NodeId localNodeId;
Jonathan Hartdb3af892015-01-26 13:19:07 -080083
Madan Jampanibcf1a482015-06-24 19:05:56 -070084 private final BiFunction<K, V, Timestamp> timestampProvider;
Jonathan Hartdb3af892015-01-26 13:19:07 -080085
86 private final MessageSubject updateMessageSubject;
Jonathan Hartaaa56572015-01-28 21:56:35 -080087 private final MessageSubject antiEntropyAdvertisementSubject;
Jonathan Hartdb3af892015-01-26 13:19:07 -080088
Jonathan Hartaaa56572015-01-28 21:56:35 -080089 private final Set<EventuallyConsistentMapListener<K, V>> listeners
Madan Jampani3d76c942015-06-29 23:37:10 -070090 = Sets.newCopyOnWriteArraySet();
Jonathan Hartdb3af892015-01-26 13:19:07 -080091
92 private final ExecutorService executor;
Jonathan Hartdb3af892015-01-26 13:19:07 -080093 private final ScheduledExecutorService backgroundExecutor;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080094 private final BiFunction<K, V, Collection<NodeId>> peerUpdateFunction;
Jonathan Hartdb3af892015-01-26 13:19:07 -080095
Jonathan Hart6ec029a2015-03-24 17:12:35 -070096 private final ExecutorService communicationExecutor;
97 private final Map<NodeId, EventAccumulator> senderPending;
Madan Jampani28726282015-02-19 11:40:23 -080098
Jonathan Hartdb3af892015-01-26 13:19:07 -080099 private volatile boolean destroyed = false;
Jonathan Hart539a6462015-01-27 17:05:43 -0800100 private static final String ERROR_DESTROYED = " map is already destroyed";
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800101 private final String destroyedMessage;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800102
Jonathan Hart4f397e82015-02-04 09:10:41 -0800103 private static final String ERROR_NULL_KEY = "Key cannot be null";
104 private static final String ERROR_NULL_VALUE = "Null values are not allowed";
105
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700106 private final long initialDelaySec = 5;
107 private final boolean lightweightAntiEntropy;
108 private final boolean tombstonesDisabled;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800109
Jonathan Hart233a18a2015-03-02 17:24:58 -0800110 private static final int WINDOW_SIZE = 5;
111 private static final int HIGH_LOAD_THRESHOLD = 0;
112 private static final int LOAD_WINDOW = 2;
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700113 private SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE);
Jonathan Hart233a18a2015-03-02 17:24:58 -0800114
Jonathan Hartca335e92015-03-05 10:34:32 -0800115 private final boolean persistent;
116 private final PersistentStore<K, V> persistentStore;
117
Jonathan Hartdb3af892015-01-26 13:19:07 -0800118 /**
119 * Creates a new eventually consistent map shared amongst multiple instances.
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800120 * <p>
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700121 * See {@link org.onosproject.store.service.EventuallyConsistentMapBuilder}
122 * for more description of the parameters expected by the map.
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800123 * </p>
Jonathan Hartdb3af892015-01-26 13:19:07 -0800124 *
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700125 * @param mapName a String identifier for the map.
126 * @param clusterService the cluster service
127 * @param clusterCommunicator the cluster communications service
128 * @param serializerBuilder a Kryo namespace builder that can serialize
129 * both K and V
Madan Jampanibcf1a482015-06-24 19:05:56 -0700130 * @param timestampProvider provider of timestamps for K and V
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700131 * @param peerUpdateFunction function that provides a set of nodes to immediately
132 * update to when there writes to the map
133 * @param eventExecutor executor to use for processing incoming
134 * events from peers
135 * @param communicationExecutor executor to use for sending events to peers
136 * @param backgroundExecutor executor to use for background anti-entropy
137 * tasks
138 * @param tombstonesDisabled true if this map should not maintain
139 * tombstones
140 * @param antiEntropyPeriod period that the anti-entropy task should run
Jonathan Hartca335e92015-03-05 10:34:32 -0800141 * @param antiEntropyTimeUnit time unit for anti-entropy period
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700142 * @param convergeFaster make anti-entropy try to converge faster
Jonathan Hartca335e92015-03-05 10:34:32 -0800143 * @param persistent persist data to disk
Jonathan Hartdb3af892015-01-26 13:19:07 -0800144 */
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700145 EventuallyConsistentMapImpl(String mapName,
146 ClusterService clusterService,
147 ClusterCommunicationService clusterCommunicator,
148 KryoNamespace.Builder serializerBuilder,
Madan Jampanibcf1a482015-06-24 19:05:56 -0700149 BiFunction<K, V, Timestamp> timestampProvider,
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700150 BiFunction<K, V, Collection<NodeId>> peerUpdateFunction,
151 ExecutorService eventExecutor,
152 ExecutorService communicationExecutor,
153 ScheduledExecutorService backgroundExecutor,
154 boolean tombstonesDisabled,
155 long antiEntropyPeriod,
156 TimeUnit antiEntropyTimeUnit,
Jonathan Hartca335e92015-03-05 10:34:32 -0800157 boolean convergeFaster,
158 boolean persistent) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700159 items = Maps.newConcurrentMap();
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800160 senderPending = Maps.newConcurrentMap();
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700161 destroyedMessage = mapName + ERROR_DESTROYED;
Madan Jampani28726282015-02-19 11:40:23 -0800162
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700163 this.clusterService = clusterService;
164 this.clusterCommunicator = clusterCommunicator;
Madan Jampani3d76c942015-06-29 23:37:10 -0700165 this.localNodeId = clusterService.getLocalNode().id();
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700166
167 this.serializer = createSerializer(serializerBuilder);
168
Madan Jampanibcf1a482015-06-24 19:05:56 -0700169 this.timestampProvider = timestampProvider;
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700170
171 if (peerUpdateFunction != null) {
172 this.peerUpdateFunction = peerUpdateFunction;
173 } else {
174 this.peerUpdateFunction = (key, value) -> clusterService.getNodes().stream()
175 .map(ControllerNode::id)
Madan Jampani3d76c942015-06-29 23:37:10 -0700176 .filter(nodeId -> !nodeId.equals(localNodeId))
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700177 .collect(Collectors.toList());
178 }
179
180 if (eventExecutor != null) {
181 this.executor = eventExecutor;
182 } else {
183 // should be a normal executor; it's used for receiving messages
184 this.executor =
185 Executors.newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-fg-%d"));
186 }
187
188 if (communicationExecutor != null) {
189 this.communicationExecutor = communicationExecutor;
190 } else {
191 // sending executor; should be capped
192 //TODO this probably doesn't need to be bounded anymore
193 this.communicationExecutor =
194 newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-publish-%d"));
195 }
196
Jonathan Hartca335e92015-03-05 10:34:32 -0800197 this.persistent = persistent;
198
199 if (this.persistent) {
200 String dataDirectory = System.getProperty("karaf.data", "./data");
201 String filename = dataDirectory + "/" + "mapdb-ecm-" + mapName;
202
203 ExecutorService dbExecutor =
204 newFixedThreadPool(1, groupedThreads("onos/ecm", mapName + "-dbwriter"));
205
206 persistentStore = new MapDbPersistentStore<>(filename, dbExecutor, serializer);
Madan Jampani3d76c942015-06-29 23:37:10 -0700207 persistentStore.readInto(items);
Jonathan Hartca335e92015-03-05 10:34:32 -0800208 } else {
209 this.persistentStore = null;
210 }
211
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700212 if (backgroundExecutor != null) {
213 this.backgroundExecutor = backgroundExecutor;
214 } else {
215 this.backgroundExecutor =
216 newSingleThreadScheduledExecutor(groupedThreads("onos/ecm", mapName + "-bg-%d"));
217 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800218
Jonathan Hartaaa56572015-01-28 21:56:35 -0800219 // start anti-entropy thread
Madan Jampani3d76c942015-06-29 23:37:10 -0700220 this.backgroundExecutor.scheduleAtFixedRate(this::sendAdvertisement,
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700221 initialDelaySec, antiEntropyPeriod,
222 antiEntropyTimeUnit);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800223
Jonathan Hartdb3af892015-01-26 13:19:07 -0800224 updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
225 clusterCommunicator.addSubscriber(updateMessageSubject,
Madan Jampani3d76c942015-06-29 23:37:10 -0700226 serializer::decode,
227 this::processUpdates,
228 this.executor);
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800229
Jonathan Hartaaa56572015-01-28 21:56:35 -0800230 antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");
231 clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
Madan Jampani3d76c942015-06-29 23:37:10 -0700232 serializer::decode,
233 this::handleAntiEntropyAdvertisement,
234 this.backgroundExecutor);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800235
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700236 this.tombstonesDisabled = tombstonesDisabled;
237 this.lightweightAntiEntropy = !convergeFaster;
Madan Jampanie1356282015-03-10 19:05:36 -0700238 }
239
Jonathan Hartdb3af892015-01-26 13:19:07 -0800240 private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
241 return new KryoSerializer() {
242 @Override
243 protected void setupKryoPool() {
244 // Add the map's internal helper classes to the user-supplied serializer
245 serializerPool = builder
Madan Jampani3d76c942015-06-29 23:37:10 -0700246 .register(KryoNamespaces.BASIC)
Madan Jampanidb5d06a2015-06-30 11:16:48 -0700247 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
Madan Jampani3e033bd2015-04-08 13:03:49 -0700248 .register(LogicalTimestamp.class)
Jonathan Hartdb3af892015-01-26 13:19:07 -0800249 .register(WallClockTimestamp.class)
Jonathan Hartaaa56572015-01-28 21:56:35 -0800250 .register(AntiEntropyAdvertisement.class)
Madan Jampani3d76c942015-06-29 23:37:10 -0700251 .register(UpdateEntry.class)
252 .register(MapValue.class)
253 .register(MapValue.Digest.class)
Jonathan Hartdb3af892015-01-26 13:19:07 -0800254 .build();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800255 }
256 };
257 }
258
259 @Override
260 public int size() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800261 checkState(!destroyed, destroyedMessage);
Madan Jampani3d76c942015-06-29 23:37:10 -0700262 // TODO: Maintain a separate counter for tracking live elements in map.
263 return Maps.filterValues(items, MapValue::isAlive).size();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800264 }
265
266 @Override
267 public boolean isEmpty() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800268 checkState(!destroyed, destroyedMessage);
Madan Jampani3d76c942015-06-29 23:37:10 -0700269 return size() == 0;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800270 }
271
272 @Override
273 public boolean containsKey(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800274 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800275 checkNotNull(key, ERROR_NULL_KEY);
Madan Jampani3d76c942015-06-29 23:37:10 -0700276 return get(key) != null;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800277 }
278
279 @Override
280 public boolean containsValue(V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800281 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800282 checkNotNull(value, ERROR_NULL_VALUE);
Madan Jampani3d76c942015-06-29 23:37:10 -0700283 return items.values()
284 .stream()
285 .filter(MapValue::isAlive)
286 .anyMatch(v -> v.get().equals(value));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800287 }
288
289 @Override
290 public V get(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800291 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800292 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800293
Madan Jampani3d76c942015-06-29 23:37:10 -0700294 MapValue<V> value = items.get(key);
295 return (value == null || value.isTombstone()) ? null : value.get();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800296 }
297
298 @Override
299 public void put(K key, V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800300 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800301 checkNotNull(key, ERROR_NULL_KEY);
302 checkNotNull(value, ERROR_NULL_VALUE);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800303
Madan Jampani3d76c942015-06-29 23:37:10 -0700304 MapValue<V> newValue = new MapValue<>(value, timestampProvider.apply(key, value));
305 if (updateInternal(key, newValue)) {
306 notifyPeers(new UpdateEntry<>(key, newValue), peerUpdateFunction.apply(key, value));
307 notifyListeners(new EventuallyConsistentMapEvent<>(PUT, key, value));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800308 }
309 }
310
Jonathan Hartdb3af892015-01-26 13:19:07 -0800311 @Override
Madan Jampani43e9c9c2015-06-26 14:16:46 -0700312 public V remove(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800313 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800314 checkNotNull(key, ERROR_NULL_KEY);
Madan Jampani3d76c942015-06-29 23:37:10 -0700315 return removeInternal(key, Optional.empty());
Jonathan Hartdb3af892015-01-26 13:19:07 -0800316 }
317
318 @Override
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800319 public void remove(K key, V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800320 checkState(!destroyed, destroyedMessage);
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800321 checkNotNull(key, ERROR_NULL_KEY);
322 checkNotNull(value, ERROR_NULL_VALUE);
Madan Jampani3d76c942015-06-29 23:37:10 -0700323 removeInternal(key, Optional.of(value));
324 }
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800325
Madan Jampani3d76c942015-06-29 23:37:10 -0700326 private V removeInternal(K key, Optional<V> value) {
327 checkState(!destroyed, destroyedMessage);
328 checkNotNull(key, ERROR_NULL_KEY);
329 checkNotNull(value, ERROR_NULL_VALUE);
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800330
Madan Jampani3d76c942015-06-29 23:37:10 -0700331 MapValue<V> newValue = new MapValue<>(null, timestampProvider.apply(key, value.orElse(null)));
332 AtomicBoolean updated = new AtomicBoolean(false);
333 AtomicReference<V> previousValue = new AtomicReference<>();
334 items.compute(key, (k, existing) -> {
335 if (existing != null && existing.isAlive()) {
336 updated.set(!value.isPresent() || value.get().equals(existing.get()));
337 previousValue.set(existing.get());
338 }
339 updated.set(existing == null || newValue.isNewerThan(existing));
340 return updated.get() ? newValue : existing;
341 });
342 if (updated.get()) {
343 notifyPeers(new UpdateEntry<>(key, newValue), peerUpdateFunction.apply(key, previousValue.get()));
344 notifyListeners(new EventuallyConsistentMapEvent<>(REMOVE, key, previousValue.get()));
345 if (persistent) {
346 persistentStore.update(key, newValue);
347 }
348 return previousValue.get();
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800349 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700350 return null;
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800351 }
352
353 @Override
Jonathan Hartdb3af892015-01-26 13:19:07 -0800354 public void putAll(Map<? extends K, ? extends V> m) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800355 checkState(!destroyed, destroyedMessage);
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800356 m.forEach(this::put);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800357 }
358
359 @Override
360 public void clear() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800361 checkState(!destroyed, destroyedMessage);
Madan Jampani3d76c942015-06-29 23:37:10 -0700362 Maps.filterValues(items, MapValue::isAlive)
363 .forEach((k, v) -> remove(k));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800364 }
365
366 @Override
367 public Set<K> keySet() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800368 checkState(!destroyed, destroyedMessage);
Madan Jampani3d76c942015-06-29 23:37:10 -0700369 return Maps.filterValues(items, MapValue::isAlive)
370 .keySet();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800371 }
372
373 @Override
374 public Collection<V> values() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800375 checkState(!destroyed, destroyedMessage);
Madan Jampani3d76c942015-06-29 23:37:10 -0700376 return Maps.filterValues(items, MapValue::isAlive)
377 .values()
378 .stream()
379 .map(MapValue::get)
380 .collect(Collectors.toList());
Jonathan Hartdb3af892015-01-26 13:19:07 -0800381 }
382
383 @Override
384 public Set<Map.Entry<K, V>> entrySet() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800385 checkState(!destroyed, destroyedMessage);
Madan Jampani3d76c942015-06-29 23:37:10 -0700386 return Maps.filterValues(items, MapValue::isAlive)
387 .entrySet()
388 .stream()
389 .map(e -> Pair.of(e.getKey(), e.getValue().get()))
390 .collect(Collectors.toSet());
391 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800392
Madan Jampani3d76c942015-06-29 23:37:10 -0700393 /**
394 * Returns true if newValue was accepted i.e. map is updated.
395 * @param key key
396 * @param newValue proposed new value
397 * @return true if update happened; false if map already contains a more recent value for the key
398 */
399 private boolean updateInternal(K key, MapValue<V> newValue) {
400 AtomicBoolean updated = new AtomicBoolean(false);
401 items.compute(key, (k, existing) -> {
402 if (existing == null || newValue.isNewerThan(existing)) {
403 updated.set(true);
404 if (newValue.isTombstone()) {
405 return tombstonesDisabled ? null : newValue;
406 }
407 return newValue;
408 }
409 return existing;
410 });
411 if (updated.get() && persistent) {
412 persistentStore.update(key, newValue);
413 }
414 return updated.get();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800415 }
416
417 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800418 public void addListener(EventuallyConsistentMapListener<K, V> listener) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800419 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800420
421 listeners.add(checkNotNull(listener));
422 }
423
424 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800425 public void removeListener(EventuallyConsistentMapListener<K, V> listener) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800426 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800427
428 listeners.remove(checkNotNull(listener));
429 }
430
431 @Override
432 public void destroy() {
433 destroyed = true;
434
435 executor.shutdown();
436 backgroundExecutor.shutdown();
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800437 communicationExecutor.shutdown();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800438
Jonathan Hart584d2f32015-01-27 19:46:14 -0800439 listeners.clear();
440
Jonathan Hartdb3af892015-01-26 13:19:07 -0800441 clusterCommunicator.removeSubscriber(updateMessageSubject);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800442 clusterCommunicator.removeSubscriber(antiEntropyAdvertisementSubject);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800443 }
444
Jonathan Hartaaa56572015-01-28 21:56:35 -0800445 private void notifyListeners(EventuallyConsistentMapEvent<K, V> event) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700446 listeners.forEach(listener -> listener.event(event));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800447 }
448
Madan Jampani3d76c942015-06-29 23:37:10 -0700449 private void notifyPeers(UpdateEntry<K, V> event, Collection<NodeId> peers) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800450 queueUpdate(event, peers);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800451 }
452
Madan Jampani3d76c942015-06-29 23:37:10 -0700453 private void queueUpdate(UpdateEntry<K, V> event, Collection<NodeId> peers) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800454 if (peers == null) {
455 // we have no friends :(
456 return;
457 }
458 peers.forEach(node ->
Madan Jampani3d76c942015-06-29 23:37:10 -0700459 senderPending.computeIfAbsent(node, unusedKey -> new EventAccumulator(node)).add(event)
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800460 );
461 }
462
Jonathan Hart233a18a2015-03-02 17:24:58 -0800463 private boolean underHighLoad() {
464 return counter.get(LOAD_WINDOW) > HIGH_LOAD_THRESHOLD;
465 }
466
Madan Jampani3d76c942015-06-29 23:37:10 -0700467 private void sendAdvertisement() {
468 try {
Thomas Vachuska152f9fd2015-04-02 16:28:13 -0700469 if (underHighLoad() || destroyed) {
Jonathan Hart233a18a2015-03-02 17:24:58 -0800470 return;
471 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700472 pickRandomActivePeer().ifPresent(this::sendAdvertisementToPeer);
473 } catch (Exception e) {
474 // Catch all exceptions to avoid scheduled task being suppressed.
475 log.error("Exception thrown while sending advertisement", e);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800476 }
477 }
478
Madan Jampani3d76c942015-06-29 23:37:10 -0700479 private Optional<NodeId> pickRandomActivePeer() {
480 List<NodeId> activePeers = clusterService.getNodes()
481 .stream()
482 .filter(node -> !localNodeId.equals(node))
483 .map(ControllerNode::id)
484 .filter(id -> clusterService.getState(id) == ControllerNode.State.ACTIVE)
485 .collect(Collectors.toList());
486 Collections.shuffle(activePeers);
487 return activePeers.isEmpty() ? Optional.empty() : Optional.of(activePeers.get(0));
488 }
489
490 private void sendAdvertisementToPeer(NodeId peer) {
491 clusterCommunicator.unicast(createAdvertisement(),
492 antiEntropyAdvertisementSubject,
493 serializer::encode,
494 peer)
495 .whenComplete((result, error) -> {
496 if (error != null) {
497 log.warn("Failed to send anti-entropy advertisement to {}", peer);
498 }
499 });
500 }
501
502
Jonathan Hartaaa56572015-01-28 21:56:35 -0800503 private AntiEntropyAdvertisement<K> createAdvertisement() {
Madan Jampani3d76c942015-06-29 23:37:10 -0700504 return new AntiEntropyAdvertisement<K>(localNodeId, Maps.transformValues(items, MapValue::digest));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800505 }
506
507 private void handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700508 if (destroyed || underHighLoad()) {
509 return;
510 }
511 try {
512 antiEntropyCheckLocalItems(ad).forEach(this::notifyListeners);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800513
Madan Jampani3d76c942015-06-29 23:37:10 -0700514 if (!lightweightAntiEntropy) {
515 Set<K> missingKeys = Sets.difference(items.keySet(), ad.digest().keySet());
516 // if remote ad has something unknown, actively sync
517 if (missingKeys.size() > 0) {
Jonathan Hartf893be82015-02-24 17:35:51 -0800518 // Send the advertisement back if this peer is out-of-sync
Madan Jampani3d76c942015-06-29 23:37:10 -0700519 // TODO: Send ad for missing keys and for entries that are stale
520 sendAdvertisementToPeer(ad.sender());
Jonathan Hartf893be82015-02-24 17:35:51 -0800521 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800522 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700523 } catch (Exception e) {
524 log.warn("Error handling anti-entropy advertisement", e);
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800525 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800526 }
527
528 /**
Madan Jampani3d76c942015-06-29 23:37:10 -0700529 * Processes anti-entropy ad from peer by taking following actions:
530 * 1. If peer has an old entry, updates peer.
531 * 2. If peer indicates an entry is removed and has a more recent
532 * timestamp than the local entry, update local state.
Jonathan Hartaaa56572015-01-28 21:56:35 -0800533 */
Jonathan Hartaaa56572015-01-28 21:56:35 -0800534 private List<EventuallyConsistentMapEvent<K, V>> antiEntropyCheckLocalItems(
535 AntiEntropyAdvertisement<K> ad) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700536 final List<EventuallyConsistentMapEvent<K, V>> externalEvents = Lists.newLinkedList();
Jonathan Hartaaa56572015-01-28 21:56:35 -0800537 final NodeId sender = ad.sender();
Madan Jampani3d76c942015-06-29 23:37:10 -0700538 items.forEach((key, localValue) -> {
539 MapValue.Digest remoteValueDigest = ad.digest().get(key);
540 if (remoteValueDigest == null || localValue.isNewerThan(remoteValueDigest.timestamp())) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800541 // local value is more recent, push to sender
Madan Jampani3d76c942015-06-29 23:37:10 -0700542 queueUpdate(new UpdateEntry<>(key, localValue), ImmutableList.of(sender));
543 } else {
544 if (remoteValueDigest.isTombstone()
545 && remoteValueDigest.timestamp().isNewerThan(localValue.timestamp())) {
546 if (updateInternal(key, new MapValue<>(null, remoteValueDigest.timestamp()))) {
547 externalEvents.add(new EventuallyConsistentMapEvent<>(REMOVE, key, null));
548 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800549 }
550 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700551 });
Jonathan Hartaaa56572015-01-28 21:56:35 -0800552 return externalEvents;
553 }
554
Madan Jampani3d76c942015-06-29 23:37:10 -0700555 private void processUpdates(Collection<UpdateEntry<K, V>> updates) {
556 if (destroyed) {
557 return;
Jonathan Hartaaa56572015-01-28 21:56:35 -0800558 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700559 updates.forEach(update -> {
560 final K key = update.key();
561 final MapValue<V> value = update.value();
Jonathan Hartaaa56572015-01-28 21:56:35 -0800562
Madan Jampani3d76c942015-06-29 23:37:10 -0700563 if (updateInternal(key, value)) {
564 final EventuallyConsistentMapEvent.Type type = value.isTombstone() ? REMOVE : PUT;
565 notifyListeners(new EventuallyConsistentMapEvent<>(type, key, value.get()));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800566 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700567 });
Jonathan Hartdb3af892015-01-26 13:19:07 -0800568 }
569
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800570 // TODO pull this into the class if this gets pulled out...
571 private static final int DEFAULT_MAX_EVENTS = 1000;
572 private static final int DEFAULT_MAX_IDLE_MS = 10;
573 private static final int DEFAULT_MAX_BATCH_MS = 50;
574 private static final Timer TIMER = new Timer("onos-ecm-sender-events");
Jonathan Hartdb3af892015-01-26 13:19:07 -0800575
Madan Jampani3d76c942015-06-29 23:37:10 -0700576 private final class EventAccumulator extends AbstractAccumulator<UpdateEntry<K, V>> {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800577
578 private final NodeId peer;
579
580 private EventAccumulator(NodeId peer) {
581 super(TIMER, DEFAULT_MAX_EVENTS, DEFAULT_MAX_BATCH_MS, DEFAULT_MAX_IDLE_MS);
582 this.peer = peer;
583 }
584
585 @Override
Madan Jampani3d76c942015-06-29 23:37:10 -0700586 public void processItems(List<UpdateEntry<K, V>> items) {
587 Map<K, UpdateEntry<K, V>> map = Maps.newHashMap();
588 items.forEach(item -> map.compute(item.key(), (key, existing) ->
589 existing == null || item.compareTo(existing) > 0 ? item : existing));
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800590 communicationExecutor.submit(() -> {
Madan Jampani3d76c942015-06-29 23:37:10 -0700591 clusterCommunicator.unicast(ImmutableList.copyOf(map.values()),
Madan Jampani175e8fd2015-05-20 14:10:45 -0700592 updateMessageSubject,
593 serializer::encode,
594 peer)
595 .whenComplete((result, error) -> {
596 if (error != null) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700597 log.debug("Failed to send to {}", peer, error);
Madan Jampani175e8fd2015-05-20 14:10:45 -0700598 }
599 });
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800600 });
Jonathan Hartdb3af892015-01-26 13:19:07 -0800601 }
602 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700603}