1- use crate :: postgres_store :: {
1+ use crate :: models :: {
22 VssDbRecord , LIST_KEY_VERSIONS_MAX_PAGE_SIZE , MAX_PUT_REQUEST_ITEM_COUNT ,
33} ;
44use api:: error:: VssError ;
@@ -10,41 +10,39 @@ use api::types::{
1010use async_trait:: async_trait;
1111use bytes:: Bytes ;
1212use chrono:: prelude:: Utc ;
13- use std:: collections:: HashMap ;
13+ use std:: collections:: BTreeMap ;
1414use std:: sync:: Arc ;
15- use tokio:: sync:: RwLock ;
15+ use tokio:: sync:: Mutex ;
1616
1717fn build_storage_key ( user_token : & str , store_id : & str , key : & str ) -> String {
1818 format ! ( "{}#{}#{}" , user_token, store_id, key)
1919}
2020
2121/// In-memory implementation of the VSS Store.
2222pub struct InMemoryBackendImpl {
23- store : Arc < RwLock < HashMap < String , VssDbRecord > > > ,
23+ store : Arc < Mutex < BTreeMap < String , VssDbRecord > > > ,
2424}
2525
2626impl InMemoryBackendImpl {
2727 /// Creates an in-memory instance.
2828 pub fn new ( ) -> Self {
29- Self { store : Arc :: new ( RwLock :: new ( HashMap :: new ( ) ) ) }
29+ Self { store : Arc :: new ( Mutex :: new ( BTreeMap :: new ( ) ) ) }
3030 }
3131
3232 fn get_current_global_version (
33- & self , guard : & HashMap < String , VssDbRecord > , user_token : & str , store_id : & str ,
33+ & self , guard : & BTreeMap < String , VssDbRecord > , user_token : & str , store_id : & str ,
3434 ) -> i64 {
3535 let global_key = build_storage_key ( user_token, store_id, GLOBAL_VERSION_KEY ) ;
3636 guard. get ( & global_key) . map ( |r| r. version ) . unwrap_or ( 0 )
3737 }
3838}
3939
40- // Validation functions - check if operations can succeed without modifying data
4140fn validate_put_operation (
42- store : & HashMap < String , VssDbRecord > , user_token : & str , store_id : & str , key_value : & KeyValue ,
41+ store : & BTreeMap < String , VssDbRecord > , user_token : & str , store_id : & str , key_value : & KeyValue ,
4342) -> Result < ( ) , VssError > {
4443 let key = build_storage_key ( user_token, store_id, & key_value. key ) ;
4544
4645 if key_value. version == -1 {
47- // Non-conditional upsert always succeeds
4846 Ok ( ( ) )
4947 } else if key_value. version == 0 {
5048 if store. contains_key ( & key) {
@@ -75,12 +73,11 @@ fn validate_put_operation(
7573}
7674
7775fn validate_delete_operation (
78- store : & HashMap < String , VssDbRecord > , user_token : & str , store_id : & str , key_value : & KeyValue ,
76+ store : & BTreeMap < String , VssDbRecord > , user_token : & str , store_id : & str , key_value : & KeyValue ,
7977) -> Result < ( ) , VssError > {
8078 let key = build_storage_key ( user_token, store_id, & key_value. key ) ;
8179
8280 if key_value. version == -1 {
83- // Non-conditional delete always succeeds
8481 Ok ( ( ) )
8582 } else {
8683 if let Some ( existing) = store. get ( & key) {
@@ -101,20 +98,25 @@ fn validate_delete_operation(
10198 }
10299}
103100
104- fn execute_non_conditional_upsert (
105- store : & mut HashMap < String , VssDbRecord > , user_token : & str , store_id : & str , key_value : KeyValue ,
101+ fn execute_put_object (
102+ store : & mut BTreeMap < String , VssDbRecord > , user_token : & str , store_id : & str ,
103+ key_value : KeyValue ,
106104) {
107105 let key = build_storage_key ( user_token, store_id, & key_value. key ) ;
108106 let now = Utc :: now ( ) ;
109107
110108 match store. entry ( key) {
111- std:: collections:: hash_map :: Entry :: Occupied ( mut occ) => {
109+ std:: collections:: btree_map :: Entry :: Occupied ( mut occ) => {
112110 let existing = occ. get_mut ( ) ;
113- existing. version = INITIAL_RECORD_VERSION as i64 ;
111+ existing. version = if key_value. version == -1 {
112+ INITIAL_RECORD_VERSION as i64
113+ } else {
114+ existing. version . saturating_add ( 1 )
115+ } ;
114116 existing. value = key_value. value . to_vec ( ) ;
115117 existing. last_updated_at = now;
116118 } ,
117- std:: collections:: hash_map :: Entry :: Vacant ( vac) => {
119+ std:: collections:: btree_map :: Entry :: Vacant ( vac) => {
118120 let new_record = VssDbRecord {
119121 user_token : user_token. to_string ( ) ,
120122 store_id : store_id. to_string ( ) ,
@@ -129,51 +131,8 @@ fn execute_non_conditional_upsert(
129131 }
130132}
131133
132- fn execute_conditional_insert (
133- store : & mut HashMap < String , VssDbRecord > , user_token : & str , store_id : & str , key_value : KeyValue ,
134- ) {
135- let key = build_storage_key ( user_token, store_id, & key_value. key ) ;
136- let now = Utc :: now ( ) ;
137-
138- let new_record = VssDbRecord {
139- user_token : user_token. to_string ( ) ,
140- store_id : store_id. to_string ( ) ,
141- key : key_value. key ,
142- value : key_value. value . to_vec ( ) ,
143- version : INITIAL_RECORD_VERSION as i64 ,
144- created_at : now,
145- last_updated_at : now,
146- } ;
147- store. insert ( key, new_record) ;
148- }
149-
150- fn execute_conditional_update (
151- store : & mut HashMap < String , VssDbRecord > , user_token : & str , store_id : & str , key_value : KeyValue ,
152- ) {
153- let key = build_storage_key ( user_token, store_id, & key_value. key ) ;
154- let now = Utc :: now ( ) ;
155-
156- if let Some ( existing) = store. get_mut ( & key) {
157- existing. version = key_value. version . saturating_add ( 1 ) ;
158- existing. value = key_value. value . to_vec ( ) ;
159- existing. last_updated_at = now;
160- }
161- }
162-
163- fn execute_put_object (
164- store : & mut HashMap < String , VssDbRecord > , user_token : & str , store_id : & str , key_value : KeyValue ,
165- ) {
166- if key_value. version == -1 {
167- execute_non_conditional_upsert ( store, user_token, store_id, key_value) ;
168- } else if key_value. version == 0 {
169- execute_conditional_insert ( store, user_token, store_id, key_value) ;
170- } else {
171- execute_conditional_update ( store, user_token, store_id, key_value) ;
172- }
173- }
174-
175134fn execute_delete_object (
176- store : & mut HashMap < String , VssDbRecord > , user_token : & str , store_id : & str ,
135+ store : & mut BTreeMap < String , VssDbRecord > , user_token : & str , store_id : & str ,
177136 key_value : & KeyValue ,
178137) {
179138 let key = build_storage_key ( user_token, store_id, & key_value. key ) ;
@@ -186,9 +145,9 @@ impl KvStore for InMemoryBackendImpl {
186145 & self , user_token : String , request : GetObjectRequest ,
187146 ) -> Result < GetObjectResponse , VssError > {
188147 let key = build_storage_key ( & user_token, & request. store_id , & request. key ) ;
189- let guard = self . store . read ( ) . await ;
148+ let guard = self . store . lock ( ) . await ;
190149
191- if let Some ( record) = guard. get ( & key) {
150+ let result = if let Some ( record) = guard. get ( & key) {
192151 Ok ( GetObjectResponse {
193152 value : Some ( KeyValue {
194153 key : record. key . clone ( ) ,
@@ -197,6 +156,7 @@ impl KvStore for InMemoryBackendImpl {
197156 } ) ,
198157 } )
199158 } else if request. key == GLOBAL_VERSION_KEY {
159+ // Non-zero global version is handled above; this is only for initial version 0.
200160 Ok ( GetObjectResponse {
201161 value : Some ( KeyValue {
202162 key : GLOBAL_VERSION_KEY . to_string ( ) ,
@@ -206,7 +166,9 @@ impl KvStore for InMemoryBackendImpl {
206166 } )
207167 } else {
208168 Err ( VssError :: NoSuchKeyError ( "Requested key not found." . to_string ( ) ) )
209- }
169+ } ;
170+
171+ result
210172 }
211173
212174 async fn put (
@@ -221,7 +183,7 @@ impl KvStore for InMemoryBackendImpl {
221183 }
222184
223185 let store_id = request. store_id . clone ( ) ;
224- let mut guard = self . store . write ( ) . await ;
186+ let mut guard = self . store . lock ( ) . await ;
225187
226188 if let Some ( version) = request. global_version {
227189 validate_put_operation (
@@ -268,7 +230,7 @@ impl KvStore for InMemoryBackendImpl {
268230 } ) ?;
269231
270232 let store_id = request. store_id . clone ( ) ;
271- let mut guard = self . store . write ( ) . await ;
233+ let mut guard = self . store . lock ( ) . await ;
272234
273235 execute_delete_object ( & mut guard, & user_token, & store_id, & key_value) ;
274236
@@ -278,71 +240,56 @@ impl KvStore for InMemoryBackendImpl {
278240 async fn list_key_versions (
279241 & self , user_token : String , request : ListKeyVersionsRequest ,
280242 ) -> Result < ListKeyVersionsResponse , VssError > {
281- let store_id = request. store_id ;
282- let key_prefix = request. key_prefix . unwrap_or ( "" . to_string ( ) ) ;
283- let page_token_option = request. page_token ;
243+ let store_id = request. store_id . clone ( ) ;
244+ let key_prefix = request. key_prefix . clone ( ) . unwrap_or_default ( ) ;
284245 let page_size = request. page_size . unwrap_or ( i32:: MAX ) ;
285246 let limit = std:: cmp:: min ( page_size, LIST_KEY_VERSIONS_MAX_PAGE_SIZE ) as usize ;
286247
287- let ( keys_with_versions , global_version ) = {
288- let guard = self . store . read ( ) . await ;
248+ let offset : usize =
249+ request . page_token . as_ref ( ) . and_then ( |s| s . parse :: < usize > ( ) . ok ( ) ) . unwrap_or ( 0 ) ;
289250
290- let mut global_version: Option < i64 > = None ;
291- if page_token_option. is_none ( ) {
292- global_version =
293- Some ( self . get_current_global_version ( & guard, & user_token, & store_id) ) ;
294- }
251+ let guard = self . store . lock ( ) . await ;
252+
253+ let mut global_version: Option < i64 > = None ;
254+ if offset == 0 {
255+ global_version = Some ( self . get_current_global_version ( & guard, & user_token, & store_id) ) ;
256+ }
295257
296- let storage_prefix = format ! ( "{}#{}#" , user_token, store_id) ;
297- let mut temp: Vec < ( String , i64 ) > = Vec :: new ( ) ;
258+ let storage_prefix = format ! ( "{}#{}#" , user_token, store_id) ;
259+ let prefix_len = storage_prefix. len ( ) ;
260+
261+ let mut all_items: Vec < KeyValue > = guard
262+ . iter ( )
263+ . filter ( |( storage_key, _) | storage_key. starts_with ( & storage_prefix) )
264+ . filter_map ( |( storage_key, record) | {
265+ let key = & storage_key[ prefix_len..] ;
298266
299- for ( storage_key, r) in guard. iter ( ) {
300- if !storage_key. starts_with ( & storage_prefix) {
301- continue ;
302- }
303- let key = & storage_key[ storage_prefix. len ( ) ..] ;
304267 if key == GLOBAL_VERSION_KEY {
305- continue ;
268+ return None ;
306269 }
270+
307271 if !key_prefix. is_empty ( ) && !key. starts_with ( & key_prefix) {
308- continue ;
272+ return None ;
309273 }
310- temp. push ( ( key. to_string ( ) , r. version ) ) ;
311- }
312274
313- ( temp, global_version)
314- } ;
315-
316- let mut keys_with_versions = keys_with_versions;
317- keys_with_versions. sort_by ( |a, b| a. 0 . cmp ( & b. 0 ) ) ;
318-
319- let start_idx = if page_token_option. is_none ( ) {
320- 0
321- } else if page_token_option. as_deref ( ) == Some ( "" ) {
322- keys_with_versions. len ( )
323- } else {
324- let token = page_token_option. as_deref ( ) . unwrap ( ) ;
325- keys_with_versions
326- . iter ( )
327- . position ( |( k, _) | k. as_str ( ) > token)
328- . unwrap_or ( keys_with_versions. len ( ) )
329- } ;
330-
331- let page_items: Vec < KeyValue > = keys_with_versions
332- . iter ( )
333- . skip ( start_idx)
334- . take ( limit)
335- . map ( |( key, version) | KeyValue {
336- key : key. clone ( ) ,
337- value : Bytes :: new ( ) ,
338- version : * version,
275+ Some ( KeyValue {
276+ key : key. to_string ( ) ,
277+ value : Bytes :: new ( ) ,
278+ version : record. version ,
279+ } )
339280 } )
340281 . collect ( ) ;
341282
283+ all_items. sort_by ( |a, b| a. key . cmp ( & b. key ) ) ;
284+
285+ let page_items: Vec < KeyValue > =
286+ all_items. iter ( ) . skip ( offset) . take ( limit) . cloned ( ) . collect ( ) ;
287+
288+ let next_offset = offset + page_items. len ( ) ;
342289 let next_page_token = if page_items. is_empty ( ) {
343290 Some ( "" . to_string ( ) )
344291 } else {
345- page_items . last ( ) . map ( |kv| kv . key . clone ( ) )
292+ Some ( next_offset . to_string ( ) )
346293 } ;
347294
348295 Ok ( ListKeyVersionsResponse { key_versions : page_items, next_page_token, global_version } )
0 commit comments