Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions turbopack/crates/turbo-tasks-backend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ no_fast_stale = []
verify_serialization = []
verify_aggregation_graph = []
verify_immutable = []
verify_determinism = []
trace_aggregation_update = []
trace_find_and_schedule = []
trace_task_completion = []
Expand Down
49 changes: 41 additions & 8 deletions turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use turbo_tasks::{
TraitTypeId, TurboTasksBackendApi, ValueTypeId,
backend::{
Backend, CachedTaskType, CellContent, TaskExecutionSpec, TransientTaskRoot,
TransientTaskType, TurboTasksExecutionError, TypedCellContent,
TransientTaskType, TurboTasksExecutionError, TypedCellContent, VerificationMode,
},
event::{Event, EventListener},
message_queue::TimingEvent,
Expand Down Expand Up @@ -423,6 +423,8 @@ struct TaskExecutionCompletePrepareResult {
pub new_children: FxHashSet<TaskId>,
pub removed_data: Vec<CachedDataItem>,
pub is_now_immutable: bool,
#[cfg(feature = "verify_determinism")]
pub no_output_set: bool,
pub new_output: Option<OutputValue>,
pub output_dependent_tasks: SmallVec<[TaskId; 4]>,
}
Expand Down Expand Up @@ -1765,6 +1767,8 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
new_children,
mut removed_data,
is_now_immutable,
#[cfg(feature = "verify_determinism")]
no_output_set,
new_output,
output_dependent_tasks,
}) = self.task_execution_completed_prepare(
Expand Down Expand Up @@ -1809,6 +1813,8 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
if self.task_execution_completed_finish(
&mut ctx,
task_id,
#[cfg(feature = "verify_determinism")]
no_output_set,
new_output,
&mut removed_data,
is_now_immutable,
Expand Down Expand Up @@ -1843,6 +1849,8 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
new_children: Default::default(),
removed_data: Default::default(),
is_now_immutable: false,
#[cfg(feature = "verify_determinism")]
no_output_set: false,
new_output: None,
output_dependent_tasks: Default::default(),
});
Expand Down Expand Up @@ -2021,6 +2029,8 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {

// Check if output need to be updated
let current_output = get!(task, Output);
#[cfg(feature = "verify_determinism")]
let no_output_set = current_output.is_none();
let new_output = match result {
Ok(RawVc::TaskOutput(output_task_id)) => {
if let Some(OutputValue::Output(current_task_id)) = current_output
Expand Down Expand Up @@ -2092,6 +2102,8 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
new_children,
removed_data,
is_now_immutable,
#[cfg(feature = "verify_determinism")]
no_output_set,
new_output,
output_dependent_tasks,
})
Expand Down Expand Up @@ -2232,6 +2244,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
&self,
ctx: &mut impl ExecuteContext<'_>,
task_id: TaskId,
#[cfg(feature = "verify_determinism")] no_output_set: bool,
new_output: Option<OutputValue>,
removed_data: &mut Vec<CachedDataItem>,
is_now_immutable: bool,
Expand Down Expand Up @@ -2306,7 +2319,8 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
None
};

let data_update = if old_dirty_state != new_dirty_state {
let dirty_changed = old_dirty_state != new_dirty_state;
let data_update = if dirty_changed {
if let Some(new_dirty_state) = new_dirty_state {
task.insert(CachedDataItem::Dirty {
value: new_dirty_state,
Expand Down Expand Up @@ -2353,17 +2367,32 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
None
};

drop(task);
drop(old_content);
#[cfg(feature = "verify_determinism")]
let reschedule = (dirty_changed || no_output_set) && !task_id.is_transient();
#[cfg(not(feature = "verify_determinism"))]
let reschedule = false;
if reschedule {
task.add_new(CachedDataItem::InProgress {
value: InProgressState::Scheduled {
done_event,
reason: TaskExecutionReason::Stale,
},
});
drop(task);
} else {
drop(task);

// Notify dependent tasks that are waiting for this task to finish
done_event.notify(usize::MAX);
}

// Notify dependent tasks that are waiting for this task to finish
done_event.notify(usize::MAX);
drop(old_content);

if let Some(data_update) = data_update {
AggregationUpdateQueue::run(data_update, ctx);
}

false
reschedule
}

fn task_execution_completed_cleanup(&self, ctx: &mut impl ExecuteContext<'_>, task_id: TaskId) {
Expand Down Expand Up @@ -2652,12 +2681,14 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
task_id: TaskId,
cell: CellId,
content: CellContent,
verification_mode: VerificationMode,
turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
) {
operation::UpdateCellOperation::run(
task_id,
cell,
content,
verification_mode,
self.execute_context(turbo_tasks),
);
}
Expand Down Expand Up @@ -3251,9 +3282,11 @@ impl<B: BackingStorage> Backend for TurboTasksBackend<B> {
task_id: TaskId,
cell: CellId,
content: CellContent,
verification_mode: VerificationMode,
turbo_tasks: &dyn TurboTasksBackendApi<Self>,
) {
self.0.update_task_cell(task_id, cell, content, turbo_tasks);
self.0
.update_task_cell(task_id, cell, content, verification_mode, turbo_tasks);
}

fn mark_own_task_as_finished(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ use std::mem::take;

use serde::{Deserialize, Serialize};
use smallvec::SmallVec;
#[cfg(not(feature = "verify_determinism"))]
use turbo_tasks::backend::VerificationMode;
use turbo_tasks::{CellId, TaskId, TypedSharedReference, backend::CellContent};

#[cfg(feature = "trace_task_dirty")]
Expand All @@ -13,7 +15,7 @@ use crate::{
AggregationUpdateQueue, ExecuteContext, Operation, TaskGuard,
invalidate::make_task_dirty_internal,
},
storage::{get_many, remove},
storage::{get, get_many, remove},
},
data::{CachedDataItem, CachedDataItemKey, CellRef},
};
Expand All @@ -40,18 +42,61 @@ pub enum UpdateCellOperation {
}

impl UpdateCellOperation {
pub fn run(task_id: TaskId, cell: CellId, content: CellContent, mut ctx: impl ExecuteContext) {
pub fn run(
task_id: TaskId,
cell: CellId,
content: CellContent,
#[cfg(feature = "verify_determinism")] verification_mode: VerificationMode,
#[cfg(not(feature = "verify_determinism"))] _verification_mode: VerificationMode,
mut ctx: impl ExecuteContext,
) {
let content = if let CellContent(Some(new_content)) = content {
Some(new_content.into_typed(cell.type_id))
} else {
None
};

let mut task = ctx.task(task_id, TaskDataCategory::All);

let is_stateful = task.has_key(&CachedDataItemKey::Stateful {});
// We need to detect recomputation, because here the content has not actually changed (even
// if it's not equal to the old content, as not all values implement Eq). We have to
// assume that tasks are deterministic and pure.
let should_invalidate = ctx.should_track_dependencies()
&& (task.has_key(&CachedDataItemKey::Dirty {}) ||
let assume_unchanged = !ctx.should_track_dependencies()
|| (!task.has_key(&CachedDataItemKey::Dirty {})
// This is a hack for the streaming hack. Stateful tasks are never recomputed, so this forces invalidation for them in case of this hack.
task.has_key(&CachedDataItemKey::Stateful {}));
&& !is_stateful);

let old_content = get!(task, CellData { cell });

if assume_unchanged {
if old_content.is_some() {
// Never update cells when recomputing if they already have a value.
// It's not expected that content changes during recomputation.

// Check if this assumption holds.
#[cfg(feature = "verify_determinism")]
if !is_stateful
&& matches!(verification_mode, VerificationMode::EqualityCheck)
&& content.as_ref() != old_content
{
let task_description = ctx.get_task_description(task_id);
let cell_type = turbo_tasks::registry::get_value_type(cell.type_id).global_name;
eprintln!(
"Task {} updated cell #{} (type: {}) while recomputing",
task_description, cell.index, cell_type
);
}
return;
} else {
// Initial computation, or computation after a cell has been cleared.
// We can just set the content, but we don't want to notify dependent tasks,
// as we assume that content hasn't changed (deterministic tasks).
}
} else {
// When not recomputing, we need to notify dependent tasks if the content actually
// changes.

if should_invalidate {
let dependent_tasks: SmallVec<[TaskId; 4]> = get_many!(
task,
CellDependent { cell: dependent_cell, task }
Expand All @@ -78,12 +123,6 @@ impl UpdateCellOperation {
drop(task);
drop(old_content);

let content = if let CellContent(Some(new_content)) = content {
Some(new_content.into_typed(cell.type_id))
} else {
None
};

UpdateCellOperation::InvalidateWhenCellDependency {
cell_ref: CellRef {
task: task_id,
Expand All @@ -101,8 +140,7 @@ impl UpdateCellOperation {
// Fast path: We don't need to invalidate anything.
// So we can just update the cell content.

let old_content = if let CellContent(Some(new_content)) = content {
let new_content = new_content.into_typed(cell.type_id);
let old_content = if let Some(new_content) = content {
task.insert(CachedDataItem::CellData {
cell,
value: new_content,
Expand Down
3 changes: 3 additions & 0 deletions turbopack/crates/turbo-tasks-backend/tests/detached.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ static REGISTRATION: Registration = register!();
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_spawns_detached() -> anyhow::Result<()> {
run_once(&REGISTRATION, || async {
println!("test_spawns_detached");
// HACK: The watch channel we use has an incorrect implementation of `TraceRawVcs`, just
// disable GC for the test so this can't cause any problems.
prevent_gc();
Expand Down Expand Up @@ -73,7 +74,9 @@ async fn spawns_detached(
sender: TransientInstance<WatchSenderTaskInput<Option<Vc<u32>>>>,
) -> Vc<()> {
tokio::spawn(turbo_tasks().detached_for_testing(Box::pin(async move {
println!("spawns_detached: waiting for notify");
notify.0.notified().await;
println!("spawns_detached: notified, sending value");
// creating cells after the normal lifetime of the task should be okay, as the parent task
// is waiting on us before exiting!
sender.0.send(Some(Vc::cell(42))).unwrap();
Expand Down
10 changes: 8 additions & 2 deletions turbopack/crates/turbo-tasks-testing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use tokio::sync::mpsc::Receiver;
use turbo_tasks::{
CellId, ExecutionId, InvalidationReason, LocalTaskId, MagicAny, RawVc, ReadCellOptions,
ReadOutputOptions, TaskId, TaskPersistence, TraitTypeId, TurboTasksApi, TurboTasksCallApi,
backend::{CellContent, TaskCollectiblesMap, TypedCellContent},
backend::{CellContent, TaskCollectiblesMap, TypedCellContent, VerificationMode},
event::{Event, EventListener},
message_queue::CompilationEvent,
test_helpers::with_turbo_tasks_for_testing,
Expand Down Expand Up @@ -261,7 +261,13 @@ impl TurboTasksApi for VcStorage {
.into_typed(index.type_id))
}

fn update_own_task_cell(&self, task: TaskId, index: CellId, content: CellContent) {
fn update_own_task_cell(
&self,
task: TaskId,
index: CellId,
content: CellContent,
_verification_mode: VerificationMode,
) {
let mut map = self.cells.lock().unwrap();
let cell = map.entry((task, index)).or_default();
*cell = content;
Expand Down
20 changes: 13 additions & 7 deletions turbopack/crates/turbo-tasks-testing/src/run.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{fmt::Debug, future::Future, sync::Arc};
use std::{env, fmt::Debug, future::Future, sync::Arc};

use anyhow::Result;
use turbo_tasks::{TurboTasksApi, trace::TraceRawVcs};
Expand Down Expand Up @@ -98,22 +98,28 @@ where
F: Future<Output = Result<T>> + Send + 'static,
T: Debug + PartialEq + Eq + TraceRawVcs + Send + 'static,
{
let single_run = env::var("SINGLE_RUN").is_ok();
let name = closure_to_name(&fut);
let tt = registration.create_turbo_tasks(&name, true);
println!("Run #1 (without cache)");
let start = std::time::Instant::now();
let first = fut(tt.clone()).await?;
println!("Run #1 took {:?}", start.elapsed());
for i in 2..10 {
println!("Run #{i} (with memory cache, same TurboTasks instance)");
let start = std::time::Instant::now();
let second = fut(tt.clone()).await?;
println!("Run #{i} took {:?}", start.elapsed());
assert_eq!(first, second);
if !single_run {
for i in 2..10 {
println!("Run #{i} (with memory cache, same TurboTasks instance)");
let start = std::time::Instant::now();
let second = fut(tt.clone()).await?;
println!("Run #{i} took {:?}", start.elapsed());
assert_eq!(first, second);
}
}
let start = std::time::Instant::now();
tt.stop_and_wait().await;
println!("Stopping TurboTasks took {:?}", start.elapsed());
if single_run {
return Ok(());
}
for i in 10..20 {
let tt = registration.create_turbo_tasks(&name, false);
println!("Run #{i} (with filesystem cache if available, new TurboTasks instance)");
Expand Down
6 changes: 6 additions & 0 deletions turbopack/crates/turbo-tasks/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,11 @@ impl From<anyhow::Error> for TurboTasksExecutionError {
}
}

pub enum VerificationMode {
EqualityCheck,
Skip,
}

pub trait Backend: Sync + Send {
#[allow(unused_variables)]
fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {}
Expand Down Expand Up @@ -621,6 +626,7 @@ pub trait Backend: Sync + Send {
task: TaskId,
index: CellId,
content: CellContent,
verification_mode: VerificationMode,
turbo_tasks: &dyn TurboTasksBackendApi<Self>,
);

Expand Down
Loading
Loading