blob: dff233022dba12c7004d154402e739b626145695 [file] [log] [blame]
/*
 * Copyright 2019-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.p4runtime.ctl.controller;
18
19import com.google.common.collect.Maps;
20import org.onlab.util.KryoNamespace;
21import org.onosproject.net.DeviceId;
22import org.onosproject.store.serializers.KryoNamespaces;
23import org.onosproject.store.service.EventuallyConsistentMap;
24import org.onosproject.store.service.EventuallyConsistentMapEvent;
25import org.onosproject.store.service.EventuallyConsistentMapListener;
26import org.onosproject.store.service.StorageService;
27import org.onosproject.store.service.WallClockTimestamp;
28import org.osgi.service.component.annotations.Activate;
29import org.osgi.service.component.annotations.Component;
30import org.osgi.service.component.annotations.Deactivate;
31import org.osgi.service.component.annotations.Reference;
32import org.osgi.service.component.annotations.ReferenceCardinality;
33import org.slf4j.Logger;
34
35import java.math.BigInteger;
36import java.util.concurrent.ConcurrentMap;
37
38import static com.google.common.base.Preconditions.checkNotNull;
39import static org.slf4j.LoggerFactory.getLogger;
40
41/**
42 * Distributed implementation of MasterElectionIdStore.
43 */
44@Component(immediate = true, service = MasterElectionIdStore.class)
45public class DistributedMasterElectionIdStore implements MasterElectionIdStore {
46
47 @Reference(cardinality = ReferenceCardinality.MANDATORY)
48 protected StorageService storageService;
49
50 private static final KryoNamespace SERIALIZER = KryoNamespace.newBuilder()
51 .register(KryoNamespaces.API)
52 .register(BigInteger.class)
53 .build();
54
55 private final Logger log = getLogger(getClass());
56 private final EventuallyConsistentMapListener<DeviceId, BigInteger> mapListener =
57 new InternalMapListener();
58
59 private EventuallyConsistentMap<DeviceId, BigInteger> masterElectionIds;
60 private ConcurrentMap<DeviceId, MasterElectionIdListener> listeners =
61 Maps.newConcurrentMap();
62
63 @Activate
64 public void activate() {
65 this.listeners = Maps.newConcurrentMap();
66 this.masterElectionIds = storageService.<DeviceId, BigInteger>eventuallyConsistentMapBuilder()
67 .withName("p4runtime-master-election-ids")
68 .withSerializer(SERIALIZER)
69 .withTimestampProvider((k, v) -> new WallClockTimestamp())
70 .build();
71 this.masterElectionIds.addListener(mapListener);
72 log.info("Started");
73 }
74
75 @Deactivate
76 public void deactivate() {
77 this.masterElectionIds.removeListener(mapListener);
78 this.masterElectionIds.destroy();
79 this.masterElectionIds = null;
80 this.listeners.clear();
81 this.listeners = null;
82 log.info("Stopped");
83 }
84
85
86 @Override
87 public void set(DeviceId deviceId, BigInteger electionId) {
88 checkNotNull(deviceId);
89 checkNotNull(electionId);
90 this.masterElectionIds.put(deviceId, electionId);
91 }
92
93 @Override
94 public BigInteger get(DeviceId deviceId) {
95 checkNotNull(deviceId);
96 return this.masterElectionIds.get(deviceId);
97 }
98
99 @Override
100 public void remove(DeviceId deviceId) {
101 checkNotNull(deviceId);
102 this.masterElectionIds.remove(deviceId);
103 }
104
105 @Override
106 public void setListener(DeviceId deviceId, MasterElectionIdListener newListener) {
107 checkNotNull(deviceId);
108 checkNotNull(newListener);
109 listeners.compute(deviceId, (did, existingListener) -> {
110 if (existingListener == null || existingListener == newListener) {
111 return newListener;
112 } else {
113 log.error("Cannot add listener as one already exist for {}", deviceId);
114 return existingListener;
115 }
116 });
117 }
118
119 @Override
120 public void unsetListener(DeviceId deviceId) {
121 listeners.remove(deviceId);
122 }
123
124 private class InternalMapListener implements EventuallyConsistentMapListener<DeviceId, BigInteger> {
125 @Override
126 public void event(EventuallyConsistentMapEvent<DeviceId, BigInteger> event) {
127 final MasterElectionIdListener listener = listeners.get(event.key());
128 if (listener == null) {
129 return;
130 }
131 switch (event.type()) {
132 case PUT:
133 listener.updated(event.value());
134 break;
135 case REMOVE:
136 listener.updated(null);
137 break;
138 default:
139 log.error("Unrecognized map event type {}", event.type());
140 }
141 }
142 }
143}