diff --git a/python/vyos/qos/base.py b/python/vyos/qos/base.py index 87927ba9d..98e486e42 100644 --- a/python/vyos/qos/base.py +++ b/python/vyos/qos/base.py @@ -1,435 +1,440 @@ # Copyright 2022-2024 VyOS maintainers and contributors <maintainers@vyos.io> # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation; either # version 2.1 of the License, or (at your option) any later version. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this library. If not, see <http://www.gnu.org/licenses/>. import os import jmespath from vyos.base import Warning from vyos.utils.process import cmd from vyos.utils.dict import dict_search from vyos.utils.file import read_file from vyos.utils.network import get_protocol_by_name class QoSBase: _debug = False _direction = ['egress'] _parent = 0xffff _dsfields = { "default": 0x0, "lowdelay": 0x10, "throughput": 0x08, "reliability": 0x04, "mincost": 0x02, "priority": 0x20, "immediate": 0x40, "flash": 0x60, "flash-override": 0x80, "critical": 0x0A, "internet": 0xC0, "network": 0xE0, "AF11": 0x28, "AF12": 0x30, "AF13": 0x38, "AF21": 0x48, "AF22": 0x50, "AF23": 0x58, "AF31": 0x68, "AF32": 0x70, "AF33": 0x78, "AF41": 0x88, "AF42": 0x90, "AF43": 0x98, "CS1": 0x20, "CS2": 0x40, "CS3": 0x60, "CS4": 0x80, "CS5": 0xA0, "CS6": 0xC0, "CS7": 0xE0, "EF": 0xB8 } qostype = None def __init__(self, interface): if os.path.exists('/tmp/vyos.qos.debug'): self._debug = True self._interface = interface def _cmd(self, command): if self._debug: print(f'DEBUG/QoS: {command}') return cmd(command) def get_direction(self) -> list: return self._direction def _get_class_max_id(self, config) -> int: if 'class' in config: tmp = list(config['class'].keys()) tmp.sort(key=lambda ii: int(ii)) return tmp[-1] return None def _get_dsfield(self, value): if value in self._dsfields: return self._dsfields[value] else: return value def _calc_random_detect_queue_params(self, avg_pkt, max_thr, limit=None, min_thr=None, mark_probability=None, precedence=0): params = dict() avg_pkt = int(avg_pkt) max_thr = int(max_thr) mark_probability = int(mark_probability) limit = int(limit) if limit else 4 * max_thr min_thr = int(min_thr) if min_thr else ((9 + precedence) * max_thr) // 18 params['avg_pkt'] = avg_pkt params['limit'] = limit * avg_pkt params['min_val'] = min_thr * avg_pkt params['max_val'] = max_thr * avg_pkt params['burst'] = (2 * min_thr + max_thr) // 3 params['probability'] = 1 / mark_probability return params def _build_base_qdisc(self, config : dict, cls_id : int): """ Add/replace qdisc for every class (also default is a class). This is a genetic method which need an implementation "per" queue-type. This matches the old mapping as defined in Perl here: https://github.com/vyos/vyatta-cfg-qos/blob/equuleus/lib/Vyatta/Qos/ShaperClass.pm#L223-L229 """ queue_type = dict_search('queue_type', config) default_tc = f'tc qdisc replace dev {self._interface} parent {self._parent}:{cls_id:x}' if queue_type == 'priority': handle = 0x4000 + cls_id default_tc += f' handle {handle:x}: prio' self._cmd(default_tc) queue_limit = dict_search('queue_limit', config) for ii in range(1, 4): tmp = f'tc qdisc replace dev {self._interface} parent {handle:x}:{ii:x} pfifo' if queue_limit: tmp += f' limit {queue_limit}' self._cmd(tmp) elif queue_type == 'fair-queue': default_tc += f' sfq' tmp = dict_search('queue_limit', config) if tmp: default_tc += f' limit {tmp}' self._cmd(default_tc) elif queue_type == 'fq-codel': default_tc += f' fq_codel' tmp = dict_search('codel_quantum', config) if tmp: default_tc += f' quantum {tmp}' tmp = dict_search('flows', config) if tmp: default_tc += f' flows {tmp}' tmp = dict_search('interval', config) if tmp: default_tc += f' interval {tmp}ms' tmp = dict_search('queue_limit', config) if tmp: default_tc += f' limit {tmp}' tmp = dict_search('target', config) if tmp: default_tc += f' target {tmp}ms' default_tc += f' noecn' self._cmd(default_tc) elif queue_type == 'random-detect': default_tc += f' red' qparams = self._calc_random_detect_queue_params( avg_pkt=dict_search('average_packet', config), max_thr=dict_search('maximum_threshold', config), limit=dict_search('queue_limit', config), min_thr=dict_search('minimum_threshold', config), mark_probability=dict_search('mark_probability', config) ) default_tc += f' limit {qparams["limit"]} avpkt {qparams["avg_pkt"]}' default_tc += f' max {qparams["max_val"]} min {qparams["min_val"]}' default_tc += f' burst {qparams["burst"]} probability {qparams["probability"]}' self._cmd(default_tc) elif queue_type == 'drop-tail': default_tc += f' pfifo' tmp = dict_search('queue_limit', config) if tmp: default_tc += f' limit {tmp}' self._cmd(default_tc) def _rate_convert(self, rate) -> int: rates = { 'bit' : 1, 'kbit' : 1000, 'mbit' : 1000000, 'gbit' : 1000000000, 'tbit' : 1000000000000, } if rate == 'auto' or rate.endswith('%'): speed = 1000 default_speed = speed # Not all interfaces have valid entries in the speed file. PPPoE # interfaces have the appropriate speed file, but you can not read it: # cat: /sys/class/net/pppoe7/speed: Invalid argument try: speed = read_file(f'/sys/class/net/{self._interface}/speed') if not speed.isnumeric(): Warning('Interface speed cannot be determined (assuming 1000 Mbit/s)') if int(speed) < 1: speed = default_speed if rate.endswith('%'): percent = rate.rstrip('%') speed = int(speed) * int(percent) // 100 except: pass return int(speed) *1000000 # convert to MBit/s rate_numeric = int(''.join([n for n in rate if n.isdigit()])) rate_scale = ''.join([n for n in rate if not n.isdigit()]) if int(rate_numeric) <= 0: raise ValueError(f'{rate_numeric} is not a valid bandwidth <= 0') if rate_scale: return int(rate_numeric * rates[rate_scale]) else: # No suffix implies Kbps just as Cisco IOS return int(rate_numeric * 1000) def update(self, config, direction, priority=None): """ method must be called from derived class after it has completed qdisc setup """ if self._debug: import pprint pprint.pprint(config) if 'class' in config: for cls, cls_config in config['class'].items(): self._build_base_qdisc(cls_config, int(cls)) # every match criteria has it's tc instance filter_cmd_base = f'tc filter add dev {self._interface} parent {self._parent:x}:' if priority: filter_cmd_base += f' prio {cls}' elif 'priority' in cls_config: prio = cls_config['priority'] filter_cmd_base += f' prio {prio}' filter_cmd_base += ' protocol all' if 'match' in cls_config: - is_filtered = False + has_filter = False for index, (match, match_config) in enumerate(cls_config['match'].items(), start=1): filter_cmd = filter_cmd_base + if not has_filter: + for key in ['mark', 'vif', 'ip', 'ipv6']: + if key in match_config: + has_filter = True + break + if self.qostype == 'shaper' and 'prio ' not in filter_cmd: filter_cmd += f' prio {index}' if 'mark' in match_config: mark = match_config['mark'] filter_cmd += f' handle {mark} fw' if 'vif' in match_config: vif = match_config['vif'] filter_cmd += f' basic match "meta(vlan mask 0xfff eq {vif})"' for af in ['ip', 'ipv6']: tc_af = af if af == 'ipv6': tc_af = 'ip6' if af in match_config: filter_cmd += ' u32' tmp = dict_search(f'{af}.source.address', match_config) if tmp: filter_cmd += f' match {tc_af} src {tmp}' tmp = dict_search(f'{af}.source.port', match_config) if tmp: filter_cmd += f' match {tc_af} sport {tmp} 0xffff' tmp = dict_search(f'{af}.destination.address', match_config) if tmp: filter_cmd += f' match {tc_af} dst {tmp}' tmp = dict_search(f'{af}.destination.port', match_config) if tmp: filter_cmd += f' match {tc_af} dport {tmp} 0xffff' tmp = dict_search(f'{af}.protocol', match_config) if tmp: tmp = get_protocol_by_name(tmp) filter_cmd += f' match {tc_af} protocol {tmp} 0xff' tmp = dict_search(f'{af}.dscp', match_config) if tmp: tmp = self._get_dsfield(tmp) if af == 'ip': filter_cmd += f' match {tc_af} dsfield {tmp} 0xff' elif af == 'ipv6': filter_cmd += f' match u16 {tmp} 0x0ff0 at 0' # Will match against total length of an IPv4 packet and # payload length of an IPv6 packet. # # IPv4 : match u16 0x0000 ~MAXLEN at 2 # IPv6 : match u16 0x0000 ~MAXLEN at 4 tmp = dict_search(f'{af}.max_length', match_config) if tmp: # We need the 16 bit two's complement of the maximum # packet length tmp = hex(0xffff & ~int(tmp)) if af == 'ip': filter_cmd += f' match u16 0x0000 {tmp} at 2' elif af == 'ipv6': filter_cmd += f' match u16 0x0000 {tmp} at 4' # We match against specific TCP flags - we assume the IPv4 # header length is 20 bytes and assume the IPv6 packet is # not using extension headers (hence a ip header length of 40 bytes) # TCP Flags are set on byte 13 of the TCP header. # IPv4 : match u8 X X at 33 # IPv6 : match u8 X X at 53 # with X = 0x02 for SYN and X = 0x10 for ACK tmp = dict_search(f'{af}.tcp', match_config) if tmp: mask = 0 if 'ack' in tmp: mask |= 0x10 if 'syn' in tmp: mask |= 0x02 mask = hex(mask) if af == 'ip': filter_cmd += f' match u8 {mask} {mask} at 33' elif af == 'ipv6': filter_cmd += f' match u8 {mask} {mask} at 53' cls = int(cls) filter_cmd += f' flowid {self._parent:x}:{cls:x}' self._cmd(filter_cmd) - is_filtered = True vlan_expression = "match.*.vif" match_vlan = jmespath.search(vlan_expression, cls_config) if any(tmp in ['exceed', 'bandwidth', 'burst'] for tmp in cls_config) \ - and is_filtered: + and has_filter: # For "vif" "basic match" is used instead of "action police" T5961 if not match_vlan: filter_cmd += f' action police' if 'exceed' in cls_config: action = cls_config['exceed'] filter_cmd += f' conform-exceed {action}' if 'not_exceed' in cls_config: action = cls_config['not_exceed'] filter_cmd += f'/{action}' if 'bandwidth' in cls_config: rate = self._rate_convert(cls_config['bandwidth']) filter_cmd += f' rate {rate}' if 'burst' in cls_config: burst = cls_config['burst'] filter_cmd += f' burst {burst}' if 'mtu' in cls_config: mtu = cls_config['mtu'] filter_cmd += f' mtu {mtu}' cls = int(cls) filter_cmd += f' flowid {self._parent:x}:{cls:x}' self._cmd(filter_cmd) # The police block allows limiting of the byte or packet rate of # traffic matched by the filter it is attached to. # https://man7.org/linux/man-pages/man8/tc-police.8.html # T5295: We do not handle rate via tc filter directly, # but rather set the tc filter to direct traffic to the correct tc class flow. # # if any(tmp in ['exceed', 'bandwidth', 'burst'] for tmp in cls_config): # filter_cmd += f' action police' # # if 'exceed' in cls_config: # action = cls_config['exceed'] # filter_cmd += f' conform-exceed {action}' # if 'not_exceed' in cls_config: # action = cls_config['not_exceed'] # filter_cmd += f'/{action}' # # if 'bandwidth' in cls_config: # rate = self._rate_convert(cls_config['bandwidth']) # filter_cmd += f' rate {rate}' # # if 'burst' in cls_config: # burst = cls_config['burst'] # filter_cmd += f' burst {burst}' if 'default' in config: default_cls_id = 1 if 'class' in config: class_id_max = self._get_class_max_id(config) default_cls_id = int(class_id_max) +1 self._build_base_qdisc(config['default'], default_cls_id) if self.qostype == 'limiter': if 'default' in config: filter_cmd = f'tc filter replace dev {self._interface} parent {self._parent:x}: ' filter_cmd += 'prio 255 protocol all basic' # The police block allows limiting of the byte or packet rate of # traffic matched by the filter it is attached to. # https://man7.org/linux/man-pages/man8/tc-police.8.html if any(tmp in ['exceed', 'bandwidth', 'burst'] for tmp in config['default']): filter_cmd += f' action police' if 'exceed' in config['default']: action = config['default']['exceed'] filter_cmd += f' conform-exceed {action}' if 'not_exceed' in config['default']: action = config['default']['not_exceed'] filter_cmd += f'/{action}' if 'bandwidth' in config['default']: rate = self._rate_convert(config['default']['bandwidth']) filter_cmd += f' rate {rate}' if 'burst' in config['default']: burst = config['default']['burst'] filter_cmd += f' burst {burst}' if 'mtu' in config['default']: mtu = config['default']['mtu'] filter_cmd += f' mtu {mtu}' if 'class' in config: filter_cmd += f' flowid {self._parent:x}:{default_cls_id:x}' self._cmd(filter_cmd) diff --git a/smoketest/scripts/cli/test_qos.py b/smoketest/scripts/cli/test_qos.py index bcf5139c7..5977b2f41 100755 --- a/smoketest/scripts/cli/test_qos.py +++ b/smoketest/scripts/cli/test_qos.py @@ -1,743 +1,764 @@ #!/usr/bin/env python3 # # Copyright (C) 2022-2023 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import os import unittest from json import loads from base_vyostest_shim import VyOSUnitTestSHIM from vyos.configsession import ConfigSessionError from vyos.ifconfig import Section from vyos.utils.process import cmd base_path = ['qos'] def get_tc_qdisc_json(interface) -> dict: tmp = cmd(f'tc -detail -json qdisc show dev {interface}') tmp = loads(tmp) return next(iter(tmp)) def get_tc_filter_json(interface, direction) -> list: if direction not in ['ingress', 'egress']: raise ValueError() tmp = cmd(f'tc -detail -json filter show dev {interface} {direction}') tmp = loads(tmp) return tmp def get_tc_filter_details(interface, direction) -> list: # json doesn't contain all params, such as mtu if direction not in ['ingress', 'egress']: raise ValueError() tmp = cmd(f'tc -details filter show dev {interface} {direction}') return tmp class TestQoS(VyOSUnitTestSHIM.TestCase): @classmethod def setUpClass(cls): super(TestQoS, cls).setUpClass() # ensure we can also run this test on a live system - so lets clean # out the current configuration :) cls.cli_delete(cls, base_path) # We only test on physical interfaces and not VLAN (sub-)interfaces cls._interfaces = [] if 'TEST_ETH' in os.environ: tmp = os.environ['TEST_ETH'].split() cls._interfaces = tmp else: for tmp in Section.interfaces('ethernet', vlan=False): cls._interfaces.append(tmp) def tearDown(self): # delete testing SSH config self.cli_delete(base_path) self.cli_commit() def test_01_cake(self): bandwidth = 1000000 rtt = 200 for interface in self._interfaces: policy_name = f'qos-policy-{interface}' self.cli_set(base_path + ['interface', interface, 'egress', policy_name]) self.cli_set(base_path + ['policy', 'cake', policy_name, 'bandwidth', str(bandwidth)]) self.cli_set(base_path + ['policy', 'cake', policy_name, 'rtt', str(rtt)]) self.cli_set(base_path + ['policy', 'cake', policy_name, 'flow-isolation', 'dual-src-host']) bandwidth += 1000000 rtt += 20 # commit changes self.cli_commit() bandwidth = 1000000 rtt = 200 for interface in self._interfaces: tmp = get_tc_qdisc_json(interface) self.assertEqual('cake', tmp['kind']) # TC store rates as a 32-bit unsigned integer in bps (Bytes per second) self.assertEqual(int(bandwidth *125), tmp['options']['bandwidth']) # RTT internally is in us self.assertEqual(int(rtt *1000), tmp['options']['rtt']) self.assertEqual('dual-srchost', tmp['options']['flowmode']) self.assertFalse(tmp['options']['ingress']) self.assertFalse(tmp['options']['nat']) self.assertTrue(tmp['options']['raw']) bandwidth += 1000000 rtt += 20 def test_02_drop_tail(self): queue_limit = 50 first = True for interface in self._interfaces: policy_name = f'qos-policy-{interface}' if first: self.cli_set(base_path + ['interface', interface, 'ingress', policy_name]) # verify() - selected QoS policy on interface only supports egress with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_delete(base_path + ['interface', interface, 'ingress', policy_name]) first = False self.cli_set(base_path + ['interface', interface, 'egress', policy_name]) self.cli_set(base_path + ['policy', 'drop-tail', policy_name, 'queue-limit', str(queue_limit)]) queue_limit += 10 # commit changes self.cli_commit() queue_limit = 50 for interface in self._interfaces: tmp = get_tc_qdisc_json(interface) self.assertEqual('pfifo', tmp['kind']) self.assertEqual(queue_limit, tmp['options']['limit']) queue_limit += 10 def test_03_fair_queue(self): hash_interval = 10 queue_limit = 5 policy_type = 'fair-queue' first = True for interface in self._interfaces: policy_name = f'qos-policy-{interface}' if first: self.cli_set(base_path + ['interface', interface, 'ingress', policy_name]) # verify() - selected QoS policy on interface only supports egress with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_delete(base_path + ['interface', interface, 'ingress', policy_name]) first = False self.cli_set(base_path + ['interface', interface, 'egress', policy_name]) self.cli_set(base_path + ['policy', policy_type, policy_name, 'hash-interval', str(hash_interval)]) self.cli_set(base_path + ['policy', policy_type, policy_name, 'queue-limit', str(queue_limit)]) hash_interval += 1 queue_limit += 1 # commit changes self.cli_commit() hash_interval = 10 queue_limit = 5 for interface in self._interfaces: tmp = get_tc_qdisc_json(interface) self.assertEqual('sfq', tmp['kind']) self.assertEqual(hash_interval, tmp['options']['perturb']) self.assertEqual(queue_limit, tmp['options']['limit']) hash_interval += 1 queue_limit += 1 def test_04_fq_codel(self): policy_type = 'fq-codel' codel_quantum = 1500 flows = 512 interval = 100 queue_limit = 2048 target = 5 first = True for interface in self._interfaces: policy_name = f'qos-policy-{interface}' if first: self.cli_set(base_path + ['interface', interface, 'ingress', policy_name]) # verify() - selected QoS policy on interface only supports egress with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_delete(base_path + ['interface', interface, 'ingress', policy_name]) first = False self.cli_set(base_path + ['interface', interface, 'egress', policy_name]) self.cli_set(base_path + ['policy', policy_type, policy_name, 'codel-quantum', str(codel_quantum)]) self.cli_set(base_path + ['policy', policy_type, policy_name, 'flows', str(flows)]) self.cli_set(base_path + ['policy', policy_type, policy_name, 'interval', str(interval)]) self.cli_set(base_path + ['policy', policy_type, policy_name, 'queue-limit', str(queue_limit)]) self.cli_set(base_path + ['policy', policy_type, policy_name, 'target', str(target)]) codel_quantum += 10 flows += 2 interval += 10 queue_limit += 512 target += 1 # commit changes self.cli_commit() codel_quantum = 1500 flows = 512 interval = 100 queue_limit = 2048 target = 5 for interface in self._interfaces: tmp = get_tc_qdisc_json(interface) self.assertEqual('fq_codel', tmp['kind']) self.assertEqual(codel_quantum, tmp['options']['quantum']) self.assertEqual(flows, tmp['options']['flows']) self.assertEqual(queue_limit, tmp['options']['limit']) # due to internal rounding we need to substract 1 from interval and target after converting to milliseconds # configuration of: # tc qdisc add dev eth0 root fq_codel quantum 1500 flows 512 interval 100ms limit 2048 target 5ms noecn # results in: tc -j qdisc show dev eth0 # [{"kind":"fq_codel","handle":"8046:","root":true,"refcnt":3,"options":{"limit":2048,"flows":512, # "quantum":1500,"target":4999,"interval":99999,"memory_limit":33554432,"drop_batch":64}}] self.assertAlmostEqual(tmp['options']['interval'], interval *1000, delta=1) self.assertAlmostEqual(tmp['options']['target'], target *1000 -1, delta=1) codel_quantum += 10 flows += 2 interval += 10 queue_limit += 512 target += 1 def test_05_limiter(self): qos_config = { '1' : { 'bandwidth' : '3000000', 'exceed' : 'pipe', 'burst' : '100Kb', 'mtu' : '1600', 'not-exceed' : 'continue', 'priority': '15', 'match4' : { 'ssh' : { 'dport' : '22', }, }, }, '2' : { 'bandwidth' : '1000000', 'match6' : { 'ssh' : { 'dport' : '22', }, }, }, } first = True for interface in self._interfaces: policy_name = f'qos-policy-{interface}' if first: self.cli_set(base_path + ['interface', interface, 'egress', policy_name]) # verify() - selected QoS policy on interface only supports egress with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_delete(base_path + ['interface', interface, 'egress', policy_name]) first = False self.cli_set(base_path + ['interface', interface, 'ingress', policy_name]) # set default bandwidth parameter for all remaining connections self.cli_set(base_path + ['policy', 'limiter', policy_name, 'default', 'bandwidth', '500000']) self.cli_set(base_path + ['policy', 'limiter', policy_name, 'default', 'burst', '200kb']) self.cli_set(base_path + ['policy', 'limiter', policy_name, 'default', 'exceed', 'drop']) self.cli_set(base_path + ['policy', 'limiter', policy_name, 'default', 'mtu', '3000']) self.cli_set(base_path + ['policy', 'limiter', policy_name, 'default', 'not-exceed', 'ok']) for qos_class, qos_class_config in qos_config.items(): qos_class_base = base_path + ['policy', 'limiter', policy_name, 'class', qos_class] if 'match4' in qos_class_config: for match, match_config in qos_class_config['match4'].items(): if 'dport' in match_config: self.cli_set(qos_class_base + ['match', match, 'ip', 'destination', 'port', match_config['dport']]) if 'match6' in qos_class_config: for match, match_config in qos_class_config['match6'].items(): if 'dport' in match_config: self.cli_set(qos_class_base + ['match', match, 'ipv6', 'destination', 'port', match_config['dport']]) if 'bandwidth' in qos_class_config: self.cli_set(qos_class_base + ['bandwidth', qos_class_config['bandwidth']]) if 'exceed' in qos_class_config: self.cli_set(qos_class_base + ['exceed', qos_class_config['exceed']]) if 'not-exceed' in qos_class_config: self.cli_set(qos_class_base + ['not-exceed', qos_class_config['not-exceed']]) if 'burst' in qos_class_config: self.cli_set(qos_class_base + ['burst', qos_class_config['burst']]) if 'mtu' in qos_class_config: self.cli_set(qos_class_base + ['mtu', qos_class_config['mtu']]) if 'priority' in qos_class_config: self.cli_set(qos_class_base + ['priority', qos_class_config['priority']]) # commit changes self.cli_commit() for interface in self._interfaces: for filter in get_tc_filter_json(interface, 'ingress'): # bail out early if filter has no attached action if 'options' not in filter or 'actions' not in filter['options']: continue for qos_class, qos_class_config in qos_config.items(): # Every flowid starts with ffff and we encopde the class number after the colon if 'flowid' not in filter['options'] or filter['options']['flowid'] != f'ffff:{qos_class}': continue ip_hdr_offset = 20 if 'match6' in qos_class_config: ip_hdr_offset = 40 self.assertEqual(ip_hdr_offset, filter['options']['match']['off']) if 'dport' in match_config: dport = int(match_config['dport']) self.assertEqual(f'{dport:x}', filter['options']['match']['value']) tc_details = get_tc_filter_details(interface, 'ingress') self.assertTrue('filter parent ffff: protocol all pref 20 u32 chain 0' in tc_details) self.assertTrue('rate 1Gbit burst 15125b mtu 2Kb action drop overhead 0b linklayer ethernet' in tc_details) self.assertTrue('filter parent ffff: protocol all pref 15 u32 chain 0' in tc_details) self.assertTrue('rate 3Gbit burst 102000b mtu 1600b action pipe/continue overhead 0b linklayer ethernet' in tc_details) self.assertTrue('rate 500Mbit burst 204687b mtu 3000b action drop overhead 0b linklayer ethernet' in tc_details) self.assertTrue('filter parent ffff: protocol all pref 255 basic chain 0' in tc_details) def test_06_network_emulator(self): policy_type = 'network-emulator' bandwidth = 1000000 corruption = 1 delay = 2 duplicate = 3 loss = 4 queue_limit = 5 reordering = 6 first = True for interface in self._interfaces: policy_name = f'qos-policy-{interface}' if first: self.cli_set(base_path + ['interface', interface, 'ingress', policy_name]) # verify() - selected QoS policy on interface only supports egress with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_delete(base_path + ['interface', interface, 'ingress', policy_name]) first = False self.cli_set(base_path + ['interface', interface, 'egress', policy_name]) self.cli_set(base_path + ['policy', policy_type, policy_name, 'bandwidth', str(bandwidth)]) self.cli_set(base_path + ['policy', policy_type, policy_name, 'corruption', str(corruption)]) self.cli_set(base_path + ['policy', policy_type, policy_name, 'delay', str(delay)]) self.cli_set(base_path + ['policy', policy_type, policy_name, 'duplicate', str(duplicate)]) self.cli_set(base_path + ['policy', policy_type, policy_name, 'loss', str(loss)]) self.cli_set(base_path + ['policy', policy_type, policy_name, 'queue-limit', str(queue_limit)]) self.cli_set(base_path + ['policy', policy_type, policy_name, 'reordering', str(reordering)]) bandwidth += 1000000 corruption += 1 delay += 1 duplicate +=1 loss += 1 queue_limit += 1 reordering += 1 # commit changes self.cli_commit() bandwidth = 1000000 corruption = 1 delay = 2 duplicate = 3 loss = 4 queue_limit = 5 reordering = 6 for interface in self._interfaces: tmp = get_tc_qdisc_json(interface) self.assertEqual('netem', tmp['kind']) self.assertEqual(int(bandwidth *125), tmp['options']['rate']['rate']) # values are in % self.assertEqual(corruption/100, tmp['options']['corrupt']['corrupt']) self.assertEqual(duplicate/100, tmp['options']['duplicate']['duplicate']) self.assertEqual(loss/100, tmp['options']['loss-random']['loss']) self.assertEqual(reordering/100, tmp['options']['reorder']['reorder']) self.assertEqual(delay/1000, tmp['options']['delay']['delay']) self.assertEqual(queue_limit, tmp['options']['limit']) bandwidth += 1000000 corruption += 1 delay += 1 duplicate += 1 loss += 1 queue_limit += 1 reordering += 1 def test_07_priority_queue(self): priorities = ['1', '2', '3', '4', '5'] first = True for interface in self._interfaces: policy_name = f'qos-policy-{interface}' if first: self.cli_set(base_path + ['interface', interface, 'ingress', policy_name]) # verify() - selected QoS policy on interface only supports egress with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_delete(base_path + ['interface', interface, 'ingress', policy_name]) first = False self.cli_set(base_path + ['interface', interface, 'egress', policy_name]) self.cli_set(base_path + ['policy', 'priority-queue', policy_name, 'default', 'queue-limit', '10']) for priority in priorities: prio_base = base_path + ['policy', 'priority-queue', policy_name, 'class', priority] self.cli_set(prio_base + ['match', f'prio-{priority}', 'ip', 'destination', 'port', str(1000 + int(priority))]) # commit changes self.cli_commit() def test_08_random_detect(self): bandwidth = 5000 first = True for interface in self._interfaces: policy_name = f'qos-policy-{interface}' if first: self.cli_set(base_path + ['interface', interface, 'ingress', policy_name]) # verify() - selected QoS policy on interface only supports egress with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_delete(base_path + ['interface', interface, 'ingress', policy_name]) first = False self.cli_set(base_path + ['interface', interface, 'egress', policy_name]) self.cli_set(base_path + ['policy', 'random-detect', policy_name, 'bandwidth', str(bandwidth)]) bandwidth += 1000 # commit changes self.cli_commit() bandwidth = 5000 for interface in self._interfaces: tmp = get_tc_qdisc_json(interface) self.assertTrue('gred' in tmp.get('kind')) self.assertEqual(8, len(tmp.get('options', {}).get('vqs'))) self.assertEqual(8, tmp.get('options', {}).get('dp_cnt')) self.assertEqual(0, tmp.get('options', {}).get('dp_default')) self.assertTrue(tmp.get('options', {}).get('grio')) def test_09_rate_control(self): bandwidth = 5000 burst = 20 latency = 5 first = True for interface in self._interfaces: policy_name = f'qos-policy-{interface}' if first: self.cli_set(base_path + ['interface', interface, 'ingress', policy_name]) # verify() - selected QoS policy on interface only supports egress with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_delete(base_path + ['interface', interface, 'ingress', policy_name]) first = False self.cli_set(base_path + ['interface', interface, 'egress', policy_name]) self.cli_set(base_path + ['policy', 'rate-control', policy_name, 'bandwidth', str(bandwidth)]) self.cli_set(base_path + ['policy', 'rate-control', policy_name, 'burst', str(burst)]) self.cli_set(base_path + ['policy', 'rate-control', policy_name, 'latency', str(latency)]) bandwidth += 1000 burst += 5 latency += 1 # commit changes self.cli_commit() bandwidth = 5000 burst = 20 latency = 5 for interface in self._interfaces: tmp = get_tc_qdisc_json(interface) self.assertEqual('tbf', tmp['kind']) self.assertEqual(0, tmp['options']['mpu']) # TC store rates as a 32-bit unsigned integer in bps (Bytes per second) self.assertEqual(int(bandwidth * 125), tmp['options']['rate']) bandwidth += 1000 burst += 5 latency += 1 def test_10_round_robin(self): qos_config = { '1' : { 'match4' : { 'ssh' : { 'dport' : '22', }, }, }, '2' : { 'match6' : { 'ssh' : { 'dport' : '22', }, }, }, } first = True for interface in self._interfaces: policy_name = f'qos-policy-{interface}' if first: self.cli_set(base_path + ['interface', interface, 'ingress', policy_name]) # verify() - selected QoS policy on interface only supports egress with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_delete(base_path + ['interface', interface, 'ingress', policy_name]) first = False self.cli_set(base_path + ['interface', interface, 'egress', policy_name]) for qos_class, qos_class_config in qos_config.items(): qos_class_base = base_path + ['policy', 'round-robin', policy_name, 'class', qos_class] if 'match4' in qos_class_config: for match, match_config in qos_class_config['match4'].items(): if 'dport' in match_config: self.cli_set(qos_class_base + ['match', match, 'ip', 'destination', 'port', match_config['dport']]) if 'match6' in qos_class_config: for match, match_config in qos_class_config['match6'].items(): if 'dport' in match_config: self.cli_set(qos_class_base + ['match', match, 'ipv6', 'destination', 'port', match_config['dport']]) # commit changes self.cli_commit() for interface in self._interfaces: tmp = get_tc_qdisc_json(interface) self.assertEqual('drr', tmp['kind']) for filter in get_tc_filter_json(interface, 'ingress'): # bail out early if filter has no attached action if 'options' not in filter or 'actions' not in filter['options']: continue for qos_class, qos_class_config in qos_config.items(): # Every flowid starts with ffff and we encopde the class number after the colon if 'flowid' not in filter['options'] or filter['options']['flowid'] != f'ffff:{qos_class}': continue ip_hdr_offset = 20 if 'match6' in qos_class_config: ip_hdr_offset = 40 self.assertEqual(ip_hdr_offset, filter['options']['match']['off']) if 'dport' in match_config: dport = int(match_config['dport']) self.assertEqual(f'{dport:x}', filter['options']['match']['value']) def test_11_shaper(self): bandwidth = 250 default_bandwidth = 20 default_ceil = 30 class_bandwidth = 50 class_ceil = 80 dst_address = '192.0.2.8/32' for interface in self._interfaces: shaper_name = f'qos-shaper-{interface}' self.cli_set(base_path + ['interface', interface, 'egress', shaper_name]) self.cli_set(base_path + ['policy', 'shaper', shaper_name, 'bandwidth', f'{bandwidth}mbit']) self.cli_set(base_path + ['policy', 'shaper', shaper_name, 'default', 'bandwidth', f'{default_bandwidth}mbit']) self.cli_set(base_path + ['policy', 'shaper', shaper_name, 'default', 'ceiling', f'{default_ceil}mbit']) self.cli_set(base_path + ['policy', 'shaper', shaper_name, 'default', 'queue-type', 'fair-queue']) self.cli_set(base_path + ['policy', 'shaper', shaper_name, 'class', '23', 'bandwidth', f'{class_bandwidth}mbit']) self.cli_set(base_path + ['policy', 'shaper', shaper_name, 'class', '23', 'ceiling', f'{class_ceil}mbit']) self.cli_set(base_path + ['policy', 'shaper', shaper_name, 'class', '23', 'match', '10', 'ip', 'destination', 'address', dst_address]) bandwidth += 1 default_bandwidth += 1 default_ceil += 1 class_bandwidth += 1 class_ceil += 1 # commit changes self.cli_commit() bandwidth = 250 default_bandwidth = 20 default_ceil = 30 class_bandwidth = 50 class_ceil = 80 for interface in self._interfaces: config_entries = ( f'root rate {bandwidth}Mbit ceil {bandwidth}Mbit', f'prio 0 rate {class_bandwidth}Mbit ceil {class_ceil}Mbit', f'prio 7 rate {default_bandwidth}Mbit ceil {default_ceil}Mbit' ) output = cmd(f'tc class show dev {interface}') for config_entry in config_entries: self.assertIn(config_entry, output) bandwidth += 1 default_bandwidth += 1 default_ceil += 1 class_bandwidth += 1 class_ceil += 1 def test_12_shaper_with_red_queue(self): bandwidth = 100 default_bandwidth = 100 default_burst = 100 interface = self._interfaces[0] class_bandwidth = 50 dst_address = '192.0.2.8/32' shaper_name = f'qos-shaper-{interface}' self.cli_set(base_path + ['interface', interface, 'egress', shaper_name]) self.cli_set(base_path + ['policy', 'shaper', shaper_name, 'bandwidth', f'{bandwidth}mbit']) self.cli_set(base_path + ['policy', 'shaper', shaper_name, 'default', 'bandwidth', f'{default_bandwidth}%']) self.cli_set(base_path + ['policy', 'shaper', shaper_name, 'default', 'burst', f'{default_burst}']) self.cli_set(base_path + ['policy', 'shaper', shaper_name, 'default', 'queue-type', 'random-detect']) self.cli_set(base_path + ['policy', 'shaper', shaper_name, 'class', '2', 'bandwidth', f'{class_bandwidth}mbit']) self.cli_set(base_path + ['policy', 'shaper', shaper_name, 'class', '2', 'match', '10', 'ip', 'destination', 'address', dst_address]) self.cli_set(base_path + ['policy', 'shaper', shaper_name, 'class', '2', 'queue-type', 'random-detect']) # commit changes self.cli_commit() # check root htb config output = cmd(f'tc class show dev {interface}') config_entries = ( f'prio 0 rate {class_bandwidth}Mbit ceil 50Mbit burst 15Kb', # specified class f'prio 7 rate {default_bandwidth}Mbit ceil 100Mbit burst {default_burst}b', # default class ) for config_entry in config_entries: self.assertIn(config_entry, output) output = cmd(f'tc -d qdisc show dev {interface}') config_entries = ( 'qdisc red', # use random detect 'limit 72Kb min 9Kb max 18Kb ewma 3 probability 0.1', # default config for random detect ) for config_entry in config_entries: self.assertIn(config_entry, output) # test random detect queue params self.cli_set(base_path + ['policy', 'shaper', shaper_name, 'default', 'queue-limit', '1024']) self.cli_set(base_path + ['policy', 'shaper', shaper_name, 'default', 'average-packet', '1024']) self.cli_set(base_path + ['policy', 'shaper', shaper_name, 'default', 'maximum-threshold', '32']) self.cli_set(base_path + ['policy', 'shaper', shaper_name, 'default', 'minimum-threshold', '16']) self.cli_set(base_path + ['policy', 'shaper', shaper_name, 'class', '2', 'queue-limit', '1024']) self.cli_set(base_path + ['policy', 'shaper', shaper_name, 'class', '2', 'average-packet', '512']) self.cli_set(base_path + ['policy', 'shaper', shaper_name, 'class', '2', 'maximum-threshold', '32']) self.cli_set(base_path + ['policy', 'shaper', shaper_name, 'class', '2', 'minimum-threshold', '16']) self.cli_set(base_path + ['policy', 'shaper', shaper_name, 'class', '2', 'mark-probability', '20']) self.cli_commit() output = cmd(f'tc -d qdisc show dev {interface}') config_entries = ( 'qdisc red', # use random detect 'limit 1Mb min 16Kb max 32Kb ewma 3 probability 0.1', # default config for random detect 'limit 512Kb min 8Kb max 16Kb ewma 3 probability 0.05', # class config for random detect ) for config_entry in config_entries: self.assertIn(config_entry, output) def test_13_shaper_delete_only_rule(self): default_bandwidth = 100 default_burst = 100 interface = self._interfaces[0] class_bandwidth = 50 class_ceiling = 5 src_address = '10.1.1.0/24' shaper_name = f'qos-shaper-{interface}' self.cli_set(base_path + ['interface', interface, 'egress', shaper_name]) self.cli_set(base_path + ['policy', 'shaper', shaper_name, 'bandwidth', f'10mbit']) self.cli_set(base_path + ['policy', 'shaper', shaper_name, 'default', 'bandwidth', f'{default_bandwidth}mbit']) self.cli_set(base_path + ['policy', 'shaper', shaper_name, 'default', 'burst', f'{default_burst}']) self.cli_set(base_path + ['policy', 'shaper', shaper_name, 'class', '30', 'bandwidth', f'{class_bandwidth}mbit']) self.cli_set(base_path + ['policy', 'shaper', shaper_name, 'class', '30', 'ceiling', f'{class_ceiling}mbit']) self.cli_set(base_path + ['policy', 'shaper', shaper_name, 'class', '30', 'match', 'ADDRESS30', 'ip', 'source', 'address', src_address]) self.cli_set(base_path + ['policy', 'shaper', shaper_name, 'class', '30', 'match', 'ADDRESS30', 'description', 'smoketest']) self.cli_set(base_path + ['policy', 'shaper', shaper_name, 'class', '30', 'priority', '5']) self.cli_set(base_path + ['policy', 'shaper', shaper_name, 'class', '30', 'queue-type', 'fair-queue']) # commit changes self.cli_commit() # check root htb config output = cmd(f'tc class show dev {interface}') config_entries = ( f'prio 5 rate {class_bandwidth}Mbit ceil {class_ceiling}Mbit burst 15Kb', # specified class f'prio 7 rate {default_bandwidth}Mbit ceil 100Mbit burst {default_burst}b', # default class ) for config_entry in config_entries: self.assertIn(config_entry, output) self.assertTrue('' != cmd(f'tc filter show dev {interface}')) # self.cli_delete(base_path + ['policy', 'shaper', shaper_name, 'class', '30', 'match', 'ADDRESS30']) self.cli_delete(base_path + ['policy', 'shaper', shaper_name, 'class', '30', 'match', 'ADDRESS30', 'ip', 'source', 'address', src_address]) self.cli_commit() self.assertEqual('', cmd(f'tc filter show dev {interface}')) + def test_14_policy_limiter_marked_traffic(self): + policy_name = 'smoke_test' + base_policy_path = ['qos', 'policy', 'limiter', policy_name] + + self.cli_set(['qos', 'interface', self._interfaces[0], 'ingress', policy_name]) + self.cli_set(base_policy_path + ['class', '100', 'bandwidth', '20gbit']) + self.cli_set(base_policy_path + ['class', '100', 'burst', '3760k']) + self.cli_set(base_policy_path + ['class', '100', 'match', 'INTERNAL', 'mark', '100']) + self.cli_set(base_policy_path + ['class', '100', 'priority', '20']) + self.cli_set(base_policy_path + ['default', 'bandwidth', '1gbit']) + self.cli_set(base_policy_path + ['default', 'burst', '125000000b']) + self.cli_commit() + + tc_filters = cmd(f'tc filter show dev {self._interfaces[0]} ingress') + # class 100 + self.assertIn('filter parent ffff: protocol all pref 20 fw chain 0', tc_filters) + self.assertIn('action order 1: police 0x1 rate 20Gbit burst 3847500b mtu 2Kb action drop overhead 0b', tc_filters) + # default + self.assertIn('filter parent ffff: protocol all pref 255 basic chain 0', tc_filters) + self.assertIn('action order 1: police 0x2 rate 1Gbit burst 125000000b mtu 2Kb action drop overhead 0b', tc_filters) + if __name__ == '__main__': unittest.main(verbosity=2)