blob: 8d5440d9cf1c27afc600f3867dbb85eac64af072 [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.store.primitives.impl;
18
Madan Jampani15b8ef52016-02-02 17:35:05 -080019import java.io.File;
Madan Jampanie14a09c2016-02-11 10:43:21 -080020import java.util.List;
Madan Jampani15b8ef52016-02-02 17:35:05 -080021import java.util.Map;
22import java.util.Set;
23import java.util.concurrent.CompletableFuture;
Madan Jampani33547452016-02-29 16:45:04 -080024import java.util.concurrent.atomic.AtomicReference;
Madan Jampanie14a09c2016-02-11 10:43:21 -080025import java.util.stream.Collectors;
Madan Jampani15b8ef52016-02-02 17:35:05 -080026
Jordan Halterman2bf177c2017-06-29 01:49:08 -070027import com.google.common.collect.ImmutableSet;
28import com.google.common.collect.Maps;
Madan Jampani15b8ef52016-02-02 17:35:05 -080029import org.apache.felix.scr.annotations.Activate;
30import org.apache.felix.scr.annotations.Component;
Madan Jampani86cb2432016-02-17 11:07:56 -080031import org.apache.felix.scr.annotations.Deactivate;
Madan Jampani15b8ef52016-02-02 17:35:05 -080032import org.apache.felix.scr.annotations.Reference;
33import org.apache.felix.scr.annotations.ReferenceCardinality;
34import org.apache.felix.scr.annotations.Service;
35import org.onlab.util.Tools;
Madan Jampani33547452016-02-29 16:45:04 -080036import org.onosproject.cluster.ClusterMetadata;
37import org.onosproject.cluster.ClusterMetadataDiff;
38import org.onosproject.cluster.ClusterMetadataEvent;
39import org.onosproject.cluster.ClusterMetadataEventListener;
Madan Jampani15b8ef52016-02-02 17:35:05 -080040import org.onosproject.cluster.ClusterMetadataService;
41import org.onosproject.cluster.ClusterService;
42import org.onosproject.cluster.NodeId;
Madan Jampani33547452016-02-29 16:45:04 -080043import org.onosproject.cluster.PartitionDiff;
Madan Jampani15b8ef52016-02-02 17:35:05 -080044import org.onosproject.cluster.PartitionId;
Jordan Halterman980a8c12017-09-22 18:01:19 -070045import org.onosproject.core.Version;
46import org.onosproject.core.VersionService;
Madan Jampani15b8ef52016-02-02 17:35:05 -080047import org.onosproject.event.AbstractListenerManager;
Jordan Halterman28183ee2017-10-17 17:29:10 -070048import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Madan Jampani15b8ef52016-02-02 17:35:05 -080049import org.onosproject.store.primitives.DistributedPrimitiveCreator;
50import org.onosproject.store.primitives.PartitionAdminService;
51import org.onosproject.store.primitives.PartitionEvent;
52import org.onosproject.store.primitives.PartitionEventListener;
53import org.onosproject.store.primitives.PartitionService;
Madan Jampaniccdf9da2016-05-05 14:37:27 -070054import org.onosproject.store.service.PartitionClientInfo;
Madan Jampanie14a09c2016-02-11 10:43:21 -080055import org.onosproject.store.service.PartitionInfo;
Jordan Halterman980a8c12017-09-22 18:01:19 -070056import org.onosproject.upgrade.UpgradeService;
Madan Jampani15b8ef52016-02-02 17:35:05 -080057import org.slf4j.Logger;
58
Heedo Kang4a47a302016-02-29 17:40:23 +090059import static org.onosproject.security.AppGuard.checkPermission;
60import static org.onosproject.security.AppPermission.Type.PARTITION_READ;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070061import static org.slf4j.LoggerFactory.getLogger;
Heedo Kang4a47a302016-02-29 17:40:23 +090062
Madan Jampani15b8ef52016-02-02 17:35:05 -080063/**
64 * Implementation of {@code PartitionService} and {@code PartitionAdminService}.
65 */
66@Component
67@Service
68public class PartitionManager extends AbstractListenerManager<PartitionEvent, PartitionEventListener>
69 implements PartitionService, PartitionAdminService {
70
Thomas Vachuska58bf4912017-10-31 12:00:32 -070071 static final String PARTITIONS_DIR =
72 System.getProperty("karaf.data") + "/db/partitions/";
73
Madan Jampani15b8ef52016-02-02 17:35:05 -080074 private final Logger log = getLogger(getClass());
75
76 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jordan Halterman28183ee2017-10-17 17:29:10 -070077 protected ClusterCommunicationService clusterCommunicator;
Madan Jampani15b8ef52016-02-02 17:35:05 -080078
79 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
80 protected ClusterMetadataService metadataService;
81
82 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
83 protected ClusterService clusterService;
84
Jordan Halterman980a8c12017-09-22 18:01:19 -070085 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
86 protected UpgradeService upgradeService;
87
88 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
89 protected VersionService versionService;
90
91 private final Map<PartitionId, StoragePartition> inactivePartitions = Maps.newConcurrentMap();
92 private final Map<PartitionId, StoragePartition> activePartitions = Maps.newConcurrentMap();
Madan Jampani33547452016-02-29 16:45:04 -080093 private final AtomicReference<ClusterMetadata> currentClusterMetadata = new AtomicReference<>();
94 private final InternalClusterMetadataListener metadataListener = new InternalClusterMetadataListener();
Madan Jampani15b8ef52016-02-02 17:35:05 -080095
96 @Activate
97 public void activate() {
98 eventDispatcher.addSink(PartitionEvent.class, listenerRegistry);
Madan Jampani33547452016-02-29 16:45:04 -080099 currentClusterMetadata.set(metadataService.getClusterMetadata());
100 metadataService.addListener(metadataListener);
Madan Jampani15b8ef52016-02-02 17:35:05 -0800101
Jordan Halterman980a8c12017-09-22 18:01:19 -0700102 // If an upgrade is currently in progress and this node is an upgraded node, initialize upgrade partitions.
103 CompletableFuture<Void> openFuture;
104 if (upgradeService.isUpgrading() && upgradeService.isLocalUpgraded()) {
105 Version sourceVersion = upgradeService.getState().source();
106 Version targetVersion = upgradeService.getState().target();
107 currentClusterMetadata.get()
108 .getPartitions()
Jordan Halterman28183ee2017-10-17 17:29:10 -0700109 .forEach(partition -> {
110 inactivePartitions.put(partition.getId(), new StoragePartition(
111 partition,
112 sourceVersion,
113 null,
114 clusterCommunicator,
115 clusterService,
Thomas Vachuska58bf4912017-10-31 12:00:32 -0700116 new File(PARTITIONS_DIR + sourceVersion + "/" + partition.getId())));
Jordan Halterman28183ee2017-10-17 17:29:10 -0700117 activePartitions.put(partition.getId(), new StoragePartition(
118 partition,
119 targetVersion,
120 sourceVersion,
121 clusterCommunicator,
122 clusterService,
Thomas Vachuska58bf4912017-10-31 12:00:32 -0700123 new File(PARTITIONS_DIR + targetVersion + "/" + partition.getId())));
Jordan Halterman28183ee2017-10-17 17:29:10 -0700124 });
Jordan Halterman980a8c12017-09-22 18:01:19 -0700125
126 // We have to fork existing partitions before we can start inactive partition servers to
127 // avoid duplicate message handlers when both servers are running.
128 openFuture = CompletableFuture.allOf(activePartitions.values().stream()
129 .map(StoragePartition::open)
130 .toArray(CompletableFuture[]::new))
131 .thenCompose(v -> CompletableFuture.allOf(inactivePartitions.values().stream()
132 .map(StoragePartition::open)
133 .toArray(CompletableFuture[]::new)));
134 } else {
135 Version version = versionService.version();
136 currentClusterMetadata.get()
137 .getPartitions()
138 .forEach(partition -> activePartitions.put(partition.getId(), new StoragePartition(
139 partition,
140 version,
141 null,
142 clusterCommunicator,
143 clusterService,
Thomas Vachuska58bf4912017-10-31 12:00:32 -0700144 new File(PARTITIONS_DIR + version + "/" + partition.getId()))));
Jordan Halterman980a8c12017-09-22 18:01:19 -0700145 openFuture = CompletableFuture.allOf(activePartitions.values().stream()
146 .map(StoragePartition::open)
147 .toArray(CompletableFuture[]::new));
148 }
149
Madan Jampani15b8ef52016-02-02 17:35:05 -0800150 openFuture.join();
151 log.info("Started");
152 }
153
Madan Jampani86cb2432016-02-17 11:07:56 -0800154 @Deactivate
Madan Jampani15b8ef52016-02-02 17:35:05 -0800155 public void deactivate() {
Madan Jampani33547452016-02-29 16:45:04 -0800156 metadataService.removeListener(metadataListener);
Madan Jampani15b8ef52016-02-02 17:35:05 -0800157 eventDispatcher.removeSink(PartitionEvent.class);
158
Jordan Halterman980a8c12017-09-22 18:01:19 -0700159 CompletableFuture<Void> closeFuture = CompletableFuture.allOf(
160 CompletableFuture.allOf(inactivePartitions.values().stream()
161 .map(StoragePartition::close)
162 .toArray(CompletableFuture[]::new)),
163 CompletableFuture.allOf(activePartitions.values().stream()
164 .map(StoragePartition::close)
165 .toArray(CompletableFuture[]::new)));
Madan Jampani15b8ef52016-02-02 17:35:05 -0800166 closeFuture.join();
167 log.info("Stopped");
168 }
169
170 @Override
Madan Jampani15b8ef52016-02-02 17:35:05 -0800171 public int getNumberOfPartitions() {
Heedo Kang4a47a302016-02-29 17:40:23 +0900172 checkPermission(PARTITION_READ);
Jordan Halterman980a8c12017-09-22 18:01:19 -0700173 return activePartitions.size();
Madan Jampani15b8ef52016-02-02 17:35:05 -0800174 }
175
176 @Override
177 public Set<PartitionId> getAllPartitionIds() {
Heedo Kang4a47a302016-02-29 17:40:23 +0900178 checkPermission(PARTITION_READ);
Jordan Halterman980a8c12017-09-22 18:01:19 -0700179 return activePartitions.keySet();
Madan Jampani15b8ef52016-02-02 17:35:05 -0800180 }
181
182 @Override
183 public DistributedPrimitiveCreator getDistributedPrimitiveCreator(PartitionId partitionId) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900184 checkPermission(PARTITION_READ);
Jordan Halterman980a8c12017-09-22 18:01:19 -0700185 return activePartitions.get(partitionId).client();
Madan Jampani15b8ef52016-02-02 17:35:05 -0800186 }
187
188 @Override
189 public Set<NodeId> getConfiguredMembers(PartitionId partitionId) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900190 checkPermission(PARTITION_READ);
Jordan Halterman980a8c12017-09-22 18:01:19 -0700191 StoragePartition partition = activePartitions.get(partitionId);
Madan Jampani15b8ef52016-02-02 17:35:05 -0800192 return ImmutableSet.copyOf(partition.getMembers());
193 }
194
195 @Override
196 public Set<NodeId> getActiveMembersMembers(PartitionId partitionId) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900197 checkPermission(PARTITION_READ);
Madan Jampani15b8ef52016-02-02 17:35:05 -0800198 // TODO: This needs to query metadata to determine currently active
199 // members of partition
200 return getConfiguredMembers(partitionId);
201 }
Madan Jampanie14a09c2016-02-11 10:43:21 -0800202
203 @Override
204 public List<PartitionInfo> partitionInfo() {
Jordan Halterman980a8c12017-09-22 18:01:19 -0700205 return activePartitions.values()
Madan Jampanie14a09c2016-02-11 10:43:21 -0800206 .stream()
Sho SHIMIZU5fab6e52016-02-15 11:54:15 -0800207 .flatMap(x -> Tools.stream(x.info()))
Madan Jampanie14a09c2016-02-11 10:43:21 -0800208 .collect(Collectors.toList());
209 }
Madan Jampani33547452016-02-29 16:45:04 -0800210
211 private void processMetadataUpdate(ClusterMetadata clusterMetadata) {
212 ClusterMetadataDiff diffExaminer =
213 new ClusterMetadataDiff(currentClusterMetadata.get(), clusterMetadata);
214 diffExaminer.partitionDiffs()
215 .values()
216 .stream()
Madan Jampani33547452016-02-29 16:45:04 -0800217 .filter(PartitionDiff::hasChanged)
Jordan Halterman980a8c12017-09-22 18:01:19 -0700218 .forEach(diff -> activePartitions.get(diff.partitionId()).onUpdate(diff.newValue()));
Madan Jampani33547452016-02-29 16:45:04 -0800219 }
220
221 private class InternalClusterMetadataListener implements ClusterMetadataEventListener {
222 @Override
223 public void event(ClusterMetadataEvent event) {
224 processMetadataUpdate(event.subject());
225 }
226 }
Madan Jampaniccdf9da2016-05-05 14:37:27 -0700227
228 @Override
229 public List<PartitionClientInfo> partitionClientInfo() {
Jordan Halterman980a8c12017-09-22 18:01:19 -0700230 return activePartitions.values()
Madan Jampaniccdf9da2016-05-05 14:37:27 -0700231 .stream()
232 .map(StoragePartition::client)
233 .map(StoragePartitionClient::clientInfo)
234 .collect(Collectors.toList());
235 }
Madan Jampani2f9cc712016-02-15 19:36:21 -0800236}