blob: c2c46fcd8baa72fbeda998874db31e2430ad31b3 [file] [log] [blame]
Jonathan Hartdb3af892015-01-26 13:19:07 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Jonathan Hart77bdd262015-02-03 09:07:48 -080016package org.onosproject.store.ecmap;
Jonathan Hartdb3af892015-01-26 13:19:07 -080017
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080018import com.google.common.collect.ImmutableList;
19import com.google.common.collect.Lists;
20import com.google.common.collect.Maps;
Jonathan Hartaaa56572015-01-28 21:56:35 -080021import org.apache.commons.lang3.RandomUtils;
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -080022import org.apache.commons.lang3.mutable.MutableBoolean;
Jonathan Hartf9108232015-02-02 16:37:35 -080023import org.apache.commons.lang3.tuple.Pair;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080024import org.onlab.util.AbstractAccumulator;
Jonathan Hartdb3af892015-01-26 13:19:07 -080025import org.onlab.util.KryoNamespace;
Jonathan Hart233a18a2015-03-02 17:24:58 -080026import org.onlab.util.SlidingWindowCounter;
Jonathan Hartdb3af892015-01-26 13:19:07 -080027import org.onosproject.cluster.ClusterService;
Jonathan Hartaaa56572015-01-28 21:56:35 -080028import org.onosproject.cluster.ControllerNode;
Jonathan Hartdb3af892015-01-26 13:19:07 -080029import org.onosproject.cluster.NodeId;
30import org.onosproject.store.Timestamp;
31import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
32import org.onosproject.store.cluster.messaging.ClusterMessage;
33import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
34import org.onosproject.store.cluster.messaging.MessageSubject;
Jonathan Hart77bdd262015-02-03 09:07:48 -080035import org.onosproject.store.impl.ClockService;
36import org.onosproject.store.impl.Timestamped;
37import org.onosproject.store.impl.WallClockTimestamp;
Jonathan Hartdb3af892015-01-26 13:19:07 -080038import org.onosproject.store.serializers.KryoSerializer;
39import org.slf4j.Logger;
40import org.slf4j.LoggerFactory;
41
Jonathan Hartdb3af892015-01-26 13:19:07 -080042import java.util.ArrayList;
43import java.util.Collection;
Jonathan Hartaaa56572015-01-28 21:56:35 -080044import java.util.HashMap;
45import java.util.LinkedList;
Jonathan Hartdb3af892015-01-26 13:19:07 -080046import java.util.List;
47import java.util.Map;
48import java.util.Set;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080049import java.util.Timer;
Jonathan Hartdb3af892015-01-26 13:19:07 -080050import java.util.concurrent.ConcurrentHashMap;
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -080051import java.util.concurrent.ConcurrentMap;
Jonathan Hartdb3af892015-01-26 13:19:07 -080052import java.util.concurrent.CopyOnWriteArraySet;
53import java.util.concurrent.ExecutorService;
54import java.util.concurrent.Executors;
55import java.util.concurrent.ScheduledExecutorService;
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -080056import java.util.concurrent.TimeUnit;
Jonathan Hart233a18a2015-03-02 17:24:58 -080057import java.util.concurrent.atomic.AtomicLong;
Brian O'Connor5eb77c82015-03-02 18:09:39 -080058import java.util.function.BiFunction;
Jonathan Hartdb3af892015-01-26 13:19:07 -080059import java.util.stream.Collectors;
60
61import static com.google.common.base.Preconditions.checkNotNull;
62import static com.google.common.base.Preconditions.checkState;
63import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
Brian O'Connorc6713a82015-02-24 11:55:48 -080064import static org.onlab.util.BoundedThreadPool.newFixedThreadPool;
Brian O'Connor4b2ba5f2015-02-18 20:54:00 -080065import static org.onlab.util.Tools.groupedThreads;
Jonathan Hartdb3af892015-01-26 13:19:07 -080066
67/**
68 * Distributed Map implementation which uses optimistic replication and gossip
69 * based techniques to provide an eventually consistent data store.
70 */
71public class EventuallyConsistentMapImpl<K, V>
72 implements EventuallyConsistentMap<K, V> {
73
74 private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class);
75
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -080076 private final ConcurrentMap<K, Timestamped<V>> items;
77 private final ConcurrentMap<K, Timestamp> removedItems;
Jonathan Hartdb3af892015-01-26 13:19:07 -080078
Jonathan Hartdb3af892015-01-26 13:19:07 -080079 private final ClusterService clusterService;
80 private final ClusterCommunicationService clusterCommunicator;
81 private final KryoSerializer serializer;
82
Jonathan Hart5ec32ba2015-02-05 13:33:58 -080083 private final ClockService<K, V> clockService;
Jonathan Hartdb3af892015-01-26 13:19:07 -080084
85 private final MessageSubject updateMessageSubject;
Jonathan Hartaaa56572015-01-28 21:56:35 -080086 private final MessageSubject antiEntropyAdvertisementSubject;
Jonathan Hartdb3af892015-01-26 13:19:07 -080087
Jonathan Hartaaa56572015-01-28 21:56:35 -080088 private final Set<EventuallyConsistentMapListener<K, V>> listeners
Jonathan Hartdb3af892015-01-26 13:19:07 -080089 = new CopyOnWriteArraySet<>();
90
91 private final ExecutorService executor;
92
93 private final ScheduledExecutorService backgroundExecutor;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080094 private final BiFunction<K, V, Collection<NodeId>> peerUpdateFunction;
Jonathan Hartdb3af892015-01-26 13:19:07 -080095
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080096 private ExecutorService communicationExecutor;
97 private Map<NodeId, EventAccumulator> senderPending;
Madan Jampani28726282015-02-19 11:40:23 -080098
Jonathan Hartdb3af892015-01-26 13:19:07 -080099 private volatile boolean destroyed = false;
Jonathan Hart539a6462015-01-27 17:05:43 -0800100 private static final String ERROR_DESTROYED = " map is already destroyed";
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800101 private final String destroyedMessage;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800102
Jonathan Hart4f397e82015-02-04 09:10:41 -0800103 private static final String ERROR_NULL_KEY = "Key cannot be null";
104 private static final String ERROR_NULL_VALUE = "Null values are not allowed";
105
Jonathan Hartdb3af892015-01-26 13:19:07 -0800106 // TODO: Make these anti-entropy params configurable
107 private long initialDelaySec = 5;
108 private long periodSec = 5;
Jonathan Hartf893be82015-02-24 17:35:51 -0800109 private boolean lightweightAntiEntropy = true;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800110
Jonathan Hart233a18a2015-03-02 17:24:58 -0800111 private static final int WINDOW_SIZE = 5;
112 private static final int HIGH_LOAD_THRESHOLD = 0;
113 private static final int LOAD_WINDOW = 2;
114 SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE);
115 AtomicLong operations = new AtomicLong();
116
/**
 * Creates a new eventually consistent map shared amongst multiple instances.
 * <p>
 * Each map is identified by a string map name. EventuallyConsistentMapImpl
 * objects in different JVMs that use the same map name will form a
 * distributed map across JVMs (provided the cluster service is aware of
 * both nodes).
 * </p>
 * <p>
 * The client is expected to provide an
 * {@link org.onlab.util.KryoNamespace.Builder} with which all classes that
 * will be stored in this map have been registered (including referenced
 * classes). This serializer will be used to serialize both K and V for
 * inter-node notifications.
 * </p>
 * <p>
 * The client must provide an {@link org.onosproject.store.impl.ClockService}
 * which can generate timestamps for a given key. The clock service is free
 * to generate timestamps however it wishes, however these timestamps will
 * be used to serialize updates to the map so they must be strict enough
 * to ensure updates are properly ordered for the use case (i.e. in some
 * cases wallclock time will suffice, whereas in other cases logical time
 * will be necessary).
 * </p>
 *
 * @param mapName             a String identifier for the map.
 * @param clusterService      the cluster service
 * @param clusterCommunicator the cluster communications service
 * @param serializerBuilder   a Kryo namespace builder that can serialize
 *                            both K and V
 * @param clockService        a clock service able to generate timestamps
 *                            for K
 * @param peerUpdateFunction  function that provides a set of nodes to immediately
 *                            update when there are writes to the map
 */
public EventuallyConsistentMapImpl(String mapName,
                                   ClusterService clusterService,
                                   ClusterCommunicationService clusterCommunicator,
                                   KryoNamespace.Builder serializerBuilder,
                                   ClockService<K, V> clockService,
                                   BiFunction<K, V, Collection<NodeId>> peerUpdateFunction) {
    this.clusterService = checkNotNull(clusterService);
    this.clusterCommunicator = checkNotNull(clusterCommunicator);
    this.peerUpdateFunction = checkNotNull(peerUpdateFunction);

    serializer = createSerializer(checkNotNull(serializerBuilder));
    destroyedMessage = mapName + ERROR_DESTROYED;

    this.clockService = checkNotNull(clockService);

    items = new ConcurrentHashMap<>();
    removedItems = new ConcurrentHashMap<>();

    updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
    antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");

    // should be a normal executor; it's used for receiving messages
    //TODO make # of threads configurable
    executor = Executors.newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-fg-%d"));

    // sending executor; should be capped
    //TODO make # of threads configurable
    //TODO this probably doesn't need to be bounded anymore
    communicationExecutor =
            newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-publish-%d"));
    senderPending = Maps.newConcurrentMap();

    backgroundExecutor =
            newSingleThreadScheduledExecutor(groupedThreads("onos/ecm", mapName + "-bg-%d"));

    // Everything below hands a reference to this instance to other threads
    // (message handlers and the scheduled task are inner classes), so it is
    // done only after every field above has been assigned.
    clusterCommunicator.addSubscriber(updateMessageSubject,
                                      new InternalEventListener(), executor);
    clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
                                      new InternalAntiEntropyListener(), backgroundExecutor);

    // start anti-entropy thread
    backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
                                           initialDelaySec, periodSec,
                                           TimeUnit.SECONDS);
}
197
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800198 /**
199 * Creates a new eventually consistent map shared amongst multiple instances.
200 * <p>
201 * Take a look at the other constructor for usage information. The only difference
202 * is that a BiFunction is provided that returns all nodes in the cluster, so
203 * all nodes will be sent write updates immediately.
204 * </p>
205 *
206 * @param mapName a String identifier for the map.
207 * @param clusterService the cluster service
208 * @param clusterCommunicator the cluster communications service
209 * @param serializerBuilder a Kryo namespace builder that can serialize
210 * both K and V
211 * @param clockService a clock service able to generate timestamps
212 * for K
213 */
214 public EventuallyConsistentMapImpl(String mapName,
215 ClusterService clusterService,
216 ClusterCommunicationService clusterCommunicator,
217 KryoNamespace.Builder serializerBuilder,
218 ClockService<K, V> clockService) {
219 this(mapName, clusterService, clusterCommunicator, serializerBuilder, clockService,
220 (key, value) -> clusterService.getNodes().stream()
221 .map(ControllerNode::id)
222 .filter(nodeId -> !nodeId.equals(clusterService.getLocalNode().id()))
223 .collect(Collectors.toList()));
224 }
225
Jonathan Hartdb3af892015-01-26 13:19:07 -0800226 private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
227 return new KryoSerializer() {
228 @Override
229 protected void setupKryoPool() {
230 // Add the map's internal helper classes to the user-supplied serializer
231 serializerPool = builder
232 .register(WallClockTimestamp.class)
233 .register(PutEntry.class)
Jonathan Hart539a6462015-01-27 17:05:43 -0800234 .register(RemoveEntry.class)
Jonathan Hartdb3af892015-01-26 13:19:07 -0800235 .register(ArrayList.class)
Jonathan Hartaaa56572015-01-28 21:56:35 -0800236 .register(AntiEntropyAdvertisement.class)
237 .register(HashMap.class)
Jonathan Hartdb3af892015-01-26 13:19:07 -0800238 .build();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800239 }
240 };
241 }
242
Madan Jampanib28e4ad2015-02-19 12:31:37 -0800243 /**
244 * Sets the executor to use for broadcasting messages and returns this
245 * instance for method chaining.
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800246 *
Madan Jampanib28e4ad2015-02-19 12:31:37 -0800247 * @param executor executor service
248 * @return this instance
249 */
250 public EventuallyConsistentMapImpl<K, V> withBroadcastMessageExecutor(ExecutorService executor) {
251 checkNotNull(executor, "Null executor");
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800252 communicationExecutor = executor;
Madan Jampanib28e4ad2015-02-19 12:31:37 -0800253 return this;
254 }
255
Jonathan Hartdb3af892015-01-26 13:19:07 -0800256 @Override
257 public int size() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800258 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800259 return items.size();
260 }
261
262 @Override
263 public boolean isEmpty() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800264 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800265 return items.isEmpty();
266 }
267
268 @Override
269 public boolean containsKey(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800270 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800271 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800272 return items.containsKey(key);
273 }
274
275 @Override
276 public boolean containsValue(V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800277 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800278 checkNotNull(value, ERROR_NULL_VALUE);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800279
280 return items.values().stream()
281 .anyMatch(timestamped -> timestamped.value().equals(value));
282 }
283
284 @Override
285 public V get(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800286 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800287 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800288
289 Timestamped<V> value = items.get(key);
290 if (value != null) {
291 return value.value();
292 }
293 return null;
294 }
295
296 @Override
297 public void put(K key, V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800298 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800299 checkNotNull(key, ERROR_NULL_KEY);
300 checkNotNull(value, ERROR_NULL_VALUE);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800301
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800302 Timestamp timestamp = clockService.getTimestamp(key, value);
303
Jonathan Hartdb3af892015-01-26 13:19:07 -0800304 if (putInternal(key, value, timestamp)) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800305 notifyPeers(new PutEntry<>(key, value, timestamp),
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800306 peerUpdateFunction.apply(key, value));
307 notifyListeners(new EventuallyConsistentMapEvent<>(
308 EventuallyConsistentMapEvent.Type.PUT, key, value));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800309 }
310 }
311
312 private boolean putInternal(K key, V value, Timestamp timestamp) {
Jonathan Hart233a18a2015-03-02 17:24:58 -0800313 counter.incrementCount();
Brian O'Connor2952e3b2015-02-19 21:47:57 -0800314 Timestamp removed = removedItems.get(key);
Jonathan Hart403ea932015-02-20 16:23:00 -0800315 if (removed != null && removed.isNewerThan(timestamp)) {
Brian O'Connor2952e3b2015-02-19 21:47:57 -0800316 log.debug("ecmap - removed was newer {}", value);
317 return false;
318 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800319
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800320 final MutableBoolean updated = new MutableBoolean(false);
321
322 items.compute(key, (k, existing) -> {
Jonathan Hart403ea932015-02-20 16:23:00 -0800323 if (existing != null && existing.isNewerThan(timestamp)) {
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800324 updated.setFalse();
325 return existing;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800326 } else {
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800327 updated.setTrue();
328 return new Timestamped<>(value, timestamp);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800329 }
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800330 });
331
332 boolean success = updated.booleanValue();
333 if (!success) {
334 log.debug("ecmap - existing was newer {}", value);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800335 }
Brian O'Connor2952e3b2015-02-19 21:47:57 -0800336
337 if (success && removed != null) {
338 removedItems.remove(key, removed);
339 }
340 return success;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800341 }
342
343 @Override
344 public void remove(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800345 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800346 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800347
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800348 // TODO prevent calls here if value is important for timestamp
349 Timestamp timestamp = clockService.getTimestamp(key, null);
350
Jonathan Hartdb3af892015-01-26 13:19:07 -0800351 if (removeInternal(key, timestamp)) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800352 notifyPeers(new RemoveEntry<>(key, timestamp),
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800353 peerUpdateFunction.apply(key, null));
354 notifyListeners(new EventuallyConsistentMapEvent<>(
355 EventuallyConsistentMapEvent.Type.REMOVE, key, null));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800356 }
357 }
358
359 private boolean removeInternal(K key, Timestamp timestamp) {
Jonathan Hart233a18a2015-03-02 17:24:58 -0800360 counter.incrementCount();
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800361 final MutableBoolean updated = new MutableBoolean(false);
362
363 items.compute(key, (k, existing) -> {
364 if (existing != null && existing.isNewerThan(timestamp)) {
365 updated.setFalse();
366 return existing;
Madan Jampani7e6cfe32015-02-19 17:44:44 -0800367 } else {
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800368 updated.setTrue();
369 // remove from items map
370 return null;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800371 }
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800372 });
373
374 if (updated.isFalse()) {
375 return false;
Madan Jampani7e6cfe32015-02-19 17:44:44 -0800376 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800377
Madan Jampani7e6cfe32015-02-19 17:44:44 -0800378 Timestamp removedTimestamp = removedItems.get(key);
379 if (removedTimestamp == null) {
380 return removedItems.putIfAbsent(key, timestamp) == null;
Jonathan Hart403ea932015-02-20 16:23:00 -0800381 } else if (timestamp.isNewerThan(removedTimestamp)) {
Madan Jampani7e6cfe32015-02-19 17:44:44 -0800382 return removedItems.replace(key, removedTimestamp, timestamp);
383 } else {
384 return false;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800385 }
386 }
387
388 @Override
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800389 public void remove(K key, V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800390 checkState(!destroyed, destroyedMessage);
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800391 checkNotNull(key, ERROR_NULL_KEY);
392 checkNotNull(value, ERROR_NULL_VALUE);
393
394 Timestamp timestamp = clockService.getTimestamp(key, value);
395
396 if (removeInternal(key, timestamp)) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800397 notifyPeers(new RemoveEntry<>(key, timestamp),
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800398 peerUpdateFunction.apply(key, value));
399 notifyListeners(new EventuallyConsistentMapEvent<>(
400 EventuallyConsistentMapEvent.Type.REMOVE, key, value));
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800401 }
402 }
403
404 @Override
Jonathan Hartdb3af892015-01-26 13:19:07 -0800405 public void putAll(Map<? extends K, ? extends V> m) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800406 checkState(!destroyed, destroyedMessage);
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800407 m.forEach(this::put);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800408 }
409
410 @Override
411 public void clear() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800412 checkState(!destroyed, destroyedMessage);
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800413 items.forEach((key, value) -> remove(key));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800414 }
415
416 @Override
417 public Set<K> keySet() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800418 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800419 return items.keySet();
420 }
421
422 @Override
423 public Collection<V> values() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800424 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800425 return items.values().stream()
426 .map(Timestamped::value)
427 .collect(Collectors.toList());
428 }
429
430 @Override
431 public Set<Map.Entry<K, V>> entrySet() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800432 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800433
434 return items.entrySet().stream()
Jonathan Hartf9108232015-02-02 16:37:35 -0800435 .map(e -> Pair.of(e.getKey(), e.getValue().value()))
Jonathan Hartdb3af892015-01-26 13:19:07 -0800436 .collect(Collectors.toSet());
437 }
438
439 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800440 public void addListener(EventuallyConsistentMapListener<K, V> listener) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800441 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800442
443 listeners.add(checkNotNull(listener));
444 }
445
446 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800447 public void removeListener(EventuallyConsistentMapListener<K, V> listener) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800448 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800449
450 listeners.remove(checkNotNull(listener));
451 }
452
453 @Override
454 public void destroy() {
455 destroyed = true;
456
457 executor.shutdown();
458 backgroundExecutor.shutdown();
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800459 communicationExecutor.shutdown();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800460
Jonathan Hart584d2f32015-01-27 19:46:14 -0800461 listeners.clear();
462
Jonathan Hartdb3af892015-01-26 13:19:07 -0800463 clusterCommunicator.removeSubscriber(updateMessageSubject);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800464 clusterCommunicator.removeSubscriber(antiEntropyAdvertisementSubject);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800465 }
466
Jonathan Hartaaa56572015-01-28 21:56:35 -0800467 private void notifyListeners(EventuallyConsistentMapEvent<K, V> event) {
468 for (EventuallyConsistentMapListener<K, V> listener : listeners) {
Jonathan Hartdb3af892015-01-26 13:19:07 -0800469 listener.event(event);
470 }
471 }
472
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800473 private void notifyPeers(PutEntry<K, V> event, Collection<NodeId> peers) {
474 queueUpdate(event, peers);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800475 }
476
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800477 private void notifyPeers(RemoveEntry<K, V> event, Collection<NodeId> peers) {
478 queueUpdate(event, peers);
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800479 }
480
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800481 private void queueUpdate(AbstractEntry<K, V> event, Collection<NodeId> peers) {
482 if (peers == null) {
483 // we have no friends :(
484 return;
485 }
486 peers.forEach(node ->
487 senderPending.computeIfAbsent(node, unusedKey -> new EventAccumulator(node)).add(event)
488 );
489 }
490
491 private boolean unicastMessage(NodeId peer, MessageSubject subject, Object event) {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800492 ClusterMessage message = new ClusterMessage(
493 clusterService.getLocalNode().id(),
494 subject,
495 serializer.encode(event));
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800496 return clusterCommunicator.unicast(message, peer);
497 // Note: we had this flipped before...
498// communicationExecutor.execute(() -> clusterCommunicator.unicast(message, peer));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800499 }
500
Jonathan Hart233a18a2015-03-02 17:24:58 -0800501 private boolean underHighLoad() {
502 return counter.get(LOAD_WINDOW) > HIGH_LOAD_THRESHOLD;
503 }
504
Jonathan Hartaaa56572015-01-28 21:56:35 -0800505 private final class SendAdvertisementTask implements Runnable {
506 @Override
507 public void run() {
508 if (Thread.currentThread().isInterrupted()) {
509 log.info("Interrupted, quitting");
510 return;
511 }
512
Jonathan Hart233a18a2015-03-02 17:24:58 -0800513 if (underHighLoad()) {
514 return;
515 }
516
Jonathan Hartaaa56572015-01-28 21:56:35 -0800517 try {
518 final NodeId self = clusterService.getLocalNode().id();
519 Set<ControllerNode> nodes = clusterService.getNodes();
520
521 List<NodeId> nodeIds = nodes.stream()
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800522 .map(ControllerNode::id)
Jonathan Hartaaa56572015-01-28 21:56:35 -0800523 .collect(Collectors.toList());
524
525 if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
526 log.trace("No other peers in the cluster.");
527 return;
528 }
529
530 NodeId peer;
531 do {
532 int idx = RandomUtils.nextInt(0, nodeIds.size());
533 peer = nodeIds.get(idx);
534 } while (peer.equals(self));
535
536 if (Thread.currentThread().isInterrupted()) {
537 log.info("Interrupted, quitting");
538 return;
539 }
540
541 AntiEntropyAdvertisement<K> ad = createAdvertisement();
542
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800543 if (!unicastMessage(peer, antiEntropyAdvertisementSubject, ad)) {
544 log.debug("Failed to send anti-entropy advertisement to {}", peer);
545 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800546 } catch (Exception e) {
547 // Catch all exceptions to avoid scheduled task being suppressed.
548 log.error("Exception thrown while sending advertisement", e);
549 }
550 }
551 }
552
553 private AntiEntropyAdvertisement<K> createAdvertisement() {
554 final NodeId self = clusterService.getLocalNode().id();
555
556 Map<K, Timestamp> timestamps = new HashMap<>(items.size());
557
558 items.forEach((key, value) -> timestamps.put(key, value.timestamp()));
559
560 Map<K, Timestamp> tombstones = new HashMap<>(removedItems);
561
562 return new AntiEntropyAdvertisement<>(self, timestamps, tombstones);
563 }
564
565 private void handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) {
566 List<EventuallyConsistentMapEvent<K, V>> externalEvents;
567
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800568 externalEvents = antiEntropyCheckLocalItems(ad);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800569
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800570 antiEntropyCheckLocalRemoved(ad);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800571
Jonathan Hartf893be82015-02-24 17:35:51 -0800572 if (!lightweightAntiEntropy) {
573 externalEvents.addAll(antiEntropyCheckRemoteRemoved(ad));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800574
Jonathan Hartf893be82015-02-24 17:35:51 -0800575 // if remote ad has something unknown, actively sync
576 for (K key : ad.timestamps().keySet()) {
577 if (!items.containsKey(key)) {
578 // Send the advertisement back if this peer is out-of-sync
579 final NodeId sender = ad.sender();
580 AntiEntropyAdvertisement<K> myAd = createAdvertisement();
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800581 if (!unicastMessage(sender, antiEntropyAdvertisementSubject, myAd)) {
582 log.debug("Failed to send reactive anti-entropy advertisement to {}", sender);
583 }
Jonathan Hartf893be82015-02-24 17:35:51 -0800584 break;
585 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800586 }
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800587 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800588 externalEvents.forEach(this::notifyListeners);
589 }
590
591 /**
592 * Checks if any of the remote's live items or tombstones are out of date
593 * according to our local live item list, or if our live items are out of
594 * date according to the remote's tombstone list.
595 * If the local copy is more recent, it will be pushed to the remote. If the
596 * remote has a more recent remove, we apply that to the local state.
597 *
598 * @param ad remote anti-entropy advertisement
599 * @return list of external events relating to local operations performed
600 */
Jonathan Hartaaa56572015-01-28 21:56:35 -0800601 private List<EventuallyConsistentMapEvent<K, V>> antiEntropyCheckLocalItems(
602 AntiEntropyAdvertisement<K> ad) {
603 final List<EventuallyConsistentMapEvent<K, V>> externalEvents
604 = new LinkedList<>();
605 final NodeId sender = ad.sender();
606
Jonathan Hartaaa56572015-01-28 21:56:35 -0800607 for (Map.Entry<K, Timestamped<V>> item : items.entrySet()) {
608 K key = item.getKey();
609 Timestamped<V> localValue = item.getValue();
610
611 Timestamp remoteTimestamp = ad.timestamps().get(key);
612 if (remoteTimestamp == null) {
613 remoteTimestamp = ad.tombstones().get(key);
614 }
615 if (remoteTimestamp == null || localValue
Jonathan Hart403ea932015-02-20 16:23:00 -0800616 .isNewerThan(remoteTimestamp)) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800617 // local value is more recent, push to sender
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800618 queueUpdate(new PutEntry<>(key, localValue.value(),
619 localValue.timestamp()), ImmutableList.of(sender));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800620 }
621
622 Timestamp remoteDeadTimestamp = ad.tombstones().get(key);
623 if (remoteDeadTimestamp != null &&
Jonathan Hart403ea932015-02-20 16:23:00 -0800624 remoteDeadTimestamp.isNewerThan(localValue.timestamp())) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800625 // sender has a more recent remove
626 if (removeInternal(key, remoteDeadTimestamp)) {
627 externalEvents.add(new EventuallyConsistentMapEvent<>(
628 EventuallyConsistentMapEvent.Type.REMOVE, key, null));
629 }
630 }
631 }
632
Jonathan Hartaaa56572015-01-28 21:56:35 -0800633 return externalEvents;
634 }
635
636 /**
637 * Checks if any items in the remote live list are out of date according
638 * to our tombstone list. If we find we have a more up to date tombstone,
639 * we'll send it to the remote.
640 *
641 * @param ad remote anti-entropy advertisement
642 */
Jonathan Hartaaa56572015-01-28 21:56:35 -0800643 private void antiEntropyCheckLocalRemoved(AntiEntropyAdvertisement<K> ad) {
644 final NodeId sender = ad.sender();
645
Jonathan Hartaaa56572015-01-28 21:56:35 -0800646 for (Map.Entry<K, Timestamp> dead : removedItems.entrySet()) {
647 K key = dead.getKey();
648 Timestamp localDeadTimestamp = dead.getValue();
649
650 Timestamp remoteLiveTimestamp = ad.timestamps().get(key);
651 if (remoteLiveTimestamp != null
Jonathan Hart403ea932015-02-20 16:23:00 -0800652 && localDeadTimestamp.isNewerThan(remoteLiveTimestamp)) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800653 // sender has zombie, push remove
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800654 queueUpdate(new RemoveEntry<>(key, localDeadTimestamp), ImmutableList.of(sender));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800655 }
656 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800657 }
658
659 /**
660 * Checks if any of the local live items are out of date according to the
661 * remote's tombstone advertisements. If we find a local item is out of date,
662 * we'll apply the remove operation to the local state.
663 *
664 * @param ad remote anti-entropy advertisement
665 * @return list of external events relating to local operations performed
666 */
Jonathan Hartaaa56572015-01-28 21:56:35 -0800667 private List<EventuallyConsistentMapEvent<K, V>>
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800668 antiEntropyCheckRemoteRemoved(AntiEntropyAdvertisement<K> ad) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800669 final List<EventuallyConsistentMapEvent<K, V>> externalEvents
670 = new LinkedList<>();
671
672 for (Map.Entry<K, Timestamp> remoteDead : ad.tombstones().entrySet()) {
673 K key = remoteDead.getKey();
674 Timestamp remoteDeadTimestamp = remoteDead.getValue();
675
676 Timestamped<V> local = items.get(key);
677 Timestamp localDead = removedItems.get(key);
Jonathan Hart403ea932015-02-20 16:23:00 -0800678 if (local != null && remoteDeadTimestamp.isNewerThan(
679 local.timestamp())) {
680 // If the remote has a more recent tombstone than either our local
681 // value, then do a remove with their timestamp
Jonathan Hartaaa56572015-01-28 21:56:35 -0800682 if (removeInternal(key, remoteDeadTimestamp)) {
683 externalEvents.add(new EventuallyConsistentMapEvent<>(
684 EventuallyConsistentMapEvent.Type.REMOVE, key, null));
685 }
Jonathan Hart403ea932015-02-20 16:23:00 -0800686 } else if (localDead != null && remoteDeadTimestamp.isNewerThan(
687 localDead)) {
688 // If the remote has a more recent tombstone than us, update ours
689 // to their timestamp
690 removeInternal(key, remoteDeadTimestamp);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800691 }
692 }
693
694 return externalEvents;
695 }
696
697 private final class InternalAntiEntropyListener
698 implements ClusterMessageHandler {
699
700 @Override
701 public void handle(ClusterMessage message) {
Jonathan Hart4fd4ebb2015-02-04 17:38:48 -0800702 log.trace("Received anti-entropy advertisement from peer: {}",
703 message.sender());
Jonathan Hartaaa56572015-01-28 21:56:35 -0800704 AntiEntropyAdvertisement<K> advertisement = serializer.decode(message.payload());
Madan Jampani2af244a2015-02-22 13:12:01 -0800705 try {
Jonathan Hart233a18a2015-03-02 17:24:58 -0800706 if (!underHighLoad()) {
707 handleAntiEntropyAdvertisement(advertisement);
708 }
Madan Jampani2af244a2015-02-22 13:12:01 -0800709 } catch (Exception e) {
710 log.warn("Exception thrown handling advertisements", e);
711 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800712 }
713 }
714
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800715 private final class InternalEventListener implements
Jonathan Hartdb3af892015-01-26 13:19:07 -0800716 ClusterMessageHandler {
717 @Override
718 public void handle(ClusterMessage message) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800719 log.debug("Received update event from peer: {}", message.sender());
720 Collection<AbstractEntry<K, V>> events = serializer.decode(message.payload());
Jonathan Hartdb3af892015-01-26 13:19:07 -0800721
Madan Jampani2af244a2015-02-22 13:12:01 -0800722 try {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800723 // TODO clean this for loop up
724 for (AbstractEntry<K, V> entry : events) {
725 final K key = entry.key();
726 final V value;
727 final Timestamp timestamp = entry.timestamp();
728 final EventuallyConsistentMapEvent.Type type;
729 if (entry instanceof PutEntry) {
730 PutEntry<K, V> putEntry = (PutEntry<K, V>) entry;
731 value = putEntry.value();
732 type = EventuallyConsistentMapEvent.Type.PUT;
733 } else if (entry instanceof RemoveEntry) {
734 type = EventuallyConsistentMapEvent.Type.REMOVE;
735 value = null;
736 } else {
737 throw new IllegalStateException("Unknown entry type " + entry.getClass());
738 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800739
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800740 boolean success;
741 switch (type) {
742 case PUT:
743 success = putInternal(key, value, timestamp);
744 break;
745 case REMOVE:
746 success = removeInternal(key, timestamp);
747 break;
748 default:
749 success = false;
750 }
751 if (success) {
752 notifyListeners(new EventuallyConsistentMapEvent<>(type, key, value));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800753 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800754 }
Madan Jampani2af244a2015-02-22 13:12:01 -0800755 } catch (Exception e) {
756 log.warn("Exception thrown handling put", e);
757 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800758 }
759 }
760
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800761 // TODO pull this into the class if this gets pulled out...
762 private static final int DEFAULT_MAX_EVENTS = 1000;
763 private static final int DEFAULT_MAX_IDLE_MS = 10;
764 private static final int DEFAULT_MAX_BATCH_MS = 50;
765 private static final Timer TIMER = new Timer("onos-ecm-sender-events");
Jonathan Hartdb3af892015-01-26 13:19:07 -0800766
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800767 private final class EventAccumulator extends AbstractAccumulator<AbstractEntry<K, V>> {
768
769 private final NodeId peer;
770
771 private EventAccumulator(NodeId peer) {
772 super(TIMER, DEFAULT_MAX_EVENTS, DEFAULT_MAX_BATCH_MS, DEFAULT_MAX_IDLE_MS);
773 this.peer = peer;
774 }
775
776 @Override
777 public void processItems(List<AbstractEntry<K, V>> items) {
778 Map<K, AbstractEntry<K, V>> map = Maps.newHashMap();
779 items.forEach(item -> map.compute(item.key(), (key, oldValue) ->
780 oldValue == null || item.compareTo(oldValue) > 0 ? item : oldValue
781 )
782 );
783 communicationExecutor.submit(() -> {
784 try {
785 unicastMessage(peer, updateMessageSubject, Lists.newArrayList(map.values()));
786 } catch (Exception e) {
787 log.warn("broadcast error", e);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800788 }
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800789 });
Jonathan Hartdb3af892015-01-26 13:19:07 -0800790 }
791 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800792}