blob: cd15895777a76e0526df1402efe136256280a2fd [file] [log] [blame]
/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Thomas Vachuskab6d31672018-07-27 17:03:46 -070016package org.onosproject.store.atomix.primitives.impl;
Madan Jampani7e55c662016-02-15 21:13:53 -080017
Jordan Halterman980a8c12017-09-22 18:01:19 -070018import com.google.common.collect.Maps;
Jordan Halterman00e92da2018-05-22 23:05:52 -070019import io.atomix.core.Atomix;
20import io.atomix.core.counter.AtomicCounter;
21import io.atomix.core.counter.AtomicCounterType;
22import io.atomix.core.map.AtomicMapType;
23import io.atomix.core.workqueue.WorkQueueType;
24import io.atomix.primitive.partition.PartitionGroup;
25import io.atomix.protocols.raft.MultiRaftProtocol;
Jordan Halterman28183ee2017-10-17 17:29:10 -070026import org.onosproject.cluster.ClusterService;
slowr878625f2017-10-24 14:53:49 -070027import org.onosproject.cluster.ControllerNode;
28import org.onosproject.cluster.Member;
29import org.onosproject.cluster.MembershipService;
30import org.onosproject.cluster.NodeId;
Madan Jampani7e55c662016-02-15 21:13:53 -080031import org.onosproject.persistence.PersistenceService;
Thomas Vachuskab6d31672018-07-27 17:03:46 -070032import org.onosproject.store.atomix.impl.AtomixManager;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070033import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Madan Jampani630c8822016-02-24 10:38:21 -080034import org.onosproject.store.primitives.PartitionAdminService;
Madan Jampani7e55c662016-02-15 21:13:53 -080035import org.onosproject.store.primitives.TransactionId;
36import org.onosproject.store.serializers.KryoNamespaces;
Aaron Kruglikov61582a02016-09-06 13:18:58 -070037import org.onosproject.store.service.AsyncConsistentMultimap;
38import org.onosproject.store.service.AsyncConsistentTreeMap;
Jordan Halterman980a8c12017-09-22 18:01:19 -070039import org.onosproject.store.service.AsyncDocumentTree;
Madan Jampani7e55c662016-02-15 21:13:53 -080040import org.onosproject.store.service.AtomicCounterBuilder;
Jordan Haltermanc955df72017-02-04 20:43:28 -080041import org.onosproject.store.service.AtomicCounterMapBuilder;
Jordan Halterman5a1053e2017-05-19 18:03:47 -070042import org.onosproject.store.service.AtomicIdGeneratorBuilder;
Madan Jampani7e55c662016-02-15 21:13:53 -080043import org.onosproject.store.service.AtomicValueBuilder;
Madan Jampani7e55c662016-02-15 21:13:53 -080044import org.onosproject.store.service.ConsistentMapBuilder;
Aaron Kruglikov61582a02016-09-06 13:18:58 -070045import org.onosproject.store.service.ConsistentMultimapBuilder;
Aaron Kruglikoved88ff62016-08-01 16:02:09 -070046import org.onosproject.store.service.ConsistentTreeMapBuilder;
Jordan Halterman47432582018-01-25 16:56:45 -080047import org.onosproject.store.service.DistributedLockBuilder;
Madan Jampani7e55c662016-02-15 21:13:53 -080048import org.onosproject.store.service.DistributedSetBuilder;
Sithara Punnassery112ed822016-10-24 14:55:19 -070049import org.onosproject.store.service.DocumentTreeBuilder;
Madan Jampani7e55c662016-02-15 21:13:53 -080050import org.onosproject.store.service.EventuallyConsistentMapBuilder;
51import org.onosproject.store.service.LeaderElectorBuilder;
52import org.onosproject.store.service.MapInfo;
53import org.onosproject.store.service.PartitionInfo;
54import org.onosproject.store.service.Serializer;
55import org.onosproject.store.service.StorageAdminService;
56import org.onosproject.store.service.StorageService;
Madan Jampani13f65152016-08-17 13:14:53 -070057import org.onosproject.store.service.Topic;
Jordan Halterman400bbe52018-04-05 23:07:47 -070058import org.onosproject.store.service.TopicBuilder;
Madan Jampani7e55c662016-02-15 21:13:53 -080059import org.onosproject.store.service.TransactionContextBuilder;
Madan Jampani819d61d2016-07-25 20:29:43 -070060import org.onosproject.store.service.WorkQueue;
Jordan Halterman00e92da2018-05-22 23:05:52 -070061import org.onosproject.store.service.WorkQueueBuilder;
Madan Jampani35708a92016-07-06 10:48:19 -070062import org.onosproject.store.service.WorkQueueStats;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070063import org.osgi.service.component.annotations.Activate;
64import org.osgi.service.component.annotations.Component;
65import org.osgi.service.component.annotations.Deactivate;
66import org.osgi.service.component.annotations.Reference;
67import org.osgi.service.component.annotations.ReferenceCardinality;
Madan Jampani7e55c662016-02-15 21:13:53 -080068import org.slf4j.Logger;
69
Ray Milkeyd84f89b2018-08-17 14:54:17 -070070import java.util.Collection;
71import java.util.List;
72import java.util.Map;
73import java.util.function.Supplier;
74import java.util.stream.Collectors;
75
Jordan Halterman980a8c12017-09-22 18:01:19 -070076import static org.onosproject.security.AppGuard.checkPermission;
77import static org.onosproject.security.AppPermission.Type.STORAGE_WRITE;
78import static org.slf4j.LoggerFactory.getLogger;
Madan Jampani7e55c662016-02-15 21:13:53 -080079
80/**
81 * Implementation for {@code StorageService} and {@code StorageAdminService}.
82 */
Ray Milkeyd84f89b2018-08-17 14:54:17 -070083@Component(immediate = true, service = { StorageService.class, StorageAdminService.class })
Madan Jampani7e55c662016-02-15 21:13:53 -080084public class StorageManager implements StorageService, StorageAdminService {
85
86 private final Logger log = getLogger(getClass());
87
Ray Milkeyd84f89b2018-08-17 14:54:17 -070088 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jordan Halterman28183ee2017-10-17 17:29:10 -070089 protected ClusterService clusterService;
Madan Jampani7e55c662016-02-15 21:13:53 -080090
Ray Milkeyd84f89b2018-08-17 14:54:17 -070091 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jordan Halterman28183ee2017-10-17 17:29:10 -070092 protected ClusterCommunicationService clusterCommunicator;
Madan Jampani7e55c662016-02-15 21:13:53 -080093
Ray Milkeyd84f89b2018-08-17 14:54:17 -070094 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Madan Jampani7e55c662016-02-15 21:13:53 -080095 protected PersistenceService persistenceService;
96
Ray Milkeyd84f89b2018-08-17 14:54:17 -070097 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Madan Jampani630c8822016-02-24 10:38:21 -080098 protected PartitionAdminService partitionAdminService;
99
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700100 @Reference(cardinality = ReferenceCardinality.MANDATORY)
slowr878625f2017-10-24 14:53:49 -0700101 protected MembershipService membershipService;
102
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700103 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jordan Halterman00e92da2018-05-22 23:05:52 -0700104 protected AtomixManager atomixManager;
105
106 private Atomix atomix;
107 private PartitionGroup group;
Madan Jampani7e55c662016-02-15 21:13:53 -0800108
109 @Activate
Madan Jampani86cb2432016-02-17 11:07:56 -0800110 public void activate() {
Jordan Halterman00e92da2018-05-22 23:05:52 -0700111 atomix = atomixManager.getAtomix();
112 group = atomix.getPartitionService().getPartitionGroup(MultiRaftProtocol.TYPE);
Madan Jampani7e55c662016-02-15 21:13:53 -0800113 log.info("Started");
114 }
115
116 @Deactivate
117 public void deactivate() {
118 log.info("Stopped");
119 }
120
121 @Override
122 public <K, V> EventuallyConsistentMapBuilder<K, V> eventuallyConsistentMapBuilder() {
Heedo Kang4a47a302016-02-29 17:40:23 +0900123 checkPermission(STORAGE_WRITE);
Jordan Haltermana3f16112018-01-11 09:34:52 -0800124
125 // Note: NPE in the usage of ClusterService/MembershipService prevents rebooting the Karaf container.
126 // We need to reference these services outside the following peer suppliers.
127 final MembershipService membershipService = this.membershipService;
128 final ClusterService clusterService = this.clusterService;
129
slowr878625f2017-10-24 14:53:49 -0700130 final NodeId localNodeId = clusterService.getLocalNode().id();
131
Jordan Haltermana3f16112018-01-11 09:34:52 -0800132 // Use the MembershipService to provide peers for the map that are isolated within the current version.
slowr878625f2017-10-24 14:53:49 -0700133 Supplier<List<NodeId>> peersSupplier = () -> membershipService.getMembers().stream()
Jordan Halterman00e92da2018-05-22 23:05:52 -0700134 .map(Member::nodeId)
135 .filter(nodeId -> !nodeId.equals(localNodeId))
136 .filter(id -> clusterService.getState(id).isActive())
137 .collect(Collectors.toList());
slowr878625f2017-10-24 14:53:49 -0700138
Jordan Haltermana3f16112018-01-11 09:34:52 -0800139 // If this is the first node in its version, bootstrap from the previous version. Otherwise, bootstrap the
140 // map from members isolated within the current version.
slowr878625f2017-10-24 14:53:49 -0700141 Supplier<List<NodeId>> bootstrapPeersSupplier = () -> {
142 if (membershipService.getMembers().size() == 1) {
143 return clusterService.getNodes()
Jordan Halterman00e92da2018-05-22 23:05:52 -0700144 .stream()
145 .map(ControllerNode::id)
146 .filter(id -> !localNodeId.equals(id))
147 .filter(id -> clusterService.getState(id).isActive())
148 .collect(Collectors.toList());
slowr878625f2017-10-24 14:53:49 -0700149 } else {
150 return membershipService.getMembers()
Jordan Halterman00e92da2018-05-22 23:05:52 -0700151 .stream()
152 .map(Member::nodeId)
153 .filter(id -> !localNodeId.equals(id))
154 .filter(id -> clusterService.getState(id).isActive())
155 .collect(Collectors.toList());
slowr878625f2017-10-24 14:53:49 -0700156 }
157 };
158
slowr878625f2017-10-24 14:53:49 -0700159 return new EventuallyConsistentMapBuilderImpl<>(
Jordan Halterman00e92da2018-05-22 23:05:52 -0700160 localNodeId,
161 clusterCommunicator,
162 persistenceService,
163 peersSupplier,
164 bootstrapPeersSupplier
slowr878625f2017-10-24 14:53:49 -0700165 );
Madan Jampani7e55c662016-02-15 21:13:53 -0800166 }
167
168 @Override
169 public <K, V> ConsistentMapBuilder<K, V> consistentMapBuilder() {
Heedo Kang4a47a302016-02-29 17:40:23 +0900170 checkPermission(STORAGE_WRITE);
Jordan Halterman00e92da2018-05-22 23:05:52 -0700171 return new AtomixConsistentMapBuilder<>(atomix, group.name());
Madan Jampani7e55c662016-02-15 21:13:53 -0800172 }
173
174 @Override
Sithara Punnassery112ed822016-10-24 14:55:19 -0700175 public <V> DocumentTreeBuilder<V> documentTreeBuilder() {
176 checkPermission(STORAGE_WRITE);
Jordan Halterman00e92da2018-05-22 23:05:52 -0700177 return new AtomixDocumentTreeBuilder<V>(atomix, group.name());
Sithara Punnassery112ed822016-10-24 14:55:19 -0700178 }
179
180 @Override
Aaron Kruglikoved88ff62016-08-01 16:02:09 -0700181 public <V> ConsistentTreeMapBuilder<V> consistentTreeMapBuilder() {
Jordan Halterman00e92da2018-05-22 23:05:52 -0700182 return new AtomixConsistentTreeMapBuilder<>(atomix, group.name());
Aaron Kruglikoved88ff62016-08-01 16:02:09 -0700183 }
184
185 @Override
Aaron Kruglikov61582a02016-09-06 13:18:58 -0700186 public <K, V> ConsistentMultimapBuilder<K, V> consistentMultimapBuilder() {
187 checkPermission(STORAGE_WRITE);
Jordan Halterman00e92da2018-05-22 23:05:52 -0700188 return new AtomixConsistentMultimapBuilder<>(atomix, group.name());
Aaron Kruglikov61582a02016-09-06 13:18:58 -0700189 }
190
191 @Override
Jordan Haltermanc955df72017-02-04 20:43:28 -0800192 public <K> AtomicCounterMapBuilder<K> atomicCounterMapBuilder() {
193 checkPermission(STORAGE_WRITE);
Jordan Halterman00e92da2018-05-22 23:05:52 -0700194 return new AtomixAtomicCounterMapBuilder<>(atomix, group.name());
Jordan Haltermanc955df72017-02-04 20:43:28 -0800195 }
196
197 @Override
Madan Jampani7e55c662016-02-15 21:13:53 -0800198 public <E> DistributedSetBuilder<E> setBuilder() {
Heedo Kang4a47a302016-02-29 17:40:23 +0900199 checkPermission(STORAGE_WRITE);
Jordan Halterman00e92da2018-05-22 23:05:52 -0700200 return new AtomixDistributedSetBuilder<>(atomix, group.name());
Madan Jampani7e55c662016-02-15 21:13:53 -0800201 }
202
203 @Override
Madan Jampani7e55c662016-02-15 21:13:53 -0800204 public AtomicCounterBuilder atomicCounterBuilder() {
Heedo Kang4a47a302016-02-29 17:40:23 +0900205 checkPermission(STORAGE_WRITE);
Jordan Halterman00e92da2018-05-22 23:05:52 -0700206 return new AtomixAtomicCounterBuilder(atomix, group.name());
Madan Jampani7e55c662016-02-15 21:13:53 -0800207 }
208
209 @Override
Jordan Halterman5a1053e2017-05-19 18:03:47 -0700210 public AtomicIdGeneratorBuilder atomicIdGeneratorBuilder() {
211 checkPermission(STORAGE_WRITE);
Jordan Halterman00e92da2018-05-22 23:05:52 -0700212 return new AtomixAtomicIdGeneratorBuilder(atomix, group.name());
Jordan Halterman5a1053e2017-05-19 18:03:47 -0700213 }
214
215 @Override
Madan Jampani7e55c662016-02-15 21:13:53 -0800216 public <V> AtomicValueBuilder<V> atomicValueBuilder() {
Heedo Kang4a47a302016-02-29 17:40:23 +0900217 checkPermission(STORAGE_WRITE);
Jordan Halterman00e92da2018-05-22 23:05:52 -0700218 return new AtomixAtomicValueBuilder<>(atomix, group.name());
Madan Jampani7e55c662016-02-15 21:13:53 -0800219 }
220
221 @Override
222 public TransactionContextBuilder transactionContextBuilder() {
Heedo Kang4a47a302016-02-29 17:40:23 +0900223 checkPermission(STORAGE_WRITE);
Jordan Halterman00e92da2018-05-22 23:05:52 -0700224 return new AtomixTransactionContextBuilder(atomix, group.name());
Madan Jampani7e55c662016-02-15 21:13:53 -0800225 }
226
227 @Override
Jordan Halterman47432582018-01-25 16:56:45 -0800228 public DistributedLockBuilder lockBuilder() {
229 checkPermission(STORAGE_WRITE);
Jordan Halterman00e92da2018-05-22 23:05:52 -0700230 return new AtomixDistributedLockBuilder(atomix, group.name());
Jordan Halterman47432582018-01-25 16:56:45 -0800231 }
232
233 @Override
Madan Jampani7e55c662016-02-15 21:13:53 -0800234 public LeaderElectorBuilder leaderElectorBuilder() {
Heedo Kang4a47a302016-02-29 17:40:23 +0900235 checkPermission(STORAGE_WRITE);
Jordan Halterman00e92da2018-05-22 23:05:52 -0700236 return new AtomixLeaderElectorBuilder(atomix, group.name(), clusterService.getLocalNode().id());
Madan Jampani7e55c662016-02-15 21:13:53 -0800237 }
238
239 @Override
Jordan Halterman400bbe52018-04-05 23:07:47 -0700240 public <T> TopicBuilder<T> topicBuilder() {
241 checkPermission(STORAGE_WRITE);
Jordan Halterman00e92da2018-05-22 23:05:52 -0700242 return new AtomixDistributedTopicBuilder<>(atomix, group.name());
243 }
244
245 @Override
246 public <E> WorkQueueBuilder<E> workQueueBuilder() {
247 checkPermission(STORAGE_WRITE);
248 return new AtomixWorkQueueBuilder<>(atomix, group.name());
Jordan Halterman400bbe52018-04-05 23:07:47 -0700249 }
250
251 @Override
Madan Jampani35708a92016-07-06 10:48:19 -0700252 public <E> WorkQueue<E> getWorkQueue(String name, Serializer serializer) {
253 checkPermission(STORAGE_WRITE);
Jordan Halterman00e92da2018-05-22 23:05:52 -0700254 return this.<E>workQueueBuilder()
255 .withName(name)
256 .withSerializer(serializer)
257 .build();
Madan Jampani35708a92016-07-06 10:48:19 -0700258 }
259
260 @Override
Madan Jampani2914e4e2016-09-13 17:48:56 -0700261 public <V> AsyncDocumentTree<V> getDocumentTree(String name, Serializer serializer) {
262 checkPermission(STORAGE_WRITE);
Jordan Halterman00e92da2018-05-22 23:05:52 -0700263 return this.<V>documentTreeBuilder()
264 .withName(name)
265 .withSerializer(serializer)
266 .build();
Madan Jampani2914e4e2016-09-13 17:48:56 -0700267 }
268
269 @Override
Jordan Halterman00e92da2018-05-22 23:05:52 -0700270 public <K, V> AsyncConsistentMultimap<K, V> getAsyncSetMultimap(String name, Serializer serializer) {
Aaron Kruglikov61582a02016-09-06 13:18:58 -0700271 checkPermission(STORAGE_WRITE);
Jordan Halterman00e92da2018-05-22 23:05:52 -0700272 return new AtomixConsistentMultimapBuilder<K, V>(atomix, group.name())
273 .withName(name)
274 .withSerializer(serializer)
275 .buildMultimap();
Aaron Kruglikov61582a02016-09-06 13:18:58 -0700276 }
277
278 @Override
Jordan Halterman00e92da2018-05-22 23:05:52 -0700279 public <V> AsyncConsistentTreeMap<V> getAsyncTreeMap(String name, Serializer serializer) {
Aaron Kruglikov61582a02016-09-06 13:18:58 -0700280 checkPermission(STORAGE_WRITE);
Jordan Halterman00e92da2018-05-22 23:05:52 -0700281 return this.<V>consistentTreeMapBuilder()
282 .withName(name)
283 .withSerializer(serializer)
284 .buildTreeMap();
285 }
286
287 @Override
288 public <T> Topic<T> getTopic(String name, Serializer serializer) {
289 checkPermission(STORAGE_WRITE);
290 return this.<T>topicBuilder()
291 .withName(name)
292 .withSerializer(serializer)
293 .build();
Aaron Kruglikov61582a02016-09-06 13:18:58 -0700294 }
295
296 @Override
Madan Jampani7e55c662016-02-15 21:13:53 -0800297 public List<MapInfo> getMapInfo() {
Jordan Halterman00e92da2018-05-22 23:05:52 -0700298 Serializer serializer = Serializer.using(KryoNamespaces.BASIC);
299 return atomix.getPrimitives(AtomicMapType.instance())
300 .stream()
301 .map(info -> {
302 io.atomix.core.map.AtomicMap<String, byte[]> map =
303 atomix.<String, byte[]>atomicMapBuilder(info.name())
304 .withSerializer(new AtomixSerializerAdapter(serializer))
305 .build();
306 int size = map.size();
307 map.close();
308 return new MapInfo(info.name(), size);
309 }).collect(Collectors.toList());
Madan Jampani7e55c662016-02-15 21:13:53 -0800310 }
311
312 @Override
313 public Map<String, Long> getCounters() {
Jordan Halterman00e92da2018-05-22 23:05:52 -0700314 return atomix.getPrimitives(AtomicCounterType.instance())
315 .stream()
316 .map(info -> {
317 AtomicCounter counter = atomix.atomicCounterBuilder(info.name()).build();
318 long value = counter.get();
319 counter.close();
320 return Maps.immutableEntry(info.name(), value);
321 }).collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()));
Madan Jampani7e55c662016-02-15 21:13:53 -0800322 }
323
324 @Override
Madan Jampani35708a92016-07-06 10:48:19 -0700325 public Map<String, WorkQueueStats> getQueueStats() {
Jordan Halterman00e92da2018-05-22 23:05:52 -0700326 Serializer serializer = Serializer.using(KryoNamespaces.BASIC);
327 return atomix.getPrimitives(WorkQueueType.instance())
328 .stream()
329 .map(info -> {
330 io.atomix.core.workqueue.WorkQueue queue = atomix.workQueueBuilder(info.name())
331 .withSerializer(new AtomixSerializerAdapter(serializer))
332 .build();
333 io.atomix.core.workqueue.WorkQueueStats stats = queue.stats();
334 return Maps.immutableEntry(info.name(), WorkQueueStats.builder()
335 .withTotalCompleted(stats.totalCompleted())
336 .withTotalInProgress(stats.totalInProgress())
337 .withTotalPending(stats.totalPending())
338 .build());
339 }).collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()));
Madan Jampani35708a92016-07-06 10:48:19 -0700340 }
341
342 @Override
Madan Jampani7e55c662016-02-15 21:13:53 -0800343 public List<PartitionInfo> getPartitionInfo() {
Madan Jampani630c8822016-02-24 10:38:21 -0800344 return partitionAdminService.partitionInfo();
Madan Jampani7e55c662016-02-15 21:13:53 -0800345 }
346
347 @Override
348 public Collection<TransactionId> getPendingTransactions() {
Jordan Halterman00e92da2018-05-22 23:05:52 -0700349 return atomix.getTransactionService().getActiveTransactions()
350 .stream()
351 .map(transactionId -> TransactionId.from(transactionId.id()))
352 .collect(Collectors.toList());
Madan Jampani13f65152016-08-17 13:14:53 -0700353 }
Madan Jampani630c8822016-02-24 10:38:21 -0800354}