Skip to content

Intoduce tokio/async-based BFD implementation#795

Open
jgallagher wants to merge 32 commits into
mainfrom
john/bfd-async
Open

Intoduce tokio/async-based BFD implementation#795
jgallagher wants to merge 32 commits into
mainfrom
john/bfd-async

Conversation

@jgallagher

Copy link
Copy Markdown
Contributor

This PR adds a tokio/async-based implementation of BFD. It's heavily based on the existing implementation both in API shape and names, but has some intentional architectural differences (primarily to support pretty extensive unit testing). The non-test code is handwritten, but I leaned pretty heavily on Claude to help with writing tests (although I went over all of them by hand - it has a tendency to write flaky-in-a-not-obvious-way tests when sockets are involved, so hopefully I caught all of those) and used it to compare this implementation against the existing one. The primary motivator for moving to async is fixing #787, which it does: running the reproducer against this implementation results in the DELETEs returning more or less immediately (under 1ms).

The biggest architectural changes are:

  1. The state machine is written in a pure "sans I/O" style, and must be driven by an external source that provides both input/output and a clock source. This allows straightforward unit testing without having to deal with timeouts, threads, or sockets.
  2. Each BFD session starts 3 tokio tasks (one to drive the state machine, one to send outgoing packets, and one to sync with the RIB), plus a tokio task for the local listening address that's shared with all BFD sessions using that same listening address. This is pretty close to swapping out threads for tasks; the sync implementation started 3 OS threads per session (two to drive the state machine, one to send outgoing packets) plus a local listening thread shared between sessions. It didn't need a separate thread for RIB syncing, but we do that here to bridge between the sync and async worlds.
  3. Communication between the UDP send/recv tasks and the state machine driver use bounded channels and drop packets if the channels are full. The old implementation used unbounded channels. I don't expect this to matter in practice, but it's a spot where something going off the rails (e.g., a send_to() getting stuck or something) used to mean we'd have unbounded memory growth now means we have a lot of dropped packets instead.

Some less-architectural-but-still-intentional changes:

  1. I made the detection threshold a NonZeroU8, both because I was concerned about multiplying by 0 and because the RFC says such packets MUST be rejected. We now do so, and also return a 400 if we're asked to configure a session with a threshold of 0. This might be the wrong call and I'm happy to discuss - there was a "panic on startup if we can't re-add all peers" path, so I made that path bump any persisted 0s to 1s, but maybe I should do the same in the API handler instead of returning a 400, in case there are Nexus configs persisted that have a threshold of 0?
  2. Some more careful math in the event of unreasonable large numbers (e.g., replacing an as u32 that wrapped around on large values with .saturating_*() methods).
  3. We now increment SessionCounters::control_packets_received on incoming packets - I think this was just an oversight before?
  4. If we fail to bind an egress socket, the old code would retry every 5 seconds; this code retries every time it has a packet it wants to send. I'm not sure if this is better or worse but it was simpler to implement - happy to discuss.

Some things that I left unchanged but I'm not sure are correct:

  1. Calculations of the "next send" and "recv deadline" timers in the state machine - I left TODO-correctness comments on these with my notes but didn't want to change that behavior as part of a big rework.
  2. Outgoing packets don't populate the local detection_multiplier value (also tagged with a TODO-correctness in the state machine).
  3. We close the egress socket and rebind on any error from UdpSocket::send_to(). Could that leave us in a constant loop of closing/opening sockets if sends are failing for some reason not related to the local socket?
  4. When removing the peer, there's no final sync with the RIB (nor is there in the sync implementation) - I think this means if we remove a peer in the Up state, we'll leave behind a nexthop_shudown of false for that address? I don't know whether this is okay or gets cleaned up by something else.
  5. Should required_min_rx also be a NonZero type? If that gets set to 0 we'll also have a "multiply by 0" in the calculation of the recv deadline.

This PR is only the async implementation with no modifications to the main mgd binary except the change of detection threshold to NonZeroU8 mentioned above. I'll open a second PR that swaps the implementations over.

@nicolaskagami nicolaskagami left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(doing a first pass for now focusing on the state machine)

Overall this looks great! This is exactly the place where the sans-io approach with proptesting is really strong. I see you made many of the same decisions I made when working on a similar project.

I agreed with all your TODO-correctness comments. It seems there are a few features missing from our current BFD implementation, so I wonder what our overall plan is for it.

I added a few suggestions which might be a little tiresome but I'd be more than happy to tackle them.

Comment thread mgd/src/main.rs Outdated
Comment on lines +474 to +482
for mut config in configs {
// Backwards-compatibility protection: We now reject a detection
// threshold of 0 (as required by RFC 5880), but it's possible we
// previously persisted a config with such a threshold before adding the
// guards that reject them. Bump it up to 1 to ensure we don't panic in
// the `unwrap_or_else()` below.
if config.detection_threshold == 0 {
config.detection_threshold = 1;
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally we would enforce the NonZero detection_threshold in this input type itself so it's unrepresentable (as per "parse, don't validate").

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

100% agreed, but this bubbles out to the persisted state, which I have no context for so didn't want to change as part of this PR. I'll file an issue that we should push NonZeroU8 out to both rdb and the API?

@jgallagher jgallagher Jun 23, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oof, ok, so I looked very briefly into this and I think we may have a slightly bigger landmine here than I thought. BfdPeerConfig is part of the versioned HTTP API, so it's defined in the mg-api-types-versions, and we'll follow the standard RFD 619 process for revving that type. If we wanted to change detection_threshold to a NonZeroU8, we'd:

  • add a new API version
  • add a new BfdPeerConfig with the changed field type
  • add either From or TryFrom impls to convert if we get requests from an older client (depending on whether we want to reject requests that have a multiplier of 0, or silently convert them to 1)

That would all work fine at the API level. However, we also persist JSON-ified BfdPeerConfigs on disk, but with no versioning information AFAICT. When we then tried to load them here:

maghemite/rdb/src/db.rs

Lines 471 to 484 in dc53171

let value: BfdPeerConfig = match serde_json::from_str(&value) {
Ok(item) => item,
Err(ref e) => {
rdb_log!(
self,
error,
"error parsing bfd entry value {value:?}: {e}";
"unit" => UNIT_PERSISTENT,
"error" => format!("{e}")
);
return None;
}
};
Some(value)

we'd be trying to deserialize a JSON blob from whatever version was originally persisted as the latest BfdPeerConfig version. Any of these are possible:

  • it just works (e.g., if the change is or happens to be wire-compatible; in this case going from u8 to NonZeroU8 is wire compatible as long as there are no 0 values)
  • it fails (e.g., if we rename fields or add new required fields) - we'd log an error but then skip that config
  • it appears to work but silently does the wrong thing (e.g., if we change the semantics of some field in a way that's handled by the From or TryFrom conversions at the API level but which don't get a chance to run with this "deserialize old blob as new type" path)

We've been burned by this kind of on-disk lack of versioning thing in omicron a few places and have had to go back and add extra infrastructure around on-disk persisted values. Maybe we need to do that here before revving any API types that are also persisted on disk?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh actually maybe all of this is irrelevant - we do persist on disk, but the switch zone has ephemeral storage, so there's not actually a way for us to see an old version disk. I'm not sure why we persist to disk at all, actually - I guess it's for crash recovery (but only if mgd itself crashes - if the sled reboots we don't have any persistence so have to wait for nexus / sled-agent to tell us what to do).

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The on-disk db (sled::Db) is only used for daemon restart/crash recovery, like you said. mgd effectively trusts that there is no need to handle backwards compatibility issues with sled::Db, since all upgrades happen via a wholesale zone replacement that mops up the old filesystem anyway.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, makes sense. I think that means I can take out this backcompat shim entirely, right? (We should still push NonZeroU8 out into BfdPeerConfig, but that can happen as a followup.)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think so. And if we're going to start enforcing the value be non zero (which I think is a good thing!) then we may consider just bumping the API rev as part of this. I'm happy to take that on as a follow-up so you don't have to deal with a bunch of boilerplate and crate org stuff.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't mind bumping the API version; I've done that a bunch in omicron. But this is only one bit of the BFD peer config that's a little suspect - I filed oxidecomputer/omicron#10657 on the omicron side, but it's almost a 1-to-1 port of mgd's type. Should we also consider:

  1. Should required_rx either be a u32 (to match BFD control packets) or a Duration (since that's what it actually is, then we put the onus of converting to microseconds on mgd internally)?
  2. Should required_rx enforce any upper or lower bounds (via a newtype or runtime checks)?
  3. Is allowing the caller the ability to specify any local listening IpAddr right?
  4. Should get_bfd_peers() return an IdOrdMap<BfdPeerConfig> (keyed by remote address) instead of a Vec<BfdPeerConfig>?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are all great questions... which I don't have the answer to! I haven't had to dig into the BFD protocol or implementation before, so I think I'll have more opinions as I get further along the review

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

de5ab22 bumps the API version to make this param a NonZeroU8. Happy to add more changes pending answers to the other questions, or that can be deferred for later.

Comment thread bfd-async/src/sm.rs
Comment on lines +287 to +295
// TODO-correctness What should we do on "overflows an Instant"? That should
// be _very_ impossible given any reasonable values for `last_recv` (which
// are always from `Instant::now()` and `recv_timeout`, but someone passing
// a truly absurdly large `required_min_rx` could maybe cause problems? For
// now, we'll fall back to a hardcoded, large recv deadline if this addition
// overflows, but the actual value we pick is a total WAG.
last_recv
.checked_add(recv_timeout)
.unwrap_or_else(|| last_recv + Duration::from_secs(60))

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm for defaulting to a max duration for recv_timeout. I'd also like to note that this could still panic for unreasonable values of last_recv (near the representational limit).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd also like to note that this could still panic for unreasonable values of last_recv (near the representational limit).

This is true, but in practice last_recv is fully under our control, and is always the result of calling Instant::now() (unlike any of the timeout parameters, which ultimately come from outside, either directly from the BFD peer or indirectly by way of Nexus -> mgd in terms of parameters). I think it's okay to leave the + in here because of that?

I could also drop this in a comment if you think that'd be an improvement. I'm on the fence.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it'd be slightly better to deal with it separately:

  • Make sure recv_timeout is sensible when it is set ("by construction").
  • If the checked add fails you return the maximum representable or something like that. In that case it should only fail if the value was tempered with or we're at the heat-death of the universe so we can return whatever. Not doing this last unchecked add at least doesn't trick us into checking (with our brains) every time we look at it.

Comment thread bfd-async/src/sm.rs Outdated
Comment thread bfd-async/src/sm.rs Outdated
Comment thread bfd-async/src/sm.rs Outdated
Comment on lines +180 to +200
pub fn packet_received(
&mut self,
packet: packet::Control,
now: Instant,
) -> PacketReceivedResult {
self.update_remote_peer_info(&packet);
self.recv_deadline = next_recv_deadline(&self.local, now);

if packet.poll() {
let mut pkt = self.make_packet_to_send();
pkt.set_final();
self.poll_responses.push_back(pkt);
}

match packet.state() {
packet::State::Peer(peer_state) => {
self.update_remote_peer_state(peer_state)
}
packet::State::Unknown(_) => PacketReceivedResult::UnknownPeerState,
}
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed that we don't persist the remote state (bfd.RemoteSessionState).

I understand that on normal circumstances it may not matter, but there are a few RFC statements that explicitly check for it (6.8.6, 6.8.7).

Comment thread bfd-async/src/sm.rs
Comment on lines +281 to +285
// I don't think we support demand mode? But that means this calculation is
// wrong for async (wrong multiplier, isn't considering remote min tx)?
let recv_timeout = local
.required_min_rx
.saturating_mul(u32::from(local.detection_multiplier.get()));

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. According to the RFC, we should use the remote detection mult for async mode, though I admit it feels weird to use the remote one.

   In Asynchronous mode, the Detection Time calculated in the local
   system is equal to the value of Detect Mult received from the remote
   system, multiplied by the agreed transmit interval of the remote
   system (the greater of bfd.RequiredMinRxInterval and the last
   received Desired Min TX Interval)

Comment thread bfd-async/src/sm.rs Outdated
Comment thread bfd-async/src/sm.rs Outdated
Comment on lines +180 to +200
pub fn packet_received(
&mut self,
packet: packet::Control,
now: Instant,
) -> PacketReceivedResult {
self.update_remote_peer_info(&packet);
self.recv_deadline = next_recv_deadline(&self.local, now);

if packet.poll() {
let mut pkt = self.make_packet_to_send();
pkt.set_final();
self.poll_responses.push_back(pkt);
}

match packet.state() {
packet::State::Peer(peer_state) => {
self.update_remote_peer_state(peer_state)
}
packet::State::Unknown(_) => PacketReceivedResult::UnknownPeerState,
}
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An argument could be made here to ignore packets signaling an unknown state. Ideally, that would happen along with a more strongly typed packet input. Your addition of a NonZeroU8 field is a great step in this direction (as per "parse, don't validate") but I don't think we should tackle it in this PR.

Comment thread bfd/src/packet.rs Outdated
Comment thread bfd-async/src/daemon.rs
Comment thread bfd-async/src/egress_src_port_iter.rs Outdated
Comment thread bfd-async/src/rib.rs
Comment on lines +76 to +79
let result = tokio::task::spawn_blocking(move || {
sink.set_nexthop_shutdown(nexthop, shutdown);
})
.await;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's a bit of a bummer that we spawn a new blocking task every time we want to write something, rather than makingthe worker that does the writing be a dedicated std::thread::spawned thread that just does the blocking write itself, and then blocks until it has more work to do. but i see that we're using watch::Receiver::changed() to wait for a change, and, of course, there is no blocking version of that, so meh whatever.

Comment thread bfd-async/src/session.rs Outdated

@nicolaskagami nicolaskagami left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's looking pretty good.

I'd love other people's opinion on the semantic concerns, particularly on the correctness of the detection multiplier.
Additionally:

  • Not syncing the state when removing a peer sounds concerning to me, but I guess it depends on how we're using it.
  • Are implementing all features we want from BFD?

I also have a few observations below that are entirely syntactical.

use std::sync::atomic::AtomicU16;
use std::sync::atomic::Ordering;

const OFFSET_RANGE: u16 = (u16::MAX - EgressSrcPortIter::SOURCE_PORT_BEGIN) + 1;

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

microNIT: I'd keep this and SOURCE_PORT_BEGIN beside each other, wherever that is.

Comment thread bfd-async/src/egress.rs
Comment on lines +194 to +224
let mut first_port_tried = None;
let mut last_port_tried = None;
for _ in 0..max_ports_to_try.get() {
let src_port = src_port_iter.next();

// Keep track of what ports we tried exclusively for the error we return
// if we fail to find a free port.
if first_port_tried.is_none() {
first_port_tried = Some(src_port);
}
last_port_tried = Some(src_port);

let local_addr = SocketAddr::new(local_ip, src_port);
match bind_egress_socket(local_addr).await {
Ok(socket) => return Ok(socket),
Err(BindEgressSocketError::Bind(err))
if err.kind() == io::ErrorKind::AddrInUse =>
{
continue;
}
Err(err) => return Err(err),
}
}

Err(BindEgressSocketError::SrcPortsInUse {
// We know these must be `Some(_)` becaues `max_ports_to_try` is a
// `NonZeroUsize`, so we must have iterated at least once in the for
// loop above, and the first iteration sets both these values.
first_port_tried: first_port_tried.unwrap(),
last_port_tried: last_port_tried.unwrap(),
})

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let mut first_port_tried = None;
let mut last_port_tried = None;
for _ in 0..max_ports_to_try.get() {
let src_port = src_port_iter.next();
// Keep track of what ports we tried exclusively for the error we return
// if we fail to find a free port.
if first_port_tried.is_none() {
first_port_tried = Some(src_port);
}
last_port_tried = Some(src_port);
let local_addr = SocketAddr::new(local_ip, src_port);
match bind_egress_socket(local_addr).await {
Ok(socket) => return Ok(socket),
Err(BindEgressSocketError::Bind(err))
if err.kind() == io::ErrorKind::AddrInUse =>
{
continue;
}
Err(err) => return Err(err),
}
}
Err(BindEgressSocketError::SrcPortsInUse {
// We know these must be `Some(_)` becaues `max_ports_to_try` is a
// `NonZeroUsize`, so we must have iterated at least once in the for
// loop above, and the first iteration sets both these values.
first_port_tried: first_port_tried.unwrap(),
last_port_tried: last_port_tried.unwrap(),
})
let first_port_tried = src_port_iter.next();
let mut last_port_tried = first_port_tried;
for _ in 0..max_ports_to_try.get() {
match bind_egress_socket(SocketAddr::new(local_ip, last_port_tried))
.await
{
Ok(socket) => return Ok(socket),
Err(BindEgressSocketError::Bind(err))
if err.kind() == io::ErrorKind::AddrInUse => {}
Err(err) => return Err(err),
}
last_port_tried = src_port_iter.next();
}
Err(BindEgressSocketError::SrcPortsInUse {
first_port_tried,
last_port_tried,
})

We can remove the unwraps.

Comment on lines +294 to +303
// This is pretty spicy: we're using std's `UdpSocket` so we can
// _synchronously_ bind, even though this function is ultimately called
// from an async context. The arguments for this are:
//
// 1. Binding a UDP socket doesn't involve any network traffic, so
// shouldn't block for long
// 2. Reworking this to allow async binding is pretty involved; we'd
// need to either use an async Mutex (lots of footguns there) or move
// to an actor task pattern to manage the listeners.
let socket = std::net::UdpSocket::bind(listen_addr).map_err(|err| {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is ok. The only reason it could block for long is if you passed it a listen_addr that required name resolution, which the method accepts (ToSocketAddrs).

Comment on lines +268 to +281
trait ListenerBackend: Send + Sync + 'static {
/// Bind a socket at `listen_addr` and spawn a task that reads packets and
/// dispatches them to the per-peer channels in `sessions`.
///
/// Returns the spawned task's handle, or `None` for test fakes that don't
/// bind a real socket. A `Listener` holding `None` is shut down/dropped as
/// a no-op.
fn spawn(
&self,
listen_addr: SocketAddr,
sessions: SharedSessions,
log: Logger,
) -> Result<Option<JoinHandle<()>>, AddPeerError>;
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd rather not change the signature just for tests. You could remove the Option from here and from Listener and just return Ok(tokio::spawn(async {})) for the FakeBackend.

(tip: for Listener::shutdown you can let _: Result<_, _> = (&mut self.listen_task).await;)

Comment thread bfd-async/src/sm.rs
Comment on lines +287 to +295
// TODO-correctness What should we do on "overflows an Instant"? That should
// be _very_ impossible given any reasonable values for `last_recv` (which
// are always from `Instant::now()` and `recv_timeout`, but someone passing
// a truly absurdly large `required_min_rx` could maybe cause problems? For
// now, we'll fall back to a hardcoded, large recv deadline if this addition
// overflows, but the actual value we pick is a total WAG.
last_recv
.checked_add(recv_timeout)
.unwrap_or_else(|| last_recv + Duration::from_secs(60))

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it'd be slightly better to deal with it separately:

  • Make sure recv_timeout is sensible when it is set ("by construction").
  • If the checked add fails you return the maximum representable or something like that. In that case it should only fail if the value was tempered with or we're at the heat-death of the universe so we can return whatever. Not doing this last unchecked add at least doesn't trick us into checking (with our brains) every time we look at it.

Comment thread bfd-async/src/egress.rs
Comment on lines +152 to +162
warn!(
self.log, "udp egress send_to failed; will rebind";
"remote" => %self.remote_addr,
InlineErrorChain::new(&err),
);
// Drop the socket so the next packet triggers a fresh bind.
//
// TODO-correctness Should we _always_ drop and rebind on a send
// failure, or should we be matching on particular kinds of
// errors? Always rebinding is consistent with the prior (sync)
// implementation.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at how send_to can fail it seems it's mostly when trying to convert the target address and very little to do with our socket. So maybe you don't even need to drop it, but I don't think it matters too much.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants