blob: 8111db8e075dc6560bceaab896f9ce20792fda68 [file] [log] [blame]
Jonathan Hartdb3af892015-01-26 13:19:07 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Jonathan Hart77bdd262015-02-03 09:07:48 -080016package org.onosproject.store.ecmap;
Jonathan Hartdb3af892015-01-26 13:19:07 -080017
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080018import com.google.common.collect.ImmutableList;
19import com.google.common.collect.Lists;
20import com.google.common.collect.Maps;
Jonathan Hartaaa56572015-01-28 21:56:35 -080021import org.apache.commons.lang3.RandomUtils;
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -080022import org.apache.commons.lang3.mutable.MutableBoolean;
Jonathan Hartf9108232015-02-02 16:37:35 -080023import org.apache.commons.lang3.tuple.Pair;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080024import org.onlab.util.AbstractAccumulator;
Jonathan Hartdb3af892015-01-26 13:19:07 -080025import org.onlab.util.KryoNamespace;
Jonathan Hart233a18a2015-03-02 17:24:58 -080026import org.onlab.util.SlidingWindowCounter;
Jonathan Hartdb3af892015-01-26 13:19:07 -080027import org.onosproject.cluster.ClusterService;
Jonathan Hartaaa56572015-01-28 21:56:35 -080028import org.onosproject.cluster.ControllerNode;
Jonathan Hartdb3af892015-01-26 13:19:07 -080029import org.onosproject.cluster.NodeId;
30import org.onosproject.store.Timestamp;
31import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
32import org.onosproject.store.cluster.messaging.ClusterMessage;
33import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
34import org.onosproject.store.cluster.messaging.MessageSubject;
Jonathan Hart77bdd262015-02-03 09:07:48 -080035import org.onosproject.store.impl.ClockService;
36import org.onosproject.store.impl.Timestamped;
37import org.onosproject.store.impl.WallClockTimestamp;
Jonathan Hartdb3af892015-01-26 13:19:07 -080038import org.onosproject.store.serializers.KryoSerializer;
39import org.slf4j.Logger;
40import org.slf4j.LoggerFactory;
41
Jonathan Hartdb3af892015-01-26 13:19:07 -080042import java.util.ArrayList;
43import java.util.Collection;
Jonathan Hartaaa56572015-01-28 21:56:35 -080044import java.util.HashMap;
45import java.util.LinkedList;
Jonathan Hartdb3af892015-01-26 13:19:07 -080046import java.util.List;
47import java.util.Map;
48import java.util.Set;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080049import java.util.Timer;
Jonathan Hartdb3af892015-01-26 13:19:07 -080050import java.util.concurrent.ConcurrentHashMap;
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -080051import java.util.concurrent.ConcurrentMap;
Jonathan Hartdb3af892015-01-26 13:19:07 -080052import java.util.concurrent.CopyOnWriteArraySet;
53import java.util.concurrent.ExecutorService;
54import java.util.concurrent.Executors;
55import java.util.concurrent.ScheduledExecutorService;
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -080056import java.util.concurrent.TimeUnit;
Jonathan Hart233a18a2015-03-02 17:24:58 -080057import java.util.concurrent.atomic.AtomicLong;
Brian O'Connor5eb77c82015-03-02 18:09:39 -080058import java.util.function.BiFunction;
Jonathan Hartdb3af892015-01-26 13:19:07 -080059import java.util.stream.Collectors;
60
61import static com.google.common.base.Preconditions.checkNotNull;
62import static com.google.common.base.Preconditions.checkState;
63import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
Brian O'Connorc6713a82015-02-24 11:55:48 -080064import static org.onlab.util.BoundedThreadPool.newFixedThreadPool;
Brian O'Connor4b2ba5f2015-02-18 20:54:00 -080065import static org.onlab.util.Tools.groupedThreads;
Jonathan Hartdb3af892015-01-26 13:19:07 -080066
67/**
68 * Distributed Map implementation which uses optimistic replication and gossip
69 * based techniques to provide an eventually consistent data store.
70 */
71public class EventuallyConsistentMapImpl<K, V>
72 implements EventuallyConsistentMap<K, V> {
73
// Class-wide logger.
private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class);

// Live entries: each value is stored together with the timestamp of the
// write that produced it (maintained by putInternal/removeInternal).
private final ConcurrentMap<K, Timestamped<V>> items;
// Tombstones: for each removed key, the timestamp of the accepted remove.
// putInternal consults this to reject writes older than the remove.
private final ConcurrentMap<K, Timestamp> removedItems;

private final ClusterService clusterService;
private final ClusterCommunicationService clusterCommunicator;
// Encodes outbound messages to peers (see unicastMessage).
private final KryoSerializer serializer;

// Source of timestamps for local put/remove operations.
private final ClockService<K, V> clockService;

// Subject on which update messages for this map are received.
private final MessageSubject updateMessageSubject;
// Subject used to send and receive anti-entropy advertisements.
private final MessageSubject antiEntropyAdvertisementSubject;

// Listeners notified of local changes and of changes applied during anti-entropy.
private final Set<EventuallyConsistentMapListener<K, V>> listeners
        = new CopyOnWriteArraySet<>();

// Executor on which incoming update messages are handled (see constructor).
private final ExecutorService executor;

// Single-threaded executor that runs the periodic anti-entropy task and
// handles incoming anti-entropy advertisements.
private final ScheduledExecutorService backgroundExecutor;
// Chooses the peers that receive an immediate update for a given write.
private final BiFunction<K, V, Collection<NodeId>> peerUpdateFunction;

// Outbound messaging executor; replaceable via withBroadcastMessageExecutor
// and shut down by destroy().
// NOTE(review): not referenced by the methods visible in this section.
private ExecutorService communicationExecutor;
// Per-peer accumulators that batch outbound updates (see queueUpdate).
private Map<NodeId, EventAccumulator> senderPending;

// Set by destroy(); every public operation checks it.
private volatile boolean destroyed = false;
private static final String ERROR_DESTROYED = " map is already destroyed";
// mapName + ERROR_DESTROYED, precomputed for checkState messages.
private final String destroyedMessage;

private static final String ERROR_NULL_KEY = "Key cannot be null";
private static final String ERROR_NULL_VALUE = "Null values are not allowed";

// TODO: Make these anti-entropy params configurable
private long initialDelaySec = 5;
private long periodSec = 5;
// When true, advertisement handling skips the remote-tombstone check and the
// reactive "send our advertisement back" step (see handleAntiEntropyAdvertisement).
private boolean lightweightAntiEntropy = true;

// Load tracking used to back off anti-entropy (see underHighLoad).
// WINDOW_SIZE is the number of slots in the sliding window counter.
// LOAD_WINDOW is passed to counter.get(...); presumably it is the number of most
// recent slots summed. TODO confirm against SlidingWindowCounter.
private static final int WINDOW_SIZE = 5;
// With a threshold of 0, any counted operation within LOAD_WINDOW means "high load".
private static final int HIGH_LOAD_THRESHOLD = 0;
private static final int LOAD_WINDOW = 2;
// Incremented on every putInternal/removeInternal call.
SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE);
// NOTE(review): not referenced in the code visible here; possibly unused.
AtomicLong operations = new AtomicLong();
116
/**
 * Creates a new eventually consistent map shared amongst multiple instances.
 * <p>
 * Each map is identified by a string map name. EventuallyConsistentMapImpl
 * objects in different JVMs that use the same map name will form a
 * distributed map across JVMs (provided the cluster service is aware of
 * both nodes).
 * </p>
 * <p>
 * The client is expected to provide an
 * {@link org.onlab.util.KryoNamespace.Builder} with which all classes that
 * will be stored in this map have been registered (including referenced
 * classes). This serializer will be used to serialize both K and V for
 * inter-node notifications.
 * </p>
 * <p>
 * The client must provide an {@link org.onosproject.store.impl.ClockService}
 * which can generate timestamps for a given key. The clock service is free
 * to generate timestamps however it wishes, however these timestamps will
 * be used to serialize updates to the map so they must be strict enough
 * to ensure updates are properly ordered for the use case (i.e. in some
 * cases wallclock time will suffice, whereas in other cases logical time
 * will be necessary).
 * </p>
 *
 * @param mapName a String identifier for the map.
 * @param clusterService the cluster service
 * @param clusterCommunicator the cluster communications service
 * @param serializerBuilder a Kryo namespace builder that can serialize
 *                          both K and V
 * @param clockService a clock service able to generate timestamps
 *                     for K
 * @param peerUpdateFunction function that provides the set of nodes to
 *                           update immediately when there are writes to the map
 */
public EventuallyConsistentMapImpl(String mapName,
                                   ClusterService clusterService,
                                   ClusterCommunicationService clusterCommunicator,
                                   KryoNamespace.Builder serializerBuilder,
                                   ClockService<K, V> clockService,
                                   BiFunction<K, V, Collection<NodeId>> peerUpdateFunction) {
    this.clusterService = checkNotNull(clusterService);
    this.clusterCommunicator = checkNotNull(clusterCommunicator);
    this.peerUpdateFunction = checkNotNull(peerUpdateFunction);

    serializer = createSerializer(checkNotNull(serializerBuilder));
    destroyedMessage = mapName + ERROR_DESTROYED;

    this.clockService = checkNotNull(clockService);

    items = new ConcurrentHashMap<>();
    removedItems = new ConcurrentHashMap<>();

    // should be a normal executor; it's used for receiving messages
    //TODO make # of threads configurable
    executor = Executors.newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-fg-%d"));

    // sending executor; should be capped
    //TODO make # of threads configurable
    //TODO this probably doesn't need to be bounded anymore
    communicationExecutor =
            newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-publish-%d"));
    senderPending = Maps.newConcurrentMap();

    backgroundExecutor =
            newSingleThreadScheduledExecutor(groupedThreads("onos/ecm", mapName + "-bg-%d"));

    updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
    clusterCommunicator.addSubscriber(updateMessageSubject,
            new InternalEventListener(), executor);

    antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");
    clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
            new InternalAntiEntropyListener(), backgroundExecutor);

    // Start the anti-entropy task only after every field it reads
    // (notably antiEntropyAdvertisementSubject) has been assigned. Only writes
    // made before a task is submitted are guaranteed visible to the executor
    // thread, and `this` escapes to that thread before construction completes.
    backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
            initialDelaySec, periodSec,
            TimeUnit.SECONDS);
}
197
/**
 * Creates a new eventually consistent map that pushes every write
 * immediately to all other nodes in the cluster.
 * <p>
 * See the other constructor for full usage information. This variant
 * supplies its own peer-update function, which returns the id of every
 * cluster node except the local node.
 * </p>
 *
 * @param mapName a String identifier for the map.
 * @param clusterService the cluster service
 * @param clusterCommunicator the cluster communications service
 * @param serializerBuilder a Kryo namespace builder that can serialize
 *                          both K and V
 * @param clockService a clock service able to generate timestamps
 *                     for K
 */
public EventuallyConsistentMapImpl(String mapName,
                                   ClusterService clusterService,
                                   ClusterCommunicationService clusterCommunicator,
                                   KryoNamespace.Builder serializerBuilder,
                                   ClockService<K, V> clockService) {
    this(mapName, clusterService, clusterCommunicator, serializerBuilder, clockService,
         (key, value) -> {
             List<NodeId> peers = new ArrayList<>();
             for (ControllerNode member : clusterService.getNodes()) {
                 NodeId candidate = member.id();
                 if (!candidate.equals(clusterService.getLocalNode().id())) {
                     peers.add(candidate);
                 }
             }
             return peers;
         });
}
225
Jonathan Hartdb3af892015-01-26 13:19:07 -0800226 private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
227 return new KryoSerializer() {
228 @Override
229 protected void setupKryoPool() {
230 // Add the map's internal helper classes to the user-supplied serializer
231 serializerPool = builder
232 .register(WallClockTimestamp.class)
233 .register(PutEntry.class)
Jonathan Hart539a6462015-01-27 17:05:43 -0800234 .register(RemoveEntry.class)
Jonathan Hartdb3af892015-01-26 13:19:07 -0800235 .register(ArrayList.class)
Jonathan Hartaaa56572015-01-28 21:56:35 -0800236 .register(AntiEntropyAdvertisement.class)
237 .register(HashMap.class)
Jonathan Hartdb3af892015-01-26 13:19:07 -0800238 .build();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800239 }
240 };
241 }
242
Madan Jampanib28e4ad2015-02-19 12:31:37 -0800243 /**
244 * Sets the executor to use for broadcasting messages and returns this
245 * instance for method chaining.
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800246 *
Madan Jampanib28e4ad2015-02-19 12:31:37 -0800247 * @param executor executor service
248 * @return this instance
249 */
250 public EventuallyConsistentMapImpl<K, V> withBroadcastMessageExecutor(ExecutorService executor) {
251 checkNotNull(executor, "Null executor");
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800252 communicationExecutor = executor;
Madan Jampanib28e4ad2015-02-19 12:31:37 -0800253 return this;
254 }
255
Jonathan Hartdb3af892015-01-26 13:19:07 -0800256 @Override
257 public int size() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800258 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800259 return items.size();
260 }
261
262 @Override
263 public boolean isEmpty() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800264 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800265 return items.isEmpty();
266 }
267
268 @Override
269 public boolean containsKey(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800270 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800271 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800272 return items.containsKey(key);
273 }
274
275 @Override
276 public boolean containsValue(V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800277 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800278 checkNotNull(value, ERROR_NULL_VALUE);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800279
280 return items.values().stream()
281 .anyMatch(timestamped -> timestamped.value().equals(value));
282 }
283
284 @Override
285 public V get(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800286 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800287 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800288
289 Timestamped<V> value = items.get(key);
290 if (value != null) {
291 return value.value();
292 }
293 return null;
294 }
295
296 @Override
297 public void put(K key, V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800298 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800299 checkNotNull(key, ERROR_NULL_KEY);
300 checkNotNull(value, ERROR_NULL_VALUE);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800301
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800302 Timestamp timestamp = clockService.getTimestamp(key, value);
303
Jonathan Hartdb3af892015-01-26 13:19:07 -0800304 if (putInternal(key, value, timestamp)) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800305 notifyPeers(new PutEntry<>(key, value, timestamp),
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800306 peerUpdateFunction.apply(key, value));
307 notifyListeners(new EventuallyConsistentMapEvent<>(
308 EventuallyConsistentMapEvent.Type.PUT, key, value));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800309 }
310 }
311
/**
 * Applies a timestamped put to local state only; it does not notify peers
 * or listeners.
 *
 * @param key key to write
 * @param value value to write
 * @param timestamp timestamp of the write
 * @return true if the write was applied; false if the key's tombstone or
 *         its existing live entry is newer than {@code timestamp}
 */
private boolean putInternal(K key, V value, Timestamp timestamp) {
    // Count the operation for the anti-entropy load back-off (see underHighLoad).
    counter.incrementCount();
    // Reject the write if the key was removed more recently.
    // NOTE(review): this tombstone check happens outside the atomic compute()
    // below, so a concurrent removeInternal is not excluded - confirm acceptable.
    Timestamp removed = removedItems.get(key);
    if (removed != null && removed.isNewerThan(timestamp)) {
        log.debug("ecmap - removed was newer {}", value);
        return false;
    }

    // compute() performs the compare-and-replace atomically for this key; the
    // flag records which branch ran because compute() only returns the mapping.
    final MutableBoolean updated = new MutableBoolean(false);

    items.compute(key, (k, existing) -> {
        if (existing != null && existing.isNewerThan(timestamp)) {
            // keep the newer existing entry
            updated.setFalse();
            return existing;
        } else {
            updated.setTrue();
            return new Timestamped<>(value, timestamp);
        }
    });

    boolean success = updated.booleanValue();
    if (!success) {
        log.debug("ecmap - existing was newer {}", value);
    }

    // The write superseded the tombstone read above: drop it, but only if it
    // is still that same tombstone (two-argument remove is conditional).
    if (success && removed != null) {
        removedItems.remove(key, removed);
    }
    return success;
}
342
343 @Override
344 public void remove(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800345 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800346 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800347
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800348 // TODO prevent calls here if value is important for timestamp
349 Timestamp timestamp = clockService.getTimestamp(key, null);
350
Jonathan Hartdb3af892015-01-26 13:19:07 -0800351 if (removeInternal(key, timestamp)) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800352 notifyPeers(new RemoveEntry<>(key, timestamp),
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800353 peerUpdateFunction.apply(key, null));
354 notifyListeners(new EventuallyConsistentMapEvent<>(
355 EventuallyConsistentMapEvent.Type.REMOVE, key, null));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800356 }
357 }
358
/**
 * Applies a timestamped remove to local state only; it does not notify
 * peers or listeners. It drops the live entry (if any) and records a
 * tombstone.
 *
 * @param key key to remove
 * @param timestamp timestamp of the remove; a null timestamp is rejected
 * @return true if the live entry was dropped (or absent) and the tombstone
 *         was recorded; false if the timestamp is null, the live entry is
 *         newer, an existing tombstone is not older than {@code timestamp},
 *         or the tombstone changed concurrently
 */
private boolean removeInternal(K key, Timestamp timestamp) {
    if (timestamp == null) {
        return false;
    }

    // Count the operation for the anti-entropy load back-off (see underHighLoad).
    counter.incrementCount();
    // Set inside compute() to record which branch ran.
    final MutableBoolean updated = new MutableBoolean(false);

    // Atomically drop the live entry unless it is newer than this remove.
    items.compute(key, (k, existing) -> {
        if (existing != null && existing.isNewerThan(timestamp)) {
            updated.setFalse();
            return existing;
        } else {
            updated.setTrue();
            // remove from items map
            return null;
        }
    });

    if (updated.isFalse()) {
        return false;
    }

    // Install or advance the tombstone using conditional map operations, so a
    // concurrent writer causes these to fail (and this method to return false).
    // NOTE(review): in that case the live entry has already been dropped above.
    Timestamp removedTimestamp = removedItems.get(key);
    if (removedTimestamp == null) {
        return removedItems.putIfAbsent(key, timestamp) == null;
    } else if (timestamp.isNewerThan(removedTimestamp)) {
        return removedItems.replace(key, removedTimestamp, timestamp);
    } else {
        return false;
    }
}
391
392 @Override
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800393 public void remove(K key, V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800394 checkState(!destroyed, destroyedMessage);
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800395 checkNotNull(key, ERROR_NULL_KEY);
396 checkNotNull(value, ERROR_NULL_VALUE);
397
398 Timestamp timestamp = clockService.getTimestamp(key, value);
399
400 if (removeInternal(key, timestamp)) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800401 notifyPeers(new RemoveEntry<>(key, timestamp),
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800402 peerUpdateFunction.apply(key, value));
403 notifyListeners(new EventuallyConsistentMapEvent<>(
404 EventuallyConsistentMapEvent.Type.REMOVE, key, value));
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800405 }
406 }
407
408 @Override
Jonathan Hartdb3af892015-01-26 13:19:07 -0800409 public void putAll(Map<? extends K, ? extends V> m) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800410 checkState(!destroyed, destroyedMessage);
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800411 m.forEach(this::put);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800412 }
413
414 @Override
415 public void clear() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800416 checkState(!destroyed, destroyedMessage);
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800417 items.forEach((key, value) -> remove(key));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800418 }
419
420 @Override
421 public Set<K> keySet() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800422 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800423 return items.keySet();
424 }
425
426 @Override
427 public Collection<V> values() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800428 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800429 return items.values().stream()
430 .map(Timestamped::value)
431 .collect(Collectors.toList());
432 }
433
434 @Override
435 public Set<Map.Entry<K, V>> entrySet() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800436 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800437
438 return items.entrySet().stream()
Jonathan Hartf9108232015-02-02 16:37:35 -0800439 .map(e -> Pair.of(e.getKey(), e.getValue().value()))
Jonathan Hartdb3af892015-01-26 13:19:07 -0800440 .collect(Collectors.toSet());
441 }
442
443 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800444 public void addListener(EventuallyConsistentMapListener<K, V> listener) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800445 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800446
447 listeners.add(checkNotNull(listener));
448 }
449
450 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800451 public void removeListener(EventuallyConsistentMapListener<K, V> listener) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800452 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800453
454 listeners.remove(checkNotNull(listener));
455 }
456
457 @Override
458 public void destroy() {
459 destroyed = true;
460
461 executor.shutdown();
462 backgroundExecutor.shutdown();
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800463 communicationExecutor.shutdown();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800464
Jonathan Hart584d2f32015-01-27 19:46:14 -0800465 listeners.clear();
466
Jonathan Hartdb3af892015-01-26 13:19:07 -0800467 clusterCommunicator.removeSubscriber(updateMessageSubject);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800468 clusterCommunicator.removeSubscriber(antiEntropyAdvertisementSubject);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800469 }
470
Jonathan Hartaaa56572015-01-28 21:56:35 -0800471 private void notifyListeners(EventuallyConsistentMapEvent<K, V> event) {
472 for (EventuallyConsistentMapListener<K, V> listener : listeners) {
Jonathan Hartdb3af892015-01-26 13:19:07 -0800473 listener.event(event);
474 }
475 }
476
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800477 private void notifyPeers(PutEntry<K, V> event, Collection<NodeId> peers) {
478 queueUpdate(event, peers);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800479 }
480
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800481 private void notifyPeers(RemoveEntry<K, V> event, Collection<NodeId> peers) {
482 queueUpdate(event, peers);
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800483 }
484
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800485 private void queueUpdate(AbstractEntry<K, V> event, Collection<NodeId> peers) {
486 if (peers == null) {
487 // we have no friends :(
488 return;
489 }
490 peers.forEach(node ->
491 senderPending.computeIfAbsent(node, unusedKey -> new EventAccumulator(node)).add(event)
492 );
493 }
494
495 private boolean unicastMessage(NodeId peer, MessageSubject subject, Object event) {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800496 ClusterMessage message = new ClusterMessage(
497 clusterService.getLocalNode().id(),
498 subject,
499 serializer.encode(event));
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800500 return clusterCommunicator.unicast(message, peer);
501 // Note: we had this flipped before...
502// communicationExecutor.execute(() -> clusterCommunicator.unicast(message, peer));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800503 }
504
Jonathan Hart233a18a2015-03-02 17:24:58 -0800505 private boolean underHighLoad() {
506 return counter.get(LOAD_WINDOW) > HIGH_LOAD_THRESHOLD;
507 }
508
Jonathan Hartaaa56572015-01-28 21:56:35 -0800509 private final class SendAdvertisementTask implements Runnable {
510 @Override
511 public void run() {
512 if (Thread.currentThread().isInterrupted()) {
513 log.info("Interrupted, quitting");
514 return;
515 }
516
Jonathan Hart233a18a2015-03-02 17:24:58 -0800517 if (underHighLoad()) {
518 return;
519 }
520
Jonathan Hartaaa56572015-01-28 21:56:35 -0800521 try {
522 final NodeId self = clusterService.getLocalNode().id();
523 Set<ControllerNode> nodes = clusterService.getNodes();
524
525 List<NodeId> nodeIds = nodes.stream()
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800526 .map(ControllerNode::id)
Jonathan Hartaaa56572015-01-28 21:56:35 -0800527 .collect(Collectors.toList());
528
529 if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
530 log.trace("No other peers in the cluster.");
531 return;
532 }
533
534 NodeId peer;
535 do {
536 int idx = RandomUtils.nextInt(0, nodeIds.size());
537 peer = nodeIds.get(idx);
538 } while (peer.equals(self));
539
540 if (Thread.currentThread().isInterrupted()) {
541 log.info("Interrupted, quitting");
542 return;
543 }
544
545 AntiEntropyAdvertisement<K> ad = createAdvertisement();
546
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800547 if (!unicastMessage(peer, antiEntropyAdvertisementSubject, ad)) {
548 log.debug("Failed to send anti-entropy advertisement to {}", peer);
549 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800550 } catch (Exception e) {
551 // Catch all exceptions to avoid scheduled task being suppressed.
552 log.error("Exception thrown while sending advertisement", e);
553 }
554 }
555 }
556
557 private AntiEntropyAdvertisement<K> createAdvertisement() {
558 final NodeId self = clusterService.getLocalNode().id();
559
560 Map<K, Timestamp> timestamps = new HashMap<>(items.size());
561
562 items.forEach((key, value) -> timestamps.put(key, value.timestamp()));
563
564 Map<K, Timestamp> tombstones = new HashMap<>(removedItems);
565
566 return new AntiEntropyAdvertisement<>(self, timestamps, tombstones);
567 }
568
569 private void handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) {
570 List<EventuallyConsistentMapEvent<K, V>> externalEvents;
571
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800572 externalEvents = antiEntropyCheckLocalItems(ad);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800573
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800574 antiEntropyCheckLocalRemoved(ad);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800575
Jonathan Hartf893be82015-02-24 17:35:51 -0800576 if (!lightweightAntiEntropy) {
577 externalEvents.addAll(antiEntropyCheckRemoteRemoved(ad));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800578
Jonathan Hartf893be82015-02-24 17:35:51 -0800579 // if remote ad has something unknown, actively sync
580 for (K key : ad.timestamps().keySet()) {
581 if (!items.containsKey(key)) {
582 // Send the advertisement back if this peer is out-of-sync
583 final NodeId sender = ad.sender();
584 AntiEntropyAdvertisement<K> myAd = createAdvertisement();
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800585 if (!unicastMessage(sender, antiEntropyAdvertisementSubject, myAd)) {
586 log.debug("Failed to send reactive anti-entropy advertisement to {}", sender);
587 }
Jonathan Hartf893be82015-02-24 17:35:51 -0800588 break;
589 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800590 }
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800591 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800592 externalEvents.forEach(this::notifyListeners);
593 }
594
595 /**
596 * Checks if any of the remote's live items or tombstones are out of date
597 * according to our local live item list, or if our live items are out of
598 * date according to the remote's tombstone list.
599 * If the local copy is more recent, it will be pushed to the remote. If the
600 * remote has a more recent remove, we apply that to the local state.
601 *
602 * @param ad remote anti-entropy advertisement
603 * @return list of external events relating to local operations performed
604 */
Jonathan Hartaaa56572015-01-28 21:56:35 -0800605 private List<EventuallyConsistentMapEvent<K, V>> antiEntropyCheckLocalItems(
606 AntiEntropyAdvertisement<K> ad) {
607 final List<EventuallyConsistentMapEvent<K, V>> externalEvents
608 = new LinkedList<>();
609 final NodeId sender = ad.sender();
610
Jonathan Hartaaa56572015-01-28 21:56:35 -0800611 for (Map.Entry<K, Timestamped<V>> item : items.entrySet()) {
612 K key = item.getKey();
613 Timestamped<V> localValue = item.getValue();
614
615 Timestamp remoteTimestamp = ad.timestamps().get(key);
616 if (remoteTimestamp == null) {
617 remoteTimestamp = ad.tombstones().get(key);
618 }
619 if (remoteTimestamp == null || localValue
Jonathan Hart403ea932015-02-20 16:23:00 -0800620 .isNewerThan(remoteTimestamp)) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800621 // local value is more recent, push to sender
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800622 queueUpdate(new PutEntry<>(key, localValue.value(),
623 localValue.timestamp()), ImmutableList.of(sender));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800624 }
625
626 Timestamp remoteDeadTimestamp = ad.tombstones().get(key);
627 if (remoteDeadTimestamp != null &&
Jonathan Hart403ea932015-02-20 16:23:00 -0800628 remoteDeadTimestamp.isNewerThan(localValue.timestamp())) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800629 // sender has a more recent remove
630 if (removeInternal(key, remoteDeadTimestamp)) {
631 externalEvents.add(new EventuallyConsistentMapEvent<>(
632 EventuallyConsistentMapEvent.Type.REMOVE, key, null));
633 }
634 }
635 }
636
Jonathan Hartaaa56572015-01-28 21:56:35 -0800637 return externalEvents;
638 }
639
640 /**
641 * Checks if any items in the remote live list are out of date according
642 * to our tombstone list. If we find we have a more up to date tombstone,
643 * we'll send it to the remote.
644 *
645 * @param ad remote anti-entropy advertisement
646 */
Jonathan Hartaaa56572015-01-28 21:56:35 -0800647 private void antiEntropyCheckLocalRemoved(AntiEntropyAdvertisement<K> ad) {
648 final NodeId sender = ad.sender();
649
Jonathan Hartaaa56572015-01-28 21:56:35 -0800650 for (Map.Entry<K, Timestamp> dead : removedItems.entrySet()) {
651 K key = dead.getKey();
652 Timestamp localDeadTimestamp = dead.getValue();
653
654 Timestamp remoteLiveTimestamp = ad.timestamps().get(key);
655 if (remoteLiveTimestamp != null
Jonathan Hart403ea932015-02-20 16:23:00 -0800656 && localDeadTimestamp.isNewerThan(remoteLiveTimestamp)) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800657 // sender has zombie, push remove
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800658 queueUpdate(new RemoveEntry<>(key, localDeadTimestamp), ImmutableList.of(sender));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800659 }
660 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800661 }
662
663 /**
664 * Checks if any of the local live items are out of date according to the
665 * remote's tombstone advertisements. If we find a local item is out of date,
666 * we'll apply the remove operation to the local state.
667 *
668 * @param ad remote anti-entropy advertisement
669 * @return list of external events relating to local operations performed
670 */
Jonathan Hartaaa56572015-01-28 21:56:35 -0800671 private List<EventuallyConsistentMapEvent<K, V>>
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800672 antiEntropyCheckRemoteRemoved(AntiEntropyAdvertisement<K> ad) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800673 final List<EventuallyConsistentMapEvent<K, V>> externalEvents
674 = new LinkedList<>();
675
676 for (Map.Entry<K, Timestamp> remoteDead : ad.tombstones().entrySet()) {
677 K key = remoteDead.getKey();
678 Timestamp remoteDeadTimestamp = remoteDead.getValue();
679
680 Timestamped<V> local = items.get(key);
681 Timestamp localDead = removedItems.get(key);
Jonathan Hart403ea932015-02-20 16:23:00 -0800682 if (local != null && remoteDeadTimestamp.isNewerThan(
683 local.timestamp())) {
684 // If the remote has a more recent tombstone than either our local
685 // value, then do a remove with their timestamp
Jonathan Hartaaa56572015-01-28 21:56:35 -0800686 if (removeInternal(key, remoteDeadTimestamp)) {
687 externalEvents.add(new EventuallyConsistentMapEvent<>(
688 EventuallyConsistentMapEvent.Type.REMOVE, key, null));
689 }
Jonathan Hart403ea932015-02-20 16:23:00 -0800690 } else if (localDead != null && remoteDeadTimestamp.isNewerThan(
691 localDead)) {
692 // If the remote has a more recent tombstone than us, update ours
693 // to their timestamp
694 removeInternal(key, remoteDeadTimestamp);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800695 }
696 }
697
698 return externalEvents;
699 }
700
701 private final class InternalAntiEntropyListener
702 implements ClusterMessageHandler {
703
704 @Override
705 public void handle(ClusterMessage message) {
Jonathan Hart4fd4ebb2015-02-04 17:38:48 -0800706 log.trace("Received anti-entropy advertisement from peer: {}",
707 message.sender());
Jonathan Hartaaa56572015-01-28 21:56:35 -0800708 AntiEntropyAdvertisement<K> advertisement = serializer.decode(message.payload());
Madan Jampani2af244a2015-02-22 13:12:01 -0800709 try {
Jonathan Hart233a18a2015-03-02 17:24:58 -0800710 if (!underHighLoad()) {
711 handleAntiEntropyAdvertisement(advertisement);
712 }
Madan Jampani2af244a2015-02-22 13:12:01 -0800713 } catch (Exception e) {
714 log.warn("Exception thrown handling advertisements", e);
715 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800716 }
717 }
718
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800719 private final class InternalEventListener implements
Jonathan Hartdb3af892015-01-26 13:19:07 -0800720 ClusterMessageHandler {
721 @Override
722 public void handle(ClusterMessage message) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800723 log.debug("Received update event from peer: {}", message.sender());
724 Collection<AbstractEntry<K, V>> events = serializer.decode(message.payload());
Jonathan Hartdb3af892015-01-26 13:19:07 -0800725
Madan Jampani2af244a2015-02-22 13:12:01 -0800726 try {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800727 // TODO clean this for loop up
728 for (AbstractEntry<K, V> entry : events) {
729 final K key = entry.key();
730 final V value;
731 final Timestamp timestamp = entry.timestamp();
732 final EventuallyConsistentMapEvent.Type type;
733 if (entry instanceof PutEntry) {
734 PutEntry<K, V> putEntry = (PutEntry<K, V>) entry;
735 value = putEntry.value();
736 type = EventuallyConsistentMapEvent.Type.PUT;
737 } else if (entry instanceof RemoveEntry) {
738 type = EventuallyConsistentMapEvent.Type.REMOVE;
739 value = null;
740 } else {
741 throw new IllegalStateException("Unknown entry type " + entry.getClass());
742 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800743
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800744 boolean success;
745 switch (type) {
746 case PUT:
747 success = putInternal(key, value, timestamp);
748 break;
749 case REMOVE:
750 success = removeInternal(key, timestamp);
751 break;
752 default:
753 success = false;
754 }
755 if (success) {
756 notifyListeners(new EventuallyConsistentMapEvent<>(type, key, value));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800757 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800758 }
Madan Jampani2af244a2015-02-22 13:12:01 -0800759 } catch (Exception e) {
760 log.warn("Exception thrown handling put", e);
761 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800762 }
763 }
764
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800765 // TODO pull this into the class if this gets pulled out...
766 private static final int DEFAULT_MAX_EVENTS = 1000;
767 private static final int DEFAULT_MAX_IDLE_MS = 10;
768 private static final int DEFAULT_MAX_BATCH_MS = 50;
769 private static final Timer TIMER = new Timer("onos-ecm-sender-events");
Jonathan Hartdb3af892015-01-26 13:19:07 -0800770
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800771 private final class EventAccumulator extends AbstractAccumulator<AbstractEntry<K, V>> {
772
773 private final NodeId peer;
774
775 private EventAccumulator(NodeId peer) {
776 super(TIMER, DEFAULT_MAX_EVENTS, DEFAULT_MAX_BATCH_MS, DEFAULT_MAX_IDLE_MS);
777 this.peer = peer;
778 }
779
780 @Override
781 public void processItems(List<AbstractEntry<K, V>> items) {
782 Map<K, AbstractEntry<K, V>> map = Maps.newHashMap();
783 items.forEach(item -> map.compute(item.key(), (key, oldValue) ->
784 oldValue == null || item.compareTo(oldValue) > 0 ? item : oldValue
785 )
786 );
787 communicationExecutor.submit(() -> {
788 try {
789 unicastMessage(peer, updateMessageSubject, Lists.newArrayList(map.values()));
790 } catch (Exception e) {
791 log.warn("broadcast error", e);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800792 }
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800793 });
Jonathan Hartdb3af892015-01-26 13:19:07 -0800794 }
795 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800796}