blob: bacda00d78d68777ec1a878f2ea7b1cd2d0c8bda [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.store.primitives.impl;
18
Jordan Halterman9bdc24f2017-04-19 23:45:12 -070019import static org.onlab.util.Tools.groupedThreads;
Madan Jampani15b8ef52016-02-02 17:35:05 -080020import static org.slf4j.LoggerFactory.getLogger;
21
22import java.io.File;
Madan Jampanie14a09c2016-02-11 10:43:21 -080023import java.util.List;
Madan Jampani15b8ef52016-02-02 17:35:05 -080024import java.util.Map;
25import java.util.Set;
26import java.util.concurrent.CompletableFuture;
Jordan Halterman9bdc24f2017-04-19 23:45:12 -070027import java.util.concurrent.ExecutorService;
28import java.util.concurrent.Executors;
Madan Jampani33547452016-02-29 16:45:04 -080029import java.util.concurrent.atomic.AtomicReference;
Madan Jampanie14a09c2016-02-11 10:43:21 -080030import java.util.stream.Collectors;
Madan Jampani15b8ef52016-02-02 17:35:05 -080031
32import org.apache.felix.scr.annotations.Activate;
33import org.apache.felix.scr.annotations.Component;
Madan Jampani86cb2432016-02-17 11:07:56 -080034import org.apache.felix.scr.annotations.Deactivate;
Madan Jampani15b8ef52016-02-02 17:35:05 -080035import org.apache.felix.scr.annotations.Reference;
36import org.apache.felix.scr.annotations.ReferenceCardinality;
37import org.apache.felix.scr.annotations.Service;
38import org.onlab.util.Tools;
Madan Jampani33547452016-02-29 16:45:04 -080039import org.onosproject.cluster.ClusterMetadata;
40import org.onosproject.cluster.ClusterMetadataDiff;
41import org.onosproject.cluster.ClusterMetadataEvent;
42import org.onosproject.cluster.ClusterMetadataEventListener;
Madan Jampani15b8ef52016-02-02 17:35:05 -080043import org.onosproject.cluster.ClusterMetadataService;
44import org.onosproject.cluster.ClusterService;
45import org.onosproject.cluster.NodeId;
Madan Jampani33547452016-02-29 16:45:04 -080046import org.onosproject.cluster.PartitionDiff;
Madan Jampani15b8ef52016-02-02 17:35:05 -080047import org.onosproject.cluster.PartitionId;
48import org.onosproject.event.AbstractListenerManager;
49import org.onosproject.store.cluster.messaging.MessagingService;
50import org.onosproject.store.primitives.DistributedPrimitiveCreator;
51import org.onosproject.store.primitives.PartitionAdminService;
52import org.onosproject.store.primitives.PartitionEvent;
53import org.onosproject.store.primitives.PartitionEventListener;
54import org.onosproject.store.primitives.PartitionService;
Madan Jampaniccdf9da2016-05-05 14:37:27 -070055import org.onosproject.store.service.PartitionClientInfo;
Madan Jampanie14a09c2016-02-11 10:43:21 -080056import org.onosproject.store.service.PartitionInfo;
Madan Jampani15b8ef52016-02-02 17:35:05 -080057import org.slf4j.Logger;
58
59import com.google.common.collect.ImmutableSet;
60import com.google.common.collect.Maps;
61
Heedo Kang4a47a302016-02-29 17:40:23 +090062import static org.onosproject.security.AppGuard.checkPermission;
63import static org.onosproject.security.AppPermission.Type.PARTITION_READ;
64
Madan Jampani15b8ef52016-02-02 17:35:05 -080065/**
66 * Implementation of {@code PartitionService} and {@code PartitionAdminService}.
67 */
68@Component
69@Service
70public class PartitionManager extends AbstractListenerManager<PartitionEvent, PartitionEventListener>
71 implements PartitionService, PartitionAdminService {
72
73 private final Logger log = getLogger(getClass());
74
75 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
76 protected MessagingService messagingService;
77
78 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
79 protected ClusterMetadataService metadataService;
80
81 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
82 protected ClusterService clusterService;
83
Madan Jampani33547452016-02-29 16:45:04 -080084 private final Map<PartitionId, StoragePartition> partitions = Maps.newConcurrentMap();
85 private final AtomicReference<ClusterMetadata> currentClusterMetadata = new AtomicReference<>();
86 private final InternalClusterMetadataListener metadataListener = new InternalClusterMetadataListener();
Jordan Halterman9bdc24f2017-04-19 23:45:12 -070087 private final ExecutorService sharedPrimitiveExecutor = Executors.newFixedThreadPool(
88 Runtime.getRuntime().availableProcessors(),
89 groupedThreads("onos/primitives", "primitive-events", log));
Madan Jampani15b8ef52016-02-02 17:35:05 -080090
91 @Activate
92 public void activate() {
93 eventDispatcher.addSink(PartitionEvent.class, listenerRegistry);
Madan Jampani33547452016-02-29 16:45:04 -080094 currentClusterMetadata.set(metadataService.getClusterMetadata());
95 metadataService.addListener(metadataListener);
96 currentClusterMetadata.get()
Madan Jampani15b8ef52016-02-02 17:35:05 -080097 .getPartitions()
Madan Jampani15b8ef52016-02-02 17:35:05 -080098 .forEach(partition -> partitions.put(partition.getId(), new StoragePartition(partition,
99 messagingService,
100 clusterService,
101 CatalystSerializers.getSerializer(),
Jordan Halterman9bdc24f2017-04-19 23:45:12 -0700102 sharedPrimitiveExecutor,
Madan Jampani832686d2016-04-04 21:57:26 -0700103 new File(System.getProperty("karaf.data") + "/partitions/" + partition.getId()))));
Madan Jampani15b8ef52016-02-02 17:35:05 -0800104
105 CompletableFuture<Void> openFuture = CompletableFuture.allOf(partitions.values()
106 .stream()
107 .map(StoragePartition::open)
108 .toArray(CompletableFuture[]::new));
109 openFuture.join();
110 log.info("Started");
111 }
112
Madan Jampani86cb2432016-02-17 11:07:56 -0800113 @Deactivate
Madan Jampani15b8ef52016-02-02 17:35:05 -0800114 public void deactivate() {
Madan Jampani33547452016-02-29 16:45:04 -0800115 metadataService.removeListener(metadataListener);
Madan Jampani15b8ef52016-02-02 17:35:05 -0800116 eventDispatcher.removeSink(PartitionEvent.class);
117
118 CompletableFuture<Void> closeFuture = CompletableFuture.allOf(partitions.values()
119 .stream()
120 .map(StoragePartition::close)
121 .toArray(CompletableFuture[]::new));
122 closeFuture.join();
123 log.info("Stopped");
124 }
125
126 @Override
Madan Jampani15b8ef52016-02-02 17:35:05 -0800127 public int getNumberOfPartitions() {
Heedo Kang4a47a302016-02-29 17:40:23 +0900128 checkPermission(PARTITION_READ);
Madan Jampani15b8ef52016-02-02 17:35:05 -0800129 return partitions.size();
130 }
131
132 @Override
133 public Set<PartitionId> getAllPartitionIds() {
Heedo Kang4a47a302016-02-29 17:40:23 +0900134 checkPermission(PARTITION_READ);
Madan Jampani15b8ef52016-02-02 17:35:05 -0800135 return partitions.keySet();
136 }
137
138 @Override
139 public DistributedPrimitiveCreator getDistributedPrimitiveCreator(PartitionId partitionId) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900140 checkPermission(PARTITION_READ);
Madan Jampani15b8ef52016-02-02 17:35:05 -0800141 return partitions.get(partitionId).client();
142 }
143
144 @Override
145 public Set<NodeId> getConfiguredMembers(PartitionId partitionId) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900146 checkPermission(PARTITION_READ);
Madan Jampani15b8ef52016-02-02 17:35:05 -0800147 StoragePartition partition = partitions.get(partitionId);
148 return ImmutableSet.copyOf(partition.getMembers());
149 }
150
151 @Override
152 public Set<NodeId> getActiveMembersMembers(PartitionId partitionId) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900153 checkPermission(PARTITION_READ);
Madan Jampani15b8ef52016-02-02 17:35:05 -0800154 // TODO: This needs to query metadata to determine currently active
155 // members of partition
156 return getConfiguredMembers(partitionId);
157 }
Madan Jampanie14a09c2016-02-11 10:43:21 -0800158
159 @Override
160 public List<PartitionInfo> partitionInfo() {
161 return partitions.values()
162 .stream()
Sho SHIMIZU5fab6e52016-02-15 11:54:15 -0800163 .flatMap(x -> Tools.stream(x.info()))
Madan Jampanie14a09c2016-02-11 10:43:21 -0800164 .collect(Collectors.toList());
165 }
Madan Jampani33547452016-02-29 16:45:04 -0800166
167 private void processMetadataUpdate(ClusterMetadata clusterMetadata) {
168 ClusterMetadataDiff diffExaminer =
169 new ClusterMetadataDiff(currentClusterMetadata.get(), clusterMetadata);
170 diffExaminer.partitionDiffs()
171 .values()
172 .stream()
Madan Jampani33547452016-02-29 16:45:04 -0800173 .filter(PartitionDiff::hasChanged)
174 .forEach(diff -> partitions.get(diff.partitionId()).onUpdate(diff.newValue()));
175 }
176
177 private class InternalClusterMetadataListener implements ClusterMetadataEventListener {
178 @Override
179 public void event(ClusterMetadataEvent event) {
180 processMetadataUpdate(event.subject());
181 }
182 }
Madan Jampaniccdf9da2016-05-05 14:37:27 -0700183
184 @Override
185 public List<PartitionClientInfo> partitionClientInfo() {
186 return partitions.values()
187 .stream()
188 .map(StoragePartition::client)
189 .map(StoragePartitionClient::clientInfo)
190 .collect(Collectors.toList());
191 }
Madan Jampani2f9cc712016-02-15 19:36:21 -0800192}