blob: f92e3f5da294d5b87039cc7cfe156d526ff99848 [file] [log] [blame]
Jordan Halterman980a8c12017-09-22 18:01:19 -07001/*
2 * Copyright 2017-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.impl;
17
18import java.io.File;
19import java.util.function.Supplier;
20import java.util.stream.Collectors;
21
22import org.apache.felix.scr.annotations.Activate;
23import org.apache.felix.scr.annotations.Component;
24import org.apache.felix.scr.annotations.Deactivate;
25import org.apache.felix.scr.annotations.Reference;
26import org.apache.felix.scr.annotations.ReferenceCardinality;
27import org.apache.felix.scr.annotations.Service;
Jordan Halterman28183ee2017-10-17 17:29:10 -070028import org.onosproject.cluster.ClusterService;
Jordan Halterman980a8c12017-09-22 18:01:19 -070029import org.onosproject.cluster.ControllerNode;
30import org.onosproject.cluster.DefaultPartition;
31import org.onosproject.cluster.PartitionId;
32import org.onosproject.persistence.PersistenceService;
Jordan Halterman28183ee2017-10-17 17:29:10 -070033import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Jordan Halterman980a8c12017-09-22 18:01:19 -070034import org.onosproject.store.primitives.DistributedPrimitiveCreator;
35import org.onosproject.store.serializers.KryoNamespaces;
36import org.onosproject.store.service.AsyncAtomicValue;
37import org.onosproject.store.service.AsyncConsistentMultimap;
38import org.onosproject.store.service.AsyncConsistentTreeMap;
39import org.onosproject.store.service.AsyncDocumentTree;
40import org.onosproject.store.service.AtomicCounterBuilder;
41import org.onosproject.store.service.AtomicCounterMapBuilder;
42import org.onosproject.store.service.AtomicIdGeneratorBuilder;
43import org.onosproject.store.service.AtomicValueBuilder;
44import org.onosproject.store.service.ConsistentMapBuilder;
45import org.onosproject.store.service.ConsistentMultimapBuilder;
46import org.onosproject.store.service.ConsistentTreeMapBuilder;
47import org.onosproject.store.service.CoordinationService;
48import org.onosproject.store.service.DistributedSetBuilder;
49import org.onosproject.store.service.DocumentTreeBuilder;
50import org.onosproject.store.service.EventuallyConsistentMapBuilder;
51import org.onosproject.store.service.LeaderElectorBuilder;
52import org.onosproject.store.service.Serializer;
53import org.onosproject.store.service.Topic;
54import org.onosproject.store.service.TransactionContextBuilder;
55import org.onosproject.store.service.WorkQueue;
56import org.slf4j.Logger;
57
58import static org.onosproject.security.AppGuard.checkPermission;
59import static org.onosproject.security.AppPermission.Type.STORAGE_WRITE;
Thomas Vachuska58bf4912017-10-31 12:00:32 -070060import static org.onosproject.store.primitives.impl.PartitionManager.PARTITIONS_DIR;
Jordan Halterman980a8c12017-09-22 18:01:19 -070061import static org.slf4j.LoggerFactory.getLogger;
62
63/**
64 * Implementation of {@code CoordinationService} that uses a {@link StoragePartition} that spans all the nodes
65 * in the cluster regardless of version.
66 */
67@Service
68@Component(immediate = true)
69public class CoordinationManager implements CoordinationService {
70
71 private final Logger log = getLogger(getClass());
72
73 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jordan Halterman28183ee2017-10-17 17:29:10 -070074 protected ClusterService clusterService;
Jordan Halterman980a8c12017-09-22 18:01:19 -070075
76 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jordan Halterman28183ee2017-10-17 17:29:10 -070077 protected ClusterCommunicationService clusterCommunicator;
Jordan Halterman980a8c12017-09-22 18:01:19 -070078
79 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
80 protected PersistenceService persistenceService;
81
82 private StoragePartition partition;
83 private DistributedPrimitiveCreator primitiveCreator;
84
85 @Activate
86 public void activate() {
87 partition = new StoragePartition(
88 new DefaultPartition(
89 PartitionId.SHARED,
90 clusterService.getNodes()
91 .stream()
92 .map(ControllerNode::id)
93 .collect(Collectors.toSet())),
94 null,
95 null,
96 clusterCommunicator,
97 clusterService,
Thomas Vachuska58bf4912017-10-31 12:00:32 -070098 new File(PARTITIONS_DIR + "/coordination"));
Jordan Halterman980a8c12017-09-22 18:01:19 -070099 partition.open().join();
100 primitiveCreator = partition.client();
101 log.info("Started");
102 }
103
104 @Deactivate
105 public void deactivate() {
106 log.info("Stopped");
107 }
108
109 @Override
110 public <K, V> EventuallyConsistentMapBuilder<K, V> eventuallyConsistentMapBuilder() {
111 checkPermission(STORAGE_WRITE);
112 return new EventuallyConsistentMapBuilderImpl<>(clusterService,
113 clusterCommunicator,
114 persistenceService);
115 }
116
117 @Override
118 public <K, V> ConsistentMapBuilder<K, V> consistentMapBuilder() {
119 checkPermission(STORAGE_WRITE);
120 return new DefaultConsistentMapBuilder<>(primitiveCreator);
121 }
122
123 @Override
124 public <V> DocumentTreeBuilder<V> documentTreeBuilder() {
125 checkPermission(STORAGE_WRITE);
126 return new DefaultDocumentTreeBuilder<>(primitiveCreator);
127 }
128
129 @Override
130 public <V> ConsistentTreeMapBuilder<V> consistentTreeMapBuilder() {
131 return new DefaultConsistentTreeMapBuilder<>(primitiveCreator);
132 }
133
134 @Override
135 public <K, V> ConsistentMultimapBuilder<K, V> consistentMultimapBuilder() {
136 checkPermission(STORAGE_WRITE);
137 return new DefaultConsistentMultimapBuilder<>(primitiveCreator);
138 }
139
140 @Override
141 public <K> AtomicCounterMapBuilder<K> atomicCounterMapBuilder() {
142 checkPermission(STORAGE_WRITE);
143 return new DefaultAtomicCounterMapBuilder<>(primitiveCreator);
144 }
145
146 @Override
147 public <E> DistributedSetBuilder<E> setBuilder() {
148 checkPermission(STORAGE_WRITE);
149 return new DefaultDistributedSetBuilder<>(() -> this.<E, Boolean>consistentMapBuilder());
150 }
151
152 @Override
153 public AtomicCounterBuilder atomicCounterBuilder() {
154 checkPermission(STORAGE_WRITE);
155 return new DefaultAtomicCounterBuilder(primitiveCreator);
156 }
157
158 @Override
159 public AtomicIdGeneratorBuilder atomicIdGeneratorBuilder() {
160 checkPermission(STORAGE_WRITE);
161 return new DefaultAtomicIdGeneratorBuilder(primitiveCreator);
162 }
163
164 @Override
165 public <V> AtomicValueBuilder<V> atomicValueBuilder() {
166 checkPermission(STORAGE_WRITE);
167 Supplier<ConsistentMapBuilder<String, byte[]>> mapBuilderSupplier =
168 () -> this.<String, byte[]>consistentMapBuilder()
169 .withName("onos-atomic-values")
170 .withSerializer(Serializer.using(KryoNamespaces.BASIC));
171 return new DefaultAtomicValueBuilder<>(mapBuilderSupplier);
172 }
173
174 @Override
175 public TransactionContextBuilder transactionContextBuilder() {
176 throw new UnsupportedOperationException();
177 }
178
179 @Override
180 public LeaderElectorBuilder leaderElectorBuilder() {
181 checkPermission(STORAGE_WRITE);
182 return new DefaultLeaderElectorBuilder(primitiveCreator);
183 }
184
185 @Override
186 public <E> WorkQueue<E> getWorkQueue(String name, Serializer serializer) {
187 checkPermission(STORAGE_WRITE);
188 return primitiveCreator.newWorkQueue(name, serializer);
189 }
190
191 @Override
192 public <V> AsyncDocumentTree<V> getDocumentTree(String name, Serializer serializer) {
193 checkPermission(STORAGE_WRITE);
194 return primitiveCreator.newAsyncDocumentTree(name, serializer);
195 }
196
197 @Override
198 public <K, V> AsyncConsistentMultimap<K, V> getAsyncSetMultimap(
199 String name, Serializer serializer) {
200 checkPermission(STORAGE_WRITE);
201 return primitiveCreator.newAsyncConsistentSetMultimap(name,
202 serializer);
203 }
204
205 @Override
206 public <V> AsyncConsistentTreeMap<V> getAsyncTreeMap(
207 String name, Serializer serializer) {
208 checkPermission(STORAGE_WRITE);
209 return primitiveCreator.newAsyncConsistentTreeMap(name, serializer);
210 }
211
212 @Override
213 public <T> Topic<T> getTopic(String name, Serializer serializer) {
214 AsyncAtomicValue<T> atomicValue = this.<T>atomicValueBuilder()
215 .withName("topic-" + name)
216 .withSerializer(serializer)
217 .build();
218 return new DefaultDistributedTopic<>(atomicValue);
219 }
220}