1# SPDX-License-Identifier: GPL-2.0 2# Copyright (c) 2020 SUSE LLC. 3 4import collections 5import functools 6import json 7import os 8import socket 9import subprocess 10import unittest 11 12 13# Add the source tree of bpftool and /usr/local/sbin to PATH 14cur_dir = os.path.dirname(os.path.realpath(__file__)) 15bpftool_dir = os.path.abspath(os.path.join(cur_dir, "..", "..", "..", "..", 16 "tools", "bpf", "bpftool")) 17os.environ["PATH"] = bpftool_dir + ":/usr/local/sbin:" + os.environ["PATH"] 18 19 20class IfaceNotFoundError(Exception): 21 pass 22 23 24class UnprivilegedUserError(Exception): 25 pass 26 27 28def _bpftool(args, json=True): 29 _args = ["bpftool"] 30 if json: 31 _args.append("-j") 32 _args.extend(args) 33 34 return subprocess.check_output(_args) 35 36 37def bpftool(args): 38 return _bpftool(args, json=False).decode("utf-8") 39 40 41def bpftool_json(args): 42 res = _bpftool(args) 43 return json.loads(res) 44 45 46def get_default_iface(): 47 for iface in socket.if_nameindex(): 48 if iface[1] != "lo": 49 return iface[1] 50 raise IfaceNotFoundError("Could not find any network interface to probe") 51 52 53def default_iface(f): 54 @functools.wraps(f) 55 def wrapper(*args, **kwargs): 56 iface = get_default_iface() 57 return f(*args, iface, **kwargs) 58 return wrapper 59 60 61class TestBpftool(unittest.TestCase): 62 @classmethod 63 def setUpClass(cls): 64 if os.getuid() != 0: 65 raise UnprivilegedUserError( 66 "This test suite needs root privileges") 67 68 @default_iface 69 def test_feature_dev_json(self, iface): 70 unexpected_helpers = [ 71 "bpf_probe_write_user", 72 "bpf_trace_printk", 73 ] 74 expected_keys = [ 75 "syscall_config", 76 "program_types", 77 "map_types", 78 "helpers", 79 "misc", 80 ] 81 82 res = bpftool_json(["feature", "probe", "dev", iface]) 83 # Check if the result has all expected keys. 84 self.assertCountEqual(res.keys(), expected_keys) 85 # Check if unexpected helpers are not included in helpers probes 86 # result. 87 for helpers in res["helpers"].values(): 88 for unexpected_helper in unexpected_helpers: 89 self.assertNotIn(unexpected_helper, helpers) 90 91 def test_feature_kernel(self): 92 test_cases = [ 93 bpftool_json(["feature", "probe", "kernel"]), 94 bpftool_json(["feature", "probe"]), 95 bpftool_json(["feature"]), 96 ] 97 unexpected_helpers = [ 98 "bpf_probe_write_user", 99 "bpf_trace_printk", 100 ] 101 expected_keys = [ 102 "syscall_config", 103 "system_config", 104 "program_types", 105 "map_types", 106 "helpers", 107 "misc", 108 ] 109 110 for tc in test_cases: 111 # Check if the result has all expected keys. 112 self.assertCountEqual(tc.keys(), expected_keys) 113 # Check if unexpected helpers are not included in helpers probes 114 # result. 115 for helpers in tc["helpers"].values(): 116 for unexpected_helper in unexpected_helpers: 117 self.assertNotIn(unexpected_helper, helpers) 118 119 def test_feature_kernel_full(self): 120 test_cases = [ 121 bpftool_json(["feature", "probe", "kernel", "full"]), 122 bpftool_json(["feature", "probe", "full"]), 123 ] 124 expected_helpers = [ 125 "bpf_probe_write_user", 126 "bpf_trace_printk", 127 ] 128 129 for tc in test_cases: 130 # Check if expected helpers are included at least once in any 131 # helpers list for any program type. Unfortunately we cannot assume 132 # that they will be included in all program types or a specific 133 # subset of programs. It depends on the kernel version and 134 # configuration. 135 found_helpers = False 136 137 for helpers in tc["helpers"].values(): 138 if all(expected_helper in helpers 139 for expected_helper in expected_helpers): 140 found_helpers = True 141 break 142 143 self.assertTrue(found_helpers) 144 145 def test_feature_kernel_full_vs_not_full(self): 146 full_res = bpftool_json(["feature", "probe", "full"]) 147 not_full_res = bpftool_json(["feature", "probe"]) 148 not_full_set = set() 149 full_set = set() 150 151 for helpers in full_res["helpers"].values(): 152 for helper in helpers: 153 full_set.add(helper) 154 155 for helpers in not_full_res["helpers"].values(): 156 for helper in helpers: 157 not_full_set.add(helper) 158 159 self.assertCountEqual(full_set - not_full_set, 160 {"bpf_probe_write_user", "bpf_trace_printk"}) 161 self.assertCountEqual(not_full_set - full_set, set()) 162 163 def test_feature_macros(self): 164 expected_patterns = [ 165 r"/\*\*\* System call availability \*\*\*/", 166 r"#define HAVE_BPF_SYSCALL", 167 r"/\*\*\* eBPF program types \*\*\*/", 168 r"#define HAVE.*PROG_TYPE", 169 r"/\*\*\* eBPF map types \*\*\*/", 170 r"#define HAVE.*MAP_TYPE", 171 r"/\*\*\* eBPF helper functions \*\*\*/", 172 r"#define HAVE.*HELPER", 173 r"/\*\*\* eBPF misc features \*\*\*/", 174 ] 175 176 res = bpftool(["feature", "probe", "macros"]) 177 for pattern in expected_patterns: 178 self.assertRegex(res, pattern) 179