blob: 8fe012064966cfe28742f9a0ed5977b80316dd94 [file] [log] [blame]
Jonathan Hartdb3af892015-01-26 13:19:07 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Jonathan Hart77bdd262015-02-03 09:07:48 -080016package org.onosproject.store.ecmap;
Jonathan Hartdb3af892015-01-26 13:19:07 -080017
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080018import com.google.common.collect.ImmutableList;
19import com.google.common.collect.Lists;
20import com.google.common.collect.Maps;
Jonathan Hartaaa56572015-01-28 21:56:35 -080021import org.apache.commons.lang3.RandomUtils;
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -080022import org.apache.commons.lang3.mutable.MutableBoolean;
Jonathan Hartf9108232015-02-02 16:37:35 -080023import org.apache.commons.lang3.tuple.Pair;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080024import org.onlab.util.AbstractAccumulator;
Jonathan Hartdb3af892015-01-26 13:19:07 -080025import org.onlab.util.KryoNamespace;
Jonathan Hart233a18a2015-03-02 17:24:58 -080026import org.onlab.util.SlidingWindowCounter;
Jonathan Hartdb3af892015-01-26 13:19:07 -080027import org.onosproject.cluster.ClusterService;
Jonathan Hartaaa56572015-01-28 21:56:35 -080028import org.onosproject.cluster.ControllerNode;
Jonathan Hartdb3af892015-01-26 13:19:07 -080029import org.onosproject.cluster.NodeId;
30import org.onosproject.store.Timestamp;
31import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
32import org.onosproject.store.cluster.messaging.ClusterMessage;
33import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
34import org.onosproject.store.cluster.messaging.MessageSubject;
Jonathan Hart77bdd262015-02-03 09:07:48 -080035import org.onosproject.store.impl.ClockService;
36import org.onosproject.store.impl.Timestamped;
37import org.onosproject.store.impl.WallClockTimestamp;
Jonathan Hartdb3af892015-01-26 13:19:07 -080038import org.onosproject.store.serializers.KryoSerializer;
39import org.slf4j.Logger;
40import org.slf4j.LoggerFactory;
41
Jonathan Hartdb3af892015-01-26 13:19:07 -080042import java.util.ArrayList;
43import java.util.Collection;
Jonathan Hartaaa56572015-01-28 21:56:35 -080044import java.util.HashMap;
45import java.util.LinkedList;
Jonathan Hartdb3af892015-01-26 13:19:07 -080046import java.util.List;
47import java.util.Map;
48import java.util.Set;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080049import java.util.Timer;
Jonathan Hartdb3af892015-01-26 13:19:07 -080050import java.util.concurrent.ConcurrentHashMap;
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -080051import java.util.concurrent.ConcurrentMap;
Jonathan Hartdb3af892015-01-26 13:19:07 -080052import java.util.concurrent.CopyOnWriteArraySet;
53import java.util.concurrent.ExecutorService;
54import java.util.concurrent.Executors;
55import java.util.concurrent.ScheduledExecutorService;
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -080056import java.util.concurrent.TimeUnit;
Jonathan Hart233a18a2015-03-02 17:24:58 -080057import java.util.concurrent.atomic.AtomicLong;
Brian O'Connor5eb77c82015-03-02 18:09:39 -080058import java.util.function.BiFunction;
Jonathan Hartdb3af892015-01-26 13:19:07 -080059import java.util.stream.Collectors;
60
61import static com.google.common.base.Preconditions.checkNotNull;
62import static com.google.common.base.Preconditions.checkState;
63import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
Brian O'Connorc6713a82015-02-24 11:55:48 -080064import static org.onlab.util.BoundedThreadPool.newFixedThreadPool;
Brian O'Connor4b2ba5f2015-02-18 20:54:00 -080065import static org.onlab.util.Tools.groupedThreads;
Jonathan Hartdb3af892015-01-26 13:19:07 -080066
67/**
68 * Distributed Map implementation which uses optimistic replication and gossip
69 * based techniques to provide an eventually consistent data store.
70 */
71public class EventuallyConsistentMapImpl<K, V>
72 implements EventuallyConsistentMap<K, V> {
73
74 private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class);
75
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -080076 private final ConcurrentMap<K, Timestamped<V>> items;
77 private final ConcurrentMap<K, Timestamp> removedItems;
Jonathan Hartdb3af892015-01-26 13:19:07 -080078
Jonathan Hartdb3af892015-01-26 13:19:07 -080079 private final ClusterService clusterService;
80 private final ClusterCommunicationService clusterCommunicator;
81 private final KryoSerializer serializer;
82
Jonathan Hart5ec32ba2015-02-05 13:33:58 -080083 private final ClockService<K, V> clockService;
Jonathan Hartdb3af892015-01-26 13:19:07 -080084
85 private final MessageSubject updateMessageSubject;
Jonathan Hartaaa56572015-01-28 21:56:35 -080086 private final MessageSubject antiEntropyAdvertisementSubject;
Jonathan Hartdb3af892015-01-26 13:19:07 -080087
Jonathan Hartaaa56572015-01-28 21:56:35 -080088 private final Set<EventuallyConsistentMapListener<K, V>> listeners
Jonathan Hartdb3af892015-01-26 13:19:07 -080089 = new CopyOnWriteArraySet<>();
90
91 private final ExecutorService executor;
92
93 private final ScheduledExecutorService backgroundExecutor;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080094 private final BiFunction<K, V, Collection<NodeId>> peerUpdateFunction;
Jonathan Hartdb3af892015-01-26 13:19:07 -080095
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080096 private ExecutorService communicationExecutor;
97 private Map<NodeId, EventAccumulator> senderPending;
Madan Jampani28726282015-02-19 11:40:23 -080098
Jonathan Hartdb3af892015-01-26 13:19:07 -080099 private volatile boolean destroyed = false;
Jonathan Hart539a6462015-01-27 17:05:43 -0800100 private static final String ERROR_DESTROYED = " map is already destroyed";
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800101 private final String destroyedMessage;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800102
Jonathan Hart4f397e82015-02-04 09:10:41 -0800103 private static final String ERROR_NULL_KEY = "Key cannot be null";
104 private static final String ERROR_NULL_VALUE = "Null values are not allowed";
105
Jonathan Hartdb3af892015-01-26 13:19:07 -0800106 // TODO: Make these anti-entropy params configurable
107 private long initialDelaySec = 5;
108 private long periodSec = 5;
Jonathan Hartf893be82015-02-24 17:35:51 -0800109 private boolean lightweightAntiEntropy = true;
Madan Jampanie1356282015-03-10 19:05:36 -0700110 private boolean tombstonesDisabled = false;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800111
Jonathan Hart233a18a2015-03-02 17:24:58 -0800112 private static final int WINDOW_SIZE = 5;
113 private static final int HIGH_LOAD_THRESHOLD = 0;
114 private static final int LOAD_WINDOW = 2;
115 SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE);
116 AtomicLong operations = new AtomicLong();
117
/**
 * Creates a new eventually consistent map shared amongst multiple instances.
 * <p>
 * Each map is identified by a string map name. EventuallyConsistentMapImpl
 * objects in different JVMs that use the same map name will form a
 * distributed map across JVMs (provided the cluster service is aware of
 * both nodes).
 * </p>
 * <p>
 * The client is expected to provide an
 * {@link org.onlab.util.KryoNamespace.Builder} with which all classes that
 * will be stored in this map have been registered (including referenced
 * classes). This serializer will be used to serialize both K and V for
 * inter-node notifications.
 * </p>
 * <p>
 * The client must provide an {@link org.onosproject.store.impl.ClockService}
 * which can generate timestamps for a given key. The clock service is free
 * to generate timestamps however it wishes, however these timestamps will
 * be used to serialize updates to the map so they must be strict enough
 * to ensure updates are properly ordered for the use case (i.e. in some
 * cases wallclock time will suffice, whereas in other cases logical time
 * will be necessary).
 * </p>
 *
 * @param mapName a String identifier for the map.
 * @param clusterService the cluster service
 * @param clusterCommunicator the cluster communications service
 * @param serializerBuilder a Kryo namespace builder that can serialize
 *                          both K and V
 * @param clockService a clock service able to generate timestamps
 *                     for K
 * @param peerUpdateFunction function that provides a set of nodes to immediately
 *                           update when there are writes to the map
 */
public EventuallyConsistentMapImpl(String mapName,
                                   ClusterService clusterService,
                                   ClusterCommunicationService clusterCommunicator,
                                   KryoNamespace.Builder serializerBuilder,
                                   ClockService<K, V> clockService,
                                   BiFunction<K, V, Collection<NodeId>> peerUpdateFunction) {
    this.clusterService = checkNotNull(clusterService);
    this.clusterCommunicator = checkNotNull(clusterCommunicator);
    this.peerUpdateFunction = checkNotNull(peerUpdateFunction);

    serializer = createSerializer(checkNotNull(serializerBuilder));
    destroyedMessage = mapName + ERROR_DESTROYED;

    this.clockService = checkNotNull(clockService);

    items = new ConcurrentHashMap<>();
    removedItems = new ConcurrentHashMap<>();

    // should be a normal executor; it's used for receiving messages
    //TODO make # of threads configurable
    executor = Executors.newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-fg-%d"));

    // sending executor; should be capped
    //TODO make # of threads configurable
    //TODO this probably doesn't need to be bounded anymore
    communicationExecutor =
            newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-publish-%d"));
    senderPending = Maps.newConcurrentMap();

    backgroundExecutor =
            newSingleThreadScheduledExecutor(groupedThreads("onos/ecm", mapName + "-bg-%d"));

    updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
    clusterCommunicator.addSubscriber(updateMessageSubject,
                                      new InternalEventListener(), executor);

    antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");
    clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
                                      new InternalAntiEntropyListener(), backgroundExecutor);

    // Start the anti-entropy thread only after every field it reads (notably
    // antiEntropyAdvertisementSubject) has been assigned. Handing the task to
    // another thread earlier gives it no happens-before edge to later writes.
    backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
                                           initialDelaySec, periodSec,
                                           TimeUnit.SECONDS);
}
198
/**
 * Creates a new eventually consistent map shared amongst multiple instances,
 * pushing every write immediately to all other nodes of the cluster.
 * <p>
 * Behaves like the six-argument constructor. The difference is that the peer
 * update function is fixed: it returns the id of every node known to the
 * cluster service except the local node.
 * </p>
 *
 * @param mapName a String identifier for the map.
 * @param clusterService the cluster service
 * @param clusterCommunicator the cluster communications service
 * @param serializerBuilder a Kryo namespace builder that can serialize
 *                          both K and V
 * @param clockService a clock service able to generate timestamps
 *                     for K
 */
public EventuallyConsistentMapImpl(String mapName,
                                   ClusterService clusterService,
                                   ClusterCommunicationService clusterCommunicator,
                                   KryoNamespace.Builder serializerBuilder,
                                   ClockService<K, V> clockService) {
    this(mapName, clusterService, clusterCommunicator, serializerBuilder, clockService,
         (key, value) -> {
             // The key and value are ignored: every remote node gets every write.
             Set<ControllerNode> members = clusterService.getNodes();
             return members.stream()
                     .map(ControllerNode::id)
                     .filter(id -> !id.equals(clusterService.getLocalNode().id()))
                     .collect(Collectors.toList());
         });
}
226
Madan Jampanie1356282015-03-10 19:05:36 -0700227 public EventuallyConsistentMapImpl<K, V> withTombstonesDisabled(boolean status) {
228 tombstonesDisabled = status;
229 return this;
230 }
231
Jonathan Hartdb3af892015-01-26 13:19:07 -0800232 private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
233 return new KryoSerializer() {
234 @Override
235 protected void setupKryoPool() {
236 // Add the map's internal helper classes to the user-supplied serializer
237 serializerPool = builder
238 .register(WallClockTimestamp.class)
239 .register(PutEntry.class)
Jonathan Hart539a6462015-01-27 17:05:43 -0800240 .register(RemoveEntry.class)
Jonathan Hartdb3af892015-01-26 13:19:07 -0800241 .register(ArrayList.class)
Jonathan Hartaaa56572015-01-28 21:56:35 -0800242 .register(AntiEntropyAdvertisement.class)
243 .register(HashMap.class)
Jonathan Hartdb3af892015-01-26 13:19:07 -0800244 .build();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800245 }
246 };
247 }
248
Madan Jampanib28e4ad2015-02-19 12:31:37 -0800249 /**
250 * Sets the executor to use for broadcasting messages and returns this
251 * instance for method chaining.
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800252 *
Madan Jampanib28e4ad2015-02-19 12:31:37 -0800253 * @param executor executor service
254 * @return this instance
255 */
256 public EventuallyConsistentMapImpl<K, V> withBroadcastMessageExecutor(ExecutorService executor) {
257 checkNotNull(executor, "Null executor");
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800258 communicationExecutor = executor;
Madan Jampanib28e4ad2015-02-19 12:31:37 -0800259 return this;
260 }
261
Jonathan Hartdb3af892015-01-26 13:19:07 -0800262 @Override
263 public int size() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800264 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800265 return items.size();
266 }
267
268 @Override
269 public boolean isEmpty() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800270 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800271 return items.isEmpty();
272 }
273
274 @Override
275 public boolean containsKey(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800276 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800277 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800278 return items.containsKey(key);
279 }
280
281 @Override
282 public boolean containsValue(V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800283 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800284 checkNotNull(value, ERROR_NULL_VALUE);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800285
286 return items.values().stream()
287 .anyMatch(timestamped -> timestamped.value().equals(value));
288 }
289
290 @Override
291 public V get(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800292 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800293 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800294
295 Timestamped<V> value = items.get(key);
296 if (value != null) {
297 return value.value();
298 }
299 return null;
300 }
301
302 @Override
303 public void put(K key, V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800304 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800305 checkNotNull(key, ERROR_NULL_KEY);
306 checkNotNull(value, ERROR_NULL_VALUE);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800307
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800308 Timestamp timestamp = clockService.getTimestamp(key, value);
309
Jonathan Hartdb3af892015-01-26 13:19:07 -0800310 if (putInternal(key, value, timestamp)) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800311 notifyPeers(new PutEntry<>(key, value, timestamp),
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800312 peerUpdateFunction.apply(key, value));
313 notifyListeners(new EventuallyConsistentMapEvent<>(
314 EventuallyConsistentMapEvent.Type.PUT, key, value));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800315 }
316 }
317
318 private boolean putInternal(K key, V value, Timestamp timestamp) {
Jonathan Hart233a18a2015-03-02 17:24:58 -0800319 counter.incrementCount();
Brian O'Connor2952e3b2015-02-19 21:47:57 -0800320 Timestamp removed = removedItems.get(key);
Jonathan Hart403ea932015-02-20 16:23:00 -0800321 if (removed != null && removed.isNewerThan(timestamp)) {
Brian O'Connor2952e3b2015-02-19 21:47:57 -0800322 log.debug("ecmap - removed was newer {}", value);
323 return false;
324 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800325
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800326 final MutableBoolean updated = new MutableBoolean(false);
327
328 items.compute(key, (k, existing) -> {
Jonathan Hart403ea932015-02-20 16:23:00 -0800329 if (existing != null && existing.isNewerThan(timestamp)) {
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800330 updated.setFalse();
331 return existing;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800332 } else {
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800333 updated.setTrue();
334 return new Timestamped<>(value, timestamp);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800335 }
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800336 });
337
338 boolean success = updated.booleanValue();
339 if (!success) {
340 log.debug("ecmap - existing was newer {}", value);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800341 }
Brian O'Connor2952e3b2015-02-19 21:47:57 -0800342
343 if (success && removed != null) {
344 removedItems.remove(key, removed);
345 }
346 return success;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800347 }
348
349 @Override
350 public void remove(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800351 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800352 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800353
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800354 // TODO prevent calls here if value is important for timestamp
355 Timestamp timestamp = clockService.getTimestamp(key, null);
356
Jonathan Hartdb3af892015-01-26 13:19:07 -0800357 if (removeInternal(key, timestamp)) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800358 notifyPeers(new RemoveEntry<>(key, timestamp),
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800359 peerUpdateFunction.apply(key, null));
360 notifyListeners(new EventuallyConsistentMapEvent<>(
361 EventuallyConsistentMapEvent.Type.REMOVE, key, null));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800362 }
363 }
364
365 private boolean removeInternal(K key, Timestamp timestamp) {
Madan Jampani54d34992015-03-06 17:27:52 -0800366 if (timestamp == null) {
367 return false;
368 }
369
Jonathan Hart233a18a2015-03-02 17:24:58 -0800370 counter.incrementCount();
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800371 final MutableBoolean updated = new MutableBoolean(false);
372
373 items.compute(key, (k, existing) -> {
374 if (existing != null && existing.isNewerThan(timestamp)) {
375 updated.setFalse();
376 return existing;
Madan Jampani7e6cfe32015-02-19 17:44:44 -0800377 } else {
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800378 updated.setTrue();
379 // remove from items map
380 return null;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800381 }
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800382 });
383
384 if (updated.isFalse()) {
385 return false;
Madan Jampani7e6cfe32015-02-19 17:44:44 -0800386 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800387
Madan Jampanie1356282015-03-10 19:05:36 -0700388 if (!tombstonesDisabled) {
389 Timestamp removedTimestamp = removedItems.get(key);
390 if (removedTimestamp == null) {
391 return removedItems.putIfAbsent(key, timestamp) == null;
392 } else if (timestamp.isNewerThan(removedTimestamp)) {
393 return removedItems.replace(key, removedTimestamp, timestamp);
394 } else {
395 return false;
396 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800397 }
Madan Jampanie1356282015-03-10 19:05:36 -0700398
399 return updated.booleanValue();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800400 }
401
402 @Override
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800403 public void remove(K key, V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800404 checkState(!destroyed, destroyedMessage);
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800405 checkNotNull(key, ERROR_NULL_KEY);
406 checkNotNull(value, ERROR_NULL_VALUE);
407
408 Timestamp timestamp = clockService.getTimestamp(key, value);
409
410 if (removeInternal(key, timestamp)) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800411 notifyPeers(new RemoveEntry<>(key, timestamp),
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800412 peerUpdateFunction.apply(key, value));
413 notifyListeners(new EventuallyConsistentMapEvent<>(
414 EventuallyConsistentMapEvent.Type.REMOVE, key, value));
Ray Milkey8c6d00e2015-03-13 14:14:34 -0700415 } else {
416 // TODO remove this extra call when ONOS-1207 is resolved
417 Timestamped<V> latest = (Timestamped) items.get(key);
418 log.info("Remove of intent {} failed; request time {} vs. latest time {}",
419 key, timestamp, latest.timestamp());
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800420 }
421 }
422
423 @Override
Jonathan Hartdb3af892015-01-26 13:19:07 -0800424 public void putAll(Map<? extends K, ? extends V> m) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800425 checkState(!destroyed, destroyedMessage);
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800426 m.forEach(this::put);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800427 }
428
429 @Override
430 public void clear() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800431 checkState(!destroyed, destroyedMessage);
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800432 items.forEach((key, value) -> remove(key));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800433 }
434
435 @Override
436 public Set<K> keySet() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800437 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800438 return items.keySet();
439 }
440
441 @Override
442 public Collection<V> values() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800443 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800444 return items.values().stream()
445 .map(Timestamped::value)
446 .collect(Collectors.toList());
447 }
448
449 @Override
450 public Set<Map.Entry<K, V>> entrySet() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800451 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800452
453 return items.entrySet().stream()
Jonathan Hartf9108232015-02-02 16:37:35 -0800454 .map(e -> Pair.of(e.getKey(), e.getValue().value()))
Jonathan Hartdb3af892015-01-26 13:19:07 -0800455 .collect(Collectors.toSet());
456 }
457
458 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800459 public void addListener(EventuallyConsistentMapListener<K, V> listener) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800460 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800461
462 listeners.add(checkNotNull(listener));
463 }
464
465 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800466 public void removeListener(EventuallyConsistentMapListener<K, V> listener) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800467 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800468
469 listeners.remove(checkNotNull(listener));
470 }
471
472 @Override
473 public void destroy() {
474 destroyed = true;
475
476 executor.shutdown();
477 backgroundExecutor.shutdown();
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800478 communicationExecutor.shutdown();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800479
Jonathan Hart584d2f32015-01-27 19:46:14 -0800480 listeners.clear();
481
Jonathan Hartdb3af892015-01-26 13:19:07 -0800482 clusterCommunicator.removeSubscriber(updateMessageSubject);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800483 clusterCommunicator.removeSubscriber(antiEntropyAdvertisementSubject);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800484 }
485
Jonathan Hartaaa56572015-01-28 21:56:35 -0800486 private void notifyListeners(EventuallyConsistentMapEvent<K, V> event) {
487 for (EventuallyConsistentMapListener<K, V> listener : listeners) {
Jonathan Hartdb3af892015-01-26 13:19:07 -0800488 listener.event(event);
489 }
490 }
491
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800492 private void notifyPeers(PutEntry<K, V> event, Collection<NodeId> peers) {
493 queueUpdate(event, peers);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800494 }
495
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800496 private void notifyPeers(RemoveEntry<K, V> event, Collection<NodeId> peers) {
497 queueUpdate(event, peers);
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800498 }
499
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800500 private void queueUpdate(AbstractEntry<K, V> event, Collection<NodeId> peers) {
501 if (peers == null) {
502 // we have no friends :(
503 return;
504 }
505 peers.forEach(node ->
506 senderPending.computeIfAbsent(node, unusedKey -> new EventAccumulator(node)).add(event)
507 );
508 }
509
510 private boolean unicastMessage(NodeId peer, MessageSubject subject, Object event) {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800511 ClusterMessage message = new ClusterMessage(
512 clusterService.getLocalNode().id(),
513 subject,
514 serializer.encode(event));
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800515 return clusterCommunicator.unicast(message, peer);
516 // Note: we had this flipped before...
517// communicationExecutor.execute(() -> clusterCommunicator.unicast(message, peer));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800518 }
519
Jonathan Hart233a18a2015-03-02 17:24:58 -0800520 private boolean underHighLoad() {
521 return counter.get(LOAD_WINDOW) > HIGH_LOAD_THRESHOLD;
522 }
523
Jonathan Hartaaa56572015-01-28 21:56:35 -0800524 private final class SendAdvertisementTask implements Runnable {
525 @Override
526 public void run() {
527 if (Thread.currentThread().isInterrupted()) {
528 log.info("Interrupted, quitting");
529 return;
530 }
531
Jonathan Hart233a18a2015-03-02 17:24:58 -0800532 if (underHighLoad()) {
533 return;
534 }
535
Jonathan Hartaaa56572015-01-28 21:56:35 -0800536 try {
537 final NodeId self = clusterService.getLocalNode().id();
538 Set<ControllerNode> nodes = clusterService.getNodes();
539
540 List<NodeId> nodeIds = nodes.stream()
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800541 .map(ControllerNode::id)
Jonathan Hartaaa56572015-01-28 21:56:35 -0800542 .collect(Collectors.toList());
543
544 if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
545 log.trace("No other peers in the cluster.");
546 return;
547 }
548
549 NodeId peer;
550 do {
551 int idx = RandomUtils.nextInt(0, nodeIds.size());
552 peer = nodeIds.get(idx);
553 } while (peer.equals(self));
554
555 if (Thread.currentThread().isInterrupted()) {
556 log.info("Interrupted, quitting");
557 return;
558 }
559
560 AntiEntropyAdvertisement<K> ad = createAdvertisement();
561
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800562 if (!unicastMessage(peer, antiEntropyAdvertisementSubject, ad)) {
563 log.debug("Failed to send anti-entropy advertisement to {}", peer);
564 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800565 } catch (Exception e) {
566 // Catch all exceptions to avoid scheduled task being suppressed.
567 log.error("Exception thrown while sending advertisement", e);
568 }
569 }
570 }
571
572 private AntiEntropyAdvertisement<K> createAdvertisement() {
573 final NodeId self = clusterService.getLocalNode().id();
574
575 Map<K, Timestamp> timestamps = new HashMap<>(items.size());
576
577 items.forEach((key, value) -> timestamps.put(key, value.timestamp()));
578
579 Map<K, Timestamp> tombstones = new HashMap<>(removedItems);
580
581 return new AntiEntropyAdvertisement<>(self, timestamps, tombstones);
582 }
583
584 private void handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) {
585 List<EventuallyConsistentMapEvent<K, V>> externalEvents;
586
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800587 externalEvents = antiEntropyCheckLocalItems(ad);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800588
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800589 antiEntropyCheckLocalRemoved(ad);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800590
Jonathan Hartf893be82015-02-24 17:35:51 -0800591 if (!lightweightAntiEntropy) {
592 externalEvents.addAll(antiEntropyCheckRemoteRemoved(ad));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800593
Jonathan Hartf893be82015-02-24 17:35:51 -0800594 // if remote ad has something unknown, actively sync
595 for (K key : ad.timestamps().keySet()) {
596 if (!items.containsKey(key)) {
597 // Send the advertisement back if this peer is out-of-sync
598 final NodeId sender = ad.sender();
599 AntiEntropyAdvertisement<K> myAd = createAdvertisement();
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800600 if (!unicastMessage(sender, antiEntropyAdvertisementSubject, myAd)) {
601 log.debug("Failed to send reactive anti-entropy advertisement to {}", sender);
602 }
Jonathan Hartf893be82015-02-24 17:35:51 -0800603 break;
604 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800605 }
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800606 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800607 externalEvents.forEach(this::notifyListeners);
608 }
609
610 /**
611 * Checks if any of the remote's live items or tombstones are out of date
612 * according to our local live item list, or if our live items are out of
613 * date according to the remote's tombstone list.
614 * If the local copy is more recent, it will be pushed to the remote. If the
615 * remote has a more recent remove, we apply that to the local state.
616 *
617 * @param ad remote anti-entropy advertisement
618 * @return list of external events relating to local operations performed
619 */
Jonathan Hartaaa56572015-01-28 21:56:35 -0800620 private List<EventuallyConsistentMapEvent<K, V>> antiEntropyCheckLocalItems(
621 AntiEntropyAdvertisement<K> ad) {
622 final List<EventuallyConsistentMapEvent<K, V>> externalEvents
623 = new LinkedList<>();
624 final NodeId sender = ad.sender();
625
Jonathan Hartaaa56572015-01-28 21:56:35 -0800626 for (Map.Entry<K, Timestamped<V>> item : items.entrySet()) {
627 K key = item.getKey();
628 Timestamped<V> localValue = item.getValue();
629
630 Timestamp remoteTimestamp = ad.timestamps().get(key);
631 if (remoteTimestamp == null) {
632 remoteTimestamp = ad.tombstones().get(key);
633 }
634 if (remoteTimestamp == null || localValue
Jonathan Hart403ea932015-02-20 16:23:00 -0800635 .isNewerThan(remoteTimestamp)) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800636 // local value is more recent, push to sender
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800637 queueUpdate(new PutEntry<>(key, localValue.value(),
638 localValue.timestamp()), ImmutableList.of(sender));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800639 }
640
641 Timestamp remoteDeadTimestamp = ad.tombstones().get(key);
642 if (remoteDeadTimestamp != null &&
Jonathan Hart403ea932015-02-20 16:23:00 -0800643 remoteDeadTimestamp.isNewerThan(localValue.timestamp())) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800644 // sender has a more recent remove
645 if (removeInternal(key, remoteDeadTimestamp)) {
646 externalEvents.add(new EventuallyConsistentMapEvent<>(
647 EventuallyConsistentMapEvent.Type.REMOVE, key, null));
648 }
649 }
650 }
651
Jonathan Hartaaa56572015-01-28 21:56:35 -0800652 return externalEvents;
653 }
654
655 /**
656 * Checks if any items in the remote live list are out of date according
657 * to our tombstone list. If we find we have a more up to date tombstone,
658 * we'll send it to the remote.
659 *
660 * @param ad remote anti-entropy advertisement
661 */
Jonathan Hartaaa56572015-01-28 21:56:35 -0800662 private void antiEntropyCheckLocalRemoved(AntiEntropyAdvertisement<K> ad) {
663 final NodeId sender = ad.sender();
664
Jonathan Hartaaa56572015-01-28 21:56:35 -0800665 for (Map.Entry<K, Timestamp> dead : removedItems.entrySet()) {
666 K key = dead.getKey();
667 Timestamp localDeadTimestamp = dead.getValue();
668
669 Timestamp remoteLiveTimestamp = ad.timestamps().get(key);
670 if (remoteLiveTimestamp != null
Jonathan Hart403ea932015-02-20 16:23:00 -0800671 && localDeadTimestamp.isNewerThan(remoteLiveTimestamp)) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800672 // sender has zombie, push remove
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800673 queueUpdate(new RemoveEntry<>(key, localDeadTimestamp), ImmutableList.of(sender));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800674 }
675 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800676 }
677
678 /**
679 * Checks if any of the local live items are out of date according to the
680 * remote's tombstone advertisements. If we find a local item is out of date,
681 * we'll apply the remove operation to the local state.
682 *
683 * @param ad remote anti-entropy advertisement
684 * @return list of external events relating to local operations performed
685 */
Jonathan Hartaaa56572015-01-28 21:56:35 -0800686 private List<EventuallyConsistentMapEvent<K, V>>
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800687 antiEntropyCheckRemoteRemoved(AntiEntropyAdvertisement<K> ad) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800688 final List<EventuallyConsistentMapEvent<K, V>> externalEvents
689 = new LinkedList<>();
690
691 for (Map.Entry<K, Timestamp> remoteDead : ad.tombstones().entrySet()) {
692 K key = remoteDead.getKey();
693 Timestamp remoteDeadTimestamp = remoteDead.getValue();
694
695 Timestamped<V> local = items.get(key);
696 Timestamp localDead = removedItems.get(key);
Jonathan Hart403ea932015-02-20 16:23:00 -0800697 if (local != null && remoteDeadTimestamp.isNewerThan(
698 local.timestamp())) {
699 // If the remote has a more recent tombstone than either our local
700 // value, then do a remove with their timestamp
Jonathan Hartaaa56572015-01-28 21:56:35 -0800701 if (removeInternal(key, remoteDeadTimestamp)) {
702 externalEvents.add(new EventuallyConsistentMapEvent<>(
703 EventuallyConsistentMapEvent.Type.REMOVE, key, null));
704 }
Jonathan Hart403ea932015-02-20 16:23:00 -0800705 } else if (localDead != null && remoteDeadTimestamp.isNewerThan(
706 localDead)) {
707 // If the remote has a more recent tombstone than us, update ours
708 // to their timestamp
709 removeInternal(key, remoteDeadTimestamp);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800710 }
711 }
712
713 return externalEvents;
714 }
715
716 private final class InternalAntiEntropyListener
717 implements ClusterMessageHandler {
718
719 @Override
720 public void handle(ClusterMessage message) {
Jonathan Hart4fd4ebb2015-02-04 17:38:48 -0800721 log.trace("Received anti-entropy advertisement from peer: {}",
722 message.sender());
Jonathan Hartaaa56572015-01-28 21:56:35 -0800723 AntiEntropyAdvertisement<K> advertisement = serializer.decode(message.payload());
Madan Jampani2af244a2015-02-22 13:12:01 -0800724 try {
Jonathan Hart233a18a2015-03-02 17:24:58 -0800725 if (!underHighLoad()) {
726 handleAntiEntropyAdvertisement(advertisement);
727 }
Madan Jampani2af244a2015-02-22 13:12:01 -0800728 } catch (Exception e) {
729 log.warn("Exception thrown handling advertisements", e);
730 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800731 }
732 }
733
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800734 private final class InternalEventListener implements
Jonathan Hartdb3af892015-01-26 13:19:07 -0800735 ClusterMessageHandler {
736 @Override
737 public void handle(ClusterMessage message) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800738 log.debug("Received update event from peer: {}", message.sender());
739 Collection<AbstractEntry<K, V>> events = serializer.decode(message.payload());
Jonathan Hartdb3af892015-01-26 13:19:07 -0800740
Madan Jampani2af244a2015-02-22 13:12:01 -0800741 try {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800742 // TODO clean this for loop up
743 for (AbstractEntry<K, V> entry : events) {
744 final K key = entry.key();
745 final V value;
746 final Timestamp timestamp = entry.timestamp();
747 final EventuallyConsistentMapEvent.Type type;
748 if (entry instanceof PutEntry) {
749 PutEntry<K, V> putEntry = (PutEntry<K, V>) entry;
750 value = putEntry.value();
751 type = EventuallyConsistentMapEvent.Type.PUT;
752 } else if (entry instanceof RemoveEntry) {
753 type = EventuallyConsistentMapEvent.Type.REMOVE;
754 value = null;
755 } else {
756 throw new IllegalStateException("Unknown entry type " + entry.getClass());
757 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800758
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800759 boolean success;
760 switch (type) {
761 case PUT:
762 success = putInternal(key, value, timestamp);
763 break;
764 case REMOVE:
765 success = removeInternal(key, timestamp);
766 break;
767 default:
768 success = false;
769 }
770 if (success) {
771 notifyListeners(new EventuallyConsistentMapEvent<>(type, key, value));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800772 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800773 }
Madan Jampani2af244a2015-02-22 13:12:01 -0800774 } catch (Exception e) {
775 log.warn("Exception thrown handling put", e);
776 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800777 }
778 }
779
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800780 // TODO pull this into the class if this gets pulled out...
781 private static final int DEFAULT_MAX_EVENTS = 1000;
782 private static final int DEFAULT_MAX_IDLE_MS = 10;
783 private static final int DEFAULT_MAX_BATCH_MS = 50;
784 private static final Timer TIMER = new Timer("onos-ecm-sender-events");
Jonathan Hartdb3af892015-01-26 13:19:07 -0800785
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800786 private final class EventAccumulator extends AbstractAccumulator<AbstractEntry<K, V>> {
787
788 private final NodeId peer;
789
790 private EventAccumulator(NodeId peer) {
791 super(TIMER, DEFAULT_MAX_EVENTS, DEFAULT_MAX_BATCH_MS, DEFAULT_MAX_IDLE_MS);
792 this.peer = peer;
793 }
794
795 @Override
796 public void processItems(List<AbstractEntry<K, V>> items) {
797 Map<K, AbstractEntry<K, V>> map = Maps.newHashMap();
798 items.forEach(item -> map.compute(item.key(), (key, oldValue) ->
799 oldValue == null || item.compareTo(oldValue) > 0 ? item : oldValue
800 )
801 );
802 communicationExecutor.submit(() -> {
803 try {
804 unicastMessage(peer, updateMessageSubject, Lists.newArrayList(map.values()));
805 } catch (Exception e) {
806 log.warn("broadcast error", e);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800807 }
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800808 });
Jonathan Hartdb3af892015-01-26 13:19:07 -0800809 }
810 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800811}