blob: 263e93a728eb56e1e4f510cf9895c699994f98f7 [file] [log] [blame]
alshabib2a441c62015-04-13 18:39:38 -07001/*
Brian O'Connor5ab426f2016-04-09 01:19:45 -07002 * Copyright 2015-present Open Networking Laboratory
alshabib2a441c62015-04-13 18:39:38 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.flowobjective.impl;
17
18import org.apache.felix.scr.annotations.Activate;
19import org.apache.felix.scr.annotations.Component;
20import org.apache.felix.scr.annotations.Deactivate;
21import org.apache.felix.scr.annotations.Reference;
22import org.apache.felix.scr.annotations.ReferenceCardinality;
23import org.apache.felix.scr.annotations.Service;
24import org.onlab.util.KryoNamespace;
25import org.onosproject.net.behaviour.DefaultNextGroup;
26import org.onosproject.net.behaviour.NextGroup;
27import org.onosproject.net.flowobjective.FlowObjectiveStore;
28import org.onosproject.net.flowobjective.FlowObjectiveStoreDelegate;
29import org.onosproject.net.flowobjective.ObjectiveEvent;
30import org.onosproject.store.AbstractStore;
alshabibf6ea9e62015-04-21 17:08:26 -070031import org.onosproject.store.service.AtomicCounter;
alshabib2a441c62015-04-13 18:39:38 -070032import org.onosproject.store.service.ConsistentMap;
Saurav Das25190812016-05-27 13:54:07 -070033import org.onosproject.store.service.MapEvent;
34import org.onosproject.store.service.MapEventListener;
alshabib2a441c62015-04-13 18:39:38 -070035import org.onosproject.store.service.Serializer;
36import org.onosproject.store.service.StorageService;
37import org.onosproject.store.service.Versioned;
38import org.slf4j.Logger;
39
Saurav Das25190812016-05-27 13:54:07 -070040import static org.onlab.util.Tools.groupedThreads;
alshabib2a441c62015-04-13 18:39:38 -070041import static org.slf4j.LoggerFactory.getLogger;
42
Saurav Das24431192016-03-07 19:13:00 -080043import java.util.HashMap;
44import java.util.Map;
Saurav Das25190812016-05-27 13:54:07 -070045import java.util.concurrent.BlockingQueue;
46import java.util.concurrent.ExecutorService;
47import java.util.concurrent.Executors;
48import java.util.concurrent.LinkedBlockingQueue;
Saurav Das24431192016-03-07 19:13:00 -080049
alshabib2a441c62015-04-13 18:39:38 -070050/**
51 * Manages the inventory of created next groups.
52 */
53@Component(immediate = true, enabled = true)
54@Service
55public class DistributedFlowObjectiveStore
56 extends AbstractStore<ObjectiveEvent, FlowObjectiveStoreDelegate>
57 implements FlowObjectiveStore {
58
59 private final Logger log = getLogger(getClass());
60
61 private ConsistentMap<Integer, byte[]> nextGroups;
62
63 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
64 protected StorageService storageService;
65
alshabibf6ea9e62015-04-21 17:08:26 -070066 private AtomicCounter nextIds;
Saurav Das25190812016-05-27 13:54:07 -070067 private MapEventListener<Integer, byte[]> mapListener = new NextGroupListener();
68 // event queue to separate map-listener threads from event-handler threads (tpool)
69 private BlockingQueue<ObjectiveEvent> eventQ;
70 private ExecutorService tpool;
alshabibf6ea9e62015-04-21 17:08:26 -070071
alshabib2a441c62015-04-13 18:39:38 -070072 @Activate
73 public void activate() {
Saurav Das25190812016-05-27 13:54:07 -070074 tpool = Executors.newFixedThreadPool(4, groupedThreads("onos/flobj-notifier", "%d", log));
75 eventQ = new LinkedBlockingQueue<ObjectiveEvent>();
76 tpool.execute(new FlowObjectiveNotifier());
alshabib2a441c62015-04-13 18:39:38 -070077 nextGroups = storageService.<Integer, byte[]>consistentMapBuilder()
78 .withName("flowobjective-groups")
79 .withSerializer(Serializer.using(
80 new KryoNamespace.Builder()
81 .register(byte[].class)
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -070082 .register(Versioned.class)
Charles Chaneefdedf2016-05-23 16:45:45 -070083 .build("DistributedFlowObjectiveStore")))
alshabib2a441c62015-04-13 18:39:38 -070084 .build();
Saurav Das25190812016-05-27 13:54:07 -070085 nextGroups.addListener(mapListener);
Madan Jampanid5714e02016-04-19 14:15:20 -070086 nextIds = storageService.getAtomicCounter("next-objective-counter");
alshabib2a441c62015-04-13 18:39:38 -070087 log.info("Started");
88 }
89
90
91 @Deactivate
92 public void deactivate() {
Saurav Das25190812016-05-27 13:54:07 -070093 tpool.shutdown();
alshabib2a441c62015-04-13 18:39:38 -070094 log.info("Stopped");
95 }
96
alshabib2a441c62015-04-13 18:39:38 -070097 @Override
98 public void putNextGroup(Integer nextId, NextGroup group) {
Saurav Das423fe2b2015-12-04 10:52:59 -080099 nextGroups.put(nextId, group.data());
alshabib2a441c62015-04-13 18:39:38 -0700100 }
101
102 @Override
103 public NextGroup getNextGroup(Integer nextId) {
104 Versioned<byte[]> versionGroup = nextGroups.get(nextId);
105 if (versionGroup != null) {
106 return new DefaultNextGroup(versionGroup.value());
107 }
108 return null;
109 }
alshabibf6ea9e62015-04-21 17:08:26 -0700110
111 @Override
Saurav Das423fe2b2015-12-04 10:52:59 -0800112 public NextGroup removeNextGroup(Integer nextId) {
113 Versioned<byte[]> versionGroup = nextGroups.remove(nextId);
114 if (versionGroup != null) {
Saurav Das423fe2b2015-12-04 10:52:59 -0800115 return new DefaultNextGroup(versionGroup.value());
116 }
117 return null;
118 }
119
120 @Override
Saurav Das24431192016-03-07 19:13:00 -0800121 public Map<Integer, NextGroup> getAllGroups() {
122 Map<Integer, NextGroup> nextGroupMappings = new HashMap<>();
123 for (int key : nextGroups.keySet()) {
124 NextGroup nextGroup = getNextGroup(key);
125 if (nextGroup != null) {
126 nextGroupMappings.put(key, nextGroup);
127 }
128 }
129 return nextGroupMappings;
130 }
131
132 @Override
alshabibf6ea9e62015-04-21 17:08:26 -0700133 public int allocateNextId() {
134 return (int) nextIds.incrementAndGet();
135 }
Saurav Das25190812016-05-27 13:54:07 -0700136
137 private class FlowObjectiveNotifier implements Runnable {
138 @Override
139 public void run() {
140 try {
141 while (!Thread.currentThread().isInterrupted()) {
142 notifyDelegate(eventQ.take());
143 }
144 } catch (InterruptedException ex) {
145 Thread.currentThread().interrupt();
146 }
147 }
148 }
149
150 private class NextGroupListener implements MapEventListener<Integer, byte[]> {
151 @Override
152 public void event(MapEvent<Integer, byte[]> event) {
153 switch (event.type()) {
154 case INSERT:
155 eventQ.add(new ObjectiveEvent(ObjectiveEvent.Type.ADD, event.key()));
156 break;
157 case REMOVE:
158 eventQ.add(new ObjectiveEvent(ObjectiveEvent.Type.REMOVE, event.key()));
159 break;
160 case UPDATE:
161 // TODO Introduce UPDATE ObjectiveEvent when the map is being updated
162 break;
163 default:
164 break;
165 }
166 }
167 }
168
alshabib2a441c62015-04-13 18:39:38 -0700169}