Commit dbeb8310 authored by Alberts S's avatar Alberts S
Browse files

Rework Improvement handling to support route flapping, added route add/del (wip)

parent c130ece5
import asyncio
import collections
import ipaddress
import logging
from datetime import datetime, timedelta
from typing import Type
import networkx as nx
from CapybaraNetty import CapybaraNetty
from Router import Router
from Router import Route, Router
class Improvement:
def __init__(self, target, path, optimized_ms, is_current_viable_path, is_current_best_path):
class Improvement(CapybaraNetty):
def __init__(self, target, path, is_current_viable_path, is_current_best_path, optimized_ms=None):
super().__init__()
optimization_history_count = self.config["controller_static_improvement_optimization_story_count"]
self.target = target
self.path = path
self.optimized_ms = optimized_ms
self.discovery_time = datetime.now()
self.__optimized_ms = optimized_ms
self.__optimized_ms_history = collections.deque(maxlen=optimization_history_count)
self.__is_current_best_path = None
self.__last_best_time = None
self.__total_best_seconds = 0
self.__is_current_viable_path = None
self.__routes = set()
self.is_current_best_path = is_current_best_path
self.is_current_viable_path = is_current_viable_path
@property
def optimized_ms(self):
return self.__optimized_ms
@optimized_ms.setter
def optimized_ms(self, val: int):
self.__optimized_ms = val
self.__optimized_ms_history.append(val)
@property
def is_current_best_path(self):
return self.__is_current_best_path
......@@ -51,12 +68,32 @@ class Improvement:
def is_current_viable_path(self, val: bool):
self.__is_current_viable_path = val
@property
def routes(self):
return self.__routes
@routes.setter
def routes(self, routes: set):
self.__routes = routes
@property
def total_best_seconds(self):
return self.__total_best_seconds
@property
def is_install_candidate(self):
return self.is_current_viable_path and self.is_current_best_path
# https://codereview.stackexchange.com/a/215329
def is_subsequence_of(self, other: "Improvement") -> bool:
if self.target != other.target:
raise NotImplemented
needle = self.path
haystack = other.path
return any(haystack[i : i + len(needle)] == needle for i in range(len(haystack) - len(needle) + 1))
def __str__(self):
return f"{self.target:<16s} viable={self.is_current_viable_path:<1}, best={self.is_current_best_path:<1} for {self.total_best_seconds:<3} seconds, {self.optimized_ms:<3}ms [{'->'.join(self.path)}] "
return f"{self.target:<16s} candidate={self.is_install_candidate:<1} viable={self.is_current_viable_path:<1}, best={self.is_current_best_path:<1} for {self.total_best_seconds:<3} seconds, {self.optimized_ms:<3}ms [{'->'.join(self.path)}] "
class ControllerStatic(CapybaraNetty):
......@@ -74,6 +111,8 @@ class ControllerStatic(CapybaraNetty):
if len(self.pinger_data) > 0:
self.G = self.get_latency_graph(self.G)
self.find_viable_paths(self.G)
self.set_viable_improvement_routes(self.G)
await self.install_viable_routes(self.G)
print("a")
def get_latency_graph(self, G: nx.Graph):
......@@ -89,37 +128,56 @@ class ControllerStatic(CapybaraNetty):
return G
# Create new or update existing graph node path
def upsert_node_paths(
self, G: nx.graph, node_name, target, path, optimized_ms, is_current_viable_path, is_current_best_path
):
G.nodes[node_name]["improved_path"] = True
G.nodes[node_name].setdefault("improved_path_info", [])
node_improved_path_info = G.nodes[node_name]["improved_path_info"]
def upsert_node_paths(self, G: nx.graph, node_name, target, path, optimized_ms, is_current_viable_path):
node = G.nodes[node_name]
node["improved_path"] = True
node.setdefault("improved_path_info", [])
node_improved_path_info = node["improved_path_info"]
path_is_already_known = False
is_current_best_path = True
improvement_candidate = Improvement(
target=target,
path=path,
optimized_ms=optimized_ms,
is_current_viable_path=is_current_viable_path,
is_current_best_path=is_current_best_path,
)
improved_path: Improvement
for improved_path in node_improved_path_info:
if improved_path.target == target:
if improved_path.path == path:
if improved_path.target == improvement_candidate.target:
if improved_path.path == improvement_candidate.path:
path_is_already_known = True
improved_path.optimized_ms = optimized_ms
improved_path.is_current_viable_path = is_current_viable_path
improved_path.is_current_best_path = is_current_best_path
elif improved_path.is_subsequence_of(improvement_candidate) or improvement_candidate.is_subsequence_of(
improved_path
):
# self.__logger.debug(
# f"{node_name:<24} subset/superset found for improvement {str(improved_path)}")
# A special case where same node has
# e.g node = euc1
# (1) improved path = euc1->apne1->x.x.x.x, 1 ms viable=0, best=0
# (2) improved path = fra1->euc1->apne1->x.x.x.x, 8 ms viable=1, best=0
# However (2) should have been best, but was reset from 1 to 0 since it was caught by else
# We need to accomplish
# (3) improved path = euc1->apne1->x.x.x.x, 1 ms viable=0, best=1
# (4) improved path = fra1->euc1->apne1->x.x.x.x, 8 ms viable=1, best=1
# This was only occurring if nodes were checked in a particular order
pass
else:
# self.__logger.debug(f"{node_name:<24} SET best=0 for improvement {str(improved_path)}")
improved_path.is_current_best_path = False
if not path_is_already_known:
node_improved_path_info.append(
Improvement(
target=target,
path=path,
optimized_ms=optimized_ms,
is_current_viable_path=is_current_viable_path,
is_current_best_path=is_current_best_path,
)
)
node_improved_path_info.append(improvement_candidate)
def is_improvement_viable_route(self, improvement: Improvement):
# TODO check whether improvement is viable (e.g is stable for x... seconds)
pass
def find_viable_paths(self, G):
......@@ -131,13 +189,18 @@ class ControllerStatic(CapybaraNetty):
# We need G routers + single target, filter out all other targets
# otherwise the graph finds shortest path with including external targets
G_subgraph = nx.subgraph_view(G, filter_node=filter_node)
# TODO add cutoff
lengths, paths = nx.single_source_dijkstra(G_subgraph, source=target, weight="weight")
for key, path in paths.items():
# TODO remove reversed since it was used for testing
for key, path in reversed(paths.items()):
"""
if len(path) > 2 and not (
path[0] in self.external_targets_for_optimizations
and path[-1] in self.external_targets_for_optimizations
):
"""
# Avoid a case where self is referenced i.e 'x.x.x.x' - > path to x.x.x.x is [x.x.x.x]
if len(path) > 1:
optimized_ms = G[key][path[0]]["weight"] - lengths[key]
is_current_viable_path = optimized_ms > self.optimization_threshold_ms
if is_current_viable_path:
......@@ -154,7 +217,6 @@ class ControllerStatic(CapybaraNetty):
path=path[::-1],
optimized_ms=optimized_ms,
is_current_viable_path=is_current_viable_path,
is_current_best_path=True,
)
# Internal targets
......@@ -170,8 +232,8 @@ class ControllerStatic(CapybaraNetty):
path=path,
optimized_ms=optimized_ms,
is_current_viable_path=is_current_viable_path,
is_current_best_path=True,
)
"""
else:
# if a node becomes inactive - we still need to upsert it and make sure its not marked as best path anymore
for node in path:
......@@ -186,19 +248,18 @@ class ControllerStatic(CapybaraNetty):
)
# How do we handle internal routes?
"""
return G
async def get_viable_external_routes(self, G):
current_best_routes = {}
def set_viable_improvement_routes(self, G: nx.Graph) -> None:
for node in G.nodes():
if G.nodes[node]["type"] == "router" and "improved_path_info" in G.nodes[node]:
current_router = G.nodes[node]["obj"]
current_router_improved_routes = []
path_obj: Improvement
for path_obj in G.nodes[node]["improved_path_info"]:
idx = path_obj["path"].index(node)
idx = path_obj.path.index(node)
try:
next_hop_name = path_obj["path"][idx + 1]
next_hop_name = path_obj.path[idx + 1]
if G.nodes[next_hop_name]["type"] == "router":
next_router = G.nodes[next_hop_name]["obj"]
inbound_interface_name = next_router.get_interface_name(current_router.name_simplified)
......@@ -207,40 +268,49 @@ class ControllerStatic(CapybaraNetty):
exit_gateway_ip = ipaddress.ip_interface(
next_router_interfaces[inbound_interface_name]["ip"]
).ip
current_router_improved_routes.append(
{
"prefix": f"{path_obj['target']}/32",
"gateway": exit_gateway_ip,
"interface": exit_interface_name,
}
path_obj.routes.add(
Route(
prefix=f"{path_obj.target}/32",
gateway=exit_gateway_ip,
interface=exit_interface_name,
)
)
except IndexError:
self.__logger.debug(f"Skipping route for {path_obj['target']} due to IndexError")
# Store the new route objects for best paths
current_best_routes.update({current_router: current_router_improved_routes})
self.__logger.debug(f"Skipping route for {path_obj.target} due to IndexError")
return current_best_routes
async def install_viable_external_routes(self, current_best_routes):
async def install_viable_routes(self, G: nx.Graph):
coroutines = []
# Compare existing routes and the ones which are CURRENT best routes
# If there are any existing routes which are not used in CURRENT - delete them
for router, new_route_candidates in current_best_routes.items():
# Convert dict to list of routes
current_managed_routes = list(router.managed_routes.values())
routes_to_be_deleted = [
old_route for old_route in current_managed_routes if old_route not in new_route_candidates
]
routes_to_be_added = [
new_route for new_route in new_route_candidates if new_route not in current_managed_routes
]
self.__logger.debug(f"{router.log_name()} has {len(routes_to_be_deleted)} DEL ROUTE actions")
self.__logger.debug(f"{router.log_name()} has {len(routes_to_be_added)} ADD ROUTE actions")
coroutines.append(
self.chain_coroutines(router.del_routes(routes_to_be_deleted), router.add_routes(routes_to_be_added))
)
for node_name in G.nodes():
node = G.nodes[node_name]
if node["type"] == "router":
router: Router = node["obj"]
# Convert dict to list of routes
current_managed_routes = list(router.managed_routes.values())
new_route_candidates = []
improvement: Improvement
for improvement in node["improved_path_info"]:
if improvement.is_install_candidate:
new_route_candidates.extend(improvement.routes)
routes_to_be_deleted = [
old_route for old_route in current_managed_routes if old_route not in new_route_candidates
]
routes_to_be_added = [
new_route for new_route in new_route_candidates if new_route not in current_managed_routes
]
self.__logger.debug(f"{router.log_name()} has {len(routes_to_be_deleted)} DEL ROUTE actions")
self.__logger.debug(f"{router.log_name()} has {len(routes_to_be_added)} ADD ROUTE actions")
coroutines.append(
self.chain_coroutines(
router.del_routes(routes_to_be_deleted), router.add_routes(routes_to_be_added)
)
)
return asyncio.gather(*coroutines)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment