blob: e2ecdf331baa0c1a407ee25316d9b210a0fc0fdf [file] [log] [blame]
yoonseon86bebed2017-02-03 15:23:57 -08001/*
2 * Copyright 2017-present Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.incubator.store.virtual.impl;
18
19import com.google.common.collect.Maps;
20import org.apache.felix.scr.annotations.Activate;
21import org.apache.felix.scr.annotations.Component;
22import org.apache.felix.scr.annotations.Reference;
23import org.apache.felix.scr.annotations.ReferenceCardinality;
24import org.apache.felix.scr.annotations.Service;
25import org.onosproject.incubator.net.virtual.NetworkId;
26import org.onosproject.incubator.net.virtual.VirtualNetworkFlowObjectiveStore;
27import org.onosproject.net.behaviour.DefaultNextGroup;
28import org.onosproject.net.behaviour.NextGroup;
29import org.onosproject.net.flowobjective.FlowObjectiveStoreDelegate;
30import org.onosproject.net.flowobjective.ObjectiveEvent;
31import org.onosproject.store.service.AtomicCounter;
32import org.onosproject.store.service.StorageService;
33import org.slf4j.Logger;
34
35import java.util.HashMap;
36import java.util.Map;
37import java.util.concurrent.BlockingQueue;
38import java.util.concurrent.ConcurrentMap;
39import java.util.concurrent.ExecutorService;
40import java.util.concurrent.Executors;
41import java.util.concurrent.LinkedBlockingQueue;
42
43import static org.onlab.util.Tools.groupedThreads;
44import static org.slf4j.LoggerFactory.getLogger;
45
46/**
47 * Single instance implementation of store to manage
48 * the inventory of created next groups for virtual network.
49 */
50@Component(immediate = true)
51@Service
52public class SimpleVirtualFlowObjectiveStore
53 extends AbstractVirtualStore<ObjectiveEvent, FlowObjectiveStoreDelegate>
54 implements VirtualNetworkFlowObjectiveStore {
55
56 private final Logger log = getLogger(getClass());
57
58 private ConcurrentMap<NetworkId, ConcurrentMap<Integer, byte[]>> nextGroupsMap;
59
60 private AtomicCounter nextIds;
61
62 // event queue to separate map-listener threads from event-handler threads (tpool)
63 private BlockingQueue<VirtualObjectiveEvent> eventQ;
64 private ExecutorService tpool;
65
66 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
67 protected StorageService storageService;
68
69 @Activate
70 public void activate() {
71 tpool = Executors.newFixedThreadPool(4, groupedThreads("onos/virtual/flobj-notifier", "%d", log));
72 eventQ = new LinkedBlockingQueue<>();
73 tpool.execute(new FlowObjectiveNotifier());
74
Claudine Chiucdc4b8c2017-03-30 00:34:39 -040075 initNextGroupsMap();
yoonseon86bebed2017-02-03 15:23:57 -080076
77 nextIds = storageService.getAtomicCounter("next-objective-counter");
78 log.info("Started");
79 }
80
Claudine Chiucdc4b8c2017-03-30 00:34:39 -040081 public void deactivate() {
82 log.info("Stopped");
83 }
84
85 protected void initNextGroupsMap() {
86 nextGroupsMap = Maps.newConcurrentMap();
87 }
88
89 protected void updateNextGroupsMap(NetworkId networkId,
90 ConcurrentMap<Integer, byte[]> nextGroups) {
91 }
92
93 protected ConcurrentMap<Integer, byte[]> getNextGroups(NetworkId networkId) {
yoonseon86bebed2017-02-03 15:23:57 -080094 nextGroupsMap.computeIfAbsent(networkId, n -> Maps.newConcurrentMap());
95 return nextGroupsMap.get(networkId);
96 }
97
98 @Override
99 public void putNextGroup(NetworkId networkId, Integer nextId, NextGroup group) {
100 ConcurrentMap<Integer, byte[]> nextGroups = getNextGroups(networkId);
101 nextGroups.put(nextId, group.data());
Claudine Chiucdc4b8c2017-03-30 00:34:39 -0400102 updateNextGroupsMap(networkId, nextGroups);
yoonseon86bebed2017-02-03 15:23:57 -0800103
104 eventQ.add(new VirtualObjectiveEvent(networkId, ObjectiveEvent.Type.ADD, nextId));
105 }
106
107 @Override
108 public NextGroup getNextGroup(NetworkId networkId, Integer nextId) {
109 ConcurrentMap<Integer, byte[]> nextGroups = getNextGroups(networkId);
Claudine Chiucdc4b8c2017-03-30 00:34:39 -0400110 byte[] groupData = nextGroups.get(nextId);
111 if (groupData != null) {
112 return new DefaultNextGroup(groupData);
113 }
114 return null;
yoonseon86bebed2017-02-03 15:23:57 -0800115 }
116
117 @Override
118 public NextGroup removeNextGroup(NetworkId networkId, Integer nextId) {
119 ConcurrentMap<Integer, byte[]> nextGroups = getNextGroups(networkId);
Claudine Chiucdc4b8c2017-03-30 00:34:39 -0400120 byte[] nextGroup = nextGroups.remove(nextId);
121 updateNextGroupsMap(networkId, nextGroups);
122
yoonseon86bebed2017-02-03 15:23:57 -0800123 eventQ.add(new VirtualObjectiveEvent(networkId, ObjectiveEvent.Type.REMOVE, nextId));
Claudine Chiucdc4b8c2017-03-30 00:34:39 -0400124
125 return new DefaultNextGroup(nextGroup);
yoonseon86bebed2017-02-03 15:23:57 -0800126 }
127
128 @Override
129 public Map<Integer, NextGroup> getAllGroups(NetworkId networkId) {
130 ConcurrentMap<Integer, byte[]> nextGroups = getNextGroups(networkId);
131
132 Map<Integer, NextGroup> nextGroupMappings = new HashMap<>();
133 for (int key : nextGroups.keySet()) {
134 NextGroup nextGroup = getNextGroup(networkId, key);
135 if (nextGroup != null) {
136 nextGroupMappings.put(key, nextGroup);
137 }
138 }
139 return nextGroupMappings;
140 }
141
142 @Override
143 public int allocateNextId(NetworkId networkId) {
144 return (int) nextIds.incrementAndGet();
145 }
146
147 private class FlowObjectiveNotifier implements Runnable {
148 @Override
149 public void run() {
150 try {
151 while (!Thread.currentThread().isInterrupted()) {
152 VirtualObjectiveEvent vEvent = eventQ.take();
153 notifyDelegate(vEvent.networkId(), vEvent);
154 }
155 } catch (InterruptedException ex) {
156 Thread.currentThread().interrupt();
157 }
158 }
159 }
160
161 private class VirtualObjectiveEvent extends ObjectiveEvent {
162 NetworkId networkId;
163
164 public VirtualObjectiveEvent(NetworkId networkId, Type type,
165 Integer objective) {
166 super(type, objective);
167 this.networkId = networkId;
168 }
169
170 NetworkId networkId() {
171 return networkId;
172 }
173 }
174}