blob: 33e425184fa5105fa34dfdd461be220987d1f164 [file] [log] [blame]
Jonathan Hartdb3af892015-01-26 13:19:07 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Jonathan Hart77bdd262015-02-03 09:07:48 -080016package org.onosproject.store.ecmap;
Jonathan Hartdb3af892015-01-26 13:19:07 -080017
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080018import com.google.common.collect.ImmutableList;
19import com.google.common.collect.Lists;
20import com.google.common.collect.Maps;
Madan Jampani3e033bd2015-04-08 13:03:49 -070021
Jonathan Hartaaa56572015-01-28 21:56:35 -080022import org.apache.commons.lang3.RandomUtils;
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -080023import org.apache.commons.lang3.mutable.MutableBoolean;
Jonathan Hartf9108232015-02-02 16:37:35 -080024import org.apache.commons.lang3.tuple.Pair;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080025import org.onlab.util.AbstractAccumulator;
Jonathan Hartdb3af892015-01-26 13:19:07 -080026import org.onlab.util.KryoNamespace;
Jonathan Hart233a18a2015-03-02 17:24:58 -080027import org.onlab.util.SlidingWindowCounter;
Jonathan Hartdb3af892015-01-26 13:19:07 -080028import org.onosproject.cluster.ClusterService;
Jonathan Hartaaa56572015-01-28 21:56:35 -080029import org.onosproject.cluster.ControllerNode;
Jonathan Hartdb3af892015-01-26 13:19:07 -080030import org.onosproject.cluster.NodeId;
31import org.onosproject.store.Timestamp;
32import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
33import org.onosproject.store.cluster.messaging.ClusterMessage;
34import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
35import org.onosproject.store.cluster.messaging.MessageSubject;
Madan Jampani3e033bd2015-04-08 13:03:49 -070036import org.onosproject.store.impl.LogicalTimestamp;
Jonathan Hart77bdd262015-02-03 09:07:48 -080037import org.onosproject.store.impl.Timestamped;
38import org.onosproject.store.impl.WallClockTimestamp;
Jonathan Hartdb3af892015-01-26 13:19:07 -080039import org.onosproject.store.serializers.KryoSerializer;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070040import org.onosproject.store.service.ClockService;
41import org.onosproject.store.service.EventuallyConsistentMap;
42import org.onosproject.store.service.EventuallyConsistentMapEvent;
43import org.onosproject.store.service.EventuallyConsistentMapListener;
Jonathan Hartdb3af892015-01-26 13:19:07 -080044import org.slf4j.Logger;
45import org.slf4j.LoggerFactory;
46
Jonathan Hartdb3af892015-01-26 13:19:07 -080047import java.util.ArrayList;
48import java.util.Collection;
Jonathan Hartaaa56572015-01-28 21:56:35 -080049import java.util.HashMap;
50import java.util.LinkedList;
Jonathan Hartdb3af892015-01-26 13:19:07 -080051import java.util.List;
52import java.util.Map;
53import java.util.Set;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080054import java.util.Timer;
Jonathan Hartdb3af892015-01-26 13:19:07 -080055import java.util.concurrent.ConcurrentHashMap;
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -080056import java.util.concurrent.ConcurrentMap;
Jonathan Hartdb3af892015-01-26 13:19:07 -080057import java.util.concurrent.CopyOnWriteArraySet;
58import java.util.concurrent.ExecutorService;
59import java.util.concurrent.Executors;
60import java.util.concurrent.ScheduledExecutorService;
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -080061import java.util.concurrent.TimeUnit;
Brian O'Connor5eb77c82015-03-02 18:09:39 -080062import java.util.function.BiFunction;
Jonathan Hartdb3af892015-01-26 13:19:07 -080063import java.util.stream.Collectors;
64
65import static com.google.common.base.Preconditions.checkNotNull;
66import static com.google.common.base.Preconditions.checkState;
67import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
Brian O'Connorc6713a82015-02-24 11:55:48 -080068import static org.onlab.util.BoundedThreadPool.newFixedThreadPool;
Brian O'Connor4b2ba5f2015-02-18 20:54:00 -080069import static org.onlab.util.Tools.groupedThreads;
Jonathan Hartdb3af892015-01-26 13:19:07 -080070
71/**
72 * Distributed Map implementation which uses optimistic replication and gossip
73 * based techniques to provide an eventually consistent data store.
74 */
75public class EventuallyConsistentMapImpl<K, V>
76 implements EventuallyConsistentMap<K, V> {
77
78 private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class);
79
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -080080 private final ConcurrentMap<K, Timestamped<V>> items;
81 private final ConcurrentMap<K, Timestamp> removedItems;
Jonathan Hartdb3af892015-01-26 13:19:07 -080082
Jonathan Hartdb3af892015-01-26 13:19:07 -080083 private final ClusterService clusterService;
84 private final ClusterCommunicationService clusterCommunicator;
85 private final KryoSerializer serializer;
86
Jonathan Hart5ec32ba2015-02-05 13:33:58 -080087 private final ClockService<K, V> clockService;
Jonathan Hartdb3af892015-01-26 13:19:07 -080088
89 private final MessageSubject updateMessageSubject;
Jonathan Hartaaa56572015-01-28 21:56:35 -080090 private final MessageSubject antiEntropyAdvertisementSubject;
Jonathan Hartdb3af892015-01-26 13:19:07 -080091
Jonathan Hartaaa56572015-01-28 21:56:35 -080092 private final Set<EventuallyConsistentMapListener<K, V>> listeners
Jonathan Hartdb3af892015-01-26 13:19:07 -080093 = new CopyOnWriteArraySet<>();
94
95 private final ExecutorService executor;
Jonathan Hartdb3af892015-01-26 13:19:07 -080096 private final ScheduledExecutorService backgroundExecutor;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080097 private final BiFunction<K, V, Collection<NodeId>> peerUpdateFunction;
Jonathan Hartdb3af892015-01-26 13:19:07 -080098
Jonathan Hart6ec029a2015-03-24 17:12:35 -070099 private final ExecutorService communicationExecutor;
100 private final Map<NodeId, EventAccumulator> senderPending;
Madan Jampani28726282015-02-19 11:40:23 -0800101
Jonathan Hartdb3af892015-01-26 13:19:07 -0800102 private volatile boolean destroyed = false;
Jonathan Hart539a6462015-01-27 17:05:43 -0800103 private static final String ERROR_DESTROYED = " map is already destroyed";
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800104 private final String destroyedMessage;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800105
Jonathan Hart4f397e82015-02-04 09:10:41 -0800106 private static final String ERROR_NULL_KEY = "Key cannot be null";
107 private static final String ERROR_NULL_VALUE = "Null values are not allowed";
108
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700109 private final long initialDelaySec = 5;
110 private final boolean lightweightAntiEntropy;
111 private final boolean tombstonesDisabled;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800112
Jonathan Hart233a18a2015-03-02 17:24:58 -0800113 private static final int WINDOW_SIZE = 5;
114 private static final int HIGH_LOAD_THRESHOLD = 0;
115 private static final int LOAD_WINDOW = 2;
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700116 private SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE);
Jonathan Hart233a18a2015-03-02 17:24:58 -0800117
Jonathan Hartca335e92015-03-05 10:34:32 -0800118 private final boolean persistent;
119 private final PersistentStore<K, V> persistentStore;
120
Jonathan Hartdb3af892015-01-26 13:19:07 -0800121 /**
122 * Creates a new eventually consistent map shared amongst multiple instances.
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800123 * <p>
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700124 * See {@link org.onosproject.store.service.EventuallyConsistentMapBuilder}
125 * for more description of the parameters expected by the map.
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800126 * </p>
Jonathan Hartdb3af892015-01-26 13:19:07 -0800127 *
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700128 * @param mapName a String identifier for the map.
129 * @param clusterService the cluster service
130 * @param clusterCommunicator the cluster communications service
131 * @param serializerBuilder a Kryo namespace builder that can serialize
132 * both K and V
133 * @param clockService a clock service able to generate timestamps
134 * for K and V
135 * @param peerUpdateFunction function that provides a set of nodes to immediately
136 * update to when there writes to the map
137 * @param eventExecutor executor to use for processing incoming
138 * events from peers
139 * @param communicationExecutor executor to use for sending events to peers
140 * @param backgroundExecutor executor to use for background anti-entropy
141 * tasks
142 * @param tombstonesDisabled true if this map should not maintain
143 * tombstones
144 * @param antiEntropyPeriod period that the anti-entropy task should run
Jonathan Hartca335e92015-03-05 10:34:32 -0800145 * @param antiEntropyTimeUnit time unit for anti-entropy period
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700146 * @param convergeFaster make anti-entropy try to converge faster
Jonathan Hartca335e92015-03-05 10:34:32 -0800147 * @param persistent persist data to disk
Jonathan Hartdb3af892015-01-26 13:19:07 -0800148 */
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700149 EventuallyConsistentMapImpl(String mapName,
150 ClusterService clusterService,
151 ClusterCommunicationService clusterCommunicator,
152 KryoNamespace.Builder serializerBuilder,
153 ClockService<K, V> clockService,
154 BiFunction<K, V, Collection<NodeId>> peerUpdateFunction,
155 ExecutorService eventExecutor,
156 ExecutorService communicationExecutor,
157 ScheduledExecutorService backgroundExecutor,
158 boolean tombstonesDisabled,
159 long antiEntropyPeriod,
160 TimeUnit antiEntropyTimeUnit,
Jonathan Hartca335e92015-03-05 10:34:32 -0800161 boolean convergeFaster,
162 boolean persistent) {
Jonathan Hartdb3af892015-01-26 13:19:07 -0800163 items = new ConcurrentHashMap<>();
164 removedItems = new ConcurrentHashMap<>();
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800165 senderPending = Maps.newConcurrentMap();
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700166 destroyedMessage = mapName + ERROR_DESTROYED;
Madan Jampani28726282015-02-19 11:40:23 -0800167
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700168 this.clusterService = clusterService;
169 this.clusterCommunicator = clusterCommunicator;
170
171 this.serializer = createSerializer(serializerBuilder);
172
173 this.clockService = clockService;
174
175 if (peerUpdateFunction != null) {
176 this.peerUpdateFunction = peerUpdateFunction;
177 } else {
178 this.peerUpdateFunction = (key, value) -> clusterService.getNodes().stream()
179 .map(ControllerNode::id)
180 .filter(nodeId -> !nodeId.equals(clusterService.getLocalNode().id()))
181 .collect(Collectors.toList());
182 }
183
184 if (eventExecutor != null) {
185 this.executor = eventExecutor;
186 } else {
187 // should be a normal executor; it's used for receiving messages
188 this.executor =
189 Executors.newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-fg-%d"));
190 }
191
192 if (communicationExecutor != null) {
193 this.communicationExecutor = communicationExecutor;
194 } else {
195 // sending executor; should be capped
196 //TODO this probably doesn't need to be bounded anymore
197 this.communicationExecutor =
198 newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-publish-%d"));
199 }
200
Jonathan Hartca335e92015-03-05 10:34:32 -0800201 this.persistent = persistent;
202
203 if (this.persistent) {
204 String dataDirectory = System.getProperty("karaf.data", "./data");
205 String filename = dataDirectory + "/" + "mapdb-ecm-" + mapName;
206
207 ExecutorService dbExecutor =
208 newFixedThreadPool(1, groupedThreads("onos/ecm", mapName + "-dbwriter"));
209
210 persistentStore = new MapDbPersistentStore<>(filename, dbExecutor, serializer);
211 persistentStore.readInto(items, removedItems);
212 } else {
213 this.persistentStore = null;
214 }
215
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700216 if (backgroundExecutor != null) {
217 this.backgroundExecutor = backgroundExecutor;
218 } else {
219 this.backgroundExecutor =
220 newSingleThreadScheduledExecutor(groupedThreads("onos/ecm", mapName + "-bg-%d"));
221 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800222
Jonathan Hartaaa56572015-01-28 21:56:35 -0800223 // start anti-entropy thread
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700224 this.backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
225 initialDelaySec, antiEntropyPeriod,
226 antiEntropyTimeUnit);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800227
Jonathan Hartdb3af892015-01-26 13:19:07 -0800228 updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
229 clusterCommunicator.addSubscriber(updateMessageSubject,
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700230 new InternalEventListener(), this.executor);
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800231
Jonathan Hartaaa56572015-01-28 21:56:35 -0800232 antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");
233 clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700234 new InternalAntiEntropyListener(), this.backgroundExecutor);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800235
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700236 this.tombstonesDisabled = tombstonesDisabled;
237 this.lightweightAntiEntropy = !convergeFaster;
Madan Jampanie1356282015-03-10 19:05:36 -0700238 }
239
Jonathan Hartdb3af892015-01-26 13:19:07 -0800240 private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
241 return new KryoSerializer() {
242 @Override
243 protected void setupKryoPool() {
244 // Add the map's internal helper classes to the user-supplied serializer
245 serializerPool = builder
Madan Jampani3e033bd2015-04-08 13:03:49 -0700246 .register(LogicalTimestamp.class)
Jonathan Hartdb3af892015-01-26 13:19:07 -0800247 .register(WallClockTimestamp.class)
248 .register(PutEntry.class)
Jonathan Hart539a6462015-01-27 17:05:43 -0800249 .register(RemoveEntry.class)
Jonathan Hartdb3af892015-01-26 13:19:07 -0800250 .register(ArrayList.class)
Jonathan Hartaaa56572015-01-28 21:56:35 -0800251 .register(AntiEntropyAdvertisement.class)
252 .register(HashMap.class)
Jonathan Hartca335e92015-03-05 10:34:32 -0800253 .register(Timestamped.class)
Jonathan Hartdb3af892015-01-26 13:19:07 -0800254 .build();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800255 }
256 };
257 }
258
259 @Override
260 public int size() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800261 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800262 return items.size();
263 }
264
265 @Override
266 public boolean isEmpty() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800267 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800268 return items.isEmpty();
269 }
270
271 @Override
272 public boolean containsKey(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800273 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800274 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800275 return items.containsKey(key);
276 }
277
278 @Override
279 public boolean containsValue(V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800280 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800281 checkNotNull(value, ERROR_NULL_VALUE);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800282
283 return items.values().stream()
284 .anyMatch(timestamped -> timestamped.value().equals(value));
285 }
286
287 @Override
288 public V get(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800289 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800290 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800291
292 Timestamped<V> value = items.get(key);
293 if (value != null) {
294 return value.value();
295 }
296 return null;
297 }
298
299 @Override
300 public void put(K key, V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800301 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800302 checkNotNull(key, ERROR_NULL_KEY);
303 checkNotNull(value, ERROR_NULL_VALUE);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800304
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800305 Timestamp timestamp = clockService.getTimestamp(key, value);
306
Jonathan Hartdb3af892015-01-26 13:19:07 -0800307 if (putInternal(key, value, timestamp)) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800308 notifyPeers(new PutEntry<>(key, value, timestamp),
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800309 peerUpdateFunction.apply(key, value));
310 notifyListeners(new EventuallyConsistentMapEvent<>(
311 EventuallyConsistentMapEvent.Type.PUT, key, value));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800312 }
313 }
314
315 private boolean putInternal(K key, V value, Timestamp timestamp) {
Jonathan Hart233a18a2015-03-02 17:24:58 -0800316 counter.incrementCount();
Brian O'Connor2952e3b2015-02-19 21:47:57 -0800317 Timestamp removed = removedItems.get(key);
Jonathan Hart403ea932015-02-20 16:23:00 -0800318 if (removed != null && removed.isNewerThan(timestamp)) {
Brian O'Connor2952e3b2015-02-19 21:47:57 -0800319 log.debug("ecmap - removed was newer {}", value);
320 return false;
321 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800322
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800323 final MutableBoolean updated = new MutableBoolean(false);
324
325 items.compute(key, (k, existing) -> {
Jonathan Hart403ea932015-02-20 16:23:00 -0800326 if (existing != null && existing.isNewerThan(timestamp)) {
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800327 updated.setFalse();
328 return existing;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800329 } else {
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800330 updated.setTrue();
331 return new Timestamped<>(value, timestamp);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800332 }
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800333 });
334
335 boolean success = updated.booleanValue();
336 if (!success) {
337 log.debug("ecmap - existing was newer {}", value);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800338 }
Brian O'Connor2952e3b2015-02-19 21:47:57 -0800339
340 if (success && removed != null) {
341 removedItems.remove(key, removed);
342 }
Jonathan Hartca335e92015-03-05 10:34:32 -0800343
344 if (success && persistent) {
345 persistentStore.put(key, value, timestamp);
346 }
347
Brian O'Connor2952e3b2015-02-19 21:47:57 -0800348 return success;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800349 }
350
351 @Override
352 public void remove(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800353 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800354 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800355
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800356 // TODO prevent calls here if value is important for timestamp
357 Timestamp timestamp = clockService.getTimestamp(key, null);
358
Jonathan Hartdb3af892015-01-26 13:19:07 -0800359 if (removeInternal(key, timestamp)) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800360 notifyPeers(new RemoveEntry<>(key, timestamp),
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800361 peerUpdateFunction.apply(key, null));
362 notifyListeners(new EventuallyConsistentMapEvent<>(
363 EventuallyConsistentMapEvent.Type.REMOVE, key, null));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800364 }
365 }
366
367 private boolean removeInternal(K key, Timestamp timestamp) {
Madan Jampani54d34992015-03-06 17:27:52 -0800368 if (timestamp == null) {
369 return false;
370 }
371
Jonathan Hart233a18a2015-03-02 17:24:58 -0800372 counter.incrementCount();
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800373 final MutableBoolean updated = new MutableBoolean(false);
374
375 items.compute(key, (k, existing) -> {
376 if (existing != null && existing.isNewerThan(timestamp)) {
377 updated.setFalse();
378 return existing;
Madan Jampani7e6cfe32015-02-19 17:44:44 -0800379 } else {
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800380 updated.setTrue();
381 // remove from items map
382 return null;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800383 }
Jonathan Hartca335e92015-03-05 10:34:32 -0800384 });
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800385
386 if (updated.isFalse()) {
387 return false;
Madan Jampani7e6cfe32015-02-19 17:44:44 -0800388 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800389
Jonathan Hartca335e92015-03-05 10:34:32 -0800390 boolean updatedTombstone = false;
391
Madan Jampanie1356282015-03-10 19:05:36 -0700392 if (!tombstonesDisabled) {
393 Timestamp removedTimestamp = removedItems.get(key);
394 if (removedTimestamp == null) {
Jonathan Hartca335e92015-03-05 10:34:32 -0800395 //Timestamp removed = removedItems.putIfAbsent(key, timestamp);
396 updatedTombstone = (removedItems.putIfAbsent(key, timestamp) == null);
Madan Jampanie1356282015-03-10 19:05:36 -0700397 } else if (timestamp.isNewerThan(removedTimestamp)) {
Jonathan Hartca335e92015-03-05 10:34:32 -0800398 updatedTombstone = removedItems.replace(key, removedTimestamp, timestamp);
Madan Jampanie1356282015-03-10 19:05:36 -0700399 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800400 }
Madan Jampanie1356282015-03-10 19:05:36 -0700401
Jonathan Hartca335e92015-03-05 10:34:32 -0800402 if (updated.booleanValue() && persistent) {
403 persistentStore.remove(key, timestamp);
404 }
405
406 return (!tombstonesDisabled && updatedTombstone) || updated.booleanValue();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800407 }
408
409 @Override
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800410 public void remove(K key, V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800411 checkState(!destroyed, destroyedMessage);
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800412 checkNotNull(key, ERROR_NULL_KEY);
413 checkNotNull(value, ERROR_NULL_VALUE);
414
415 Timestamp timestamp = clockService.getTimestamp(key, value);
416
417 if (removeInternal(key, timestamp)) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800418 notifyPeers(new RemoveEntry<>(key, timestamp),
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800419 peerUpdateFunction.apply(key, value));
420 notifyListeners(new EventuallyConsistentMapEvent<>(
421 EventuallyConsistentMapEvent.Type.REMOVE, key, value));
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800422 }
423 }
424
425 @Override
Jonathan Hartdb3af892015-01-26 13:19:07 -0800426 public void putAll(Map<? extends K, ? extends V> m) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800427 checkState(!destroyed, destroyedMessage);
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800428 m.forEach(this::put);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800429 }
430
431 @Override
432 public void clear() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800433 checkState(!destroyed, destroyedMessage);
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800434 items.forEach((key, value) -> remove(key));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800435 }
436
437 @Override
438 public Set<K> keySet() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800439 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800440 return items.keySet();
441 }
442
443 @Override
444 public Collection<V> values() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800445 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800446 return items.values().stream()
447 .map(Timestamped::value)
448 .collect(Collectors.toList());
449 }
450
451 @Override
452 public Set<Map.Entry<K, V>> entrySet() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800453 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800454
455 return items.entrySet().stream()
Jonathan Hartf9108232015-02-02 16:37:35 -0800456 .map(e -> Pair.of(e.getKey(), e.getValue().value()))
Jonathan Hartdb3af892015-01-26 13:19:07 -0800457 .collect(Collectors.toSet());
458 }
459
460 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800461 public void addListener(EventuallyConsistentMapListener<K, V> listener) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800462 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800463
464 listeners.add(checkNotNull(listener));
465 }
466
467 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800468 public void removeListener(EventuallyConsistentMapListener<K, V> listener) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800469 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800470
471 listeners.remove(checkNotNull(listener));
472 }
473
474 @Override
475 public void destroy() {
476 destroyed = true;
477
478 executor.shutdown();
479 backgroundExecutor.shutdown();
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800480 communicationExecutor.shutdown();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800481
Jonathan Hart584d2f32015-01-27 19:46:14 -0800482 listeners.clear();
483
Jonathan Hartdb3af892015-01-26 13:19:07 -0800484 clusterCommunicator.removeSubscriber(updateMessageSubject);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800485 clusterCommunicator.removeSubscriber(antiEntropyAdvertisementSubject);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800486 }
487
Jonathan Hartaaa56572015-01-28 21:56:35 -0800488 private void notifyListeners(EventuallyConsistentMapEvent<K, V> event) {
489 for (EventuallyConsistentMapListener<K, V> listener : listeners) {
Jonathan Hartdb3af892015-01-26 13:19:07 -0800490 listener.event(event);
491 }
492 }
493
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800494 private void notifyPeers(PutEntry<K, V> event, Collection<NodeId> peers) {
495 queueUpdate(event, peers);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800496 }
497
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800498 private void notifyPeers(RemoveEntry<K, V> event, Collection<NodeId> peers) {
499 queueUpdate(event, peers);
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800500 }
501
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800502 private void queueUpdate(AbstractEntry<K, V> event, Collection<NodeId> peers) {
503 if (peers == null) {
504 // we have no friends :(
505 return;
506 }
507 peers.forEach(node ->
508 senderPending.computeIfAbsent(node, unusedKey -> new EventAccumulator(node)).add(event)
509 );
510 }
511
512 private boolean unicastMessage(NodeId peer, MessageSubject subject, Object event) {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800513 ClusterMessage message = new ClusterMessage(
514 clusterService.getLocalNode().id(),
515 subject,
516 serializer.encode(event));
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800517 return clusterCommunicator.unicast(message, peer);
518 // Note: we had this flipped before...
519// communicationExecutor.execute(() -> clusterCommunicator.unicast(message, peer));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800520 }
521
Jonathan Hart233a18a2015-03-02 17:24:58 -0800522 private boolean underHighLoad() {
523 return counter.get(LOAD_WINDOW) > HIGH_LOAD_THRESHOLD;
524 }
525
Jonathan Hartaaa56572015-01-28 21:56:35 -0800526 private final class SendAdvertisementTask implements Runnable {
527 @Override
528 public void run() {
529 if (Thread.currentThread().isInterrupted()) {
530 log.info("Interrupted, quitting");
531 return;
532 }
533
Thomas Vachuska152f9fd2015-04-02 16:28:13 -0700534 if (underHighLoad() || destroyed) {
Jonathan Hart233a18a2015-03-02 17:24:58 -0800535 return;
536 }
537
Jonathan Hartaaa56572015-01-28 21:56:35 -0800538 try {
539 final NodeId self = clusterService.getLocalNode().id();
540 Set<ControllerNode> nodes = clusterService.getNodes();
541
542 List<NodeId> nodeIds = nodes.stream()
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800543 .map(ControllerNode::id)
Jonathan Hartaaa56572015-01-28 21:56:35 -0800544 .collect(Collectors.toList());
545
546 if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
547 log.trace("No other peers in the cluster.");
548 return;
549 }
550
551 NodeId peer;
552 do {
553 int idx = RandomUtils.nextInt(0, nodeIds.size());
554 peer = nodeIds.get(idx);
555 } while (peer.equals(self));
556
557 if (Thread.currentThread().isInterrupted()) {
558 log.info("Interrupted, quitting");
559 return;
560 }
561
562 AntiEntropyAdvertisement<K> ad = createAdvertisement();
563
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800564 if (!unicastMessage(peer, antiEntropyAdvertisementSubject, ad)) {
565 log.debug("Failed to send anti-entropy advertisement to {}", peer);
566 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800567 } catch (Exception e) {
568 // Catch all exceptions to avoid scheduled task being suppressed.
569 log.error("Exception thrown while sending advertisement", e);
570 }
571 }
572 }
573
574 private AntiEntropyAdvertisement<K> createAdvertisement() {
575 final NodeId self = clusterService.getLocalNode().id();
576
577 Map<K, Timestamp> timestamps = new HashMap<>(items.size());
578
579 items.forEach((key, value) -> timestamps.put(key, value.timestamp()));
580
581 Map<K, Timestamp> tombstones = new HashMap<>(removedItems);
582
583 return new AntiEntropyAdvertisement<>(self, timestamps, tombstones);
584 }
585
586 private void handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) {
587 List<EventuallyConsistentMapEvent<K, V>> externalEvents;
588
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800589 externalEvents = antiEntropyCheckLocalItems(ad);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800590
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800591 antiEntropyCheckLocalRemoved(ad);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800592
Jonathan Hartf893be82015-02-24 17:35:51 -0800593 if (!lightweightAntiEntropy) {
594 externalEvents.addAll(antiEntropyCheckRemoteRemoved(ad));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800595
Jonathan Hartf893be82015-02-24 17:35:51 -0800596 // if remote ad has something unknown, actively sync
597 for (K key : ad.timestamps().keySet()) {
598 if (!items.containsKey(key)) {
599 // Send the advertisement back if this peer is out-of-sync
600 final NodeId sender = ad.sender();
601 AntiEntropyAdvertisement<K> myAd = createAdvertisement();
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800602 if (!unicastMessage(sender, antiEntropyAdvertisementSubject, myAd)) {
603 log.debug("Failed to send reactive anti-entropy advertisement to {}", sender);
604 }
Jonathan Hartf893be82015-02-24 17:35:51 -0800605 break;
606 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800607 }
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800608 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800609 externalEvents.forEach(this::notifyListeners);
610 }
611
612 /**
613 * Checks if any of the remote's live items or tombstones are out of date
614 * according to our local live item list, or if our live items are out of
615 * date according to the remote's tombstone list.
616 * If the local copy is more recent, it will be pushed to the remote. If the
617 * remote has a more recent remove, we apply that to the local state.
618 *
619 * @param ad remote anti-entropy advertisement
620 * @return list of external events relating to local operations performed
621 */
Jonathan Hartaaa56572015-01-28 21:56:35 -0800622 private List<EventuallyConsistentMapEvent<K, V>> antiEntropyCheckLocalItems(
623 AntiEntropyAdvertisement<K> ad) {
624 final List<EventuallyConsistentMapEvent<K, V>> externalEvents
625 = new LinkedList<>();
626 final NodeId sender = ad.sender();
627
Jonathan Hartaaa56572015-01-28 21:56:35 -0800628 for (Map.Entry<K, Timestamped<V>> item : items.entrySet()) {
629 K key = item.getKey();
630 Timestamped<V> localValue = item.getValue();
631
632 Timestamp remoteTimestamp = ad.timestamps().get(key);
633 if (remoteTimestamp == null) {
634 remoteTimestamp = ad.tombstones().get(key);
635 }
636 if (remoteTimestamp == null || localValue
Jonathan Hart403ea932015-02-20 16:23:00 -0800637 .isNewerThan(remoteTimestamp)) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800638 // local value is more recent, push to sender
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800639 queueUpdate(new PutEntry<>(key, localValue.value(),
640 localValue.timestamp()), ImmutableList.of(sender));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800641 }
642
643 Timestamp remoteDeadTimestamp = ad.tombstones().get(key);
644 if (remoteDeadTimestamp != null &&
Jonathan Hart403ea932015-02-20 16:23:00 -0800645 remoteDeadTimestamp.isNewerThan(localValue.timestamp())) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800646 // sender has a more recent remove
647 if (removeInternal(key, remoteDeadTimestamp)) {
648 externalEvents.add(new EventuallyConsistentMapEvent<>(
649 EventuallyConsistentMapEvent.Type.REMOVE, key, null));
650 }
651 }
652 }
653
Jonathan Hartaaa56572015-01-28 21:56:35 -0800654 return externalEvents;
655 }
656
657 /**
658 * Checks if any items in the remote live list are out of date according
659 * to our tombstone list. If we find we have a more up to date tombstone,
660 * we'll send it to the remote.
661 *
662 * @param ad remote anti-entropy advertisement
663 */
Jonathan Hartaaa56572015-01-28 21:56:35 -0800664 private void antiEntropyCheckLocalRemoved(AntiEntropyAdvertisement<K> ad) {
665 final NodeId sender = ad.sender();
666
Jonathan Hartaaa56572015-01-28 21:56:35 -0800667 for (Map.Entry<K, Timestamp> dead : removedItems.entrySet()) {
668 K key = dead.getKey();
669 Timestamp localDeadTimestamp = dead.getValue();
670
671 Timestamp remoteLiveTimestamp = ad.timestamps().get(key);
672 if (remoteLiveTimestamp != null
Jonathan Hart403ea932015-02-20 16:23:00 -0800673 && localDeadTimestamp.isNewerThan(remoteLiveTimestamp)) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800674 // sender has zombie, push remove
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800675 queueUpdate(new RemoveEntry<>(key, localDeadTimestamp), ImmutableList.of(sender));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800676 }
677 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800678 }
679
680 /**
681 * Checks if any of the local live items are out of date according to the
682 * remote's tombstone advertisements. If we find a local item is out of date,
683 * we'll apply the remove operation to the local state.
684 *
685 * @param ad remote anti-entropy advertisement
686 * @return list of external events relating to local operations performed
687 */
Jonathan Hartaaa56572015-01-28 21:56:35 -0800688 private List<EventuallyConsistentMapEvent<K, V>>
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800689 antiEntropyCheckRemoteRemoved(AntiEntropyAdvertisement<K> ad) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800690 final List<EventuallyConsistentMapEvent<K, V>> externalEvents
691 = new LinkedList<>();
692
693 for (Map.Entry<K, Timestamp> remoteDead : ad.tombstones().entrySet()) {
694 K key = remoteDead.getKey();
695 Timestamp remoteDeadTimestamp = remoteDead.getValue();
696
697 Timestamped<V> local = items.get(key);
698 Timestamp localDead = removedItems.get(key);
Jonathan Hart403ea932015-02-20 16:23:00 -0800699 if (local != null && remoteDeadTimestamp.isNewerThan(
700 local.timestamp())) {
701 // If the remote has a more recent tombstone than either our local
702 // value, then do a remove with their timestamp
Jonathan Hartaaa56572015-01-28 21:56:35 -0800703 if (removeInternal(key, remoteDeadTimestamp)) {
704 externalEvents.add(new EventuallyConsistentMapEvent<>(
705 EventuallyConsistentMapEvent.Type.REMOVE, key, null));
706 }
Jonathan Hart403ea932015-02-20 16:23:00 -0800707 } else if (localDead != null && remoteDeadTimestamp.isNewerThan(
708 localDead)) {
709 // If the remote has a more recent tombstone than us, update ours
710 // to their timestamp
711 removeInternal(key, remoteDeadTimestamp);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800712 }
713 }
714
715 return externalEvents;
716 }
717
718 private final class InternalAntiEntropyListener
719 implements ClusterMessageHandler {
720
721 @Override
722 public void handle(ClusterMessage message) {
Jonathan Hart4fd4ebb2015-02-04 17:38:48 -0800723 log.trace("Received anti-entropy advertisement from peer: {}",
724 message.sender());
Jonathan Hartaaa56572015-01-28 21:56:35 -0800725 AntiEntropyAdvertisement<K> advertisement = serializer.decode(message.payload());
Madan Jampani2af244a2015-02-22 13:12:01 -0800726 try {
Jonathan Hart233a18a2015-03-02 17:24:58 -0800727 if (!underHighLoad()) {
728 handleAntiEntropyAdvertisement(advertisement);
729 }
Madan Jampani2af244a2015-02-22 13:12:01 -0800730 } catch (Exception e) {
731 log.warn("Exception thrown handling advertisements", e);
732 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800733 }
734 }
735
Thomas Vachuska152f9fd2015-04-02 16:28:13 -0700736 private final class InternalEventListener implements ClusterMessageHandler {
Jonathan Hartdb3af892015-01-26 13:19:07 -0800737 @Override
738 public void handle(ClusterMessage message) {
Thomas Vachuska152f9fd2015-04-02 16:28:13 -0700739 if (destroyed) {
740 return;
741 }
742
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800743 log.debug("Received update event from peer: {}", message.sender());
744 Collection<AbstractEntry<K, V>> events = serializer.decode(message.payload());
Jonathan Hartdb3af892015-01-26 13:19:07 -0800745
Madan Jampani2af244a2015-02-22 13:12:01 -0800746 try {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800747 // TODO clean this for loop up
748 for (AbstractEntry<K, V> entry : events) {
749 final K key = entry.key();
750 final V value;
751 final Timestamp timestamp = entry.timestamp();
752 final EventuallyConsistentMapEvent.Type type;
753 if (entry instanceof PutEntry) {
754 PutEntry<K, V> putEntry = (PutEntry<K, V>) entry;
755 value = putEntry.value();
756 type = EventuallyConsistentMapEvent.Type.PUT;
757 } else if (entry instanceof RemoveEntry) {
758 type = EventuallyConsistentMapEvent.Type.REMOVE;
759 value = null;
760 } else {
761 throw new IllegalStateException("Unknown entry type " + entry.getClass());
762 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800763
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800764 boolean success;
765 switch (type) {
766 case PUT:
767 success = putInternal(key, value, timestamp);
768 break;
769 case REMOVE:
770 success = removeInternal(key, timestamp);
771 break;
772 default:
773 success = false;
774 }
775 if (success) {
776 notifyListeners(new EventuallyConsistentMapEvent<>(type, key, value));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800777 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800778 }
Madan Jampani2af244a2015-02-22 13:12:01 -0800779 } catch (Exception e) {
780 log.warn("Exception thrown handling put", e);
781 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800782 }
783 }
784
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800785 // TODO pull this into the class if this gets pulled out...
786 private static final int DEFAULT_MAX_EVENTS = 1000;
787 private static final int DEFAULT_MAX_IDLE_MS = 10;
788 private static final int DEFAULT_MAX_BATCH_MS = 50;
789 private static final Timer TIMER = new Timer("onos-ecm-sender-events");
Jonathan Hartdb3af892015-01-26 13:19:07 -0800790
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800791 private final class EventAccumulator extends AbstractAccumulator<AbstractEntry<K, V>> {
792
793 private final NodeId peer;
794
795 private EventAccumulator(NodeId peer) {
796 super(TIMER, DEFAULT_MAX_EVENTS, DEFAULT_MAX_BATCH_MS, DEFAULT_MAX_IDLE_MS);
797 this.peer = peer;
798 }
799
800 @Override
801 public void processItems(List<AbstractEntry<K, V>> items) {
802 Map<K, AbstractEntry<K, V>> map = Maps.newHashMap();
803 items.forEach(item -> map.compute(item.key(), (key, oldValue) ->
804 oldValue == null || item.compareTo(oldValue) > 0 ? item : oldValue
805 )
806 );
807 communicationExecutor.submit(() -> {
808 try {
809 unicastMessage(peer, updateMessageSubject, Lists.newArrayList(map.values()));
810 } catch (Exception e) {
811 log.warn("broadcast error", e);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800812 }
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800813 });
Jonathan Hartdb3af892015-01-26 13:19:07 -0800814 }
815 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800816}