Fixed some warnings and improved the code quality some more

This commit is contained in:
Lol3rrr
2024-09-17 00:17:38 +02:00
parent 1e6c3da58b
commit fd4d3f735d
7 changed files with 227 additions and 71 deletions

View File

@@ -16,30 +16,36 @@ pub mod steam {
pub steamid: String,
pub personaname: String,
#[serde(flatten)]
other: HashMap<String, serde_json::Value>,
_other: HashMap<String, serde_json::Value>,
}
pub fn router(url: &str, callback_path: &str) -> axum::Router {
struct SteamApiState {
openid: steam_openid::SteamOpenId,
api_key: String,
}
pub fn router(url: &str, callback_path: &str, api_key: impl Into<String>) -> axum::Router {
axum::Router::new()
.route("/login", axum::routing::get(steam_login))
.route("/callback", axum::routing::get(steam_callback))
.with_state(Arc::new(
steam_openid::SteamOpenId::new(url, callback_path).unwrap(),
))
.with_state(Arc::new(SteamApiState {
openid: steam_openid::SteamOpenId::new(url, callback_path).unwrap(),
api_key: api_key.into(),
}))
}
#[tracing::instrument(skip(openid))]
#[tracing::instrument(skip(state))]
async fn steam_login(
State(openid): State<Arc<steam_openid::SteamOpenId>>,
State(state): State<Arc<SteamApiState>>,
) -> Result<axum::response::Redirect, axum::http::StatusCode> {
let url = openid.get_redirect_url();
let url = state.openid.get_redirect_url();
Ok(axum::response::Redirect::to(url))
}
#[tracing::instrument(skip(openid, session, request))]
#[tracing::instrument(skip(state, session, request))]
async fn steam_callback(
State(openid): State<Arc<steam_openid::SteamOpenId>>,
State(state): State<Arc<SteamApiState>>,
mut session: crate::UserSession,
request: axum::extract::Request,
) -> Result<axum::response::Redirect, axum::http::StatusCode> {
@@ -50,12 +56,12 @@ pub mod steam {
axum::http::StatusCode::BAD_REQUEST
})?;
let id = openid.verify(query).await.map_err(|e| {
let id = state.openid.verify(query).await.map_err(|e| {
tracing::error!("Verifying OpenID: {:?}", e);
axum::http::StatusCode::BAD_REQUEST
})?;
let steam_client = crate::steam_api::Client::new(std::env::var("STEAM_API_KEY").unwrap());
let steam_client = crate::steam_api::Client::new(&state.api_key);
let profile_response_data: ProfileInfoResponse = match steam_client
.get(
"http://api.steampowered.com/ISteamUser/GetPlayerSummaries/v2/",
@@ -79,7 +85,7 @@ pub mod steam {
})
.on_conflict(crate::schema::users::dsl::steamid)
.do_update()
.set((crate::schema::users::dsl::name.eq(player.personaname)));
.set(crate::schema::users::dsl::name.eq(player.personaname));
tracing::debug!("Running Query: {:?}", query);
if let Err(e) = query.execute(&mut db_con).await {
@@ -141,14 +147,26 @@ pub mod user {
}
}
pub struct RouterConfig {
pub steam_api_key: String,
pub steam_callback_base_url: String,
pub steam_callback_path: String,
pub upload_dir: std::path::PathBuf,
}
pub fn router(
base_analysis: tokio::sync::mpsc::UnboundedSender<crate::analysis::AnalysisInput>,
config: RouterConfig,
) -> axum::Router {
axum::Router::new()
.nest(
"/steam/",
steam::router("http://192.168.0.156:3000", "/api/steam/callback"),
steam::router(
&config.steam_callback_base_url,
&config.steam_callback_path,
config.steam_api_key,
),
)
.nest("/demos/", demos::router("uploads/", base_analysis))
.nest("/demos/", demos::router(config.upload_dir, base_analysis))
.nest("/user/", user::router())
}

View File

@@ -95,6 +95,8 @@ async fn upload(
let mut db_con = crate::db_connection().await;
// Turn all of this into a single transaction
let query =
diesel::dsl::insert_into(crate::schema::demos::dsl::demos).values(crate::models::Demo {
demo_id,
@@ -102,11 +104,14 @@ async fn upload(
});
query.execute(&mut db_con).await.unwrap();
state.base_analysis.send(crate::analysis::AnalysisInput {
steamid: steam_id.to_string(),
demoid: demo_id,
path: demo_file_path,
});
state
.base_analysis
.send(crate::analysis::AnalysisInput {
steamid: steam_id.to_string(),
demoid: demo_id,
path: demo_file_path,
})
.unwrap();
let processing_query =
diesel::dsl::insert_into(crate::schema::processing_status::dsl::processing_status)
.values(crate::models::ProcessingStatus { demo_id, info: 0 });
@@ -146,18 +151,21 @@ async fn analyise(
}
let user_folder = std::path::Path::new(&state.upload_folder).join(format!("{}/", steam_id));
state.base_analysis.send(crate::analysis::AnalysisInput {
path: user_folder.join(format!("{}.dem", demo_id)),
demoid: demo_id,
steamid: steam_id.to_string(),
});
state
.base_analysis
.send(crate::analysis::AnalysisInput {
path: user_folder.join(format!("{}.dem", demo_id)),
demoid: demo_id,
steamid: steam_id.to_string(),
})
.unwrap();
Ok(())
}
#[tracing::instrument(skip(session))]
#[tracing::instrument(skip(_session))]
async fn info(
session: UserSession,
_session: UserSession,
Path(demo_id): Path<i64>,
) -> Result<axum::response::Json<common::DemoInfo>, axum::http::StatusCode> {
tracing::info!("Get info for Demo: {:?}", demo_id);

View File

@@ -19,25 +19,6 @@ impl DieselStore {
Self {}
}
fn id_to_bytes(&self, val: i128) -> Vec<i64> {
let id_bytes = val.to_be_bytes();
vec![
i64::from_be_bytes((id_bytes[0..8]).try_into().unwrap()),
i64::from_be_bytes((id_bytes[8..16]).try_into().unwrap()),
]
}
fn bytes_to_id(&self, val: Vec<i64>) -> i128 {
assert_eq!(2, val.len());
let fb = val[0].to_be_bytes();
let sb = val[1].to_be_bytes();
i128::from_be_bytes([
fb[0], fb[1], fb[2], fb[3], fb[4], fb[5], fb[6], fb[7], sb[0], sb[1], sb[2], sb[3],
sb[4], sb[5], sb[6], sb[7],
])
}
fn expiry_to_string(&self, expiry_date: &time::OffsetDateTime) -> String {
expiry_date.format(&EXPIRY_FORMAT).unwrap()
}
@@ -54,7 +35,6 @@ impl tower_sessions::SessionStore for DieselStore {
) -> tower_sessions::session_store::Result<()> {
let db_id = session_record.id.0.to_string();
let data = serde_json::to_value(&session_record.data).unwrap();
let expiry_date = self.expiry_to_string(&session_record.expiry_date);
let steamid = session_record

View File

@@ -43,13 +43,12 @@ pub async fn get_demo_from_upload(
pub mod api;
pub mod steam_api;
#[tracing::instrument(skip(upload_folder, base_analysis_tx))]
pub async fn run_api<UP>(
upload_folder: UP,
#[tracing::instrument(skip(upload_folder, base_analysis_tx, steam_api_key))]
pub async fn run_api(
upload_folder: impl Into<std::path::PathBuf>,
base_analysis_tx: tokio::sync::mpsc::UnboundedSender<analysis::AnalysisInput>,
) where
UP: Into<std::path::PathBuf>,
{
steam_api_key: impl Into<String>,
) {
let upload_folder: std::path::PathBuf = upload_folder.into();
let session_store = crate::diesel_sessionstore::DieselStore::new();
@@ -64,7 +63,18 @@ pub async fn run_api<UP>(
}
let router = axum::Router::new()
.nest("/api/", crate::api::router(base_analysis_tx))
.nest(
"/api/",
crate::api::router(
base_analysis_tx,
crate::api::RouterConfig {
steam_api_key: steam_api_key.into(),
steam_callback_base_url: "http://192.168.0.156:3000".into(),
steam_callback_path: "/api/steam/callback".into(),
upload_dir: upload_folder.clone(),
},
),
)
.layer(session_layer)
.nest_service(
"/",
@@ -75,7 +85,7 @@ pub async fn run_api<UP>(
axum::serve(listener, router).await.unwrap();
}
#[tracing::instrument]
#[tracing::instrument(skip(base_analysis_rx))]
pub async fn run_analysis(
mut base_analysis_rx: tokio::sync::mpsc::UnboundedReceiver<analysis::AnalysisInput>,
) {
@@ -89,7 +99,7 @@ pub async fn run_analysis(
.await
.unwrap();
dbg!(&result);
tracing::debug!("Analysis-Result: {:?}", result);
let mut db_con = crate::db_connection().await;
@@ -109,11 +119,12 @@ pub async fn run_analysis(
db_con
.transaction::<'_, '_, '_, _, diesel::result::Error, _>(|conn| {
Box::pin(async move {
store_info_query.execute(conn).await.map(|e| ())?;
update_process_info.execute(conn).await.map(|e| ())?;
store_info_query.execute(conn).await?;
update_process_info.execute(conn).await?;
Ok(())
})
})
.await;
.await
.unwrap();
}
}

View File

@@ -1,7 +1,5 @@
use tracing_subscriber::prelude::__tracing_subscriber_SubscriberExt;
static UPLOAD_FOLDER: &str = "uploads/";
const MIGRATIONS: diesel_async_migrations::EmbeddedMigrations =
diesel_async_migrations::embed_migrations!("../migrations/");
@@ -9,8 +7,22 @@ async fn run_migrations(connection: &mut diesel_async::AsyncPgConnection) {
MIGRATIONS.run_pending_migrations(connection).await.unwrap();
}
#[derive(clap::Parser)]
struct CliArgs {
#[clap(long = "upload-folder", default_value = "uploads/")]
upload_folder: std::path::PathBuf,
#[clap(long = "api", default_value_t = true)]
api: bool,
#[clap(long = "analysis", default_value_t = true)]
analysis: bool,
}
#[tokio::main(flavor = "current_thread")]
async fn main() {
use clap::Parser;
let registry = tracing_subscriber::Registry::default()
.with(tracing_subscriber::fmt::layer())
.with(tracing_subscriber::filter::filter_fn(|meta| {
@@ -20,6 +32,8 @@ async fn main() {
tracing::info!("Starting...");
let args = CliArgs::parse();
tracing::info!("Applying Migrations");
run_migrations(&mut backend::db_connection().await).await;
tracing::info!("Completed Migrations");
@@ -29,8 +43,24 @@ async fn main() {
let mut component_set = tokio::task::JoinSet::new();
component_set.spawn(backend::run_api(UPLOAD_FOLDER, base_analysis_tx));
component_set.spawn(backend::run_analysis(base_analysis_rx));
if args.api {
let steam_api_key = match std::env::var("STEAM_API_KEY") {
Ok(s) => s,
Err(e) => {
tracing::error!("Missing 'STEAM_API_KEY' environment variable - {:?}", e);
return;
}
};
component_set.spawn(backend::run_api(
args.upload_folder.clone(),
base_analysis_tx,
steam_api_key,
));
}
if args.analysis {
component_set.spawn(backend::run_analysis(base_analysis_rx));
}
component_set.join_all().await;
}