1# SPDX-License-Identifier: GPL-2.0
2# Copyright (c) 2020 SUSE LLC.
3
4import collections
5import functools
6import json
7import os
8import socket
9import subprocess
10import unittest
11
12
# Add the source tree of bpftool and /usr/local/sbin to PATH, ahead of the
# entries that are already there, so that those two directories are searched
# first when "bpftool" is executed.
cur_dir = os.path.dirname(os.path.realpath(__file__))
bpftool_dir = os.path.abspath(os.path.join(cur_dir, "..", "..", "..", "..",
                                           "tools", "bpf", "bpftool"))
# PATH may be missing from the environment altogether; fall back to the
# platform default search path (os.defpath) instead of failing with a
# KeyError while the module is being imported.
os.environ["PATH"] = os.pathsep.join(
    [bpftool_dir, "/usr/local/sbin", os.environ.get("PATH", os.defpath)])
18
19
class IfaceNotFoundError(Exception):
    """Signals that no network interface could be found to probe."""
22
23
class UnprivilegedUserError(Exception):
    """Signals that the test suite was started without root privileges."""
26
27
def _bpftool(args, json=True):
    """Run bpftool with the given arguments and return its raw stdout.

    Args:
        args: iterable of command-line arguments placed after "bpftool".
        json: when true, "-j" is passed ahead of args.

    Returns:
        The bytes produced by subprocess.check_output().

    Raises:
        subprocess.CalledProcessError: if bpftool exits with a non-zero
            status (behaviour of subprocess.check_output()).
    """
    command = ["bpftool", "-j"] if json else ["bpftool"]
    return subprocess.check_output(command + list(args))
35
36
def bpftool(args):
    """Run bpftool without "-j" and return its stdout decoded as UTF-8."""
    raw_output = _bpftool(args, json=False)
    return raw_output.decode("utf-8")
39
40
def bpftool_json(args):
    """Run bpftool with "-j" and return its stdout parsed as JSON."""
    return json.loads(_bpftool(args))
44
45
def get_default_iface():
    """Return the name of the first interface that is not called "lo".

    Interfaces are considered in the order socket.if_nameindex() reports
    them.

    Raises:
        IfaceNotFoundError: if no interface other than "lo" is reported.
    """
    candidates = (name for _, name in socket.if_nameindex() if name != "lo")
    first = next(candidates, None)
    if first is None:
        raise IfaceNotFoundError(
            "Could not find any network interface to probe")
    return first
51
52
def default_iface(f):
    """Decorator passing the default network interface to the wrapped callable.

    On every call the interface name returned by get_default_iface() is
    appended after the caller's positional arguments; keyword arguments are
    forwarded unchanged.
    """
    @functools.wraps(f)
    def with_iface(*positional, **keyword):
        positional += (get_default_iface(),)
        return f(*positional, **keyword)
    return with_iface
59
60
class TestBpftool(unittest.TestCase):
    """Tests for the output of "bpftool feature"; the suite requires root."""

    # Helpers the tests expect to see only in the results of "full" probes.
    _FULL_ONLY_HELPERS = ("bpf_probe_write_user", "bpf_trace_printk")

    @classmethod
    def setUpClass(cls):
        # Refuse to run any test of the suite without root privileges.
        if os.getuid() == 0:
            return
        raise UnprivilegedUserError("This test suite needs root privileges")

    def _assert_no_full_only_helpers(self, probe):
        # None of the full-only helpers may appear in any of the helper
        # collections of the given probe result.
        for reported in probe["helpers"].values():
            for name in self._FULL_ONLY_HELPERS:
                self.assertNotIn(name, reported)

    @default_iface
    def test_feature_dev_json(self, iface):
        probe = bpftool_json(["feature", "probe", "dev", iface])

        # The result must consist of exactly these top-level keys.
        self.assertCountEqual(probe.keys(), [
            "syscall_config",
            "program_types",
            "map_types",
            "helpers",
            "misc",
        ])
        self._assert_no_full_only_helpers(probe)

    def test_feature_kernel(self):
        # Three spellings of the command, all held to the same expectations.
        # Every command is run up front, before any result is checked.
        commands = (
            ["feature", "probe", "kernel"],
            ["feature", "probe"],
            ["feature"],
        )
        probes = [bpftool_json(command) for command in commands]
        sections = [
            "syscall_config",
            "system_config",
            "program_types",
            "map_types",
            "helpers",
            "misc",
        ]

        for probe in probes:
            # The result must consist of exactly the expected keys...
            self.assertCountEqual(probe.keys(), sections)
            # ...and must not list any of the full-only helpers.
            self._assert_no_full_only_helpers(probe)

    def test_feature_kernel_full(self):
        commands = (
            ["feature", "probe", "kernel", "full"],
            ["feature", "probe", "full"],
        )
        probes = [bpftool_json(command) for command in commands]

        for probe in probes:
            # The expected helpers have to show up together in the helpers
            # list of at least one program type. Which program types carry
            # them depends on the kernel version and configuration, so no
            # particular program type (or subset of them) can be required.
            self.assertTrue(any(
                all(name in reported for name in self._FULL_ONLY_HELPERS)
                for reported in probe["helpers"].values()))

    def test_feature_kernel_full_vs_not_full(self):
        full_probe = bpftool_json(["feature", "probe", "full"])
        plain_probe = bpftool_json(["feature", "probe"])

        def helper_names(probe):
            # Union of the helper names reported across all program types.
            return {name
                    for reported in probe["helpers"].values()
                    for name in reported}

        full_names = helper_names(full_probe)
        plain_names = helper_names(plain_probe)

        # The full probe adds exactly the full-only helpers and loses nothing.
        self.assertCountEqual(full_names - plain_names,
                              set(self._FULL_ONLY_HELPERS))
        self.assertCountEqual(plain_names - full_names, set())

    def test_feature_macros(self):
        # Section banners and "#define" families expected in the C macro
        # flavour of the probe output.
        required_patterns = (
            r"/\*\*\* System call availability \*\*\*/",
            r"#define HAVE_BPF_SYSCALL",
            r"/\*\*\* eBPF program types \*\*\*/",
            r"#define HAVE.*PROG_TYPE",
            r"/\*\*\* eBPF map types \*\*\*/",
            r"#define HAVE.*MAP_TYPE",
            r"/\*\*\* eBPF helper functions \*\*\*/",
            r"#define HAVE.*HELPER",
            r"/\*\*\* eBPF misc features \*\*\*/",
        )

        output = bpftool(["feature", "probe", "macros"])
        for pattern in required_patterns:
            self.assertRegex(output, pattern)
179