blob: 47596e3323834faf59e0d6b37d43f285e9a94cff [file] [log] [blame]
alshabib2a441c62015-04-13 18:39:38 -07001/*
Brian O'Connor5ab426f2016-04-09 01:19:45 -07002 * Copyright 2015-present Open Networking Laboratory
alshabib2a441c62015-04-13 18:39:38 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.flowobjective.impl;
17
18import org.apache.felix.scr.annotations.Activate;
19import org.apache.felix.scr.annotations.Component;
20import org.apache.felix.scr.annotations.Deactivate;
21import org.apache.felix.scr.annotations.Reference;
22import org.apache.felix.scr.annotations.ReferenceCardinality;
23import org.apache.felix.scr.annotations.Service;
24import org.onlab.util.KryoNamespace;
25import org.onosproject.net.behaviour.DefaultNextGroup;
26import org.onosproject.net.behaviour.NextGroup;
27import org.onosproject.net.flowobjective.FlowObjectiveStore;
28import org.onosproject.net.flowobjective.FlowObjectiveStoreDelegate;
29import org.onosproject.net.flowobjective.ObjectiveEvent;
30import org.onosproject.store.AbstractStore;
alshabibf6ea9e62015-04-21 17:08:26 -070031import org.onosproject.store.service.AtomicCounter;
alshabib2a441c62015-04-13 18:39:38 -070032import org.onosproject.store.service.ConsistentMap;
Saurav Das25190812016-05-27 13:54:07 -070033import org.onosproject.store.service.MapEvent;
34import org.onosproject.store.service.MapEventListener;
alshabib2a441c62015-04-13 18:39:38 -070035import org.onosproject.store.service.Serializer;
36import org.onosproject.store.service.StorageService;
37import org.onosproject.store.service.Versioned;
38import org.slf4j.Logger;
39
Saurav Das25190812016-05-27 13:54:07 -070040import static org.onlab.util.Tools.groupedThreads;
alshabib2a441c62015-04-13 18:39:38 -070041import static org.slf4j.LoggerFactory.getLogger;
42
Saurav Das24431192016-03-07 19:13:00 -080043import java.util.HashMap;
44import java.util.Map;
Saurav Das25190812016-05-27 13:54:07 -070045import java.util.concurrent.BlockingQueue;
46import java.util.concurrent.ExecutorService;
47import java.util.concurrent.Executors;
48import java.util.concurrent.LinkedBlockingQueue;
Saurav Das24431192016-03-07 19:13:00 -080049
alshabib2a441c62015-04-13 18:39:38 -070050/**
51 * Manages the inventory of created next groups.
52 */
Sho SHIMIZU5c396e32016-08-12 15:19:12 -070053@Component(immediate = true)
alshabib2a441c62015-04-13 18:39:38 -070054@Service
55public class DistributedFlowObjectiveStore
56 extends AbstractStore<ObjectiveEvent, FlowObjectiveStoreDelegate>
57 implements FlowObjectiveStore {
58
59 private final Logger log = getLogger(getClass());
60
61 private ConsistentMap<Integer, byte[]> nextGroups;
62
63 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
64 protected StorageService storageService;
65
alshabibf6ea9e62015-04-21 17:08:26 -070066 private AtomicCounter nextIds;
Saurav Das25190812016-05-27 13:54:07 -070067 private MapEventListener<Integer, byte[]> mapListener = new NextGroupListener();
68 // event queue to separate map-listener threads from event-handler threads (tpool)
69 private BlockingQueue<ObjectiveEvent> eventQ;
70 private ExecutorService tpool;
alshabibf6ea9e62015-04-21 17:08:26 -070071
alshabib2a441c62015-04-13 18:39:38 -070072 @Activate
73 public void activate() {
Saurav Das25190812016-05-27 13:54:07 -070074 tpool = Executors.newFixedThreadPool(4, groupedThreads("onos/flobj-notifier", "%d", log));
75 eventQ = new LinkedBlockingQueue<ObjectiveEvent>();
76 tpool.execute(new FlowObjectiveNotifier());
alshabib2a441c62015-04-13 18:39:38 -070077 nextGroups = storageService.<Integer, byte[]>consistentMapBuilder()
78 .withName("flowobjective-groups")
79 .withSerializer(Serializer.using(
80 new KryoNamespace.Builder()
81 .register(byte[].class)
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -070082 .register(Versioned.class)
Charles Chaneefdedf2016-05-23 16:45:45 -070083 .build("DistributedFlowObjectiveStore")))
alshabib2a441c62015-04-13 18:39:38 -070084 .build();
Saurav Das25190812016-05-27 13:54:07 -070085 nextGroups.addListener(mapListener);
Madan Jampanid5714e02016-04-19 14:15:20 -070086 nextIds = storageService.getAtomicCounter("next-objective-counter");
alshabib2a441c62015-04-13 18:39:38 -070087 log.info("Started");
88 }
89
90
91 @Deactivate
92 public void deactivate() {
Frank Wange0eb5ce2016-07-01 18:21:25 +080093 nextGroups.removeListener(mapListener);
Saurav Das25190812016-05-27 13:54:07 -070094 tpool.shutdown();
alshabib2a441c62015-04-13 18:39:38 -070095 log.info("Stopped");
96 }
97
alshabib2a441c62015-04-13 18:39:38 -070098 @Override
99 public void putNextGroup(Integer nextId, NextGroup group) {
Saurav Das423fe2b2015-12-04 10:52:59 -0800100 nextGroups.put(nextId, group.data());
alshabib2a441c62015-04-13 18:39:38 -0700101 }
102
103 @Override
104 public NextGroup getNextGroup(Integer nextId) {
105 Versioned<byte[]> versionGroup = nextGroups.get(nextId);
106 if (versionGroup != null) {
107 return new DefaultNextGroup(versionGroup.value());
108 }
109 return null;
110 }
alshabibf6ea9e62015-04-21 17:08:26 -0700111
112 @Override
Saurav Das423fe2b2015-12-04 10:52:59 -0800113 public NextGroup removeNextGroup(Integer nextId) {
114 Versioned<byte[]> versionGroup = nextGroups.remove(nextId);
115 if (versionGroup != null) {
Saurav Das423fe2b2015-12-04 10:52:59 -0800116 return new DefaultNextGroup(versionGroup.value());
117 }
118 return null;
119 }
120
121 @Override
Saurav Das24431192016-03-07 19:13:00 -0800122 public Map<Integer, NextGroup> getAllGroups() {
123 Map<Integer, NextGroup> nextGroupMappings = new HashMap<>();
124 for (int key : nextGroups.keySet()) {
125 NextGroup nextGroup = getNextGroup(key);
126 if (nextGroup != null) {
127 nextGroupMappings.put(key, nextGroup);
128 }
129 }
130 return nextGroupMappings;
131 }
132
133 @Override
alshabibf6ea9e62015-04-21 17:08:26 -0700134 public int allocateNextId() {
135 return (int) nextIds.incrementAndGet();
136 }
Saurav Das25190812016-05-27 13:54:07 -0700137
138 private class FlowObjectiveNotifier implements Runnable {
139 @Override
140 public void run() {
141 try {
142 while (!Thread.currentThread().isInterrupted()) {
143 notifyDelegate(eventQ.take());
144 }
145 } catch (InterruptedException ex) {
146 Thread.currentThread().interrupt();
147 }
148 }
149 }
150
151 private class NextGroupListener implements MapEventListener<Integer, byte[]> {
152 @Override
153 public void event(MapEvent<Integer, byte[]> event) {
154 switch (event.type()) {
155 case INSERT:
156 eventQ.add(new ObjectiveEvent(ObjectiveEvent.Type.ADD, event.key()));
157 break;
158 case REMOVE:
159 eventQ.add(new ObjectiveEvent(ObjectiveEvent.Type.REMOVE, event.key()));
160 break;
161 case UPDATE:
162 // TODO Introduce UPDATE ObjectiveEvent when the map is being updated
163 break;
164 default:
165 break;
166 }
167 }
168 }
169
alshabib2a441c62015-04-13 18:39:38 -0700170}