@@ -3,7 +3,6 @@ use std::sync::Arc;
33
44use futures_core:: future:: BoxFuture ;
55use parking_lot:: Mutex as PMutex ;
6- use rusqlite:: types:: ValueRef ;
76use sqld_libsql_bindings:: wal_hook:: { TransparentMethods , TRANSPARENT_METHODS } ;
87use tokio:: sync:: { mpsc, watch, Mutex } ;
98use tokio_stream:: StreamExt ;
@@ -17,15 +16,10 @@ use crate::connection::program::{DescribeCol, DescribeParam};
1716use crate :: error:: Error ;
1817use crate :: namespace:: NamespaceName ;
1918use crate :: query_analysis:: TxnStatus ;
20- use crate :: query_result_builder:: { Column , QueryBuilderConfig , QueryResultBuilder } ;
19+ use crate :: query_result_builder:: { QueryBuilderConfig , QueryResultBuilder } ;
2120use crate :: replication:: FrameNo ;
2221use crate :: rpc:: proxy:: rpc:: proxy_client:: ProxyClient ;
23- use crate :: rpc:: proxy:: rpc:: resp_step:: Step ;
24- use crate :: rpc:: proxy:: rpc:: row_value:: Value ;
25- use crate :: rpc:: proxy:: rpc:: {
26- self , exec_req, exec_resp, AddRowValue , ColsDescription , DisconnectMessage , ExecReq , ExecResp ,
27- Finish , FinishStep , RowValue , StepError ,
28- } ;
22+ use crate :: rpc:: proxy:: rpc:: { self , exec_req, exec_resp, DisconnectMessage , ExecReq , ExecResp } ;
2923use crate :: rpc:: NAMESPACE_METADATA_KEY ;
3024use crate :: stats:: Stats ;
3125use crate :: { Result , DEFAULT_AUTO_CHECKPOINT } ;
@@ -297,7 +291,7 @@ impl RemoteConnection {
297291 Ok ( resp) => {
298292 // there was an interuption, and we moved to the next query
299293 if resp. request_id > request_id {
300- return Err ( Error :: PrimaryStreamInterupted )
294+ return Err ( Error :: PrimaryStreamInterupted ) ;
301295 }
302296
303297 // we can ignore response for previously interupted requests
@@ -327,58 +321,20 @@ impl RemoteConnection {
327321 let mut txn_status = TxnStatus :: Invalid ;
328322 let mut new_frame_no = None ;
329323 let builder_config = self . builder_config ;
330- let cb = |response : exec_resp:: Response | {
331- match response {
332- exec_resp:: Response :: ProgramResp ( resp) => {
333- for step in resp. steps {
334- let Some ( step) = step. step else { return Err ( Error :: PrimaryStreamMisuse ) } ;
335- match step {
336- Step :: Init ( _) => builder. init ( & builder_config) ?,
337- Step :: BeginStep ( _) => builder. begin_step ( ) ?,
338- Step :: FinishStep ( FinishStep {
339- affected_row_count,
340- last_insert_rowid,
341- } ) => builder. finish_step ( affected_row_count, last_insert_rowid) ?,
342- Step :: StepError ( StepError { error : Some ( err) } ) => builder
343- . step_error ( crate :: error:: Error :: RpcQueryError ( err) ) ?,
344- Step :: ColsDescription ( ColsDescription { columns } ) => {
345- let cols = columns. iter ( ) . map ( |c| Column {
346- name : & c. name ,
347- decl_ty : c. decltype . as_deref ( ) ,
348- } ) ;
349- builder. cols_description ( cols) ?
350- }
351- Step :: BeginRows ( _) => builder. begin_rows ( ) ?,
352- Step :: BeginRow ( _) => builder. begin_row ( ) ?,
353- Step :: AddRowValue ( AddRowValue {
354- val : Some ( RowValue { value : Some ( val) } ) ,
355- } ) => {
356- let val = match & val {
357- Value :: Text ( s) => ValueRef :: Text ( s. as_bytes ( ) ) ,
358- Value :: Integer ( i) => ValueRef :: Integer ( * i) ,
359- Value :: Real ( x) => ValueRef :: Real ( * x) ,
360- Value :: Blob ( b) => ValueRef :: Blob ( b. as_slice ( ) ) ,
361- Value :: Null ( _) => ValueRef :: Null ,
362- } ;
363- builder. add_row_value ( val) ?;
364- }
365- Step :: FinishRow ( _) => builder. finish_row ( ) ?,
366- Step :: FinishRows ( _) => builder. finish_rows ( ) ?,
367- Step :: Finish ( f @ Finish { last_frame_no, .. } ) => {
368- txn_status = TxnStatus :: from ( f. state ( ) ) ;
369- new_frame_no = last_frame_no;
370- builder. finish ( last_frame_no, txn_status) ?;
371- return Ok ( false ) ;
372- }
373- _ => return Err ( Error :: PrimaryStreamMisuse ) ,
374- }
375- }
376- }
377- exec_resp:: Response :: DescribeResp ( _) => return Err ( Error :: PrimaryStreamMisuse ) ,
378- exec_resp:: Response :: Error ( e) => return Err ( Error :: RpcQueryError ( e) ) ,
324+ let cb = |response : exec_resp:: Response | match response {
325+ exec_resp:: Response :: ProgramResp ( resp) => {
326+ crate :: rpc:: streaming_exec:: apply_program_resp_to_builder (
327+ & builder_config,
328+ & mut builder,
329+ resp,
330+ |last_frame_no, status| {
331+ txn_status = status;
332+ new_frame_no = last_frame_no;
333+ } ,
334+ )
379335 }
380-
381- Ok ( true )
336+ exec_resp :: Response :: DescribeResp ( _ ) => Err ( Error :: PrimaryStreamMisuse ) ,
337+ exec_resp :: Response :: Error ( e ) => Err ( Error :: RpcQueryError ( e ) ) ,
382338 } ;
383339
384340 self . make_request (
@@ -395,32 +351,30 @@ impl RemoteConnection {
395351 #[ allow( dead_code) ] // reference implementation
396352 async fn describe ( & mut self , stmt : String ) -> crate :: Result < DescribeResponse > {
397353 let mut out = None ;
398- let cb = |response : exec_resp:: Response | {
399- match response {
400- exec_resp:: Response :: DescribeResp ( resp) => {
401- out = Some ( DescribeResponse {
402- params : resp
403- . params
404- . into_iter ( )
405- . map ( |p| DescribeParam { name : p. name } )
406- . collect ( ) ,
407- cols : resp
408- . cols
409- . into_iter ( )
410- . map ( |c| DescribeCol {
411- name : c. name ,
412- decltype : c. decltype ,
413- } )
414- . collect ( ) ,
415- is_explain : resp. is_explain ,
416- is_readonly : resp. is_readonly ,
417- } ) ;
418-
419- Ok ( false )
420- }
421- exec_resp:: Response :: Error ( e) => Err ( Error :: RpcQueryError ( e) ) ,
422- exec_resp:: Response :: ProgramResp ( _) => Err ( Error :: PrimaryStreamMisuse ) ,
354+ let cb = |response : exec_resp:: Response | match response {
355+ exec_resp:: Response :: DescribeResp ( resp) => {
356+ out = Some ( DescribeResponse {
357+ params : resp
358+ . params
359+ . into_iter ( )
360+ . map ( |p| DescribeParam { name : p. name } )
361+ . collect ( ) ,
362+ cols : resp
363+ . cols
364+ . into_iter ( )
365+ . map ( |c| DescribeCol {
366+ name : c. name ,
367+ decltype : c. decltype ,
368+ } )
369+ . collect ( ) ,
370+ is_explain : resp. is_explain ,
371+ is_readonly : resp. is_readonly ,
372+ } ) ;
373+
374+ Ok ( false )
423375 }
376+ exec_resp:: Response :: Error ( e) => Err ( Error :: RpcQueryError ( e) ) ,
377+ exec_resp:: Response :: ProgramResp ( _) => Err ( Error :: PrimaryStreamMisuse ) ,
424378 } ;
425379
426380 self . make_request (
@@ -520,10 +474,11 @@ pub mod test {
520474 use arbitrary:: { Arbitrary , Unstructured } ;
521475 use bytes:: Bytes ;
522476 use rand:: Fill ;
477+ use rusqlite:: types:: ValueRef ;
523478
524479 use super :: * ;
525480 use crate :: {
526- query_result_builder:: { test:: test_driver, QueryResultBuilderError } ,
481+ query_result_builder:: { test:: test_driver, QueryResultBuilderError , Column } ,
527482 rpc:: proxy:: rpc:: { query_result:: RowResult , ExecuteResults } ,
528483 } ;
529484
0 commit comments