Refactoring to eliminate a split package issue in onos-core-primitives.
Change-Id: I48ff6fe62ae006906674aae1f69e015395f042fc
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/StorageManager.java b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/StorageManager.java
new file mode 100644
index 0000000..a59f4f9
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/StorageManager.java
@@ -0,0 +1,356 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.atomix.primitives.impl;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.Maps;
+import io.atomix.core.Atomix;
+import io.atomix.core.counter.AtomicCounter;
+import io.atomix.core.counter.AtomicCounterType;
+import io.atomix.core.map.AtomicMapType;
+import io.atomix.core.workqueue.WorkQueueType;
+import io.atomix.primitive.partition.PartitionGroup;
+import io.atomix.protocols.raft.MultiRaftProtocol;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.Member;
+import org.onosproject.cluster.MembershipService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.persistence.PersistenceService;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.atomix.impl.AtomixManager;
+import org.onosproject.store.primitives.PartitionAdminService;
+import org.onosproject.store.primitives.TransactionId;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.AsyncConsistentMultimap;
+import org.onosproject.store.service.AsyncConsistentTreeMap;
+import org.onosproject.store.service.AsyncDocumentTree;
+import org.onosproject.store.service.AtomicCounterBuilder;
+import org.onosproject.store.service.AtomicCounterMapBuilder;
+import org.onosproject.store.service.AtomicIdGeneratorBuilder;
+import org.onosproject.store.service.AtomicValueBuilder;
+import org.onosproject.store.service.ConsistentMapBuilder;
+import org.onosproject.store.service.ConsistentMultimapBuilder;
+import org.onosproject.store.service.ConsistentTreeMapBuilder;
+import org.onosproject.store.service.DistributedLockBuilder;
+import org.onosproject.store.service.DistributedSetBuilder;
+import org.onosproject.store.service.DocumentTreeBuilder;
+import org.onosproject.store.service.EventuallyConsistentMapBuilder;
+import org.onosproject.store.service.LeaderElectorBuilder;
+import org.onosproject.store.service.MapInfo;
+import org.onosproject.store.service.PartitionInfo;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageAdminService;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Topic;
+import org.onosproject.store.service.TopicBuilder;
+import org.onosproject.store.service.TransactionContextBuilder;
+import org.onosproject.store.service.WorkQueue;
+import org.onosproject.store.service.WorkQueueBuilder;
+import org.onosproject.store.service.WorkQueueStats;
+import org.slf4j.Logger;
+
+import static org.onosproject.security.AppGuard.checkPermission;
+import static org.onosproject.security.AppPermission.Type.STORAGE_WRITE;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Implementation for {@code StorageService} and {@code StorageAdminService}.
+ */
+@Service
+@Component(immediate = true)
+public class StorageManager implements StorageService, StorageAdminService {
+
+ private final Logger log = getLogger(getClass());
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterCommunicationService clusterCommunicator;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected PersistenceService persistenceService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected PartitionAdminService partitionAdminService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected MembershipService membershipService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected AtomixManager atomixManager;
+
+ private Atomix atomix;
+ private PartitionGroup group;
+
+ @Activate
+ public void activate() {
+ atomix = atomixManager.getAtomix();
+ group = atomix.getPartitionService().getPartitionGroup(MultiRaftProtocol.TYPE);
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ log.info("Stopped");
+ }
+
+ @Override
+ public <K, V> EventuallyConsistentMapBuilder<K, V> eventuallyConsistentMapBuilder() {
+ checkPermission(STORAGE_WRITE);
+
+ // Note: NPE in the usage of ClusterService/MembershipService prevents rebooting the Karaf container.
+ // We need to reference these services outside the following peer suppliers.
+ final MembershipService membershipService = this.membershipService;
+ final ClusterService clusterService = this.clusterService;
+
+ final NodeId localNodeId = clusterService.getLocalNode().id();
+
+ // Use the MembershipService to provide peers for the map that are isolated within the current version.
+ Supplier<List<NodeId>> peersSupplier = () -> membershipService.getMembers().stream()
+ .map(Member::nodeId)
+ .filter(nodeId -> !nodeId.equals(localNodeId))
+ .filter(id -> clusterService.getState(id).isActive())
+ .collect(Collectors.toList());
+
+ // If this is the first node in its version, bootstrap from the previous version. Otherwise, bootstrap the
+ // map from members isolated within the current version.
+ Supplier<List<NodeId>> bootstrapPeersSupplier = () -> {
+ if (membershipService.getMembers().size() == 1) {
+ return clusterService.getNodes()
+ .stream()
+ .map(ControllerNode::id)
+ .filter(id -> !localNodeId.equals(id))
+ .filter(id -> clusterService.getState(id).isActive())
+ .collect(Collectors.toList());
+ } else {
+ return membershipService.getMembers()
+ .stream()
+ .map(Member::nodeId)
+ .filter(id -> !localNodeId.equals(id))
+ .filter(id -> clusterService.getState(id).isActive())
+ .collect(Collectors.toList());
+ }
+ };
+
+ return new EventuallyConsistentMapBuilderImpl<>(
+ localNodeId,
+ clusterCommunicator,
+ persistenceService,
+ peersSupplier,
+ bootstrapPeersSupplier
+ );
+ }
+
+ @Override
+ public <K, V> ConsistentMapBuilder<K, V> consistentMapBuilder() {
+ checkPermission(STORAGE_WRITE);
+ return new AtomixConsistentMapBuilder<>(atomix, group.name());
+ }
+
+ @Override
+ public <V> DocumentTreeBuilder<V> documentTreeBuilder() {
+ checkPermission(STORAGE_WRITE);
+ return new AtomixDocumentTreeBuilder<V>(atomix, group.name());
+ }
+
+ @Override
+ public <V> ConsistentTreeMapBuilder<V> consistentTreeMapBuilder() {
+ return new AtomixConsistentTreeMapBuilder<>(atomix, group.name());
+ }
+
+ @Override
+ public <K, V> ConsistentMultimapBuilder<K, V> consistentMultimapBuilder() {
+ checkPermission(STORAGE_WRITE);
+ return new AtomixConsistentMultimapBuilder<>(atomix, group.name());
+ }
+
+ @Override
+ public <K> AtomicCounterMapBuilder<K> atomicCounterMapBuilder() {
+ checkPermission(STORAGE_WRITE);
+ return new AtomixAtomicCounterMapBuilder<>(atomix, group.name());
+ }
+
+ @Override
+ public <E> DistributedSetBuilder<E> setBuilder() {
+ checkPermission(STORAGE_WRITE);
+ return new AtomixDistributedSetBuilder<>(atomix, group.name());
+ }
+
+ @Override
+ public AtomicCounterBuilder atomicCounterBuilder() {
+ checkPermission(STORAGE_WRITE);
+ return new AtomixAtomicCounterBuilder(atomix, group.name());
+ }
+
+ @Override
+ public AtomicIdGeneratorBuilder atomicIdGeneratorBuilder() {
+ checkPermission(STORAGE_WRITE);
+ return new AtomixAtomicIdGeneratorBuilder(atomix, group.name());
+ }
+
+ @Override
+ public <V> AtomicValueBuilder<V> atomicValueBuilder() {
+ checkPermission(STORAGE_WRITE);
+ return new AtomixAtomicValueBuilder<>(atomix, group.name());
+ }
+
+ @Override
+ public TransactionContextBuilder transactionContextBuilder() {
+ checkPermission(STORAGE_WRITE);
+ return new AtomixTransactionContextBuilder(atomix, group.name());
+ }
+
+ @Override
+ public DistributedLockBuilder lockBuilder() {
+ checkPermission(STORAGE_WRITE);
+ return new AtomixDistributedLockBuilder(atomix, group.name());
+ }
+
+ @Override
+ public LeaderElectorBuilder leaderElectorBuilder() {
+ checkPermission(STORAGE_WRITE);
+ return new AtomixLeaderElectorBuilder(atomix, group.name(), clusterService.getLocalNode().id());
+ }
+
+ @Override
+ public <T> TopicBuilder<T> topicBuilder() {
+ checkPermission(STORAGE_WRITE);
+ return new AtomixDistributedTopicBuilder<>(atomix, group.name());
+ }
+
+ @Override
+ public <E> WorkQueueBuilder<E> workQueueBuilder() {
+ checkPermission(STORAGE_WRITE);
+ return new AtomixWorkQueueBuilder<>(atomix, group.name());
+ }
+
+ @Override
+ public <E> WorkQueue<E> getWorkQueue(String name, Serializer serializer) {
+ checkPermission(STORAGE_WRITE);
+ return this.<E>workQueueBuilder()
+ .withName(name)
+ .withSerializer(serializer)
+ .build();
+ }
+
+ @Override
+ public <V> AsyncDocumentTree<V> getDocumentTree(String name, Serializer serializer) {
+ checkPermission(STORAGE_WRITE);
+ return this.<V>documentTreeBuilder()
+ .withName(name)
+ .withSerializer(serializer)
+ .build();
+ }
+
+ @Override
+ public <K, V> AsyncConsistentMultimap<K, V> getAsyncSetMultimap(String name, Serializer serializer) {
+ checkPermission(STORAGE_WRITE);
+ return new AtomixConsistentMultimapBuilder<K, V>(atomix, group.name())
+ .withName(name)
+ .withSerializer(serializer)
+ .buildMultimap();
+ }
+
+ @Override
+ public <V> AsyncConsistentTreeMap<V> getAsyncTreeMap(String name, Serializer serializer) {
+ checkPermission(STORAGE_WRITE);
+ return this.<V>consistentTreeMapBuilder()
+ .withName(name)
+ .withSerializer(serializer)
+ .buildTreeMap();
+ }
+
+ @Override
+ public <T> Topic<T> getTopic(String name, Serializer serializer) {
+ checkPermission(STORAGE_WRITE);
+ return this.<T>topicBuilder()
+ .withName(name)
+ .withSerializer(serializer)
+ .build();
+ }
+
+ @Override
+ public List<MapInfo> getMapInfo() {
+ Serializer serializer = Serializer.using(KryoNamespaces.BASIC);
+ return atomix.getPrimitives(AtomicMapType.instance())
+ .stream()
+ .map(info -> {
+ io.atomix.core.map.AtomicMap<String, byte[]> map =
+ atomix.<String, byte[]>atomicMapBuilder(info.name())
+ .withSerializer(new AtomixSerializerAdapter(serializer))
+ .build();
+ int size = map.size();
+ map.close();
+ return new MapInfo(info.name(), size);
+ }).collect(Collectors.toList());
+ }
+
+ @Override
+ public Map<String, Long> getCounters() {
+ return atomix.getPrimitives(AtomicCounterType.instance())
+ .stream()
+ .map(info -> {
+ AtomicCounter counter = atomix.atomicCounterBuilder(info.name()).build();
+ long value = counter.get();
+ counter.close();
+ return Maps.immutableEntry(info.name(), value);
+ }).collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()));
+ }
+
+ @Override
+ public Map<String, WorkQueueStats> getQueueStats() {
+ Serializer serializer = Serializer.using(KryoNamespaces.BASIC);
+ return atomix.getPrimitives(WorkQueueType.instance())
+ .stream()
+ .map(info -> {
+ io.atomix.core.workqueue.WorkQueue queue = atomix.workQueueBuilder(info.name())
+ .withSerializer(new AtomixSerializerAdapter(serializer))
+ .build();
+ io.atomix.core.workqueue.WorkQueueStats stats = queue.stats();
+ return Maps.immutableEntry(info.name(), WorkQueueStats.builder()
+ .withTotalCompleted(stats.totalCompleted())
+ .withTotalInProgress(stats.totalInProgress())
+ .withTotalPending(stats.totalPending())
+ .build());
+ }).collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()));
+ }
+
+ @Override
+ public List<PartitionInfo> getPartitionInfo() {
+ return partitionAdminService.partitionInfo();
+ }
+
+ @Override
+ public Collection<TransactionId> getPendingTransactions() {
+ return atomix.getTransactionService().getActiveTransactions()
+ .stream()
+ .map(transactionId -> TransactionId.from(transactionId.id()))
+ .collect(Collectors.toList());
+ }
+}