diff --git a/backend/src/lib.rs b/backend/src/lib.rs index ecf5f65..0b73f4b 100644 --- a/backend/src/lib.rs +++ b/backend/src/lib.rs @@ -68,3 +68,61 @@ pub mod steam_api { } } } + +#[tracing::instrument(skip(upload_folder, base_analysis_tx))] +pub async fn run_api(upload_folder: UP, base_analysis_tx: tokio::sync::mpsc::UnboundedSender) where UP: Into { + let upload_folder: std::path::PathBuf = upload_folder.into(); + + let session_store = crate::diesel_sessionstore::DieselStore::new(); + let session_layer = tower_sessions::SessionManagerLayer::new(session_store) + .with_secure(false) + .with_expiry(tower_sessions::Expiry::OnInactivity( + time::Duration::hours(48), + )); + + if !tokio::fs::try_exists(&upload_folder).await.unwrap_or(false) { + tokio::fs::create_dir_all(&upload_folder).await.unwrap(); + } + + let router = axum::Router::new() + .nest("/api/", crate::api::router(base_analysis_tx)) + .layer(session_layer) + .nest_service("/", tower_http::services::ServeDir::new("../frontend/dist/")); + + let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap(); + axum::serve(listener, router).await.unwrap(); +} + +#[tracing::instrument] +pub async fn run_analysis(mut base_analysis_rx: tokio::sync::mpsc::UnboundedReceiver) { + use diesel::prelude::*; + use diesel_async::{RunQueryDsl, AsyncConnection}; + + while let Some(input) = base_analysis_rx.recv().await { + let demo_id = input.demoid; + + let result = tokio::task::spawn_blocking(move || { + crate::analysis::analyse_base(input) + }).await.unwrap(); + + dbg!(&result); + + let mut db_con = crate::db_connection().await; + + let store_info_query = diesel::dsl::insert_into(crate::schema::demo_info::dsl::demo_info).values(crate::models::DemoInfo { + demo_id, + map: result.map, + }); + let update_process_info = diesel::dsl::update(crate::schema::processing_status::dsl::processing_status).set(crate::schema::processing_status::dsl::info.eq(1)).filter(crate::schema::processing_status::dsl::demo_id.eq(demo_id)); + + tracing::trace!(?store_info_query, "Store demo info query"); + tracing::trace!(?update_process_info, "Update processing info query"); + + db_con.transaction::<'_, '_, '_, _, diesel::result::Error, _>(|conn| Box::pin(async move { + store_info_query.execute(conn).await.map(|e| ())?; + update_process_info.execute(conn).await.map(|e| ())?; + Ok(()) + })).await; + + } +} diff --git a/backend/src/main.rs b/backend/src/main.rs index 711fa8b..b15e147 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -1,8 +1,5 @@ use tracing_subscriber::prelude::__tracing_subscriber_SubscriberExt; -use diesel::prelude::*; -use diesel_async::RunQueryDsl; - static UPLOAD_FOLDER: &str = "uploads/"; const MIGRATIONS: diesel_async_migrations::EmbeddedMigrations = diesel_async_migrations::embed_migrations!("../migrations/"); @@ -26,59 +23,12 @@ async fn main() { run_migrations(&mut backend::db_connection().await).await; tracing::info!("Completed Migrations"); - let (base_analysis_tx, mut base_analysis_rx) = tokio::sync::mpsc::unbounded_channel::(); - tokio::task::spawn_blocking(move || { - use diesel_async::AsyncConnection; + let (base_analysis_tx, base_analysis_rx) = tokio::sync::mpsc::unbounded_channel::(); - while let Some(input) = base_analysis_rx.blocking_recv() { - let demo_id = input.demoid; - let result = backend::analysis::analyse_base(input); + let mut component_set = tokio::task::JoinSet::new(); - dbg!(&result); + component_set.spawn(backend::run_api(UPLOAD_FOLDER, base_analysis_tx)); + component_set.spawn(backend::run_analysis(base_analysis_rx)); - let handle = tokio::task::spawn( - async move { - let mut db_con = backend::db_connection().await; - - let store_info_query = diesel::dsl::insert_into(backend::schema::demo_info::dsl::demo_info).values(backend::models::DemoInfo { - demo_id, - map: result.map, - }); - let update_process_info = diesel::dsl::update(backend::schema::processing_status::dsl::processing_status).set(backend::schema::processing_status::dsl::info.eq(1)).filter(backend::schema::processing_status::dsl::demo_id.eq(demo_id)); - - tracing::trace!(?store_info_query, "Store demo info query"); - tracing::trace!(?update_process_info, "Update processing info query"); - - db_con.transaction::<'_, '_, '_, _, diesel::result::Error, _>(|conn| Box::pin(async move { - store_info_query.execute(conn).await.map(|e| ())?; - update_process_info.execute(conn).await.map(|e| ())?; - Ok(()) - })).await; - } - ); - } - }); - - let session_store = backend::diesel_sessionstore::DieselStore::new(); - let session_layer = tower_sessions::SessionManagerLayer::new(session_store) - .with_secure(false) - .with_expiry(tower_sessions::Expiry::OnInactivity( - time::Duration::hours(48), - )); - - if !tokio::fs::try_exists(UPLOAD_FOLDER).await.unwrap_or(false) { - tokio::fs::create_dir_all(UPLOAD_FOLDER).await.unwrap(); - } - - let router = axum::Router::new() - .nest("/api/", backend::api::router(base_analysis_tx)) - .layer(session_layer) - .nest_service("/", tower_http::services::ServeDir::new("../frontend/dist/")); - - let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap(); - axum::serve(listener, router).await.unwrap(); -} - -async fn demo_info(session: backend::UserSession) -> Result<(), axum::http::StatusCode> { - Ok(()) + component_set.join_all().await; }