diff --git a/core/rs/core/src/bootstrap.rs b/core/rs/core/src/bootstrap.rs index c3fc2b1e..75918d56 100644 --- a/core/rs/core/src/bootstrap.rs +++ b/core/rs/core/src/bootstrap.rs @@ -48,6 +48,27 @@ fn create_site_id_and_site_id_table(db: *mut sqlite3) -> Result<[u8; 16], Result insert_site_id(db) } +pub fn create_site_id_triggers(db: *mut sqlite3) -> Result { + db.exec_safe(&format!( + "CREATE TRIGGER IF NOT EXISTS {tbl}_insert_trig AFTER INSERT ON \"{tbl}\" + WHEN NEW.ordinal != 0 + BEGIN + VALUES (crsql_update_site_id(NEW.site_id, NEW.ordinal)); + END; + CREATE TRIGGER IF NOT EXISTS {tbl}_update_trig AFTER UPDATE ON \"{tbl}\" + WHEN NEW.ordinal != 0 + BEGIN + VALUES (crsql_update_site_id(NEW.site_id, NEW.ordinal)); + END; + CREATE TRIGGER IF NOT EXISTS {tbl}_delete_trig AFTER DELETE ON \"{tbl}\" + WHEN OLD.ordinal != 0 + BEGIN + VALUES (crsql_update_site_id(OLD.site_id, -1)); + END;", + tbl = consts::TBL_SITE_ID + )) +} + #[no_mangle] pub extern "C" fn crsql_init_peer_tracking_table(db: *mut sqlite3) -> c_int { match db.exec_safe("CREATE TABLE IF NOT EXISTS crsql_tracked_peers (\"site_id\" BLOB NOT NULL, \"version\" INTEGER NOT NULL, \"seq\" INTEGER DEFAULT 0, \"tag\" INTEGER, \"event\" INTEGER, PRIMARY KEY (\"site_id\", \"tag\", \"event\")) STRICT;") { diff --git a/core/rs/core/src/c.rs b/core/rs/core/src/c.rs index 6569acc4..3a1539d9 100644 --- a/core/rs/core/src/c.rs +++ b/core/rs/core/src/c.rs @@ -73,6 +73,7 @@ pub struct crsql_ExtData { pub pSelectClockTablesStmt: *mut sqlite::stmt, pub mergeEqualValues: ::core::ffi::c_int, pub timestamp: ::core::ffi::c_ulonglong, + pub ordinalMap: *mut ::core::ffi::c_void, } #[repr(C)] @@ -108,10 +109,12 @@ extern "C" { db: *mut sqlite::sqlite3, pExtData: *mut crsql_ExtData, ) -> c_int; - pub fn crsql_newExtData( + pub fn crsql_newExtData(db: *mut sqlite::sqlite3) -> *mut crsql_ExtData; + pub fn crsql_initSiteIdExt( db: *mut sqlite::sqlite3, + pExtData: *mut crsql_ExtData, siteIdBuffer: *mut c_char, - ) -> *mut crsql_ExtData; + ) -> c_int; pub fn crsql_freeExtData(pExtData: *mut crsql_ExtData); pub fn crsql_finalize(pExtData: *mut crsql_ExtData); } @@ -268,7 +271,7 @@ fn bindgen_test_layout_crsql_ExtData() { let ptr = UNINIT.as_ptr(); assert_eq!( ::core::mem::size_of::(), - 160usize, + 168usize, concat!("Size of: ", stringify!(crsql_ExtData)) ); assert_eq!( @@ -498,4 +501,14 @@ fn bindgen_test_layout_crsql_ExtData() { stringify!(timestamp) ) ); + assert_eq!( + unsafe { ::core::ptr::addr_of!((*ptr).ordinalMap) as usize - ptr as usize }, + 160usize, + concat!( + "Offset of field: ", + stringify!(crsql_ExtData), + "::", + stringify!(ordinalMap) + ) + ); } diff --git a/core/rs/core/src/changes_vtab.rs b/core/rs/core/src/changes_vtab.rs index 6ef95c58..e75b80ab 100644 --- a/core/rs/core/src/changes_vtab.rs +++ b/core/rs/core/src/changes_vtab.rs @@ -1,5 +1,5 @@ extern crate alloc; -use crate::alloc::string::ToString; +use crate::alloc::{collections::BTreeMap, string::ToString}; use crate::changes_vtab_write::crsql_merge_insert; use crate::stmt_cache::reset_cached_stmt; use crate::tableinfo::{crsql_ensure_table_infos_are_up_to_date, TableInfo}; @@ -565,3 +565,27 @@ pub extern "C" fn crsql_changes_commit(vtab: *mut sqlite::vtab) -> c_int { } ResultCode::OK as c_int } + +#[no_mangle] +pub extern "C" fn crsql_changes_savepoint(_vtab: *mut sqlite::vtab, _n: c_int) -> c_int { + ResultCode::OK as c_int +} + +#[no_mangle] +pub extern "C" fn crsql_changes_release(_vtab: *mut sqlite::vtab, _n: c_int) -> c_int { + ResultCode::OK as c_int +} + +// clear ordinal cache on rollback so we don't have wrong data in the cache. +#[no_mangle] +pub extern "C" fn crsql_changes_rollback_to(vtab: *mut sqlite::vtab, _: c_int) -> c_int { + let tab = vtab.cast::(); + + let mut ordinals = unsafe { + mem::ManuallyDrop::new(Box::from_raw( + (*(*tab).pExtData).ordinalMap as *mut BTreeMap, i64>, + )) + }; + ordinals.clear(); + ResultCode::OK as c_int +} diff --git a/core/rs/core/src/commit.rs b/core/rs/core/src/commit.rs index 1c29bba3..4c9e4335 100644 --- a/core/rs/core/src/commit.rs +++ b/core/rs/core/src/commit.rs @@ -1,5 +1,7 @@ +use alloc::{boxed::Box, collections::BTreeMap, vec::Vec}; use core::{ ffi::{c_int, c_void}, + mem, ptr::null, }; @@ -31,4 +33,9 @@ pub unsafe fn commit_or_rollback_reset(ext_data: *mut crsql_ExtData) { (*ext_data).seq = 0; (*ext_data).timestamp = 0; (*ext_data).updatedTableInfosThisTx = 0; + + let mut ordinals: mem::ManuallyDrop, i64>>> = mem::ManuallyDrop::new( + Box::from_raw((*ext_data).ordinalMap as *mut BTreeMap, i64>), + ); + ordinals.clear(); } diff --git a/core/rs/core/src/db_version.rs b/core/rs/core/src/db_version.rs index 19521678..9aa60473 100644 --- a/core/rs/core/src/db_version.rs +++ b/core/rs/core/src/db_version.rs @@ -187,6 +187,12 @@ pub extern "C" fn crsql_init_last_db_versions_map(ext_data: *mut crsql_ExtData) unsafe { (*ext_data).lastDbVersions = Box::into_raw(Box::new(map)) as *mut c_void } } +#[no_mangle] +pub extern "C" fn crsql_init_ordinal_map(ext_data: *mut crsql_ExtData) { + let map: BTreeMap, i64> = BTreeMap::new(); + unsafe { (*ext_data).ordinalMap = Box::into_raw(Box::new(map)) as *mut c_void } +} + #[no_mangle] pub extern "C" fn crsql_drop_last_db_versions_map(ext_data: *mut crsql_ExtData) { unsafe { @@ -196,6 +202,15 @@ pub extern "C" fn crsql_drop_last_db_versions_map(ext_data: *mut crsql_ExtData) } } +#[no_mangle] +pub extern "C" fn crsql_drop_ordinal_map(ext_data: *mut crsql_ExtData) { + unsafe { + drop(Box::from_raw( + (*ext_data).ordinalMap as *mut BTreeMap, i64>, + )); + } +} + pub fn insert_db_version( ext_data: *mut crsql_ExtData, insert_site_id: &[u8], @@ -258,6 +273,15 @@ pub unsafe fn get_or_set_site_ordinal( ext_data: *mut crsql_ExtData, site_id: &[u8], ) -> Result { + // check the cache first + let mut ordinals: mem::ManuallyDrop, i64>>> = mem::ManuallyDrop::new( + Box::from_raw((*ext_data).ordinalMap as *mut BTreeMap, i64>), + ); + + if let Some(ordinal) = ordinals.get(site_id) { + return Ok(*ordinal); + } + let bind_result = (*ext_data) .pSelectSiteIdOrdinalStmt @@ -268,11 +292,11 @@ pub unsafe fn get_or_set_site_ordinal( return Err(rc); } - match (*ext_data).pSelectSiteIdOrdinalStmt.step() { + let ordinal = match (*ext_data).pSelectSiteIdOrdinalStmt.step() { Ok(ResultCode::ROW) => { let ordinal = (*ext_data).pSelectSiteIdOrdinalStmt.column_int64(0); reset_cached_stmt((*ext_data).pSelectSiteIdOrdinalStmt)?; - Ok(ordinal) + ordinal } Ok(_) => { reset_cached_stmt((*ext_data).pSelectSiteIdOrdinalStmt)?; @@ -296,7 +320,7 @@ pub unsafe fn get_or_set_site_ordinal( Ok(_) => { let ordinal = (*ext_data).pSetSiteIdOrdinalStmt.column_int64(0); reset_cached_stmt((*ext_data).pSetSiteIdOrdinalStmt)?; - Ok(ordinal) + ordinal } Err(rc) => { reset_cached_stmt((*ext_data).pSetSiteIdOrdinalStmt)?; @@ -308,5 +332,7 @@ pub unsafe fn get_or_set_site_ordinal( reset_cached_stmt((*ext_data).pSetSiteIdOrdinalStmt)?; return Err(rc); } - } + }; + ordinals.insert(site_id.to_vec(), ordinal); + Ok(ordinal) } diff --git a/core/rs/core/src/lib.rs b/core/rs/core/src/lib.rs index 13473056..a8a0554d 100644 --- a/core/rs/core/src/lib.rs +++ b/core/rs/core/src/lib.rs @@ -48,9 +48,9 @@ mod triggers; mod unpack_columns_vtab; mod util; -use alloc::borrow::Cow; use alloc::format; use alloc::string::ToString; +use alloc::{borrow::Cow, boxed::Box, collections::BTreeMap, vec::Vec}; use core::ffi::c_char; use core::mem; use core::ptr::null_mut; @@ -58,7 +58,7 @@ extern crate alloc; use alter::crsql_compact_post_alter; use automigrate::*; use backfill::*; -use c::{crsql_freeExtData, crsql_newExtData}; +use c::{crsql_freeExtData, crsql_initSiteIdExt, crsql_newExtData}; use config::{crsql_config_get, crsql_config_set}; use core::ffi::{c_int, c_void, CStr}; use create_crr::create_crr; @@ -232,6 +232,30 @@ pub extern "C" fn sqlite3_crsqlcore_init( return null_mut(); } + // allocate ext data earlier in the init process because we need its + // pointer to be available for the crsql_update_site_id function. + let ext_data = unsafe { crsql_newExtData(db) }; + if ext_data.is_null() { + return null_mut(); + } + + let rc = db + .create_function_v2( + "crsql_update_site_id", + 2, + sqlite::UTF8 | sqlite::INNOCUOUS | sqlite::DETERMINISTIC, + Some(ext_data as *mut c_void), + Some(x_crsql_update_site_id), + None, + None, + None, + ) + .unwrap_or(ResultCode::ERROR); + if rc != ResultCode::OK { + unsafe { crsql_freeExtData(ext_data) }; + return null_mut(); + } + // TODO: convert this function to a proper rust function // and have rust free: // 1. site_id_buffer @@ -243,12 +267,18 @@ pub extern "C" fn sqlite3_crsqlcore_init( let rc = crate::bootstrap::crsql_init_site_id(db, site_id_buffer); if rc != ResultCode::OK as c_int { sqlite::free(site_id_buffer as *mut c_void); + unsafe { crsql_freeExtData(ext_data) }; return null_mut(); } - let ext_data = unsafe { crsql_newExtData(db, site_id_buffer as *mut c_char) }; - if ext_data.is_null() { - // no need to free the site id buffer here, this is cleaned up already. + let rc = unsafe { crsql_initSiteIdExt(db, ext_data, site_id_buffer as *mut c_char) }; + if rc != ResultCode::OK as c_int { + unsafe { crsql_freeExtData(ext_data) }; + return null_mut(); + } + + if let Err(_) = crate::bootstrap::create_site_id_triggers(db) { + sqlite::free(site_id_buffer as *mut c_void); return null_mut(); } @@ -408,7 +438,7 @@ pub extern "C" fn sqlite3_crsqlcore_init( let rc = db .create_function_v2( "crsql_set_ts", - -1, + 1, sqlite::UTF8 | sqlite::DETERMINISTIC, Some(ext_data as *mut c_void), Some(x_crsql_set_ts), @@ -422,6 +452,24 @@ pub extern "C" fn sqlite3_crsqlcore_init( return null_mut(); } + #[cfg(feature = "test")] + let rc = db + .create_function_v2( + "crsql_cache_site_ordinal", + 1, + sqlite::UTF8 | sqlite::DETERMINISTIC, + Some(ext_data as *mut c_void), + Some(x_crsql_cache_site_ordinal), + None, + None, + None, + ) + .unwrap_or(ResultCode::ERROR); + if rc != ResultCode::OK { + unsafe { crsql_freeExtData(ext_data) }; + return null_mut(); + } + let rc = db .create_function_v2( "crsql_set_db_version", @@ -627,6 +675,32 @@ unsafe extern "C" fn x_crsql_site_id( sqlite::result_blob(ctx, site_id, consts::SITE_ID_LEN, Destructor::STATIC); } +/** + * update in-memory map of site ids to ordinals. Only valid within a transaction. + * + * `select crsql_update_site_id(site_id, ordinal)` + */ +unsafe extern "C" fn x_crsql_update_site_id( + ctx: *mut sqlite::context, + argc: i32, + argv: *mut *mut sqlite::value, +) { + let ext_data = ctx.user_data() as *mut c::crsql_ExtData; + let args = sqlite::args!(argc, argv); + let site_id = args[0].blob(); + let ordinal = args[1].int64(); + let mut ordinals: mem::ManuallyDrop, i64>>> = mem::ManuallyDrop::new( + Box::from_raw((*ext_data).ordinalMap as *mut BTreeMap, i64>), + ); + + if ordinal == -1 { + ordinals.remove(&site_id.to_vec()); + } else { + ordinals.insert(site_id.to_vec(), ordinal); + } + ctx.result_text_static("OK"); +} + unsafe extern "C" fn x_crsql_finalize( ctx: *mut sqlite::context, _argc: i32, @@ -854,10 +928,7 @@ unsafe extern "C" fn x_crsql_set_ts( argv: *mut *mut sqlite::value, ) { if argc == 0 { - ctx.result_error( - "Wrong number of args provided to crsql_begin_alter. Provide the - schema name and table name or just the table name.", - ); + ctx.result_error("Wrong number of args provided to x_crsql_set_ts. Provide the timestamp."); return; } @@ -876,6 +947,37 @@ unsafe extern "C" fn x_crsql_set_ts( ctx.result_text_static("OK"); } +/** + * Get the site ordinal cached in the ext data for the current transaction. + * only used for test to inspect the ordinal map. + */ +#[cfg(feature = "test")] +unsafe extern "C" fn x_crsql_cache_site_ordinal( + ctx: *mut sqlite::context, + argc: i32, + argv: *mut *mut sqlite::value, +) { + if argc == 0 { + ctx.result_error( + "Wrong number of args provided to crsql_cache_site_ordinal. Provide the site id.", + ); + return; + } + + let ext_data = ctx.user_data() as *mut c::crsql_ExtData; + let args = sqlite::args!(argc, argv); + let site_id = args[0].blob(); + + let ord_map = mem::ManuallyDrop::new(Box::from_raw( + (*ext_data).ordinalMap as *mut BTreeMap, i64>, + )); + let res = ord_map.get(site_id).cloned().unwrap_or(-1); + sqlite::result_int64(ctx, res); +} + +/** + * Return the timestamp for the current transaction. + */ unsafe extern "C" fn x_crsql_get_ts( ctx: *mut sqlite::context, _argc: i32, diff --git a/core/rs/integration_check/src/t/tableinfo.rs b/core/rs/integration_check/src/t/tableinfo.rs index 3e74e88a..1f5574c7 100644 --- a/core/rs/integration_check/src/t/tableinfo.rs +++ b/core/rs/integration_check/src/t/tableinfo.rs @@ -6,7 +6,7 @@ use alloc::vec::Vec; use core::{ffi::c_char, mem}; use crsql_bundle::test_exports; use crsql_bundle::test_exports::tableinfo::TableInfo; -use sqlite::Connection; +use sqlite::{Connection, ResultCode}; use sqlite_nostd as sqlite; // Unfortunate circumstance that we still have some C code that requires this argument @@ -53,7 +53,9 @@ fn test_ensure_table_infos_are_up_to_date() { ) .expect("made foo clock"); - let ext_data = unsafe { test_exports::c::crsql_newExtData(raw_db, make_site()) }; + let ext_data = unsafe { test_exports::c::crsql_newExtData(raw_db) }; + let rc = unsafe { test_exports::c::crsql_initSiteIdExt(raw_db, ext_data, make_site()) }; + assert_eq!(rc, 0); test_exports::tableinfo::crsql_ensure_table_infos_are_up_to_date(raw_db, ext_data, err); let mut table_infos = unsafe { @@ -358,10 +360,44 @@ fn test_leak_condition() { .expect("inserted into foo"); } +fn test_site_id_initialization() { + { + let db = crate::opendb().expect("Opened DB"); + let raw_db = db.db.db; + let site_id = select_site_id(raw_db).expect("selected site id"); + assert_eq!(site_id.len(), 16); + raw_db.exec_safe("DELETE FROM crsql_site_id;").expect("deleted site id"); + } + + { + let db = crate::opendb().expect("Opened DB"); + let raw_db = db.db.db; + let site_id = select_site_id(raw_db).expect("selected site id"); + assert_eq!(site_id.len(), 16); + raw_db.exec_safe("DROP TABLE crsql_site_id;").expect("dropped crsql_site_id"); + } + + { + let db = crate::opendb().expect("Opened DB"); + let raw_db = db.db.db; + let site_id = select_site_id(raw_db).expect("selected site id"); + assert_eq!(site_id.len(), 16); + } +} + +fn select_site_id(db: *mut sqlite::sqlite3) -> Result, ResultCode> { + let site_id_stmt = db.prepare_v2("SELECT crsql_site_id()")?; + site_id_stmt.step()?; + let site_id = site_id_stmt.column_blob(0)?.to_vec(); + Ok(site_id) +} + pub fn run_suite() { + libc_print::libc_println!("Running tableinfo suite"); test_ensure_table_infos_are_up_to_date(); test_pull_table_info(); test_is_table_compatible(); test_create_clock_table_from_table_info(); test_leak_condition(); + test_site_id_initialization(); } diff --git a/core/rs/integration_check/src/t/test_db_version.rs b/core/rs/integration_check/src/t/test_db_version.rs index b7b40827..aacf08ab 100644 --- a/core/rs/integration_check/src/t/test_db_version.rs +++ b/core/rs/integration_check/src/t/test_db_version.rs @@ -1,5 +1,5 @@ extern crate alloc; -use alloc::{ffi::CString, string::String}; +use alloc::{ffi::CString, format, string::String}; use core::ffi::c_char; use crsql_bundle::test_exports; use sqlite::{Connection, ResultCode}; @@ -30,7 +30,9 @@ fn test_fetch_db_version_from_storage() -> Result { let site_id = get_site_id(raw_db); - let ext_data = unsafe { test_exports::c::crsql_newExtData(raw_db, site_id) }; + let ext_data = unsafe { test_exports::c::crsql_newExtData(raw_db) }; + let rc = unsafe { test_exports::c::crsql_initSiteIdExt(raw_db, ext_data, site_id) }; + assert_eq!(rc, 0); test_exports::db_version::fetch_db_version_from_storage(raw_db, ext_data)?; // no clock tables, no version. @@ -81,7 +83,9 @@ fn test_next_db_version() -> Result<(), String> { let c = crate::opendb().expect("db opened"); let db = &c.db; let raw_db = db.db; - let ext_data = unsafe { test_exports::c::crsql_newExtData(raw_db, make_site()) }; + let ext_data = unsafe { test_exports::c::crsql_newExtData(raw_db) }; + let rc = unsafe { test_exports::c::crsql_initSiteIdExt(raw_db, ext_data, make_site()) }; + assert_eq!(rc, 0); // is current + 1 // doesn't bump forward on successive calls @@ -112,8 +116,84 @@ fn test_next_db_version() -> Result<(), String> { Ok(()) } +fn test_get_or_set_site_ordinal() -> Result<(), ResultCode> { + let c = crate::opendb().expect("db opened"); + let db = &c.db; + db.db + .exec_safe("CREATE TABLE foo (a primary key not null, b);")?; + + db.db.exec_safe("SELECT crsql_as_crr('foo');")?; + + db.db.exec_safe("BEGIN TRANSACTION;")?; + + let other_site_id = "other_site_id".as_bytes(); + + let update_ordinal_stmt = db + .db + .prepare_v2("INSERT OR REPLACE INTO crsql_site_id (site_id, ordinal) VALUES (?, ?);")?; + + update_ordinal_stmt.bind_blob(1, other_site_id, sqlite::Destructor::STATIC)?; + update_ordinal_stmt.bind_int64(2, 2)?; + update_ordinal_stmt.step()?; + + // test ordinal is set + assert_eq!(2, get_cache_ordinal(db.db, other_site_id)?); + + let delete_ordinal_stmt = db.prepare_v2("DELETE FROM crsql_site_id WHERE site_id = ?;")?; + delete_ordinal_stmt.bind_blob(1, other_site_id, sqlite::Destructor::STATIC)?; + delete_ordinal_stmt.step()?; + + assert_eq!(-1, get_cache_ordinal(db.db, other_site_id)?); + + db.db.exec_safe("SAVEPOINT test;")?; + + // new site_id in crsql_changes table + let pk: [u8; 3] = [1, 9, 1]; + let site_id3 = "second_site_id".as_bytes(); + let stmt = db + .db + .prepare_v2("INSERT INTO crsql_changes VALUES ('foo', ?, 'b', 1, 1, 1, ?, 1, 0, 0);")?; + stmt.bind_blob(1, &pk, sqlite::Destructor::STATIC)?; + stmt.bind_blob(2, &site_id3, sqlite::Destructor::STATIC)?; + stmt.step()?; + stmt.reset()?; + + assert_eq!(1, get_cache_ordinal(db.db, site_id3)?); + db.db.exec_safe("RELEASE SAVEPOINT test;")?; + + assert_eq!(1, get_cache_ordinal(db.db, site_id3)?); + + db.db.exec_safe("SAVEPOINT test;")?; + let pk: [u8; 3] = [1, 9, 2]; + let site_id4 = "third_site_id".as_bytes(); + stmt.bind_blob(1, &pk, sqlite::Destructor::STATIC)?; + stmt.bind_blob(2, &site_id4, sqlite::Destructor::STATIC)?; + stmt.step()?; + + assert_eq!(1, get_cache_ordinal(db.db, site_id3)?); + assert_eq!(2, get_cache_ordinal(db.db, site_id4)?); + + // sp rollback (when crsql_changes vtab is called) clears the cache + db.db.exec_safe("ROLLBACK TO SAVEPOINT test;")?; + assert_eq!(-1, get_cache_ordinal(db.db, site_id4)?); + + db.db.exec_safe("COMMIT;")?; + + Ok(()) +} + +fn get_cache_ordinal(db: *mut sqlite::sqlite3, site_id: &[u8]) -> Result { + let stmt = db.prepare_v2("SELECT crsql_cache_site_ordinal(?);")?; + stmt.bind_blob(1, site_id, sqlite::Destructor::STATIC)?; + stmt.step()?; + Ok(stmt.column_int64(0)) +} + pub fn run_suite() -> Result<(), String> { test_fetch_db_version_from_storage()?; test_next_db_version()?; + if let Err(rc) = test_get_or_set_site_ordinal() { + return Err(format!("test_get_or_set_site_ordinal failed: {:?}", rc)); + } Ok(()) } diff --git a/core/src/changes-vtab.c b/core/src/changes-vtab.c index cb48a664..6b041a92 100644 --- a/core/src/changes-vtab.c +++ b/core/src/changes-vtab.c @@ -151,9 +151,12 @@ int crsql_changes_column( int i /* Which column to return */ ); int crsql_changes_eof(sqlite3_vtab_cursor *cur); +int crsql_changes_savepoint(sqlite3_vtab *pVTab, int iSavepoint); +int crsql_changes_release(sqlite3_vtab *pVTab, int iSavepoint); +int crsql_changes_rollback_to(sqlite3_vtab *pVTab, int iSavepoint); sqlite3_module crsql_changesModule = { - /* iVersion */ 0, + /* iVersion */ 2, /* xCreate */ 0, /* xConnect */ changesConnect, /* xBestIndex */ crsql_changes_best_index, @@ -173,9 +176,9 @@ sqlite3_module crsql_changesModule = { /* xRollback */ 0, /* xFindMethod */ 0, /* xRename */ 0, - /* xSavepoint */ 0, - /* xRelease */ 0, - /* xRollbackTo */ 0, + /* xSavepoint */ crsql_changes_savepoint, + /* xRelease */ crsql_changes_release, + /* xRollbackTo */ crsql_changes_rollback_to, /* xShadowName */ 0 #ifdef LIBSQL , diff --git a/core/src/ext-data.c b/core/src/ext-data.c index 193eff9d..5ae04582 100644 --- a/core/src/ext-data.c +++ b/core/src/ext-data.c @@ -7,17 +7,19 @@ void crsql_clear_stmt_cache(crsql_ExtData *pExtData); void crsql_init_table_info_vec(crsql_ExtData *pExtData); +void crsql_init_ordinal_map(crsql_ExtData *pExtData); +void crsql_drop_ordinal_map(crsql_ExtData *pExtData); void crsql_drop_table_info_vec(crsql_ExtData *pExtData); void crsql_init_last_db_versions_map(crsql_ExtData *pExtData); void crsql_drop_last_db_versions_map(crsql_ExtData *pExtData); // void crsql_init_table_info_vec(crsql_ExtData *pExtData); // void crsql_drop_table_info_vec(crsql_ExtData *pExtData); -crsql_ExtData *crsql_newExtData(sqlite3 *db, unsigned char *siteIdBuffer) { +// The initialization here is incomplete! We need to call crsql_initSiteIdExt after this. +crsql_ExtData *crsql_newExtData(sqlite3 *db) { crsql_ExtData *pExtData = sqlite3_malloc(sizeof *pExtData); - pExtData->siteId = siteIdBuffer; - + pExtData->siteId = 0; pExtData->pPragmaSchemaVersionStmt = 0; int rc = sqlite3_prepare_v3(db, "PRAGMA schema_version", -1, SQLITE_PREPARE_PERSISTENT, @@ -34,14 +36,8 @@ crsql_ExtData *crsql_newExtData(sqlite3 *db, unsigned char *siteIdBuffer) { &(pExtData->pClearSyncBitStmt), 0); pExtData->pSetSiteIdOrdinalStmt = 0; - rc += sqlite3_prepare_v3( - db, "INSERT INTO crsql_site_id (site_id) VALUES (?) RETURNING ordinal", - -1, SQLITE_PREPARE_PERSISTENT, &(pExtData->pSetSiteIdOrdinalStmt), 0); pExtData->pSelectSiteIdOrdinalStmt = 0; - rc += sqlite3_prepare_v3( - db, "SELECT ordinal FROM crsql_site_id WHERE site_id = ?", -1, - SQLITE_PREPARE_PERSISTENT, &(pExtData->pSelectSiteIdOrdinalStmt), 0); pExtData->pSelectClockTablesStmt = 0; rc += @@ -52,7 +48,6 @@ crsql_ExtData *crsql_newExtData(sqlite3 *db, unsigned char *siteIdBuffer) { pExtData->pendingDbVersion = -1; pExtData->pSetDbVersionStmt = 0; - // printf("instantiating pSetDbVersionStmt... current rc: %d\n", rc); rc += sqlite3_prepare_v3( db, "INSERT INTO crsql_db_versions (site_id, db_version) VALUES " @@ -74,10 +69,12 @@ crsql_ExtData *crsql_newExtData(sqlite3 *db, unsigned char *siteIdBuffer) { -1, SQLITE_PREPARE_PERSISTENT, &(pExtData->pDbVersionStmt), 0); pExtData->tableInfos = 0; pExtData->lastDbVersions = 0; + pExtData->ordinalMap = 0; pExtData->rowsImpacted = 0; pExtData->updatedTableInfosThisTx = 0; crsql_init_table_info_vec(pExtData); crsql_init_last_db_versions_map(pExtData); + crsql_init_ordinal_map(pExtData); sqlite3_stmt *pStmt; @@ -123,9 +120,27 @@ crsql_ExtData *crsql_newExtData(sqlite3 *db, unsigned char *siteIdBuffer) { return pExtData; } +int crsql_initSiteIdExt(sqlite3 *db, crsql_ExtData *pExtData, unsigned char *siteIdBuffer) { + pExtData->siteId = siteIdBuffer; + + pExtData->pSetSiteIdOrdinalStmt = 0; + int rc = sqlite3_prepare_v3( + db, "INSERT INTO crsql_site_id (site_id) VALUES (?) RETURNING ordinal", + -1, SQLITE_PREPARE_PERSISTENT, &(pExtData->pSetSiteIdOrdinalStmt), 0); + + pExtData->pSelectSiteIdOrdinalStmt = 0; + rc += sqlite3_prepare_v3( + db, "SELECT ordinal FROM crsql_site_id WHERE site_id = ?", -1, + SQLITE_PREPARE_PERSISTENT, &(pExtData->pSelectSiteIdOrdinalStmt), 0); + + return rc; +} + void crsql_freeExtData(crsql_ExtData *pExtData) { // printf("free ext\n"); - sqlite3_free(pExtData->siteId); + if (pExtData->siteId != 0) { + sqlite3_free(pExtData->siteId); + } sqlite3_finalize(pExtData->pDbVersionStmt); sqlite3_finalize(pExtData->pSetDbVersionStmt); sqlite3_finalize(pExtData->pPragmaSchemaVersionStmt); @@ -133,8 +148,12 @@ void crsql_freeExtData(crsql_ExtData *pExtData) { sqlite3_finalize(pExtData->pSetSyncBitStmt); sqlite3_finalize(pExtData->pClearSyncBitStmt); sqlite3_finalize(pExtData->pSetSiteIdOrdinalStmt); - sqlite3_finalize(pExtData->pSelectSiteIdOrdinalStmt); - sqlite3_finalize(pExtData->pSelectClockTablesStmt); + if (pExtData->pSelectClockTablesStmt != 0) { + sqlite3_finalize(pExtData->pSelectClockTablesStmt); + } + if (pExtData->pSelectSiteIdOrdinalStmt != 0) { + sqlite3_finalize(pExtData->pSelectSiteIdOrdinalStmt); + } pExtData->pDbVersionStmt = 0; pExtData->pSetDbVersionStmt = 0; pExtData->pPragmaSchemaVersionStmt = 0; @@ -147,6 +166,7 @@ void crsql_freeExtData(crsql_ExtData *pExtData) { crsql_clear_stmt_cache(pExtData); crsql_drop_table_info_vec(pExtData); crsql_drop_last_db_versions_map(pExtData); + crsql_drop_ordinal_map(pExtData); sqlite3_free(pExtData); } @@ -163,8 +183,12 @@ void crsql_finalize(crsql_ExtData *pExtData) { sqlite3_finalize(pExtData->pPragmaDataVersionStmt); sqlite3_finalize(pExtData->pSetSyncBitStmt); sqlite3_finalize(pExtData->pClearSyncBitStmt); - sqlite3_finalize(pExtData->pSetSiteIdOrdinalStmt); - sqlite3_finalize(pExtData->pSelectSiteIdOrdinalStmt); + if (pExtData->pSetSiteIdOrdinalStmt != 0) { + sqlite3_finalize(pExtData->pSetSiteIdOrdinalStmt); + } + if (pExtData->pSelectSiteIdOrdinalStmt != 0) { + sqlite3_finalize(pExtData->pSelectSiteIdOrdinalStmt); + } sqlite3_finalize(pExtData->pSelectClockTablesStmt); crsql_clear_stmt_cache(pExtData); pExtData->pDbVersionStmt = 0; diff --git a/core/src/ext-data.h b/core/src/ext-data.h index dd56a0ae..2d8dd9b4 100644 --- a/core/src/ext-data.h +++ b/core/src/ext-data.h @@ -48,14 +48,18 @@ struct crsql_ExtData { int mergeEqualValues; unsigned long long timestamp; + void *ordinalMap; }; -crsql_ExtData *crsql_newExtData(sqlite3 *db, unsigned char *siteIdBuffer); +crsql_ExtData *crsql_newExtData(sqlite3 *db); +int crsql_initSiteIdExt(sqlite3 *db, crsql_ExtData *pExtData, unsigned char *siteIdBuffer); void crsql_freeExtData(crsql_ExtData *pExtData); int crsql_fetchPragmaSchemaVersion(sqlite3 *db, crsql_ExtData *pExtData, int which); int crsql_fetchPragmaDataVersion(sqlite3 *db, crsql_ExtData *pExtData); int crsql_recreate_db_version_stmt(sqlite3 *db, crsql_ExtData *pExtData); void crsql_finalize(crsql_ExtData *pExtData); +void crsql_init_ordinal_map(crsql_ExtData *pExtData); +void crsql_drop_ordinal_map(crsql_ExtData *pExtData); #endif diff --git a/core/src/ext-data.test.c b/core/src/ext-data.test.c index 85127a7d..fa3ffab7 100644 --- a/core/src/ext-data.test.c +++ b/core/src/ext-data.test.c @@ -10,6 +10,13 @@ int crsql_close(sqlite3 *db); +crsql_ExtData *crsqlExtDataInit(sqlite3 *db, unsigned char *siteIdBuffer) { + crsql_ExtData *pExtData = crsql_newExtData(db); + int rc = crsql_initSiteIdExt(db, pExtData, siteIdBuffer); + assert(rc == 0); + return pExtData; +} + static void textNewExtData() { printf("NewExtData\n"); sqlite3 *db; @@ -18,7 +25,7 @@ static void textNewExtData() { assert(rc == SQLITE_OK); unsigned char *siteIdBuffer = sqlite3_malloc(SITE_ID_LEN * sizeof *(siteIdBuffer)); - crsql_ExtData *pExtData = crsql_newExtData(db, siteIdBuffer); + crsql_ExtData *pExtData = crsqlExtDataInit(db, siteIdBuffer); assert(pExtData->dbVersion == -1); // statement used to determine schema version @@ -35,6 +42,8 @@ static void textNewExtData() { assert(pExtData->pDbVersionStmt != 0); // table info allocated to an empty vec assert(pExtData->tableInfos != 0); + // ordinal map allocated to an empty map + assert(pExtData->ordinalMap != 0); // data version should have been fetched assert(pExtData->pragmaDataVersion != -1); @@ -54,7 +63,8 @@ static void testFreeExtData() { rc = sqlite3_open(":memory:", &db); assert(rc == SQLITE_OK); unsigned char *siteIdBuffer = sqlite3_malloc(SITE_ID_LEN * sizeof(char *)); - crsql_ExtData *pExtData = crsql_newExtData(db, siteIdBuffer); + crsql_ExtData *pExtData = crsqlExtDataInit(db, siteIdBuffer); + assert(rc == 0); crsql_finalize(pExtData); crsql_freeExtData(pExtData); @@ -69,7 +79,8 @@ static void testFinalize() { rc = sqlite3_open(":memory:", &db); assert(rc == SQLITE_OK); unsigned char *siteIdBuffer = sqlite3_malloc(SITE_ID_LEN * sizeof(char *)); - crsql_ExtData *pExtData = crsql_newExtData(db, siteIdBuffer); + crsql_ExtData *pExtData = crsqlExtDataInit(db, siteIdBuffer); + assert(rc == 0); crsql_finalize(pExtData); assert(pExtData->pDbVersionStmt == 0); @@ -90,7 +101,8 @@ static void testFetchPragmaSchemaVersion() { rc = sqlite3_open(":memory:", &db); assert(rc == SQLITE_OK); unsigned char *siteIdBuffer = sqlite3_malloc(SITE_ID_LEN * sizeof(char *)); - crsql_ExtData *pExtData = crsql_newExtData(db, siteIdBuffer); + crsql_ExtData *pExtData = crsqlExtDataInit(db, siteIdBuffer); + assert(rc == 0); // fetch the schema info for db version update didChange = crsql_fetchPragmaSchemaVersion(db, pExtData, 0); @@ -158,9 +170,9 @@ static void testFetchPragmaDataVersion() { assert(rc == SQLITE_OK); unsigned char *siteIdBuffer = sqlite3_malloc(SITE_ID_LEN * sizeof(char *)); - crsql_ExtData *pExtData1 = crsql_newExtData(db1, siteIdBuffer); + crsql_ExtData *pExtData1 = crsqlExtDataInit(db1, siteIdBuffer); siteIdBuffer = sqlite3_malloc(SITE_ID_LEN * sizeof(char *)); - crsql_ExtData *pExtData2 = crsql_newExtData(db2, siteIdBuffer); + crsql_ExtData *pExtData2 = crsqlExtDataInit(db2, siteIdBuffer); // should not change after init rc = crsql_fetchPragmaDataVersion(db1, pExtData1); @@ -205,47 +217,11 @@ static void testFetchPragmaDataVersion() { printf("\t\e[0;32mSuccess\e[0m\n"); } -// we use a static statement for db_version and no longer recreate it. -// static void testRecreateDbVersionStmt() { -// printf("RecreateDbVersionStmt\n"); -// sqlite3 *db; -// int rc; -// rc = sqlite3_open(":memory:", &db); -// unsigned char *siteIdBuffer = sqlite3_malloc(SITE_ID_LEN * sizeof(char *)); -// crsql_ExtData *pExtData = crsql_newExtData(db, siteIdBuffer); - -// rc = crsql_recreate_db_version_stmt(db, pExtData); - -// // there are no clock tables yet. nothing to create. -// assert(rc == -1); -// assert(pExtData->pDbVersionStmt == 0); - -// sqlite3_exec(db, "CREATE TABLE foo (a primary key not null, b);", 0, 0, 0); -// sqlite3_exec(db, "SELECT crsql_as_crr('foo')", 0, 0, 0); - -// rc = crsql_recreate_db_version_stmt(db, pExtData); -// assert(rc == 0); -// assert(pExtData->pDbVersionStmt != 0); - -// // recreating while a created statement exists isn't an error -// rc = crsql_recreate_db_version_stmt(db, pExtData); -// assert(rc == 0); -// assert(pExtData->pDbVersionStmt != 0); - -// crsql_finalize(pExtData); -// assert(pExtData->pDbVersionStmt == 0); -// crsql_freeExtData(pExtData); -// crsql_close(db); -// printf("\t\e[0;32mSuccess\e[0m\n"); -// } - void crsqlExtDataTestSuite() { printf("\e[47m\e[1;30mSuite: crsql_ExtData\e[0m\n"); textNewExtData(); testFreeExtData(); testFinalize(); testFetchPragmaSchemaVersion(); - // We no longer recreate the db version stmt - // testRecreateDbVersionStmt(); testFetchPragmaDataVersion(); }