/*
 * Copyright 2017-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.primitives.impl;
17
18import java.io.File;
19import java.util.function.Supplier;
20import java.util.stream.Collectors;
21
22import org.apache.felix.scr.annotations.Activate;
23import org.apache.felix.scr.annotations.Component;
24import org.apache.felix.scr.annotations.Deactivate;
25import org.apache.felix.scr.annotations.Reference;
26import org.apache.felix.scr.annotations.ReferenceCardinality;
27import org.apache.felix.scr.annotations.Service;
28import org.onosproject.cluster.UnifiedClusterService;
29import org.onosproject.cluster.ControllerNode;
30import org.onosproject.cluster.DefaultPartition;
31import org.onosproject.cluster.PartitionId;
32import org.onosproject.persistence.PersistenceService;
33import org.onosproject.store.cluster.messaging.UnifiedClusterCommunicationService;
34import org.onosproject.store.primitives.DistributedPrimitiveCreator;
35import org.onosproject.store.serializers.KryoNamespaces;
36import org.onosproject.store.service.AsyncAtomicValue;
37import org.onosproject.store.service.AsyncConsistentMultimap;
38import org.onosproject.store.service.AsyncConsistentTreeMap;
39import org.onosproject.store.service.AsyncDocumentTree;
40import org.onosproject.store.service.AtomicCounterBuilder;
41import org.onosproject.store.service.AtomicCounterMapBuilder;
42import org.onosproject.store.service.AtomicIdGeneratorBuilder;
43import org.onosproject.store.service.AtomicValueBuilder;
44import org.onosproject.store.service.ConsistentMapBuilder;
45import org.onosproject.store.service.ConsistentMultimapBuilder;
46import org.onosproject.store.service.ConsistentTreeMapBuilder;
47import org.onosproject.store.service.CoordinationService;
48import org.onosproject.store.service.DistributedSetBuilder;
49import org.onosproject.store.service.DocumentTreeBuilder;
50import org.onosproject.store.service.EventuallyConsistentMapBuilder;
51import org.onosproject.store.service.LeaderElectorBuilder;
52import org.onosproject.store.service.Serializer;
53import org.onosproject.store.service.Topic;
54import org.onosproject.store.service.TransactionContextBuilder;
55import org.onosproject.store.service.WorkQueue;
56import org.slf4j.Logger;
57
58import static org.onosproject.security.AppGuard.checkPermission;
59import static org.onosproject.security.AppPermission.Type.STORAGE_WRITE;
60import static org.slf4j.LoggerFactory.getLogger;
61
62/**
63 * Implementation of {@code CoordinationService} that uses a {@link StoragePartition} that spans all the nodes
64 * in the cluster regardless of version.
65 */
66@Service
67@Component(immediate = true)
68public class CoordinationManager implements CoordinationService {
69
70 private final Logger log = getLogger(getClass());
71
72 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
73 protected UnifiedClusterService clusterService;
74
75 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
76 protected UnifiedClusterCommunicationService clusterCommunicator;
77
78 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
79 protected PersistenceService persistenceService;
80
81 private StoragePartition partition;
82 private DistributedPrimitiveCreator primitiveCreator;
83
84 @Activate
85 public void activate() {
86 partition = new StoragePartition(
87 new DefaultPartition(
88 PartitionId.SHARED,
89 clusterService.getNodes()
90 .stream()
91 .map(ControllerNode::id)
92 .collect(Collectors.toSet())),
93 null,
94 null,
95 clusterCommunicator,
96 clusterService,
97 new File(System.getProperty("karaf.data") + "/partitions/coordination"));
98 partition.open().join();
99 primitiveCreator = partition.client();
100 log.info("Started");
101 }
102
103 @Deactivate
104 public void deactivate() {
105 log.info("Stopped");
106 }
107
108 @Override
109 public <K, V> EventuallyConsistentMapBuilder<K, V> eventuallyConsistentMapBuilder() {
110 checkPermission(STORAGE_WRITE);
111 return new EventuallyConsistentMapBuilderImpl<>(clusterService,
112 clusterCommunicator,
113 persistenceService);
114 }
115
116 @Override
117 public <K, V> ConsistentMapBuilder<K, V> consistentMapBuilder() {
118 checkPermission(STORAGE_WRITE);
119 return new DefaultConsistentMapBuilder<>(primitiveCreator);
120 }
121
122 @Override
123 public <V> DocumentTreeBuilder<V> documentTreeBuilder() {
124 checkPermission(STORAGE_WRITE);
125 return new DefaultDocumentTreeBuilder<>(primitiveCreator);
126 }
127
128 @Override
129 public <V> ConsistentTreeMapBuilder<V> consistentTreeMapBuilder() {
130 return new DefaultConsistentTreeMapBuilder<>(primitiveCreator);
131 }
132
133 @Override
134 public <K, V> ConsistentMultimapBuilder<K, V> consistentMultimapBuilder() {
135 checkPermission(STORAGE_WRITE);
136 return new DefaultConsistentMultimapBuilder<>(primitiveCreator);
137 }
138
139 @Override
140 public <K> AtomicCounterMapBuilder<K> atomicCounterMapBuilder() {
141 checkPermission(STORAGE_WRITE);
142 return new DefaultAtomicCounterMapBuilder<>(primitiveCreator);
143 }
144
145 @Override
146 public <E> DistributedSetBuilder<E> setBuilder() {
147 checkPermission(STORAGE_WRITE);
148 return new DefaultDistributedSetBuilder<>(() -> this.<E, Boolean>consistentMapBuilder());
149 }
150
151 @Override
152 public AtomicCounterBuilder atomicCounterBuilder() {
153 checkPermission(STORAGE_WRITE);
154 return new DefaultAtomicCounterBuilder(primitiveCreator);
155 }
156
157 @Override
158 public AtomicIdGeneratorBuilder atomicIdGeneratorBuilder() {
159 checkPermission(STORAGE_WRITE);
160 return new DefaultAtomicIdGeneratorBuilder(primitiveCreator);
161 }
162
163 @Override
164 public <V> AtomicValueBuilder<V> atomicValueBuilder() {
165 checkPermission(STORAGE_WRITE);
166 Supplier<ConsistentMapBuilder<String, byte[]>> mapBuilderSupplier =
167 () -> this.<String, byte[]>consistentMapBuilder()
168 .withName("onos-atomic-values")
169 .withSerializer(Serializer.using(KryoNamespaces.BASIC));
170 return new DefaultAtomicValueBuilder<>(mapBuilderSupplier);
171 }
172
173 @Override
174 public TransactionContextBuilder transactionContextBuilder() {
175 throw new UnsupportedOperationException();
176 }
177
178 @Override
179 public LeaderElectorBuilder leaderElectorBuilder() {
180 checkPermission(STORAGE_WRITE);
181 return new DefaultLeaderElectorBuilder(primitiveCreator);
182 }
183
184 @Override
185 public <E> WorkQueue<E> getWorkQueue(String name, Serializer serializer) {
186 checkPermission(STORAGE_WRITE);
187 return primitiveCreator.newWorkQueue(name, serializer);
188 }
189
190 @Override
191 public <V> AsyncDocumentTree<V> getDocumentTree(String name, Serializer serializer) {
192 checkPermission(STORAGE_WRITE);
193 return primitiveCreator.newAsyncDocumentTree(name, serializer);
194 }
195
196 @Override
197 public <K, V> AsyncConsistentMultimap<K, V> getAsyncSetMultimap(
198 String name, Serializer serializer) {
199 checkPermission(STORAGE_WRITE);
200 return primitiveCreator.newAsyncConsistentSetMultimap(name,
201 serializer);
202 }
203
204 @Override
205 public <V> AsyncConsistentTreeMap<V> getAsyncTreeMap(
206 String name, Serializer serializer) {
207 checkPermission(STORAGE_WRITE);
208 return primitiveCreator.newAsyncConsistentTreeMap(name, serializer);
209 }
210
211 @Override
212 public <T> Topic<T> getTopic(String name, Serializer serializer) {
213 AsyncAtomicValue<T> atomicValue = this.<T>atomicValueBuilder()
214 .withName("topic-" + name)
215 .withSerializer(serializer)
216 .build();
217 return new DefaultDistributedTopic<>(atomicValue);
218 }
219}