Start DB integration

This commit is contained in:
Lol3rrr
2024-09-07 17:44:57 +02:00
parent 324eaf7d3d
commit ae6c1b590f
18 changed files with 680 additions and 53 deletions

View File

@@ -8,10 +8,15 @@ async-trait = "0.1.82"
axum = { version = "0.7.5", features = ["multipart"] }
serde = { version = "1.0.210", features = ["derive"] }
steam-openid = "0.2.0"
time = "0.3.36"
time = { version = "0.3.36", features = ["formatting", "parsing"] }
tokio = { version = "1.40.0", features = ["rt", "macros", "net", "mio"] }
tower-sessions = "0.13.0"
tower-http = { version = "0.5", features = ["fs"] }
tracing = { version = "0.1.40", features = ["async-await"] }
tracing-subscriber = "0.3.18"
futures-util = "0.3"
diesel = { version = "2.2", features = ["serde_json"] }
diesel-async = { version = "0.5", features = ["postgres"] }
serde_json = "1.0.128"
diesel_async_migrations = { version = "0.15" }

View File

@@ -0,0 +1,98 @@
use diesel::prelude::*;
use diesel_async::RunQueryDsl;
#[derive(Debug, Clone)]
pub struct DieselStore {}
static EXPIRY_FORMAT: std::sync::LazyLock<&[time::format_description::BorrowedFormatItem<'static>]> = std::sync::LazyLock::new(|| {
time::macros::format_description!(
"[year]-[month]-[day] [hour]:[minute]:[second] [offset_hour sign:mandatory]:[offset_minute]:[offset_second]"
)
});
impl DieselStore {
pub fn new() -> Self {
Self {}
}
fn id_to_bytes(&self, val: i128) -> Vec<i64> {
let id_bytes = val.to_be_bytes();
vec![i64::from_be_bytes((id_bytes[0..8]).try_into().unwrap()), i64::from_be_bytes((id_bytes[8..16]).try_into().unwrap())]
}
fn bytes_to_id(&self, val: Vec<i64>) -> i128 {
assert_eq!(2, val.len());
let fb = val[0].to_be_bytes();
let sb = val[1].to_be_bytes();
i128::from_be_bytes([fb[0], fb[1], fb[2], fb[3], fb[4], fb[5], fb[6], fb[7], sb[0], sb[1], sb[2], sb[3], sb[4], sb[5], sb[6], sb[7]])
}
fn expiry_to_string(&self, expiry_date: &time::OffsetDateTime) -> String {
expiry_date.format(&EXPIRY_FORMAT).unwrap()
}
fn string_to_expiry(&self, input: &str) -> time::OffsetDateTime {
time::OffsetDateTime::parse(input, &EXPIRY_FORMAT).unwrap()
}
}
#[async_trait::async_trait]
impl tower_sessions::SessionStore for DieselStore {
async fn save(&self,session_record: &tower_sessions::session::Record) -> tower_sessions::session_store::Result<()> {
let db_id = self.id_to_bytes(session_record.id.0);
let data = serde_json::to_value(&session_record.data).unwrap();
let expiry_date = self.expiry_to_string(&session_record.expiry_date);
let query = diesel::dsl::insert_into(crate::schema::sessions::dsl::sessions)
.values(crate::models::Session {
id: db_id,
data: data.clone(),
expiry_date: expiry_date.clone(),
})
.on_conflict(crate::schema::sessions::dsl::id)
.do_update()
.set((crate::schema::sessions::dsl::data.eq(data), crate::schema::sessions::dsl::expiry_date.eq(expiry_date)));
let mut connection = crate::db_connection().await;
query.execute(&mut connection).await.unwrap();
Ok(())
}
async fn load(&self,session_id: &tower_sessions::session::Id) -> tower_sessions::session_store::Result<Option<tower_sessions::session::Record>> {
let db_id = self.id_to_bytes(session_id.0);
let query = crate::schema::sessions::dsl::sessions.filter(crate::schema::sessions::dsl::id.eq(db_id));
let mut connection = crate::db_connection().await;
let mut result: Vec<crate::models::Session> = query.load(&mut connection).await.unwrap();
if result.len() > 1 {
tracing::error!("Found more than 1 result");
return Err(tower_sessions::session_store::Error::Backend("Found more than 1 result".to_string()));
}
let result = result.pop().unwrap();
Ok(Some(tower_sessions::session::Record {
id: tower_sessions::session::Id(self.bytes_to_id(result.id)),
data: serde_json::from_value(result.data).unwrap(),
expiry_date: self.string_to_expiry(&result.expiry_date),
}))
}
async fn delete(&self,session_id: &tower_sessions::session::Id) -> tower_sessions::session_store::Result<()> {
let db_id = self.id_to_bytes(session_id.0);
let query = crate::schema::sessions::dsl::sessions.filter(crate::schema::sessions::dsl::id.eq(db_id));
let mut connection = crate::db_connection().await;
diesel::dsl::delete(query).execute(&mut connection).await.unwrap();
Ok(())
}
}

View File

@@ -1,59 +1,19 @@
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct UserSessionData {
pub steam_id: Option<u64>,
}
pub mod models;
pub mod schema;
impl Default for UserSessionData {
fn default() -> Self {
Self { steam_id: None }
}
}
mod usersession;
pub use usersession::{UserSessionData, UserSession};
pub struct UserSession {
pub session: tower_sessions::Session,
data: UserSessionData,
}
pub mod diesel_sessionstore;
impl UserSession {
const KEY: &'static str = "user.data";
pub async fn db_connection() -> diesel_async::AsyncPgConnection {
use diesel_async::AsyncConnection;
pub fn data(&self) -> &UserSessionData {
&self.data
}
let database_url = std::env::var("DATABASE_URL").expect("'DATABASE_URL' must be set");
pub async fn modify_data<F>(&mut self, func: F)
where
F: FnOnce(&mut UserSessionData),
{
let mut entry = &mut self.data;
func(&mut entry);
self.session.insert(Self::KEY, entry).await.unwrap();
}
}
#[async_trait::async_trait]
impl<S> axum::extract::FromRequestParts<S> for UserSession
where
S: Send + Sync,
{
type Rejection = (axum::http::StatusCode, &'static str);
async fn from_request_parts(
req: &mut axum::http::request::Parts,
state: &S,
) -> Result<Self, Self::Rejection> {
let session = tower_sessions::Session::from_request_parts(req, state).await?;
let guest_data: UserSessionData = session.get(Self::KEY).await.unwrap().unwrap_or_default();
Ok(Self {
session,
data: guest_data,
})
}
diesel_async::AsyncPgConnection::establish(&database_url).await.unwrap_or_else(|e| panic!("Error connecting to {} - {:?}", database_url, e))
}
pub async fn get_demo_from_upload(name: &str, mut form: axum::extract::Multipart) -> Option<axum::body::Bytes> {

View File

@@ -1,10 +1,19 @@
use tracing_subscriber::prelude::__tracing_subscriber_SubscriberExt;
use diesel::prelude::*;
use diesel_async::{RunQueryDsl, AsyncConnection, AsyncPgConnection};
static OPENID: std::sync::LazyLock<steam_openid::SteamOpenId> = std::sync::LazyLock::new(|| {
steam_openid::SteamOpenId::new("http://192.168.0.156:3000", "/api/steam/callback").unwrap()
});
static UPLOAD_FOLDER: &str = "uploads/";
const MIGRATIONS: diesel_async_migrations::EmbeddedMigrations = diesel_async_migrations::embed_migrations!("../migrations/");
async fn run_migrations(connection: &mut diesel_async::AsyncPgConnection) {
MIGRATIONS.run_pending_migrations(connection).await.unwrap();
}
#[tokio::main(flavor = "current_thread")]
async fn main() {
let registry = tracing_subscriber::Registry::default()
@@ -16,7 +25,11 @@ async fn main() {
tracing::info!("Starting...");
let session_store = tower_sessions::MemoryStore::default();
tracing::info!("Applying Migrations");
run_migrations(&mut backend::db_connection().await).await;
tracing::info!("Completed Migrations");
let session_store = backend::diesel_sessionstore::DieselStore::new();
let session_layer = tower_sessions::SessionManagerLayer::new(session_store)
.with_secure(false)
.with_expiry(tower_sessions::Expiry::OnInactivity(
@@ -60,8 +73,11 @@ async fn upload(session: backend::UserSession, form: axum::extract::Multipart) -
tokio::fs::write(demo_file_path, file_content).await.unwrap();
// TODO
// Insert Demo into users list of demos and possibly queue demo for analysis?
let query = diesel::dsl::insert_into(backend::schema::demos::dsl::demos).values(backend::models::Demo {
demo_id: timestamp_secs as i64,
steam_id: steam_id as i64,
});
query.execute(&mut backend::db_connection().await).await.unwrap();
Ok(axum::response::Redirect::to("/"))
}
@@ -101,5 +117,10 @@ async fn demos_list(session: backend::UserSession) -> Result<(), axum::http::Sta
let steam_id = session.data().steam_id.ok_or_else(|| axum::http::StatusCode::UNAUTHORIZED)?;
tracing::info!("SteamID: {:?}", steam_id);
let query = backend::schema::demos::dsl::demos.filter(backend::schema::demos::dsl::steam_id.eq(steam_id as i64));
let results: Vec<backend::models::Demo> = query.load(&mut backend::db_connection().await).await.unwrap();
dbg!(&results);
Ok(())
}

18
backend/src/models.rs Normal file
View File

@@ -0,0 +1,18 @@
use diesel::prelude::*;
#[derive(Queryable, Selectable, Insertable, Debug)]
#[diesel(table_name = crate::schema::sessions)]
#[diesel(check_for_backend(diesel::pg::Pg))]
pub struct Session {
pub id: Vec<i64>,
pub data: serde_json::Value,
pub expiry_date: String,
}
#[derive(Queryable, Selectable, Insertable, Debug)]
#[diesel(table_name = crate::schema::demos)]
#[diesel(check_for_backend(diesel::pg::Pg))]
pub struct Demo {
pub steam_id: i64,
pub demo_id: i64,
}

14
backend/src/schema.rs Normal file
View File

@@ -0,0 +1,14 @@
diesel::table! {
sessions (id) {
id -> Array<BigInt>,
data -> Jsonb,
expiry_date -> Text,
}
}
diesel::table! {
demos (steam_id) {
steam_id -> BigInt,
demo_id -> BigInt
}
}

View File

@@ -0,0 +1,58 @@
use diesel::prelude::*;
use diesel_async::RunQueryDsl;
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct UserSessionData {
pub steam_id: Option<u64>,
}
impl Default for UserSessionData {
fn default() -> Self {
Self { steam_id: None }
}
}
pub struct UserSession {
pub session: tower_sessions::Session,
data: UserSessionData,
}
impl UserSession {
const KEY: &'static str = "user.data";
pub fn data(&self) -> &UserSessionData {
&self.data
}
pub async fn modify_data<F>(&mut self, func: F)
where
F: FnOnce(&mut UserSessionData),
{
let mut entry = &mut self.data;
func(&mut entry);
self.session.insert(Self::KEY, entry).await.unwrap();
}
}
#[async_trait::async_trait]
impl<S> axum::extract::FromRequestParts<S> for UserSession
where
S: Send + Sync,
{
type Rejection = (axum::http::StatusCode, &'static str);
async fn from_request_parts(
req: &mut axum::http::request::Parts,
state: &S,
) -> Result<Self, Self::Rejection> {
let session = tower_sessions::Session::from_request_parts(req, state).await?;
let guest_data: UserSessionData = session.get(Self::KEY).await.unwrap().unwrap_or_default();
Ok(Self {
session,
data: guest_data,
})
}
}