blob: 99eef3d3559c6fe32de4f003c1b8d486c5b30904 [file] [log] [blame]
Jonathan Hartdb3af892015-01-26 13:19:07 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Jonathan Hart77bdd262015-02-03 09:07:48 -080016package org.onosproject.store.ecmap;
Jonathan Hartdb3af892015-01-26 13:19:07 -080017
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080018import com.google.common.collect.ImmutableList;
19import com.google.common.collect.Lists;
20import com.google.common.collect.Maps;
Madan Jampani3e033bd2015-04-08 13:03:49 -070021
Jonathan Hartaaa56572015-01-28 21:56:35 -080022import org.apache.commons.lang3.RandomUtils;
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -080023import org.apache.commons.lang3.mutable.MutableBoolean;
Jonathan Hartf9108232015-02-02 16:37:35 -080024import org.apache.commons.lang3.tuple.Pair;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080025import org.onlab.util.AbstractAccumulator;
Jonathan Hartdb3af892015-01-26 13:19:07 -080026import org.onlab.util.KryoNamespace;
Jonathan Hart233a18a2015-03-02 17:24:58 -080027import org.onlab.util.SlidingWindowCounter;
Jonathan Hartdb3af892015-01-26 13:19:07 -080028import org.onosproject.cluster.ClusterService;
Jonathan Hartaaa56572015-01-28 21:56:35 -080029import org.onosproject.cluster.ControllerNode;
Jonathan Hartdb3af892015-01-26 13:19:07 -080030import org.onosproject.cluster.NodeId;
31import org.onosproject.store.Timestamp;
32import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
33import org.onosproject.store.cluster.messaging.ClusterMessage;
34import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
35import org.onosproject.store.cluster.messaging.MessageSubject;
Madan Jampani3e033bd2015-04-08 13:03:49 -070036import org.onosproject.store.impl.LogicalTimestamp;
Jonathan Hart77bdd262015-02-03 09:07:48 -080037import org.onosproject.store.impl.Timestamped;
38import org.onosproject.store.impl.WallClockTimestamp;
Jonathan Hartdb3af892015-01-26 13:19:07 -080039import org.onosproject.store.serializers.KryoSerializer;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070040import org.onosproject.store.service.ClockService;
41import org.onosproject.store.service.EventuallyConsistentMap;
42import org.onosproject.store.service.EventuallyConsistentMapEvent;
43import org.onosproject.store.service.EventuallyConsistentMapListener;
Jonathan Hartdb3af892015-01-26 13:19:07 -080044import org.slf4j.Logger;
45import org.slf4j.LoggerFactory;
46
Jonathan Hartdb3af892015-01-26 13:19:07 -080047import java.util.ArrayList;
48import java.util.Collection;
Jonathan Hartaaa56572015-01-28 21:56:35 -080049import java.util.HashMap;
50import java.util.LinkedList;
Jonathan Hartdb3af892015-01-26 13:19:07 -080051import java.util.List;
52import java.util.Map;
53import java.util.Set;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080054import java.util.Timer;
Jonathan Hartdb3af892015-01-26 13:19:07 -080055import java.util.concurrent.ConcurrentHashMap;
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -080056import java.util.concurrent.ConcurrentMap;
Jonathan Hartdb3af892015-01-26 13:19:07 -080057import java.util.concurrent.CopyOnWriteArraySet;
58import java.util.concurrent.ExecutorService;
59import java.util.concurrent.Executors;
60import java.util.concurrent.ScheduledExecutorService;
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -080061import java.util.concurrent.TimeUnit;
Brian O'Connor5eb77c82015-03-02 18:09:39 -080062import java.util.function.BiFunction;
Jonathan Hartdb3af892015-01-26 13:19:07 -080063import java.util.stream.Collectors;
64
65import static com.google.common.base.Preconditions.checkNotNull;
66import static com.google.common.base.Preconditions.checkState;
67import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
Brian O'Connorc6713a82015-02-24 11:55:48 -080068import static org.onlab.util.BoundedThreadPool.newFixedThreadPool;
Brian O'Connor4b2ba5f2015-02-18 20:54:00 -080069import static org.onlab.util.Tools.groupedThreads;
Jonathan Hartdb3af892015-01-26 13:19:07 -080070
71/**
72 * Distributed Map implementation which uses optimistic replication and gossip
73 * based techniques to provide an eventually consistent data store.
74 */
75public class EventuallyConsistentMapImpl<K, V>
76 implements EventuallyConsistentMap<K, V> {
77
78 private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class);
79
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -080080 private final ConcurrentMap<K, Timestamped<V>> items;
81 private final ConcurrentMap<K, Timestamp> removedItems;
Jonathan Hartdb3af892015-01-26 13:19:07 -080082
Jonathan Hartdb3af892015-01-26 13:19:07 -080083 private final ClusterService clusterService;
84 private final ClusterCommunicationService clusterCommunicator;
85 private final KryoSerializer serializer;
86
Jonathan Hart5ec32ba2015-02-05 13:33:58 -080087 private final ClockService<K, V> clockService;
Jonathan Hartdb3af892015-01-26 13:19:07 -080088
89 private final MessageSubject updateMessageSubject;
Jonathan Hartaaa56572015-01-28 21:56:35 -080090 private final MessageSubject antiEntropyAdvertisementSubject;
Jonathan Hartdb3af892015-01-26 13:19:07 -080091
Jonathan Hartaaa56572015-01-28 21:56:35 -080092 private final Set<EventuallyConsistentMapListener<K, V>> listeners
Jonathan Hartdb3af892015-01-26 13:19:07 -080093 = new CopyOnWriteArraySet<>();
94
95 private final ExecutorService executor;
96
97 private final ScheduledExecutorService backgroundExecutor;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080098 private final BiFunction<K, V, Collection<NodeId>> peerUpdateFunction;
Jonathan Hartdb3af892015-01-26 13:19:07 -080099
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700100 private final ExecutorService communicationExecutor;
101 private final Map<NodeId, EventAccumulator> senderPending;
Madan Jampani28726282015-02-19 11:40:23 -0800102
Jonathan Hartdb3af892015-01-26 13:19:07 -0800103 private volatile boolean destroyed = false;
Jonathan Hart539a6462015-01-27 17:05:43 -0800104 private static final String ERROR_DESTROYED = " map is already destroyed";
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800105 private final String destroyedMessage;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800106
Jonathan Hart4f397e82015-02-04 09:10:41 -0800107 private static final String ERROR_NULL_KEY = "Key cannot be null";
108 private static final String ERROR_NULL_VALUE = "Null values are not allowed";
109
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700110 private final long initialDelaySec = 5;
111 private final boolean lightweightAntiEntropy;
112 private final boolean tombstonesDisabled;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800113
Jonathan Hart233a18a2015-03-02 17:24:58 -0800114 private static final int WINDOW_SIZE = 5;
115 private static final int HIGH_LOAD_THRESHOLD = 0;
116 private static final int LOAD_WINDOW = 2;
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700117 private SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE);
Jonathan Hart233a18a2015-03-02 17:24:58 -0800118
/**
 * Creates a new eventually consistent map shared amongst multiple instances.
 * <p>
 * See {@link org.onosproject.store.service.EventuallyConsistentMapBuilder}
 * for more description of the parameters expected by the map.
 * </p>
 *
 * @param mapName               a String identifier for the map.
 * @param clusterService        the cluster service
 * @param clusterCommunicator   the cluster communications service
 * @param serializerBuilder     a Kryo namespace builder that can serialize
 *                              both K and V
 * @param clockService          a clock service able to generate timestamps
 *                              for K and V
 * @param peerUpdateFunction    function that provides a set of nodes to immediately
 *                              update when there are writes to the map; if null,
 *                              every cluster node except the local one is used
 * @param eventExecutor         executor to use for processing incoming
 *                              events from peers; if null, a fixed pool of
 *                              8 threads is created
 * @param communicationExecutor executor to use for sending events to peers;
 *                              if null, a pool of 8 threads is created via
 *                              BoundedThreadPool
 * @param backgroundExecutor    executor to use for background anti-entropy
 *                              tasks; if null, a single-threaded scheduled
 *                              executor is created
 * @param tombstonesDisabled    true if this map should not maintain
 *                              tombstones
 * @param antiEntropyPeriod     period at which the anti-entropy task runs,
 *                              expressed in {@code antiEntropyTimeUnit}
 * @param antiEntropyTimeUnit   time unit for anti-entropy task scheduling
 * @param convergeFaster        make anti-entropy try to converge faster
 */
EventuallyConsistentMapImpl(String mapName,
                            ClusterService clusterService,
                            ClusterCommunicationService clusterCommunicator,
                            KryoNamespace.Builder serializerBuilder,
                            ClockService<K, V> clockService,
                            BiFunction<K, V, Collection<NodeId>> peerUpdateFunction,
                            ExecutorService eventExecutor,
                            ExecutorService communicationExecutor,
                            ScheduledExecutorService backgroundExecutor,
                            boolean tombstonesDisabled,
                            long antiEntropyPeriod,
                            TimeUnit antiEntropyTimeUnit,
                            boolean convergeFaster) {
    items = new ConcurrentHashMap<>();
    removedItems = new ConcurrentHashMap<>();
    senderPending = Maps.newConcurrentMap();
    destroyedMessage = mapName + ERROR_DESTROYED;

    this.clusterService = clusterService;
    this.clusterCommunicator = clusterCommunicator;

    this.serializer = createSerializer(serializerBuilder);

    this.clockService = clockService;

    // These flags are read by the message handlers and the anti-entropy task,
    // so they must be assigned before either is started at the end of this
    // constructor (otherwise those threads may observe the default false).
    this.tombstonesDisabled = tombstonesDisabled;
    this.lightweightAntiEntropy = !convergeFaster;

    if (peerUpdateFunction != null) {
        this.peerUpdateFunction = peerUpdateFunction;
    } else {
        this.peerUpdateFunction = (key, value) -> clusterService.getNodes().stream()
                .map(ControllerNode::id)
                .filter(nodeId -> !nodeId.equals(clusterService.getLocalNode().id()))
                .collect(Collectors.toList());
    }

    if (eventExecutor != null) {
        this.executor = eventExecutor;
    } else {
        // should be a normal executor; it's used for receiving messages
        this.executor =
                Executors.newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-fg-%d"));
    }

    if (communicationExecutor != null) {
        this.communicationExecutor = communicationExecutor;
    } else {
        // sending executor; should be capped
        //TODO this probably doesn't need to be bounded anymore
        this.communicationExecutor =
                newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-publish-%d"));
    }

    if (backgroundExecutor != null) {
        this.backgroundExecutor = backgroundExecutor;
    } else {
        this.backgroundExecutor =
                newSingleThreadScheduledExecutor(groupedThreads("onos/ecm", mapName + "-bg-%d"));
    }

    // Subjects are needed by the anti-entropy task and the handlers; create
    // them before anything that may run on another thread is started.
    updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
    antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");

    // Start the anti-entropy task. The initial delay is in seconds while the
    // period is in the caller's unit, so both are converted to nanoseconds.
    this.backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
                                                TimeUnit.SECONDS.toNanos(initialDelaySec),
                                                antiEntropyTimeUnit.toNanos(antiEntropyPeriod),
                                                TimeUnit.NANOSECONDS);

    clusterCommunicator.addSubscriber(updateMessageSubject,
                                      new InternalEventListener(), this.executor);

    clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
                                      new InternalAntiEntropyListener(), this.backgroundExecutor);
}
221
Jonathan Hartdb3af892015-01-26 13:19:07 -0800222 private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
223 return new KryoSerializer() {
224 @Override
225 protected void setupKryoPool() {
226 // Add the map's internal helper classes to the user-supplied serializer
227 serializerPool = builder
Madan Jampani3e033bd2015-04-08 13:03:49 -0700228 .register(LogicalTimestamp.class)
Jonathan Hartdb3af892015-01-26 13:19:07 -0800229 .register(WallClockTimestamp.class)
230 .register(PutEntry.class)
Jonathan Hart539a6462015-01-27 17:05:43 -0800231 .register(RemoveEntry.class)
Jonathan Hartdb3af892015-01-26 13:19:07 -0800232 .register(ArrayList.class)
Jonathan Hartaaa56572015-01-28 21:56:35 -0800233 .register(AntiEntropyAdvertisement.class)
234 .register(HashMap.class)
Jonathan Hartdb3af892015-01-26 13:19:07 -0800235 .build();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800236 }
237 };
238 }
239
240 @Override
241 public int size() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800242 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800243 return items.size();
244 }
245
246 @Override
247 public boolean isEmpty() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800248 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800249 return items.isEmpty();
250 }
251
252 @Override
253 public boolean containsKey(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800254 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800255 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800256 return items.containsKey(key);
257 }
258
259 @Override
260 public boolean containsValue(V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800261 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800262 checkNotNull(value, ERROR_NULL_VALUE);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800263
264 return items.values().stream()
265 .anyMatch(timestamped -> timestamped.value().equals(value));
266 }
267
268 @Override
269 public V get(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800270 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800271 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800272
273 Timestamped<V> value = items.get(key);
274 if (value != null) {
275 return value.value();
276 }
277 return null;
278 }
279
280 @Override
281 public void put(K key, V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800282 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800283 checkNotNull(key, ERROR_NULL_KEY);
284 checkNotNull(value, ERROR_NULL_VALUE);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800285
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800286 Timestamp timestamp = clockService.getTimestamp(key, value);
287
Jonathan Hartdb3af892015-01-26 13:19:07 -0800288 if (putInternal(key, value, timestamp)) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800289 notifyPeers(new PutEntry<>(key, value, timestamp),
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800290 peerUpdateFunction.apply(key, value));
291 notifyListeners(new EventuallyConsistentMapEvent<>(
292 EventuallyConsistentMapEvent.Type.PUT, key, value));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800293 }
294 }
295
296 private boolean putInternal(K key, V value, Timestamp timestamp) {
Jonathan Hart233a18a2015-03-02 17:24:58 -0800297 counter.incrementCount();
Brian O'Connor2952e3b2015-02-19 21:47:57 -0800298 Timestamp removed = removedItems.get(key);
Jonathan Hart403ea932015-02-20 16:23:00 -0800299 if (removed != null && removed.isNewerThan(timestamp)) {
Brian O'Connor2952e3b2015-02-19 21:47:57 -0800300 log.debug("ecmap - removed was newer {}", value);
301 return false;
302 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800303
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800304 final MutableBoolean updated = new MutableBoolean(false);
305
306 items.compute(key, (k, existing) -> {
Jonathan Hart403ea932015-02-20 16:23:00 -0800307 if (existing != null && existing.isNewerThan(timestamp)) {
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800308 updated.setFalse();
309 return existing;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800310 } else {
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800311 updated.setTrue();
312 return new Timestamped<>(value, timestamp);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800313 }
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800314 });
315
316 boolean success = updated.booleanValue();
317 if (!success) {
318 log.debug("ecmap - existing was newer {}", value);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800319 }
Brian O'Connor2952e3b2015-02-19 21:47:57 -0800320
321 if (success && removed != null) {
322 removedItems.remove(key, removed);
323 }
324 return success;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800325 }
326
327 @Override
328 public void remove(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800329 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800330 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800331
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800332 // TODO prevent calls here if value is important for timestamp
333 Timestamp timestamp = clockService.getTimestamp(key, null);
334
Jonathan Hartdb3af892015-01-26 13:19:07 -0800335 if (removeInternal(key, timestamp)) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800336 notifyPeers(new RemoveEntry<>(key, timestamp),
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800337 peerUpdateFunction.apply(key, null));
338 notifyListeners(new EventuallyConsistentMapEvent<>(
339 EventuallyConsistentMapEvent.Type.REMOVE, key, null));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800340 }
341 }
342
343 private boolean removeInternal(K key, Timestamp timestamp) {
Madan Jampani54d34992015-03-06 17:27:52 -0800344 if (timestamp == null) {
345 return false;
346 }
347
Jonathan Hart233a18a2015-03-02 17:24:58 -0800348 counter.incrementCount();
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800349 final MutableBoolean updated = new MutableBoolean(false);
350
351 items.compute(key, (k, existing) -> {
352 if (existing != null && existing.isNewerThan(timestamp)) {
353 updated.setFalse();
354 return existing;
Madan Jampani7e6cfe32015-02-19 17:44:44 -0800355 } else {
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800356 updated.setTrue();
357 // remove from items map
358 return null;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800359 }
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800360 });
361
362 if (updated.isFalse()) {
363 return false;
Madan Jampani7e6cfe32015-02-19 17:44:44 -0800364 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800365
Madan Jampanie1356282015-03-10 19:05:36 -0700366 if (!tombstonesDisabled) {
367 Timestamp removedTimestamp = removedItems.get(key);
368 if (removedTimestamp == null) {
369 return removedItems.putIfAbsent(key, timestamp) == null;
370 } else if (timestamp.isNewerThan(removedTimestamp)) {
371 return removedItems.replace(key, removedTimestamp, timestamp);
372 } else {
373 return false;
374 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800375 }
Madan Jampanie1356282015-03-10 19:05:36 -0700376
377 return updated.booleanValue();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800378 }
379
380 @Override
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800381 public void remove(K key, V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800382 checkState(!destroyed, destroyedMessage);
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800383 checkNotNull(key, ERROR_NULL_KEY);
384 checkNotNull(value, ERROR_NULL_VALUE);
385
386 Timestamp timestamp = clockService.getTimestamp(key, value);
387
388 if (removeInternal(key, timestamp)) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800389 notifyPeers(new RemoveEntry<>(key, timestamp),
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800390 peerUpdateFunction.apply(key, value));
391 notifyListeners(new EventuallyConsistentMapEvent<>(
392 EventuallyConsistentMapEvent.Type.REMOVE, key, value));
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800393 }
394 }
395
396 @Override
Jonathan Hartdb3af892015-01-26 13:19:07 -0800397 public void putAll(Map<? extends K, ? extends V> m) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800398 checkState(!destroyed, destroyedMessage);
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800399 m.forEach(this::put);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800400 }
401
402 @Override
403 public void clear() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800404 checkState(!destroyed, destroyedMessage);
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800405 items.forEach((key, value) -> remove(key));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800406 }
407
408 @Override
409 public Set<K> keySet() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800410 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800411 return items.keySet();
412 }
413
414 @Override
415 public Collection<V> values() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800416 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800417 return items.values().stream()
418 .map(Timestamped::value)
419 .collect(Collectors.toList());
420 }
421
422 @Override
423 public Set<Map.Entry<K, V>> entrySet() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800424 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800425
426 return items.entrySet().stream()
Jonathan Hartf9108232015-02-02 16:37:35 -0800427 .map(e -> Pair.of(e.getKey(), e.getValue().value()))
Jonathan Hartdb3af892015-01-26 13:19:07 -0800428 .collect(Collectors.toSet());
429 }
430
431 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800432 public void addListener(EventuallyConsistentMapListener<K, V> listener) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800433 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800434
435 listeners.add(checkNotNull(listener));
436 }
437
438 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800439 public void removeListener(EventuallyConsistentMapListener<K, V> listener) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800440 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800441
442 listeners.remove(checkNotNull(listener));
443 }
444
445 @Override
446 public void destroy() {
447 destroyed = true;
448
449 executor.shutdown();
450 backgroundExecutor.shutdown();
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800451 communicationExecutor.shutdown();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800452
Jonathan Hart584d2f32015-01-27 19:46:14 -0800453 listeners.clear();
454
Jonathan Hartdb3af892015-01-26 13:19:07 -0800455 clusterCommunicator.removeSubscriber(updateMessageSubject);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800456 clusterCommunicator.removeSubscriber(antiEntropyAdvertisementSubject);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800457 }
458
Jonathan Hartaaa56572015-01-28 21:56:35 -0800459 private void notifyListeners(EventuallyConsistentMapEvent<K, V> event) {
460 for (EventuallyConsistentMapListener<K, V> listener : listeners) {
Jonathan Hartdb3af892015-01-26 13:19:07 -0800461 listener.event(event);
462 }
463 }
464
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800465 private void notifyPeers(PutEntry<K, V> event, Collection<NodeId> peers) {
466 queueUpdate(event, peers);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800467 }
468
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800469 private void notifyPeers(RemoveEntry<K, V> event, Collection<NodeId> peers) {
470 queueUpdate(event, peers);
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800471 }
472
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800473 private void queueUpdate(AbstractEntry<K, V> event, Collection<NodeId> peers) {
474 if (peers == null) {
475 // we have no friends :(
476 return;
477 }
478 peers.forEach(node ->
479 senderPending.computeIfAbsent(node, unusedKey -> new EventAccumulator(node)).add(event)
480 );
481 }
482
483 private boolean unicastMessage(NodeId peer, MessageSubject subject, Object event) {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800484 ClusterMessage message = new ClusterMessage(
485 clusterService.getLocalNode().id(),
486 subject,
487 serializer.encode(event));
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800488 return clusterCommunicator.unicast(message, peer);
489 // Note: we had this flipped before...
490// communicationExecutor.execute(() -> clusterCommunicator.unicast(message, peer));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800491 }
492
Jonathan Hart233a18a2015-03-02 17:24:58 -0800493 private boolean underHighLoad() {
494 return counter.get(LOAD_WINDOW) > HIGH_LOAD_THRESHOLD;
495 }
496
Jonathan Hartaaa56572015-01-28 21:56:35 -0800497 private final class SendAdvertisementTask implements Runnable {
498 @Override
499 public void run() {
500 if (Thread.currentThread().isInterrupted()) {
501 log.info("Interrupted, quitting");
502 return;
503 }
504
Thomas Vachuska152f9fd2015-04-02 16:28:13 -0700505 if (underHighLoad() || destroyed) {
Jonathan Hart233a18a2015-03-02 17:24:58 -0800506 return;
507 }
508
Jonathan Hartaaa56572015-01-28 21:56:35 -0800509 try {
510 final NodeId self = clusterService.getLocalNode().id();
511 Set<ControllerNode> nodes = clusterService.getNodes();
512
513 List<NodeId> nodeIds = nodes.stream()
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800514 .map(ControllerNode::id)
Jonathan Hartaaa56572015-01-28 21:56:35 -0800515 .collect(Collectors.toList());
516
517 if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
518 log.trace("No other peers in the cluster.");
519 return;
520 }
521
522 NodeId peer;
523 do {
524 int idx = RandomUtils.nextInt(0, nodeIds.size());
525 peer = nodeIds.get(idx);
526 } while (peer.equals(self));
527
528 if (Thread.currentThread().isInterrupted()) {
529 log.info("Interrupted, quitting");
530 return;
531 }
532
533 AntiEntropyAdvertisement<K> ad = createAdvertisement();
534
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800535 if (!unicastMessage(peer, antiEntropyAdvertisementSubject, ad)) {
536 log.debug("Failed to send anti-entropy advertisement to {}", peer);
537 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800538 } catch (Exception e) {
539 // Catch all exceptions to avoid scheduled task being suppressed.
540 log.error("Exception thrown while sending advertisement", e);
541 }
542 }
543 }
544
545 private AntiEntropyAdvertisement<K> createAdvertisement() {
546 final NodeId self = clusterService.getLocalNode().id();
547
548 Map<K, Timestamp> timestamps = new HashMap<>(items.size());
549
550 items.forEach((key, value) -> timestamps.put(key, value.timestamp()));
551
552 Map<K, Timestamp> tombstones = new HashMap<>(removedItems);
553
554 return new AntiEntropyAdvertisement<>(self, timestamps, tombstones);
555 }
556
557 private void handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) {
558 List<EventuallyConsistentMapEvent<K, V>> externalEvents;
559
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800560 externalEvents = antiEntropyCheckLocalItems(ad);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800561
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800562 antiEntropyCheckLocalRemoved(ad);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800563
Jonathan Hartf893be82015-02-24 17:35:51 -0800564 if (!lightweightAntiEntropy) {
565 externalEvents.addAll(antiEntropyCheckRemoteRemoved(ad));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800566
Jonathan Hartf893be82015-02-24 17:35:51 -0800567 // if remote ad has something unknown, actively sync
568 for (K key : ad.timestamps().keySet()) {
569 if (!items.containsKey(key)) {
570 // Send the advertisement back if this peer is out-of-sync
571 final NodeId sender = ad.sender();
572 AntiEntropyAdvertisement<K> myAd = createAdvertisement();
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800573 if (!unicastMessage(sender, antiEntropyAdvertisementSubject, myAd)) {
574 log.debug("Failed to send reactive anti-entropy advertisement to {}", sender);
575 }
Jonathan Hartf893be82015-02-24 17:35:51 -0800576 break;
577 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800578 }
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800579 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800580 externalEvents.forEach(this::notifyListeners);
581 }
582
583 /**
584 * Checks if any of the remote's live items or tombstones are out of date
585 * according to our local live item list, or if our live items are out of
586 * date according to the remote's tombstone list.
587 * If the local copy is more recent, it will be pushed to the remote. If the
588 * remote has a more recent remove, we apply that to the local state.
589 *
590 * @param ad remote anti-entropy advertisement
591 * @return list of external events relating to local operations performed
592 */
Jonathan Hartaaa56572015-01-28 21:56:35 -0800593 private List<EventuallyConsistentMapEvent<K, V>> antiEntropyCheckLocalItems(
594 AntiEntropyAdvertisement<K> ad) {
595 final List<EventuallyConsistentMapEvent<K, V>> externalEvents
596 = new LinkedList<>();
597 final NodeId sender = ad.sender();
598
Jonathan Hartaaa56572015-01-28 21:56:35 -0800599 for (Map.Entry<K, Timestamped<V>> item : items.entrySet()) {
600 K key = item.getKey();
601 Timestamped<V> localValue = item.getValue();
602
603 Timestamp remoteTimestamp = ad.timestamps().get(key);
604 if (remoteTimestamp == null) {
605 remoteTimestamp = ad.tombstones().get(key);
606 }
607 if (remoteTimestamp == null || localValue
Jonathan Hart403ea932015-02-20 16:23:00 -0800608 .isNewerThan(remoteTimestamp)) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800609 // local value is more recent, push to sender
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800610 queueUpdate(new PutEntry<>(key, localValue.value(),
611 localValue.timestamp()), ImmutableList.of(sender));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800612 }
613
614 Timestamp remoteDeadTimestamp = ad.tombstones().get(key);
615 if (remoteDeadTimestamp != null &&
Jonathan Hart403ea932015-02-20 16:23:00 -0800616 remoteDeadTimestamp.isNewerThan(localValue.timestamp())) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800617 // sender has a more recent remove
618 if (removeInternal(key, remoteDeadTimestamp)) {
619 externalEvents.add(new EventuallyConsistentMapEvent<>(
620 EventuallyConsistentMapEvent.Type.REMOVE, key, null));
621 }
622 }
623 }
624
Jonathan Hartaaa56572015-01-28 21:56:35 -0800625 return externalEvents;
626 }
627
628 /**
629 * Checks if any items in the remote live list are out of date according
630 * to our tombstone list. If we find we have a more up to date tombstone,
631 * we'll send it to the remote.
632 *
633 * @param ad remote anti-entropy advertisement
634 */
Jonathan Hartaaa56572015-01-28 21:56:35 -0800635 private void antiEntropyCheckLocalRemoved(AntiEntropyAdvertisement<K> ad) {
636 final NodeId sender = ad.sender();
637
Jonathan Hartaaa56572015-01-28 21:56:35 -0800638 for (Map.Entry<K, Timestamp> dead : removedItems.entrySet()) {
639 K key = dead.getKey();
640 Timestamp localDeadTimestamp = dead.getValue();
641
642 Timestamp remoteLiveTimestamp = ad.timestamps().get(key);
643 if (remoteLiveTimestamp != null
Jonathan Hart403ea932015-02-20 16:23:00 -0800644 && localDeadTimestamp.isNewerThan(remoteLiveTimestamp)) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800645 // sender has zombie, push remove
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800646 queueUpdate(new RemoveEntry<>(key, localDeadTimestamp), ImmutableList.of(sender));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800647 }
648 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800649 }
650
651 /**
652 * Checks if any of the local live items are out of date according to the
653 * remote's tombstone advertisements. If we find a local item is out of date,
654 * we'll apply the remove operation to the local state.
655 *
656 * @param ad remote anti-entropy advertisement
657 * @return list of external events relating to local operations performed
658 */
Jonathan Hartaaa56572015-01-28 21:56:35 -0800659 private List<EventuallyConsistentMapEvent<K, V>>
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800660 antiEntropyCheckRemoteRemoved(AntiEntropyAdvertisement<K> ad) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800661 final List<EventuallyConsistentMapEvent<K, V>> externalEvents
662 = new LinkedList<>();
663
664 for (Map.Entry<K, Timestamp> remoteDead : ad.tombstones().entrySet()) {
665 K key = remoteDead.getKey();
666 Timestamp remoteDeadTimestamp = remoteDead.getValue();
667
668 Timestamped<V> local = items.get(key);
669 Timestamp localDead = removedItems.get(key);
Jonathan Hart403ea932015-02-20 16:23:00 -0800670 if (local != null && remoteDeadTimestamp.isNewerThan(
671 local.timestamp())) {
672 // If the remote has a more recent tombstone than either our local
673 // value, then do a remove with their timestamp
Jonathan Hartaaa56572015-01-28 21:56:35 -0800674 if (removeInternal(key, remoteDeadTimestamp)) {
675 externalEvents.add(new EventuallyConsistentMapEvent<>(
676 EventuallyConsistentMapEvent.Type.REMOVE, key, null));
677 }
Jonathan Hart403ea932015-02-20 16:23:00 -0800678 } else if (localDead != null && remoteDeadTimestamp.isNewerThan(
679 localDead)) {
680 // If the remote has a more recent tombstone than us, update ours
681 // to their timestamp
682 removeInternal(key, remoteDeadTimestamp);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800683 }
684 }
685
686 return externalEvents;
687 }
688
689 private final class InternalAntiEntropyListener
690 implements ClusterMessageHandler {
691
692 @Override
693 public void handle(ClusterMessage message) {
Jonathan Hart4fd4ebb2015-02-04 17:38:48 -0800694 log.trace("Received anti-entropy advertisement from peer: {}",
695 message.sender());
Jonathan Hartaaa56572015-01-28 21:56:35 -0800696 AntiEntropyAdvertisement<K> advertisement = serializer.decode(message.payload());
Madan Jampani2af244a2015-02-22 13:12:01 -0800697 try {
Jonathan Hart233a18a2015-03-02 17:24:58 -0800698 if (!underHighLoad()) {
699 handleAntiEntropyAdvertisement(advertisement);
700 }
Madan Jampani2af244a2015-02-22 13:12:01 -0800701 } catch (Exception e) {
702 log.warn("Exception thrown handling advertisements", e);
703 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800704 }
705 }
706
Thomas Vachuska152f9fd2015-04-02 16:28:13 -0700707 private final class InternalEventListener implements ClusterMessageHandler {
Jonathan Hartdb3af892015-01-26 13:19:07 -0800708 @Override
709 public void handle(ClusterMessage message) {
Thomas Vachuska152f9fd2015-04-02 16:28:13 -0700710 if (destroyed) {
711 return;
712 }
713
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800714 log.debug("Received update event from peer: {}", message.sender());
715 Collection<AbstractEntry<K, V>> events = serializer.decode(message.payload());
Jonathan Hartdb3af892015-01-26 13:19:07 -0800716
Madan Jampani2af244a2015-02-22 13:12:01 -0800717 try {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800718 // TODO clean this for loop up
719 for (AbstractEntry<K, V> entry : events) {
720 final K key = entry.key();
721 final V value;
722 final Timestamp timestamp = entry.timestamp();
723 final EventuallyConsistentMapEvent.Type type;
724 if (entry instanceof PutEntry) {
725 PutEntry<K, V> putEntry = (PutEntry<K, V>) entry;
726 value = putEntry.value();
727 type = EventuallyConsistentMapEvent.Type.PUT;
728 } else if (entry instanceof RemoveEntry) {
729 type = EventuallyConsistentMapEvent.Type.REMOVE;
730 value = null;
731 } else {
732 throw new IllegalStateException("Unknown entry type " + entry.getClass());
733 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800734
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800735 boolean success;
736 switch (type) {
737 case PUT:
738 success = putInternal(key, value, timestamp);
739 break;
740 case REMOVE:
741 success = removeInternal(key, timestamp);
742 break;
743 default:
744 success = false;
745 }
746 if (success) {
747 notifyListeners(new EventuallyConsistentMapEvent<>(type, key, value));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800748 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800749 }
Madan Jampani2af244a2015-02-22 13:12:01 -0800750 } catch (Exception e) {
751 log.warn("Exception thrown handling put", e);
752 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800753 }
754 }
755
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800756 // TODO pull this into the class if this gets pulled out...
757 private static final int DEFAULT_MAX_EVENTS = 1000;
758 private static final int DEFAULT_MAX_IDLE_MS = 10;
759 private static final int DEFAULT_MAX_BATCH_MS = 50;
760 private static final Timer TIMER = new Timer("onos-ecm-sender-events");
Jonathan Hartdb3af892015-01-26 13:19:07 -0800761
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800762 private final class EventAccumulator extends AbstractAccumulator<AbstractEntry<K, V>> {
763
764 private final NodeId peer;
765
766 private EventAccumulator(NodeId peer) {
767 super(TIMER, DEFAULT_MAX_EVENTS, DEFAULT_MAX_BATCH_MS, DEFAULT_MAX_IDLE_MS);
768 this.peer = peer;
769 }
770
771 @Override
772 public void processItems(List<AbstractEntry<K, V>> items) {
773 Map<K, AbstractEntry<K, V>> map = Maps.newHashMap();
774 items.forEach(item -> map.compute(item.key(), (key, oldValue) ->
775 oldValue == null || item.compareTo(oldValue) > 0 ? item : oldValue
776 )
777 );
778 communicationExecutor.submit(() -> {
779 try {
780 unicastMessage(peer, updateMessageSubject, Lists.newArrayList(map.values()));
781 } catch (Exception e) {
782 log.warn("broadcast error", e);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800783 }
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800784 });
Jonathan Hartdb3af892015-01-26 13:19:07 -0800785 }
786 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800787}