Added cleanup thread to RCNetworkGraphPublisher

Change-Id: I5c663b146d68f8bb8634ecd4e74953026da79c13
diff --git a/src/main/java/net/onrc/onos/ofcontroller/networkgraph/NetworkGraphDatastore.java b/src/main/java/net/onrc/onos/ofcontroller/networkgraph/NetworkGraphDatastore.java
index 52a6ab7..a3069c3 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/networkgraph/NetworkGraphDatastore.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/networkgraph/NetworkGraphDatastore.java
@@ -5,6 +5,7 @@
 
 import net.onrc.onos.datastore.RCObject;
 import net.onrc.onos.datastore.RCObject.WriteOp;
+import net.onrc.onos.datastore.topology.RCLink;
 import net.onrc.onos.datastore.topology.RCSwitch;
 
 import org.slf4j.Logger;
@@ -61,21 +62,17 @@
 //			groupOp.add(WriteOp.ForceCreate(rcPort));
 //		}
 
-		boolean failed = RCObject.multiWrite( groupOp );
+		boolean failed = RCObject.multiWrite(groupOp);
 
-		if ( failed ) {
+		if (failed) {
 		    log.error("Adding Switch {} and its ports failed.", sw.getDpid());
-		    for ( WriteOp op : groupOp ) {
+		    for (WriteOp op : groupOp) {
 			log.debug("Operation:{} for {} - Result:{}", op.getOp(), op.getObject(), op.getStatus() );
 
 			// If we changed the operation from ForceCreate to
 			// Conditional operation (Create/Update) then we should retry here.
 		    }
 		}
-		else {
-			// Publish event to the in-memory cache
-//			graph.addSwitch(sw);
-		}
 
 	}
 
@@ -163,80 +160,81 @@
 //		}
 	}
 
-	public void addLink(LinkEvent link) {
-		log.debug("Adding link {}", link);
-//		RCLink rcLink = new RCLink(link.getSourceSwitchDpid(), (long)link.getSourcePortNumber(),
-//				link.getDestinationSwitchDpid(), (long)link.getDestinationPortNumber());
-//
-//		RCPort rcSrcPort = new RCPort(link.getSourceSwitchDpid(), (long)link.getSourcePortNumber());
-//		RCPort rcDstPort = new RCPort(link.getDestinationSwitchDpid(), (long)link.getDestinationPortNumber());
-//
-//		for (int i = 0; i < NUM_RETRIES; i++) {
-//			try {
-//				rcSrcPort.read();
-//				rcDstPort.read();
-//				rcLink.create();
-//			} catch (ObjectDoesntExistException e) {
-//				// port doesn't exist
-//				log.error("Add link failed {}", link, e);
-//				return;
-//			} catch (ObjectExistsException e) {
-//				log.debug("Link already exists {}", link);
-//				return;
-//			}
-//
-//			rcSrcPort.addLinkId(rcLink.getId());
-//			rcDstPort.addLinkId(rcLink.getId());
-//
-//			rcLink.setStatus(RCLink.STATUS.ACTIVE);
-//
-//			try {
-//				rcLink.update();
-//				rcSrcPort.update();
-//				rcDstPort.update();
-//				break;
-//			} catch (ObjectDoesntExistException | WrongVersionException e) {
-//				log.debug(" ", e);
-//				// retry
-//			}
-//		}
-//
-//		// Publish event to in-memory cache
-//		graph.putLink(link);
+	/**
+	 * Stores the link in the datastore as an ACTIVE RCLink, then calls
+	 * graph.putLink with the event.
+	 * The RCLink is created once; a failed update is retried up to NUM_RETRIES
+	 * times. Returns without publishing if the link is already in the datastore.
+	 * Port back-references (RCPort link ids) are not maintained yet.
+	 *
+	 * @param linkEvent the link to add
+	 */
+	public void addLink(LinkEvent linkEvent) {
+		log.debug("Adding link {}", linkEvent);
+		RCLink rcLink = new RCLink(linkEvent.getSrc().getDpid(), linkEvent.getSrc().getNumber(),
+				linkEvent.getDst().getDpid(), linkEvent.getDst().getNumber());
+
+		// Set once create() has succeeded, so that a retry does not mistake our
+		// own object for a pre-existing link and return before publishing.
+		boolean created = false;
+		for (int i = 0; i < NUM_RETRIES; i++) {
+			if (!created) {
+				try {
+					rcLink.create();
+					created = true;
+				} catch (ObjectExistsException e) {
+					log.debug("Link already exists {}", linkEvent);
+					return;
+				}
+			}
+			rcLink.setStatus(RCLink.STATUS.ACTIVE);
+			try {
+				rcLink.update();
+				break;
+			} catch (ObjectDoesntExistException | WrongVersionException e) {
+				log.debug(" ", e);
+				// Re-create only if the object vanished; otherwise just retry update().
+				// NOTE(review): a stale version may need rcLink.read() first - confirm.
+				created = !(e instanceof ObjectDoesntExistException);
+			}
+		}
+
+		// Publish event to in-memory cache
+		graph.putLink(linkEvent);
 	}
 
-	public void removeLink(LinkEvent link) {
-		log.debug("Removing link {}", link);
-//		RCLink rcLink = new RCLink(link.getSourceSwitchDpid(), (long)link.getSourcePortNumber(),
-//				link.getDestinationSwitchDpid(), (long)link.getDestinationPortNumber());
-//
-//		RCPort rcSrcPort = new RCPort(link.getSourceSwitchDpid(), (long)link.getSourcePortNumber());
-//		RCPort rcDstPort = new RCPort(link.getDestinationSwitchDpid(), (long)link.getDestinationPortNumber());
-//
-//		for (int i = 0; i < NUM_RETRIES; i++) {
-//			try {
-//				rcSrcPort.read();
-//				rcDstPort.read();
-//				rcLink.read();
-//			} catch (ObjectDoesntExistException e) {
-//				log.error("Remove link failed {}", link, e);
-//				return;
-//			}
-//
-//			rcSrcPort.removeLinkId(rcLink.getId());
-//			rcDstPort.removeLinkId(rcLink.getId());
-//
-//			try {
-//				rcSrcPort.update();
-//				rcDstPort.update();
-//				rcLink.delete();
-//			} catch (ObjectDoesntExistException e) {
-//				log.error("Remove link failed {}", link, e);
-//				return;
-//			} catch (WrongVersionException e) {
-//				// retry
-//			}
-//		}
+	/**
+	 * Deletes the RCLink for the given link from the datastore.
+	 * Each attempt reads the object and then deletes it; a
+	 * WrongVersionException triggers another attempt, up to NUM_RETRIES in
+	 * total. If the object does not exist, the failure is logged and the
+	 * method returns. Port back-references (RCPort link ids) are not
+	 * maintained yet.
+	 *
+	 * @param linkEvent the link to remove
+	 */
+	public void removeLink(LinkEvent linkEvent) {
+		log.debug("Removing link {}", linkEvent);
+		RCLink rcLink = new RCLink(linkEvent.getSrc().getDpid(), linkEvent.getSrc().getNumber(),
+				linkEvent.getDst().getDpid(), linkEvent.getDst().getNumber());
+
+		for (int i = 0; i < NUM_RETRIES; i++) {
+			try {
+				rcLink.read();
+				rcLink.delete();
+				// Deleted: stop here. Looping again would re-read the deleted
+				// object and log a spurious "Remove link failed" error.
+				return;
+			} catch (ObjectDoesntExistException e) {
+				log.error("Remove link failed {}", linkEvent, e);
+				return;
+			} catch (WrongVersionException e) {
+				log.debug("Retrying removal of link {}", linkEvent, e);
+			}
+		}
+
+		// Every attempt ended in a WrongVersionException; do not fail silently.
+		log.error("Remove link failed {} after {} attempts", linkEvent, NUM_RETRIES);
 	}
 
 	public void updateDevice(DeviceEvent device) {