Formatting and minor code changes

This commit is contained in:
Lol3rrr
2024-10-18 13:11:10 +02:00
parent 835b4484dc
commit 6c098b412a
9 changed files with 198 additions and 135 deletions

View File

@@ -36,10 +36,33 @@ pub static ANALYSIS_METHODS: std::sync::LazyLock<[std::sync::Arc<dyn Analysis +
]
});
pub async fn poll_next_task(
upload_folder: &std::path::Path,
#[derive(Debug)]
pub enum TaskError<AE> {
Diesel(diesel::result::Error),
RunningAction(AE),
}
impl<AE> From<diesel::result::Error> for TaskError<AE> {
fn from(value: diesel::result::Error) -> Self {
Self::Diesel(value)
}
}
pub async fn poll_next_task<A, AE>(
upload_folder: impl Into<std::path::PathBuf>,
db_con: &mut diesel_async::pg::AsyncPgConnection,
) -> Result<AnalysisInput, ()> {
action: A,
) -> Result<(), TaskError<AE>>
where
A: Fn(
AnalysisInput,
&mut diesel_async::pg::AsyncPgConnection,
)
-> core::pin::Pin<Box<(dyn core::future::Future<Output = Result<(), AE>> + Send + '_)>>
+ Send
+ Clone
+ Sync,
{
let query = crate::schema::analysis_queue::dsl::analysis_queue
.order(crate::schema::analysis_queue::dsl::created_at.asc())
.limit(1)
@@ -47,50 +70,53 @@ pub async fn poll_next_task(
.for_update()
.skip_locked();
let upload_folder: std::path::PathBuf = upload_folder.into();
loop {
let upload_folder = upload_folder.clone();
let action = action.clone();
let result = db_con
.build_transaction()
.run::<_, diesel::result::Error, _>(|conn| {
.run::<_, TaskError<AE>, _>(|conn| {
Box::pin(async move {
let mut results: Vec<crate::models::AnalysisTask> = query.load(conn).await?;
let final_result = match results.pop() {
let task = match results.pop() {
Some(r) => r,
None => return Ok(None),
};
let input = AnalysisInput {
path: upload_folder
.join(&task.steam_id)
.join(format!("{}.dem", task.demo_id)),
steamid: task.steam_id.clone(),
demoid: task.demo_id.clone(),
};
let tmp = action(input, &mut *conn);
tmp.await.map_err(|e| TaskError::RunningAction(e))?;
let delete_query =
diesel::dsl::delete(crate::schema::analysis_queue::dsl::analysis_queue)
.filter(
crate::schema::analysis_queue::dsl::demo_id
.eq(final_result.demo_id.clone()),
)
.filter(
crate::schema::analysis_queue::dsl::steam_id
.eq(final_result.steam_id.clone()),
);
.filter(crate::schema::analysis_queue::dsl::demo_id.eq(task.demo_id))
.filter(crate::schema::analysis_queue::dsl::steam_id.eq(task.steam_id));
delete_query.execute(conn).await?;
Ok(Some(final_result))
Ok(Some(()))
})
})
.await;
match result {
Ok(Some(r)) => {
return Ok(AnalysisInput {
path: upload_folder
.join(&r.steam_id)
.join(format!("{}.dem", r.demo_id)),
steamid: r.steam_id,
demoid: r.demo_id,
});
Ok(Some(())) => {
return Ok(());
}
Ok(None) => {
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
Err(e) => {
tracing::error!("Getting Task from Postgres: {:?}", e);
return Err(());
return Err(e);
}
};
}

View File

@@ -456,7 +456,11 @@ async fn perround(
.map(|dteam| common::demo_analysis::PerRoundTeam {
name: dteam.start_name,
number: dteam.team as u32,
players: players.iter().filter(|p| p.team == dteam.team).map(|p| p.name.clone()).collect(),
players: players
.iter()
.filter(|p| p.team == dteam.team)
.map(|p| p.name.clone())
.collect(),
})
.collect();

View File

@@ -64,7 +64,8 @@ pub async fn run_api(
let serve_dir = option_env!("FRONTEND_DIST_DIR").unwrap_or("../frontend/dist/");
tracing::debug!("Serving static files from {:?}", serve_dir);
let steam_callback_base_url = std::env::var("BASE_URL").unwrap_or("http://localhost:3000".to_owned());
let steam_callback_base_url =
std::env::var("BASE_URL").unwrap_or("http://localhost:3000".to_owned());
tracing::debug!("Base-URL: {:?}", steam_callback_base_url);
let router = axum::Router::new()
@@ -93,68 +94,63 @@ pub async fn run_api(
#[tracing::instrument(skip(upload_folder))]
pub async fn run_analysis(upload_folder: impl Into<std::path::PathBuf>) {
use diesel::prelude::*;
use diesel_async::{AsyncConnection, RunQueryDsl};
use diesel_async::RunQueryDsl;
let upload_folder: std::path::PathBuf = upload_folder.into();
loop {
let mut db_con = db_connection().await;
let input = match crate::analysis::poll_next_task(&upload_folder, &mut db_con).await {
Ok(i) => i,
Err(e) => {
tracing::error!("Polling for next Task: {:?}", e);
tokio::time::sleep(std::time::Duration::from_secs(30)).await;
continue;
}
};
let demo_id = input.demoid.clone();
let mut store_result_fns = Vec::new();
for analysis in analysis::ANALYSIS_METHODS.iter().map(|a| a.clone()) {
let input = input.clone();
let store_result =
match tokio::task::spawn_blocking(move || analysis.analyse(input)).await {
Ok(Ok(r)) => r,
Ok(Err(e)) => {
tracing::error!("Analysis failed: {:?}", e);
continue;
}
Err(e) => {
tracing::error!("Joining Task: {:?}", e);
continue;
}
};
store_result_fns.push(store_result);
}
let mut db_con = crate::db_connection().await;
let update_process_info =
diesel::dsl::update(crate::schema::processing_status::dsl::processing_status)
.set(crate::schema::processing_status::dsl::info.eq(1))
.filter(crate::schema::processing_status::dsl::demo_id.eq(demo_id));
let store_res = db_con
.transaction::<'_, '_, '_, _, diesel::result::Error, _>(|conn| {
let res = crate::analysis::poll_next_task(
&upload_folder,
&mut db_con,
move |input: analysis::AnalysisInput, db_con: &mut diesel_async::AsyncPgConnection| {
Box::pin(async move {
for store_fn in store_result_fns {
store_fn(conn).await?;
}
update_process_info.execute(conn).await?;
let demo_id = input.demoid.clone();
Ok(())
let mut store_result_fns = Vec::new();
for analysis in analysis::ANALYSIS_METHODS.iter().map(|a| a.clone()) {
let input = input.clone();
let store_result = match tokio::task::spawn_blocking(move || {
analysis.analyse(input)
})
.await
{
Ok(Ok(r)) => r,
Ok(Err(e)) => {
tracing::error!("Analysis failed: {:?}", e);
continue;
}
Err(e) => {
tracing::error!("Joining Task: {:?}", e);
continue;
}
};
store_result_fns.push(store_result);
}
let update_process_info = diesel::dsl::update(
crate::schema::processing_status::dsl::processing_status,
)
.set(crate::schema::processing_status::dsl::info.eq(1))
.filter(crate::schema::processing_status::dsl::demo_id.eq(demo_id));
for store_fn in store_result_fns {
store_fn(db_con).await.map_err(|e| ())?;
}
update_process_info.execute(db_con).await.map_err(|e| ())?;
Ok::<(), ()>(())
})
})
.await;
match store_res {
Ok(_) => {
tracing::info!("Stored analysis results");
}
Err(e) => {
tracing::error!("Failed to store results: {:?}", e);
}
};
},
)
.await;
if let Err(e) = res {
tracing::error!("Polling for next Task: {:?}", e);
tokio::time::sleep(std::time::Duration::from_secs(30)).await;
continue;
}
}
}