Vty.py 4.79 KB
Newer Older
Alberts S's avatar
Alberts S committed
1
2
import json

Alberts S's avatar
Alberts S committed
3
import asyncssh
Alberts S's avatar
Alberts S committed
4

5
from CapybaraNetty import CapybaraNetty
Alberts S's avatar
Alberts S committed
6

7
8

class Vty(CapybaraNetty):
Alberts S's avatar
Alberts S committed
9
    def __init__(self, host_ip, host_user):
10
        super().__init__()
Alberts S's avatar
Alberts S committed
11
12
13
        self.host_ip = host_ip
        self.host_user = host_user

14
15
    def ssh_is_alive(self, **kwargs):
        return super(Vty, self).ssh_is_alive(self.host_ip, self.host_user)
Alberts S's avatar
Alberts S committed
16

17
18
19
20
21
    def build_vty_exec(self, commands: list, echo_commands=False):
        additional_vtysh_flags = ""
        if echo_commands:
            additional_vtysh_flags += " -E "
        base = f"sudo vtysh {additional_vtysh_flags} -c 'configure term'"
Alberts S's avatar
Alberts S committed
22
23
        exec_commands = []
        for command in commands:
Alberts S's avatar
Alberts S committed
24
            self.__logger.info(f"{self.log_format_ip(self.host_ip)} CMD: {command}")
Alberts S's avatar
Alberts S committed
25
26
27
            exec_commands.append(f" -c '{command}' ")
        return base + "".join(exec_commands)

28
    async def get_routes(self):
29
30
31
        route_json, stderr = await self.ssh_exec(
            self.build_vty_exec(["do show ip route json"]),
        )
Alberts S's avatar
Alberts S committed
32
33
34
35
36
37
38
39
40
41
42
43
        # It returns list of strings which combined makes up JSON - need to join it up
        routes = json.loads("".join(route_json))
        # Routes now has dict of lists, where d[prefix][0] is one of the possible routes for prefix, change this so
        # it is easier to process
        # Now it will be list of dicts, do note that this means that there cannot be two routes for same prefix,
        # but for PoC this is fine
        routes_converted = [dict(zip(routes, col)) for col in zip(*routes.values())][0]
        routes_converted = list(routes_converted.values())
        return routes_converted

    # Add a route - make note of generate_note_cmd signature - some attributes are location sensitive
    # Make sure commands are batched up otherwise new routes will be slow to apply since 1 command = 1 ssh connection
44
    async def add_routes(self, routes: list):
Alberts S's avatar
Alberts S committed
45
        commands = []
46
47
48
49
50
51
        if len(routes) > 0:
            for route in routes:
                commands.append(self.generate_route_config_cmd(**route))
            await self.ssh_exec(
                self.build_vty_exec(commands),
            )
Alberts S's avatar
Alberts S committed
52

53
54
55
56
57
58
59
60
61
62
    async def del_routes(self, routes: list):
        commands = []
        if len(routes) > 0:
            for route in routes:
                commands.append(f"no {self.generate_route_config_cmd(**route)} tag 2")
            return await self.ssh_exec(
                self.build_vty_exec(commands, echo_commands=True),
            )
        return None, None

Alberts S's avatar
Alberts S committed
63
64
65
66
    async def show_route(self, prefix):
        stdout, stderr = await self.ssh_exec(self.build_vty_exec([f"do show ip route {prefix}"]))
        return stdout

Alberts S's avatar
Alberts S committed
67
68
69
70
71
72
73
74
    def generate_route_config_cmd(self, prefix, gateway, interface, distance=1, **kwargs):
        additional_cmd_args = ""
        for key, value in kwargs.items():
            additional_cmd_args += f"{key} {value} "
        return f"ip route {prefix} {gateway} {interface} {distance}  {additional_cmd_args}"

    # This includes information from the configuration itself - useful for removing routes
    # Provide arguments to get route CMD which match the provided arguments
75
    async def get_config_routes(self, prefix=None, gateway=None, interface=None, **kwargs):
Alberts S's avatar
Alberts S committed
76
77
        def filter(route: str):
            route = route[9:]  # remove `ip route ` part from cmd
78
79
            if len(route) == 0:
                return False
Alberts S's avatar
Alberts S committed
80
81
82
83
84
85
86
87
88
89
90
            if prefix and route.split()[0] != prefix:
                return False
            if gateway and route.split()[1] != gateway:
                return False
            if interface and interface not in route.split()[2:]:
                return False
            for key, value in kwargs.items():
                if f"{key} {value}" not in route:
                    return False
            return True

91
92
93
        routes_config, stderr = await self.ssh_exec(
            self.build_vty_exec(["do show running-config | include ip route"]),
        )
Alberts S's avatar
Alberts S committed
94
95
96
97
98
99
100
101
        routes = []
        for route in routes_config:
            # Remove newlines at end
            route = route.rstrip()
            if filter(route):
                routes.append(route)
        return routes

102
    async def del_config_routes(self, route_cmds):
Alberts S's avatar
Alberts S committed
103
104
105
        no_route_cmds = []
        for route in route_cmds:
            no_route_cmds.append(f"no {route}")
106
        await self.ssh_exec(
107
            self.build_vty_exec(no_route_cmds, echo_commands=True),
108
109
        )

110
111
112
113
114
115
116
117
    async def get_interfaces(self):
        interface_json, stderr = await self.ssh_exec(
            self.build_vty_exec(["do show interface json"]),
        )
        # It returns list of strings which combined makes up JSON - need to join it up
        interfaces = json.loads("".join(interface_json))
        return interfaces

118
119
    async def ssh_exec(self, command, **kwargs):
        return await super(Vty, self).ssh_exec(self.host_ip, self.host_user, command=command)