blob: 62d145dc8de03719b79460b8717066160cf380ed [file] [log] [blame]
Jonathan Hartdb3af892015-01-26 13:19:07 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Jonathan Hart77bdd262015-02-03 09:07:48 -080016package org.onosproject.store.ecmap;
Jonathan Hartdb3af892015-01-26 13:19:07 -080017
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080018import com.google.common.collect.ImmutableList;
19import com.google.common.collect.Lists;
20import com.google.common.collect.Maps;
Jonathan Hartaaa56572015-01-28 21:56:35 -080021import org.apache.commons.lang3.RandomUtils;
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -080022import org.apache.commons.lang3.mutable.MutableBoolean;
Jonathan Hartf9108232015-02-02 16:37:35 -080023import org.apache.commons.lang3.tuple.Pair;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080024import org.onlab.util.AbstractAccumulator;
Jonathan Hartdb3af892015-01-26 13:19:07 -080025import org.onlab.util.KryoNamespace;
Jonathan Hart233a18a2015-03-02 17:24:58 -080026import org.onlab.util.SlidingWindowCounter;
Jonathan Hartdb3af892015-01-26 13:19:07 -080027import org.onosproject.cluster.ClusterService;
Jonathan Hartaaa56572015-01-28 21:56:35 -080028import org.onosproject.cluster.ControllerNode;
Jonathan Hartdb3af892015-01-26 13:19:07 -080029import org.onosproject.cluster.NodeId;
30import org.onosproject.store.Timestamp;
31import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
32import org.onosproject.store.cluster.messaging.ClusterMessage;
33import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
34import org.onosproject.store.cluster.messaging.MessageSubject;
Jonathan Hart77bdd262015-02-03 09:07:48 -080035import org.onosproject.store.impl.Timestamped;
36import org.onosproject.store.impl.WallClockTimestamp;
Jonathan Hartdb3af892015-01-26 13:19:07 -080037import org.onosproject.store.serializers.KryoSerializer;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070038import org.onosproject.store.service.ClockService;
39import org.onosproject.store.service.EventuallyConsistentMap;
40import org.onosproject.store.service.EventuallyConsistentMapEvent;
41import org.onosproject.store.service.EventuallyConsistentMapListener;
Jonathan Hartdb3af892015-01-26 13:19:07 -080042import org.slf4j.Logger;
43import org.slf4j.LoggerFactory;
44
Jonathan Hartdb3af892015-01-26 13:19:07 -080045import java.util.ArrayList;
46import java.util.Collection;
Jonathan Hartaaa56572015-01-28 21:56:35 -080047import java.util.HashMap;
48import java.util.LinkedList;
Jonathan Hartdb3af892015-01-26 13:19:07 -080049import java.util.List;
50import java.util.Map;
51import java.util.Set;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080052import java.util.Timer;
Jonathan Hartdb3af892015-01-26 13:19:07 -080053import java.util.concurrent.ConcurrentHashMap;
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -080054import java.util.concurrent.ConcurrentMap;
Jonathan Hartdb3af892015-01-26 13:19:07 -080055import java.util.concurrent.CopyOnWriteArraySet;
56import java.util.concurrent.ExecutorService;
57import java.util.concurrent.Executors;
58import java.util.concurrent.ScheduledExecutorService;
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -080059import java.util.concurrent.TimeUnit;
Brian O'Connor5eb77c82015-03-02 18:09:39 -080060import java.util.function.BiFunction;
Jonathan Hartdb3af892015-01-26 13:19:07 -080061import java.util.stream.Collectors;
62
63import static com.google.common.base.Preconditions.checkNotNull;
64import static com.google.common.base.Preconditions.checkState;
65import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
Brian O'Connorc6713a82015-02-24 11:55:48 -080066import static org.onlab.util.BoundedThreadPool.newFixedThreadPool;
Brian O'Connor4b2ba5f2015-02-18 20:54:00 -080067import static org.onlab.util.Tools.groupedThreads;
Jonathan Hartdb3af892015-01-26 13:19:07 -080068
69/**
70 * Distributed Map implementation which uses optimistic replication and gossip
71 * based techniques to provide an eventually consistent data store.
72 */
73public class EventuallyConsistentMapImpl<K, V>
74 implements EventuallyConsistentMap<K, V> {
75
76 private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class);
77
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -080078 private final ConcurrentMap<K, Timestamped<V>> items;
79 private final ConcurrentMap<K, Timestamp> removedItems;
Jonathan Hartdb3af892015-01-26 13:19:07 -080080
Jonathan Hartdb3af892015-01-26 13:19:07 -080081 private final ClusterService clusterService;
82 private final ClusterCommunicationService clusterCommunicator;
83 private final KryoSerializer serializer;
84
Jonathan Hart5ec32ba2015-02-05 13:33:58 -080085 private final ClockService<K, V> clockService;
Jonathan Hartdb3af892015-01-26 13:19:07 -080086
87 private final MessageSubject updateMessageSubject;
Jonathan Hartaaa56572015-01-28 21:56:35 -080088 private final MessageSubject antiEntropyAdvertisementSubject;
Jonathan Hartdb3af892015-01-26 13:19:07 -080089
Jonathan Hartaaa56572015-01-28 21:56:35 -080090 private final Set<EventuallyConsistentMapListener<K, V>> listeners
Jonathan Hartdb3af892015-01-26 13:19:07 -080091 = new CopyOnWriteArraySet<>();
92
93 private final ExecutorService executor;
94
95 private final ScheduledExecutorService backgroundExecutor;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080096 private final BiFunction<K, V, Collection<NodeId>> peerUpdateFunction;
Jonathan Hartdb3af892015-01-26 13:19:07 -080097
Jonathan Hart6ec029a2015-03-24 17:12:35 -070098 private final ExecutorService communicationExecutor;
99 private final Map<NodeId, EventAccumulator> senderPending;
Madan Jampani28726282015-02-19 11:40:23 -0800100
Jonathan Hartdb3af892015-01-26 13:19:07 -0800101 private volatile boolean destroyed = false;
Jonathan Hart539a6462015-01-27 17:05:43 -0800102 private static final String ERROR_DESTROYED = " map is already destroyed";
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800103 private final String destroyedMessage;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800104
Jonathan Hart4f397e82015-02-04 09:10:41 -0800105 private static final String ERROR_NULL_KEY = "Key cannot be null";
106 private static final String ERROR_NULL_VALUE = "Null values are not allowed";
107
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700108 private final long initialDelaySec = 5;
109 private final boolean lightweightAntiEntropy;
110 private final boolean tombstonesDisabled;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800111
Jonathan Hart233a18a2015-03-02 17:24:58 -0800112 private static final int WINDOW_SIZE = 5;
113 private static final int HIGH_LOAD_THRESHOLD = 0;
114 private static final int LOAD_WINDOW = 2;
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700115 private SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE);
Jonathan Hart233a18a2015-03-02 17:24:58 -0800116
/**
 * Creates a new eventually consistent map shared amongst multiple instances.
 * <p>
 * See {@link org.onosproject.store.service.EventuallyConsistentMapBuilder}
 * for more description of the parameters expected by the map.
 * </p>
 *
 * @param mapName               a String identifier for the map.
 * @param clusterService        the cluster service
 * @param clusterCommunicator   the cluster communications service
 * @param serializerBuilder     a Kryo namespace builder that can serialize
 *                              both K and V
 * @param clockService          a clock service able to generate timestamps
 *                              for K and V
 * @param peerUpdateFunction    function that provides a set of nodes to immediately
 *                              update to when there are writes to the map; if null,
 *                              every node except the local one is updated
 * @param eventExecutor         executor to use for processing incoming
 *                              events from peers; if null, one is created
 * @param communicationExecutor executor to use for sending events to peers;
 *                              if null, one is created
 * @param backgroundExecutor    executor to use for background anti-entropy
 *                              tasks; if null, one is created
 * @param tombstonesDisabled    true if this map should not maintain
 *                              tombstones
 * @param antiEntropyPeriod     period at which the anti-entropy task should run,
 *                              expressed in {@code antiEntropyTimeUnit}
 * @param antiEntropyTimeUnit   time unit of {@code antiEntropyPeriod}; the
 *                              initial delay of the task is interpreted in this
 *                              unit as well
 * @param convergeFaster        make anti-entropy try to converge faster
 */
EventuallyConsistentMapImpl(String mapName,
                            ClusterService clusterService,
                            ClusterCommunicationService clusterCommunicator,
                            KryoNamespace.Builder serializerBuilder,
                            ClockService<K, V> clockService,
                            BiFunction<K, V, Collection<NodeId>> peerUpdateFunction,
                            ExecutorService eventExecutor,
                            ExecutorService communicationExecutor,
                            ScheduledExecutorService backgroundExecutor,
                            boolean tombstonesDisabled,
                            long antiEntropyPeriod,
                            TimeUnit antiEntropyTimeUnit,
                            boolean convergeFaster) {
    items = new ConcurrentHashMap<>();
    removedItems = new ConcurrentHashMap<>();
    senderPending = Maps.newConcurrentMap();
    destroyedMessage = mapName + ERROR_DESTROYED;

    this.clusterService = clusterService;
    this.clusterCommunicator = clusterCommunicator;

    this.serializer = createSerializer(serializerBuilder);

    this.clockService = clockService;

    // Behavioural flags are assigned before anything below can run on another
    // thread; previously they were set last, after the handlers were registered.
    this.tombstonesDisabled = tombstonesDisabled;
    this.lightweightAntiEntropy = !convergeFaster;

    if (peerUpdateFunction != null) {
        this.peerUpdateFunction = peerUpdateFunction;
    } else {
        // Default: every cluster node except the local one.
        this.peerUpdateFunction = (key, value) -> clusterService.getNodes().stream()
                .map(ControllerNode::id)
                .filter(nodeId -> !nodeId.equals(clusterService.getLocalNode().id()))
                .collect(Collectors.toList());
    }

    if (eventExecutor != null) {
        this.executor = eventExecutor;
    } else {
        // should be a normal executor; it's used for receiving messages
        this.executor =
                Executors.newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-fg-%d"));
    }

    if (communicationExecutor != null) {
        this.communicationExecutor = communicationExecutor;
    } else {
        // sending executor; should be capped
        //TODO this probably doesn't need to be bounded anymore
        this.communicationExecutor =
                newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-publish-%d"));
    }

    if (backgroundExecutor != null) {
        this.backgroundExecutor = backgroundExecutor;
    } else {
        this.backgroundExecutor =
                newSingleThreadScheduledExecutor(groupedThreads("onos/ecm", mapName + "-bg-%d"));
    }

    updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
    antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");

    // Every field is assigned at this point. Only now hand references to this
    // instance (via the inner handler/task objects) to other threads.
    clusterCommunicator.addSubscriber(updateMessageSubject,
                                      new InternalEventListener(), this.executor);
    clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
                                      new InternalAntiEntropyListener(), this.backgroundExecutor);

    // start anti-entropy thread
    this.backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
                                                initialDelaySec, antiEntropyPeriod,
                                                antiEntropyTimeUnit);
}
218
Jonathan Hartdb3af892015-01-26 13:19:07 -0800219 private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
220 return new KryoSerializer() {
221 @Override
222 protected void setupKryoPool() {
223 // Add the map's internal helper classes to the user-supplied serializer
224 serializerPool = builder
225 .register(WallClockTimestamp.class)
226 .register(PutEntry.class)
Jonathan Hart539a6462015-01-27 17:05:43 -0800227 .register(RemoveEntry.class)
Jonathan Hartdb3af892015-01-26 13:19:07 -0800228 .register(ArrayList.class)
Jonathan Hartaaa56572015-01-28 21:56:35 -0800229 .register(AntiEntropyAdvertisement.class)
230 .register(HashMap.class)
Jonathan Hartdb3af892015-01-26 13:19:07 -0800231 .build();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800232 }
233 };
234 }
235
236 @Override
237 public int size() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800238 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800239 return items.size();
240 }
241
242 @Override
243 public boolean isEmpty() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800244 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800245 return items.isEmpty();
246 }
247
248 @Override
249 public boolean containsKey(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800250 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800251 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800252 return items.containsKey(key);
253 }
254
255 @Override
256 public boolean containsValue(V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800257 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800258 checkNotNull(value, ERROR_NULL_VALUE);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800259
260 return items.values().stream()
261 .anyMatch(timestamped -> timestamped.value().equals(value));
262 }
263
264 @Override
265 public V get(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800266 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800267 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800268
269 Timestamped<V> value = items.get(key);
270 if (value != null) {
271 return value.value();
272 }
273 return null;
274 }
275
276 @Override
277 public void put(K key, V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800278 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800279 checkNotNull(key, ERROR_NULL_KEY);
280 checkNotNull(value, ERROR_NULL_VALUE);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800281
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800282 Timestamp timestamp = clockService.getTimestamp(key, value);
283
Jonathan Hartdb3af892015-01-26 13:19:07 -0800284 if (putInternal(key, value, timestamp)) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800285 notifyPeers(new PutEntry<>(key, value, timestamp),
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800286 peerUpdateFunction.apply(key, value));
287 notifyListeners(new EventuallyConsistentMapEvent<>(
288 EventuallyConsistentMapEvent.Type.PUT, key, value));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800289 }
290 }
291
292 private boolean putInternal(K key, V value, Timestamp timestamp) {
Jonathan Hart233a18a2015-03-02 17:24:58 -0800293 counter.incrementCount();
Brian O'Connor2952e3b2015-02-19 21:47:57 -0800294 Timestamp removed = removedItems.get(key);
Jonathan Hart403ea932015-02-20 16:23:00 -0800295 if (removed != null && removed.isNewerThan(timestamp)) {
Brian O'Connor2952e3b2015-02-19 21:47:57 -0800296 log.debug("ecmap - removed was newer {}", value);
297 return false;
298 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800299
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800300 final MutableBoolean updated = new MutableBoolean(false);
301
302 items.compute(key, (k, existing) -> {
Jonathan Hart403ea932015-02-20 16:23:00 -0800303 if (existing != null && existing.isNewerThan(timestamp)) {
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800304 updated.setFalse();
305 return existing;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800306 } else {
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800307 updated.setTrue();
308 return new Timestamped<>(value, timestamp);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800309 }
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800310 });
311
312 boolean success = updated.booleanValue();
313 if (!success) {
314 log.debug("ecmap - existing was newer {}", value);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800315 }
Brian O'Connor2952e3b2015-02-19 21:47:57 -0800316
317 if (success && removed != null) {
318 removedItems.remove(key, removed);
319 }
320 return success;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800321 }
322
323 @Override
324 public void remove(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800325 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800326 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800327
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800328 // TODO prevent calls here if value is important for timestamp
329 Timestamp timestamp = clockService.getTimestamp(key, null);
330
Jonathan Hartdb3af892015-01-26 13:19:07 -0800331 if (removeInternal(key, timestamp)) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800332 notifyPeers(new RemoveEntry<>(key, timestamp),
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800333 peerUpdateFunction.apply(key, null));
334 notifyListeners(new EventuallyConsistentMapEvent<>(
335 EventuallyConsistentMapEvent.Type.REMOVE, key, null));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800336 }
337 }
338
339 private boolean removeInternal(K key, Timestamp timestamp) {
Madan Jampani54d34992015-03-06 17:27:52 -0800340 if (timestamp == null) {
341 return false;
342 }
343
Jonathan Hart233a18a2015-03-02 17:24:58 -0800344 counter.incrementCount();
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800345 final MutableBoolean updated = new MutableBoolean(false);
346
347 items.compute(key, (k, existing) -> {
348 if (existing != null && existing.isNewerThan(timestamp)) {
349 updated.setFalse();
350 return existing;
Madan Jampani7e6cfe32015-02-19 17:44:44 -0800351 } else {
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800352 updated.setTrue();
353 // remove from items map
354 return null;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800355 }
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800356 });
357
358 if (updated.isFalse()) {
359 return false;
Madan Jampani7e6cfe32015-02-19 17:44:44 -0800360 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800361
Madan Jampanie1356282015-03-10 19:05:36 -0700362 if (!tombstonesDisabled) {
363 Timestamp removedTimestamp = removedItems.get(key);
364 if (removedTimestamp == null) {
365 return removedItems.putIfAbsent(key, timestamp) == null;
366 } else if (timestamp.isNewerThan(removedTimestamp)) {
367 return removedItems.replace(key, removedTimestamp, timestamp);
368 } else {
369 return false;
370 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800371 }
Madan Jampanie1356282015-03-10 19:05:36 -0700372
373 return updated.booleanValue();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800374 }
375
376 @Override
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800377 public void remove(K key, V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800378 checkState(!destroyed, destroyedMessage);
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800379 checkNotNull(key, ERROR_NULL_KEY);
380 checkNotNull(value, ERROR_NULL_VALUE);
381
382 Timestamp timestamp = clockService.getTimestamp(key, value);
383
384 if (removeInternal(key, timestamp)) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800385 notifyPeers(new RemoveEntry<>(key, timestamp),
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800386 peerUpdateFunction.apply(key, value));
387 notifyListeners(new EventuallyConsistentMapEvent<>(
388 EventuallyConsistentMapEvent.Type.REMOVE, key, value));
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800389 }
390 }
391
392 @Override
Jonathan Hartdb3af892015-01-26 13:19:07 -0800393 public void putAll(Map<? extends K, ? extends V> m) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800394 checkState(!destroyed, destroyedMessage);
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800395 m.forEach(this::put);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800396 }
397
398 @Override
399 public void clear() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800400 checkState(!destroyed, destroyedMessage);
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800401 items.forEach((key, value) -> remove(key));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800402 }
403
404 @Override
405 public Set<K> keySet() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800406 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800407 return items.keySet();
408 }
409
410 @Override
411 public Collection<V> values() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800412 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800413 return items.values().stream()
414 .map(Timestamped::value)
415 .collect(Collectors.toList());
416 }
417
418 @Override
419 public Set<Map.Entry<K, V>> entrySet() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800420 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800421
422 return items.entrySet().stream()
Jonathan Hartf9108232015-02-02 16:37:35 -0800423 .map(e -> Pair.of(e.getKey(), e.getValue().value()))
Jonathan Hartdb3af892015-01-26 13:19:07 -0800424 .collect(Collectors.toSet());
425 }
426
427 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800428 public void addListener(EventuallyConsistentMapListener<K, V> listener) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800429 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800430
431 listeners.add(checkNotNull(listener));
432 }
433
434 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800435 public void removeListener(EventuallyConsistentMapListener<K, V> listener) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800436 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800437
438 listeners.remove(checkNotNull(listener));
439 }
440
441 @Override
442 public void destroy() {
443 destroyed = true;
444
445 executor.shutdown();
446 backgroundExecutor.shutdown();
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800447 communicationExecutor.shutdown();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800448
Jonathan Hart584d2f32015-01-27 19:46:14 -0800449 listeners.clear();
450
Jonathan Hartdb3af892015-01-26 13:19:07 -0800451 clusterCommunicator.removeSubscriber(updateMessageSubject);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800452 clusterCommunicator.removeSubscriber(antiEntropyAdvertisementSubject);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800453 }
454
Jonathan Hartaaa56572015-01-28 21:56:35 -0800455 private void notifyListeners(EventuallyConsistentMapEvent<K, V> event) {
456 for (EventuallyConsistentMapListener<K, V> listener : listeners) {
Jonathan Hartdb3af892015-01-26 13:19:07 -0800457 listener.event(event);
458 }
459 }
460
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800461 private void notifyPeers(PutEntry<K, V> event, Collection<NodeId> peers) {
462 queueUpdate(event, peers);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800463 }
464
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800465 private void notifyPeers(RemoveEntry<K, V> event, Collection<NodeId> peers) {
466 queueUpdate(event, peers);
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800467 }
468
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800469 private void queueUpdate(AbstractEntry<K, V> event, Collection<NodeId> peers) {
470 if (peers == null) {
471 // we have no friends :(
472 return;
473 }
474 peers.forEach(node ->
475 senderPending.computeIfAbsent(node, unusedKey -> new EventAccumulator(node)).add(event)
476 );
477 }
478
479 private boolean unicastMessage(NodeId peer, MessageSubject subject, Object event) {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800480 ClusterMessage message = new ClusterMessage(
481 clusterService.getLocalNode().id(),
482 subject,
483 serializer.encode(event));
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800484 return clusterCommunicator.unicast(message, peer);
485 // Note: we had this flipped before...
486// communicationExecutor.execute(() -> clusterCommunicator.unicast(message, peer));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800487 }
488
Jonathan Hart233a18a2015-03-02 17:24:58 -0800489 private boolean underHighLoad() {
490 return counter.get(LOAD_WINDOW) > HIGH_LOAD_THRESHOLD;
491 }
492
Jonathan Hartaaa56572015-01-28 21:56:35 -0800493 private final class SendAdvertisementTask implements Runnable {
494 @Override
495 public void run() {
496 if (Thread.currentThread().isInterrupted()) {
497 log.info("Interrupted, quitting");
498 return;
499 }
500
Jonathan Hart233a18a2015-03-02 17:24:58 -0800501 if (underHighLoad()) {
502 return;
503 }
504
Jonathan Hartaaa56572015-01-28 21:56:35 -0800505 try {
506 final NodeId self = clusterService.getLocalNode().id();
507 Set<ControllerNode> nodes = clusterService.getNodes();
508
509 List<NodeId> nodeIds = nodes.stream()
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800510 .map(ControllerNode::id)
Jonathan Hartaaa56572015-01-28 21:56:35 -0800511 .collect(Collectors.toList());
512
513 if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
514 log.trace("No other peers in the cluster.");
515 return;
516 }
517
518 NodeId peer;
519 do {
520 int idx = RandomUtils.nextInt(0, nodeIds.size());
521 peer = nodeIds.get(idx);
522 } while (peer.equals(self));
523
524 if (Thread.currentThread().isInterrupted()) {
525 log.info("Interrupted, quitting");
526 return;
527 }
528
529 AntiEntropyAdvertisement<K> ad = createAdvertisement();
530
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800531 if (!unicastMessage(peer, antiEntropyAdvertisementSubject, ad)) {
532 log.debug("Failed to send anti-entropy advertisement to {}", peer);
533 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800534 } catch (Exception e) {
535 // Catch all exceptions to avoid scheduled task being suppressed.
536 log.error("Exception thrown while sending advertisement", e);
537 }
538 }
539 }
540
541 private AntiEntropyAdvertisement<K> createAdvertisement() {
542 final NodeId self = clusterService.getLocalNode().id();
543
544 Map<K, Timestamp> timestamps = new HashMap<>(items.size());
545
546 items.forEach((key, value) -> timestamps.put(key, value.timestamp()));
547
548 Map<K, Timestamp> tombstones = new HashMap<>(removedItems);
549
550 return new AntiEntropyAdvertisement<>(self, timestamps, tombstones);
551 }
552
553 private void handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) {
554 List<EventuallyConsistentMapEvent<K, V>> externalEvents;
555
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800556 externalEvents = antiEntropyCheckLocalItems(ad);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800557
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800558 antiEntropyCheckLocalRemoved(ad);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800559
Jonathan Hartf893be82015-02-24 17:35:51 -0800560 if (!lightweightAntiEntropy) {
561 externalEvents.addAll(antiEntropyCheckRemoteRemoved(ad));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800562
Jonathan Hartf893be82015-02-24 17:35:51 -0800563 // if remote ad has something unknown, actively sync
564 for (K key : ad.timestamps().keySet()) {
565 if (!items.containsKey(key)) {
566 // Send the advertisement back if this peer is out-of-sync
567 final NodeId sender = ad.sender();
568 AntiEntropyAdvertisement<K> myAd = createAdvertisement();
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800569 if (!unicastMessage(sender, antiEntropyAdvertisementSubject, myAd)) {
570 log.debug("Failed to send reactive anti-entropy advertisement to {}", sender);
571 }
Jonathan Hartf893be82015-02-24 17:35:51 -0800572 break;
573 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800574 }
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800575 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800576 externalEvents.forEach(this::notifyListeners);
577 }
578
579 /**
580 * Checks if any of the remote's live items or tombstones are out of date
581 * according to our local live item list, or if our live items are out of
582 * date according to the remote's tombstone list.
583 * If the local copy is more recent, it will be pushed to the remote. If the
584 * remote has a more recent remove, we apply that to the local state.
585 *
586 * @param ad remote anti-entropy advertisement
587 * @return list of external events relating to local operations performed
588 */
Jonathan Hartaaa56572015-01-28 21:56:35 -0800589 private List<EventuallyConsistentMapEvent<K, V>> antiEntropyCheckLocalItems(
590 AntiEntropyAdvertisement<K> ad) {
591 final List<EventuallyConsistentMapEvent<K, V>> externalEvents
592 = new LinkedList<>();
593 final NodeId sender = ad.sender();
594
Jonathan Hartaaa56572015-01-28 21:56:35 -0800595 for (Map.Entry<K, Timestamped<V>> item : items.entrySet()) {
596 K key = item.getKey();
597 Timestamped<V> localValue = item.getValue();
598
599 Timestamp remoteTimestamp = ad.timestamps().get(key);
600 if (remoteTimestamp == null) {
601 remoteTimestamp = ad.tombstones().get(key);
602 }
603 if (remoteTimestamp == null || localValue
Jonathan Hart403ea932015-02-20 16:23:00 -0800604 .isNewerThan(remoteTimestamp)) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800605 // local value is more recent, push to sender
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800606 queueUpdate(new PutEntry<>(key, localValue.value(),
607 localValue.timestamp()), ImmutableList.of(sender));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800608 }
609
610 Timestamp remoteDeadTimestamp = ad.tombstones().get(key);
611 if (remoteDeadTimestamp != null &&
Jonathan Hart403ea932015-02-20 16:23:00 -0800612 remoteDeadTimestamp.isNewerThan(localValue.timestamp())) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800613 // sender has a more recent remove
614 if (removeInternal(key, remoteDeadTimestamp)) {
615 externalEvents.add(new EventuallyConsistentMapEvent<>(
616 EventuallyConsistentMapEvent.Type.REMOVE, key, null));
617 }
618 }
619 }
620
Jonathan Hartaaa56572015-01-28 21:56:35 -0800621 return externalEvents;
622 }
623
624 /**
625 * Checks if any items in the remote live list are out of date according
626 * to our tombstone list. If we find we have a more up to date tombstone,
627 * we'll send it to the remote.
628 *
629 * @param ad remote anti-entropy advertisement
630 */
Jonathan Hartaaa56572015-01-28 21:56:35 -0800631 private void antiEntropyCheckLocalRemoved(AntiEntropyAdvertisement<K> ad) {
632 final NodeId sender = ad.sender();
633
Jonathan Hartaaa56572015-01-28 21:56:35 -0800634 for (Map.Entry<K, Timestamp> dead : removedItems.entrySet()) {
635 K key = dead.getKey();
636 Timestamp localDeadTimestamp = dead.getValue();
637
638 Timestamp remoteLiveTimestamp = ad.timestamps().get(key);
639 if (remoteLiveTimestamp != null
Jonathan Hart403ea932015-02-20 16:23:00 -0800640 && localDeadTimestamp.isNewerThan(remoteLiveTimestamp)) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800641 // sender has zombie, push remove
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800642 queueUpdate(new RemoveEntry<>(key, localDeadTimestamp), ImmutableList.of(sender));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800643 }
644 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800645 }
646
647 /**
648 * Checks if any of the local live items are out of date according to the
649 * remote's tombstone advertisements. If we find a local item is out of date,
650 * we'll apply the remove operation to the local state.
651 *
652 * @param ad remote anti-entropy advertisement
653 * @return list of external events relating to local operations performed
654 */
Jonathan Hartaaa56572015-01-28 21:56:35 -0800655 private List<EventuallyConsistentMapEvent<K, V>>
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800656 antiEntropyCheckRemoteRemoved(AntiEntropyAdvertisement<K> ad) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800657 final List<EventuallyConsistentMapEvent<K, V>> externalEvents
658 = new LinkedList<>();
659
660 for (Map.Entry<K, Timestamp> remoteDead : ad.tombstones().entrySet()) {
661 K key = remoteDead.getKey();
662 Timestamp remoteDeadTimestamp = remoteDead.getValue();
663
664 Timestamped<V> local = items.get(key);
665 Timestamp localDead = removedItems.get(key);
Jonathan Hart403ea932015-02-20 16:23:00 -0800666 if (local != null && remoteDeadTimestamp.isNewerThan(
667 local.timestamp())) {
668 // If the remote has a more recent tombstone than either our local
669 // value, then do a remove with their timestamp
Jonathan Hartaaa56572015-01-28 21:56:35 -0800670 if (removeInternal(key, remoteDeadTimestamp)) {
671 externalEvents.add(new EventuallyConsistentMapEvent<>(
672 EventuallyConsistentMapEvent.Type.REMOVE, key, null));
673 }
Jonathan Hart403ea932015-02-20 16:23:00 -0800674 } else if (localDead != null && remoteDeadTimestamp.isNewerThan(
675 localDead)) {
676 // If the remote has a more recent tombstone than us, update ours
677 // to their timestamp
678 removeInternal(key, remoteDeadTimestamp);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800679 }
680 }
681
682 return externalEvents;
683 }
684
685 private final class InternalAntiEntropyListener
686 implements ClusterMessageHandler {
687
688 @Override
689 public void handle(ClusterMessage message) {
Jonathan Hart4fd4ebb2015-02-04 17:38:48 -0800690 log.trace("Received anti-entropy advertisement from peer: {}",
691 message.sender());
Jonathan Hartaaa56572015-01-28 21:56:35 -0800692 AntiEntropyAdvertisement<K> advertisement = serializer.decode(message.payload());
Madan Jampani2af244a2015-02-22 13:12:01 -0800693 try {
Jonathan Hart233a18a2015-03-02 17:24:58 -0800694 if (!underHighLoad()) {
695 handleAntiEntropyAdvertisement(advertisement);
696 }
Madan Jampani2af244a2015-02-22 13:12:01 -0800697 } catch (Exception e) {
698 log.warn("Exception thrown handling advertisements", e);
699 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800700 }
701 }
702
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800703 private final class InternalEventListener implements
Jonathan Hartdb3af892015-01-26 13:19:07 -0800704 ClusterMessageHandler {
705 @Override
706 public void handle(ClusterMessage message) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800707 log.debug("Received update event from peer: {}", message.sender());
708 Collection<AbstractEntry<K, V>> events = serializer.decode(message.payload());
Jonathan Hartdb3af892015-01-26 13:19:07 -0800709
Madan Jampani2af244a2015-02-22 13:12:01 -0800710 try {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800711 // TODO clean this for loop up
712 for (AbstractEntry<K, V> entry : events) {
713 final K key = entry.key();
714 final V value;
715 final Timestamp timestamp = entry.timestamp();
716 final EventuallyConsistentMapEvent.Type type;
717 if (entry instanceof PutEntry) {
718 PutEntry<K, V> putEntry = (PutEntry<K, V>) entry;
719 value = putEntry.value();
720 type = EventuallyConsistentMapEvent.Type.PUT;
721 } else if (entry instanceof RemoveEntry) {
722 type = EventuallyConsistentMapEvent.Type.REMOVE;
723 value = null;
724 } else {
725 throw new IllegalStateException("Unknown entry type " + entry.getClass());
726 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800727
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800728 boolean success;
729 switch (type) {
730 case PUT:
731 success = putInternal(key, value, timestamp);
732 break;
733 case REMOVE:
734 success = removeInternal(key, timestamp);
735 break;
736 default:
737 success = false;
738 }
739 if (success) {
740 notifyListeners(new EventuallyConsistentMapEvent<>(type, key, value));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800741 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800742 }
Madan Jampani2af244a2015-02-22 13:12:01 -0800743 } catch (Exception e) {
744 log.warn("Exception thrown handling put", e);
745 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800746 }
747 }
748
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800749 // TODO pull this into the class if this gets pulled out...
750 private static final int DEFAULT_MAX_EVENTS = 1000;
751 private static final int DEFAULT_MAX_IDLE_MS = 10;
752 private static final int DEFAULT_MAX_BATCH_MS = 50;
753 private static final Timer TIMER = new Timer("onos-ecm-sender-events");
Jonathan Hartdb3af892015-01-26 13:19:07 -0800754
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800755 private final class EventAccumulator extends AbstractAccumulator<AbstractEntry<K, V>> {
756
757 private final NodeId peer;
758
759 private EventAccumulator(NodeId peer) {
760 super(TIMER, DEFAULT_MAX_EVENTS, DEFAULT_MAX_BATCH_MS, DEFAULT_MAX_IDLE_MS);
761 this.peer = peer;
762 }
763
764 @Override
765 public void processItems(List<AbstractEntry<K, V>> items) {
766 Map<K, AbstractEntry<K, V>> map = Maps.newHashMap();
767 items.forEach(item -> map.compute(item.key(), (key, oldValue) ->
768 oldValue == null || item.compareTo(oldValue) > 0 ? item : oldValue
769 )
770 );
771 communicationExecutor.submit(() -> {
772 try {
773 unicastMessage(peer, updateMessageSubject, Lists.newArrayList(map.values()));
774 } catch (Exception e) {
775 log.warn("broadcast error", e);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800776 }
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800777 });
Jonathan Hartdb3af892015-01-26 13:19:07 -0800778 }
779 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800780}