blob: 1ac5396db8305238c89d2268186c4d586b5393ad [file] [log] [blame]
/*
 * Copyright 2017-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.incubator.store.virtual.impl;
18
import com.google.common.collect.Maps;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.incubator.net.virtual.NetworkId;
import org.onosproject.incubator.net.virtual.VirtualNetworkFlowObjectiveStore;
import org.onosproject.net.behaviour.DefaultNextGroup;
import org.onosproject.net.behaviour.NextGroup;
import org.onosproject.net.flowobjective.FlowObjectiveStoreDelegate;
import org.onosproject.net.flowobjective.ObjectiveEvent;
import org.onosproject.store.service.AtomicCounter;
import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger;
45
46/**
47 * Single instance implementation of store to manage
48 * the inventory of created next groups for virtual network.
49 */
50@Component(immediate = true)
51@Service
52public class SimpleVirtualFlowObjectiveStore
53 extends AbstractVirtualStore<ObjectiveEvent, FlowObjectiveStoreDelegate>
54 implements VirtualNetworkFlowObjectiveStore {
55
56 private final Logger log = getLogger(getClass());
57
58 private ConcurrentMap<NetworkId, ConcurrentMap<Integer, byte[]>> nextGroupsMap;
59
60 private AtomicCounter nextIds;
61
62 // event queue to separate map-listener threads from event-handler threads (tpool)
63 private BlockingQueue<VirtualObjectiveEvent> eventQ;
64 private ExecutorService tpool;
65
66 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
67 protected StorageService storageService;
68
69 @Activate
70 public void activate() {
71 tpool = Executors.newFixedThreadPool(4, groupedThreads("onos/virtual/flobj-notifier", "%d", log));
72 eventQ = new LinkedBlockingQueue<>();
73 tpool.execute(new FlowObjectiveNotifier());
74
75 nextGroupsMap = Maps.newConcurrentMap();
76
77 nextIds = storageService.getAtomicCounter("next-objective-counter");
78 log.info("Started");
79 }
80
81 private ConcurrentMap<Integer, byte[]> getNextGroups(NetworkId networkId) {
82 nextGroupsMap.computeIfAbsent(networkId, n -> Maps.newConcurrentMap());
83 return nextGroupsMap.get(networkId);
84 }
85
86 @Override
87 public void putNextGroup(NetworkId networkId, Integer nextId, NextGroup group) {
88 ConcurrentMap<Integer, byte[]> nextGroups = getNextGroups(networkId);
89 nextGroups.put(nextId, group.data());
90
91 eventQ.add(new VirtualObjectiveEvent(networkId, ObjectiveEvent.Type.ADD, nextId));
92 }
93
94 @Override
95 public NextGroup getNextGroup(NetworkId networkId, Integer nextId) {
96 ConcurrentMap<Integer, byte[]> nextGroups = getNextGroups(networkId);
97 return new DefaultNextGroup(nextGroups.get(nextId));
98 }
99
100 @Override
101 public NextGroup removeNextGroup(NetworkId networkId, Integer nextId) {
102 ConcurrentMap<Integer, byte[]> nextGroups = getNextGroups(networkId);
103 eventQ.add(new VirtualObjectiveEvent(networkId, ObjectiveEvent.Type.REMOVE, nextId));
104 return new DefaultNextGroup(nextGroups.remove(nextId));
105 }
106
107 @Override
108 public Map<Integer, NextGroup> getAllGroups(NetworkId networkId) {
109 ConcurrentMap<Integer, byte[]> nextGroups = getNextGroups(networkId);
110
111 Map<Integer, NextGroup> nextGroupMappings = new HashMap<>();
112 for (int key : nextGroups.keySet()) {
113 NextGroup nextGroup = getNextGroup(networkId, key);
114 if (nextGroup != null) {
115 nextGroupMappings.put(key, nextGroup);
116 }
117 }
118 return nextGroupMappings;
119 }
120
121 @Override
122 public int allocateNextId(NetworkId networkId) {
123 return (int) nextIds.incrementAndGet();
124 }
125
126 private class FlowObjectiveNotifier implements Runnable {
127 @Override
128 public void run() {
129 try {
130 while (!Thread.currentThread().isInterrupted()) {
131 VirtualObjectiveEvent vEvent = eventQ.take();
132 notifyDelegate(vEvent.networkId(), vEvent);
133 }
134 } catch (InterruptedException ex) {
135 Thread.currentThread().interrupt();
136 }
137 }
138 }
139
140 private class VirtualObjectiveEvent extends ObjectiveEvent {
141 NetworkId networkId;
142
143 public VirtualObjectiveEvent(NetworkId networkId, Type type,
144 Integer objective) {
145 super(type, objective);
146 this.networkId = networkId;
147 }
148
149 NetworkId networkId() {
150 return networkId;
151 }
152 }
153}