blob: edbb979ce5e1735a1e693074692f3a303a856556 [file] [log] [blame]
/*
 * Copyright 2015-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.flowobjective.impl;
17
alshabib2a441c62015-04-13 18:39:38 -070018import org.onlab.util.KryoNamespace;
19import org.onosproject.net.behaviour.DefaultNextGroup;
20import org.onosproject.net.behaviour.NextGroup;
21import org.onosproject.net.flowobjective.FlowObjectiveStore;
22import org.onosproject.net.flowobjective.FlowObjectiveStoreDelegate;
23import org.onosproject.net.flowobjective.ObjectiveEvent;
24import org.onosproject.store.AbstractStore;
Jordan Halterman5597ac82017-05-23 14:30:00 -070025import org.onosproject.store.service.AtomicIdGenerator;
alshabib2a441c62015-04-13 18:39:38 -070026import org.onosproject.store.service.ConsistentMap;
Saurav Das25190812016-05-27 13:54:07 -070027import org.onosproject.store.service.MapEvent;
28import org.onosproject.store.service.MapEventListener;
alshabib2a441c62015-04-13 18:39:38 -070029import org.onosproject.store.service.Serializer;
30import org.onosproject.store.service.StorageService;
31import org.onosproject.store.service.Versioned;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070032import org.osgi.service.component.annotations.Activate;
33import org.osgi.service.component.annotations.Component;
34import org.osgi.service.component.annotations.Deactivate;
35import org.osgi.service.component.annotations.Reference;
36import org.osgi.service.component.annotations.ReferenceCardinality;
alshabib2a441c62015-04-13 18:39:38 -070037import org.slf4j.Logger;
38
Saurav Das24431192016-03-07 19:13:00 -080039import java.util.HashMap;
40import java.util.Map;
Saurav Das25190812016-05-27 13:54:07 -070041import java.util.concurrent.BlockingQueue;
42import java.util.concurrent.ExecutorService;
43import java.util.concurrent.Executors;
44import java.util.concurrent.LinkedBlockingQueue;
Saurav Das24431192016-03-07 19:13:00 -080045
Ray Milkeyd84f89b2018-08-17 14:54:17 -070046import static org.onlab.util.Tools.groupedThreads;
47import static org.slf4j.LoggerFactory.getLogger;
48
alshabib2a441c62015-04-13 18:39:38 -070049/**
50 * Manages the inventory of created next groups.
51 */
Ray Milkeyd84f89b2018-08-17 14:54:17 -070052@Component(immediate = true, service = FlowObjectiveStore.class)
alshabib2a441c62015-04-13 18:39:38 -070053public class DistributedFlowObjectiveStore
54 extends AbstractStore<ObjectiveEvent, FlowObjectiveStoreDelegate>
55 implements FlowObjectiveStore {
56
57 private final Logger log = getLogger(getClass());
58
59 private ConsistentMap<Integer, byte[]> nextGroups;
60
Ray Milkeyd84f89b2018-08-17 14:54:17 -070061 @Reference(cardinality = ReferenceCardinality.MANDATORY)
alshabib2a441c62015-04-13 18:39:38 -070062 protected StorageService storageService;
63
Jordan Halterman5597ac82017-05-23 14:30:00 -070064 private AtomicIdGenerator nextIds;
Saurav Das25190812016-05-27 13:54:07 -070065 private MapEventListener<Integer, byte[]> mapListener = new NextGroupListener();
66 // event queue to separate map-listener threads from event-handler threads (tpool)
67 private BlockingQueue<ObjectiveEvent> eventQ;
68 private ExecutorService tpool;
alshabibf6ea9e62015-04-21 17:08:26 -070069
alshabib2a441c62015-04-13 18:39:38 -070070 @Activate
71 public void activate() {
Saurav Das25190812016-05-27 13:54:07 -070072 tpool = Executors.newFixedThreadPool(4, groupedThreads("onos/flobj-notifier", "%d", log));
73 eventQ = new LinkedBlockingQueue<ObjectiveEvent>();
74 tpool.execute(new FlowObjectiveNotifier());
alshabib2a441c62015-04-13 18:39:38 -070075 nextGroups = storageService.<Integer, byte[]>consistentMapBuilder()
Jon Halldbe4c532016-10-04 11:45:52 -070076 .withName("onos-flowobjective-groups")
alshabib2a441c62015-04-13 18:39:38 -070077 .withSerializer(Serializer.using(
78 new KryoNamespace.Builder()
79 .register(byte[].class)
Srikanth Vavilapallie48b3cf2015-07-06 11:43:07 -070080 .register(Versioned.class)
Charles Chaneefdedf2016-05-23 16:45:45 -070081 .build("DistributedFlowObjectiveStore")))
alshabib2a441c62015-04-13 18:39:38 -070082 .build();
Saurav Das25190812016-05-27 13:54:07 -070083 nextGroups.addListener(mapListener);
Jordan Halterman5597ac82017-05-23 14:30:00 -070084 nextIds = storageService.getAtomicIdGenerator("next-objective-id-generator");
alshabib2a441c62015-04-13 18:39:38 -070085 log.info("Started");
86 }
87
88
89 @Deactivate
90 public void deactivate() {
Frank Wange0eb5ce2016-07-01 18:21:25 +080091 nextGroups.removeListener(mapListener);
Saurav Das25190812016-05-27 13:54:07 -070092 tpool.shutdown();
alshabib2a441c62015-04-13 18:39:38 -070093 log.info("Stopped");
94 }
95
alshabib2a441c62015-04-13 18:39:38 -070096 @Override
97 public void putNextGroup(Integer nextId, NextGroup group) {
Saurav Das423fe2b2015-12-04 10:52:59 -080098 nextGroups.put(nextId, group.data());
alshabib2a441c62015-04-13 18:39:38 -070099 }
100
101 @Override
102 public NextGroup getNextGroup(Integer nextId) {
103 Versioned<byte[]> versionGroup = nextGroups.get(nextId);
104 if (versionGroup != null) {
105 return new DefaultNextGroup(versionGroup.value());
106 }
107 return null;
108 }
alshabibf6ea9e62015-04-21 17:08:26 -0700109
110 @Override
Saurav Das423fe2b2015-12-04 10:52:59 -0800111 public NextGroup removeNextGroup(Integer nextId) {
112 Versioned<byte[]> versionGroup = nextGroups.remove(nextId);
113 if (versionGroup != null) {
Saurav Das423fe2b2015-12-04 10:52:59 -0800114 return new DefaultNextGroup(versionGroup.value());
115 }
116 return null;
117 }
118
119 @Override
Saurav Das24431192016-03-07 19:13:00 -0800120 public Map<Integer, NextGroup> getAllGroups() {
121 Map<Integer, NextGroup> nextGroupMappings = new HashMap<>();
122 for (int key : nextGroups.keySet()) {
123 NextGroup nextGroup = getNextGroup(key);
124 if (nextGroup != null) {
125 nextGroupMappings.put(key, nextGroup);
126 }
127 }
128 return nextGroupMappings;
129 }
130
131 @Override
alshabibf6ea9e62015-04-21 17:08:26 -0700132 public int allocateNextId() {
Jordan Halterman5597ac82017-05-23 14:30:00 -0700133 return (int) nextIds.nextId();
alshabibf6ea9e62015-04-21 17:08:26 -0700134 }
Saurav Das25190812016-05-27 13:54:07 -0700135
136 private class FlowObjectiveNotifier implements Runnable {
137 @Override
138 public void run() {
139 try {
140 while (!Thread.currentThread().isInterrupted()) {
141 notifyDelegate(eventQ.take());
142 }
143 } catch (InterruptedException ex) {
144 Thread.currentThread().interrupt();
145 }
146 }
147 }
148
149 private class NextGroupListener implements MapEventListener<Integer, byte[]> {
150 @Override
151 public void event(MapEvent<Integer, byte[]> event) {
152 switch (event.type()) {
153 case INSERT:
154 eventQ.add(new ObjectiveEvent(ObjectiveEvent.Type.ADD, event.key()));
155 break;
156 case REMOVE:
157 eventQ.add(new ObjectiveEvent(ObjectiveEvent.Type.REMOVE, event.key()));
158 break;
159 case UPDATE:
160 // TODO Introduce UPDATE ObjectiveEvent when the map is being updated
161 break;
162 default:
163 break;
164 }
165 }
166 }
167
alshabib2a441c62015-04-13 18:39:38 -0700168}