blob: 08001798de6b43092fc1cb544299fe118a6025c6 [file] [log] [blame]
/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Thomas Vachuskab6d31672018-07-27 17:03:46 -070016package org.onosproject.store.atomix.primitives.impl;
Madan Jampani7e55c662016-02-15 21:13:53 -080017
Jordan Halterman980a8c12017-09-22 18:01:19 -070018import com.google.common.collect.Maps;
Jordan Halterman00e92da2018-05-22 23:05:52 -070019import io.atomix.core.Atomix;
20import io.atomix.core.counter.AtomicCounter;
21import io.atomix.core.counter.AtomicCounterType;
22import io.atomix.core.map.AtomicMapType;
23import io.atomix.core.workqueue.WorkQueueType;
24import io.atomix.primitive.partition.PartitionGroup;
25import io.atomix.protocols.raft.MultiRaftProtocol;
Jordan Halterman28183ee2017-10-17 17:29:10 -070026import org.onosproject.cluster.ClusterService;
slowr878625f2017-10-24 14:53:49 -070027import org.onosproject.cluster.ControllerNode;
28import org.onosproject.cluster.Member;
29import org.onosproject.cluster.MembershipService;
30import org.onosproject.cluster.NodeId;
Madan Jampani7e55c662016-02-15 21:13:53 -080031import org.onosproject.persistence.PersistenceService;
Thomas Vachuskab6d31672018-07-27 17:03:46 -070032import org.onosproject.store.atomix.impl.AtomixManager;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070033import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Madan Jampani630c8822016-02-24 10:38:21 -080034import org.onosproject.store.primitives.PartitionAdminService;
Madan Jampani7e55c662016-02-15 21:13:53 -080035import org.onosproject.store.primitives.TransactionId;
36import org.onosproject.store.serializers.KryoNamespaces;
Aaron Kruglikov61582a02016-09-06 13:18:58 -070037import org.onosproject.store.service.AsyncConsistentMultimap;
38import org.onosproject.store.service.AsyncConsistentTreeMap;
Jordan Halterman980a8c12017-09-22 18:01:19 -070039import org.onosproject.store.service.AsyncDocumentTree;
Madan Jampani7e55c662016-02-15 21:13:53 -080040import org.onosproject.store.service.AtomicCounterBuilder;
Jordan Haltermanc955df72017-02-04 20:43:28 -080041import org.onosproject.store.service.AtomicCounterMapBuilder;
Jordan Halterman5a1053e2017-05-19 18:03:47 -070042import org.onosproject.store.service.AtomicIdGeneratorBuilder;
Madan Jampani7e55c662016-02-15 21:13:53 -080043import org.onosproject.store.service.AtomicValueBuilder;
Madan Jampani7e55c662016-02-15 21:13:53 -080044import org.onosproject.store.service.ConsistentMapBuilder;
Aaron Kruglikov61582a02016-09-06 13:18:58 -070045import org.onosproject.store.service.ConsistentMultimapBuilder;
Aaron Kruglikoved88ff62016-08-01 16:02:09 -070046import org.onosproject.store.service.ConsistentTreeMapBuilder;
Jordan Halterman47432582018-01-25 16:56:45 -080047import org.onosproject.store.service.DistributedLockBuilder;
Madan Jampani7e55c662016-02-15 21:13:53 -080048import org.onosproject.store.service.DistributedSetBuilder;
Sithara Punnassery112ed822016-10-24 14:55:19 -070049import org.onosproject.store.service.DocumentTreeBuilder;
Madan Jampani7e55c662016-02-15 21:13:53 -080050import org.onosproject.store.service.EventuallyConsistentMapBuilder;
51import org.onosproject.store.service.LeaderElectorBuilder;
52import org.onosproject.store.service.MapInfo;
Madan Jampani7e55c662016-02-15 21:13:53 -080053import org.onosproject.store.service.Serializer;
54import org.onosproject.store.service.StorageAdminService;
55import org.onosproject.store.service.StorageService;
Madan Jampani13f65152016-08-17 13:14:53 -070056import org.onosproject.store.service.Topic;
Jordan Halterman400bbe52018-04-05 23:07:47 -070057import org.onosproject.store.service.TopicBuilder;
Madan Jampani7e55c662016-02-15 21:13:53 -080058import org.onosproject.store.service.TransactionContextBuilder;
Madan Jampani819d61d2016-07-25 20:29:43 -070059import org.onosproject.store.service.WorkQueue;
Jordan Halterman00e92da2018-05-22 23:05:52 -070060import org.onosproject.store.service.WorkQueueBuilder;
Madan Jampani35708a92016-07-06 10:48:19 -070061import org.onosproject.store.service.WorkQueueStats;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070062import org.osgi.service.component.annotations.Activate;
63import org.osgi.service.component.annotations.Component;
64import org.osgi.service.component.annotations.Deactivate;
65import org.osgi.service.component.annotations.Reference;
66import org.osgi.service.component.annotations.ReferenceCardinality;
Madan Jampani7e55c662016-02-15 21:13:53 -080067import org.slf4j.Logger;
68
Ray Milkeyd84f89b2018-08-17 14:54:17 -070069import java.util.Collection;
70import java.util.List;
71import java.util.Map;
72import java.util.function.Supplier;
73import java.util.stream.Collectors;
74
Jordan Halterman980a8c12017-09-22 18:01:19 -070075import static org.onosproject.security.AppGuard.checkPermission;
76import static org.onosproject.security.AppPermission.Type.STORAGE_WRITE;
77import static org.slf4j.LoggerFactory.getLogger;
Madan Jampani7e55c662016-02-15 21:13:53 -080078
79/**
80 * Implementation for {@code StorageService} and {@code StorageAdminService}.
81 */
Ray Milkeyd84f89b2018-08-17 14:54:17 -070082@Component(immediate = true, service = { StorageService.class, StorageAdminService.class })
Madan Jampani7e55c662016-02-15 21:13:53 -080083public class StorageManager implements StorageService, StorageAdminService {
84
85 private final Logger log = getLogger(getClass());
86
Ray Milkeyd84f89b2018-08-17 14:54:17 -070087 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jordan Halterman28183ee2017-10-17 17:29:10 -070088 protected ClusterService clusterService;
Madan Jampani7e55c662016-02-15 21:13:53 -080089
Ray Milkeyd84f89b2018-08-17 14:54:17 -070090 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jordan Halterman28183ee2017-10-17 17:29:10 -070091 protected ClusterCommunicationService clusterCommunicator;
Madan Jampani7e55c662016-02-15 21:13:53 -080092
Ray Milkeyd84f89b2018-08-17 14:54:17 -070093 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Madan Jampani7e55c662016-02-15 21:13:53 -080094 protected PersistenceService persistenceService;
95
Ray Milkeyd84f89b2018-08-17 14:54:17 -070096 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Madan Jampani630c8822016-02-24 10:38:21 -080097 protected PartitionAdminService partitionAdminService;
98
Ray Milkeyd84f89b2018-08-17 14:54:17 -070099 @Reference(cardinality = ReferenceCardinality.MANDATORY)
slowr878625f2017-10-24 14:53:49 -0700100 protected MembershipService membershipService;
101
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700102 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jordan Halterman00e92da2018-05-22 23:05:52 -0700103 protected AtomixManager atomixManager;
104
105 private Atomix atomix;
106 private PartitionGroup group;
Madan Jampani7e55c662016-02-15 21:13:53 -0800107
108 @Activate
Madan Jampani86cb2432016-02-17 11:07:56 -0800109 public void activate() {
Jordan Halterman00e92da2018-05-22 23:05:52 -0700110 atomix = atomixManager.getAtomix();
111 group = atomix.getPartitionService().getPartitionGroup(MultiRaftProtocol.TYPE);
Madan Jampani7e55c662016-02-15 21:13:53 -0800112 log.info("Started");
113 }
114
115 @Deactivate
116 public void deactivate() {
117 log.info("Stopped");
118 }
119
120 @Override
121 public <K, V> EventuallyConsistentMapBuilder<K, V> eventuallyConsistentMapBuilder() {
Heedo Kang4a47a302016-02-29 17:40:23 +0900122 checkPermission(STORAGE_WRITE);
Jordan Haltermana3f16112018-01-11 09:34:52 -0800123
124 // Note: NPE in the usage of ClusterService/MembershipService prevents rebooting the Karaf container.
125 // We need to reference these services outside the following peer suppliers.
126 final MembershipService membershipService = this.membershipService;
127 final ClusterService clusterService = this.clusterService;
128
slowr878625f2017-10-24 14:53:49 -0700129 final NodeId localNodeId = clusterService.getLocalNode().id();
130
Jordan Haltermana3f16112018-01-11 09:34:52 -0800131 // Use the MembershipService to provide peers for the map that are isolated within the current version.
slowr878625f2017-10-24 14:53:49 -0700132 Supplier<List<NodeId>> peersSupplier = () -> membershipService.getMembers().stream()
Jordan Halterman00e92da2018-05-22 23:05:52 -0700133 .map(Member::nodeId)
134 .filter(nodeId -> !nodeId.equals(localNodeId))
135 .filter(id -> clusterService.getState(id).isActive())
136 .collect(Collectors.toList());
slowr878625f2017-10-24 14:53:49 -0700137
Jordan Haltermana3f16112018-01-11 09:34:52 -0800138 // If this is the first node in its version, bootstrap from the previous version. Otherwise, bootstrap the
139 // map from members isolated within the current version.
slowr878625f2017-10-24 14:53:49 -0700140 Supplier<List<NodeId>> bootstrapPeersSupplier = () -> {
141 if (membershipService.getMembers().size() == 1) {
142 return clusterService.getNodes()
Jordan Halterman00e92da2018-05-22 23:05:52 -0700143 .stream()
144 .map(ControllerNode::id)
145 .filter(id -> !localNodeId.equals(id))
146 .filter(id -> clusterService.getState(id).isActive())
147 .collect(Collectors.toList());
slowr878625f2017-10-24 14:53:49 -0700148 } else {
149 return membershipService.getMembers()
Jordan Halterman00e92da2018-05-22 23:05:52 -0700150 .stream()
151 .map(Member::nodeId)
152 .filter(id -> !localNodeId.equals(id))
153 .filter(id -> clusterService.getState(id).isActive())
154 .collect(Collectors.toList());
slowr878625f2017-10-24 14:53:49 -0700155 }
156 };
157
slowr878625f2017-10-24 14:53:49 -0700158 return new EventuallyConsistentMapBuilderImpl<>(
Jordan Halterman00e92da2018-05-22 23:05:52 -0700159 localNodeId,
160 clusterCommunicator,
161 persistenceService,
162 peersSupplier,
163 bootstrapPeersSupplier
slowr878625f2017-10-24 14:53:49 -0700164 );
Madan Jampani7e55c662016-02-15 21:13:53 -0800165 }
166
167 @Override
168 public <K, V> ConsistentMapBuilder<K, V> consistentMapBuilder() {
Heedo Kang4a47a302016-02-29 17:40:23 +0900169 checkPermission(STORAGE_WRITE);
Jordan Halterman00e92da2018-05-22 23:05:52 -0700170 return new AtomixConsistentMapBuilder<>(atomix, group.name());
Madan Jampani7e55c662016-02-15 21:13:53 -0800171 }
172
173 @Override
Sithara Punnassery112ed822016-10-24 14:55:19 -0700174 public <V> DocumentTreeBuilder<V> documentTreeBuilder() {
175 checkPermission(STORAGE_WRITE);
Jordan Halterman00e92da2018-05-22 23:05:52 -0700176 return new AtomixDocumentTreeBuilder<V>(atomix, group.name());
Sithara Punnassery112ed822016-10-24 14:55:19 -0700177 }
178
179 @Override
Aaron Kruglikoved88ff62016-08-01 16:02:09 -0700180 public <V> ConsistentTreeMapBuilder<V> consistentTreeMapBuilder() {
Jordan Halterman00e92da2018-05-22 23:05:52 -0700181 return new AtomixConsistentTreeMapBuilder<>(atomix, group.name());
Aaron Kruglikoved88ff62016-08-01 16:02:09 -0700182 }
183
184 @Override
Aaron Kruglikov61582a02016-09-06 13:18:58 -0700185 public <K, V> ConsistentMultimapBuilder<K, V> consistentMultimapBuilder() {
186 checkPermission(STORAGE_WRITE);
Jordan Halterman00e92da2018-05-22 23:05:52 -0700187 return new AtomixConsistentMultimapBuilder<>(atomix, group.name());
Aaron Kruglikov61582a02016-09-06 13:18:58 -0700188 }
189
190 @Override
Jordan Haltermanc955df72017-02-04 20:43:28 -0800191 public <K> AtomicCounterMapBuilder<K> atomicCounterMapBuilder() {
192 checkPermission(STORAGE_WRITE);
Jordan Halterman00e92da2018-05-22 23:05:52 -0700193 return new AtomixAtomicCounterMapBuilder<>(atomix, group.name());
Jordan Haltermanc955df72017-02-04 20:43:28 -0800194 }
195
196 @Override
Madan Jampani7e55c662016-02-15 21:13:53 -0800197 public <E> DistributedSetBuilder<E> setBuilder() {
Heedo Kang4a47a302016-02-29 17:40:23 +0900198 checkPermission(STORAGE_WRITE);
Jordan Halterman00e92da2018-05-22 23:05:52 -0700199 return new AtomixDistributedSetBuilder<>(atomix, group.name());
Madan Jampani7e55c662016-02-15 21:13:53 -0800200 }
201
202 @Override
Madan Jampani7e55c662016-02-15 21:13:53 -0800203 public AtomicCounterBuilder atomicCounterBuilder() {
Heedo Kang4a47a302016-02-29 17:40:23 +0900204 checkPermission(STORAGE_WRITE);
Jordan Halterman00e92da2018-05-22 23:05:52 -0700205 return new AtomixAtomicCounterBuilder(atomix, group.name());
Madan Jampani7e55c662016-02-15 21:13:53 -0800206 }
207
208 @Override
Jordan Halterman5a1053e2017-05-19 18:03:47 -0700209 public AtomicIdGeneratorBuilder atomicIdGeneratorBuilder() {
210 checkPermission(STORAGE_WRITE);
Jordan Halterman00e92da2018-05-22 23:05:52 -0700211 return new AtomixAtomicIdGeneratorBuilder(atomix, group.name());
Jordan Halterman5a1053e2017-05-19 18:03:47 -0700212 }
213
214 @Override
Madan Jampani7e55c662016-02-15 21:13:53 -0800215 public <V> AtomicValueBuilder<V> atomicValueBuilder() {
Heedo Kang4a47a302016-02-29 17:40:23 +0900216 checkPermission(STORAGE_WRITE);
Jordan Halterman00e92da2018-05-22 23:05:52 -0700217 return new AtomixAtomicValueBuilder<>(atomix, group.name());
Madan Jampani7e55c662016-02-15 21:13:53 -0800218 }
219
220 @Override
221 public TransactionContextBuilder transactionContextBuilder() {
Heedo Kang4a47a302016-02-29 17:40:23 +0900222 checkPermission(STORAGE_WRITE);
Jordan Halterman00e92da2018-05-22 23:05:52 -0700223 return new AtomixTransactionContextBuilder(atomix, group.name());
Madan Jampani7e55c662016-02-15 21:13:53 -0800224 }
225
226 @Override
Jordan Halterman47432582018-01-25 16:56:45 -0800227 public DistributedLockBuilder lockBuilder() {
228 checkPermission(STORAGE_WRITE);
Jordan Halterman00e92da2018-05-22 23:05:52 -0700229 return new AtomixDistributedLockBuilder(atomix, group.name());
Jordan Halterman47432582018-01-25 16:56:45 -0800230 }
231
232 @Override
Madan Jampani7e55c662016-02-15 21:13:53 -0800233 public LeaderElectorBuilder leaderElectorBuilder() {
Heedo Kang4a47a302016-02-29 17:40:23 +0900234 checkPermission(STORAGE_WRITE);
Jordan Halterman00e92da2018-05-22 23:05:52 -0700235 return new AtomixLeaderElectorBuilder(atomix, group.name(), clusterService.getLocalNode().id());
Madan Jampani7e55c662016-02-15 21:13:53 -0800236 }
237
238 @Override
Jordan Halterman400bbe52018-04-05 23:07:47 -0700239 public <T> TopicBuilder<T> topicBuilder() {
240 checkPermission(STORAGE_WRITE);
Jordan Halterman00e92da2018-05-22 23:05:52 -0700241 return new AtomixDistributedTopicBuilder<>(atomix, group.name());
242 }
243
244 @Override
245 public <E> WorkQueueBuilder<E> workQueueBuilder() {
246 checkPermission(STORAGE_WRITE);
247 return new AtomixWorkQueueBuilder<>(atomix, group.name());
Jordan Halterman400bbe52018-04-05 23:07:47 -0700248 }
249
250 @Override
Madan Jampani35708a92016-07-06 10:48:19 -0700251 public <E> WorkQueue<E> getWorkQueue(String name, Serializer serializer) {
252 checkPermission(STORAGE_WRITE);
Jordan Halterman00e92da2018-05-22 23:05:52 -0700253 return this.<E>workQueueBuilder()
254 .withName(name)
255 .withSerializer(serializer)
256 .build();
Madan Jampani35708a92016-07-06 10:48:19 -0700257 }
258
259 @Override
Madan Jampani2914e4e2016-09-13 17:48:56 -0700260 public <V> AsyncDocumentTree<V> getDocumentTree(String name, Serializer serializer) {
261 checkPermission(STORAGE_WRITE);
Jordan Halterman00e92da2018-05-22 23:05:52 -0700262 return this.<V>documentTreeBuilder()
263 .withName(name)
264 .withSerializer(serializer)
265 .build();
Madan Jampani2914e4e2016-09-13 17:48:56 -0700266 }
267
268 @Override
Jordan Halterman00e92da2018-05-22 23:05:52 -0700269 public <K, V> AsyncConsistentMultimap<K, V> getAsyncSetMultimap(String name, Serializer serializer) {
Aaron Kruglikov61582a02016-09-06 13:18:58 -0700270 checkPermission(STORAGE_WRITE);
Jordan Halterman00e92da2018-05-22 23:05:52 -0700271 return new AtomixConsistentMultimapBuilder<K, V>(atomix, group.name())
272 .withName(name)
273 .withSerializer(serializer)
274 .buildMultimap();
Aaron Kruglikov61582a02016-09-06 13:18:58 -0700275 }
276
277 @Override
Jordan Halterman00e92da2018-05-22 23:05:52 -0700278 public <V> AsyncConsistentTreeMap<V> getAsyncTreeMap(String name, Serializer serializer) {
Aaron Kruglikov61582a02016-09-06 13:18:58 -0700279 checkPermission(STORAGE_WRITE);
Jordan Halterman00e92da2018-05-22 23:05:52 -0700280 return this.<V>consistentTreeMapBuilder()
281 .withName(name)
282 .withSerializer(serializer)
283 .buildTreeMap();
284 }
285
286 @Override
287 public <T> Topic<T> getTopic(String name, Serializer serializer) {
288 checkPermission(STORAGE_WRITE);
289 return this.<T>topicBuilder()
290 .withName(name)
291 .withSerializer(serializer)
292 .build();
Aaron Kruglikov61582a02016-09-06 13:18:58 -0700293 }
294
295 @Override
Madan Jampani7e55c662016-02-15 21:13:53 -0800296 public List<MapInfo> getMapInfo() {
Jordan Halterman00e92da2018-05-22 23:05:52 -0700297 Serializer serializer = Serializer.using(KryoNamespaces.BASIC);
298 return atomix.getPrimitives(AtomicMapType.instance())
299 .stream()
300 .map(info -> {
301 io.atomix.core.map.AtomicMap<String, byte[]> map =
302 atomix.<String, byte[]>atomicMapBuilder(info.name())
303 .withSerializer(new AtomixSerializerAdapter(serializer))
304 .build();
305 int size = map.size();
306 map.close();
307 return new MapInfo(info.name(), size);
308 }).collect(Collectors.toList());
Madan Jampani7e55c662016-02-15 21:13:53 -0800309 }
310
311 @Override
312 public Map<String, Long> getCounters() {
Jordan Halterman00e92da2018-05-22 23:05:52 -0700313 return atomix.getPrimitives(AtomicCounterType.instance())
314 .stream()
315 .map(info -> {
316 AtomicCounter counter = atomix.atomicCounterBuilder(info.name()).build();
317 long value = counter.get();
318 counter.close();
319 return Maps.immutableEntry(info.name(), value);
320 }).collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()));
Madan Jampani7e55c662016-02-15 21:13:53 -0800321 }
322
323 @Override
Madan Jampani35708a92016-07-06 10:48:19 -0700324 public Map<String, WorkQueueStats> getQueueStats() {
Jordan Halterman00e92da2018-05-22 23:05:52 -0700325 Serializer serializer = Serializer.using(KryoNamespaces.BASIC);
326 return atomix.getPrimitives(WorkQueueType.instance())
327 .stream()
328 .map(info -> {
329 io.atomix.core.workqueue.WorkQueue queue = atomix.workQueueBuilder(info.name())
330 .withSerializer(new AtomixSerializerAdapter(serializer))
331 .build();
332 io.atomix.core.workqueue.WorkQueueStats stats = queue.stats();
333 return Maps.immutableEntry(info.name(), WorkQueueStats.builder()
334 .withTotalCompleted(stats.totalCompleted())
335 .withTotalInProgress(stats.totalInProgress())
336 .withTotalPending(stats.totalPending())
337 .build());
338 }).collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()));
Madan Jampani35708a92016-07-06 10:48:19 -0700339 }
340
341 @Override
Madan Jampani7e55c662016-02-15 21:13:53 -0800342 public Collection<TransactionId> getPendingTransactions() {
Jordan Halterman00e92da2018-05-22 23:05:52 -0700343 return atomix.getTransactionService().getActiveTransactions()
344 .stream()
345 .map(transactionId -> TransactionId.from(transactionId.id()))
346 .collect(Collectors.toList());
Madan Jampani13f65152016-08-17 13:14:53 -0700347 }
Madan Jampani630c8822016-02-24 10:38:21 -0800348}