diff --git a/analysis/tests/endofgame.rs b/analysis/tests/endofgame.rs index 303892d..57fea3c 100644 --- a/analysis/tests/endofgame.rs +++ b/analysis/tests/endofgame.rs @@ -11,13 +11,24 @@ fn endofgame_nuke() { let expected = endofgame::EndOfGame { map: "de_nuke".to_owned(), - teams: [(2, endofgame::TeamInfo { - end_score: 13, - start_side: "CT".into(), - }), (3, endofgame::TeamInfo { - end_score: 8, - start_side: "TERRORIST".into() - })].into_iter().collect(), + teams: [ + ( + 2, + endofgame::TeamInfo { + end_score: 13, + start_side: "CT".into(), + }, + ), + ( + 3, + endofgame::TeamInfo { + end_score: 8, + start_side: "TERRORIST".into(), + }, + ), + ] + .into_iter() + .collect(), players: vec![ ( endofgame::PlayerInfo { diff --git a/backend/src/analysis.rs b/backend/src/analysis.rs index 4c38120..1511f0c 100644 --- a/backend/src/analysis.rs +++ b/backend/src/analysis.rs @@ -36,10 +36,33 @@ pub static ANALYSIS_METHODS: std::sync::LazyLock<[std::sync::Arc { + Diesel(diesel::result::Error), + RunningAction(AE), +} + +impl From for TaskError { + fn from(value: diesel::result::Error) -> Self { + Self::Diesel(value) + } +} + +pub async fn poll_next_task( + upload_folder: impl Into, db_con: &mut diesel_async::pg::AsyncPgConnection, -) -> Result { + action: A, +) -> Result<(), TaskError> +where + A: Fn( + AnalysisInput, + &mut diesel_async::pg::AsyncPgConnection, + ) + -> core::pin::Pin> + Send + '_)>> + + Send + + Clone + + Sync, +{ let query = crate::schema::analysis_queue::dsl::analysis_queue .order(crate::schema::analysis_queue::dsl::created_at.asc()) .limit(1) @@ -47,50 +70,53 @@ pub async fn poll_next_task( .for_update() .skip_locked(); + let upload_folder: std::path::PathBuf = upload_folder.into(); + loop { + let upload_folder = upload_folder.clone(); + let action = action.clone(); + let result = db_con .build_transaction() - .run::<_, diesel::result::Error, _>(|conn| { + .run::<_, TaskError, _>(|conn| { Box::pin(async move { let mut results: Vec = query.load(conn).await?; - let final_result = match results.pop() { + let task = match results.pop() { Some(r) => r, None => return Ok(None), }; + let input = AnalysisInput { + path: upload_folder + .join(&task.steam_id) + .join(format!("{}.dem", task.demo_id)), + steamid: task.steam_id.clone(), + demoid: task.demo_id.clone(), + }; + + let tmp = action(input, &mut *conn); + tmp.await.map_err(|e| TaskError::RunningAction(e))?; + let delete_query = diesel::dsl::delete(crate::schema::analysis_queue::dsl::analysis_queue) - .filter( - crate::schema::analysis_queue::dsl::demo_id - .eq(final_result.demo_id.clone()), - ) - .filter( - crate::schema::analysis_queue::dsl::steam_id - .eq(final_result.steam_id.clone()), - ); + .filter(crate::schema::analysis_queue::dsl::demo_id.eq(task.demo_id)) + .filter(crate::schema::analysis_queue::dsl::steam_id.eq(task.steam_id)); delete_query.execute(conn).await?; - Ok(Some(final_result)) + Ok(Some(())) }) }) .await; match result { - Ok(Some(r)) => { - return Ok(AnalysisInput { - path: upload_folder - .join(&r.steam_id) - .join(format!("{}.dem", r.demo_id)), - steamid: r.steam_id, - demoid: r.demo_id, - }); + Ok(Some(())) => { + return Ok(()); } Ok(None) => { tokio::time::sleep(std::time::Duration::from_secs(5)).await; } Err(e) => { - tracing::error!("Getting Task from Postgres: {:?}", e); - return Err(()); + return Err(e); } }; } diff --git a/backend/src/api/demos.rs b/backend/src/api/demos.rs index 9918abb..12e695d 100644 --- a/backend/src/api/demos.rs +++ b/backend/src/api/demos.rs @@ -456,7 +456,11 @@ async fn perround( .map(|dteam| common::demo_analysis::PerRoundTeam { name: dteam.start_name, number: dteam.team as u32, - players: players.iter().filter(|p| p.team == dteam.team).map(|p| p.name.clone()).collect(), + players: players + .iter() + .filter(|p| p.team == dteam.team) + .map(|p| p.name.clone()) + .collect(), }) .collect(); diff --git a/backend/src/lib.rs b/backend/src/lib.rs index b80a698..94fbf9e 100644 --- a/backend/src/lib.rs +++ b/backend/src/lib.rs @@ -64,7 +64,8 @@ pub async fn run_api( let serve_dir = option_env!("FRONTEND_DIST_DIR").unwrap_or("../frontend/dist/"); tracing::debug!("Serving static files from {:?}", serve_dir); - let steam_callback_base_url = std::env::var("BASE_URL").unwrap_or("http://localhost:3000".to_owned()); + let steam_callback_base_url = + std::env::var("BASE_URL").unwrap_or("http://localhost:3000".to_owned()); tracing::debug!("Base-URL: {:?}", steam_callback_base_url); let router = axum::Router::new() @@ -93,68 +94,63 @@ pub async fn run_api( #[tracing::instrument(skip(upload_folder))] pub async fn run_analysis(upload_folder: impl Into) { use diesel::prelude::*; - use diesel_async::{AsyncConnection, RunQueryDsl}; + use diesel_async::RunQueryDsl; let upload_folder: std::path::PathBuf = upload_folder.into(); loop { let mut db_con = db_connection().await; - let input = match crate::analysis::poll_next_task(&upload_folder, &mut db_con).await { - Ok(i) => i, - Err(e) => { - tracing::error!("Polling for next Task: {:?}", e); - tokio::time::sleep(std::time::Duration::from_secs(30)).await; - continue; - } - }; - let demo_id = input.demoid.clone(); - - let mut store_result_fns = Vec::new(); - for analysis in analysis::ANALYSIS_METHODS.iter().map(|a| a.clone()) { - let input = input.clone(); - let store_result = - match tokio::task::spawn_blocking(move || analysis.analyse(input)).await { - Ok(Ok(r)) => r, - Ok(Err(e)) => { - tracing::error!("Analysis failed: {:?}", e); - continue; - } - Err(e) => { - tracing::error!("Joining Task: {:?}", e); - continue; - } - }; - - store_result_fns.push(store_result); - } - - let mut db_con = crate::db_connection().await; - - let update_process_info = - diesel::dsl::update(crate::schema::processing_status::dsl::processing_status) - .set(crate::schema::processing_status::dsl::info.eq(1)) - .filter(crate::schema::processing_status::dsl::demo_id.eq(demo_id)); - - let store_res = db_con - .transaction::<'_, '_, '_, _, diesel::result::Error, _>(|conn| { + let res = crate::analysis::poll_next_task( + &upload_folder, + &mut db_con, + move |input: analysis::AnalysisInput, db_con: &mut diesel_async::AsyncPgConnection| { Box::pin(async move { - for store_fn in store_result_fns { - store_fn(conn).await?; - } - update_process_info.execute(conn).await?; + let demo_id = input.demoid.clone(); - Ok(()) + let mut store_result_fns = Vec::new(); + for analysis in analysis::ANALYSIS_METHODS.iter().map(|a| a.clone()) { + let input = input.clone(); + let store_result = match tokio::task::spawn_blocking(move || { + analysis.analyse(input) + }) + .await + { + Ok(Ok(r)) => r, + Ok(Err(e)) => { + tracing::error!("Analysis failed: {:?}", e); + continue; + } + Err(e) => { + tracing::error!("Joining Task: {:?}", e); + continue; + } + }; + + store_result_fns.push(store_result); + } + + let update_process_info = diesel::dsl::update( + crate::schema::processing_status::dsl::processing_status, + ) + .set(crate::schema::processing_status::dsl::info.eq(1)) + .filter(crate::schema::processing_status::dsl::demo_id.eq(demo_id)); + + for store_fn in store_result_fns { + store_fn(db_con).await.map_err(|e| ())?; + } + update_process_info.execute(db_con).await.map_err(|e| ())?; + + Ok::<(), ()>(()) }) - }) - .await; - match store_res { - Ok(_) => { - tracing::info!("Stored analysis results"); - } - Err(e) => { - tracing::error!("Failed to store results: {:?}", e); - } - }; + }, + ) + .await; + + if let Err(e) = res { + tracing::error!("Polling for next Task: {:?}", e); + tokio::time::sleep(std::time::Duration::from_secs(30)).await; + continue; + } } } diff --git a/frontend/src/demo.rs b/frontend/src/demo.rs index ebf01ba..72642a4 100644 --- a/frontend/src/demo.rs +++ b/frontend/src/demo.rs @@ -2,8 +2,8 @@ use leptos::*; use leptos_router::{Outlet, A}; pub mod heatmap; -pub mod scoreboard; pub mod perround; +pub mod scoreboard; #[derive(Debug, Clone)] struct CurrentDemoName(ReadSignal); @@ -31,10 +31,10 @@ pub fn demo() -> impl leptos::IntoView { }, ); - let rerun_analysis = create_action(move |_: &()| { - async move { - let _ = reqwasm::http::Request::get(&format!("/api/demos/{}/reanalyse", id())).send().await; - } + let rerun_analysis = create_action(move |_: &()| async move { + let _ = reqwasm::http::Request::get(&format!("/api/demos/{}/reanalyse", id())) + .send() + .await; }); let map = move || match demo_info.get() { diff --git a/frontend/src/demo/heatmap.rs b/frontend/src/demo/heatmap.rs index a60ba1b..30ad194 100644 --- a/frontend/src/demo/heatmap.rs +++ b/frontend/src/demo/heatmap.rs @@ -4,19 +4,17 @@ use super::CurrentDemoName; #[leptos::component] pub fn heatmaps() -> impl leptos::IntoView { - let heatmaps_resource = - create_resource(leptos_router::use_params_map(), |params| async move { - let id = params.get("id").unwrap(); + let heatmaps_resource = create_resource(leptos_router::use_params_map(), |params| async move { + let id = params.get("id").unwrap(); - let res = - reqwasm::http::Request::get(&format!("/api/demos/{}/analysis/heatmap", id)) - .send() - .await - .unwrap(); - res.json::>() - .await - .unwrap() - }); + let res = reqwasm::http::Request::get(&format!("/api/demos/{}/analysis/heatmap", id)) + .send() + .await + .unwrap(); + res.json::>() + .await + .unwrap() + }); let style = stylers::style! { "Heatmap-Wrapper", @@ -55,7 +53,7 @@ fn heatmap_view(heatmaps: Vec) -> impl lep .heatmap_container { display: inline-block; } - + .heatmap_image { width: min(40vw, 60vh); height: min(40vw, 60vh); @@ -111,17 +109,27 @@ fn heatmap_view(heatmaps: Vec) -> impl lep let player = players.get(idx).unwrap(); - set_value(heatmaps.iter().filter(|h| &h.name == player).cloned().collect()); + set_value( + heatmaps + .iter() + .filter(|h| &h.name == player) + .cloned() + .collect(), + ); set_idx(idx); }; let players = og_players; let select_values = move || { - players.iter().enumerate().map(|(idx, name)| { - view! { - - } - }).collect::>() + players + .iter() + .enumerate() + .map(|(idx, name)| { + view! { + + } + }) + .collect::>() }; view! { @@ -132,7 +140,7 @@ fn heatmap_view(heatmaps: Vec) -> impl lep
- { heatmap_view } + { heatmap_view } } } diff --git a/frontend/src/demo/perround.rs b/frontend/src/demo/perround.rs index d0002ba..b3ab22c 100644 --- a/frontend/src/demo/perround.rs +++ b/frontend/src/demo/perround.rs @@ -14,25 +14,23 @@ fn to_coloumn(idx: usize) -> usize { #[leptos::component] pub fn per_round() -> impl leptos::IntoView { - let perround_resource = - create_resource(leptos_router::use_params_map(), |params| async move { - let id = params.get("id").unwrap(); + let perround_resource = create_resource(leptos_router::use_params_map(), |params| async move { + let id = params.get("id").unwrap(); + + let res = reqwasm::http::Request::get(&format!("/api/demos/{}/analysis/perround", id)) + .send() + .await + .unwrap(); + res.json::() + .await + .unwrap() + }); - let res = - reqwasm::http::Request::get(&format!("/api/demos/{}/analysis/perround", id)) - .send() - .await - .unwrap(); - res.json::() - .await - .unwrap() - }); - let style = stylers::style! { "PerRound", .round_overview { display: inline-grid; - + width: 90vw; grid-template-columns: auto repeat(12, 1fr) 5px repeat(12, 1fr) 5px repeat(3, 1fr) 5px repeat(3, 1fr); grid-template-rows: repeat(3, auto); @@ -73,7 +71,10 @@ pub fn per_round() -> impl leptos::IntoView { let events_list = move || { let round_index = round(); let data = perround_resource.get(); - let current_round = data.as_ref().map(|rs| rs.rounds.get(round_index).cloned()).flatten(); + let current_round = data + .as_ref() + .map(|rs| rs.rounds.get(round_index).cloned()) + .flatten(); let teams = data.as_ref().map(|rs| rs.teams.clone()); match (current_round, teams) { @@ -171,13 +172,22 @@ pub fn per_round() -> impl leptos::IntoView { None => return view! {}.into_view(), }; - let upper = perround_teams.iter().find(|t| t.name == "CT").map(|t| t.number).unwrap_or(0); - let lower = perround_teams.iter().find(|t| t.name == "TERRORIST").map(|t| t.number).unwrap_or(0); + let upper = perround_teams + .iter() + .find(|t| t.name == "CT") + .map(|t| t.number) + .unwrap_or(0); + let lower = perround_teams + .iter() + .find(|t| t.name == "TERRORIST") + .map(|t| t.number) + .unwrap_or(0); view! { Team { upper } Team { lower } - }.into_view() + } + .into_view() }; view! { diff --git a/frontend/src/demo/scoreboard.rs b/frontend/src/demo/scoreboard.rs index 0143711..6abffc8 100644 --- a/frontend/src/demo/scoreboard.rs +++ b/frontend/src/demo/scoreboard.rs @@ -25,8 +25,10 @@ pub fn scoreboard() -> impl leptos::IntoView { .get() .into_iter() .flat_map(|v| v.teams.into_iter()) - .map(|(team, players)| view! { - + .map(|(team, players)| { + view! { + + } }) .collect::>() }; @@ -46,7 +48,10 @@ mod orderings { #[derive(Debug, Clone)] pub struct Ordering { name: SelectedStat, - pub sort_fn: fn(p1: &common::demo_analysis::ScoreBoardPlayer, p2: &common::demo_analysis::ScoreBoardPlayer) -> core::cmp::Ordering, + pub sort_fn: fn( + p1: &common::demo_analysis::ScoreBoardPlayer, + p2: &common::demo_analysis::ScoreBoardPlayer, + ) -> core::cmp::Ordering, } impl Ordering { @@ -89,7 +94,10 @@ mod orderings { } #[leptos::component] -fn team_scoreboard(value: Vec, team_name: String) -> impl IntoView { +fn team_scoreboard( + value: Vec, + team_name: String, +) -> impl IntoView { let (ordering, set_ordering) = create_signal::(orderings::DAMAGE); let style = stylers::style! { diff --git a/frontend/src/main.rs b/frontend/src/main.rs index 578c2c6..a8cb1f4 100644 --- a/frontend/src/main.rs +++ b/frontend/src/main.rs @@ -20,7 +20,7 @@ fn main() { - +