blob: b78b0d33135b4aff6f97dad80548f21bb08568cc [file] [log] [blame]
Jonathan Hartdb3af892015-01-26 13:19:07 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Jonathan Hart77bdd262015-02-03 09:07:48 -080016package org.onosproject.store.ecmap;
Jonathan Hartdb3af892015-01-26 13:19:07 -080017
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080018import com.google.common.collect.ImmutableList;
19import com.google.common.collect.Lists;
20import com.google.common.collect.Maps;
Madan Jampani3e033bd2015-04-08 13:03:49 -070021
Jonathan Hartaaa56572015-01-28 21:56:35 -080022import org.apache.commons.lang3.RandomUtils;
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -080023import org.apache.commons.lang3.mutable.MutableBoolean;
Jonathan Hartf9108232015-02-02 16:37:35 -080024import org.apache.commons.lang3.tuple.Pair;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080025import org.onlab.util.AbstractAccumulator;
Jonathan Hartdb3af892015-01-26 13:19:07 -080026import org.onlab.util.KryoNamespace;
Jonathan Hart233a18a2015-03-02 17:24:58 -080027import org.onlab.util.SlidingWindowCounter;
Jonathan Hartdb3af892015-01-26 13:19:07 -080028import org.onosproject.cluster.ClusterService;
Jonathan Hartaaa56572015-01-28 21:56:35 -080029import org.onosproject.cluster.ControllerNode;
Jonathan Hartdb3af892015-01-26 13:19:07 -080030import org.onosproject.cluster.NodeId;
31import org.onosproject.store.Timestamp;
32import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
33import org.onosproject.store.cluster.messaging.ClusterMessage;
34import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
35import org.onosproject.store.cluster.messaging.MessageSubject;
Madan Jampani3e033bd2015-04-08 13:03:49 -070036import org.onosproject.store.impl.LogicalTimestamp;
Jonathan Hart77bdd262015-02-03 09:07:48 -080037import org.onosproject.store.impl.Timestamped;
Jonathan Hart63939a32015-05-08 11:57:03 -070038import org.onosproject.store.service.WallClockTimestamp;
Jonathan Hartdb3af892015-01-26 13:19:07 -080039import org.onosproject.store.serializers.KryoSerializer;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070040import org.onosproject.store.service.EventuallyConsistentMap;
41import org.onosproject.store.service.EventuallyConsistentMapEvent;
42import org.onosproject.store.service.EventuallyConsistentMapListener;
Jonathan Hartdb3af892015-01-26 13:19:07 -080043import org.slf4j.Logger;
44import org.slf4j.LoggerFactory;
45
Jonathan Hartdb3af892015-01-26 13:19:07 -080046import java.util.ArrayList;
47import java.util.Collection;
Jonathan Hartaaa56572015-01-28 21:56:35 -080048import java.util.HashMap;
49import java.util.LinkedList;
Jonathan Hartdb3af892015-01-26 13:19:07 -080050import java.util.List;
51import java.util.Map;
52import java.util.Set;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080053import java.util.Timer;
Jonathan Hartdb3af892015-01-26 13:19:07 -080054import java.util.concurrent.ConcurrentHashMap;
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -080055import java.util.concurrent.ConcurrentMap;
Jonathan Hartdb3af892015-01-26 13:19:07 -080056import java.util.concurrent.CopyOnWriteArraySet;
57import java.util.concurrent.ExecutorService;
58import java.util.concurrent.Executors;
59import java.util.concurrent.ScheduledExecutorService;
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -080060import java.util.concurrent.TimeUnit;
Brian O'Connor5eb77c82015-03-02 18:09:39 -080061import java.util.function.BiFunction;
Jonathan Hartdb3af892015-01-26 13:19:07 -080062import java.util.stream.Collectors;
63
64import static com.google.common.base.Preconditions.checkNotNull;
65import static com.google.common.base.Preconditions.checkState;
66import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
Brian O'Connorc6713a82015-02-24 11:55:48 -080067import static org.onlab.util.BoundedThreadPool.newFixedThreadPool;
Brian O'Connor4b2ba5f2015-02-18 20:54:00 -080068import static org.onlab.util.Tools.groupedThreads;
Jonathan Hartdb3af892015-01-26 13:19:07 -080069
70/**
71 * Distributed Map implementation which uses optimistic replication and gossip
72 * based techniques to provide an eventually consistent data store.
73 */
74public class EventuallyConsistentMapImpl<K, V>
75 implements EventuallyConsistentMap<K, V> {
76
77 private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class);
78
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -080079 private final ConcurrentMap<K, Timestamped<V>> items;
80 private final ConcurrentMap<K, Timestamp> removedItems;
Jonathan Hartdb3af892015-01-26 13:19:07 -080081
Jonathan Hartdb3af892015-01-26 13:19:07 -080082 private final ClusterService clusterService;
83 private final ClusterCommunicationService clusterCommunicator;
84 private final KryoSerializer serializer;
85
Madan Jampanibcf1a482015-06-24 19:05:56 -070086 private final BiFunction<K, V, Timestamp> timestampProvider;
Jonathan Hartdb3af892015-01-26 13:19:07 -080087
88 private final MessageSubject updateMessageSubject;
Jonathan Hartaaa56572015-01-28 21:56:35 -080089 private final MessageSubject antiEntropyAdvertisementSubject;
Jonathan Hartdb3af892015-01-26 13:19:07 -080090
Jonathan Hartaaa56572015-01-28 21:56:35 -080091 private final Set<EventuallyConsistentMapListener<K, V>> listeners
Jonathan Hartdb3af892015-01-26 13:19:07 -080092 = new CopyOnWriteArraySet<>();
93
94 private final ExecutorService executor;
Jonathan Hartdb3af892015-01-26 13:19:07 -080095 private final ScheduledExecutorService backgroundExecutor;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080096 private final BiFunction<K, V, Collection<NodeId>> peerUpdateFunction;
Jonathan Hartdb3af892015-01-26 13:19:07 -080097
Jonathan Hart6ec029a2015-03-24 17:12:35 -070098 private final ExecutorService communicationExecutor;
99 private final Map<NodeId, EventAccumulator> senderPending;
Madan Jampani28726282015-02-19 11:40:23 -0800100
Jonathan Hartdb3af892015-01-26 13:19:07 -0800101 private volatile boolean destroyed = false;
Jonathan Hart539a6462015-01-27 17:05:43 -0800102 private static final String ERROR_DESTROYED = " map is already destroyed";
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800103 private final String destroyedMessage;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800104
Jonathan Hart4f397e82015-02-04 09:10:41 -0800105 private static final String ERROR_NULL_KEY = "Key cannot be null";
106 private static final String ERROR_NULL_VALUE = "Null values are not allowed";
107
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700108 private final long initialDelaySec = 5;
109 private final boolean lightweightAntiEntropy;
110 private final boolean tombstonesDisabled;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800111
Jonathan Hart233a18a2015-03-02 17:24:58 -0800112 private static final int WINDOW_SIZE = 5;
113 private static final int HIGH_LOAD_THRESHOLD = 0;
114 private static final int LOAD_WINDOW = 2;
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700115 private SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE);
Jonathan Hart233a18a2015-03-02 17:24:58 -0800116
Jonathan Hartca335e92015-03-05 10:34:32 -0800117 private final boolean persistent;
118 private final PersistentStore<K, V> persistentStore;
119
Jonathan Hartdb3af892015-01-26 13:19:07 -0800120 /**
121 * Creates a new eventually consistent map shared amongst multiple instances.
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800122 * <p>
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700123 * See {@link org.onosproject.store.service.EventuallyConsistentMapBuilder}
124 * for more description of the parameters expected by the map.
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800125 * </p>
Jonathan Hartdb3af892015-01-26 13:19:07 -0800126 *
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700127 * @param mapName a String identifier for the map.
128 * @param clusterService the cluster service
129 * @param clusterCommunicator the cluster communications service
130 * @param serializerBuilder a Kryo namespace builder that can serialize
131 * both K and V
Madan Jampanibcf1a482015-06-24 19:05:56 -0700132 * @param timestampProvider provider of timestamps for K and V
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700133 * @param peerUpdateFunction function that provides a set of nodes to immediately
134 * update to when there writes to the map
135 * @param eventExecutor executor to use for processing incoming
136 * events from peers
137 * @param communicationExecutor executor to use for sending events to peers
138 * @param backgroundExecutor executor to use for background anti-entropy
139 * tasks
140 * @param tombstonesDisabled true if this map should not maintain
141 * tombstones
142 * @param antiEntropyPeriod period that the anti-entropy task should run
Jonathan Hartca335e92015-03-05 10:34:32 -0800143 * @param antiEntropyTimeUnit time unit for anti-entropy period
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700144 * @param convergeFaster make anti-entropy try to converge faster
Jonathan Hartca335e92015-03-05 10:34:32 -0800145 * @param persistent persist data to disk
Jonathan Hartdb3af892015-01-26 13:19:07 -0800146 */
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700147 EventuallyConsistentMapImpl(String mapName,
148 ClusterService clusterService,
149 ClusterCommunicationService clusterCommunicator,
150 KryoNamespace.Builder serializerBuilder,
Madan Jampanibcf1a482015-06-24 19:05:56 -0700151 BiFunction<K, V, Timestamp> timestampProvider,
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700152 BiFunction<K, V, Collection<NodeId>> peerUpdateFunction,
153 ExecutorService eventExecutor,
154 ExecutorService communicationExecutor,
155 ScheduledExecutorService backgroundExecutor,
156 boolean tombstonesDisabled,
157 long antiEntropyPeriod,
158 TimeUnit antiEntropyTimeUnit,
Jonathan Hartca335e92015-03-05 10:34:32 -0800159 boolean convergeFaster,
160 boolean persistent) {
Jonathan Hartdb3af892015-01-26 13:19:07 -0800161 items = new ConcurrentHashMap<>();
162 removedItems = new ConcurrentHashMap<>();
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800163 senderPending = Maps.newConcurrentMap();
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700164 destroyedMessage = mapName + ERROR_DESTROYED;
Madan Jampani28726282015-02-19 11:40:23 -0800165
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700166 this.clusterService = clusterService;
167 this.clusterCommunicator = clusterCommunicator;
168
169 this.serializer = createSerializer(serializerBuilder);
170
Madan Jampanibcf1a482015-06-24 19:05:56 -0700171 this.timestampProvider = timestampProvider;
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700172
173 if (peerUpdateFunction != null) {
174 this.peerUpdateFunction = peerUpdateFunction;
175 } else {
176 this.peerUpdateFunction = (key, value) -> clusterService.getNodes().stream()
177 .map(ControllerNode::id)
178 .filter(nodeId -> !nodeId.equals(clusterService.getLocalNode().id()))
179 .collect(Collectors.toList());
180 }
181
182 if (eventExecutor != null) {
183 this.executor = eventExecutor;
184 } else {
185 // should be a normal executor; it's used for receiving messages
186 this.executor =
187 Executors.newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-fg-%d"));
188 }
189
190 if (communicationExecutor != null) {
191 this.communicationExecutor = communicationExecutor;
192 } else {
193 // sending executor; should be capped
194 //TODO this probably doesn't need to be bounded anymore
195 this.communicationExecutor =
196 newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-publish-%d"));
197 }
198
Jonathan Hartca335e92015-03-05 10:34:32 -0800199 this.persistent = persistent;
200
201 if (this.persistent) {
202 String dataDirectory = System.getProperty("karaf.data", "./data");
203 String filename = dataDirectory + "/" + "mapdb-ecm-" + mapName;
204
205 ExecutorService dbExecutor =
206 newFixedThreadPool(1, groupedThreads("onos/ecm", mapName + "-dbwriter"));
207
208 persistentStore = new MapDbPersistentStore<>(filename, dbExecutor, serializer);
209 persistentStore.readInto(items, removedItems);
210 } else {
211 this.persistentStore = null;
212 }
213
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700214 if (backgroundExecutor != null) {
215 this.backgroundExecutor = backgroundExecutor;
216 } else {
217 this.backgroundExecutor =
218 newSingleThreadScheduledExecutor(groupedThreads("onos/ecm", mapName + "-bg-%d"));
219 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800220
Jonathan Hartaaa56572015-01-28 21:56:35 -0800221 // start anti-entropy thread
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700222 this.backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
223 initialDelaySec, antiEntropyPeriod,
224 antiEntropyTimeUnit);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800225
Jonathan Hartdb3af892015-01-26 13:19:07 -0800226 updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
227 clusterCommunicator.addSubscriber(updateMessageSubject,
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700228 new InternalEventListener(), this.executor);
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800229
Jonathan Hartaaa56572015-01-28 21:56:35 -0800230 antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");
231 clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700232 new InternalAntiEntropyListener(), this.backgroundExecutor);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800233
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700234 this.tombstonesDisabled = tombstonesDisabled;
235 this.lightweightAntiEntropy = !convergeFaster;
Madan Jampanie1356282015-03-10 19:05:36 -0700236 }
237
Jonathan Hartdb3af892015-01-26 13:19:07 -0800238 private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
239 return new KryoSerializer() {
240 @Override
241 protected void setupKryoPool() {
242 // Add the map's internal helper classes to the user-supplied serializer
243 serializerPool = builder
Madan Jampani3e033bd2015-04-08 13:03:49 -0700244 .register(LogicalTimestamp.class)
Jonathan Hartdb3af892015-01-26 13:19:07 -0800245 .register(WallClockTimestamp.class)
246 .register(PutEntry.class)
Jonathan Hart539a6462015-01-27 17:05:43 -0800247 .register(RemoveEntry.class)
Jonathan Hartdb3af892015-01-26 13:19:07 -0800248 .register(ArrayList.class)
Jonathan Hartaaa56572015-01-28 21:56:35 -0800249 .register(AntiEntropyAdvertisement.class)
250 .register(HashMap.class)
Jonathan Hartca335e92015-03-05 10:34:32 -0800251 .register(Timestamped.class)
Jonathan Hartdb3af892015-01-26 13:19:07 -0800252 .build();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800253 }
254 };
255 }
256
257 @Override
258 public int size() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800259 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800260 return items.size();
261 }
262
263 @Override
264 public boolean isEmpty() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800265 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800266 return items.isEmpty();
267 }
268
269 @Override
270 public boolean containsKey(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800271 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800272 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800273 return items.containsKey(key);
274 }
275
276 @Override
277 public boolean containsValue(V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800278 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800279 checkNotNull(value, ERROR_NULL_VALUE);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800280
281 return items.values().stream()
282 .anyMatch(timestamped -> timestamped.value().equals(value));
283 }
284
285 @Override
286 public V get(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800287 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800288 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800289
290 Timestamped<V> value = items.get(key);
291 if (value != null) {
292 return value.value();
293 }
294 return null;
295 }
296
297 @Override
298 public void put(K key, V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800299 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800300 checkNotNull(key, ERROR_NULL_KEY);
301 checkNotNull(value, ERROR_NULL_VALUE);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800302
Madan Jampanibcf1a482015-06-24 19:05:56 -0700303 Timestamp timestamp = timestampProvider.apply(key, value);
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800304
Jonathan Hartdb3af892015-01-26 13:19:07 -0800305 if (putInternal(key, value, timestamp)) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800306 notifyPeers(new PutEntry<>(key, value, timestamp),
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800307 peerUpdateFunction.apply(key, value));
308 notifyListeners(new EventuallyConsistentMapEvent<>(
309 EventuallyConsistentMapEvent.Type.PUT, key, value));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800310 }
311 }
312
313 private boolean putInternal(K key, V value, Timestamp timestamp) {
Jonathan Hart233a18a2015-03-02 17:24:58 -0800314 counter.incrementCount();
Brian O'Connor2952e3b2015-02-19 21:47:57 -0800315 Timestamp removed = removedItems.get(key);
Jonathan Hart403ea932015-02-20 16:23:00 -0800316 if (removed != null && removed.isNewerThan(timestamp)) {
Brian O'Connor2952e3b2015-02-19 21:47:57 -0800317 log.debug("ecmap - removed was newer {}", value);
318 return false;
319 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800320
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800321 final MutableBoolean updated = new MutableBoolean(false);
322
323 items.compute(key, (k, existing) -> {
Jonathan Hart403ea932015-02-20 16:23:00 -0800324 if (existing != null && existing.isNewerThan(timestamp)) {
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800325 updated.setFalse();
326 return existing;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800327 } else {
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800328 updated.setTrue();
329 return new Timestamped<>(value, timestamp);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800330 }
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800331 });
332
333 boolean success = updated.booleanValue();
334 if (!success) {
335 log.debug("ecmap - existing was newer {}", value);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800336 }
Brian O'Connor2952e3b2015-02-19 21:47:57 -0800337
338 if (success && removed != null) {
339 removedItems.remove(key, removed);
340 }
Jonathan Hartca335e92015-03-05 10:34:32 -0800341
342 if (success && persistent) {
343 persistentStore.put(key, value, timestamp);
344 }
345
Brian O'Connor2952e3b2015-02-19 21:47:57 -0800346 return success;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800347 }
348
349 @Override
350 public void remove(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800351 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800352 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800353
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800354 // TODO prevent calls here if value is important for timestamp
Madan Jampanibcf1a482015-06-24 19:05:56 -0700355 Timestamp timestamp = timestampProvider.apply(key, null);
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800356
Jonathan Hartdb3af892015-01-26 13:19:07 -0800357 if (removeInternal(key, timestamp)) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800358 notifyPeers(new RemoveEntry<>(key, timestamp),
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800359 peerUpdateFunction.apply(key, null));
360 notifyListeners(new EventuallyConsistentMapEvent<>(
361 EventuallyConsistentMapEvent.Type.REMOVE, key, null));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800362 }
363 }
364
365 private boolean removeInternal(K key, Timestamp timestamp) {
Madan Jampani54d34992015-03-06 17:27:52 -0800366 if (timestamp == null) {
367 return false;
368 }
369
Jonathan Hart233a18a2015-03-02 17:24:58 -0800370 counter.incrementCount();
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800371 final MutableBoolean updated = new MutableBoolean(false);
372
373 items.compute(key, (k, existing) -> {
374 if (existing != null && existing.isNewerThan(timestamp)) {
375 updated.setFalse();
376 return existing;
Madan Jampani7e6cfe32015-02-19 17:44:44 -0800377 } else {
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800378 updated.setTrue();
379 // remove from items map
380 return null;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800381 }
Jonathan Hartca335e92015-03-05 10:34:32 -0800382 });
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800383
384 if (updated.isFalse()) {
385 return false;
Madan Jampani7e6cfe32015-02-19 17:44:44 -0800386 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800387
Jonathan Hartca335e92015-03-05 10:34:32 -0800388 boolean updatedTombstone = false;
389
Madan Jampanie1356282015-03-10 19:05:36 -0700390 if (!tombstonesDisabled) {
391 Timestamp removedTimestamp = removedItems.get(key);
392 if (removedTimestamp == null) {
Jonathan Hartca335e92015-03-05 10:34:32 -0800393 //Timestamp removed = removedItems.putIfAbsent(key, timestamp);
394 updatedTombstone = (removedItems.putIfAbsent(key, timestamp) == null);
Madan Jampanie1356282015-03-10 19:05:36 -0700395 } else if (timestamp.isNewerThan(removedTimestamp)) {
Jonathan Hartca335e92015-03-05 10:34:32 -0800396 updatedTombstone = removedItems.replace(key, removedTimestamp, timestamp);
Madan Jampanie1356282015-03-10 19:05:36 -0700397 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800398 }
Madan Jampanie1356282015-03-10 19:05:36 -0700399
Jonathan Hartca335e92015-03-05 10:34:32 -0800400 if (updated.booleanValue() && persistent) {
401 persistentStore.remove(key, timestamp);
402 }
403
404 return (!tombstonesDisabled && updatedTombstone) || updated.booleanValue();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800405 }
406
407 @Override
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800408 public void remove(K key, V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800409 checkState(!destroyed, destroyedMessage);
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800410 checkNotNull(key, ERROR_NULL_KEY);
411 checkNotNull(value, ERROR_NULL_VALUE);
412
Madan Jampanibcf1a482015-06-24 19:05:56 -0700413 Timestamp timestamp = timestampProvider.apply(key, value);
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800414
415 if (removeInternal(key, timestamp)) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800416 notifyPeers(new RemoveEntry<>(key, timestamp),
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800417 peerUpdateFunction.apply(key, value));
418 notifyListeners(new EventuallyConsistentMapEvent<>(
419 EventuallyConsistentMapEvent.Type.REMOVE, key, value));
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800420 }
421 }
422
423 @Override
Jonathan Hartdb3af892015-01-26 13:19:07 -0800424 public void putAll(Map<? extends K, ? extends V> m) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800425 checkState(!destroyed, destroyedMessage);
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800426 m.forEach(this::put);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800427 }
428
429 @Override
430 public void clear() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800431 checkState(!destroyed, destroyedMessage);
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800432 items.forEach((key, value) -> remove(key));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800433 }
434
435 @Override
436 public Set<K> keySet() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800437 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800438 return items.keySet();
439 }
440
441 @Override
442 public Collection<V> values() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800443 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800444 return items.values().stream()
445 .map(Timestamped::value)
446 .collect(Collectors.toList());
447 }
448
449 @Override
450 public Set<Map.Entry<K, V>> entrySet() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800451 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800452
453 return items.entrySet().stream()
Jonathan Hartf9108232015-02-02 16:37:35 -0800454 .map(e -> Pair.of(e.getKey(), e.getValue().value()))
Jonathan Hartdb3af892015-01-26 13:19:07 -0800455 .collect(Collectors.toSet());
456 }
457
458 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800459 public void addListener(EventuallyConsistentMapListener<K, V> listener) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800460 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800461
462 listeners.add(checkNotNull(listener));
463 }
464
465 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800466 public void removeListener(EventuallyConsistentMapListener<K, V> listener) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800467 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800468
469 listeners.remove(checkNotNull(listener));
470 }
471
472 @Override
473 public void destroy() {
474 destroyed = true;
475
476 executor.shutdown();
477 backgroundExecutor.shutdown();
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800478 communicationExecutor.shutdown();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800479
Jonathan Hart584d2f32015-01-27 19:46:14 -0800480 listeners.clear();
481
Jonathan Hartdb3af892015-01-26 13:19:07 -0800482 clusterCommunicator.removeSubscriber(updateMessageSubject);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800483 clusterCommunicator.removeSubscriber(antiEntropyAdvertisementSubject);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800484 }
485
Jonathan Hartaaa56572015-01-28 21:56:35 -0800486 private void notifyListeners(EventuallyConsistentMapEvent<K, V> event) {
487 for (EventuallyConsistentMapListener<K, V> listener : listeners) {
Jonathan Hartdb3af892015-01-26 13:19:07 -0800488 listener.event(event);
489 }
490 }
491
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800492 private void notifyPeers(PutEntry<K, V> event, Collection<NodeId> peers) {
493 queueUpdate(event, peers);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800494 }
495
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800496 private void notifyPeers(RemoveEntry<K, V> event, Collection<NodeId> peers) {
497 queueUpdate(event, peers);
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800498 }
499
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800500 private void queueUpdate(AbstractEntry<K, V> event, Collection<NodeId> peers) {
501 if (peers == null) {
502 // we have no friends :(
503 return;
504 }
505 peers.forEach(node ->
506 senderPending.computeIfAbsent(node, unusedKey -> new EventAccumulator(node)).add(event)
507 );
508 }
509
Jonathan Hart233a18a2015-03-02 17:24:58 -0800510 private boolean underHighLoad() {
511 return counter.get(LOAD_WINDOW) > HIGH_LOAD_THRESHOLD;
512 }
513
Jonathan Hartaaa56572015-01-28 21:56:35 -0800514 private final class SendAdvertisementTask implements Runnable {
515 @Override
516 public void run() {
517 if (Thread.currentThread().isInterrupted()) {
518 log.info("Interrupted, quitting");
519 return;
520 }
521
Thomas Vachuska152f9fd2015-04-02 16:28:13 -0700522 if (underHighLoad() || destroyed) {
Jonathan Hart233a18a2015-03-02 17:24:58 -0800523 return;
524 }
525
Jonathan Hartaaa56572015-01-28 21:56:35 -0800526 try {
527 final NodeId self = clusterService.getLocalNode().id();
528 Set<ControllerNode> nodes = clusterService.getNodes();
529
530 List<NodeId> nodeIds = nodes.stream()
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800531 .map(ControllerNode::id)
Jonathan Hartaaa56572015-01-28 21:56:35 -0800532 .collect(Collectors.toList());
533
534 if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
535 log.trace("No other peers in the cluster.");
536 return;
537 }
538
539 NodeId peer;
540 do {
541 int idx = RandomUtils.nextInt(0, nodeIds.size());
542 peer = nodeIds.get(idx);
543 } while (peer.equals(self));
544
545 if (Thread.currentThread().isInterrupted()) {
546 log.info("Interrupted, quitting");
547 return;
548 }
549
550 AntiEntropyAdvertisement<K> ad = createAdvertisement();
Madan Jampani175e8fd2015-05-20 14:10:45 -0700551 NodeId destination = peer;
552 clusterCommunicator.unicast(ad, antiEntropyAdvertisementSubject, serializer::encode, peer)
553 .whenComplete((result, error) -> {
554 if (error != null) {
555 log.debug("Failed to send anti-entropy advertisement to {}", destination);
556 }
557 });
Jonathan Hartaaa56572015-01-28 21:56:35 -0800558
Jonathan Hartaaa56572015-01-28 21:56:35 -0800559 } catch (Exception e) {
560 // Catch all exceptions to avoid scheduled task being suppressed.
561 log.error("Exception thrown while sending advertisement", e);
562 }
563 }
564 }
565
566 private AntiEntropyAdvertisement<K> createAdvertisement() {
567 final NodeId self = clusterService.getLocalNode().id();
568
569 Map<K, Timestamp> timestamps = new HashMap<>(items.size());
570
571 items.forEach((key, value) -> timestamps.put(key, value.timestamp()));
572
573 Map<K, Timestamp> tombstones = new HashMap<>(removedItems);
574
575 return new AntiEntropyAdvertisement<>(self, timestamps, tombstones);
576 }
577
578 private void handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) {
579 List<EventuallyConsistentMapEvent<K, V>> externalEvents;
580
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800581 externalEvents = antiEntropyCheckLocalItems(ad);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800582
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800583 antiEntropyCheckLocalRemoved(ad);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800584
Jonathan Hartf893be82015-02-24 17:35:51 -0800585 if (!lightweightAntiEntropy) {
586 externalEvents.addAll(antiEntropyCheckRemoteRemoved(ad));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800587
Jonathan Hartf893be82015-02-24 17:35:51 -0800588 // if remote ad has something unknown, actively sync
589 for (K key : ad.timestamps().keySet()) {
590 if (!items.containsKey(key)) {
591 // Send the advertisement back if this peer is out-of-sync
592 final NodeId sender = ad.sender();
593 AntiEntropyAdvertisement<K> myAd = createAdvertisement();
Madan Jampani175e8fd2015-05-20 14:10:45 -0700594
595 clusterCommunicator.unicast(myAd, antiEntropyAdvertisementSubject, serializer::encode, sender)
596 .whenComplete((result, error) -> {
597 if (error != null) {
598 log.debug("Failed to send reactive "
599 + "anti-entropy advertisement to {}", sender);
600 }
601 });
Jonathan Hartf893be82015-02-24 17:35:51 -0800602 break;
603 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800604 }
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800605 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800606 externalEvents.forEach(this::notifyListeners);
607 }
608
609 /**
610 * Checks if any of the remote's live items or tombstones are out of date
611 * according to our local live item list, or if our live items are out of
612 * date according to the remote's tombstone list.
613 * If the local copy is more recent, it will be pushed to the remote. If the
614 * remote has a more recent remove, we apply that to the local state.
615 *
616 * @param ad remote anti-entropy advertisement
617 * @return list of external events relating to local operations performed
618 */
Jonathan Hartaaa56572015-01-28 21:56:35 -0800619 private List<EventuallyConsistentMapEvent<K, V>> antiEntropyCheckLocalItems(
620 AntiEntropyAdvertisement<K> ad) {
621 final List<EventuallyConsistentMapEvent<K, V>> externalEvents
622 = new LinkedList<>();
623 final NodeId sender = ad.sender();
624
Jonathan Hartaaa56572015-01-28 21:56:35 -0800625 for (Map.Entry<K, Timestamped<V>> item : items.entrySet()) {
626 K key = item.getKey();
627 Timestamped<V> localValue = item.getValue();
628
629 Timestamp remoteTimestamp = ad.timestamps().get(key);
630 if (remoteTimestamp == null) {
631 remoteTimestamp = ad.tombstones().get(key);
632 }
633 if (remoteTimestamp == null || localValue
Jonathan Hart403ea932015-02-20 16:23:00 -0800634 .isNewerThan(remoteTimestamp)) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800635 // local value is more recent, push to sender
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800636 queueUpdate(new PutEntry<>(key, localValue.value(),
637 localValue.timestamp()), ImmutableList.of(sender));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800638 }
639
640 Timestamp remoteDeadTimestamp = ad.tombstones().get(key);
641 if (remoteDeadTimestamp != null &&
Jonathan Hart403ea932015-02-20 16:23:00 -0800642 remoteDeadTimestamp.isNewerThan(localValue.timestamp())) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800643 // sender has a more recent remove
644 if (removeInternal(key, remoteDeadTimestamp)) {
645 externalEvents.add(new EventuallyConsistentMapEvent<>(
646 EventuallyConsistentMapEvent.Type.REMOVE, key, null));
647 }
648 }
649 }
650
Jonathan Hartaaa56572015-01-28 21:56:35 -0800651 return externalEvents;
652 }
653
654 /**
655 * Checks if any items in the remote live list are out of date according
656 * to our tombstone list. If we find we have a more up to date tombstone,
657 * we'll send it to the remote.
658 *
659 * @param ad remote anti-entropy advertisement
660 */
Jonathan Hartaaa56572015-01-28 21:56:35 -0800661 private void antiEntropyCheckLocalRemoved(AntiEntropyAdvertisement<K> ad) {
662 final NodeId sender = ad.sender();
663
Jonathan Hartaaa56572015-01-28 21:56:35 -0800664 for (Map.Entry<K, Timestamp> dead : removedItems.entrySet()) {
665 K key = dead.getKey();
666 Timestamp localDeadTimestamp = dead.getValue();
667
668 Timestamp remoteLiveTimestamp = ad.timestamps().get(key);
669 if (remoteLiveTimestamp != null
Jonathan Hart403ea932015-02-20 16:23:00 -0800670 && localDeadTimestamp.isNewerThan(remoteLiveTimestamp)) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800671 // sender has zombie, push remove
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800672 queueUpdate(new RemoveEntry<>(key, localDeadTimestamp), ImmutableList.of(sender));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800673 }
674 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800675 }
676
677 /**
678 * Checks if any of the local live items are out of date according to the
679 * remote's tombstone advertisements. If we find a local item is out of date,
680 * we'll apply the remove operation to the local state.
681 *
682 * @param ad remote anti-entropy advertisement
683 * @return list of external events relating to local operations performed
684 */
Jonathan Hartaaa56572015-01-28 21:56:35 -0800685 private List<EventuallyConsistentMapEvent<K, V>>
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800686 antiEntropyCheckRemoteRemoved(AntiEntropyAdvertisement<K> ad) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800687 final List<EventuallyConsistentMapEvent<K, V>> externalEvents
688 = new LinkedList<>();
689
690 for (Map.Entry<K, Timestamp> remoteDead : ad.tombstones().entrySet()) {
691 K key = remoteDead.getKey();
692 Timestamp remoteDeadTimestamp = remoteDead.getValue();
693
694 Timestamped<V> local = items.get(key);
695 Timestamp localDead = removedItems.get(key);
Jonathan Hart403ea932015-02-20 16:23:00 -0800696 if (local != null && remoteDeadTimestamp.isNewerThan(
697 local.timestamp())) {
698 // If the remote has a more recent tombstone than either our local
699 // value, then do a remove with their timestamp
Jonathan Hartaaa56572015-01-28 21:56:35 -0800700 if (removeInternal(key, remoteDeadTimestamp)) {
701 externalEvents.add(new EventuallyConsistentMapEvent<>(
702 EventuallyConsistentMapEvent.Type.REMOVE, key, null));
703 }
Jonathan Hart403ea932015-02-20 16:23:00 -0800704 } else if (localDead != null && remoteDeadTimestamp.isNewerThan(
705 localDead)) {
706 // If the remote has a more recent tombstone than us, update ours
707 // to their timestamp
708 removeInternal(key, remoteDeadTimestamp);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800709 }
710 }
711
712 return externalEvents;
713 }
714
715 private final class InternalAntiEntropyListener
716 implements ClusterMessageHandler {
717
718 @Override
719 public void handle(ClusterMessage message) {
Jonathan Hart4fd4ebb2015-02-04 17:38:48 -0800720 log.trace("Received anti-entropy advertisement from peer: {}",
721 message.sender());
Jonathan Hartaaa56572015-01-28 21:56:35 -0800722 AntiEntropyAdvertisement<K> advertisement = serializer.decode(message.payload());
Madan Jampani2af244a2015-02-22 13:12:01 -0800723 try {
Jonathan Hart233a18a2015-03-02 17:24:58 -0800724 if (!underHighLoad()) {
725 handleAntiEntropyAdvertisement(advertisement);
726 }
Madan Jampani2af244a2015-02-22 13:12:01 -0800727 } catch (Exception e) {
728 log.warn("Exception thrown handling advertisements", e);
729 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800730 }
731 }
732
Thomas Vachuska152f9fd2015-04-02 16:28:13 -0700733 private final class InternalEventListener implements ClusterMessageHandler {
Jonathan Hartdb3af892015-01-26 13:19:07 -0800734 @Override
735 public void handle(ClusterMessage message) {
Thomas Vachuska152f9fd2015-04-02 16:28:13 -0700736 if (destroyed) {
737 return;
738 }
739
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800740 log.debug("Received update event from peer: {}", message.sender());
741 Collection<AbstractEntry<K, V>> events = serializer.decode(message.payload());
Jonathan Hartdb3af892015-01-26 13:19:07 -0800742
Madan Jampani2af244a2015-02-22 13:12:01 -0800743 try {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800744 // TODO clean this for loop up
745 for (AbstractEntry<K, V> entry : events) {
746 final K key = entry.key();
747 final V value;
748 final Timestamp timestamp = entry.timestamp();
749 final EventuallyConsistentMapEvent.Type type;
750 if (entry instanceof PutEntry) {
751 PutEntry<K, V> putEntry = (PutEntry<K, V>) entry;
752 value = putEntry.value();
753 type = EventuallyConsistentMapEvent.Type.PUT;
754 } else if (entry instanceof RemoveEntry) {
755 type = EventuallyConsistentMapEvent.Type.REMOVE;
756 value = null;
757 } else {
758 throw new IllegalStateException("Unknown entry type " + entry.getClass());
759 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800760
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800761 boolean success;
762 switch (type) {
763 case PUT:
764 success = putInternal(key, value, timestamp);
765 break;
766 case REMOVE:
767 success = removeInternal(key, timestamp);
768 break;
769 default:
770 success = false;
771 }
772 if (success) {
773 notifyListeners(new EventuallyConsistentMapEvent<>(type, key, value));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800774 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800775 }
Madan Jampani2af244a2015-02-22 13:12:01 -0800776 } catch (Exception e) {
777 log.warn("Exception thrown handling put", e);
778 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800779 }
780 }
781
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800782 // TODO pull this into the class if this gets pulled out...
783 private static final int DEFAULT_MAX_EVENTS = 1000;
784 private static final int DEFAULT_MAX_IDLE_MS = 10;
785 private static final int DEFAULT_MAX_BATCH_MS = 50;
786 private static final Timer TIMER = new Timer("onos-ecm-sender-events");
Jonathan Hartdb3af892015-01-26 13:19:07 -0800787
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800788 private final class EventAccumulator extends AbstractAccumulator<AbstractEntry<K, V>> {
789
790 private final NodeId peer;
791
792 private EventAccumulator(NodeId peer) {
793 super(TIMER, DEFAULT_MAX_EVENTS, DEFAULT_MAX_BATCH_MS, DEFAULT_MAX_IDLE_MS);
794 this.peer = peer;
795 }
796
797 @Override
798 public void processItems(List<AbstractEntry<K, V>> items) {
799 Map<K, AbstractEntry<K, V>> map = Maps.newHashMap();
800 items.forEach(item -> map.compute(item.key(), (key, oldValue) ->
801 oldValue == null || item.compareTo(oldValue) > 0 ? item : oldValue
802 )
803 );
804 communicationExecutor.submit(() -> {
Madan Jampani175e8fd2015-05-20 14:10:45 -0700805 clusterCommunicator.unicast(Lists.newArrayList(map.values()),
806 updateMessageSubject,
807 serializer::encode,
808 peer)
809 .whenComplete((result, error) -> {
810 if (error != null) {
811 log.debug("Failed to send to {}", peer);
812 }
813 });
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800814 });
Jonathan Hartdb3af892015-01-26 13:19:07 -0800815 }
816 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800817}