Skip to content

Commit 932ba39

Browse files
Merge pull request #132 from code0-tech/110-sagittarius-definition-endpoint
Sagittarius Definition Endpoint
2 parents 87dee81 + 9291b62 commit 932ba39

File tree

13 files changed

+349
-33
lines changed

13 files changed

+349
-33
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/cli/Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,6 @@ tokio = { workspace = true, features = ["rt", "rt-multi-thread", "macros"] }
2121
futures = { workspace = true }
2222
zip = { workspace = true }
2323
bytes = { workspace = true }
24-
prost = { workspace = true }
24+
prost = { workspace = true }
25+
tonic = "0.14.2"
26+
log = "0.4.28"

crates/cli/src/analyser/flow_type.rs

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -36,27 +36,27 @@ impl Analyser {
3636
));
3737
}
3838

39-
if let Some(identifier) = &flow.input_type_identifier {
40-
if !self.data_type_identifier_exists(identifier, None) {
41-
self.reporter.add(Diagnose::new(
42-
name.clone(),
43-
original.clone(),
44-
DiagnosticKind::UndefinedDataTypeIdentifier {
45-
identifier: identifier.clone(),
46-
},
47-
));
48-
}
39+
if let Some(identifier) = &flow.input_type_identifier
40+
&& !self.data_type_identifier_exists(identifier, None)
41+
{
42+
self.reporter.add(Diagnose::new(
43+
name.clone(),
44+
original.clone(),
45+
DiagnosticKind::UndefinedDataTypeIdentifier {
46+
identifier: identifier.clone(),
47+
},
48+
));
4949
}
50-
if let Some(identifier) = &flow.return_type_identifier {
51-
if !self.data_type_identifier_exists(identifier, None) {
52-
self.reporter.add(Diagnose::new(
53-
name.clone(),
54-
original.clone(),
55-
DiagnosticKind::UndefinedDataTypeIdentifier {
56-
identifier: identifier.clone(),
57-
},
58-
));
59-
}
50+
if let Some(identifier) = &flow.return_type_identifier
51+
&& !self.data_type_identifier_exists(identifier, None)
52+
{
53+
self.reporter.add(Diagnose::new(
54+
name.clone(),
55+
original.clone(),
56+
DiagnosticKind::UndefinedDataTypeIdentifier {
57+
identifier: identifier.clone(),
58+
},
59+
));
6060
}
6161

6262
for setting in &flow.settings {

crates/cli/src/command/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1-
pub mod definition;
21
pub mod download;
32
pub mod feature;
3+
pub mod push;
44
pub mod report;
5+
pub mod search;
56
pub mod watch;
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
use std::str::FromStr;
2+
use tonic::metadata::{MetadataMap, MetadataValue};
3+
4+
pub fn get_authorization_metadata(token: &str) -> MetadataMap {
5+
let metadata_value = MetadataValue::from_str(token).unwrap_or_else(|error| {
6+
panic!(
7+
"An error occurred trying to convert runtime_token into metadata: {}",
8+
error
9+
);
10+
});
11+
12+
let mut map = MetadataMap::new();
13+
map.insert("authorization", metadata_value);
14+
map
15+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
use crate::command::push::auth::get_authorization_metadata;
2+
use tonic::{Extensions, Request, transport::Channel};
3+
use tucana::sagittarius::{
4+
DataTypeUpdateRequest as SagittariusDataTypeUpdateRequest,
5+
data_type_service_client::DataTypeServiceClient,
6+
};
7+
use tucana::shared::DefinitionDataType;
8+
9+
pub struct SagittariusDataTypeServiceClient {
10+
client: DataTypeServiceClient<Channel>,
11+
token: String,
12+
}
13+
14+
impl SagittariusDataTypeServiceClient {
15+
pub async fn new(sagittarius_url: String, token: String) -> Self {
16+
let client = match DataTypeServiceClient::connect(sagittarius_url).await {
17+
Ok(client) => {
18+
log::info!("Successfully connected to Sagittarius DataType Endpoint!");
19+
client
20+
}
21+
Err(err) => panic!(
22+
"Failed to connect to Sagittarius (DataType Endpoint): {:?}",
23+
err
24+
),
25+
};
26+
27+
Self { client, token }
28+
}
29+
30+
pub async fn update_data_types(&mut self, data_types: Vec<DefinitionDataType>) {
31+
let request = Request::from_parts(
32+
get_authorization_metadata(&self.token),
33+
Extensions::new(),
34+
SagittariusDataTypeUpdateRequest { data_types },
35+
);
36+
37+
match self.client.update(request).await {
38+
Ok(response) => {
39+
log::info!(
40+
"Successfully transferred data types. Did Sagittarius updated them? {:?}",
41+
&response
42+
);
43+
}
44+
Err(err) => {
45+
log::error!("Failed to update DataTypes: {:?}", err);
46+
}
47+
};
48+
}
49+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
use crate::command::push::auth::get_authorization_metadata;
2+
use tonic::Extensions;
3+
use tonic::Request;
4+
use tonic::transport::Channel;
5+
use tucana::sagittarius::FlowTypeUpdateRequest as SagittariusFlowTypeUpdateRequest;
6+
use tucana::sagittarius::flow_type_service_client::FlowTypeServiceClient;
7+
use tucana::shared::FlowType;
8+
9+
pub struct SagittariusFlowTypeServiceClient {
10+
client: FlowTypeServiceClient<Channel>,
11+
token: String,
12+
}
13+
14+
impl SagittariusFlowTypeServiceClient {
15+
pub async fn new(sagittarius_url: String, token: String) -> Self {
16+
let client = match FlowTypeServiceClient::connect(sagittarius_url).await {
17+
Ok(client) => {
18+
log::info!("Successfully connected to Sagittarius FlowType Endpoint!");
19+
client
20+
}
21+
Err(err) => panic!(
22+
"Failed to connect to Sagittarius (FlowType Endpoint): {:?}",
23+
err
24+
),
25+
};
26+
27+
Self { client, token }
28+
}
29+
30+
pub async fn update_flow_types(&mut self, flow_types: Vec<FlowType>) {
31+
let request = Request::from_parts(
32+
get_authorization_metadata(&self.token),
33+
Extensions::new(),
34+
SagittariusFlowTypeUpdateRequest { flow_types },
35+
);
36+
37+
match self.client.update(request).await {
38+
Ok(response) => {
39+
log::info!(
40+
"Successfully transferred FlowTypes. Did Sagittarius updated them? {:?}",
41+
&response
42+
);
43+
}
44+
Err(err) => {
45+
log::error!("Failed to update FlowTypes: {:?}", err);
46+
}
47+
};
48+
}
49+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
use crate::command::push::auth::get_authorization_metadata;
2+
use tonic::Extensions;
3+
use tonic::Request;
4+
use tonic::transport::Channel;
5+
use tucana::sagittarius::RuntimeFunctionDefinitionUpdateRequest as SagittariusRuntimeFunctionUpdateRequest;
6+
use tucana::sagittarius::runtime_function_definition_service_client::RuntimeFunctionDefinitionServiceClient;
7+
use tucana::shared::RuntimeFunctionDefinition;
8+
9+
pub struct SagittariusRuntimeFunctionServiceClient {
10+
client: RuntimeFunctionDefinitionServiceClient<Channel>,
11+
token: String,
12+
}
13+
14+
impl SagittariusRuntimeFunctionServiceClient {
15+
pub async fn new(sagittarius_url: String, token: String) -> Self {
16+
let client = match RuntimeFunctionDefinitionServiceClient::connect(sagittarius_url).await {
17+
Ok(client) => {
18+
log::info!("Successfully connected to Sagittarius RuntimeFunction Endpoint!");
19+
client
20+
}
21+
Err(err) => panic!(
22+
"Failed to connect to Sagittarius (RuntimeFunction Endpoint): {:?}",
23+
err
24+
),
25+
};
26+
27+
Self { client, token }
28+
}
29+
30+
pub async fn update_runtime_function_definitions(
31+
&mut self,
32+
runtime_functions: Vec<RuntimeFunctionDefinition>,
33+
) {
34+
let request = Request::from_parts(
35+
get_authorization_metadata(&self.token),
36+
Extensions::new(),
37+
SagittariusRuntimeFunctionUpdateRequest { runtime_functions },
38+
);
39+
40+
match self.client.update(request).await {
41+
Ok(response) => {
42+
log::info!(
43+
"Successfully transferred RuntimeFunctions. Did Sagittarius updated them? {:?}",
44+
&response
45+
);
46+
}
47+
Err(err) => {
48+
log::error!("Failed to update RuntimeFunctions: {:?}", err);
49+
}
50+
};
51+
}
52+
}

crates/cli/src/command/push/mod.rs

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
use crate::analyser::core::Analyser;
2+
use crate::command::push::data_type_client_impl::SagittariusDataTypeServiceClient;
3+
use crate::command::push::flow_type_client_impl::SagittariusFlowTypeServiceClient;
4+
use crate::command::push::function_client_impl::SagittariusRuntimeFunctionServiceClient;
5+
use crate::formatter::{default, info};
6+
use notify::event::ModifyKind;
7+
use notify::{EventKind, RecursiveMode, Watcher, recommended_watcher};
8+
use std::sync::mpsc::channel;
9+
use std::time::{Duration, Instant};
10+
11+
mod auth;
12+
mod data_type_client_impl;
13+
mod flow_type_client_impl;
14+
mod function_client_impl;
15+
16+
pub async fn push(token: String, url: String, path: Option<String>) {
17+
let dir_path = path.unwrap_or_else(|| "./definitions".to_string());
18+
19+
info(format!("Watching directory: {dir_path}"));
20+
info(String::from("Press Ctrl+C to stop watching..."));
21+
22+
{
23+
Analyser::new(dir_path.as_str()).report(false);
24+
}
25+
26+
// Set up file watcher
27+
let (tx, rx) = channel();
28+
let mut watcher = recommended_watcher(tx).unwrap();
29+
watcher
30+
.watch(std::path::Path::new(&dir_path), RecursiveMode::Recursive)
31+
.unwrap();
32+
33+
let mut last_run = Instant::now();
34+
35+
let mut data_type_client =
36+
SagittariusDataTypeServiceClient::new(url.clone(), token.clone()).await;
37+
let mut flow_type_client =
38+
SagittariusFlowTypeServiceClient::new(url.clone(), token.clone()).await;
39+
let mut function_client = SagittariusRuntimeFunctionServiceClient::new(url, token).await;
40+
41+
loop {
42+
if let Ok(Ok(event)) = rx.recv() {
43+
match event.kind {
44+
EventKind::Modify(modify) => {
45+
if let ModifyKind::Data(_) = modify
46+
&& last_run.elapsed() > Duration::from_millis(500)
47+
{
48+
default(String::from(
49+
"\n\n\n--------------------------------------------------------------------------\n\n",
50+
));
51+
info(String::from("Change detected! Regenerating report..."));
52+
let mut analyzer = Analyser::new(dir_path.as_str());
53+
54+
// No errors when reporter is empty!
55+
if analyzer.reporter.is_empty() {
56+
data_type_client
57+
.update_data_types(
58+
analyzer
59+
.data_types
60+
.iter()
61+
.map(|d| d.definition_data_type.clone())
62+
.collect(),
63+
)
64+
.await;
65+
flow_type_client
66+
.update_flow_types(
67+
analyzer
68+
.flow_types
69+
.iter()
70+
.map(|d| d.flow_type.clone())
71+
.collect(),
72+
)
73+
.await;
74+
function_client
75+
.update_runtime_function_definitions(
76+
analyzer
77+
.functions
78+
.iter()
79+
.map(|d| d.function.clone())
80+
.collect(),
81+
)
82+
.await;
83+
}
84+
85+
analyzer.report(false);
86+
87+
last_run = Instant::now();
88+
}
89+
}
90+
EventKind::Remove(_) => {
91+
if last_run.elapsed() > Duration::from_millis(500) {
92+
default(String::from(
93+
"\n\n\n--------------------------------------------------------------------------\n\n",
94+
));
95+
info(String::from("Change detected! Regenerating report..."));
96+
let mut analyzer = Analyser::new(dir_path.as_str());
97+
98+
// No errors when reporter is empty!
99+
if analyzer.reporter.is_empty() {
100+
data_type_client
101+
.update_data_types(
102+
analyzer
103+
.data_types
104+
.iter()
105+
.map(|d| d.definition_data_type.clone())
106+
.collect(),
107+
)
108+
.await;
109+
flow_type_client
110+
.update_flow_types(
111+
analyzer
112+
.flow_types
113+
.iter()
114+
.map(|d| d.flow_type.clone())
115+
.collect(),
116+
)
117+
.await;
118+
function_client
119+
.update_runtime_function_definitions(
120+
analyzer
121+
.functions
122+
.iter()
123+
.map(|d| d.function.clone())
124+
.collect(),
125+
)
126+
.await;
127+
}
128+
129+
analyzer.report(false);
130+
last_run = Instant::now();
131+
}
132+
}
133+
_ => {}
134+
}
135+
}
136+
}
137+
}

0 commit comments

Comments
 (0)