blob: 2987529d7b57ded827ec5a7b1e353e6cb2fa934f [file] [log] [blame]
Jonathan Hartdb3af892015-01-26 13:19:07 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Jonathan Hart77bdd262015-02-03 09:07:48 -080016package org.onosproject.store.ecmap;
Jonathan Hartdb3af892015-01-26 13:19:07 -080017
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080018import com.google.common.collect.ImmutableList;
19import com.google.common.collect.Lists;
20import com.google.common.collect.Maps;
Jonathan Hartaaa56572015-01-28 21:56:35 -080021import org.apache.commons.lang3.RandomUtils;
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -080022import org.apache.commons.lang3.mutable.MutableBoolean;
Jonathan Hartf9108232015-02-02 16:37:35 -080023import org.apache.commons.lang3.tuple.Pair;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080024import org.onlab.util.AbstractAccumulator;
Jonathan Hartdb3af892015-01-26 13:19:07 -080025import org.onlab.util.KryoNamespace;
Jonathan Hart233a18a2015-03-02 17:24:58 -080026import org.onlab.util.SlidingWindowCounter;
Jonathan Hartdb3af892015-01-26 13:19:07 -080027import org.onosproject.cluster.ClusterService;
Jonathan Hartaaa56572015-01-28 21:56:35 -080028import org.onosproject.cluster.ControllerNode;
Jonathan Hartdb3af892015-01-26 13:19:07 -080029import org.onosproject.cluster.NodeId;
30import org.onosproject.store.Timestamp;
31import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
32import org.onosproject.store.cluster.messaging.ClusterMessage;
33import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
34import org.onosproject.store.cluster.messaging.MessageSubject;
Jonathan Hart77bdd262015-02-03 09:07:48 -080035import org.onosproject.store.impl.ClockService;
36import org.onosproject.store.impl.Timestamped;
37import org.onosproject.store.impl.WallClockTimestamp;
Jonathan Hartdb3af892015-01-26 13:19:07 -080038import org.onosproject.store.serializers.KryoSerializer;
39import org.slf4j.Logger;
40import org.slf4j.LoggerFactory;
41
Jonathan Hartdb3af892015-01-26 13:19:07 -080042import java.util.ArrayList;
43import java.util.Collection;
Jonathan Hartaaa56572015-01-28 21:56:35 -080044import java.util.HashMap;
45import java.util.LinkedList;
Jonathan Hartdb3af892015-01-26 13:19:07 -080046import java.util.List;
47import java.util.Map;
48import java.util.Set;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080049import java.util.Timer;
Jonathan Hartdb3af892015-01-26 13:19:07 -080050import java.util.concurrent.ConcurrentHashMap;
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -080051import java.util.concurrent.ConcurrentMap;
Jonathan Hartdb3af892015-01-26 13:19:07 -080052import java.util.concurrent.CopyOnWriteArraySet;
53import java.util.concurrent.ExecutorService;
54import java.util.concurrent.Executors;
55import java.util.concurrent.ScheduledExecutorService;
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -080056import java.util.concurrent.TimeUnit;
Jonathan Hart233a18a2015-03-02 17:24:58 -080057import java.util.concurrent.atomic.AtomicLong;
Brian O'Connor5eb77c82015-03-02 18:09:39 -080058import java.util.function.BiFunction;
Jonathan Hartdb3af892015-01-26 13:19:07 -080059import java.util.stream.Collectors;
60
61import static com.google.common.base.Preconditions.checkNotNull;
62import static com.google.common.base.Preconditions.checkState;
63import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
Brian O'Connorc6713a82015-02-24 11:55:48 -080064import static org.onlab.util.BoundedThreadPool.newFixedThreadPool;
Brian O'Connor4b2ba5f2015-02-18 20:54:00 -080065import static org.onlab.util.Tools.groupedThreads;
Jonathan Hartdb3af892015-01-26 13:19:07 -080066
67/**
68 * Distributed Map implementation which uses optimistic replication and gossip
69 * based techniques to provide an eventually consistent data store.
70 */
71public class EventuallyConsistentMapImpl<K, V>
72 implements EventuallyConsistentMap<K, V> {
73
74 private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class);
75
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -080076 private final ConcurrentMap<K, Timestamped<V>> items;
77 private final ConcurrentMap<K, Timestamp> removedItems;
Jonathan Hartdb3af892015-01-26 13:19:07 -080078
Jonathan Hartdb3af892015-01-26 13:19:07 -080079 private final ClusterService clusterService;
80 private final ClusterCommunicationService clusterCommunicator;
81 private final KryoSerializer serializer;
82
Jonathan Hart5ec32ba2015-02-05 13:33:58 -080083 private final ClockService<K, V> clockService;
Jonathan Hartdb3af892015-01-26 13:19:07 -080084
85 private final MessageSubject updateMessageSubject;
Jonathan Hartaaa56572015-01-28 21:56:35 -080086 private final MessageSubject antiEntropyAdvertisementSubject;
Jonathan Hartdb3af892015-01-26 13:19:07 -080087
Jonathan Hartaaa56572015-01-28 21:56:35 -080088 private final Set<EventuallyConsistentMapListener<K, V>> listeners
Jonathan Hartdb3af892015-01-26 13:19:07 -080089 = new CopyOnWriteArraySet<>();
90
91 private final ExecutorService executor;
92
93 private final ScheduledExecutorService backgroundExecutor;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080094 private final BiFunction<K, V, Collection<NodeId>> peerUpdateFunction;
Jonathan Hartdb3af892015-01-26 13:19:07 -080095
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080096 private ExecutorService communicationExecutor;
97 private Map<NodeId, EventAccumulator> senderPending;
Madan Jampani28726282015-02-19 11:40:23 -080098
Jonathan Hartdb3af892015-01-26 13:19:07 -080099 private volatile boolean destroyed = false;
Jonathan Hart539a6462015-01-27 17:05:43 -0800100 private static final String ERROR_DESTROYED = " map is already destroyed";
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800101 private final String destroyedMessage;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800102
Jonathan Hart4f397e82015-02-04 09:10:41 -0800103 private static final String ERROR_NULL_KEY = "Key cannot be null";
104 private static final String ERROR_NULL_VALUE = "Null values are not allowed";
105
Jonathan Hartdb3af892015-01-26 13:19:07 -0800106 // TODO: Make these anti-entropy params configurable
107 private long initialDelaySec = 5;
108 private long periodSec = 5;
Jonathan Hartf893be82015-02-24 17:35:51 -0800109 private boolean lightweightAntiEntropy = true;
Madan Jampanie1356282015-03-10 19:05:36 -0700110 private boolean tombstonesDisabled = false;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800111
Jonathan Hart233a18a2015-03-02 17:24:58 -0800112 private static final int WINDOW_SIZE = 5;
113 private static final int HIGH_LOAD_THRESHOLD = 0;
114 private static final int LOAD_WINDOW = 2;
115 SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE);
116 AtomicLong operations = new AtomicLong();
117
/**
 * Creates a new eventually consistent map shared amongst multiple instances.
 * <p>
 * Each map is identified by a string map name. EventuallyConsistentMapImpl
 * objects in different JVMs that use the same map name will form a
 * distributed map across JVMs (provided the cluster service is aware of
 * both nodes).
 * </p>
 * <p>
 * The client is expected to provide an
 * {@link org.onlab.util.KryoNamespace.Builder} with which all classes that
 * will be stored in this map have been registered (including referenced
 * classes). This serializer will be used to serialize both K and V for
 * inter-node notifications.
 * </p>
 * <p>
 * The client must provide an {@link org.onosproject.store.impl.ClockService}
 * which can generate timestamps for a given key. The clock service is free
 * to generate timestamps however it wishes, however these timestamps will
 * be used to serialize updates to the map so they must be strict enough
 * to ensure updates are properly ordered for the use case (i.e. in some
 * cases wallclock time will suffice, whereas in other cases logical time
 * will be necessary).
 * </p>
 *
 * @param mapName             a String identifier for the map.
 * @param clusterService      the cluster service
 * @param clusterCommunicator the cluster communications service
 * @param serializerBuilder   a Kryo namespace builder that can serialize
 *                            both K and V
 * @param clockService        a clock service able to generate timestamps
 *                            for K
 * @param peerUpdateFunction  function that provides a set of nodes to immediately
 *                            update when there are writes to the map
 */
public EventuallyConsistentMapImpl(String mapName,
                                   ClusterService clusterService,
                                   ClusterCommunicationService clusterCommunicator,
                                   KryoNamespace.Builder serializerBuilder,
                                   ClockService<K, V> clockService,
                                   BiFunction<K, V, Collection<NodeId>> peerUpdateFunction) {
    this.clusterService = checkNotNull(clusterService);
    this.clusterCommunicator = checkNotNull(clusterCommunicator);
    this.peerUpdateFunction = checkNotNull(peerUpdateFunction);

    serializer = createSerializer(checkNotNull(serializerBuilder));
    destroyedMessage = mapName + ERROR_DESTROYED;

    this.clockService = checkNotNull(clockService);

    items = new ConcurrentHashMap<>();
    removedItems = new ConcurrentHashMap<>();

    // Create the message subjects before anything that sends or receives on
    // them is started, so no task or handler can observe them unset.
    updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
    antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");

    // should be a normal executor; it's used for receiving messages
    //TODO make # of threads configurable
    executor = Executors.newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-fg-%d"));

    // sending executor; should be capped
    //TODO make # of threads configurable
    //TODO this probably doesn't need to be bounded anymore
    communicationExecutor =
            newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-publish-%d"));
    senderPending = Maps.newConcurrentMap();

    backgroundExecutor =
            newSingleThreadScheduledExecutor(groupedThreads("onos/ecm", mapName + "-bg-%d"));

    clusterCommunicator.addSubscriber(updateMessageSubject,
                                      new InternalEventListener(), executor);
    clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
                                      new InternalAntiEntropyListener(), backgroundExecutor);

    // start the anti-entropy task last, once all state it reads is initialized
    backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
                                           initialDelaySec, periodSec,
                                           TimeUnit.SECONDS);
}
198
/**
 * Creates a new eventually consistent map whose writes are pushed
 * immediately to every other node known to the cluster service.
 * <p>
 * This delegates to the full constructor, supplying a peer function that
 * returns the ids of all cluster nodes except the local one.
 * </p>
 *
 * @param mapName             a String identifier for the map.
 * @param clusterService      the cluster service
 * @param clusterCommunicator the cluster communications service
 * @param serializerBuilder   a Kryo namespace builder that can serialize
 *                            both K and V
 * @param clockService        a clock service able to generate timestamps
 *                            for K
 */
public EventuallyConsistentMapImpl(String mapName,
                                   ClusterService clusterService,
                                   ClusterCommunicationService clusterCommunicator,
                                   KryoNamespace.Builder serializerBuilder,
                                   ClockService<K, V> clockService) {
    this(mapName, clusterService, clusterCommunicator, serializerBuilder, clockService,
         (key, value) -> {
             List<NodeId> remotePeers = new ArrayList<>();
             for (ControllerNode node : clusterService.getNodes()) {
                 NodeId candidate = node.id();
                 if (!candidate.equals(clusterService.getLocalNode().id())) {
                     remotePeers.add(candidate);
                 }
             }
             return remotePeers;
         });
}
226
Madan Jampanie1356282015-03-10 19:05:36 -0700227 public EventuallyConsistentMapImpl<K, V> withTombstonesDisabled(boolean status) {
228 tombstonesDisabled = status;
229 return this;
230 }
231
Jonathan Hartdb3af892015-01-26 13:19:07 -0800232 private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
233 return new KryoSerializer() {
234 @Override
235 protected void setupKryoPool() {
236 // Add the map's internal helper classes to the user-supplied serializer
237 serializerPool = builder
238 .register(WallClockTimestamp.class)
239 .register(PutEntry.class)
Jonathan Hart539a6462015-01-27 17:05:43 -0800240 .register(RemoveEntry.class)
Jonathan Hartdb3af892015-01-26 13:19:07 -0800241 .register(ArrayList.class)
Jonathan Hartaaa56572015-01-28 21:56:35 -0800242 .register(AntiEntropyAdvertisement.class)
243 .register(HashMap.class)
Jonathan Hartdb3af892015-01-26 13:19:07 -0800244 .build();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800245 }
246 };
247 }
248
Madan Jampanib28e4ad2015-02-19 12:31:37 -0800249 /**
250 * Sets the executor to use for broadcasting messages and returns this
251 * instance for method chaining.
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800252 *
Madan Jampanib28e4ad2015-02-19 12:31:37 -0800253 * @param executor executor service
254 * @return this instance
255 */
256 public EventuallyConsistentMapImpl<K, V> withBroadcastMessageExecutor(ExecutorService executor) {
257 checkNotNull(executor, "Null executor");
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800258 communicationExecutor = executor;
Madan Jampanib28e4ad2015-02-19 12:31:37 -0800259 return this;
260 }
261
Jonathan Hartdb3af892015-01-26 13:19:07 -0800262 @Override
263 public int size() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800264 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800265 return items.size();
266 }
267
268 @Override
269 public boolean isEmpty() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800270 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800271 return items.isEmpty();
272 }
273
274 @Override
275 public boolean containsKey(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800276 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800277 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800278 return items.containsKey(key);
279 }
280
281 @Override
282 public boolean containsValue(V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800283 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800284 checkNotNull(value, ERROR_NULL_VALUE);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800285
286 return items.values().stream()
287 .anyMatch(timestamped -> timestamped.value().equals(value));
288 }
289
290 @Override
291 public V get(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800292 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800293 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800294
295 Timestamped<V> value = items.get(key);
296 if (value != null) {
297 return value.value();
298 }
299 return null;
300 }
301
302 @Override
303 public void put(K key, V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800304 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800305 checkNotNull(key, ERROR_NULL_KEY);
306 checkNotNull(value, ERROR_NULL_VALUE);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800307
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800308 Timestamp timestamp = clockService.getTimestamp(key, value);
309
Jonathan Hartdb3af892015-01-26 13:19:07 -0800310 if (putInternal(key, value, timestamp)) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800311 notifyPeers(new PutEntry<>(key, value, timestamp),
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800312 peerUpdateFunction.apply(key, value));
313 notifyListeners(new EventuallyConsistentMapEvent<>(
314 EventuallyConsistentMapEvent.Type.PUT, key, value));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800315 }
316 }
317
318 private boolean putInternal(K key, V value, Timestamp timestamp) {
Jonathan Hart233a18a2015-03-02 17:24:58 -0800319 counter.incrementCount();
Brian O'Connor2952e3b2015-02-19 21:47:57 -0800320 Timestamp removed = removedItems.get(key);
Jonathan Hart403ea932015-02-20 16:23:00 -0800321 if (removed != null && removed.isNewerThan(timestamp)) {
Brian O'Connor2952e3b2015-02-19 21:47:57 -0800322 log.debug("ecmap - removed was newer {}", value);
323 return false;
324 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800325
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800326 final MutableBoolean updated = new MutableBoolean(false);
327
328 items.compute(key, (k, existing) -> {
Jonathan Hart403ea932015-02-20 16:23:00 -0800329 if (existing != null && existing.isNewerThan(timestamp)) {
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800330 updated.setFalse();
331 return existing;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800332 } else {
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800333 updated.setTrue();
334 return new Timestamped<>(value, timestamp);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800335 }
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800336 });
337
338 boolean success = updated.booleanValue();
339 if (!success) {
340 log.debug("ecmap - existing was newer {}", value);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800341 }
Brian O'Connor2952e3b2015-02-19 21:47:57 -0800342
343 if (success && removed != null) {
344 removedItems.remove(key, removed);
345 }
346 return success;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800347 }
348
349 @Override
350 public void remove(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800351 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800352 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800353
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800354 // TODO prevent calls here if value is important for timestamp
355 Timestamp timestamp = clockService.getTimestamp(key, null);
356
Jonathan Hartdb3af892015-01-26 13:19:07 -0800357 if (removeInternal(key, timestamp)) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800358 notifyPeers(new RemoveEntry<>(key, timestamp),
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800359 peerUpdateFunction.apply(key, null));
360 notifyListeners(new EventuallyConsistentMapEvent<>(
361 EventuallyConsistentMapEvent.Type.REMOVE, key, null));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800362 }
363 }
364
365 private boolean removeInternal(K key, Timestamp timestamp) {
Madan Jampani54d34992015-03-06 17:27:52 -0800366 if (timestamp == null) {
367 return false;
368 }
369
Jonathan Hart233a18a2015-03-02 17:24:58 -0800370 counter.incrementCount();
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800371 final MutableBoolean updated = new MutableBoolean(false);
372
373 items.compute(key, (k, existing) -> {
374 if (existing != null && existing.isNewerThan(timestamp)) {
375 updated.setFalse();
376 return existing;
Madan Jampani7e6cfe32015-02-19 17:44:44 -0800377 } else {
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800378 updated.setTrue();
379 // remove from items map
380 return null;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800381 }
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800382 });
383
384 if (updated.isFalse()) {
385 return false;
Madan Jampani7e6cfe32015-02-19 17:44:44 -0800386 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800387
Madan Jampanie1356282015-03-10 19:05:36 -0700388 if (!tombstonesDisabled) {
389 Timestamp removedTimestamp = removedItems.get(key);
390 if (removedTimestamp == null) {
391 return removedItems.putIfAbsent(key, timestamp) == null;
392 } else if (timestamp.isNewerThan(removedTimestamp)) {
393 return removedItems.replace(key, removedTimestamp, timestamp);
394 } else {
395 return false;
396 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800397 }
Madan Jampanie1356282015-03-10 19:05:36 -0700398
399 return updated.booleanValue();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800400 }
401
402 @Override
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800403 public void remove(K key, V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800404 checkState(!destroyed, destroyedMessage);
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800405 checkNotNull(key, ERROR_NULL_KEY);
406 checkNotNull(value, ERROR_NULL_VALUE);
407
408 Timestamp timestamp = clockService.getTimestamp(key, value);
409
410 if (removeInternal(key, timestamp)) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800411 notifyPeers(new RemoveEntry<>(key, timestamp),
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800412 peerUpdateFunction.apply(key, value));
413 notifyListeners(new EventuallyConsistentMapEvent<>(
414 EventuallyConsistentMapEvent.Type.REMOVE, key, value));
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800415 }
416 }
417
418 @Override
Jonathan Hartdb3af892015-01-26 13:19:07 -0800419 public void putAll(Map<? extends K, ? extends V> m) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800420 checkState(!destroyed, destroyedMessage);
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800421 m.forEach(this::put);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800422 }
423
424 @Override
425 public void clear() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800426 checkState(!destroyed, destroyedMessage);
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800427 items.forEach((key, value) -> remove(key));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800428 }
429
430 @Override
431 public Set<K> keySet() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800432 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800433 return items.keySet();
434 }
435
436 @Override
437 public Collection<V> values() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800438 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800439 return items.values().stream()
440 .map(Timestamped::value)
441 .collect(Collectors.toList());
442 }
443
444 @Override
445 public Set<Map.Entry<K, V>> entrySet() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800446 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800447
448 return items.entrySet().stream()
Jonathan Hartf9108232015-02-02 16:37:35 -0800449 .map(e -> Pair.of(e.getKey(), e.getValue().value()))
Jonathan Hartdb3af892015-01-26 13:19:07 -0800450 .collect(Collectors.toSet());
451 }
452
453 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800454 public void addListener(EventuallyConsistentMapListener<K, V> listener) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800455 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800456
457 listeners.add(checkNotNull(listener));
458 }
459
460 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800461 public void removeListener(EventuallyConsistentMapListener<K, V> listener) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800462 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800463
464 listeners.remove(checkNotNull(listener));
465 }
466
467 @Override
468 public void destroy() {
469 destroyed = true;
470
471 executor.shutdown();
472 backgroundExecutor.shutdown();
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800473 communicationExecutor.shutdown();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800474
Jonathan Hart584d2f32015-01-27 19:46:14 -0800475 listeners.clear();
476
Jonathan Hartdb3af892015-01-26 13:19:07 -0800477 clusterCommunicator.removeSubscriber(updateMessageSubject);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800478 clusterCommunicator.removeSubscriber(antiEntropyAdvertisementSubject);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800479 }
480
Jonathan Hartaaa56572015-01-28 21:56:35 -0800481 private void notifyListeners(EventuallyConsistentMapEvent<K, V> event) {
482 for (EventuallyConsistentMapListener<K, V> listener : listeners) {
Jonathan Hartdb3af892015-01-26 13:19:07 -0800483 listener.event(event);
484 }
485 }
486
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800487 private void notifyPeers(PutEntry<K, V> event, Collection<NodeId> peers) {
488 queueUpdate(event, peers);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800489 }
490
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800491 private void notifyPeers(RemoveEntry<K, V> event, Collection<NodeId> peers) {
492 queueUpdate(event, peers);
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800493 }
494
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800495 private void queueUpdate(AbstractEntry<K, V> event, Collection<NodeId> peers) {
496 if (peers == null) {
497 // we have no friends :(
498 return;
499 }
500 peers.forEach(node ->
501 senderPending.computeIfAbsent(node, unusedKey -> new EventAccumulator(node)).add(event)
502 );
503 }
504
505 private boolean unicastMessage(NodeId peer, MessageSubject subject, Object event) {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800506 ClusterMessage message = new ClusterMessage(
507 clusterService.getLocalNode().id(),
508 subject,
509 serializer.encode(event));
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800510 return clusterCommunicator.unicast(message, peer);
511 // Note: we had this flipped before...
512// communicationExecutor.execute(() -> clusterCommunicator.unicast(message, peer));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800513 }
514
Jonathan Hart233a18a2015-03-02 17:24:58 -0800515 private boolean underHighLoad() {
516 return counter.get(LOAD_WINDOW) > HIGH_LOAD_THRESHOLD;
517 }
518
Jonathan Hartaaa56572015-01-28 21:56:35 -0800519 private final class SendAdvertisementTask implements Runnable {
520 @Override
521 public void run() {
522 if (Thread.currentThread().isInterrupted()) {
523 log.info("Interrupted, quitting");
524 return;
525 }
526
Jonathan Hart233a18a2015-03-02 17:24:58 -0800527 if (underHighLoad()) {
528 return;
529 }
530
Jonathan Hartaaa56572015-01-28 21:56:35 -0800531 try {
532 final NodeId self = clusterService.getLocalNode().id();
533 Set<ControllerNode> nodes = clusterService.getNodes();
534
535 List<NodeId> nodeIds = nodes.stream()
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800536 .map(ControllerNode::id)
Jonathan Hartaaa56572015-01-28 21:56:35 -0800537 .collect(Collectors.toList());
538
539 if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
540 log.trace("No other peers in the cluster.");
541 return;
542 }
543
544 NodeId peer;
545 do {
546 int idx = RandomUtils.nextInt(0, nodeIds.size());
547 peer = nodeIds.get(idx);
548 } while (peer.equals(self));
549
550 if (Thread.currentThread().isInterrupted()) {
551 log.info("Interrupted, quitting");
552 return;
553 }
554
555 AntiEntropyAdvertisement<K> ad = createAdvertisement();
556
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800557 if (!unicastMessage(peer, antiEntropyAdvertisementSubject, ad)) {
558 log.debug("Failed to send anti-entropy advertisement to {}", peer);
559 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800560 } catch (Exception e) {
561 // Catch all exceptions to avoid scheduled task being suppressed.
562 log.error("Exception thrown while sending advertisement", e);
563 }
564 }
565 }
566
567 private AntiEntropyAdvertisement<K> createAdvertisement() {
568 final NodeId self = clusterService.getLocalNode().id();
569
570 Map<K, Timestamp> timestamps = new HashMap<>(items.size());
571
572 items.forEach((key, value) -> timestamps.put(key, value.timestamp()));
573
574 Map<K, Timestamp> tombstones = new HashMap<>(removedItems);
575
576 return new AntiEntropyAdvertisement<>(self, timestamps, tombstones);
577 }
578
579 private void handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) {
580 List<EventuallyConsistentMapEvent<K, V>> externalEvents;
581
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800582 externalEvents = antiEntropyCheckLocalItems(ad);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800583
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800584 antiEntropyCheckLocalRemoved(ad);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800585
Jonathan Hartf893be82015-02-24 17:35:51 -0800586 if (!lightweightAntiEntropy) {
587 externalEvents.addAll(antiEntropyCheckRemoteRemoved(ad));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800588
Jonathan Hartf893be82015-02-24 17:35:51 -0800589 // if remote ad has something unknown, actively sync
590 for (K key : ad.timestamps().keySet()) {
591 if (!items.containsKey(key)) {
592 // Send the advertisement back if this peer is out-of-sync
593 final NodeId sender = ad.sender();
594 AntiEntropyAdvertisement<K> myAd = createAdvertisement();
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800595 if (!unicastMessage(sender, antiEntropyAdvertisementSubject, myAd)) {
596 log.debug("Failed to send reactive anti-entropy advertisement to {}", sender);
597 }
Jonathan Hartf893be82015-02-24 17:35:51 -0800598 break;
599 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800600 }
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800601 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800602 externalEvents.forEach(this::notifyListeners);
603 }
604
605 /**
606 * Checks if any of the remote's live items or tombstones are out of date
607 * according to our local live item list, or if our live items are out of
608 * date according to the remote's tombstone list.
609 * If the local copy is more recent, it will be pushed to the remote. If the
610 * remote has a more recent remove, we apply that to the local state.
611 *
612 * @param ad remote anti-entropy advertisement
613 * @return list of external events relating to local operations performed
614 */
Jonathan Hartaaa56572015-01-28 21:56:35 -0800615 private List<EventuallyConsistentMapEvent<K, V>> antiEntropyCheckLocalItems(
616 AntiEntropyAdvertisement<K> ad) {
617 final List<EventuallyConsistentMapEvent<K, V>> externalEvents
618 = new LinkedList<>();
619 final NodeId sender = ad.sender();
620
Jonathan Hartaaa56572015-01-28 21:56:35 -0800621 for (Map.Entry<K, Timestamped<V>> item : items.entrySet()) {
622 K key = item.getKey();
623 Timestamped<V> localValue = item.getValue();
624
625 Timestamp remoteTimestamp = ad.timestamps().get(key);
626 if (remoteTimestamp == null) {
627 remoteTimestamp = ad.tombstones().get(key);
628 }
629 if (remoteTimestamp == null || localValue
Jonathan Hart403ea932015-02-20 16:23:00 -0800630 .isNewerThan(remoteTimestamp)) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800631 // local value is more recent, push to sender
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800632 queueUpdate(new PutEntry<>(key, localValue.value(),
633 localValue.timestamp()), ImmutableList.of(sender));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800634 }
635
636 Timestamp remoteDeadTimestamp = ad.tombstones().get(key);
637 if (remoteDeadTimestamp != null &&
Jonathan Hart403ea932015-02-20 16:23:00 -0800638 remoteDeadTimestamp.isNewerThan(localValue.timestamp())) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800639 // sender has a more recent remove
640 if (removeInternal(key, remoteDeadTimestamp)) {
641 externalEvents.add(new EventuallyConsistentMapEvent<>(
642 EventuallyConsistentMapEvent.Type.REMOVE, key, null));
643 }
644 }
645 }
646
Jonathan Hartaaa56572015-01-28 21:56:35 -0800647 return externalEvents;
648 }
649
650 /**
651 * Checks if any items in the remote live list are out of date according
652 * to our tombstone list. If we find we have a more up to date tombstone,
653 * we'll send it to the remote.
654 *
655 * @param ad remote anti-entropy advertisement
656 */
Jonathan Hartaaa56572015-01-28 21:56:35 -0800657 private void antiEntropyCheckLocalRemoved(AntiEntropyAdvertisement<K> ad) {
658 final NodeId sender = ad.sender();
659
Jonathan Hartaaa56572015-01-28 21:56:35 -0800660 for (Map.Entry<K, Timestamp> dead : removedItems.entrySet()) {
661 K key = dead.getKey();
662 Timestamp localDeadTimestamp = dead.getValue();
663
664 Timestamp remoteLiveTimestamp = ad.timestamps().get(key);
665 if (remoteLiveTimestamp != null
Jonathan Hart403ea932015-02-20 16:23:00 -0800666 && localDeadTimestamp.isNewerThan(remoteLiveTimestamp)) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800667 // sender has zombie, push remove
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800668 queueUpdate(new RemoveEntry<>(key, localDeadTimestamp), ImmutableList.of(sender));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800669 }
670 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800671 }
672
673 /**
674 * Checks if any of the local live items are out of date according to the
675 * remote's tombstone advertisements. If we find a local item is out of date,
676 * we'll apply the remove operation to the local state.
677 *
678 * @param ad remote anti-entropy advertisement
679 * @return list of external events relating to local operations performed
680 */
Jonathan Hartaaa56572015-01-28 21:56:35 -0800681 private List<EventuallyConsistentMapEvent<K, V>>
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800682 antiEntropyCheckRemoteRemoved(AntiEntropyAdvertisement<K> ad) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800683 final List<EventuallyConsistentMapEvent<K, V>> externalEvents
684 = new LinkedList<>();
685
686 for (Map.Entry<K, Timestamp> remoteDead : ad.tombstones().entrySet()) {
687 K key = remoteDead.getKey();
688 Timestamp remoteDeadTimestamp = remoteDead.getValue();
689
690 Timestamped<V> local = items.get(key);
691 Timestamp localDead = removedItems.get(key);
Jonathan Hart403ea932015-02-20 16:23:00 -0800692 if (local != null && remoteDeadTimestamp.isNewerThan(
693 local.timestamp())) {
694 // If the remote has a more recent tombstone than either our local
695 // value, then do a remove with their timestamp
Jonathan Hartaaa56572015-01-28 21:56:35 -0800696 if (removeInternal(key, remoteDeadTimestamp)) {
697 externalEvents.add(new EventuallyConsistentMapEvent<>(
698 EventuallyConsistentMapEvent.Type.REMOVE, key, null));
699 }
Jonathan Hart403ea932015-02-20 16:23:00 -0800700 } else if (localDead != null && remoteDeadTimestamp.isNewerThan(
701 localDead)) {
702 // If the remote has a more recent tombstone than us, update ours
703 // to their timestamp
704 removeInternal(key, remoteDeadTimestamp);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800705 }
706 }
707
708 return externalEvents;
709 }
710
711 private final class InternalAntiEntropyListener
712 implements ClusterMessageHandler {
713
714 @Override
715 public void handle(ClusterMessage message) {
Jonathan Hart4fd4ebb2015-02-04 17:38:48 -0800716 log.trace("Received anti-entropy advertisement from peer: {}",
717 message.sender());
Jonathan Hartaaa56572015-01-28 21:56:35 -0800718 AntiEntropyAdvertisement<K> advertisement = serializer.decode(message.payload());
Madan Jampani2af244a2015-02-22 13:12:01 -0800719 try {
Jonathan Hart233a18a2015-03-02 17:24:58 -0800720 if (!underHighLoad()) {
721 handleAntiEntropyAdvertisement(advertisement);
722 }
Madan Jampani2af244a2015-02-22 13:12:01 -0800723 } catch (Exception e) {
724 log.warn("Exception thrown handling advertisements", e);
725 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800726 }
727 }
728
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800729 private final class InternalEventListener implements
Jonathan Hartdb3af892015-01-26 13:19:07 -0800730 ClusterMessageHandler {
731 @Override
732 public void handle(ClusterMessage message) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800733 log.debug("Received update event from peer: {}", message.sender());
734 Collection<AbstractEntry<K, V>> events = serializer.decode(message.payload());
Jonathan Hartdb3af892015-01-26 13:19:07 -0800735
Madan Jampani2af244a2015-02-22 13:12:01 -0800736 try {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800737 // TODO clean this for loop up
738 for (AbstractEntry<K, V> entry : events) {
739 final K key = entry.key();
740 final V value;
741 final Timestamp timestamp = entry.timestamp();
742 final EventuallyConsistentMapEvent.Type type;
743 if (entry instanceof PutEntry) {
744 PutEntry<K, V> putEntry = (PutEntry<K, V>) entry;
745 value = putEntry.value();
746 type = EventuallyConsistentMapEvent.Type.PUT;
747 } else if (entry instanceof RemoveEntry) {
748 type = EventuallyConsistentMapEvent.Type.REMOVE;
749 value = null;
750 } else {
751 throw new IllegalStateException("Unknown entry type " + entry.getClass());
752 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800753
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800754 boolean success;
755 switch (type) {
756 case PUT:
757 success = putInternal(key, value, timestamp);
758 break;
759 case REMOVE:
760 success = removeInternal(key, timestamp);
761 break;
762 default:
763 success = false;
764 }
765 if (success) {
766 notifyListeners(new EventuallyConsistentMapEvent<>(type, key, value));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800767 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800768 }
Madan Jampani2af244a2015-02-22 13:12:01 -0800769 } catch (Exception e) {
770 log.warn("Exception thrown handling put", e);
771 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800772 }
773 }
774
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800775 // TODO pull this into the class if this gets pulled out...
776 private static final int DEFAULT_MAX_EVENTS = 1000;
777 private static final int DEFAULT_MAX_IDLE_MS = 10;
778 private static final int DEFAULT_MAX_BATCH_MS = 50;
779 private static final Timer TIMER = new Timer("onos-ecm-sender-events");
Jonathan Hartdb3af892015-01-26 13:19:07 -0800780
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800781 private final class EventAccumulator extends AbstractAccumulator<AbstractEntry<K, V>> {
782
783 private final NodeId peer;
784
785 private EventAccumulator(NodeId peer) {
786 super(TIMER, DEFAULT_MAX_EVENTS, DEFAULT_MAX_BATCH_MS, DEFAULT_MAX_IDLE_MS);
787 this.peer = peer;
788 }
789
790 @Override
791 public void processItems(List<AbstractEntry<K, V>> items) {
792 Map<K, AbstractEntry<K, V>> map = Maps.newHashMap();
793 items.forEach(item -> map.compute(item.key(), (key, oldValue) ->
794 oldValue == null || item.compareTo(oldValue) > 0 ? item : oldValue
795 )
796 );
797 communicationExecutor.submit(() -> {
798 try {
799 unicastMessage(peer, updateMessageSubject, Lists.newArrayList(map.values()));
800 } catch (Exception e) {
801 log.warn("broadcast error", e);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800802 }
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800803 });
Jonathan Hartdb3af892015-01-26 13:19:07 -0800804 }
805 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800806}