Switch to postgres for the analysis task queue and some other minor improvements

This commit is contained in:
Lol3rrr
2024-09-17 16:35:55 +02:00
parent fd4d3f735d
commit 8290fcf390
9 changed files with 115 additions and 37 deletions

View File

@@ -1,5 +1,44 @@
use std::path::PathBuf;
use diesel::prelude::*;
use diesel_async::RunQueryDsl;
pub async fn poll_next_task(upload_folder: &std::path::Path, db_con: &mut diesel_async::pg::AsyncPgConnection) -> Result<AnalysisInput, ()> {
let query = crate::schema::analysis_queue::dsl::analysis_queue.order(crate::schema::analysis_queue::dsl::created_at.asc()).limit(1).select(crate::models::AnalysisTask::as_select()).for_update().skip_locked();
loop {
let result = db_con.build_transaction().run::<'_, _, diesel::result::Error, _>(|conn| Box::pin(async move {
let mut results: Vec<crate::models::AnalysisTask> = query.load(conn).await?;
let final_result = match results.pop() {
Some(r) => r,
None => return Ok(None),
};
let delete_query = diesel::dsl::delete(crate::schema::analysis_queue::dsl::analysis_queue).filter(crate::schema::analysis_queue::dsl::demo_id.eq(final_result.demo_id)).filter(crate::schema::analysis_queue::dsl::steam_id.eq(final_result.steam_id.clone()));
delete_query.execute(conn).await?;
Ok(Some(final_result))
})).await;
match result {
Ok(Some(r)) => {
return Ok(AnalysisInput {
path: upload_folder.join(&r.steam_id).join(format!("{}.dem", r.demo_id)),
steamid: r.steam_id,
demoid: r.demo_id,
});
}
Ok(None) => {
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
Err(e) => {
tracing::error!("Getting Task from Postgres: {:?}", e);
return Err(());
}
};
}
}
#[derive(Debug)]
pub struct AnalysisInput {
pub steamid: String,

View File

@@ -155,7 +155,6 @@ pub struct RouterConfig {
}
pub fn router(
base_analysis: tokio::sync::mpsc::UnboundedSender<crate::analysis::AnalysisInput>,
config: RouterConfig,
) -> axum::Router {
axum::Router::new()
@@ -167,6 +166,6 @@ pub fn router(
config.steam_api_key,
),
)
.nest("/demos/", demos::router(config.upload_dir, base_analysis))
.nest("/demos/", demos::router(config.upload_dir))
.nest("/user/", user::router())
}

View File

@@ -6,12 +6,10 @@ use std::sync::Arc;
struct DemoState {
upload_folder: std::path::PathBuf,
base_analysis: tokio::sync::mpsc::UnboundedSender<crate::analysis::AnalysisInput>,
}
pub fn router<P>(
upload_folder: P,
base_analysis: tokio::sync::mpsc::UnboundedSender<crate::analysis::AnalysisInput>,
) -> axum::Router
where
P: Into<std::path::PathBuf>,
@@ -27,7 +25,6 @@ where
.route("/:id/reanalyse", axum::routing::get(analyise))
.with_state(Arc::new(DemoState {
upload_folder: upload_folder.into(),
base_analysis,
}))
}
@@ -104,14 +101,12 @@ async fn upload(
});
query.execute(&mut db_con).await.unwrap();
state
.base_analysis
.send(crate::analysis::AnalysisInput {
steamid: steam_id.to_string(),
demoid: demo_id,
path: demo_file_path,
})
.unwrap();
let queue_query = diesel::dsl::insert_into(crate::schema::analysis_queue::dsl::analysis_queue).values(crate::models::AddAnalysisTask {
demo_id,
steam_id: steam_id.to_string(),
});
queue_query.execute(&mut db_con).await.unwrap();
let processing_query =
diesel::dsl::insert_into(crate::schema::processing_status::dsl::processing_status)
.values(crate::models::ProcessingStatus { demo_id, info: 0 });
@@ -120,9 +115,8 @@ async fn upload(
Ok(axum::response::Redirect::to("/"))
}
#[tracing::instrument(skip(state, session))]
#[tracing::instrument(skip(session))]
async fn analyise(
State(state): State<Arc<DemoState>>,
session: crate::UserSession,
Path(demo_id): Path<i64>,
) -> Result<(), (axum::http::StatusCode, &'static str)> {
@@ -150,15 +144,11 @@ async fn analyise(
));
}
let user_folder = std::path::Path::new(&state.upload_folder).join(format!("{}/", steam_id));
state
.base_analysis
.send(crate::analysis::AnalysisInput {
path: user_folder.join(format!("{}.dem", demo_id)),
demoid: demo_id,
steamid: steam_id.to_string(),
})
.unwrap();
let queue_query = diesel::dsl::insert_into(crate::schema::analysis_queue::dsl::analysis_queue).values(crate::models::AddAnalysisTask {
demo_id,
steam_id: steam_id.to_string(),
});
queue_query.execute(&mut db_con).await.unwrap();
Ok(())
}

View File

@@ -43,10 +43,9 @@ pub async fn get_demo_from_upload(
pub mod api;
pub mod steam_api;
#[tracing::instrument(skip(upload_folder, base_analysis_tx, steam_api_key))]
#[tracing::instrument(skip(upload_folder, steam_api_key))]
pub async fn run_api(
upload_folder: impl Into<std::path::PathBuf>,
base_analysis_tx: tokio::sync::mpsc::UnboundedSender<analysis::AnalysisInput>,
steam_api_key: impl Into<String>,
) {
let upload_folder: std::path::PathBuf = upload_folder.into();
@@ -66,10 +65,10 @@ pub async fn run_api(
.nest(
"/api/",
crate::api::router(
base_analysis_tx,
crate::api::RouterConfig {
steam_api_key: steam_api_key.into(),
steam_callback_base_url: "http://192.168.0.156:3000".into(),
// steam_callback_base_url: "http://192.168.0.156:3000".into(),
steam_callback_base_url: "http://localhost:3000".into(),
steam_callback_path: "/api/steam/callback".into(),
upload_dir: upload_folder.clone(),
},
@@ -85,14 +84,26 @@ pub async fn run_api(
axum::serve(listener, router).await.unwrap();
}
#[tracing::instrument(skip(base_analysis_rx))]
#[tracing::instrument(skip(upload_folder))]
pub async fn run_analysis(
mut base_analysis_rx: tokio::sync::mpsc::UnboundedReceiver<analysis::AnalysisInput>,
upload_folder: impl Into<std::path::PathBuf>
) {
use diesel::prelude::*;
use diesel_async::{AsyncConnection, RunQueryDsl};
while let Some(input) = base_analysis_rx.recv().await {
let upload_folder: std::path::PathBuf = upload_folder.into();
loop {
let mut db_con = db_connection().await;
let input = match crate::analysis::poll_next_task(&upload_folder, &mut db_con).await {
Ok(i) => i,
Err(e) => {
tracing::error!("Polling for next Task: {:?}", e);
break;
}
};
let demo_id = input.demoid;
let result = tokio::task::spawn_blocking(move || crate::analysis::analyse_base(input))
@@ -126,5 +137,8 @@ pub async fn run_analysis(
})
.await
.unwrap();
// TODO
// Remove task from queue
}
}

View File

@@ -38,9 +38,6 @@ async fn main() {
run_migrations(&mut backend::db_connection().await).await;
tracing::info!("Completed Migrations");
let (base_analysis_tx, base_analysis_rx) =
tokio::sync::mpsc::unbounded_channel::<backend::analysis::AnalysisInput>();
let mut component_set = tokio::task::JoinSet::new();
if args.api {
@@ -54,12 +51,11 @@ async fn main() {
component_set.spawn(backend::run_api(
args.upload_folder.clone(),
base_analysis_tx,
steam_api_key,
));
}
if args.analysis {
component_set.spawn(backend::run_analysis(base_analysis_rx));
component_set.spawn(backend::run_analysis(args.upload_folder.clone()));
}
component_set.join_all().await;

View File

@@ -40,3 +40,19 @@ pub struct ProcessingStatus {
pub demo_id: i64,
pub info: i16,
}
#[derive(Insertable, Debug)]
#[diesel(table_name = crate::schema::analysis_queue)]
#[diesel(check_for_backend(diesel::pg::Pg))]
pub struct AddAnalysisTask {
pub demo_id: i64,
pub steam_id: String,
}
#[derive(Queryable, Selectable, Debug)]
#[diesel(table_name = crate::schema::analysis_queue)]
#[diesel(check_for_backend(diesel::pg::Pg))]
pub struct AnalysisTask {
pub demo_id: i64,
pub steam_id: String,
}

View File

@@ -1,5 +1,13 @@
// @generated automatically by Diesel CLI.
diesel::table! {
analysis_queue (demo_id) {
demo_id -> Int8,
steam_id -> Text,
created_at -> Timestamp,
}
}
diesel::table! {
demo_info (demo_id) {
demo_id -> Int8,
@@ -36,7 +44,15 @@ diesel::table! {
}
}
diesel::joinable!(analysis_queue -> demos (demo_id));
diesel::joinable!(demo_info -> demos (demo_id));
diesel::joinable!(processing_status -> demos (demo_id));
diesel::allow_tables_to_appear_in_same_query!(demo_info, demos, processing_status, sessions, users,);
diesel::allow_tables_to_appear_in_same_query!(
analysis_queue,
demo_info,
demos,
processing_status,
sessions,
users,
);