Commit 3dcdd96a authored by Alberts S's avatar Alberts S
Browse files

Add optimized route addition

parent aa14f21f
......@@ -31,11 +31,14 @@ class CapybaraNetty(metaclass=MetaCapybaraNetty):
self.config = self.config | yaml.safe_load(f)
return self.config
async def ssh_exec(self, ip, username, command):
self.__logger.debug(f"{ip:<16s} CMD: {command}")
async def ssh_exec(self, ip, username, command, check=True):
self.__logger.debug(f"{self.log_format_ip(ip)} CMD: {command}")
async with asyncssh.connect(host=ip, username=username, known_hosts=None, connect_timeout=10) as conn:
result = await conn.run(command, check=True)
result = await conn.run(command, check=check)
if self.config["per_host_ssh_log"]:
with open(f"{self.config['per_host_ssh_log_output_directory']}/ssh_cmds_{ip}.txt", "a") as f:
f.write(f"{command}\n\n")
return result.stdout.strip("\n").split("\n"), result.stderr.strip("\n").split("\n")
async def ssh_is_alive(self, ip, username):
......@@ -43,5 +46,14 @@ class CapybaraNetty(metaclass=MetaCapybaraNetty):
async with asyncssh.connect(host=ip, username=username, known_hosts=None, connect_timeout=3) as conn:
await conn.run("true", check=True)
return True
except (asyncssh.misc.PermissionDenied, asyncio.exceptions.TimeoutError) as e:
except (
asyncssh.misc.PermissionDenied,
asyncio.exceptions.TimeoutError,
ConnectionResetError,
ConnectionRefusedError,
) as e:
return False
@staticmethod
def log_format_ip(ip):
return f"{ip:<16s}"
......@@ -2,9 +2,8 @@ import asyncio
import ipaddress
import itertools
import yaml
from CapybaraNetty import CapybaraNetty
from ControllerStatic import ControllerStatic
from Inventory import Inventory
from Pinger import Pinger
from Router import Router
......@@ -14,12 +13,13 @@ from Visualizer import Visualizer
class Controller(CapybaraNetty):
def __init__(self):
super().__init__()
self.hosts = Inventory().get_hosts()
self.hosts = []
self.routers = list()
self.pingers = list()
self.visualizer = Visualizer()
self.tunnel_network_base = ipaddress.ip_network(self.config["tunnel_network_base"]).subnets(new_prefix=30)
self.dummy_network_base = ipaddress.ip_network(self.config["dummy_network_base"]).subnets(new_prefix=32)
self.controller = ControllerStatic(routers=self.routers)
@classmethod
async def run(cls):
......@@ -39,6 +39,8 @@ class Controller(CapybaraNetty):
return interfaces[0]
async def main(self):
self.hosts = await Inventory().get_hosts()
router_coroutines = []
for host in self.hosts:
router = Router(
......@@ -59,17 +61,6 @@ class Controller(CapybaraNetty):
router_coroutines.append(router.add_default_dummy_interface_ip(ip=self.get_dummy_ip()))
await asyncio.gather(*router_coroutines)
await asyncio.gather(self.set_router_interface_tunnels())
"""
await router.add_routes(
[
{
"prefix" :"8.8.8.8/32",
"gateway":router.get_default_gateway(),
"interface": router.get_default_interface()
}
]
)
"""
self.load_pingers()
concurrent_tasks = [
......@@ -77,6 +68,7 @@ class Controller(CapybaraNetty):
self.visualizer.run_daemon(
pinger_data_fn=self.get_host_latency_data,
),
self.controller.run_daemon(pinger_data_fn=self.get_host_latency_data, routers_fn=self.get_routers),
]
await asyncio.gather(*concurrent_tasks)
......@@ -132,6 +124,9 @@ class Controller(CapybaraNetty):
return router
return None
def get_routers(self):
return self.routers
# Helper functions for visualizing
def get_host_latency_data(self):
pinger_data = []
......@@ -148,8 +143,10 @@ class Controller(CapybaraNetty):
"from": pinger.router_ip,
"from_type": "router",
"from_name": self.find_router_by_ip(pinger.router_ip).name_simplified,
"from_obj": self.find_router_by_ip(pinger.router_ip),
# Not all dsts have names (e.g optimization targets)
"to_name": getattr(self.find_router_by_ip(result["host"]), "name_simplified", result["host"]),
"to_obj": self.find_router_by_ip(result["host"]),
"to_type": to_type,
"to": result["host"],
"time": result["average"],
......
import asyncio
import ipaddress
import itertools
import networkx as nx
from CapybaraNetty import CapybaraNetty
from Router import Router
from Visualizer import Visualizer
class ControllerStatic(CapybaraNetty):
def __init__(self, routers):
super().__init__()
self.routers = routers
self.pinger_data = []
self.external_targets_for_optimizations = []
self.optimization_threshold_ms = 0
self.daemon_interval = self.config["controller_interval"]
async def run(self):
if len(self.pinger_data) > 0:
G = self.get_latency_graph()
self.find_viable_paths(G)
await self.add_viable_external_routes(G)
# print(G)
# G_simple = await self.get_host_latency_overview()
def get_latency_graph(self):
G = nx.Graph()
for result in self.pinger_data:
host = result["from_name"]
dest = result["to_name"]
time = int(result["time"])
G.add_edge(host, dest, weight=time, color="black")
G.nodes[host]["type"] = result["from_type"]
G.nodes[host]["obj"] = result["from_obj"]
G.nodes[dest]["type"] = result["to_type"]
G.nodes[dest]["obj"] = result["to_obj"]
return G
def find_viable_paths(self, G):
def filter_node(n1):
return G.nodes[n1].get("type") == "router" or (G.nodes[n1].get("type") == "target" and n1 == target)
viable_paths = []
for target in self.external_targets_for_optimizations:
# G has all routers and targets
# We need G routers + single target, filter out all other targets
# otherwise the graph finds shortest path with including external targets
G_subgraph = nx.subgraph_view(G, filter_node=filter_node)
lengths, paths = nx.single_source_dijkstra(G_subgraph, source=target, weight="weight")
for key, path in paths.items():
if len(path) > 2 and not (
path[0] in self.external_targets_for_optimizations
and path[-1] in self.external_targets_for_optimizations
):
optimized_ms = G[key][path[0]]["weight"] - lengths[key]
if optimized_ms < self.optimization_threshold_ms:
self.__logger.debug(f"IGNORED OPTIMIZATION: {optimized_ms:<3d}ms {'->'.join(path[::-1])}")
else:
for node in path:
G.nodes[node]["improved_path"] = True
G.nodes[node].setdefault("improved_path_info", []).append(
self.get_improvement_obj(
target=target, source="10.0.0.1", path=path[::-1], optimization_ms=optimized_ms
)
)
self.__logger.info(f"VIABLE OPTIMIZATION: {optimized_ms:<3d}ms {'->'.join(path[::-1])}")
return G
@staticmethod
async def add_viable_external_routes(G):
for node in G.nodes():
if G.nodes[node]["type"] == "router" and "improved_path_info" in G.nodes[node]:
for path_obj in G.nodes[node]["improved_path_info"]:
idx = path_obj["path"].index(node)
path_len = len(path_obj["path"])
current_router = G.nodes[node]["obj"]
next_hop_name = path_obj["path"][idx + 1]
if G.nodes[next_hop_name]["type"] == "router":
next_router = G.nodes[next_hop_name]["obj"]
exit_interface_name = current_router.get_interface_name(next_router.name_simplified)
current_router_interfaces = await current_router.get_interfaces()
exit_interface_ip = ipaddress.ip_interface(
current_router_interfaces[exit_interface_name]["ipAddresses"][0]["address"]
).ip
await current_router.add_routes(
[
{
"prefix": f"{path_obj['target']}/32",
"gateway": exit_interface_ip,
"interface": exit_interface_name,
}
]
)
@staticmethod
def get_improvement_obj(target, source, path, optimization_ms):
return {"target": target, "source": source, "path": path, "optimization_ms": optimization_ms}
async def run_daemon(self, pinger_data_fn, routers_fn):
while True:
self.optimization_threshold_ms = self.config["optimization_threshold_ms"]
self.pinger_data = pinger_data_fn()
self.routers = routers_fn()
self.external_targets_for_optimizations = self.config["external_targets_for_optimizations"]
await self.run()
await asyncio.sleep(self.config["controller_interval"])
......@@ -13,24 +13,28 @@ class Inventory(CapybaraNetty):
self.chef_api.ssl_verify = False
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
def get_hosts(self):
async def get_hosts(self):
hosts = []
for node_name in self.chef_api["/nodes"]:
chefNode = chef.Node(node_name)
try:
nodeTime = datetime.utcfromtimestamp(chefNode.attributes["ohai_time"])
ipv4 = chefNode.attributes["base"]["public_ipv4"].strip()
hostname = chefNode.attributes["base"]["hostname"]
# If Node time is less than 1 hour old - its active
if (nodeTime + timedelta(hours=1)) > datetime.utcnow():
hosts.append(
{
"public_ipv4": chefNode.attributes["base"]["public_ipv4"].strip(),
"hostname": chefNode.attributes["base"]["hostname"],
}
)
if await self.ssh_is_alive(ipv4, self.config["default_router_ssh_user"]):
hosts.append(
{
"public_ipv4": ipv4,
"hostname": hostname,
}
)
else:
self.__logger.warning(f"Skipping {hostname} due SSH unavailable")
else:
self.__logger.warning(
f"Skipping {chefNode.attributes['base']['hostname']} due to last checkin time"
)
self.__logger.warning(f"Skipping {hostname} due to last checkin time")
except KeyError as e:
self.__logger.warning(f"Skipped {chefNode} due to {type(e).__name__}")
......
......@@ -16,8 +16,8 @@ class Pinger(CapybaraNetty):
self.ping_addresses = ping_addresses
self.latest_results = list()
async def ssh_exec(self, command, **kwargs):
return await super(Pinger, self).ssh_exec(self.router_ip, self.router_ssh_user, command=command)
async def ssh_exec(self, command, check, **kwargs):
return await super(Pinger, self).ssh_exec(self.router_ip, self.router_ssh_user, command=command, check=check)
def set_ping_addresses(self, ping_addresses):
self.ping_addresses = ping_addresses
......@@ -55,29 +55,29 @@ class Pinger(CapybaraNetty):
async def get_ping(self):
if len(self.ping_addresses) > 0:
self.__logger.info(f"{self.router_ip:<16s} Running")
self.__logger.info(f"{self.log_format_ip(self.router_ip)} Running")
fping_cmd_base = "fping -c 10 -N -p 500 "
fping_cmd_dests = " ".join(self.ping_addresses)
fping_cmd = fping_cmd_base + " " + fping_cmd_dests
stdout, stderr = await self.ssh_exec(fping_cmd)
stdout, stderr = await self.ssh_exec(fping_cmd, check=False)
ping_results = []
for line in stderr: # First line is empty
parsed_line = self.parse_fping_line(line)
if parsed_line["loss"] < 100:
ping_results.append(parsed_line)
self.__logger.debug(
f"PING: {self.router_ip:<16s} => {parsed_line['host']:<16s} {int(parsed_line['average']):<5d}"
f"PING: {self.log_format_ip(self.router_ip)} => {parsed_line['host']:<16s} {int(parsed_line['average']):<5d}"
)
else:
self.__logger.debug(
f"PING: DOWN {self.router_ip:<16s} => {parsed_line['host']:<16s} {int(parsed_line['loss']):<5d}% packet loss"
f"PING: DOWN {self.log_format_ip(self.router_ip)} => {parsed_line['host']:<16s} {int(parsed_line['loss']):<5d}% packet loss"
)
self.latest_results = ping_results
return ping_results
else:
self.__logger.warning(f"{self.router_ip:<16s} Pinger doing nothing")
self.__logger.warning(f"{self.log_format_ip(self.router_ip)} Pinger doing nothing")
self.latest_results = []
return []
......@@ -86,5 +86,5 @@ class Pinger(CapybaraNetty):
try:
await self.get_ping()
except (asyncssh.misc.PermissionDenied, asyncio.exceptions.TimeoutError) as e:
self.__logger.warning(f"Pinger {self.router_ip:<16s} is unable to connect via SSH")
self.__logger.warning(f"Pinger {self.log_format_ip(self.router_ip)} is unable to connect via SSH")
await asyncio.sleep(interval_seconds)
......@@ -61,9 +61,14 @@ class Router(CapybaraNetty):
await self.vty.del_config_routes(self.config_routes)
def get_default_interface_name(self):
default_route = next(route for route in self.routes if route["prefix"] == "0.0.0.0/0")
self.default_interface_name = default_route["nexthops"][0]["interfaceName"]
return self.default_interface_name
self.__logger.debug(f"{self.log_format_ip(self.ip)} has {len(self.routes)} routes")
try:
default_route = next(route for route in self.routes if route["prefix"] == "0.0.0.0/0")
self.default_interface_name = default_route["nexthops"][0]["interfaceName"]
return self.default_interface_name
except StopIteration:
self.__logger.error(f"{self.log_format_ip(self.ip)} has no default route")
raise
def get_default_interface_ip(self):
self.default_interface_ip = ipaddress.ip_interface(
......@@ -115,6 +120,7 @@ class Router(CapybaraNetty):
async def get_interfaces(self):
self.interfaces = await self.vty.get_interfaces()
return self.interfaces
async def ssh_exec(self, command, **kwargs):
return await super(Router, self).ssh_exec(self.ip, self.ssh_user, command=command)
......@@ -6,7 +6,6 @@ import time
import matplotlib
import matplotlib.pyplot as plt
import networkx as nx
import scipy
import seaborn as sns
from pyvis.network import Network
......
......@@ -18,7 +18,7 @@ class Vty(CapybaraNetty):
base = "sudo vtysh -c 'configure term'"
exec_commands = []
for command in commands:
self.__logger.info(f"{self.host_ip:<16s} CMD: {command}")
self.__logger.info(f"{self.log_format_ip(self.host_ip)} CMD: {command}")
exec_commands.append(f" -c '{command}' ")
return base + "".join(exec_commands)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment