blob: e7ed20a5e6f36c597325a380f5bf62a7727ff42f [file] [log] [blame]
Jordan Halterman980a8c12017-09-22 18:01:19 -07001/*
2 * Copyright 2017-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.impl;
17
Jordan Halterman07f052b2017-10-08 14:22:41 -070018import java.util.function.Supplier;
19import java.util.stream.Collectors;
20
Jordan Halterman980a8c12017-09-22 18:01:19 -070021import org.apache.felix.scr.annotations.Activate;
22import org.apache.felix.scr.annotations.Component;
23import org.apache.felix.scr.annotations.Deactivate;
24import org.apache.felix.scr.annotations.Reference;
25import org.apache.felix.scr.annotations.ReferenceCardinality;
26import org.apache.felix.scr.annotations.Service;
Jordan Halterman28183ee2017-10-17 17:29:10 -070027import org.onosproject.cluster.ClusterService;
Jordan Halterman980a8c12017-09-22 18:01:19 -070028import org.onosproject.cluster.ControllerNode;
29import org.onosproject.cluster.DefaultPartition;
slowr878625f2017-10-24 14:53:49 -070030import org.onosproject.cluster.NodeId;
Jordan Halterman980a8c12017-09-22 18:01:19 -070031import org.onosproject.cluster.PartitionId;
32import org.onosproject.persistence.PersistenceService;
Jordan Halterman28183ee2017-10-17 17:29:10 -070033import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Jordan Halterman980a8c12017-09-22 18:01:19 -070034import org.onosproject.store.primitives.DistributedPrimitiveCreator;
35import org.onosproject.store.serializers.KryoNamespaces;
36import org.onosproject.store.service.AsyncAtomicValue;
37import org.onosproject.store.service.AsyncConsistentMultimap;
38import org.onosproject.store.service.AsyncConsistentTreeMap;
39import org.onosproject.store.service.AsyncDocumentTree;
40import org.onosproject.store.service.AtomicCounterBuilder;
41import org.onosproject.store.service.AtomicCounterMapBuilder;
42import org.onosproject.store.service.AtomicIdGeneratorBuilder;
43import org.onosproject.store.service.AtomicValueBuilder;
44import org.onosproject.store.service.ConsistentMapBuilder;
45import org.onosproject.store.service.ConsistentMultimapBuilder;
46import org.onosproject.store.service.ConsistentTreeMapBuilder;
47import org.onosproject.store.service.CoordinationService;
48import org.onosproject.store.service.DistributedSetBuilder;
49import org.onosproject.store.service.DocumentTreeBuilder;
50import org.onosproject.store.service.EventuallyConsistentMapBuilder;
51import org.onosproject.store.service.LeaderElectorBuilder;
52import org.onosproject.store.service.Serializer;
53import org.onosproject.store.service.Topic;
54import org.onosproject.store.service.TransactionContextBuilder;
55import org.onosproject.store.service.WorkQueue;
56import org.slf4j.Logger;
57
slowr878625f2017-10-24 14:53:49 -070058import java.util.List;
slowr878625f2017-10-24 14:53:49 -070059
Jordan Halterman980a8c12017-09-22 18:01:19 -070060import static org.onosproject.security.AppGuard.checkPermission;
61import static org.onosproject.security.AppPermission.Type.STORAGE_WRITE;
62import static org.slf4j.LoggerFactory.getLogger;
63
64/**
65 * Implementation of {@code CoordinationService} that uses a {@link StoragePartition} that spans all the nodes
66 * in the cluster regardless of version.
67 */
68@Service
69@Component(immediate = true)
70public class CoordinationManager implements CoordinationService {
71
72 private final Logger log = getLogger(getClass());
73
74 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jordan Halterman28183ee2017-10-17 17:29:10 -070075 protected ClusterService clusterService;
Jordan Halterman980a8c12017-09-22 18:01:19 -070076
77 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jordan Halterman28183ee2017-10-17 17:29:10 -070078 protected ClusterCommunicationService clusterCommunicator;
Jordan Halterman980a8c12017-09-22 18:01:19 -070079
80 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
81 protected PersistenceService persistenceService;
82
83 private StoragePartition partition;
84 private DistributedPrimitiveCreator primitiveCreator;
85
86 @Activate
87 public void activate() {
Jordan Halterman07f052b2017-10-08 14:22:41 -070088 partition = new ActiveStoragePartition(
Jordan Halterman980a8c12017-09-22 18:01:19 -070089 new DefaultPartition(
90 PartitionId.SHARED,
Jordan Halterman07f052b2017-10-08 14:22:41 -070091 null,
Jordan Halterman980a8c12017-09-22 18:01:19 -070092 clusterService.getNodes()
93 .stream()
94 .map(ControllerNode::id)
95 .collect(Collectors.toSet())),
Jordan Halterman980a8c12017-09-22 18:01:19 -070096 clusterCommunicator,
Jordan Halterman07f052b2017-10-08 14:22:41 -070097 clusterService);
Jordan Halterman980a8c12017-09-22 18:01:19 -070098 partition.open().join();
99 primitiveCreator = partition.client();
100 log.info("Started");
101 }
102
103 @Deactivate
104 public void deactivate() {
105 log.info("Stopped");
106 }
107
108 @Override
109 public <K, V> EventuallyConsistentMapBuilder<K, V> eventuallyConsistentMapBuilder() {
110 checkPermission(STORAGE_WRITE);
slowr878625f2017-10-24 14:53:49 -0700111 final NodeId localNodeId = clusterService.getLocalNode().id();
112
113 Supplier<List<NodeId>> peersSupplier = () -> clusterService.getNodes().stream()
114 .map(ControllerNode::id)
115 .filter(nodeId -> !nodeId.equals(localNodeId))
116 .filter(id -> clusterService.getState(id).isActive())
117 .collect(Collectors.toList());
118
119 Supplier<List<NodeId>> bootstrapPeersSupplier = () -> clusterService.getNodes()
120 .stream()
121 .map(ControllerNode::id)
122 .filter(id -> !localNodeId.equals(id))
123 .filter(id -> clusterService.getState(id).isActive())
124 .collect(Collectors.toList());
125
126 return new EventuallyConsistentMapBuilderImpl<>(
127 localNodeId,
Jordan Halterman980a8c12017-09-22 18:01:19 -0700128 clusterCommunicator,
slowr878625f2017-10-24 14:53:49 -0700129 persistenceService,
130 peersSupplier,
131 bootstrapPeersSupplier
132 );
Jordan Halterman980a8c12017-09-22 18:01:19 -0700133 }
134
135 @Override
136 public <K, V> ConsistentMapBuilder<K, V> consistentMapBuilder() {
137 checkPermission(STORAGE_WRITE);
138 return new DefaultConsistentMapBuilder<>(primitiveCreator);
139 }
140
141 @Override
142 public <V> DocumentTreeBuilder<V> documentTreeBuilder() {
143 checkPermission(STORAGE_WRITE);
144 return new DefaultDocumentTreeBuilder<>(primitiveCreator);
145 }
146
147 @Override
148 public <V> ConsistentTreeMapBuilder<V> consistentTreeMapBuilder() {
149 return new DefaultConsistentTreeMapBuilder<>(primitiveCreator);
150 }
151
152 @Override
153 public <K, V> ConsistentMultimapBuilder<K, V> consistentMultimapBuilder() {
154 checkPermission(STORAGE_WRITE);
155 return new DefaultConsistentMultimapBuilder<>(primitiveCreator);
156 }
157
158 @Override
159 public <K> AtomicCounterMapBuilder<K> atomicCounterMapBuilder() {
160 checkPermission(STORAGE_WRITE);
161 return new DefaultAtomicCounterMapBuilder<>(primitiveCreator);
162 }
163
164 @Override
165 public <E> DistributedSetBuilder<E> setBuilder() {
166 checkPermission(STORAGE_WRITE);
slowr878625f2017-10-24 14:53:49 -0700167 return new DefaultDistributedSetBuilder<>(this::<E, Boolean>consistentMapBuilder);
Jordan Halterman980a8c12017-09-22 18:01:19 -0700168 }
169
170 @Override
171 public AtomicCounterBuilder atomicCounterBuilder() {
172 checkPermission(STORAGE_WRITE);
173 return new DefaultAtomicCounterBuilder(primitiveCreator);
174 }
175
176 @Override
177 public AtomicIdGeneratorBuilder atomicIdGeneratorBuilder() {
178 checkPermission(STORAGE_WRITE);
179 return new DefaultAtomicIdGeneratorBuilder(primitiveCreator);
180 }
181
182 @Override
183 public <V> AtomicValueBuilder<V> atomicValueBuilder() {
184 checkPermission(STORAGE_WRITE);
185 Supplier<ConsistentMapBuilder<String, byte[]>> mapBuilderSupplier =
186 () -> this.<String, byte[]>consistentMapBuilder()
187 .withName("onos-atomic-values")
188 .withSerializer(Serializer.using(KryoNamespaces.BASIC));
189 return new DefaultAtomicValueBuilder<>(mapBuilderSupplier);
190 }
191
192 @Override
193 public TransactionContextBuilder transactionContextBuilder() {
194 throw new UnsupportedOperationException();
195 }
196
197 @Override
198 public LeaderElectorBuilder leaderElectorBuilder() {
199 checkPermission(STORAGE_WRITE);
200 return new DefaultLeaderElectorBuilder(primitiveCreator);
201 }
202
203 @Override
204 public <E> WorkQueue<E> getWorkQueue(String name, Serializer serializer) {
205 checkPermission(STORAGE_WRITE);
206 return primitiveCreator.newWorkQueue(name, serializer);
207 }
208
209 @Override
210 public <V> AsyncDocumentTree<V> getDocumentTree(String name, Serializer serializer) {
211 checkPermission(STORAGE_WRITE);
212 return primitiveCreator.newAsyncDocumentTree(name, serializer);
213 }
214
215 @Override
216 public <K, V> AsyncConsistentMultimap<K, V> getAsyncSetMultimap(
217 String name, Serializer serializer) {
218 checkPermission(STORAGE_WRITE);
219 return primitiveCreator.newAsyncConsistentSetMultimap(name,
220 serializer);
221 }
222
223 @Override
224 public <V> AsyncConsistentTreeMap<V> getAsyncTreeMap(
225 String name, Serializer serializer) {
226 checkPermission(STORAGE_WRITE);
227 return primitiveCreator.newAsyncConsistentTreeMap(name, serializer);
228 }
229
230 @Override
231 public <T> Topic<T> getTopic(String name, Serializer serializer) {
232 AsyncAtomicValue<T> atomicValue = this.<T>atomicValueBuilder()
233 .withName("topic-" + name)
234 .withSerializer(serializer)
235 .build();
236 return new DefaultDistributedTopic<>(atomicValue);
237 }
238}