Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 39 additions & 36 deletions iroh/src/endpoint/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,33 +206,19 @@ pub enum AuthenticationError {
RemoteId { source: RemoteEndpointIdError },
#[error("no ALPN provided")]
NoAlpn {},
#[error(transparent)]
ConnectionError {
#[error(std_err)]
source: ConnectionError,
},
#[error("internal consistency error: EndpointStateActor stopped")]
InternalConsistencyError,
}

impl From<EndpointStateActorStoppedError> for AuthenticationError {
#[track_caller]
fn from(_value: EndpointStateActorStoppedError) -> Self {
e!(Self::InternalConsistencyError)
}
}

impl From<EndpointStateActorStoppedError> for ConnectingError {
#[track_caller]
fn from(_value: EndpointStateActorStoppedError) -> Self {
e!(AuthenticationError::InternalConsistencyError).into()
e!(Self::InternalConsistencyError)
}
}

/// Converts a `quinn::Connection` to a `Connection`.
///
/// Returns a [`AuthenticationError`] if the handshake data has not completed,
/// or if no alpn was set by the remote node.
/// Returns an error if there was a connection error, the handshake data has not completed
/// or if the remote did not set an ALPN.
///
/// Otherwise returns a future that completes once the connection has been registered with the
/// magicsock. This future can return an [`EndpointStateActorStoppedError`], which will only be
Expand All @@ -244,13 +230,21 @@ fn conn_from_quinn_conn(
ep: &Endpoint,
) -> Result<
impl Future<Output = Result<Connection, EndpointStateActorStoppedError>> + Send + 'static,
AuthenticationError,
ConnectingError,
> {
if let Some(reason) = conn.close_reason() {
return Err(e!(AuthenticationError::ConnectionError { source: reason }));
}
let remote_id = remote_id_from_quinn_conn(&conn)?;
let alpn = alpn_from_quinn_conn(&conn).ok_or_else(|| e!(AuthenticationError::NoAlpn))?;
let (remote_id, alpn) = match static_info_from_conn(&conn) {
Ok(val) => val,
Err(auth_err) => {
// If the authentication error raced with a connection error, the connection
// error wins.
if let Some(conn_err) = conn.close_reason() {
return Err(e!(ConnectingError::ConnectionError { source: conn_err }));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, so this actually fixes the racy thing I had. It took me a while to figure out why. But basically what happens is that because the handshake does complete on the client before the connection is closed, we never get the auth_err here, because all the info is available. So it never checks on whether the connection is already closes. Which in turn apparently means you just get a closed Connection. Which is pretty neat.

I don't think this means this branch can be removed though, because if the client closes before it completes the handshake you can get here again.

} else {
return Err(e!(ConnectingError::HandshakeFailure { source: auth_err }));
}
}
};

// Register this connection with the magicsock.
let fut = ep.msock.register_connection(remote_id, conn.weak_handle());
Ok(async move {
Expand All @@ -264,6 +258,14 @@ fn conn_from_quinn_conn(
})
}

fn static_info_from_conn(
conn: &quinn::Connection,
) -> Result<(EndpointId, Vec<u8>), AuthenticationError> {
let remote_id = remote_id_from_quinn_conn(conn)?;
let alpn = alpn_from_quinn_conn(conn).ok_or_else(|| e!(AuthenticationError::NoAlpn))?;
Ok((remote_id, alpn))
}

/// Returns the [`EndpointId`] from the peer's TLS certificate.
///
/// The [`PublicKey`] of an endpoint is also known as an [`EndpointId`]. This [`PublicKey`] is
Expand Down Expand Up @@ -359,6 +361,7 @@ pub enum AlpnError {
#[stack_error(add_meta, derive, from_sources)]
#[allow(missing_docs)]
#[non_exhaustive]
#[derive(Clone)]
pub enum ConnectingError {
#[error(transparent)]
ConnectionError {
Expand All @@ -367,6 +370,8 @@ pub enum ConnectingError {
},
#[error("Failure finalizing the handshake")]
HandshakeFailure { source: AuthenticationError },
#[error("internal consistency error: EndpointStateActor stopped")]
InternalConsistencyError,
}

impl Connecting {
Expand Down Expand Up @@ -470,10 +475,8 @@ impl Future for Connecting {
return fut.poll_unpin(cx).map_err(Into::into);
} else {
let quinn_conn = std::task::ready!(self.inner.poll_unpin(cx)?);
match conn_from_quinn_conn(quinn_conn, &self.ep) {
Err(err) => return Poll::Ready(Err(err.into())),
Ok(fut) => self.register_with_magicsock = Some(Box::pin(fut.err_into())),
};
let fut = conn_from_quinn_conn(quinn_conn, &self.ep)?;
self.register_with_magicsock = Some(Box::pin(fut.err_into()));
}
}
}
Expand Down Expand Up @@ -555,7 +558,7 @@ impl Future for Accepting {
} else {
let quinn_conn = std::task::ready!(self.inner.poll_unpin(cx)?);
match conn_from_quinn_conn(quinn_conn, &self.ep) {
Err(err) => return Poll::Ready(Err(err.into())),
Err(err) => return Poll::Ready(Err(err)),
Ok(fut) => self.register_with_magicsock = Some(Box::pin(fut.err_into())),
};
}
Expand All @@ -577,7 +580,7 @@ impl Future for Accepting {
#[derive(Debug, Clone)]
pub struct OutgoingZeroRttConnection {
inner: quinn::Connection,
handshake_completed_fut: Shared<BoxFuture<Result<ZeroRttStatus, AuthenticationError>>>,
handshake_completed_fut: Shared<BoxFuture<Result<ZeroRttStatus, ConnectingError>>>,
}

/// Returned from [`OutgoingZeroRttConnection::handshake_completed`].
Expand All @@ -602,17 +605,17 @@ impl OutgoingZeroRttConnection {
/// the handshake will error and any data sent should be re-sent on a
/// new stream.
///
/// This may fail with [`AuthenticationError::ConnectionError`], if there was
/// This may fail with [`ConnectingError::ConnectionError`], if there was
/// some general failure with the connection, such as a network timeout since
/// we initiated the connection.
///
/// This may fail with other [`AuthenticationError`]s, if the other side
/// This may fail with [`ConnectingError::HandshakeFailure`], if the other side
/// doesn't use the right TLS authentication, which usually every iroh endpoint
/// uses and requires.
///
/// Thus, those errors should only occur if someone connects to you with a
/// modified iroh endpoint or with a plain QUIC client.
pub async fn handshake_completed(self) -> Result<ZeroRttStatus, AuthenticationError> {
pub async fn handshake_completed(self) -> Result<ZeroRttStatus, ConnectingError> {
self.handshake_completed_fut.await
}

Expand Down Expand Up @@ -906,23 +909,23 @@ impl OutgoingZeroRttConnection {
#[derive(Debug)]
pub struct IncomingZeroRttConnection {
inner: quinn::Connection,
handshake_completed_fut: Shared<BoxFuture<Result<Connection, AuthenticationError>>>,
handshake_completed_fut: Shared<BoxFuture<Result<Connection, ConnectingError>>>,
}

impl IncomingZeroRttConnection {
/// Waits until the full handshake occurs and then returns a [`Connection`].
///
/// This may fail with [`AuthenticationError::ConnectionError`], if there was
/// This may fail with [`ConnectingError::ConnectionError`], if there was
/// some general failure with the connection, such as a network timeout since
/// we accepted the connection.
///
/// This may fail with other [`AuthenticationError`]s, if the other side
/// This may fail with [`ConnectingError::HandshakeFailure`], if the other side
/// doesn't use the right TLS authentication, which usually every iroh endpoint
/// uses and requires.
///
/// Thus, those errors should only occur if someone connects to you with a
/// modified iroh endpoint or with a plain QUIC client.
pub async fn handshake_completed(self) -> Result<Connection, AuthenticationError> {
pub async fn handshake_completed(self) -> Result<Connection, ConnectingError> {
self.handshake_completed_fut.await
}

Expand Down
Loading