Skip to content

Commit e1e6b15

Browse files
committed
Merge branch 'master' into thlorenz/subscription-metrics
* master: fix: move delete onto separate thread (#629) feat: return ledger + accountsdb metrics (#624)
2 parents ae73b6d + ded9c50 commit e1e6b15

File tree

6 files changed

+128
-71
lines changed

6 files changed

+128
-71
lines changed

magicblock-api/src/tickers.rs

Lines changed: 12 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -107,35 +107,6 @@ pub fn init_system_metrics_ticker(
107107
ledger: &Arc<Ledger>,
108108
accountsdb: &Arc<AccountsDb>,
109109
token: CancellationToken,
110-
) -> tokio::task::JoinHandle<()> {
111-
fn try_set_ledger_storage_size(ledger: &Ledger) {
112-
match ledger.storage_size() {
113-
Ok(byte_size) => metrics::set_ledger_size(byte_size),
114-
Err(err) => warn!("Failed to get ledger storage size: {:?}", err),
115-
}
116-
}
117-
let ledger = ledger.clone();
118-
tokio::task::spawn(async move {
119-
loop {
120-
tokio::select! {
121-
_ = tokio::time::sleep(tick_duration) => {
122-
try_set_ledger_storage_size(&ledger);
123-
},
124-
_ = token.cancelled() => {
125-
break;
126-
}
127-
}
128-
}
129-
})
130-
}
131-
132-
/*
133-
#[allow(unused_variables)]
134-
pub fn init_system_metrics_ticker_old(
135-
tick_duration: Duration,
136-
ledger: &Arc<Ledger>,
137-
accountsdb: &Arc<AccountsDb>,
138-
token: CancellationToken,
139110
) -> tokio::task::JoinHandle<()> {
140111
fn try_set_ledger_counts(ledger: &Ledger) {
141112
macro_rules! try_set_ledger_count {
@@ -173,23 +144,28 @@ pub fn init_system_metrics_ticker_old(
173144
Err(err) => warn!("Failed to get ledger storage size: {:?}", err),
174145
}
175146
}
176-
fn set_accounts_storage_size(bank: &Bank) {
177-
let byte_size = bank.accounts_db_storage_size();
178-
metrics::set_accounts_size(byte_size);
147+
fn set_accounts_storage_size(accounts_db: &AccountsDb) {
148+
let byte_size = accounts_db.storage_size();
149+
metrics::set_accounts_size(byte_size.try_into().unwrap_or(i64::MAX));
179150
}
180-
fn set_accounts_count(bank: &Bank) {
181-
metrics::set_accounts_count(bank.accounts_db.get_accounts_count());
151+
fn set_accounts_count(accounts_db: &AccountsDb) {
152+
metrics::set_accounts_count(
153+
accounts_db
154+
.get_accounts_count()
155+
.try_into()
156+
.unwrap_or(i64::MAX),
157+
);
182158
}
183159

184160
let ledger = ledger.clone();
185-
let bank = bank.clone();
161+
let bank = accountsdb.clone();
186162
tokio::task::spawn(async move {
187163
loop {
188164
tokio::select! {
189165
_ = tokio::time::sleep(tick_duration) => {
190166
try_set_ledger_storage_size(&ledger);
191167
set_accounts_storage_size(&bank);
192-
try_set_ledger_counts(&ledger);
168+
metrics::observe_columns_count_duration(|| try_set_ledger_counts(&ledger));
193169
set_accounts_count(&bank);
194170
},
195171
_ = token.cancelled() => {
@@ -198,7 +174,4 @@ pub fn init_system_metrics_ticker_old(
198174
}
199175
}
200176
})
201-
202-
tokio::task::spawn(async move {})
203177
}
204-
*/

magicblock-ledger/src/database/columns.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,9 @@ pub trait Column {
129129
// first item in the key.
130130
fn as_index(slot: Slot) -> Self::Index;
131131
fn slot(index: Self::Index) -> Slot;
132+
fn keep_all_on_compaction() -> bool {
133+
false
134+
}
132135
}
133136

134137
pub trait ColumnName {
@@ -651,6 +654,11 @@ impl Column for AccountModDatas {
651654
fn as_index(slot: Slot) -> Self::Index {
652655
slot
653656
}
657+
658+
/// We don't clean AccountModData on compaction as it isn't slot based
659+
fn keep_all_on_compaction() -> bool {
660+
true
661+
}
654662
}
655663

656664
impl TypedColumn for AccountModDatas {

magicblock-ledger/src/database/compaction_filter.rs

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ use std::{
77
},
88
};
99

10-
use log::trace;
1110
use rocksdb::{
1211
compaction_filter::CompactionFilter,
1312
compaction_filter_factory::{
@@ -82,21 +81,17 @@ pub(crate) struct PurgedSlotFilter<C: Column + ColumnName> {
8281
impl<C: Column + ColumnName> CompactionFilter for PurgedSlotFilter<C> {
8382
fn filter(
8483
&mut self,
85-
level: u32,
84+
_level: u32,
8685
key: &[u8],
8786
_value: &[u8],
8887
) -> CompactionDecision {
8988
use rocksdb::CompactionDecision::*;
90-
trace!("CompactionFilter: triggered!");
89+
if C::keep_all_on_compaction() {
90+
return Keep;
91+
}
9192

9293
let slot_in_key = C::slot(C::index(key));
9394
if slot_in_key < self.oldest_slot {
94-
trace!(
95-
"CompactionFilter: removing key. level: {}, slot: {}",
96-
level,
97-
slot_in_key
98-
);
99-
10095
// It is safe to delete this key
10196
// since those slots were truncated anyway
10297
Remove

magicblock-ledger/src/database/ledger_column.rs

Lines changed: 70 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@ use std::{
77
};
88

99
use bincode::{deserialize, serialize};
10-
use log::{error, warn};
10+
use log::warn;
1111
use prost::Message;
12-
use rocksdb::{properties as RocksProperties, ColumnFamily};
12+
use rocksdb::{properties as RocksProperties, CStrLike, ColumnFamily};
1313
use serde::de::DeserializeOwned;
1414

1515
use super::{
@@ -228,7 +228,7 @@ where
228228
/// [here](https://github.com/facebook/rocksdb/blob/08809f5e6cd9cc4bc3958dd4d59457ae78c76660/include/rocksdb/db.h#L654-L689).
229229
pub fn get_int_property(
230230
&self,
231-
name: &'static std::ffi::CStr,
231+
name: impl CStrLike,
232232
) -> Result<i64, LedgerError> {
233233
self.backend.get_int_property_cf(self.handle(), name)
234234
}
@@ -277,23 +277,45 @@ where
277277
self.backend.flush_cf(self.handle())
278278
}
279279

280+
#[inline(always)]
281+
fn is_sequential_cf<T: ColumnName>() -> bool {
282+
matches!(
283+
T::NAME,
284+
crate::database::columns::Blocktime::NAME
285+
| crate::database::columns::Blockhash::NAME
286+
| crate::database::columns::PerfSamples::NAME
287+
| crate::database::columns::AccountModDatas::NAME
288+
)
289+
}
290+
291+
/// Initialize value when current one is `DIRTY_VALUE`
292+
/// Value isn't set if current doesn't equal `DIRTY_VALUE`
293+
fn init_column_count_cache(&self) -> LedgerResult<()> {
294+
let count = if Self::is_sequential_cf::<C>() {
295+
get_column_count_sequential_column(self)? as i64
296+
} else {
297+
get_column_count_complex_column(self)? as i64
298+
};
299+
300+
// We can ignore error here since it means value already initialized
301+
let _ = self.entry_counter.compare_exchange(
302+
DIRTY_COUNT,
303+
count,
304+
Ordering::AcqRel,
305+
Ordering::Relaxed,
306+
);
307+
308+
Ok(())
309+
}
310+
280311
pub fn count_column_using_cache(&self) -> LedgerResult<i64> {
281312
let cached = self.entry_counter.load(Ordering::Relaxed);
282313
if cached != DIRTY_COUNT {
283-
return Ok(cached);
314+
Ok(cached)
315+
} else {
316+
self.init_column_count_cache()?;
317+
Ok(self.entry_counter.load(Ordering::Acquire))
284318
}
285-
286-
self
287-
.iter(IteratorMode::Start)
288-
.map(Iterator::count)
289-
.map(|val| if val > i64::MAX as usize {
290-
// NOTE: this value is only used for metrics/diagnostics and
291-
// aside from the fact that we will never encounter this case,
292-
// it is good enough to return i64::MAX
293-
error!("Column {} count is too large: {} for metrics, returning max.", C::NAME, val);
294-
i64::MAX
295-
} else { val as i64 })
296-
.inspect(|updated| self.entry_counter.store(*updated, Ordering::Relaxed))
297319
}
298320

299321
/// Increases entries counter if it's not [`DIRTY_COUNT`]
@@ -540,6 +562,38 @@ where
540562
}
541563
}
542564

565+
/// When column key format is sequentially incremented key, like: `SlotColumn`
566+
/// We can simplify extraction of count by calculating `LastKey - FirstKey + 1`
567+
fn get_column_count_sequential_column<C: Column + ColumnName>(
568+
ledger_column: &LedgerColumn<C>,
569+
) -> LedgerResult<u64> {
570+
let last_key = ledger_column.iter(IteratorMode::End)?.next();
571+
let start_key = ledger_column.iter(IteratorMode::Start)?.next();
572+
let count = match (start_key, last_key) {
573+
(Some((start_key, _)), Some((last_key, _))) => {
574+
let last_slot = C::slot(last_key);
575+
let start_slot = C::slot(start_key);
576+
577+
last_slot - start_slot + 1
578+
}
579+
// Empty ColumnFamily
580+
_ => 0,
581+
};
582+
583+
Ok(count)
584+
}
585+
586+
/// For complex columns, like: `AddressSignatures`
587+
/// We get column count using rocksdb's "estimate-num-keys" proprty
588+
/// Due to properies of how we use DB this value shall be ~correct on start
589+
fn get_column_count_complex_column<C: Column + ColumnName>(
590+
ledger_column: &LedgerColumn<C>,
591+
) -> LedgerResult<u64> {
592+
const ESTIMATE_NUM_KEYS: &str = "rocksdb.estimate-num-keys";
593+
594+
Ok(ledger_column.get_int_property(ESTIMATE_NUM_KEYS)? as u64)
595+
}
596+
543597
/// Increases entries counter if it's not [`DIRTY_COUNT`]
544598
/// Otherwise just skips it until it is set
545599
pub fn try_increase_entry_counter(entry_counter: &AtomicI64, by: u64) {

magicblock-ledger/src/database/rocks_db.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use std::{
88
};
99

1010
use rocksdb::{
11-
AsColumnFamilyRef, ColumnFamily, DBIterator, DBPinnableSlice,
11+
AsColumnFamilyRef, CStrLike, ColumnFamily, DBIterator, DBPinnableSlice,
1212
DBRawIterator, FlushOptions, IteratorMode as RocksIteratorMode, LiveFile,
1313
Options, WriteBatch as RWriteBatch, DB,
1414
};
@@ -237,7 +237,7 @@ impl Rocks {
237237
pub fn get_int_property_cf(
238238
&self,
239239
cf: &ColumnFamily,
240-
name: &'static std::ffi::CStr,
240+
name: impl CStrLike,
241241
) -> LedgerResult<i64> {
242242
match self.db.property_int_value_cf(cf, name) {
243243
Ok(Some(value)) => Ok(value.try_into().unwrap()),

magicblock-metrics/src/metrics/mod.rs

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,20 @@ lazy_static::lazy_static! {
8282
static ref LEDGER_ACCOUNT_MOD_DATA_GAUGE: IntGauge = IntGauge::new(
8383
"ledger_account_mod_data_gauge", "Ledger Account Mod Data Gauge",
8484
).unwrap();
85+
pub static ref LEDGER_COLUMNS_COUNT_DURATION_SECONDS: Histogram = Histogram::with_opts(
86+
HistogramOpts::new(
87+
"ledger_columns_count_duration_seconds",
88+
"Time taken to compute ledger columns counts"
89+
)
90+
.buckets(
91+
MICROS_10_90.iter().chain(
92+
MICROS_100_900.iter()).chain(
93+
MILLIS_1_9.iter()).chain(
94+
MILLIS_10_90.iter()).chain(
95+
MILLIS_100_900.iter()).chain(
96+
SECONDS_1_9.iter()).cloned().collect()
97+
),
98+
).unwrap();
8599

86100
// -----------------
87101
// Accounts
@@ -235,14 +249,11 @@ lazy_static::lazy_static! {
235249

236250
static ref COMMITTOR_INTENT_EXECUTION_TIME_HISTOGRAM: HistogramVec = HistogramVec::new(
237251
HistogramOpts::new(
238-
"committor_intent_execution_time_histogram",
252+
"committor_intent_execution_time_histogram_v2",
239253
"Time in seconds spent on intent execution"
240254
)
241255
.buckets(
242-
MILLIS_1_9.iter()
243-
.chain(MILLIS_10_90.iter())
244-
.chain(MILLIS_100_900.iter())
245-
.chain(SECONDS_1_9.iter()).cloned().collect(),
256+
vec![0.01, 0.1, 1.0, 3.0, 5.0, 10.0, 15.0, 20.0, 25.0]
246257
),
247258
&["intent_kind", "outcome_kind"],
248259
).unwrap();
@@ -276,6 +287,7 @@ pub(crate) fn register() {
276287
register!(LEDGER_TRANSACTION_MEMOS_GAUGE);
277288
register!(LEDGER_PERF_SAMPLES_GAUGE);
278289
register!(LEDGER_ACCOUNT_MOD_DATA_GAUGE);
290+
register!(LEDGER_COLUMNS_COUNT_DURATION_SECONDS);
279291
register!(ACCOUNTS_SIZE_GAUGE);
280292
register!(ACCOUNTS_COUNT_GAUGE);
281293
register!(PENDING_ACCOUNT_CLONES_GAUGE);
@@ -358,6 +370,21 @@ pub fn set_ledger_account_mod_data_count(count: i64) {
358370
LEDGER_ACCOUNT_MOD_DATA_GAUGE.set(count);
359371
}
360372

373+
pub fn observe_columns_count_duration<F, T>(f: F) -> T
374+
where
375+
F: FnOnce() -> T,
376+
{
377+
LEDGER_COLUMNS_COUNT_DURATION_SECONDS.observe_closure_duration(f)
378+
}
379+
380+
pub fn set_accounts_size(value: i64) {
381+
ACCOUNTS_SIZE_GAUGE.set(value)
382+
}
383+
384+
pub fn set_accounts_count(value: i64) {
385+
ACCOUNTS_COUNT_GAUGE.set(value)
386+
}
387+
361388
pub fn inc_pending_clone_requests() {
362389
PENDING_ACCOUNT_CLONES_GAUGE.inc()
363390
}

0 commit comments

Comments
 (0)