Skip to content

Commit e8d50c9

Browse files
committed
feat(client): add Layer types for client::conn
1 parent b9dc3d2 commit e8d50c9

File tree

2 files changed

+287
-0
lines changed

2 files changed

+287
-0
lines changed

src/client/conn.rs

Lines changed: 284 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,284 @@
1+
//! todo
2+
3+
use std::future::Future;
4+
use std::marker::PhantomData;
5+
use std::pin::Pin;
6+
use std::task::{Context, Poll};
7+
8+
use http::{Request, Response};
9+
use tower_service::Service;
10+
11+
type BoxError = Box<dyn std::error::Error + Send + Sync>;
12+
13+
/// todo
14+
#[cfg(feature = "http1")]
15+
pub struct Http1Layer<B> {
16+
builder: hyper::client::conn::http1::Builder,
17+
_body: PhantomData<fn(B)>,
18+
}
19+
20+
/// todo
21+
#[cfg(feature = "http1")]
22+
pub fn http1<B>() -> Http1Layer<B> {
23+
Http1Layer {
24+
builder: hyper::client::conn::http1::Builder::new(),
25+
_body: PhantomData,
26+
}
27+
}
28+
29+
#[cfg(feature = "http1")]
30+
impl<M, B> tower_layer::Layer<M> for Http1Layer<B> {
31+
type Service = Http1Connect<M, B>;
32+
fn layer(&self, inner: M) -> Self::Service {
33+
Http1Connect {
34+
inner,
35+
builder: self.builder.clone(),
36+
_body: self._body,
37+
}
38+
}
39+
}
40+
41+
#[cfg(feature = "http1")]
42+
impl<B> Clone for Http1Layer<B> {
43+
fn clone(&self) -> Self {
44+
Self {
45+
builder: self.builder.clone(),
46+
_body: self._body.clone(),
47+
}
48+
}
49+
}
50+
51+
#[cfg(feature = "http1")]
52+
impl<B> From<hyper::client::conn::http1::Builder> for Http1Layer<B> {
53+
fn from(builder: hyper::client::conn::http1::Builder) -> Self {
54+
Self {
55+
builder,
56+
_body: PhantomData,
57+
}
58+
}
59+
}
60+
61+
/// todo
62+
#[cfg(feature = "http1")]
63+
pub struct Http1Connect<M, B> {
64+
inner: M,
65+
builder: hyper::client::conn::http1::Builder,
66+
_body: PhantomData<fn(B)>,
67+
}
68+
69+
#[cfg(feature = "http1")]
70+
impl<M, Dst, B> Service<Dst> for Http1Connect<M, B>
71+
where
72+
M: Service<Dst>,
73+
M::Future: Send + 'static,
74+
M::Response: hyper::rt::Read + hyper::rt::Write + Unpin + Send + 'static,
75+
M::Error: Into<BoxError>,
76+
B: hyper::body::Body + Send + 'static,
77+
B::Data: Send + 'static,
78+
B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
79+
{
80+
type Response = Http1ClientService<B>;
81+
type Error = BoxError;
82+
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
83+
84+
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
85+
// Minimal, explicit contract: delegate readiness to the response future.
86+
// If you want strict backpressure, use the "permit" pattern (see notes).
87+
Poll::Ready(Ok(()))
88+
}
89+
90+
fn call(&mut self, dst: Dst) -> Self::Future {
91+
let fut = self.inner.call(dst);
92+
let builder = self.builder.clone();
93+
Box::pin(async move {
94+
let io = fut.await.map_err(Into::into)?;
95+
let (tx, conn) = builder.handshake(io).await?;
96+
tokio::spawn(async move {
97+
if let Err(e) = conn.await {
98+
eprintln!("connection error: {:?}", e);
99+
}
100+
});
101+
Ok(Http1ClientService::new(tx))
102+
})
103+
}
104+
}
105+
106+
#[cfg(feature = "http1")]
107+
impl<M: Clone, B> Clone for Http1Connect<M, B> {
108+
fn clone(&self) -> Self {
109+
Self {
110+
inner: self.inner.clone(),
111+
builder: self.builder.clone(),
112+
_body: self._body.clone(),
113+
}
114+
}
115+
}
116+
117+
/// todo
118+
#[cfg(feature = "http2")]
119+
pub struct Http2Layer<B> {
120+
_body: PhantomData<fn(B)>,
121+
}
122+
123+
/// todo
124+
#[cfg(feature = "http2")]
125+
pub fn http2<B>() -> Http2Layer<B> {
126+
Http2Layer { _body: PhantomData }
127+
}
128+
129+
#[cfg(feature = "http2")]
130+
impl<M, B> tower_layer::Layer<M> for Http2Layer<B> {
131+
type Service = Http2Connect<M, B>;
132+
fn layer(&self, inner: M) -> Self::Service {
133+
Http2Connect {
134+
inner,
135+
builder: hyper::client::conn::http2::Builder::new(crate::rt::TokioExecutor::new()),
136+
_body: self._body,
137+
}
138+
}
139+
}
140+
141+
#[cfg(feature = "http2")]
142+
impl<B> Clone for Http2Layer<B> {
143+
fn clone(&self) -> Self {
144+
Self {
145+
_body: self._body.clone(),
146+
}
147+
}
148+
}
149+
150+
/// todo
151+
#[cfg(feature = "http2")]
152+
#[derive(Debug)]
153+
pub struct Http2Connect<M, B> {
154+
inner: M,
155+
builder: hyper::client::conn::http2::Builder<crate::rt::TokioExecutor>,
156+
_body: PhantomData<fn(B)>,
157+
}
158+
159+
#[cfg(feature = "http2")]
160+
impl<M, Dst, B> Service<Dst> for Http2Connect<M, B>
161+
where
162+
M: Service<Dst>,
163+
M::Future: Send + 'static,
164+
M::Response: hyper::rt::Read + hyper::rt::Write + Unpin + Send + 'static,
165+
M::Error: Into<BoxError>,
166+
B: hyper::body::Body + Unpin + Send + 'static,
167+
B::Data: Send + 'static,
168+
B::Error: Into<BoxError>,
169+
{
170+
type Response = Http2ClientService<B>;
171+
type Error = BoxError;
172+
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
173+
174+
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
175+
// Minimal, explicit contract: delegate readiness to the response future.
176+
// If you want strict backpressure, use the "permit" pattern (see notes).
177+
Poll::Ready(Ok(()))
178+
}
179+
180+
fn call(&mut self, dst: Dst) -> Self::Future {
181+
let fut = self.inner.call(dst);
182+
let builder = self.builder.clone();
183+
Box::pin(async move {
184+
let io = fut.await.map_err(Into::into)?;
185+
let (tx, conn) = builder.handshake(io).await?;
186+
tokio::spawn(async move {
187+
if let Err(e) = conn.await {
188+
eprintln!("connection error: {:?}", e);
189+
}
190+
});
191+
Ok(Http2ClientService::new(tx))
192+
})
193+
}
194+
}
195+
196+
#[cfg(feature = "http2")]
197+
impl<M: Clone, B> Clone for Http2Connect<M, B> {
198+
fn clone(&self) -> Self {
199+
Self {
200+
inner: self.inner.clone(),
201+
builder: self.builder.clone(),
202+
_body: self._body.clone(),
203+
}
204+
}
205+
}
206+
207+
/// A thin adapter over hyper HTTP/1 client SendRequest.
208+
#[cfg(feature = "http1")]
209+
#[derive(Debug)]
210+
pub struct Http1ClientService<B> {
211+
tx: hyper::client::conn::http1::SendRequest<B>,
212+
}
213+
214+
#[cfg(feature = "http1")]
215+
impl<B> Http1ClientService<B> {
216+
/// todo
217+
pub fn new(tx: hyper::client::conn::http1::SendRequest<B>) -> Self {
218+
Self { tx }
219+
}
220+
}
221+
222+
#[cfg(feature = "http1")]
223+
impl<B> Service<Request<B>> for Http1ClientService<B>
224+
where
225+
B: hyper::body::Body + Send + 'static,
226+
{
227+
type Response = Response<hyper::body::Incoming>;
228+
type Error = hyper::Error;
229+
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
230+
231+
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
232+
// Minimal, explicit contract: delegate readiness to the response future.
233+
// If you want strict backpressure, use the "permit" pattern (see notes).
234+
Poll::Ready(Ok(()))
235+
}
236+
237+
fn call(&mut self, req: Request<B>) -> Self::Future {
238+
let fut = self.tx.send_request(req);
239+
Box::pin(async move { Ok(fut.await?) })
240+
}
241+
}
242+
243+
/// todo
244+
#[cfg(feature = "http2")]
245+
#[derive(Debug)]
246+
pub struct Http2ClientService<B> {
247+
tx: hyper::client::conn::http2::SendRequest<B>,
248+
}
249+
250+
#[cfg(feature = "http2")]
251+
impl<B> Http2ClientService<B> {
252+
/// todo
253+
pub fn new(tx: hyper::client::conn::http2::SendRequest<B>) -> Self {
254+
Self { tx }
255+
}
256+
}
257+
258+
#[cfg(feature = "http2")]
259+
impl<B> Service<Request<B>> for Http2ClientService<B>
260+
where
261+
B: hyper::body::Body + Send + 'static,
262+
{
263+
type Response = Response<hyper::body::Incoming>;
264+
type Error = hyper::Error;
265+
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
266+
267+
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
268+
Poll::Ready(Ok(()))
269+
}
270+
271+
fn call(&mut self, req: Request<B>) -> Self::Future {
272+
let fut = self.tx.send_request(req);
273+
Box::pin(async move { Ok(fut.await?) })
274+
}
275+
}
276+
277+
#[cfg(feature = "http2")]
278+
impl<B> Clone for Http2ClientService<B> {
279+
fn clone(&self) -> Self {
280+
Self {
281+
tx: self.tx.clone(),
282+
}
283+
}
284+
}

src/client/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
//! HTTP client utilities
22
3+
#[cfg(any(feature = "http1", feature = "http2"))]
4+
pub mod conn;
5+
36
/// Legacy implementations of `connect` module and `Client`
47
#[cfg(feature = "client-legacy")]
58
pub mod legacy;

0 commit comments

Comments
 (0)