blob: 76709d6d377c3f1213c4d4d75cce96cab6e18ca6 [file] [log] [blame]
Madan Jampani15b8ef52016-02-02 17:35:05 -08001/*
2 * Copyright 2016 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.store.primitives.impl;
18
19import static org.slf4j.LoggerFactory.getLogger;
20
21import java.io.File;
Madan Jampanie14a09c2016-02-11 10:43:21 -080022import java.util.List;
Madan Jampani15b8ef52016-02-02 17:35:05 -080023import java.util.Map;
Madan Jampanie14a09c2016-02-11 10:43:21 -080024import java.util.Optional;
Madan Jampani15b8ef52016-02-02 17:35:05 -080025import java.util.Set;
26import java.util.concurrent.CompletableFuture;
Madan Jampanie14a09c2016-02-11 10:43:21 -080027import java.util.stream.Collectors;
Madan Jampani15b8ef52016-02-02 17:35:05 -080028
29import org.apache.felix.scr.annotations.Activate;
30import org.apache.felix.scr.annotations.Component;
31import org.apache.felix.scr.annotations.Reference;
32import org.apache.felix.scr.annotations.ReferenceCardinality;
33import org.apache.felix.scr.annotations.Service;
34import org.onlab.util.Tools;
35import org.onosproject.cluster.ClusterMetadataService;
36import org.onosproject.cluster.ClusterService;
37import org.onosproject.cluster.NodeId;
38import org.onosproject.cluster.PartitionId;
39import org.onosproject.event.AbstractListenerManager;
40import org.onosproject.store.cluster.messaging.MessagingService;
41import org.onosproject.store.primitives.DistributedPrimitiveCreator;
42import org.onosproject.store.primitives.PartitionAdminService;
43import org.onosproject.store.primitives.PartitionEvent;
44import org.onosproject.store.primitives.PartitionEventListener;
45import org.onosproject.store.primitives.PartitionService;
Madan Jampanie14a09c2016-02-11 10:43:21 -080046import org.onosproject.store.service.PartitionInfo;
Madan Jampani15b8ef52016-02-02 17:35:05 -080047import org.slf4j.Logger;
48
49import com.google.common.collect.ImmutableSet;
50import com.google.common.collect.Maps;
51
52/**
53 * Implementation of {@code PartitionService} and {@code PartitionAdminService}.
54 */
55@Component
56@Service
57public class PartitionManager extends AbstractListenerManager<PartitionEvent, PartitionEventListener>
58 implements PartitionService, PartitionAdminService {
59
Madan Jampani2f9cc712016-02-15 19:36:21 -080060 public static final String HELLO_MESSAGE_SUBJECT = "partition-manager-hello";
Madan Jampani15b8ef52016-02-02 17:35:05 -080061 private final Logger log = getLogger(getClass());
62
63 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
64 protected MessagingService messagingService;
65
66 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
67 protected ClusterMetadataService metadataService;
68
69 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
70 protected ClusterService clusterService;
71
72 Map<PartitionId, StoragePartition> partitions = Maps.newConcurrentMap();
73
74 @Activate
75 public void activate() {
Madan Jampani2f9cc712016-02-15 19:36:21 -080076 messagingService.registerHandler(HELLO_MESSAGE_SUBJECT,
77 (ep, input) -> CompletableFuture.completedFuture(input));
Madan Jampani15b8ef52016-02-02 17:35:05 -080078 eventDispatcher.addSink(PartitionEvent.class, listenerRegistry);
79
80 metadataService.getClusterMetadata()
81 .getPartitions()
82 .stream()
83 .forEach(partition -> partitions.put(partition.getId(), new StoragePartition(partition,
84 messagingService,
85 clusterService,
86 CatalystSerializers.getSerializer(),
87 new File(System.getProperty("karaf.data") + "/data/" + partition.getId()))));
88
89 CompletableFuture<Void> openFuture = CompletableFuture.allOf(partitions.values()
90 .stream()
91 .map(StoragePartition::open)
92 .toArray(CompletableFuture[]::new));
93 openFuture.join();
94 log.info("Started");
95 }
96
97 public void deactivate() {
Madan Jampani2f9cc712016-02-15 19:36:21 -080098 messagingService.unregisterHandler(HELLO_MESSAGE_SUBJECT);
Madan Jampani15b8ef52016-02-02 17:35:05 -080099 eventDispatcher.removeSink(PartitionEvent.class);
100
101 CompletableFuture<Void> closeFuture = CompletableFuture.allOf(partitions.values()
102 .stream()
103 .map(StoragePartition::close)
104 .toArray(CompletableFuture[]::new));
105 closeFuture.join();
106 log.info("Stopped");
107 }
108
109 @Override
110 public CompletableFuture<Void> leave(PartitionId partitionId) {
111 // TODO: Implement
112 return Tools.exceptionalFuture(new UnsupportedOperationException());
113 }
114
115 @Override
116 public CompletableFuture<Void> join(PartitionId partitionId) {
117 // TODO: Implement
118 return Tools.exceptionalFuture(new UnsupportedOperationException());
119 }
120
121 @Override
122 public int getNumberOfPartitions() {
123 return partitions.size();
124 }
125
126 @Override
127 public Set<PartitionId> getAllPartitionIds() {
128 return partitions.keySet();
129 }
130
131 @Override
132 public DistributedPrimitiveCreator getDistributedPrimitiveCreator(PartitionId partitionId) {
133 return partitions.get(partitionId).client();
134 }
135
136 @Override
137 public Set<NodeId> getConfiguredMembers(PartitionId partitionId) {
138 StoragePartition partition = partitions.get(partitionId);
139 return ImmutableSet.copyOf(partition.getMembers());
140 }
141
142 @Override
143 public Set<NodeId> getActiveMembersMembers(PartitionId partitionId) {
144 // TODO: This needs to query metadata to determine currently active
145 // members of partition
146 return getConfiguredMembers(partitionId);
147 }
Madan Jampanie14a09c2016-02-11 10:43:21 -0800148
149 @Override
150 public List<PartitionInfo> partitionInfo() {
151 return partitions.values()
152 .stream()
153 .map(StoragePartition::info)
154 .filter(Optional::isPresent)
155 .map(Optional::get)
156 .collect(Collectors.toList());
157 }
Madan Jampani2f9cc712016-02-15 19:36:21 -0800158}