Skip to content
Closed
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
361adbc
inital code for versioned ingress apis
camelCasedAditya Jul 31, 2025
772de6e
typescript versioned ingest working
camelCasedAditya Jul 31, 2025
e5e7aa2
versioned registration of infra
camelCasedAditya Aug 1, 2025
0c03301
Revert "versioned registration of infra"
camelCasedAditya Aug 1, 2025
e5a7f09
Revert "Revert "versioned registration of infra""
camelCasedAditya Aug 1, 2025
d034f1b
python versioned ingest apis complete
camelCasedAditya Aug 1, 2025
e8ee77b
ls function displays versioned ingest api infra
camelCasedAditya Aug 4, 2025
1878770
Update packages/ts-moose-lib/src/dmv2/sdk/ingestApi.ts
camelCasedAditya Aug 4, 2025
112a1ee
id generation function change
camelCasedAditya Aug 4, 2025
9e8e15c
fixed mismatched version keys
camelCasedAditya Aug 4, 2025
f8e36f2
cross language consistent infra naming
camelCasedAditya Aug 4, 2025
5bf4990
templates changes
camelCasedAditya Aug 5, 2025
1e4af07
fixed unversioned ingest routing and workflows
camelCasedAditya Aug 5, 2025
1ed6c20
fixed failing tests
camelCasedAditya Aug 5, 2025
8f9c7d1
test for unversioned ingest API
camelCasedAditya Aug 5, 2025
5e33a9c
lints fixes
camelCasedAditya Aug 5, 2025
1a13c8b
version table regestration fix
camelCasedAditya Aug 5, 2025
2a82006
fixed regex as string
camelCasedAditya Aug 5, 2025
164411c
rebased
camelCasedAditya Aug 6, 2025
7dc69cc
reduce code duplication
camelCasedAditya Aug 8, 2025
48e6505
removed unneeded streaming function runner changes
camelCasedAditya Aug 11, 2025
6214bbb
Merge branch 'main' into versioned-ingest-apis
camelCasedAditya Aug 11, 2025
fabfadc
use version field in infra map
camelCasedAditya Aug 12, 2025
b501f56
fallback logic consistent with egress APIs
camelCasedAditya Aug 12, 2025
6721f81
reduced for loop iterations
camelCasedAditya Aug 12, 2025
2ffdd5e
Update packages/ts-moose-lib/src/dmv2/internal.ts
camelCasedAditya Aug 12, 2025
4c01563
removed case sensitivity
camelCasedAditya Aug 12, 2025
2eeee58
dlq fix
camelCasedAditya Aug 12, 2025
dc8e011
better streaming lookup
camelCasedAditya Aug 12, 2025
c9f8aa6
ingest api lookup effiecency improvement
camelCasedAditya Aug 19, 2025
4ad0d5d
Removed unneeded topic field
camelCasedAditya Aug 19, 2025
fb026d9
clippy fix
camelCasedAditya Aug 19, 2025
6c5c96e
Merge branch 'main' into versioned-ingest-apis
camelCasedAditya Aug 21, 2025
965ca28
nested api versions allowed
camelCasedAditya Aug 22, 2025
1ef4bc9
unit tests for ingest routing
camelCasedAditya Aug 22, 2025
bba2d76
better nested test
camelCasedAditya Aug 22, 2025
754cc0f
fixed streaming for nested versions
camelCasedAditya Aug 22, 2025
49d00b6
clippy fixes
camelCasedAditya Aug 22, 2025
694ee3e
standrardizing registry
camelCasedAditya Aug 22, 2025
02c21c0
Merge branch 'main' into versioned-ingest-apis
camelCasedAditya Aug 22, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 67 additions & 2 deletions apps/framework-cli-e2e/test/templates.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,11 @@ const utils = {
});
console.log("Truncated Bar table");

await client.command({
query: "TRUNCATE TABLE IF EXISTS Foo",
});
console.log("Truncated Foo table");

// Clear materialized view tables
const mvTables = ["BarAggregated", "bar_aggregated"];
for (const table of mvTables) {
Expand Down Expand Up @@ -562,7 +567,7 @@ describe("Moose Templates", () => {
const responses = [];

for (let i = 0; i < recordsToSend; i++) {
const response = await fetch(`${TEST_CONFIG.server.url}/ingest/Foo`, {
const response = await fetch(`${TEST_CONFIG.server.url}/ingest/Foo/1`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
Expand Down Expand Up @@ -622,6 +627,36 @@ describe("Moose Templates", () => {
"Optional Text: Hello world",
]);
});

it("should successfully ingest data to unversioned API and verify in Foo table", async function () {
const eventId = randomUUID();

// Test unversioned API endpoint
const response = await fetch(`${TEST_CONFIG.server.url}/ingest/Foo`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
id: eventId,
timestamp: TEST_CONFIG.timestamp,
message: "Hello from unversioned API",
category: "test",
priority: 1,
}),
});

if (!response.ok) {
console.error("Response code:", response.status);
const text = await response.text();
console.error(`Unversioned API test request failed: ${text}`);
throw new Error(`${response.status}: ${text}`);
}

// Wait for data to be written to Foo table
await utils.waitForDBWrite(devProcess!, "Foo", 1);

// Verify data was inserted into the Foo table
await utils.verifyClickhouseData("Foo", eventId, "id");
});
});

describe("python template", () => {
Expand Down Expand Up @@ -745,7 +780,7 @@ describe("Moose Templates", () => {

it("should successfully ingest data and verify through consumption API", async function () {
const eventId = randomUUID();
const response = await fetch(`${TEST_CONFIG.server.url}/ingest/foo`, {
const response = await fetch(`${TEST_CONFIG.server.url}/ingest/foo/1`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
Expand Down Expand Up @@ -809,5 +844,35 @@ describe("Moose Templates", () => {
"Optional Text: Hello from Python",
]);
});

it("should successfully ingest data to unversioned API and verify in Foo table", async function () {
const eventId = randomUUID();

// Test unversioned API endpoint
const response = await fetch(`${TEST_CONFIG.server.url}/ingest/foo`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
id: eventId,
timestamp: TEST_CONFIG.timestamp,
message: "Hello from unversioned Python API",
category: "test",
priority: 2,
}),
});

if (!response.ok) {
console.error("Response code:", response.status);
const text = await response.text();
console.error(`Unversioned Python API test request failed: ${text}`);
throw new Error(`${response.status}: ${text}`);
}

// Wait for data to be written to Foo table
await utils.waitForDBWrite(devProcess!, "Foo", 1);

// Verify data was inserted into the Foo table
await utils.verifyClickhouseData("Foo", eventId, "id");
});
});
});
57 changes: 35 additions & 22 deletions apps/framework-cli/src/cli/local_webserver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1067,39 +1067,52 @@
let res = match (configured_producer, req.method(), &route_split[..]) {
(Some(configured_producer), &hyper::Method::POST, ["ingest", _]) => {
if project.features.data_model_v2 {
// For v2, find the latest version if no version specified
let route_table_read = route_table.read().await;
let base_path = route.to_str().unwrap();
let mut latest_version: Option<&Version> = None;

// First find matching routes, then get latest version
for (path, meta) in route_table_read.iter() {
let path_str = path.to_str().unwrap();
if path_str.starts_with(base_path) {
if let Some(version) = &meta.version {
if latest_version.is_none() || version > latest_version.unwrap() {
latest_version = Some(version);
}
}
}
}

match latest_version {
// If latest version exists, use it
Some(version) => {
// First, try to find an exact match for the unversioned route
if route_table_read.contains_key(&route) {
// Exact match found for unversioned route
ingest_route(
req,
route,
configured_producer,
route_table,
is_prod,
jwt_config,
project.http_server_config.max_request_body_size,
)
.await
} else {
// No exact match, look for versioned routes that share the same base path
// If there is exactly ONE such versioned route, use it. Otherwise, return not found.
let base_path = route.to_str().unwrap();

// Concise early-exit: collect up to 2 matches only
let matches: Vec<PathBuf> = route_table_read
.iter()
.filter(|(path, meta)| {
meta.version.is_some()
&& path

Check failure on line 1095 in apps/framework-cli/src/cli/local_webserver.rs

View workflow job for this annotation

GitHub Actions / Lints

this `map_or` can be simplified
.to_str()
.map_or(false, |s| s.starts_with(&format!("{base_path}/")))
})
.map(|(path, _)| path.clone())
.take(2)
.collect();

if matches.len() == 1 {
ingest_route(
req,
route.join(version.to_string()),
matches[0].clone(),
configured_producer,
route_table,
is_prod,
jwt_config,
project.http_server_config.max_request_body_size,
)
.await
}
None => {
// Otherwise, try direct route
} else {
// Either none or multiple versioned routes exist for this base path; return not found
ingest_route(
req,
route,
Expand Down
53 changes: 38 additions & 15 deletions apps/framework-cli/src/cli/routines/ls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,15 +401,21 @@ impl StreamInfo {
fn from_topic(
value: Topic,
topic_to_table_sync_processes: &HashMap<String, TopicToTableSyncProcess>,
infra_map: &InfrastructureMap,
) -> Self {
let process = topic_to_table_sync_processes
.values()
.find(|p| p.source_topic_id == value.id());

Self {
name: value.name,
name: value.id(),
schema_fields: value.columns.iter().map(|col| col.name.clone()).collect(),
destination: process.map(|p| p.target_table_id.to_string()),
destination: process.and_then(|p| {
infra_map
.get_table(&p.target_table_id)
.ok()
.map(|table| table.name.clone())
}),
}
}
}
Expand Down Expand Up @@ -443,6 +449,7 @@ impl ResourceInfo for Vec<StreamInfo> {
pub struct IngestionApiInfo {
pub name: String,
pub destination: String,
pub path: String,
}

fn to_info(endpoint: &ApiEndpoint) -> Either<IngestionApiInfo, ConsumptionApiInfo> {
Expand All @@ -454,6 +461,7 @@ fn to_info(endpoint: &ApiEndpoint) -> Either<IngestionApiInfo, ConsumptionApiInf
} => Either::Left(IngestionApiInfo {
name: endpoint.name.clone(),
destination: target_topic_id.clone(),
path: endpoint.path.to_string_lossy().to_string(),
}),
APIType::EGRESS {
query_params,
Expand All @@ -476,9 +484,13 @@ impl ResourceInfo for Vec<IngestionApiInfo> {
fn show(&self) {
show_table(
"Ingestion APIs".to_string(),
vec!["name".to_string(), "destination".to_string()],
vec![
"name".to_string(),
"destination".to_string(),
"path".to_string(),
],
self.iter()
.map(|api| vec![api.name.clone(), api.destination.clone()])
.map(|api| vec![api.name.clone(), api.destination.clone(), api.path.clone()])
.collect(),
)
}
Expand Down Expand Up @@ -656,18 +668,29 @@ pub async fn ls_dmv2(
.values()
.filter(|api| name.is_none_or(|name| api.name.contains(name)))
.partition_map(to_info);

// Extract all needed data to avoid ownership issues
let topics: Vec<Topic> = infra_map
.topics
.values()
.filter(|api| name.is_none_or(|name| api.name.contains(name)))
.cloned()
.collect();

let tables: Vec<Table> = infra_map
.tables
.values()
.filter(|api| name.is_none_or(|name| api.name.contains(name)))
.cloned()
.collect();

let resources = ResourceListing {
tables: infra_map
.tables
.into_values()
.filter(|api| name.is_none_or(|name| api.name.contains(name)))
.map(|t| t.into())
.collect(),
streams: infra_map
.topics
.into_values()
.filter(|api| name.is_none_or(|name| api.name.contains(name)))
.map(|t| StreamInfo::from_topic(t, &infra_map.topic_to_table_sync_processes))
tables: tables.into_iter().map(|t| t.into()).collect(),
streams: topics
.into_iter()
.map(|t| {
StreamInfo::from_topic(t, &infra_map.topic_to_table_sync_processes, &infra_map)
})
.collect(),
ingestion_apis,
sql_resources: infra_map
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ pub struct TopicToTableSyncProcess {
impl TopicToTableSyncProcess {
pub fn new(topic: &Topic, table: &Table) -> Self {
if topic.version != table.version {
panic!("Version mismatch between topic and table")
panic!(
"Version mismatch between topic {:?} and table {:?}",
topic.version, table.version
);
}

TopicToTableSyncProcess {
Expand Down
Loading
Loading