blob: a083a8b2ac5c0de3eb3be15ba7b30632e94ab8be [file] [log] [blame]
Madan Jampani15b8ef52016-02-02 17:35:05 -08001/*
2 * Copyright 2016 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.store.primitives.impl;
18
19import static org.slf4j.LoggerFactory.getLogger;
20
21import java.io.File;
Madan Jampanie14a09c2016-02-11 10:43:21 -080022import java.util.List;
Madan Jampani15b8ef52016-02-02 17:35:05 -080023import java.util.Map;
24import java.util.Set;
25import java.util.concurrent.CompletableFuture;
Madan Jampani33547452016-02-29 16:45:04 -080026import java.util.concurrent.atomic.AtomicReference;
Madan Jampanie14a09c2016-02-11 10:43:21 -080027import java.util.stream.Collectors;
Madan Jampani15b8ef52016-02-02 17:35:05 -080028
29import org.apache.felix.scr.annotations.Activate;
30import org.apache.felix.scr.annotations.Component;
Madan Jampani86cb2432016-02-17 11:07:56 -080031import org.apache.felix.scr.annotations.Deactivate;
Madan Jampani15b8ef52016-02-02 17:35:05 -080032import org.apache.felix.scr.annotations.Reference;
33import org.apache.felix.scr.annotations.ReferenceCardinality;
34import org.apache.felix.scr.annotations.Service;
35import org.onlab.util.Tools;
Madan Jampani33547452016-02-29 16:45:04 -080036import org.onosproject.cluster.ClusterMetadata;
37import org.onosproject.cluster.ClusterMetadataDiff;
38import org.onosproject.cluster.ClusterMetadataEvent;
39import org.onosproject.cluster.ClusterMetadataEventListener;
Madan Jampani15b8ef52016-02-02 17:35:05 -080040import org.onosproject.cluster.ClusterMetadataService;
41import org.onosproject.cluster.ClusterService;
42import org.onosproject.cluster.NodeId;
Madan Jampani33547452016-02-29 16:45:04 -080043import org.onosproject.cluster.PartitionDiff;
Madan Jampani15b8ef52016-02-02 17:35:05 -080044import org.onosproject.cluster.PartitionId;
45import org.onosproject.event.AbstractListenerManager;
46import org.onosproject.store.cluster.messaging.MessagingService;
47import org.onosproject.store.primitives.DistributedPrimitiveCreator;
48import org.onosproject.store.primitives.PartitionAdminService;
49import org.onosproject.store.primitives.PartitionEvent;
50import org.onosproject.store.primitives.PartitionEventListener;
51import org.onosproject.store.primitives.PartitionService;
Madan Jampanie14a09c2016-02-11 10:43:21 -080052import org.onosproject.store.service.PartitionInfo;
Madan Jampani15b8ef52016-02-02 17:35:05 -080053import org.slf4j.Logger;
54
55import com.google.common.collect.ImmutableSet;
56import com.google.common.collect.Maps;
57
58/**
59 * Implementation of {@code PartitionService} and {@code PartitionAdminService}.
60 */
61@Component
62@Service
63public class PartitionManager extends AbstractListenerManager<PartitionEvent, PartitionEventListener>
64 implements PartitionService, PartitionAdminService {
65
66 private final Logger log = getLogger(getClass());
67
68 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
69 protected MessagingService messagingService;
70
71 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
72 protected ClusterMetadataService metadataService;
73
74 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
75 protected ClusterService clusterService;
76
Madan Jampani33547452016-02-29 16:45:04 -080077 private final Map<PartitionId, StoragePartition> partitions = Maps.newConcurrentMap();
78 private final AtomicReference<ClusterMetadata> currentClusterMetadata = new AtomicReference<>();
79 private final InternalClusterMetadataListener metadataListener = new InternalClusterMetadataListener();
Madan Jampani15b8ef52016-02-02 17:35:05 -080080
81 @Activate
82 public void activate() {
83 eventDispatcher.addSink(PartitionEvent.class, listenerRegistry);
Madan Jampani33547452016-02-29 16:45:04 -080084 currentClusterMetadata.set(metadataService.getClusterMetadata());
85 metadataService.addListener(metadataListener);
86 currentClusterMetadata.get()
Madan Jampani15b8ef52016-02-02 17:35:05 -080087 .getPartitions()
88 .stream()
Madan Jampani33547452016-02-29 16:45:04 -080089 .filter(partition -> !partition.getId().equals(PartitionId.from(0))) // exclude p0
Madan Jampani15b8ef52016-02-02 17:35:05 -080090 .forEach(partition -> partitions.put(partition.getId(), new StoragePartition(partition,
91 messagingService,
92 clusterService,
93 CatalystSerializers.getSerializer(),
94 new File(System.getProperty("karaf.data") + "/data/" + partition.getId()))));
95
96 CompletableFuture<Void> openFuture = CompletableFuture.allOf(partitions.values()
97 .stream()
98 .map(StoragePartition::open)
99 .toArray(CompletableFuture[]::new));
100 openFuture.join();
101 log.info("Started");
102 }
103
Madan Jampani86cb2432016-02-17 11:07:56 -0800104 @Deactivate
Madan Jampani15b8ef52016-02-02 17:35:05 -0800105 public void deactivate() {
Madan Jampani33547452016-02-29 16:45:04 -0800106 metadataService.removeListener(metadataListener);
Madan Jampani15b8ef52016-02-02 17:35:05 -0800107 eventDispatcher.removeSink(PartitionEvent.class);
108
109 CompletableFuture<Void> closeFuture = CompletableFuture.allOf(partitions.values()
110 .stream()
111 .map(StoragePartition::close)
112 .toArray(CompletableFuture[]::new));
113 closeFuture.join();
114 log.info("Stopped");
115 }
116
117 @Override
Madan Jampani15b8ef52016-02-02 17:35:05 -0800118 public int getNumberOfPartitions() {
119 return partitions.size();
120 }
121
122 @Override
123 public Set<PartitionId> getAllPartitionIds() {
124 return partitions.keySet();
125 }
126
127 @Override
128 public DistributedPrimitiveCreator getDistributedPrimitiveCreator(PartitionId partitionId) {
129 return partitions.get(partitionId).client();
130 }
131
132 @Override
133 public Set<NodeId> getConfiguredMembers(PartitionId partitionId) {
134 StoragePartition partition = partitions.get(partitionId);
135 return ImmutableSet.copyOf(partition.getMembers());
136 }
137
138 @Override
139 public Set<NodeId> getActiveMembersMembers(PartitionId partitionId) {
140 // TODO: This needs to query metadata to determine currently active
141 // members of partition
142 return getConfiguredMembers(partitionId);
143 }
Madan Jampanie14a09c2016-02-11 10:43:21 -0800144
145 @Override
146 public List<PartitionInfo> partitionInfo() {
147 return partitions.values()
148 .stream()
Sho SHIMIZU5fab6e52016-02-15 11:54:15 -0800149 .flatMap(x -> Tools.stream(x.info()))
Madan Jampanie14a09c2016-02-11 10:43:21 -0800150 .collect(Collectors.toList());
151 }
Madan Jampani33547452016-02-29 16:45:04 -0800152
153 private void processMetadataUpdate(ClusterMetadata clusterMetadata) {
154 ClusterMetadataDiff diffExaminer =
155 new ClusterMetadataDiff(currentClusterMetadata.get(), clusterMetadata);
156 diffExaminer.partitionDiffs()
157 .values()
158 .stream()
159 // TODO: Remove after partition 0 is removed from cluster metadata.
160 .filter(diff -> !diff.partitionId().equals(PartitionId.from(0)))
161 .filter(PartitionDiff::hasChanged)
162 .forEach(diff -> partitions.get(diff.partitionId()).onUpdate(diff.newValue()));
163 }
164
165 private class InternalClusterMetadataListener implements ClusterMetadataEventListener {
166 @Override
167 public void event(ClusterMetadataEvent event) {
168 processMetadataUpdate(event.subject());
169 }
170 }
Madan Jampani2f9cc712016-02-15 19:36:21 -0800171}