blob: f2965d9316e628afeed73f4dd5f6c68637dc84ad [file] [log] [blame]
Jordan Halterman980a8c12017-09-22 18:01:19 -07001/*
2 * Copyright 2017-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.impl;
17
Jordan Halterman07f052b2017-10-08 14:22:41 -070018import java.util.function.Supplier;
19import java.util.stream.Collectors;
20
Jordan Halterman980a8c12017-09-22 18:01:19 -070021import org.apache.felix.scr.annotations.Activate;
22import org.apache.felix.scr.annotations.Component;
23import org.apache.felix.scr.annotations.Deactivate;
24import org.apache.felix.scr.annotations.Reference;
25import org.apache.felix.scr.annotations.ReferenceCardinality;
26import org.apache.felix.scr.annotations.Service;
Jordan Halterman28183ee2017-10-17 17:29:10 -070027import org.onosproject.cluster.ClusterService;
Jordan Halterman980a8c12017-09-22 18:01:19 -070028import org.onosproject.cluster.ControllerNode;
29import org.onosproject.cluster.DefaultPartition;
slowr878625f2017-10-24 14:53:49 -070030import org.onosproject.cluster.NodeId;
Jordan Halterman980a8c12017-09-22 18:01:19 -070031import org.onosproject.cluster.PartitionId;
Jordan Halterman45008172018-03-19 16:40:31 -070032import org.onosproject.core.VersionService;
Jordan Halterman980a8c12017-09-22 18:01:19 -070033import org.onosproject.persistence.PersistenceService;
Jordan Halterman28183ee2017-10-17 17:29:10 -070034import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Jordan Halterman980a8c12017-09-22 18:01:19 -070035import org.onosproject.store.primitives.DistributedPrimitiveCreator;
36import org.onosproject.store.serializers.KryoNamespaces;
37import org.onosproject.store.service.AsyncAtomicValue;
38import org.onosproject.store.service.AsyncConsistentMultimap;
39import org.onosproject.store.service.AsyncConsistentTreeMap;
40import org.onosproject.store.service.AsyncDocumentTree;
41import org.onosproject.store.service.AtomicCounterBuilder;
42import org.onosproject.store.service.AtomicCounterMapBuilder;
43import org.onosproject.store.service.AtomicIdGeneratorBuilder;
44import org.onosproject.store.service.AtomicValueBuilder;
45import org.onosproject.store.service.ConsistentMapBuilder;
46import org.onosproject.store.service.ConsistentMultimapBuilder;
47import org.onosproject.store.service.ConsistentTreeMapBuilder;
48import org.onosproject.store.service.CoordinationService;
49import org.onosproject.store.service.DistributedSetBuilder;
50import org.onosproject.store.service.DocumentTreeBuilder;
51import org.onosproject.store.service.EventuallyConsistentMapBuilder;
52import org.onosproject.store.service.LeaderElectorBuilder;
53import org.onosproject.store.service.Serializer;
54import org.onosproject.store.service.Topic;
55import org.onosproject.store.service.TransactionContextBuilder;
56import org.onosproject.store.service.WorkQueue;
57import org.slf4j.Logger;
58
slowr878625f2017-10-24 14:53:49 -070059import java.util.List;
slowr878625f2017-10-24 14:53:49 -070060
Jordan Halterman980a8c12017-09-22 18:01:19 -070061import static org.onosproject.security.AppGuard.checkPermission;
62import static org.onosproject.security.AppPermission.Type.STORAGE_WRITE;
63import static org.slf4j.LoggerFactory.getLogger;
64
65/**
66 * Implementation of {@code CoordinationService} that uses a {@link StoragePartition} that spans all the nodes
67 * in the cluster regardless of version.
68 */
69@Service
70@Component(immediate = true)
71public class CoordinationManager implements CoordinationService {
72
73 private final Logger log = getLogger(getClass());
74
75 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jordan Halterman28183ee2017-10-17 17:29:10 -070076 protected ClusterService clusterService;
Jordan Halterman980a8c12017-09-22 18:01:19 -070077
78 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jordan Halterman28183ee2017-10-17 17:29:10 -070079 protected ClusterCommunicationService clusterCommunicator;
Jordan Halterman980a8c12017-09-22 18:01:19 -070080
81 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
82 protected PersistenceService persistenceService;
83
Jordan Halterman45008172018-03-19 16:40:31 -070084 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
85 protected VersionService versionService;
86
Jordan Halterman980a8c12017-09-22 18:01:19 -070087 private StoragePartition partition;
88 private DistributedPrimitiveCreator primitiveCreator;
89
90 @Activate
91 public void activate() {
Jordan Haltermana57a4722018-03-19 15:44:24 -070092 partition = new StoragePartition(
Jordan Halterman980a8c12017-09-22 18:01:19 -070093 new DefaultPartition(
Jordan Haltermana57a4722018-03-19 15:44:24 -070094 PartitionId.from(0),
Jordan Halterman07f052b2017-10-08 14:22:41 -070095 null,
Jordan Halterman980a8c12017-09-22 18:01:19 -070096 clusterService.getNodes()
97 .stream()
98 .map(ControllerNode::id)
99 .collect(Collectors.toSet())),
Jordan Halterman980a8c12017-09-22 18:01:19 -0700100 clusterCommunicator,
Jordan Halterman07f052b2017-10-08 14:22:41 -0700101 clusterService);
Jordan Halterman980a8c12017-09-22 18:01:19 -0700102 partition.open().join();
103 primitiveCreator = partition.client();
104 log.info("Started");
105 }
106
107 @Deactivate
108 public void deactivate() {
109 log.info("Stopped");
110 }
111
112 @Override
113 public <K, V> EventuallyConsistentMapBuilder<K, V> eventuallyConsistentMapBuilder() {
114 checkPermission(STORAGE_WRITE);
slowr878625f2017-10-24 14:53:49 -0700115 final NodeId localNodeId = clusterService.getLocalNode().id();
116
117 Supplier<List<NodeId>> peersSupplier = () -> clusterService.getNodes().stream()
118 .map(ControllerNode::id)
119 .filter(nodeId -> !nodeId.equals(localNodeId))
120 .filter(id -> clusterService.getState(id).isActive())
121 .collect(Collectors.toList());
122
123 Supplier<List<NodeId>> bootstrapPeersSupplier = () -> clusterService.getNodes()
124 .stream()
125 .map(ControllerNode::id)
126 .filter(id -> !localNodeId.equals(id))
127 .filter(id -> clusterService.getState(id).isActive())
128 .collect(Collectors.toList());
129
130 return new EventuallyConsistentMapBuilderImpl<>(
131 localNodeId,
Jordan Halterman980a8c12017-09-22 18:01:19 -0700132 clusterCommunicator,
slowr878625f2017-10-24 14:53:49 -0700133 persistenceService,
134 peersSupplier,
135 bootstrapPeersSupplier
136 );
Jordan Halterman980a8c12017-09-22 18:01:19 -0700137 }
138
139 @Override
140 public <K, V> ConsistentMapBuilder<K, V> consistentMapBuilder() {
141 checkPermission(STORAGE_WRITE);
Jordan Halterman45008172018-03-19 16:40:31 -0700142 return new DefaultConsistentMapBuilder<>(primitiveCreator, versionService.version());
Jordan Halterman980a8c12017-09-22 18:01:19 -0700143 }
144
145 @Override
146 public <V> DocumentTreeBuilder<V> documentTreeBuilder() {
147 checkPermission(STORAGE_WRITE);
148 return new DefaultDocumentTreeBuilder<>(primitiveCreator);
149 }
150
151 @Override
152 public <V> ConsistentTreeMapBuilder<V> consistentTreeMapBuilder() {
153 return new DefaultConsistentTreeMapBuilder<>(primitiveCreator);
154 }
155
156 @Override
157 public <K, V> ConsistentMultimapBuilder<K, V> consistentMultimapBuilder() {
158 checkPermission(STORAGE_WRITE);
159 return new DefaultConsistentMultimapBuilder<>(primitiveCreator);
160 }
161
162 @Override
163 public <K> AtomicCounterMapBuilder<K> atomicCounterMapBuilder() {
164 checkPermission(STORAGE_WRITE);
165 return new DefaultAtomicCounterMapBuilder<>(primitiveCreator);
166 }
167
168 @Override
169 public <E> DistributedSetBuilder<E> setBuilder() {
170 checkPermission(STORAGE_WRITE);
slowr878625f2017-10-24 14:53:49 -0700171 return new DefaultDistributedSetBuilder<>(this::<E, Boolean>consistentMapBuilder);
Jordan Halterman980a8c12017-09-22 18:01:19 -0700172 }
173
174 @Override
175 public AtomicCounterBuilder atomicCounterBuilder() {
176 checkPermission(STORAGE_WRITE);
177 return new DefaultAtomicCounterBuilder(primitiveCreator);
178 }
179
180 @Override
181 public AtomicIdGeneratorBuilder atomicIdGeneratorBuilder() {
182 checkPermission(STORAGE_WRITE);
183 return new DefaultAtomicIdGeneratorBuilder(primitiveCreator);
184 }
185
186 @Override
187 public <V> AtomicValueBuilder<V> atomicValueBuilder() {
188 checkPermission(STORAGE_WRITE);
189 Supplier<ConsistentMapBuilder<String, byte[]>> mapBuilderSupplier =
190 () -> this.<String, byte[]>consistentMapBuilder()
191 .withName("onos-atomic-values")
192 .withSerializer(Serializer.using(KryoNamespaces.BASIC));
193 return new DefaultAtomicValueBuilder<>(mapBuilderSupplier);
194 }
195
196 @Override
197 public TransactionContextBuilder transactionContextBuilder() {
198 throw new UnsupportedOperationException();
199 }
200
201 @Override
202 public LeaderElectorBuilder leaderElectorBuilder() {
203 checkPermission(STORAGE_WRITE);
204 return new DefaultLeaderElectorBuilder(primitiveCreator);
205 }
206
207 @Override
208 public <E> WorkQueue<E> getWorkQueue(String name, Serializer serializer) {
209 checkPermission(STORAGE_WRITE);
210 return primitiveCreator.newWorkQueue(name, serializer);
211 }
212
213 @Override
214 public <V> AsyncDocumentTree<V> getDocumentTree(String name, Serializer serializer) {
215 checkPermission(STORAGE_WRITE);
216 return primitiveCreator.newAsyncDocumentTree(name, serializer);
217 }
218
219 @Override
220 public <K, V> AsyncConsistentMultimap<K, V> getAsyncSetMultimap(
221 String name, Serializer serializer) {
222 checkPermission(STORAGE_WRITE);
223 return primitiveCreator.newAsyncConsistentSetMultimap(name,
224 serializer);
225 }
226
227 @Override
228 public <V> AsyncConsistentTreeMap<V> getAsyncTreeMap(
229 String name, Serializer serializer) {
230 checkPermission(STORAGE_WRITE);
231 return primitiveCreator.newAsyncConsistentTreeMap(name, serializer);
232 }
233
234 @Override
235 public <T> Topic<T> getTopic(String name, Serializer serializer) {
236 AsyncAtomicValue<T> atomicValue = this.<T>atomicValueBuilder()
237 .withName("topic-" + name)
238 .withSerializer(serializer)
239 .build();
240 return new DefaultDistributedTopic<>(atomicValue);
241 }
242}