Page Menu
Home
VyOS Platform
Search
Configure Global Search
Log In
Files
F35450763
vyos-load-balancer.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Flag For Later
Award Token
Size
13 KB
Referenced Files
None
Subscribers
None
vyos-load-balancer.py
View Options
#!/usr/bin/python3
# Copyright VyOS maintainers and contributors <maintainers@vyos.io>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
import
json
import
os
import
signal
import
sys
import
time
from
vyos.config
import
Config
from
vyos.template
import
render
from
vyos.utils.commit
import
commit_in_progress
from
vyos.utils.dict
import
dict_search_args
from
vyos.utils.network
import
get_interface_address
from
vyos.utils.process
import
rc_cmd
from
vyos.utils.process
import
run
from
vyos.xml_ref
import
get_defaults
from
vyos.wanloadbalance
import
health_ping_host
from
vyos.wanloadbalance
import
health_ping_host_ttl
from
vyos.wanloadbalance
import
parse_dhcp_nexthop
from
vyos.wanloadbalance
import
parse_ppp_nexthop
# nftables ruleset file rendered from the WLB template and loaded with `nft -f`
nftables_wlb_conf = '/run/nftables_wlb.conf'
# JSON dump of the per-interface health state, rewritten on state changes
wlb_status_file = '/run/wlb_status.json'
# File holding the PID of the running daemon process
wlb_pid_file = '/run/wlb_daemon.pid'
sleep_interval = 5 # Main loop sleep interval (seconds, passed to time.sleep)
def health_check(ifname, conf, state, test_defaults):
    """Run the health tests configured for one WAN interface.

    An interface without an IPv4 address is reported unhealthy immediately.
    When the interface has no explicit 'test' section, its nexthop is pinged
    instead; for a 'dhcp' nexthop the address cached in
    ``state['dhcp_nexthop']`` is used. Otherwise every configured test has
    to pass; the first failing test ends the check.

    Args:
        ifname: Name of the interface being tested.
        conf: The interface-health configuration of this interface.
        state: The health-state dict of this interface.
        test_defaults: Default test values; only 'resp-time' is read here.

    Returns:
        True if the interface passed, False otherwise.
    """
    if get_ipv4_address(ifname) is None:
        return False

    if 'test' not in conf:
        # No explicit tests configured: fall back to pinging the nexthop
        wait = test_defaults['resp-time']
        configured = conf['nexthop']
        ping_target = state['dhcp_nexthop'] if configured == 'dhcp' else configured

        if not ping_target:
            return False

        return health_ping_host(ping_target, ifname, wait_time=wait)

    for test_conf in conf['test'].values():
        kind = test_conf['type']

        if kind == 'ping':
            wait = test_conf['resp_time']
            host = test_conf['target']
            passed = health_ping_host(host, ifname, wait_time=wait)
        elif kind == 'ttl':
            host = test_conf['target']
            limit = test_conf['ttl_limit']
            passed = health_ping_host_ttl(host, ifname, ttl_limit=limit)
        elif kind == 'user-defined':
            # A user script passes when it exits with status 0
            passed = run(test_conf['test_script']) == 0
        else:
            # Unknown test types are skipped and do not fail the interface
            passed = True

        if not passed:
            return False

    return True
def on_state_change(lb, ifname, state):
    """Run the optional user hook and log an interface state transition.

    If the load-balancer config contains a 'hook', the script of that name
    under /config/scripts/ is executed with WLB_INTERFACE_NAME and
    WLB_INTERFACE_STATE ('ACTIVE' or 'FAILED') in its environment. A non-zero
    exit code is reported but otherwise ignored. The state change itself is
    always printed.

    Args:
        lb: Load-balancer config/state dictionary.
        ifname: Interface whose state changed.
        state: New state; truthy means active, falsy means failed.
    """
    if 'hook' in lb:
        hook_script = os.path.join('/config/scripts/', lb['hook'])

        hook_env = {'WLB_INTERFACE_NAME': ifname}
        if state:
            hook_env['WLB_INTERFACE_STATE'] = 'ACTIVE'
        else:
            hook_env['WLB_INTERFACE_STATE'] = 'FAILED'

        exit_code = run(hook_script, env=hook_env)
        if exit_code != 0:
            print('WLB hook returned non-zero error code')

    print(f'INFO: State change: {ifname} -> {state}')
def get_ipv4_address(ifname):
    """Return the first IPv4 address found on an interface (for source nat).

    The address entries reported by ``get_interface_address`` are scanned in
    order and the 'local' value of the first entry whose family is 'inet' is
    returned. The matched entry itself is used, so an IPv6 address listed
    ahead of the IPv4 one is never returned by mistake.

    Args:
        ifname: Interface name to query.

    Returns:
        The IPv4 address string, or None when the interface data is missing
        or contains no IPv4 entry with a 'local' address.
    """
    addr_json = get_interface_address(ifname)
    if not addr_json or 'addr_info' not in addr_json:
        return None

    for addr_info in addr_json['addr_info']:
        # .get() tolerates entries that carry no 'family' key
        if addr_info.get('family') == 'inet' and 'local' in addr_info:
            return addr_info['local']

    return None
def get_dynamic_nexthop(ifname: str) -> str | None | bool:
    """Resolve the dynamically learned next-hop of a WAN interface.

    Interfaces whose name starts with 'pppoe' are resolved through
    `parse_ppp_nexthop`; every other interface (e.g. DHCP) is resolved
    through `parse_dhcp_nexthop`. The helper's result is returned unchanged.

    Args:
        ifname: Interface name (e.g. 'pppoe0', 'eth0').

    Returns:
        - str: IPv4 next-hop address
        - None: when the DHCP lease has no router value
        - False: when the PPPoE nexthop state file is missing
    """
    # Pick the parser by interface type, then call it once
    resolver = parse_ppp_nexthop if ifname.startswith('pppoe') else parse_dhcp_nexthop
    return resolver(ifname)
def dynamic_nexthop_update(lb, ifname):
    """Refresh the cached dynamic nexthop and address of an interface.

    For interfaces tracking a dynamic nexthop (a 'dhcp_nexthop' key exists in
    their health state) the current nexthop is looked up; if one is found and
    it differs from the cached value, the cache is updated and the default
    route in the interface's routing table is replaced.

    Independently of that, the interface's current IPv4 address is compared
    with the cached 'if_addr' and the cache is updated on change.

    Args:
        lb: Load-balancer config/state dictionary.
        ifname: Interface to refresh.

    Returns:
        True if the interface address changed (nftables needs to be
        updated), False otherwise. A nexthop change alone returns False.
    """
    iface_state = lb['health_state'][ifname]

    if 'dhcp_nexthop' in iface_state:
        learned_nexthop = get_dynamic_nexthop(ifname)
        table_num = iface_state['table_number']

        if learned_nexthop and learned_nexthop != iface_state['dhcp_nexthop']:
            iface_state['dhcp_nexthop'] = learned_nexthop
            run(f'ip route replace table {table_num} default dev {ifname} via {learned_nexthop}')

    current_addr = get_ipv4_address(ifname)
    if not current_addr or current_addr == iface_state['if_addr']:
        return False

    iface_state['if_addr'] = current_addr
    return True
def restore_default_route(lb: dict, ifname: str) -> None:
    """
    Reinstall a missing default route in an interface's policy routing table.

    The default route of the table assigned to the interface is queried with
    `ip -j route show default table TABLE`. If the query succeeds and reports
    no entry, the next-hop is determined (dynamically for interfaces that
    track a 'dhcp_nexthop', otherwise from the configured 'nexthop') and the
    route is installed again with `ip route replace`.

    @param lb      Load-balancer state/config dictionary.
    @param ifname  Interface name whose default route should be verified and restored.
    @returns None; nothing is done if the table number is unknown, the route
             query fails, a default route is already present, or no next-hop
             can be determined.
    """
    table_num = dict_search_args(lb, 'health_state', ifname, 'table_number')
    if not table_num:
        return

    rc, out = rc_cmd(f'ip -j route show default table {table_num}')
    if rc != 0:
        return

    # A non-empty result means the default route is still installed
    if len(json.loads(out)) > 0:
        return

    if 'dhcp_nexthop' in lb['health_state'][ifname]:
        nexthop_addr = get_dynamic_nexthop(ifname)
    else:
        nexthop_addr = dict_search_args(lb, 'interface_health', ifname, 'nexthop')

    if nexthop_addr:
        run(f'ip route replace table {table_num} default dev {ifname} via {nexthop_addr}')
def nftables_update(lb):
    """Render the WLB nftables config from its template and load it.

    The 'first_install' flag in ``lb`` is set when the rendered config file
    does not exist yet and removed otherwise, before the template is
    rendered. The rendered file is then loaded with `nft -f`.

    Args:
        lb: Load-balancer config/state dictionary passed to the template.

    Returns:
        True if nft accepted the config, False (after printing the error
        output) if it did not.
    """
    if os.path.exists(nftables_wlb_conf):
        # Not the first run any more: drop the marker if it is still set
        lb.pop('first_install', None)
    else:
        lb['first_install'] = True

    render(nftables_wlb_conf, 'load-balancing/nftables-wlb.j2', lb)

    rc, out = rc_cmd(f'nft -f {nftables_wlb_conf}')
    if rc == 0:
        return True

    print('ERROR: Failed to apply WLB nftables config')
    print('Output:', out)
    return False
def cleanup(lb):
    """Remove the routes, ip rules and nftables table installed for WLB.

    For each configured health interface the default route of its routing
    table and the matching fwmark rule are deleted; table numbers are
    ``lb['mark_offset']`` plus the 1-based position of the interface.
    The vyos_wanloadbalance nftables table is deleted in every case.

    Args:
        lb: Load-balancer config/state dictionary.
    """
    if 'interface_health' in lb:
        for position, _ifname in enumerate(lb['interface_health'], start=1):
            table_num = lb['mark_offset'] + position
            run(f'ip route del table {table_num} default')
            run(f'ip rule del fwmark {hex(table_num)} table {table_num}')

    run('nft delete table ip vyos_wanloadbalance')
def get_config():
    """Build the load-balancer configuration dictionary.

    Reads 'load-balancing wan' (with recursive defaults) and attaches the
    'firewall group' configuration as 'firewall_group'. A rule's 'limit' key
    is removed when it only comes from defaults, and the default values for
    interface-health tests are stored under 'test_defaults'.

    Returns:
        The assembled configuration dictionary.
    """
    conf = Config()
    base = ['load-balancing', 'wan']
    mangling = ('-', '_')

    lb = conf.get_config_dict(
        base,
        key_mangling=mangling,
        get_first_key=True,
        with_recursive_defaults=True,
    )

    lb['firewall_group'] = conf.get_config_dict(
        ['firewall', 'group'],
        key_mangling=mangling,
        get_first_key=True,
        no_tag_node_value_mangle=True,
    )

    # prune limit key if not set by user
    for rule_id in lb.get('rule', []):
        limit_is_default = lb.from_defaults(['rule', rule_id, 'limit'])
        if limit_is_default:
            del lb['rule'][rule_id]['limit']

    # 'A' and 'B' presumably stand in for the tag node names - TODO confirm
    test_path = base + ['interface-health', 'A', 'test', 'B']
    lb['test_defaults'] = get_defaults(test_path, get_first_key=True)

    return lb
# Daemon entry point: build the initial state, install routes/rules/nftables,
# register signal handlers, then run the health-check loop until an error
# occurs or a termination signal arrives.
if __name__ == '__main__':
    # Do not read the config while a commit is still running
    while commit_in_progress():
        print("Notice: Waiting for commit to complete...")
        time.sleep(1)

    lb = get_config()

    lb['health_state'] = {}
    # Routing table numbers / fwmarks are mark_offset + index, i.e. they start at 0xc9 (201)
    lb['mark_offset'] = 0xc8

    # Create state dicts, interface address and nexthop, install routes and ip rules
    if 'interface_health' in lb:
        index = 1
        for ifname, health_conf in lb['interface_health'].items():
            table_num = lb['mark_offset'] + index
            addr = get_ipv4_address(ifname)
            # Initial state is "up" only if the interface already has an IPv4 address
            lb['health_state'][ifname] = {
                'if_addr': addr,
                'failure_count': 0,
                'success_count': 0,
                'last_success': 0,
                'last_failure': 0,
                'state': addr is not None,
                'state_changed': False,
                'table_number': table_num,
                'mark': hex(table_num)
            }

            if health_conf['nexthop'] == 'dhcp':
                # The presence of the 'dhcp_nexthop' key marks the interface as dynamic;
                # dynamic_nexthop_update() fills it in and installs the route
                lb['health_state'][ifname]['dhcp_nexthop'] = None
                dynamic_nexthop_update(lb, ifname)
            else:
                run(f'ip route replace table {table_num} default dev {ifname} via {health_conf["nexthop"]}')

            run(f'ip rule add fwmark {hex(table_num)} table {table_num}')

            index += 1

        # NOTE(review): the return value is not checked here, unlike in the main loop
        nftables_update(lb)

        run('ip route flush cache')

        if 'flush_connections' in lb:
            run('conntrack --delete')
            run('conntrack -F expect')

        with open(wlb_status_file, 'w') as f:
            f.write(json.dumps(lb['health_state']))

    # Signal handler SIGUSR2 -> dhcpcd update
    def handle_sigusr2(signum, frame):
        # NOTE(review): indexes lb['interface_health'] directly, so this raises
        # KeyError if no health interface is configured
        for ifname, health_conf in lb['interface_health'].items():
            if 'nexthop' in health_conf and health_conf['nexthop'] == 'dhcp':
                retval = dynamic_nexthop_update(lb, ifname)

                # Reload nftables only when the interface address changed
                if retval:
                    nftables_update(lb)

    # Signal handler SIGTERM -> exit
    def handle_sigterm(signum, frame):
        # Remove the runtime files, undo routes/rules/nftables and exit
        if os.path.exists(wlb_status_file):
            os.unlink(wlb_status_file)

        if os.path.exists(wlb_pid_file):
            os.unlink(wlb_pid_file)

        if os.path.exists(nftables_wlb_conf):
            os.unlink(nftables_wlb_conf)

        cleanup(lb)
        sys.exit(0)

    signal.signal(signal.SIGUSR2, handle_sigusr2)
    # SIGINT is handled the same way as SIGTERM
    signal.signal(signal.SIGINT, handle_sigterm)
    signal.signal(signal.SIGTERM, handle_sigterm)

    with open(wlb_pid_file, 'w') as f:
        f.write(str(os.getpid()))

    # Main loop

    # True until the first interface has been processed; forces one initial status write
    init = True;
    try:
        while True:
            ip_change = False
            if 'interface_health' in lb:
                for ifname, health_conf in lb['interface_health'].items():
                    state = lb['health_state'][ifname]

                    result = health_check(ifname, health_conf, state=state, test_defaults=lb['test_defaults'])

                    # The recorded state only flips once the configured number of
                    # consecutive successes/failures has been reached
                    state_changed = result != state['state']
                    state['state_changed'] = False

                    if result:
                        state['failure_count'] = 0
                        state['success_count'] += 1
                        state['last_success'] = time.time()
                        if state_changed and state['success_count'] >= int(health_conf['success_count']):
                            state['state'] = True
                            state['state_changed'] = True
                    elif not result:
                        state['failure_count'] += 1
                        state['success_count'] = 0
                        state['last_failure'] = time.time()
                        if state_changed and state['failure_count'] >= int(health_conf['failure_count']):
                            state['state'] = False
                            state['state_changed'] = True

                    # Force state changed to trigger the first write
                    # (only the first interface of the first pass is flagged)
                    if init == True:
                        state['state_changed'] = True
                        init = False

                    if state['state_changed']:
                        state['if_addr'] = get_ipv4_address(ifname)
                        on_state_change(lb, ifname, state['state'])

                    if dynamic_nexthop_update(lb, ifname):
                        ip_change = True

                    # Reinstall the per-interface default route if it has disappeared
                    restore_default_route(lb, ifname)

            if any(state['state_changed'] for ifname, state in lb['health_state'].items()):
                # A failed nftables reload leaves the loop and falls through to the cleanup below
                if not nftables_update(lb):
                    break

                run('ip route flush cache')

                if 'flush_connections' in lb:
                    run('conntrack --delete')
                    run('conntrack -F expect')

                with open(wlb_status_file, 'w') as f:
                    f.write(json.dumps(lb['health_state']))
            elif ip_change:
                # Only an address changed: reload nftables without flushing or rewriting status
                nftables_update(lb)

            time.sleep(sleep_interval)
    except Exception as e:
        # Any error ends the daemon; it is printed and the cleanup below still runs
        print('WLB ERROR:', e)

    # Reached after a break or an exception: same teardown as handle_sigterm
    if os.path.exists(wlb_status_file):
        os.unlink(wlb_status_file)

    if os.path.exists(wlb_pid_file):
        os.unlink(wlb_pid_file)

    if os.path.exists(nftables_wlb_conf):
        os.unlink(nftables_wlb_conf)

    cleanup(lb)
File Metadata
Details
Attached
Mime Type
text/x-script.python
Expires
Tue, Dec 9, 10:51 PM (1 d, 7 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3096754
Default Alt Text
vyos-load-balancer.py (13 KB)
Attached To
Mode
rVYOSONEX vyos-1x
Attached
Detach File
Event Timeline
Log In to Comment