blob: 6c3c1789cba2b727ad1f205cda29ad21d5c1262e [file] [log] [blame]
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -08001package net.floodlightcontroller.mastership;
2
Jonathan Hartbd181b62013-02-17 16:05:38 -08003import java.io.IOException;
4import java.net.UnknownHostException;
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -08005import java.util.ArrayList;
6import java.util.Collection;
7import java.util.HashMap;
8import java.util.Map;
9
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -080010import net.floodlightcontroller.core.module.FloodlightModuleContext;
11import net.floodlightcontroller.core.module.FloodlightModuleException;
12import net.floodlightcontroller.core.module.IFloodlightModule;
13import net.floodlightcontroller.core.module.IFloodlightService;
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -080014
Jonathan Hartbd181b62013-02-17 16:05:38 -080015import org.apache.zookeeper.WatchedEvent;
16import org.openflow.util.HexString;
17import org.slf4j.Logger;
18import org.slf4j.LoggerFactory;
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -080019
Jonathan Hartbd181b62013-02-17 16:05:38 -080020import com.netflix.curator.RetryPolicy;
21import com.netflix.curator.framework.CuratorFramework;
22import com.netflix.curator.framework.CuratorFrameworkFactory;
23import com.netflix.curator.framework.api.CuratorWatcher;
24import com.netflix.curator.framework.recipes.leader.LeaderLatch;
25import com.netflix.curator.framework.recipes.leader.Participant;
26import com.netflix.curator.framework.state.ConnectionState;
27import com.netflix.curator.framework.state.ConnectionStateListener;
28import com.netflix.curator.retry.ExponentialBackoffRetry;
29
30public class MastershipManager implements IFloodlightModule, IMastershipService,
31 ConnectionStateListener {
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -080032 protected static Logger log = LoggerFactory.getLogger(MastershipManager.class);
33 protected String mastershipId;
34
Jonathan Hartbd181b62013-02-17 16:05:38 -080035 //TODO read this from configuration
36 protected String connectionString = "localhost:2181";
37 private final String namespace = "onos";
38 private final String switchLatchesPath = "/switchmastership";
39
40 protected CuratorFramework client;
41
42 protected Map<String, LeaderLatch> switchLatches;
43 protected Map<String, MastershipCallback> switchCallbacks;
44
45 //protected final String singleLatchPath = switchLatchesPath + "/00:00:00:00:00:00:00:01";
46 //protected LeaderLatch singleLatch;
47
48
49 protected class ParamaterizedCuratorWatcher implements CuratorWatcher {
50 private String dpid;
51 private String latchPath;
52
53 public ParamaterizedCuratorWatcher(String dpid, String latchPath){
54 this.dpid = dpid;
55 this.latchPath = latchPath;
56 }
57
58 @Override
59 public void process(WatchedEvent event) throws Exception {
60 log.debug("Watch Event: {}", event);
61
62 LeaderLatch latch = switchLatches.get(dpid);
63
64 Participant leader = latch.getLeader();
65
66 log.debug("Leader for {} changed. Now {}", dpid, leader.getId());
67
68 if (leader.getId().equals(mastershipId)){
69 switchCallbacks.get(dpid).changeCallback(HexString.toLong(dpid), true);
70 }
71
72 client.getChildren().usingWatcher(this).inBackground().forPath(latchPath);
73 //client.getChildren().usingWatcher(this).forPath(latchPath);
74 }
75 };
76
77 @Override
78 public void acquireMastership(long dpid, MastershipCallback cb) {
79 //TODO check if there's already a latch in the list
80 //TODO throw exception if unique ID not set
81 //TODO use unique ID
82 if (cb == null) {
83 //TODO throw exception?
84 }
85
86 String dpidStr = HexString.toHexString(dpid);
87 String latchPath = switchLatchesPath + "/" + dpidStr;
88
89 LeaderLatch latch = new LeaderLatch(client, latchPath, mastershipId);
90 switchLatches.put(dpidStr, latch);
91 //if (switchCallbacks.put(dpidStr, cb) != cb){
92 //NOTE instance equality intended
93 // log.debug("Throwing out old callback for switch {}: {}", dpidStr, cb);
94 //}
95 switchCallbacks.put(dpidStr, cb);
96
97 try {
98 //client.getChildren().usingWatcher(watcher).inBackground().forPath(singleLatchPath);
99 client.getChildren().usingWatcher(
100 new ParamaterizedCuratorWatcher(dpidStr, latchPath))
101 .inBackground().forPath(latchPath);
102 latch.start();
103 } catch (Exception e) {
104 log.debug("WATCHER ERROROROROR");
105 e.printStackTrace();
106 }
107
108 }
109
110 @Override
111 public void releaseMastership(long dpid) {
112 // TODO Auto-generated method stub
113 LeaderLatch latch = switchLatches.get(HexString.toHexString(dpid));
114 if (latch == null) {
115 return;
116 }
117
118 try {
119 latch.close();
120 } catch (IOException e) {
121
122 }
123 }
124
125 @Override
126 public boolean amMaster(long dpid) {
127 LeaderLatch latch = switchLatches.get(HexString.toHexString(dpid));
128
129 if (latch == null) {
130 log.warn("No leader latch for dpid {}", HexString.toHexString(dpid));
131 return false;
132 }
133
134 //return latch.hasLeadership();
135 try {
136 return latch.getLeader().getId().equals(mastershipId);
137 } catch (Exception e) {
138 //TODO swallow exception?
139 return false;
140 }
141 }
142
143 @Override
144 public void setMastershipId(String id) {
145 // TODO Synchronisation?
146 mastershipId = id;
147 }
148
149 @Override
150 public String getMastershipId() {
151 // TODO Synchronisation?
152 return mastershipId;
153 }
154
155 /*
156 * ConnectionStateListener
157 */
158
159 @Override
160 public void stateChanged(CuratorFramework client, ConnectionState state) {
161 // TODO Auto-generated method stub
162 log.debug("Connection state change: {}", state);
163
164 switch (state){
165 case SUSPENDED:
166 case LOST:
167 //If we're suspended or lost, we can no longer assume mastership
168 for (Map.Entry<String, MastershipCallback> entry : switchCallbacks.entrySet()){
169 entry.getValue().changeCallback(HexString.toLong(entry.getKey()), false);
170 }
171 break;
172 default:
173 //CONNECTED or RECONNECTED
174 //LeaderLatches should automatically try and re-gain the leadership
175 break;
176 }
177 }
178
179 /*
180 * IFloodlightModule
181 */
182
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -0800183 @Override
184 public Collection<Class<? extends IFloodlightService>> getModuleServices() {
185 Collection<Class<? extends IFloodlightService>> l = new ArrayList<Class<? extends IFloodlightService>>();
186 l.add(IMastershipService.class);
187 return l;
188 }
189
190 @Override
191 public Map<Class<? extends IFloodlightService>, IFloodlightService> getServiceImpls() {
192 Map<Class<? extends IFloodlightService>, IFloodlightService> m =
193 new HashMap<Class<? extends IFloodlightService>, IFloodlightService>();
194 m.put(IMastershipService.class, this);
195 return m;
196 }
197
198 @Override
199 public Collection<Class<? extends IFloodlightService>> getModuleDependencies() {
200 // no module dependencies
201 return null;
202 }
203
Jonathan Hartbd181b62013-02-17 16:05:38 -0800204 //be ready to serve mastership requests by startup?
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -0800205 @Override
206 public void init (FloodlightModuleContext context) throws FloodlightModuleException {
207 //TODO
Jonathan Hartbd181b62013-02-17 16:05:38 -0800208 try {
209 String localHostname = java.net.InetAddress.getLocalHost().getHostName();
210 mastershipId = localHostname;
211 log.debug("Setting mastership id to {}", mastershipId);
212 } catch (UnknownHostException e) {
213 // TODO Handle this exception
214 e.printStackTrace();
215 }
216
217 switchLatches = new HashMap<String, LeaderLatch>();
218 switchCallbacks = new HashMap<String, MastershipCallback>();
219
220 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
221 client = CuratorFrameworkFactory.newClient(connectionString, retryPolicy);
222
223 client.start();
224
225 client = client.usingNamespace(namespace);
226
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -0800227 return;
228 }
229
230 @Override
231 public void startUp (FloodlightModuleContext context) {
232 //TODO
233 return;
234 }
Jonathan Hartbd181b62013-02-17 16:05:38 -0800235
236 public static void main(String args[]){
237 FloodlightModuleContext fmc = new FloodlightModuleContext();
238 MastershipManager mm = new MastershipManager();
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -0800239
Jonathan Hartbd181b62013-02-17 16:05:38 -0800240 if (args.length < 1){
241 log.error("Need to supply ID");
242 System.exit(1);
243 }
244 String id = args[0];
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -0800245
Jonathan Hartbd181b62013-02-17 16:05:38 -0800246 try {
247 mm.init(fmc);
248 mm.startUp(fmc);
249
250 mm.setMastershipId(id);
251
252 mm.acquireMastership(1L,
253 new MastershipCallback(){
254 @Override
255 public void changeCallback(long dpid, boolean isMaster) {
256 if (isMaster){
257 log.debug("I'm master for {}", HexString.toHexString(dpid));
258 }
259 else {
260 log.debug("NOT master for {}", HexString.toHexString(dpid));
261 }
262 }
263 });
264
265 //"Server" loop
266 while (true) {
267 Thread.sleep(60000);
268 }
269
270 } catch (FloodlightModuleException e) {
271 // TODO Auto-generated catch block
272 e.printStackTrace();
273 } catch (InterruptedException e) {
274 // TODO Auto-generated catch block
275 e.printStackTrace();
276 }
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -0800277
Jonathan Hartbd181b62013-02-17 16:05:38 -0800278 log.debug("is master: {}", mm.amMaster(1L));
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -0800279 }
280}