@@ -79,9 +79,23 @@ INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, super
7979 insert_statement. bind_text ( 1 , bucket, sqlite:: Destructor :: STATIC ) ?;
8080
8181 // language=SQLite
82- let bucket_statement = db. prepare_v2 ( "INSERT OR IGNORE INTO ps_buckets(name) VALUES(?)" ) ?;
82+ let bucket_statement = db. prepare_v2 (
83+ "INSERT INTO ps_buckets(name)
84+ VALUES(?)
85+ ON CONFLICT DO UPDATE
86+ SET last_applied_op = last_applied_op
87+ RETURNING last_applied_op" ,
88+ ) ?;
8389 bucket_statement. bind_text ( 1 , bucket, sqlite:: Destructor :: STATIC ) ?;
84- bucket_statement. exec ( ) ?;
90+ bucket_statement. step ( ) ?;
91+
92+ // This is an optimization for initial sync - we can avoid persisting individual REMOVE
93+ // operations when last_applied_op = 0.
94+ // We do still need to do the "supersede_statement" step for this case, since a REMOVE
95+ // operation can supersede another PUT operation we're syncing at the same time.
96+ let mut is_empty = bucket_statement. column_int64 ( 0 ) ? == 0 ;
97+
98+ bucket_statement. reset ( ) ?;
8599
86100 let mut last_op: Option < i64 > = None ;
87101 let mut add_checksum: i32 = 0 ;
@@ -96,24 +110,28 @@ INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, super
96110
97111 last_op = Some ( op_id) ;
98112
113+ let mut key: String = "" . to_string ( ) ;
114+
99115 if op == "PUT" || op == "REMOVE" {
100- let key: String ;
101116 if let ( Ok ( object_type) , Ok ( object_id) ) = ( object_type. as_ref ( ) , object_id. as_ref ( ) ) {
102117 let subkey = iterate_statement. column_text ( 6 ) . unwrap_or ( "null" ) ;
103- key = format ! ( "{}/{}/{}" , & object_type, & object_id, subkey) ;
104- supersede_statement. bind_text ( 2 , & key, sqlite:: Destructor :: STATIC ) ?;
118+ let populated_key = format ! ( "{}/{}/{}" , & object_type, & object_id, subkey) ;
119+
120+ supersede_statement. bind_text ( 2 , & populated_key, sqlite:: Destructor :: STATIC ) ?;
105121 supersede_statement. exec ( ) ?;
106- } else {
107- key = String :: from ( "" ) ;
122+
123+ key = populated_key ;
108124 }
125+ }
109126
127+ if op == "PUT" || ( op == "REMOVE" && !is_empty) {
110128 let opi = if op == "PUT" { 3 } else { 4 } ;
111129 insert_statement. bind_int64 ( 2 , op_id) ?;
112130 insert_statement. bind_int ( 3 , opi) ?;
113- if key == "" {
114- insert_statement. bind_null ( 4 ) ?;
115- } else {
131+ if key != "" {
116132 insert_statement. bind_text ( 4 , & key, sqlite:: Destructor :: STATIC ) ?;
133+ } else {
134+ insert_statement. bind_null ( 4 ) ?;
117135 }
118136
119137 if let ( Ok ( object_type) , Ok ( object_id) ) = ( object_type, object_id) {
@@ -131,7 +149,7 @@ INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, super
131149
132150 insert_statement. bind_int ( 8 , checksum) ?;
133151 insert_statement. exec ( ) ?;
134- } else if op == "MOVE" {
152+ } else if op == "MOVE" || ( is_empty && op == "REMOVE" ) {
135153 add_checksum = add_checksum. wrapping_add ( checksum) ;
136154 } else if op == "CLEAR" {
137155 // Any remaining PUT operations should get an implicit REMOVE
@@ -151,6 +169,7 @@ INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, super
151169 clear_statement2. exec ( ) ?;
152170
153171 add_checksum = 0 ;
172+ is_empty = true ;
154173 }
155174 }
156175
0 commit comments