blob: 0487084f7511d173a90cf0d8929e55a68c3adee5 [file] [log] [blame]
Jonathan Hartdb3af892015-01-26 13:19:07 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Jonathan Hart77bdd262015-02-03 09:07:48 -080016package org.onosproject.store.ecmap;
Jonathan Hartdb3af892015-01-26 13:19:07 -080017
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080018import com.google.common.collect.ImmutableList;
19import com.google.common.collect.Lists;
20import com.google.common.collect.Maps;
Jonathan Hartaaa56572015-01-28 21:56:35 -080021import org.apache.commons.lang3.RandomUtils;
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -080022import org.apache.commons.lang3.mutable.MutableBoolean;
Jonathan Hartf9108232015-02-02 16:37:35 -080023import org.apache.commons.lang3.tuple.Pair;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080024import org.onlab.util.AbstractAccumulator;
Jonathan Hartdb3af892015-01-26 13:19:07 -080025import org.onlab.util.KryoNamespace;
Jonathan Hart233a18a2015-03-02 17:24:58 -080026import org.onlab.util.SlidingWindowCounter;
Jonathan Hartdb3af892015-01-26 13:19:07 -080027import org.onosproject.cluster.ClusterService;
Jonathan Hartaaa56572015-01-28 21:56:35 -080028import org.onosproject.cluster.ControllerNode;
Jonathan Hartdb3af892015-01-26 13:19:07 -080029import org.onosproject.cluster.NodeId;
30import org.onosproject.store.Timestamp;
31import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
32import org.onosproject.store.cluster.messaging.ClusterMessage;
33import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
34import org.onosproject.store.cluster.messaging.MessageSubject;
Jonathan Hart77bdd262015-02-03 09:07:48 -080035import org.onosproject.store.impl.Timestamped;
36import org.onosproject.store.impl.WallClockTimestamp;
Jonathan Hartdb3af892015-01-26 13:19:07 -080037import org.onosproject.store.serializers.KryoSerializer;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070038import org.onosproject.store.service.ClockService;
39import org.onosproject.store.service.EventuallyConsistentMap;
40import org.onosproject.store.service.EventuallyConsistentMapEvent;
41import org.onosproject.store.service.EventuallyConsistentMapListener;
Jonathan Hartdb3af892015-01-26 13:19:07 -080042import org.slf4j.Logger;
43import org.slf4j.LoggerFactory;
44
Jonathan Hartdb3af892015-01-26 13:19:07 -080045import java.util.ArrayList;
46import java.util.Collection;
Jonathan Hartaaa56572015-01-28 21:56:35 -080047import java.util.HashMap;
48import java.util.LinkedList;
Jonathan Hartdb3af892015-01-26 13:19:07 -080049import java.util.List;
50import java.util.Map;
51import java.util.Set;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080052import java.util.Timer;
Jonathan Hartdb3af892015-01-26 13:19:07 -080053import java.util.concurrent.ConcurrentHashMap;
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -080054import java.util.concurrent.ConcurrentMap;
Jonathan Hartdb3af892015-01-26 13:19:07 -080055import java.util.concurrent.CopyOnWriteArraySet;
56import java.util.concurrent.ExecutorService;
57import java.util.concurrent.Executors;
58import java.util.concurrent.ScheduledExecutorService;
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -080059import java.util.concurrent.TimeUnit;
Brian O'Connor5eb77c82015-03-02 18:09:39 -080060import java.util.function.BiFunction;
Jonathan Hartdb3af892015-01-26 13:19:07 -080061import java.util.stream.Collectors;
62
63import static com.google.common.base.Preconditions.checkNotNull;
64import static com.google.common.base.Preconditions.checkState;
65import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
Brian O'Connorc6713a82015-02-24 11:55:48 -080066import static org.onlab.util.BoundedThreadPool.newFixedThreadPool;
Brian O'Connor4b2ba5f2015-02-18 20:54:00 -080067import static org.onlab.util.Tools.groupedThreads;
Jonathan Hartdb3af892015-01-26 13:19:07 -080068
69/**
70 * Distributed Map implementation which uses optimistic replication and gossip
71 * based techniques to provide an eventually consistent data store.
72 */
73public class EventuallyConsistentMapImpl<K, V>
74 implements EventuallyConsistentMap<K, V> {
75
76 private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class);
77
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -080078 private final ConcurrentMap<K, Timestamped<V>> items;
79 private final ConcurrentMap<K, Timestamp> removedItems;
Jonathan Hartdb3af892015-01-26 13:19:07 -080080
Jonathan Hartdb3af892015-01-26 13:19:07 -080081 private final ClusterService clusterService;
82 private final ClusterCommunicationService clusterCommunicator;
83 private final KryoSerializer serializer;
84
Jonathan Hart5ec32ba2015-02-05 13:33:58 -080085 private final ClockService<K, V> clockService;
Jonathan Hartdb3af892015-01-26 13:19:07 -080086
87 private final MessageSubject updateMessageSubject;
Jonathan Hartaaa56572015-01-28 21:56:35 -080088 private final MessageSubject antiEntropyAdvertisementSubject;
Jonathan Hartdb3af892015-01-26 13:19:07 -080089
Jonathan Hartaaa56572015-01-28 21:56:35 -080090 private final Set<EventuallyConsistentMapListener<K, V>> listeners
Jonathan Hartdb3af892015-01-26 13:19:07 -080091 = new CopyOnWriteArraySet<>();
92
93 private final ExecutorService executor;
94
95 private final ScheduledExecutorService backgroundExecutor;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080096 private final BiFunction<K, V, Collection<NodeId>> peerUpdateFunction;
Jonathan Hartdb3af892015-01-26 13:19:07 -080097
Jonathan Hart6ec029a2015-03-24 17:12:35 -070098 private final ExecutorService communicationExecutor;
99 private final Map<NodeId, EventAccumulator> senderPending;
Madan Jampani28726282015-02-19 11:40:23 -0800100
Jonathan Hartdb3af892015-01-26 13:19:07 -0800101 private volatile boolean destroyed = false;
Jonathan Hart539a6462015-01-27 17:05:43 -0800102 private static final String ERROR_DESTROYED = " map is already destroyed";
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800103 private final String destroyedMessage;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800104
Jonathan Hart4f397e82015-02-04 09:10:41 -0800105 private static final String ERROR_NULL_KEY = "Key cannot be null";
106 private static final String ERROR_NULL_VALUE = "Null values are not allowed";
107
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700108 private final long initialDelaySec = 5;
109 private final boolean lightweightAntiEntropy;
110 private final boolean tombstonesDisabled;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800111
Jonathan Hart233a18a2015-03-02 17:24:58 -0800112 private static final int WINDOW_SIZE = 5;
113 private static final int HIGH_LOAD_THRESHOLD = 0;
114 private static final int LOAD_WINDOW = 2;
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700115 private SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE);
Jonathan Hart233a18a2015-03-02 17:24:58 -0800116
/**
 * Creates a new eventually consistent map shared amongst multiple instances.
 * <p>
 * See {@link org.onosproject.store.service.EventuallyConsistentMapBuilder}
 * for more description of the parameters expected by the map.
 * </p>
 *
 * @param mapName               a String identifier for the map.
 * @param clusterService        the cluster service
 * @param clusterCommunicator   the cluster communications service
 * @param serializerBuilder     a Kryo namespace builder that can serialize
 *                              both K and V
 * @param clockService          a clock service able to generate timestamps
 *                              for K and V
 * @param peerUpdateFunction    function that provides a set of nodes to immediately
 *                              update to when there are writes to the map; if null,
 *                              every cluster node except the local one is used
 * @param eventExecutor         executor to use for processing incoming
 *                              events from peers; if null, a default is created
 * @param communicationExecutor executor to use for sending events to peers;
 *                              if null, a default is created
 * @param backgroundExecutor    executor to use for background anti-entropy
 *                              tasks; if null, a default is created
 * @param tombstonesDisabled    true if this map should not maintain
 *                              tombstones
 * @param antiEntropyPeriod     period that the anti-entropy task should run,
 *                              expressed in {@code antiEntropyTimeUnit}
 * @param antiEntropyTimeUnit   time unit for anti-entropy task scheduling
 * @param convergeFaster        make anti-entropy try to converge faster
 */
EventuallyConsistentMapImpl(String mapName,
                            ClusterService clusterService,
                            ClusterCommunicationService clusterCommunicator,
                            KryoNamespace.Builder serializerBuilder,
                            ClockService<K, V> clockService,
                            BiFunction<K, V, Collection<NodeId>> peerUpdateFunction,
                            ExecutorService eventExecutor,
                            ExecutorService communicationExecutor,
                            ScheduledExecutorService backgroundExecutor,
                            boolean tombstonesDisabled,
                            long antiEntropyPeriod,
                            TimeUnit antiEntropyTimeUnit,
                            boolean convergeFaster) {
    items = new ConcurrentHashMap<>();
    removedItems = new ConcurrentHashMap<>();
    senderPending = Maps.newConcurrentMap();
    destroyedMessage = mapName + ERROR_DESTROYED;

    this.clusterService = clusterService;
    this.clusterCommunicator = clusterCommunicator;

    this.serializer = createSerializer(serializerBuilder);

    this.clockService = clockService;

    // Every field must be assigned BEFORE the background task is scheduled
    // and before message handlers are registered: both hand a reference to
    // this instance to other threads, which must not observe default values.
    this.tombstonesDisabled = tombstonesDisabled;
    this.lightweightAntiEntropy = !convergeFaster;
    updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
    antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");

    if (peerUpdateFunction != null) {
        this.peerUpdateFunction = peerUpdateFunction;
    } else {
        // Default: push updates to every node in the cluster except ourselves.
        this.peerUpdateFunction = (key, value) -> clusterService.getNodes().stream()
                .map(ControllerNode::id)
                .filter(nodeId -> !nodeId.equals(clusterService.getLocalNode().id()))
                .collect(Collectors.toList());
    }

    if (eventExecutor != null) {
        this.executor = eventExecutor;
    } else {
        // should be a normal executor; it's used for receiving messages
        this.executor =
                Executors.newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-fg-%d"));
    }

    if (communicationExecutor != null) {
        this.communicationExecutor = communicationExecutor;
    } else {
        // sending executor; should be capped
        //TODO this probably doesn't need to be bounded anymore
        this.communicationExecutor =
                newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-publish-%d"));
    }

    if (backgroundExecutor != null) {
        this.backgroundExecutor = backgroundExecutor;
    } else {
        this.backgroundExecutor =
                newSingleThreadScheduledExecutor(groupedThreads("onos/ecm", mapName + "-bg-%d"));
    }

    // start anti-entropy thread
    // NOTE: the initial delay is interpreted in antiEntropyTimeUnit as well,
    // despite the "Sec" suffix of the field name.
    this.backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
                                                initialDelaySec, antiEntropyPeriod,
                                                antiEntropyTimeUnit);

    clusterCommunicator.addSubscriber(updateMessageSubject,
                                      new InternalEventListener(), this.executor);

    clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
                                      new InternalAntiEntropyListener(), this.backgroundExecutor);
}
219
Jonathan Hartdb3af892015-01-26 13:19:07 -0800220 private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
221 return new KryoSerializer() {
222 @Override
223 protected void setupKryoPool() {
224 // Add the map's internal helper classes to the user-supplied serializer
225 serializerPool = builder
226 .register(WallClockTimestamp.class)
227 .register(PutEntry.class)
Jonathan Hart539a6462015-01-27 17:05:43 -0800228 .register(RemoveEntry.class)
Jonathan Hartdb3af892015-01-26 13:19:07 -0800229 .register(ArrayList.class)
Jonathan Hartaaa56572015-01-28 21:56:35 -0800230 .register(AntiEntropyAdvertisement.class)
231 .register(HashMap.class)
Jonathan Hartdb3af892015-01-26 13:19:07 -0800232 .build();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800233 }
234 };
235 }
236
237 @Override
238 public int size() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800239 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800240 return items.size();
241 }
242
243 @Override
244 public boolean isEmpty() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800245 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800246 return items.isEmpty();
247 }
248
249 @Override
250 public boolean containsKey(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800251 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800252 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800253 return items.containsKey(key);
254 }
255
256 @Override
257 public boolean containsValue(V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800258 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800259 checkNotNull(value, ERROR_NULL_VALUE);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800260
261 return items.values().stream()
262 .anyMatch(timestamped -> timestamped.value().equals(value));
263 }
264
265 @Override
266 public V get(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800267 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800268 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800269
270 Timestamped<V> value = items.get(key);
271 if (value != null) {
272 return value.value();
273 }
274 return null;
275 }
276
277 @Override
278 public void put(K key, V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800279 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800280 checkNotNull(key, ERROR_NULL_KEY);
281 checkNotNull(value, ERROR_NULL_VALUE);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800282
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800283 Timestamp timestamp = clockService.getTimestamp(key, value);
284
Jonathan Hartdb3af892015-01-26 13:19:07 -0800285 if (putInternal(key, value, timestamp)) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800286 notifyPeers(new PutEntry<>(key, value, timestamp),
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800287 peerUpdateFunction.apply(key, value));
288 notifyListeners(new EventuallyConsistentMapEvent<>(
289 EventuallyConsistentMapEvent.Type.PUT, key, value));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800290 }
291 }
292
293 private boolean putInternal(K key, V value, Timestamp timestamp) {
Jonathan Hart233a18a2015-03-02 17:24:58 -0800294 counter.incrementCount();
Brian O'Connor2952e3b2015-02-19 21:47:57 -0800295 Timestamp removed = removedItems.get(key);
Jonathan Hart403ea932015-02-20 16:23:00 -0800296 if (removed != null && removed.isNewerThan(timestamp)) {
Brian O'Connor2952e3b2015-02-19 21:47:57 -0800297 log.debug("ecmap - removed was newer {}", value);
298 return false;
299 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800300
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800301 final MutableBoolean updated = new MutableBoolean(false);
302
303 items.compute(key, (k, existing) -> {
Jonathan Hart403ea932015-02-20 16:23:00 -0800304 if (existing != null && existing.isNewerThan(timestamp)) {
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800305 updated.setFalse();
306 return existing;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800307 } else {
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800308 updated.setTrue();
309 return new Timestamped<>(value, timestamp);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800310 }
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800311 });
312
313 boolean success = updated.booleanValue();
314 if (!success) {
315 log.debug("ecmap - existing was newer {}", value);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800316 }
Brian O'Connor2952e3b2015-02-19 21:47:57 -0800317
318 if (success && removed != null) {
319 removedItems.remove(key, removed);
320 }
321 return success;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800322 }
323
324 @Override
325 public void remove(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800326 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800327 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800328
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800329 // TODO prevent calls here if value is important for timestamp
330 Timestamp timestamp = clockService.getTimestamp(key, null);
331
Jonathan Hartdb3af892015-01-26 13:19:07 -0800332 if (removeInternal(key, timestamp)) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800333 notifyPeers(new RemoveEntry<>(key, timestamp),
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800334 peerUpdateFunction.apply(key, null));
335 notifyListeners(new EventuallyConsistentMapEvent<>(
336 EventuallyConsistentMapEvent.Type.REMOVE, key, null));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800337 }
338 }
339
340 private boolean removeInternal(K key, Timestamp timestamp) {
Madan Jampani54d34992015-03-06 17:27:52 -0800341 if (timestamp == null) {
342 return false;
343 }
344
Jonathan Hart233a18a2015-03-02 17:24:58 -0800345 counter.incrementCount();
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800346 final MutableBoolean updated = new MutableBoolean(false);
347
348 items.compute(key, (k, existing) -> {
349 if (existing != null && existing.isNewerThan(timestamp)) {
350 updated.setFalse();
351 return existing;
Madan Jampani7e6cfe32015-02-19 17:44:44 -0800352 } else {
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800353 updated.setTrue();
354 // remove from items map
355 return null;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800356 }
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800357 });
358
359 if (updated.isFalse()) {
360 return false;
Madan Jampani7e6cfe32015-02-19 17:44:44 -0800361 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800362
Madan Jampanie1356282015-03-10 19:05:36 -0700363 if (!tombstonesDisabled) {
364 Timestamp removedTimestamp = removedItems.get(key);
365 if (removedTimestamp == null) {
366 return removedItems.putIfAbsent(key, timestamp) == null;
367 } else if (timestamp.isNewerThan(removedTimestamp)) {
368 return removedItems.replace(key, removedTimestamp, timestamp);
369 } else {
370 return false;
371 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800372 }
Madan Jampanie1356282015-03-10 19:05:36 -0700373
374 return updated.booleanValue();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800375 }
376
377 @Override
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800378 public void remove(K key, V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800379 checkState(!destroyed, destroyedMessage);
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800380 checkNotNull(key, ERROR_NULL_KEY);
381 checkNotNull(value, ERROR_NULL_VALUE);
382
383 Timestamp timestamp = clockService.getTimestamp(key, value);
384
385 if (removeInternal(key, timestamp)) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800386 notifyPeers(new RemoveEntry<>(key, timestamp),
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800387 peerUpdateFunction.apply(key, value));
388 notifyListeners(new EventuallyConsistentMapEvent<>(
389 EventuallyConsistentMapEvent.Type.REMOVE, key, value));
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800390 }
391 }
392
393 @Override
Jonathan Hartdb3af892015-01-26 13:19:07 -0800394 public void putAll(Map<? extends K, ? extends V> m) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800395 checkState(!destroyed, destroyedMessage);
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800396 m.forEach(this::put);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800397 }
398
399 @Override
400 public void clear() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800401 checkState(!destroyed, destroyedMessage);
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800402 items.forEach((key, value) -> remove(key));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800403 }
404
405 @Override
406 public Set<K> keySet() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800407 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800408 return items.keySet();
409 }
410
411 @Override
412 public Collection<V> values() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800413 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800414 return items.values().stream()
415 .map(Timestamped::value)
416 .collect(Collectors.toList());
417 }
418
419 @Override
420 public Set<Map.Entry<K, V>> entrySet() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800421 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800422
423 return items.entrySet().stream()
Jonathan Hartf9108232015-02-02 16:37:35 -0800424 .map(e -> Pair.of(e.getKey(), e.getValue().value()))
Jonathan Hartdb3af892015-01-26 13:19:07 -0800425 .collect(Collectors.toSet());
426 }
427
428 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800429 public void addListener(EventuallyConsistentMapListener<K, V> listener) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800430 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800431
432 listeners.add(checkNotNull(listener));
433 }
434
435 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800436 public void removeListener(EventuallyConsistentMapListener<K, V> listener) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800437 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800438
439 listeners.remove(checkNotNull(listener));
440 }
441
442 @Override
443 public void destroy() {
444 destroyed = true;
445
446 executor.shutdown();
447 backgroundExecutor.shutdown();
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800448 communicationExecutor.shutdown();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800449
Jonathan Hart584d2f32015-01-27 19:46:14 -0800450 listeners.clear();
451
Jonathan Hartdb3af892015-01-26 13:19:07 -0800452 clusterCommunicator.removeSubscriber(updateMessageSubject);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800453 clusterCommunicator.removeSubscriber(antiEntropyAdvertisementSubject);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800454 }
455
Jonathan Hartaaa56572015-01-28 21:56:35 -0800456 private void notifyListeners(EventuallyConsistentMapEvent<K, V> event) {
457 for (EventuallyConsistentMapListener<K, V> listener : listeners) {
Jonathan Hartdb3af892015-01-26 13:19:07 -0800458 listener.event(event);
459 }
460 }
461
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800462 private void notifyPeers(PutEntry<K, V> event, Collection<NodeId> peers) {
463 queueUpdate(event, peers);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800464 }
465
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800466 private void notifyPeers(RemoveEntry<K, V> event, Collection<NodeId> peers) {
467 queueUpdate(event, peers);
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800468 }
469
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800470 private void queueUpdate(AbstractEntry<K, V> event, Collection<NodeId> peers) {
471 if (peers == null) {
472 // we have no friends :(
473 return;
474 }
475 peers.forEach(node ->
476 senderPending.computeIfAbsent(node, unusedKey -> new EventAccumulator(node)).add(event)
477 );
478 }
479
480 private boolean unicastMessage(NodeId peer, MessageSubject subject, Object event) {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800481 ClusterMessage message = new ClusterMessage(
482 clusterService.getLocalNode().id(),
483 subject,
484 serializer.encode(event));
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800485 return clusterCommunicator.unicast(message, peer);
486 // Note: we had this flipped before...
487// communicationExecutor.execute(() -> clusterCommunicator.unicast(message, peer));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800488 }
489
Jonathan Hart233a18a2015-03-02 17:24:58 -0800490 private boolean underHighLoad() {
491 return counter.get(LOAD_WINDOW) > HIGH_LOAD_THRESHOLD;
492 }
493
Jonathan Hartaaa56572015-01-28 21:56:35 -0800494 private final class SendAdvertisementTask implements Runnable {
495 @Override
496 public void run() {
497 if (Thread.currentThread().isInterrupted()) {
498 log.info("Interrupted, quitting");
499 return;
500 }
501
Thomas Vachuska152f9fd2015-04-02 16:28:13 -0700502 if (underHighLoad() || destroyed) {
Jonathan Hart233a18a2015-03-02 17:24:58 -0800503 return;
504 }
505
Jonathan Hartaaa56572015-01-28 21:56:35 -0800506 try {
507 final NodeId self = clusterService.getLocalNode().id();
508 Set<ControllerNode> nodes = clusterService.getNodes();
509
510 List<NodeId> nodeIds = nodes.stream()
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800511 .map(ControllerNode::id)
Jonathan Hartaaa56572015-01-28 21:56:35 -0800512 .collect(Collectors.toList());
513
514 if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
515 log.trace("No other peers in the cluster.");
516 return;
517 }
518
519 NodeId peer;
520 do {
521 int idx = RandomUtils.nextInt(0, nodeIds.size());
522 peer = nodeIds.get(idx);
523 } while (peer.equals(self));
524
525 if (Thread.currentThread().isInterrupted()) {
526 log.info("Interrupted, quitting");
527 return;
528 }
529
530 AntiEntropyAdvertisement<K> ad = createAdvertisement();
531
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800532 if (!unicastMessage(peer, antiEntropyAdvertisementSubject, ad)) {
533 log.debug("Failed to send anti-entropy advertisement to {}", peer);
534 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800535 } catch (Exception e) {
536 // Catch all exceptions to avoid scheduled task being suppressed.
537 log.error("Exception thrown while sending advertisement", e);
538 }
539 }
540 }
541
542 private AntiEntropyAdvertisement<K> createAdvertisement() {
543 final NodeId self = clusterService.getLocalNode().id();
544
545 Map<K, Timestamp> timestamps = new HashMap<>(items.size());
546
547 items.forEach((key, value) -> timestamps.put(key, value.timestamp()));
548
549 Map<K, Timestamp> tombstones = new HashMap<>(removedItems);
550
551 return new AntiEntropyAdvertisement<>(self, timestamps, tombstones);
552 }
553
554 private void handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) {
555 List<EventuallyConsistentMapEvent<K, V>> externalEvents;
556
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800557 externalEvents = antiEntropyCheckLocalItems(ad);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800558
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800559 antiEntropyCheckLocalRemoved(ad);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800560
Jonathan Hartf893be82015-02-24 17:35:51 -0800561 if (!lightweightAntiEntropy) {
562 externalEvents.addAll(antiEntropyCheckRemoteRemoved(ad));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800563
Jonathan Hartf893be82015-02-24 17:35:51 -0800564 // if remote ad has something unknown, actively sync
565 for (K key : ad.timestamps().keySet()) {
566 if (!items.containsKey(key)) {
567 // Send the advertisement back if this peer is out-of-sync
568 final NodeId sender = ad.sender();
569 AntiEntropyAdvertisement<K> myAd = createAdvertisement();
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800570 if (!unicastMessage(sender, antiEntropyAdvertisementSubject, myAd)) {
571 log.debug("Failed to send reactive anti-entropy advertisement to {}", sender);
572 }
Jonathan Hartf893be82015-02-24 17:35:51 -0800573 break;
574 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800575 }
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800576 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800577 externalEvents.forEach(this::notifyListeners);
578 }
579
580 /**
581 * Checks if any of the remote's live items or tombstones are out of date
582 * according to our local live item list, or if our live items are out of
583 * date according to the remote's tombstone list.
584 * If the local copy is more recent, it will be pushed to the remote. If the
585 * remote has a more recent remove, we apply that to the local state.
586 *
587 * @param ad remote anti-entropy advertisement
588 * @return list of external events relating to local operations performed
589 */
Jonathan Hartaaa56572015-01-28 21:56:35 -0800590 private List<EventuallyConsistentMapEvent<K, V>> antiEntropyCheckLocalItems(
591 AntiEntropyAdvertisement<K> ad) {
592 final List<EventuallyConsistentMapEvent<K, V>> externalEvents
593 = new LinkedList<>();
594 final NodeId sender = ad.sender();
595
Jonathan Hartaaa56572015-01-28 21:56:35 -0800596 for (Map.Entry<K, Timestamped<V>> item : items.entrySet()) {
597 K key = item.getKey();
598 Timestamped<V> localValue = item.getValue();
599
600 Timestamp remoteTimestamp = ad.timestamps().get(key);
601 if (remoteTimestamp == null) {
602 remoteTimestamp = ad.tombstones().get(key);
603 }
604 if (remoteTimestamp == null || localValue
Jonathan Hart403ea932015-02-20 16:23:00 -0800605 .isNewerThan(remoteTimestamp)) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800606 // local value is more recent, push to sender
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800607 queueUpdate(new PutEntry<>(key, localValue.value(),
608 localValue.timestamp()), ImmutableList.of(sender));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800609 }
610
611 Timestamp remoteDeadTimestamp = ad.tombstones().get(key);
612 if (remoteDeadTimestamp != null &&
Jonathan Hart403ea932015-02-20 16:23:00 -0800613 remoteDeadTimestamp.isNewerThan(localValue.timestamp())) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800614 // sender has a more recent remove
615 if (removeInternal(key, remoteDeadTimestamp)) {
616 externalEvents.add(new EventuallyConsistentMapEvent<>(
617 EventuallyConsistentMapEvent.Type.REMOVE, key, null));
618 }
619 }
620 }
621
Jonathan Hartaaa56572015-01-28 21:56:35 -0800622 return externalEvents;
623 }
624
625 /**
626 * Checks if any items in the remote live list are out of date according
627 * to our tombstone list. If we find we have a more up to date tombstone,
628 * we'll send it to the remote.
629 *
630 * @param ad remote anti-entropy advertisement
631 */
Jonathan Hartaaa56572015-01-28 21:56:35 -0800632 private void antiEntropyCheckLocalRemoved(AntiEntropyAdvertisement<K> ad) {
633 final NodeId sender = ad.sender();
634
Jonathan Hartaaa56572015-01-28 21:56:35 -0800635 for (Map.Entry<K, Timestamp> dead : removedItems.entrySet()) {
636 K key = dead.getKey();
637 Timestamp localDeadTimestamp = dead.getValue();
638
639 Timestamp remoteLiveTimestamp = ad.timestamps().get(key);
640 if (remoteLiveTimestamp != null
Jonathan Hart403ea932015-02-20 16:23:00 -0800641 && localDeadTimestamp.isNewerThan(remoteLiveTimestamp)) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800642 // sender has zombie, push remove
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800643 queueUpdate(new RemoveEntry<>(key, localDeadTimestamp), ImmutableList.of(sender));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800644 }
645 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800646 }
647
648 /**
649 * Checks if any of the local live items are out of date according to the
650 * remote's tombstone advertisements. If we find a local item is out of date,
651 * we'll apply the remove operation to the local state.
652 *
653 * @param ad remote anti-entropy advertisement
654 * @return list of external events relating to local operations performed
655 */
Jonathan Hartaaa56572015-01-28 21:56:35 -0800656 private List<EventuallyConsistentMapEvent<K, V>>
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800657 antiEntropyCheckRemoteRemoved(AntiEntropyAdvertisement<K> ad) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800658 final List<EventuallyConsistentMapEvent<K, V>> externalEvents
659 = new LinkedList<>();
660
661 for (Map.Entry<K, Timestamp> remoteDead : ad.tombstones().entrySet()) {
662 K key = remoteDead.getKey();
663 Timestamp remoteDeadTimestamp = remoteDead.getValue();
664
665 Timestamped<V> local = items.get(key);
666 Timestamp localDead = removedItems.get(key);
Jonathan Hart403ea932015-02-20 16:23:00 -0800667 if (local != null && remoteDeadTimestamp.isNewerThan(
668 local.timestamp())) {
669 // If the remote has a more recent tombstone than either our local
670 // value, then do a remove with their timestamp
Jonathan Hartaaa56572015-01-28 21:56:35 -0800671 if (removeInternal(key, remoteDeadTimestamp)) {
672 externalEvents.add(new EventuallyConsistentMapEvent<>(
673 EventuallyConsistentMapEvent.Type.REMOVE, key, null));
674 }
Jonathan Hart403ea932015-02-20 16:23:00 -0800675 } else if (localDead != null && remoteDeadTimestamp.isNewerThan(
676 localDead)) {
677 // If the remote has a more recent tombstone than us, update ours
678 // to their timestamp
679 removeInternal(key, remoteDeadTimestamp);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800680 }
681 }
682
683 return externalEvents;
684 }
685
686 private final class InternalAntiEntropyListener
687 implements ClusterMessageHandler {
688
689 @Override
690 public void handle(ClusterMessage message) {
Jonathan Hart4fd4ebb2015-02-04 17:38:48 -0800691 log.trace("Received anti-entropy advertisement from peer: {}",
692 message.sender());
Jonathan Hartaaa56572015-01-28 21:56:35 -0800693 AntiEntropyAdvertisement<K> advertisement = serializer.decode(message.payload());
Madan Jampani2af244a2015-02-22 13:12:01 -0800694 try {
Jonathan Hart233a18a2015-03-02 17:24:58 -0800695 if (!underHighLoad()) {
696 handleAntiEntropyAdvertisement(advertisement);
697 }
Madan Jampani2af244a2015-02-22 13:12:01 -0800698 } catch (Exception e) {
699 log.warn("Exception thrown handling advertisements", e);
700 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800701 }
702 }
703
Thomas Vachuska152f9fd2015-04-02 16:28:13 -0700704 private final class InternalEventListener implements ClusterMessageHandler {
Jonathan Hartdb3af892015-01-26 13:19:07 -0800705 @Override
706 public void handle(ClusterMessage message) {
Thomas Vachuska152f9fd2015-04-02 16:28:13 -0700707 if (destroyed) {
708 return;
709 }
710
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800711 log.debug("Received update event from peer: {}", message.sender());
712 Collection<AbstractEntry<K, V>> events = serializer.decode(message.payload());
Jonathan Hartdb3af892015-01-26 13:19:07 -0800713
Madan Jampani2af244a2015-02-22 13:12:01 -0800714 try {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800715 // TODO clean this for loop up
716 for (AbstractEntry<K, V> entry : events) {
717 final K key = entry.key();
718 final V value;
719 final Timestamp timestamp = entry.timestamp();
720 final EventuallyConsistentMapEvent.Type type;
721 if (entry instanceof PutEntry) {
722 PutEntry<K, V> putEntry = (PutEntry<K, V>) entry;
723 value = putEntry.value();
724 type = EventuallyConsistentMapEvent.Type.PUT;
725 } else if (entry instanceof RemoveEntry) {
726 type = EventuallyConsistentMapEvent.Type.REMOVE;
727 value = null;
728 } else {
729 throw new IllegalStateException("Unknown entry type " + entry.getClass());
730 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800731
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800732 boolean success;
733 switch (type) {
734 case PUT:
735 success = putInternal(key, value, timestamp);
736 break;
737 case REMOVE:
738 success = removeInternal(key, timestamp);
739 break;
740 default:
741 success = false;
742 }
743 if (success) {
744 notifyListeners(new EventuallyConsistentMapEvent<>(type, key, value));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800745 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800746 }
Madan Jampani2af244a2015-02-22 13:12:01 -0800747 } catch (Exception e) {
748 log.warn("Exception thrown handling put", e);
749 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800750 }
751 }
752
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800753 // TODO pull this into the class if this gets pulled out...
754 private static final int DEFAULT_MAX_EVENTS = 1000;
755 private static final int DEFAULT_MAX_IDLE_MS = 10;
756 private static final int DEFAULT_MAX_BATCH_MS = 50;
757 private static final Timer TIMER = new Timer("onos-ecm-sender-events");
Jonathan Hartdb3af892015-01-26 13:19:07 -0800758
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800759 private final class EventAccumulator extends AbstractAccumulator<AbstractEntry<K, V>> {
760
761 private final NodeId peer;
762
763 private EventAccumulator(NodeId peer) {
764 super(TIMER, DEFAULT_MAX_EVENTS, DEFAULT_MAX_BATCH_MS, DEFAULT_MAX_IDLE_MS);
765 this.peer = peer;
766 }
767
768 @Override
769 public void processItems(List<AbstractEntry<K, V>> items) {
770 Map<K, AbstractEntry<K, V>> map = Maps.newHashMap();
771 items.forEach(item -> map.compute(item.key(), (key, oldValue) ->
772 oldValue == null || item.compareTo(oldValue) > 0 ? item : oldValue
773 )
774 );
775 communicationExecutor.submit(() -> {
776 try {
777 unicastMessage(peer, updateMessageSubject, Lists.newArrayList(map.values()));
778 } catch (Exception e) {
779 log.warn("broadcast error", e);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800780 }
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800781 });
Jonathan Hartdb3af892015-01-26 13:19:07 -0800782 }
783 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800784}