blob: 30d7fd814adad124cbdd9af3703c7a007685961b [file] [log] [blame]
Madan Jampani15b8ef52016-02-02 17:35:05 -08001/*
2 * Copyright 2016 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.store.primitives.impl;
18
19import static org.slf4j.LoggerFactory.getLogger;
20
21import java.io.File;
22import java.util.Map;
23import java.util.Set;
24import java.util.concurrent.CompletableFuture;
25
26import org.apache.felix.scr.annotations.Activate;
27import org.apache.felix.scr.annotations.Component;
28import org.apache.felix.scr.annotations.Reference;
29import org.apache.felix.scr.annotations.ReferenceCardinality;
30import org.apache.felix.scr.annotations.Service;
31import org.onlab.util.Tools;
32import org.onosproject.cluster.ClusterMetadataService;
33import org.onosproject.cluster.ClusterService;
34import org.onosproject.cluster.NodeId;
35import org.onosproject.cluster.PartitionId;
36import org.onosproject.event.AbstractListenerManager;
37import org.onosproject.store.cluster.messaging.MessagingService;
38import org.onosproject.store.primitives.DistributedPrimitiveCreator;
39import org.onosproject.store.primitives.PartitionAdminService;
40import org.onosproject.store.primitives.PartitionEvent;
41import org.onosproject.store.primitives.PartitionEventListener;
42import org.onosproject.store.primitives.PartitionService;
43import org.slf4j.Logger;
44
45import com.google.common.collect.ImmutableSet;
46import com.google.common.collect.Maps;
47
48/**
49 * Implementation of {@code PartitionService} and {@code PartitionAdminService}.
50 */
51@Component
52@Service
53public class PartitionManager extends AbstractListenerManager<PartitionEvent, PartitionEventListener>
54 implements PartitionService, PartitionAdminService {
55
56 private final Logger log = getLogger(getClass());
57
58 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
59 protected MessagingService messagingService;
60
61 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
62 protected ClusterMetadataService metadataService;
63
64 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
65 protected ClusterService clusterService;
66
67 Map<PartitionId, StoragePartition> partitions = Maps.newConcurrentMap();
68
69 @Activate
70 public void activate() {
71 eventDispatcher.addSink(PartitionEvent.class, listenerRegistry);
72
73 metadataService.getClusterMetadata()
74 .getPartitions()
75 .stream()
76 .forEach(partition -> partitions.put(partition.getId(), new StoragePartition(partition,
77 messagingService,
78 clusterService,
79 CatalystSerializers.getSerializer(),
80 new File(System.getProperty("karaf.data") + "/data/" + partition.getId()))));
81
82 CompletableFuture<Void> openFuture = CompletableFuture.allOf(partitions.values()
83 .stream()
84 .map(StoragePartition::open)
85 .toArray(CompletableFuture[]::new));
86 openFuture.join();
87 log.info("Started");
88 }
89
90 public void deactivate() {
91 eventDispatcher.removeSink(PartitionEvent.class);
92
93 CompletableFuture<Void> closeFuture = CompletableFuture.allOf(partitions.values()
94 .stream()
95 .map(StoragePartition::close)
96 .toArray(CompletableFuture[]::new));
97 closeFuture.join();
98 log.info("Stopped");
99 }
100
101 @Override
102 public CompletableFuture<Void> leave(PartitionId partitionId) {
103 // TODO: Implement
104 return Tools.exceptionalFuture(new UnsupportedOperationException());
105 }
106
107 @Override
108 public CompletableFuture<Void> join(PartitionId partitionId) {
109 // TODO: Implement
110 return Tools.exceptionalFuture(new UnsupportedOperationException());
111 }
112
113 @Override
114 public int getNumberOfPartitions() {
115 return partitions.size();
116 }
117
118 @Override
119 public Set<PartitionId> getAllPartitionIds() {
120 return partitions.keySet();
121 }
122
123 @Override
124 public DistributedPrimitiveCreator getDistributedPrimitiveCreator(PartitionId partitionId) {
125 return partitions.get(partitionId).client();
126 }
127
128 @Override
129 public Set<NodeId> getConfiguredMembers(PartitionId partitionId) {
130 StoragePartition partition = partitions.get(partitionId);
131 return ImmutableSet.copyOf(partition.getMembers());
132 }
133
134 @Override
135 public Set<NodeId> getActiveMembersMembers(PartitionId partitionId) {
136 // TODO: This needs to query metadata to determine currently active
137 // members of partition
138 return getConfiguredMembers(partitionId);
139 }
140}