Move analysis into own crate and some more improvements all around

This commit is contained in:
Lol3rrr
2024-09-23 17:12:52 +02:00
parent 118a25aa9a
commit 7b50da640c
19 changed files with 676 additions and 206 deletions

View File

@@ -3,27 +3,51 @@ use std::path::PathBuf;
use diesel::prelude::*;
use diesel_async::RunQueryDsl;
pub async fn poll_next_task(upload_folder: &std::path::Path, db_con: &mut diesel_async::pg::AsyncPgConnection) -> Result<AnalysisInput, ()> {
let query = crate::schema::analysis_queue::dsl::analysis_queue.order(crate::schema::analysis_queue::dsl::created_at.asc()).limit(1).select(crate::models::AnalysisTask::as_select()).for_update().skip_locked();
pub async fn poll_next_task(
upload_folder: &std::path::Path,
db_con: &mut diesel_async::pg::AsyncPgConnection,
) -> Result<AnalysisInput, ()> {
let query = crate::schema::analysis_queue::dsl::analysis_queue
.order(crate::schema::analysis_queue::dsl::created_at.asc())
.limit(1)
.select(crate::models::AnalysisTask::as_select())
.for_update()
.skip_locked();
loop {
let result = db_con.build_transaction().run::<'_, _, diesel::result::Error, _>(|conn| Box::pin(async move {
let mut results: Vec<crate::models::AnalysisTask> = query.load(conn).await?;
let final_result = match results.pop() {
Some(r) => r,
None => return Ok(None),
};
let result = db_con
.build_transaction()
.run::<'_, _, diesel::result::Error, _>(|conn| {
Box::pin(async move {
let mut results: Vec<crate::models::AnalysisTask> = query.load(conn).await?;
let final_result = match results.pop() {
Some(r) => r,
None => return Ok(None),
};
let delete_query = diesel::dsl::delete(crate::schema::analysis_queue::dsl::analysis_queue).filter(crate::schema::analysis_queue::dsl::demo_id.eq(final_result.demo_id)).filter(crate::schema::analysis_queue::dsl::steam_id.eq(final_result.steam_id.clone()));
delete_query.execute(conn).await?;
let delete_query =
diesel::dsl::delete(crate::schema::analysis_queue::dsl::analysis_queue)
.filter(
crate::schema::analysis_queue::dsl::demo_id
.eq(final_result.demo_id),
)
.filter(
crate::schema::analysis_queue::dsl::steam_id
.eq(final_result.steam_id.clone()),
);
delete_query.execute(conn).await?;
Ok(Some(final_result))
})).await;
Ok(Some(final_result))
})
})
.await;
match result {
Ok(Some(r)) => {
return Ok(AnalysisInput {
path: upload_folder.join(&r.steam_id).join(format!("{}.dem", r.demo_id)),
path: upload_folder
.join(&r.steam_id)
.join(format!("{}.dem", r.demo_id)),
steamid: r.steam_id,
demoid: r.demo_id,
});
@@ -65,84 +89,34 @@ pub struct BasePlayerInfo {
pub struct BasePlayerStats {
pub kills: usize,
pub deaths: usize,
pub damage: usize,
pub assists: usize,
}
#[tracing::instrument(skip(input))]
pub fn analyse_base(input: AnalysisInput) -> BaseInfo {
tracing::info!("Performing Base analysis");
tracing::info!("Performing Base analysis");
let file = std::fs::File::open(&input.path).unwrap();
let mmap = unsafe { memmap2::MmapOptions::new().map(&file).unwrap() };
let tmp = csdemo::Container::parse(&mmap).unwrap();
let output = csdemo::parser::parse(csdemo::FrameIterator::parse(tmp.inner)).unwrap();
let header = &output.header;
tracing::info!("Header: {:?}", header);
let mut player_stats = std::collections::HashMap::with_capacity(output.player_info.len());
for event in output.events.iter() {
match event {
csdemo::DemoEvent::Tick(tick) => {}
csdemo::DemoEvent::ServerInfo(info) => {}
csdemo::DemoEvent::RankUpdate(update) => {}
csdemo::DemoEvent::RankReveal(reveal) => {}
csdemo::DemoEvent::GameEvent(gevent) => {
match gevent {
csdemo::game_event::GameEvent::BeginNewMatch(_) => {
player_stats.clear();
}
csdemo::game_event::GameEvent::PlayerTeam(pteam) => {
// tracing::info!("{:?}", pteam);
}
csdemo::game_event::GameEvent::RoundOfficiallyEnded(r_end) => {
// tracing::info!("{:?}", r_end);
}
csdemo::game_event::GameEvent::PlayerDeath(pdeath) => {
// tracing::info!("{:?}", pdeath);
let player_died_id = pdeath.userid.unwrap();
let player_died = player_stats.entry(player_died_id).or_insert(BasePlayerStats {
kills: 0,
deaths: 0,
});
player_died.deaths += 1;
if let Some(attacker_id) = pdeath.attacker {
let attacker = player_stats.entry(attacker_id).or_insert(BasePlayerStats {
kills: 0,
deaths: 0,
});
attacker.kills += 1;
// tracing::trace!("{:?} killed {:?}", attacker_id, player_died_id);
}
}
other => {}
};
}
};
}
let players: Vec<_> = player_stats.into_iter().filter_map(|(id, stats)| {
let player = output.player_info.get(&id)?;
Some((BasePlayerInfo {
name: player.name.clone(),
steam_id: player.xuid.to_string(),
team: player.team,
color: player.color,
ingame_id: id.0,
}, stats))
}).collect();
let map = header.map_name().to_owned();
let result = analysis::endofgame::parse(&mmap).unwrap();
BaseInfo {
map,
players,
map: result.map,
players: result.players.into_iter().map(|(info, stats)| {
(BasePlayerInfo {
name: info.name,
steam_id: info.steam_id,
team: info.team,
ingame_id: info.ingame_id,
color: info.color,
}, BasePlayerStats {
kills: stats.kills,
assists: stats.assists,
damage: stats.damage,
deaths: stats.deaths,
})
}).collect()
}
}

View File

@@ -154,9 +154,7 @@ pub struct RouterConfig {
pub upload_dir: std::path::PathBuf,
}
pub fn router(
config: RouterConfig,
) -> axum::Router {
pub fn router(config: RouterConfig) -> axum::Router {
axum::Router::new()
.nest(
"/steam/",

View File

@@ -8,9 +8,7 @@ struct DemoState {
upload_folder: std::path::PathBuf,
}
pub fn router<P>(
upload_folder: P,
) -> axum::Router
pub fn router<P>(upload_folder: P) -> axum::Router
where
P: Into<std::path::PathBuf>,
{
@@ -102,10 +100,11 @@ async fn upload(
});
query.execute(&mut db_con).await.unwrap();
let queue_query = diesel::dsl::insert_into(crate::schema::analysis_queue::dsl::analysis_queue).values(crate::models::AddAnalysisTask {
demo_id,
steam_id: steam_id.to_string(),
});
let queue_query = diesel::dsl::insert_into(crate::schema::analysis_queue::dsl::analysis_queue)
.values(crate::models::AddAnalysisTask {
demo_id,
steam_id: steam_id.to_string(),
});
queue_query.execute(&mut db_con).await.unwrap();
let processing_query =
@@ -145,10 +144,11 @@ async fn analyise(
));
}
let queue_query = diesel::dsl::insert_into(crate::schema::analysis_queue::dsl::analysis_queue).values(crate::models::AddAnalysisTask {
demo_id,
steam_id: steam_id.to_string(),
});
let queue_query = diesel::dsl::insert_into(crate::schema::analysis_queue::dsl::analysis_queue)
.values(crate::models::AddAnalysisTask {
demo_id,
steam_id: steam_id.to_string(),
});
queue_query.execute(&mut db_con).await.unwrap();
Ok(())
@@ -181,20 +181,33 @@ async fn info(
}
#[tracing::instrument(skip(session))]
async fn scoreboard(session: UserSession, Path(demo_id): Path<i64>) -> Result<axum::response::Json<common::demo_analysis::ScoreBoard>, axum::http::StatusCode> {
async fn scoreboard(
session: UserSession,
Path(demo_id): Path<i64>,
) -> Result<axum::response::Json<common::demo_analysis::ScoreBoard>, axum::http::StatusCode> {
let query = crate::schema::demo_players::dsl::demo_players
.inner_join(crate::schema::demo_player_stats::dsl::demo_player_stats.on(crate::schema::demo_players::dsl::demo_id.eq(crate::schema::demo_player_stats::dsl::demo_id).and(crate::schema::demo_players::dsl::steam_id.eq(crate::schema::demo_player_stats::dsl::steam_id))))
.inner_join(
crate::schema::demo_player_stats::dsl::demo_player_stats.on(
crate::schema::demo_players::dsl::demo_id
.eq(crate::schema::demo_player_stats::dsl::demo_id)
.and(
crate::schema::demo_players::dsl::steam_id
.eq(crate::schema::demo_player_stats::dsl::steam_id),
),
),
)
.filter(crate::schema::demo_players::dsl::demo_id.eq(demo_id));
let mut db_con = crate::db_connection().await;
let response: Vec<(crate::models::DemoPlayer, crate::models::DemoPlayerStats)> = match query.load(&mut db_con).await {
Ok(d) => d,
Err(e) => {
tracing::error!("Querying DB: {:?}", e);
return Err(axum::http::StatusCode::INTERNAL_SERVER_ERROR);
}
};
let response: Vec<(crate::models::DemoPlayer, crate::models::DemoPlayerStats)> =
match query.load(&mut db_con).await {
Ok(d) => d,
Err(e) => {
tracing::error!("Querying DB: {:?}", e);
return Err(axum::http::StatusCode::INTERNAL_SERVER_ERROR);
}
};
if response.is_empty() {
tracing::error!("DB Response was empty");
@@ -206,12 +219,18 @@ async fn scoreboard(session: UserSession, Path(demo_id): Path<i64>) -> Result<ax
let mut team1 = Vec::new();
let mut team2 = Vec::new();
for (player, stats) in response {
let team_vec = if player.team == team1_number { &mut team1 } else { &mut team2 };
let team_vec = if player.team == team1_number {
&mut team1
} else {
&mut team2
};
team_vec.push(common::demo_analysis::ScoreBoardPlayer {
name: player.name,
kills: stats.kills as usize,
deaths: stats.deaths as usize,
damage: stats.damage as usize,
assists: stats.assists as usize,
});
}

View File

@@ -64,15 +64,13 @@ pub async fn run_api(
let router = axum::Router::new()
.nest(
"/api/",
crate::api::router(
crate::api::RouterConfig {
steam_api_key: steam_api_key.into(),
// steam_callback_base_url: "http://192.168.0.156:3000".into(),
steam_callback_base_url: "http://localhost:3000".into(),
steam_callback_path: "/api/steam/callback".into(),
upload_dir: upload_folder.clone(),
},
),
crate::api::router(crate::api::RouterConfig {
steam_api_key: steam_api_key.into(),
steam_callback_base_url: "http://192.168.0.156:3000".into(),
// steam_callback_base_url: "http://localhost:3000".into(),
steam_callback_path: "/api/steam/callback".into(),
upload_dir: upload_folder.clone(),
}),
)
.layer(session_layer)
.nest_service(
@@ -85,15 +83,12 @@ pub async fn run_api(
}
#[tracing::instrument(skip(upload_folder))]
pub async fn run_analysis(
upload_folder: impl Into<std::path::PathBuf>
) {
pub async fn run_analysis(upload_folder: impl Into<std::path::PathBuf>) {
use diesel::prelude::*;
use diesel_async::{AsyncConnection, RunQueryDsl};
let upload_folder: std::path::PathBuf = upload_folder.into();
loop {
let mut db_con = db_connection().await;
let input = match crate::analysis::poll_next_task(&upload_folder, &mut db_con).await {
@@ -114,38 +109,70 @@ pub async fn run_analysis(
let mut db_con = crate::db_connection().await;
let (player_info, player_stats): (Vec<_>, Vec<_>) = result.players.into_iter().map(|(info, stats)| {
(crate::models::DemoPlayer {
demo_id,
name: info.name,
steam_id: info.steam_id.clone(),
team: info.team as i16,
color: info.color as i16,
}, crate::models::DemoPlayerStats {
demo_id,
steam_id: info.steam_id,
deaths: stats.deaths as i16,
kills: stats.kills as i16,
})
}).unzip();
let (player_info, player_stats): (Vec<_>, Vec<_>) = result
.players
.into_iter()
.map(|(info, stats)| {
(
crate::models::DemoPlayer {
demo_id,
name: info.name,
steam_id: info.steam_id.clone(),
team: info.team as i16,
color: info.color as i16,
},
crate::models::DemoPlayerStats {
demo_id,
steam_id: info.steam_id,
deaths: stats.deaths as i16,
kills: stats.kills as i16,
damage: stats.damage as i16,
assists: stats.assists as i16,
},
)
})
.unzip();
let demo_info = crate::models::DemoInfo {
demo_id,
map: result.map,
};
demo_id,
map: result.map,
};
let store_demo_info_query = diesel::dsl::insert_into(crate::schema::demo_info::dsl::demo_info)
.values(&demo_info).on_conflict(crate::schema::demo_info::dsl::demo_id).do_update().set(crate::schema::demo_info::dsl::map.eq(diesel::upsert::excluded(crate::schema::demo_info::dsl::map)));
let store_demo_players_query = diesel::dsl::insert_into(crate::schema::demo_players::dsl::demo_players).values(player_info)
.on_conflict_do_nothing();
let store_demo_player_stats_query = diesel::dsl::insert_into(crate::schema::demo_player_stats::dsl::demo_player_stats)
.values(player_stats)
.on_conflict((crate::schema::demo_player_stats::dsl::demo_id, crate::schema::demo_player_stats::dsl::steam_id))
.do_update()
.set((
crate::schema::demo_player_stats::dsl::deaths.eq(diesel::upsert::excluded(crate::schema::demo_player_stats::dsl::deaths)),
crate::schema::demo_player_stats::dsl::kills.eq(diesel::upsert::excluded(crate::schema::demo_player_stats::dsl::kills)),
));
let store_demo_info_query =
diesel::dsl::insert_into(crate::schema::demo_info::dsl::demo_info)
.values(&demo_info)
.on_conflict(crate::schema::demo_info::dsl::demo_id)
.do_update()
.set(
crate::schema::demo_info::dsl::map
.eq(diesel::upsert::excluded(crate::schema::demo_info::dsl::map)),
);
let store_demo_players_query =
diesel::dsl::insert_into(crate::schema::demo_players::dsl::demo_players)
.values(player_info)
.on_conflict_do_nothing();
let store_demo_player_stats_query =
diesel::dsl::insert_into(crate::schema::demo_player_stats::dsl::demo_player_stats)
.values(player_stats)
.on_conflict((
crate::schema::demo_player_stats::dsl::demo_id,
crate::schema::demo_player_stats::dsl::steam_id,
))
.do_update()
.set((
crate::schema::demo_player_stats::dsl::deaths.eq(diesel::upsert::excluded(
crate::schema::demo_player_stats::dsl::deaths,
)),
crate::schema::demo_player_stats::dsl::kills.eq(diesel::upsert::excluded(
crate::schema::demo_player_stats::dsl::kills,
)),
crate::schema::demo_player_stats::dsl::assists.eq(diesel::upsert::excluded(
crate::schema::demo_player_stats::dsl::assists,
)),
crate::schema::demo_player_stats::dsl::damage.eq(diesel::upsert::excluded(
crate::schema::demo_player_stats::dsl::damage,
)),
));
let update_process_info =
diesel::dsl::update(crate::schema::processing_status::dsl::processing_status)
.set(crate::schema::processing_status::dsl::info.eq(1))

View File

@@ -49,10 +49,7 @@ async fn main() {
}
};
component_set.spawn(backend::run_api(
args.upload_folder.clone(),
steam_api_key,
));
component_set.spawn(backend::run_api(args.upload_folder.clone(), steam_api_key));
}
if args.analysis {
component_set.spawn(backend::run_analysis(args.upload_folder.clone()));

View File

@@ -52,6 +52,8 @@ pub struct DemoPlayerStats {
pub steam_id: String,
pub kills: i16,
pub deaths: i16,
pub damage: i16,
pub assists: i16,
}
#[derive(Queryable, Selectable, Insertable, Debug)]

View File

@@ -21,6 +21,8 @@ diesel::table! {
steam_id -> Text,
kills -> Int2,
deaths -> Int2,
damage -> Int2,
assists -> Int2,
}
}