diff --git a/backend/src/analysis.rs b/backend/src/analysis.rs index be8c451..6b59030 100644 --- a/backend/src/analysis.rs +++ b/backend/src/analysis.rs @@ -1,5 +1,44 @@ use std::path::PathBuf; +use diesel::prelude::*; +use diesel_async::RunQueryDsl; + +pub async fn poll_next_task(upload_folder: &std::path::Path, db_con: &mut diesel_async::pg::AsyncPgConnection) -> Result<AnalysisInput, ()> { + let query = crate::schema::analysis_queue::dsl::analysis_queue.order(crate::schema::analysis_queue::dsl::created_at.asc()).limit(1).select(crate::models::AnalysisTask::as_select()).for_update().skip_locked(); + + loop { + let result = db_con.build_transaction().run::<'_, _, diesel::result::Error, _>(|conn| Box::pin(async move { + let mut results: Vec<crate::models::AnalysisTask> = query.load(conn).await?; + let final_result = match results.pop() { + Some(r) => r, + None => return Ok(None), + }; + + let delete_query = diesel::dsl::delete(crate::schema::analysis_queue::dsl::analysis_queue).filter(crate::schema::analysis_queue::dsl::demo_id.eq(final_result.demo_id)).filter(crate::schema::analysis_queue::dsl::steam_id.eq(final_result.steam_id.clone())); + delete_query.execute(conn).await?; + + Ok(Some(final_result)) + })).await; + + match result { + Ok(Some(r)) => { + return Ok(AnalysisInput { + path: upload_folder.join(&r.steam_id).join(format!("{}.dem", r.demo_id)), + steamid: r.steam_id, + demoid: r.demo_id, + }); + } + Ok(None) => { + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + } + Err(e) => { + tracing::error!("Getting Task from Postgres: {:?}", e); + return Err(()); + } + }; + } +} + #[derive(Debug)] pub struct AnalysisInput { pub steamid: String, diff --git a/backend/src/api.rs b/backend/src/api.rs index d744cfb..b8d57c0 100644 --- a/backend/src/api.rs +++ b/backend/src/api.rs @@ -155,7 +155,6 @@ pub struct RouterConfig { } pub fn router( - base_analysis: tokio::sync::mpsc::UnboundedSender<crate::analysis::AnalysisInput>, config: RouterConfig, ) -> axum::Router { axum::Router::new()
config.steam_api_key, ), ) - .nest("/demos/", demos::router(config.upload_dir, base_analysis)) + .nest("/demos/", demos::router(config.upload_dir)) .nest("/user/", user::router()) } diff --git a/backend/src/api/demos.rs b/backend/src/api/demos.rs index ff8da34..a306432 100644 --- a/backend/src/api/demos.rs +++ b/backend/src/api/demos.rs @@ -6,12 +6,10 @@ use std::sync::Arc; struct DemoState { upload_folder: std::path::PathBuf, - base_analysis: tokio::sync::mpsc::UnboundedSender<crate::analysis::AnalysisInput>, } pub fn router<P>

( upload_folder: P, - base_analysis: tokio::sync::mpsc::UnboundedSender<crate::analysis::AnalysisInput>, ) -> axum::Router where P: Into<std::path::PathBuf>, @@ -27,7 +25,6 @@ where .route("/:id/reanalyse", axum::routing::get(analyise)) .with_state(Arc::new(DemoState { upload_folder: upload_folder.into(), - base_analysis, })) } @@ -104,14 +101,12 @@ async fn upload( }); query.execute(&mut db_con).await.unwrap(); - state - .base_analysis - .send(crate::analysis::AnalysisInput { - steamid: steam_id.to_string(), - demoid: demo_id, - path: demo_file_path, - }) - .unwrap(); + let queue_query = diesel::dsl::insert_into(crate::schema::analysis_queue::dsl::analysis_queue).values(crate::models::AddAnalysisTask { + demo_id, + steam_id: steam_id.to_string(), + }); + queue_query.execute(&mut db_con).await.unwrap(); + let processing_query = diesel::dsl::insert_into(crate::schema::processing_status::dsl::processing_status) .values(crate::models::ProcessingStatus { demo_id, info: 0 }); @@ -120,9 +115,8 @@ async fn upload( Ok(axum::response::Redirect::to("/")) } -#[tracing::instrument(skip(state, session))] +#[tracing::instrument(skip(session))] async fn analyise( - State(state): State<Arc<DemoState>>, session: crate::UserSession, Path(demo_id): Path<i64>, ) -> Result<(), (axum::http::StatusCode, &'static str)> { @@ -150,15 +144,11 @@ async fn analyise( )); } - let user_folder = std::path::Path::new(&state.upload_folder).join(format!("{}/", steam_id)); - state - .base_analysis - .send(crate::analysis::AnalysisInput { - path: user_folder.join(format!("{}.dem", demo_id)), - demoid: demo_id, - steamid: steam_id.to_string(), - }) - .unwrap(); + let queue_query = diesel::dsl::insert_into(crate::schema::analysis_queue::dsl::analysis_queue).values(crate::models::AddAnalysisTask { + demo_id, + steam_id: steam_id.to_string(), + }); + queue_query.execute(&mut db_con).await.unwrap(); Ok(()) } diff --git a/backend/src/lib.rs b/backend/src/lib.rs index bd597b2..1dc8d31 100644 --- a/backend/src/lib.rs +++ b/backend/src/lib.rs @@ -43,10 +43,9 @@ pub async fn 
get_demo_from_upload( pub mod api; pub mod steam_api; -#[tracing::instrument(skip(upload_folder, base_analysis_tx, steam_api_key))] +#[tracing::instrument(skip(upload_folder, steam_api_key))] pub async fn run_api( upload_folder: impl Into<std::path::PathBuf>, - base_analysis_tx: tokio::sync::mpsc::UnboundedSender<crate::analysis::AnalysisInput>, steam_api_key: impl Into<String>, ) { let upload_folder: std::path::PathBuf = upload_folder.into(); @@ -66,10 +65,10 @@ pub async fn run_api( .nest( "/api/", crate::api::router( - base_analysis_tx, crate::api::RouterConfig { steam_api_key: steam_api_key.into(), - steam_callback_base_url: "http://192.168.0.156:3000".into(), + // steam_callback_base_url: "http://192.168.0.156:3000".into(), + steam_callback_base_url: "http://localhost:3000".into(), steam_callback_path: "/api/steam/callback".into(), upload_dir: upload_folder.clone(), }, @@ -85,14 +84,26 @@ pub async fn run_api( axum::serve(listener, router).await.unwrap(); } -#[tracing::instrument(skip(base_analysis_rx))] +#[tracing::instrument(skip(upload_folder))] pub async fn run_analysis( - mut base_analysis_rx: tokio::sync::mpsc::UnboundedReceiver<crate::analysis::AnalysisInput>, + upload_folder: impl Into<std::path::PathBuf> ) { use diesel::prelude::*; use diesel_async::{AsyncConnection, RunQueryDsl}; - while let Some(input) = base_analysis_rx.recv().await { + let upload_folder: std::path::PathBuf = upload_folder.into(); + + + loop { + let mut db_con = db_connection().await; + let input = match crate::analysis::poll_next_task(&upload_folder, &mut db_con).await { + Ok(i) => i, + Err(e) => { + tracing::error!("Polling for next Task: {:?}", e); + break; + } + }; + let demo_id = input.demoid; let result = tokio::task::spawn_blocking(move || crate::analysis::analyse_base(input)) @@ -126,5 +137,8 @@ pub async fn run_analysis( }) .await .unwrap(); + + // TODO + // Remove task from queue } } diff --git a/backend/src/main.rs b/backend/src/main.rs index cabd939..43906af 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -38,9 +38,6 @@ async fn main() { run_migrations(&mut 
backend::db_connection().await).await; tracing::info!("Completed Migrations"); - let (base_analysis_tx, base_analysis_rx) = - tokio::sync::mpsc::unbounded_channel::<backend::analysis::AnalysisInput>(); - let mut component_set = tokio::task::JoinSet::new(); if args.api { @@ -54,12 +51,11 @@ async fn main() { component_set.spawn(backend::run_api( args.upload_folder.clone(), - base_analysis_tx, steam_api_key, )); } if args.analysis { - component_set.spawn(backend::run_analysis(base_analysis_rx)); + component_set.spawn(backend::run_analysis(args.upload_folder.clone())); } component_set.join_all().await; diff --git a/backend/src/models.rs b/backend/src/models.rs index 5a4e6f0..e8179fc 100644 --- a/backend/src/models.rs +++ b/backend/src/models.rs @@ -40,3 +40,19 @@ pub struct ProcessingStatus { pub demo_id: i64, pub info: i16, } + +#[derive(Insertable, Debug)] +#[diesel(table_name = crate::schema::analysis_queue)] +#[diesel(check_for_backend(diesel::pg::Pg))] +pub struct AddAnalysisTask { + pub demo_id: i64, + pub steam_id: String, +} + +#[derive(Queryable, Selectable, Debug)] +#[diesel(table_name = crate::schema::analysis_queue)] +#[diesel(check_for_backend(diesel::pg::Pg))] +pub struct AnalysisTask { + pub demo_id: i64, + pub steam_id: String, +} diff --git a/backend/src/schema.rs b/backend/src/schema.rs index 689d5e7..db7ae3a 100644 --- a/backend/src/schema.rs +++ b/backend/src/schema.rs @@ -1,5 +1,13 @@ // @generated automatically by Diesel CLI. +diesel::table! { + analysis_queue (demo_id) { + demo_id -> Int8, + steam_id -> Text, + created_at -> Timestamp, + } +} + diesel::table! { demo_info (demo_id) { demo_id -> Int8, @@ -36,7 +44,15 @@ diesel::table! 
{ } } +diesel::joinable!(analysis_queue -> demos (demo_id)); diesel::joinable!(demo_info -> demos (demo_id)); diesel::joinable!(processing_status -> demos (demo_id)); -diesel::allow_tables_to_appear_in_same_query!(demo_info, demos, processing_status, sessions, users,); +diesel::allow_tables_to_appear_in_same_query!( + analysis_queue, + demo_info, + demos, + processing_status, + sessions, + users, +); diff --git a/migrations/2024-09-16-221954_analysis-queue/down.sql b/migrations/2024-09-16-221954_analysis-queue/down.sql new file mode 100644 index 0000000..b6f2e6b --- /dev/null +++ b/migrations/2024-09-16-221954_analysis-queue/down.sql @@ -0,0 +1,2 @@ +-- This file should undo anything in `up.sql` +DROP TABLE ANALYSIS_QUEUE; diff --git a/migrations/2024-09-16-221954_analysis-queue/up.sql b/migrations/2024-09-16-221954_analysis-queue/up.sql new file mode 100644 index 0000000..4122343 --- /dev/null +++ b/migrations/2024-09-16-221954_analysis-queue/up.sql @@ -0,0 +1,6 @@ +-- Your SQL goes here +CREATE TABLE IF NOT EXISTS ANALYSIS_QUEUE ( + demo_id bigint PRIMARY KEY REFERENCES demos(demo_id), + steam_id Text NOT NULL, + created_at timestamp NOT NULL default current_timestamp +);