diff --git a/Cargo.lock b/Cargo.lock index 5c69d8c..70edf75 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -167,6 +167,20 @@ dependencies = [ "syn", ] +[[package]] +name = "attohttpc" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a13149d0cf3f7f9b9261fad4ec63b2efbf9a80665f52def86282d26255e6331" +dependencies = [ + "http 1.1.0", + "log", + "native-tls", + "serde", + "serde_json", + "url", +] + [[package]] name = "attribute-derive" version = "0.9.2" @@ -226,6 +240,32 @@ dependencies = [ "arrayvec", ] +[[package]] +name = "aws-creds" +version = "0.37.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f84143206b9c72b3c5cb65415de60c7539c79cd1559290fddec657939131be0" +dependencies = [ + "attohttpc", + "home", + "log", + "quick-xml", + "rust-ini", + "serde", + "thiserror", + "time", + "url", +] + +[[package]] +name = "aws-region" +version = "0.25.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9aed3f9c7eac9be28662fdb3b0f4d1951e812f7c64fed4f0327ba702f459b3b" +dependencies = [ + "thiserror", +] + [[package]] name = "axum" version = "0.7.7" @@ -290,6 +330,7 @@ dependencies = [ "async-trait", "axum", "base64 0.22.1", + "bytes", "chrono", "clap", "common", @@ -303,6 +344,7 @@ dependencies = [ "memmap2", "phf", "reqwest 0.12.8", + "rust-s3", "serde", "serde_json", "steam-openid", @@ -414,9 +456,9 @@ checksum = "8f1fe948ff07f4bd06c30984e69f5b4899c516a3ef74f34df92a2df2ab535495" [[package]] name = "bytes" -version = "1.7.2" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "428d9aa8fbc0670b7b8d6030a7fadd0f86151cae55e4dbbece15f3780a3dfaf3" +checksum = "9ac0150caa2ae65ca5bd83f25c7de183dea78d4d366469f148435e2acfbad0da" [[package]] name = "camino" @@ -579,6 +621,26 @@ dependencies = [ "toml", ] +[[package]] +name = "const-random" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87e00182fe74b066627d63b85fd550ac2998d4b0bd86bfed477a0ae4c7c71359" +dependencies = [ + "const-random-macro", +] + +[[package]] +name = "const-random-macro" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9d839f2a20b0aee515dc581a6172f2321f96cab76c1a38a4c584a194955390e" +dependencies = [ + "getrandom", + "once_cell", + "tiny-keccak", +] + [[package]] name = "const_format" version = "0.2.33" @@ -867,6 +929,15 @@ dependencies = [ "subtle", ] +[[package]] +name = "dlv-list" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "442039f5147480ba31067cb00ada1adae6892028e40e45fc5de7b7df6dcc1b5f" +dependencies = [ + "const-random", +] + [[package]] name = "drain_filter_polyfill" version = "0.1.3" @@ -1269,6 +1340,12 @@ version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" +[[package]] +name = "hex" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" + [[package]] name = "hmac" version = "0.12.1" @@ -1278,6 +1355,15 @@ dependencies = [ "digest", ] +[[package]] +name = "home" +version = "0.5.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3d1354bf6b7235cb4a0576c2619fd4ed18183f689b12b006a0ee7329eeff9a5" +dependencies = [ + "windows-sys 0.52.0", +] + [[package]] name = "html-escape" version = "0.2.13" @@ -1923,6 +2009,17 @@ version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" +[[package]] +name = "maybe-async" +version = "0.2.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5cf92c10c7e361d6b99666ec1c6f9805b0bea2c3bd8c78dc6fe98ac5bd78db11" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "maybe-rayon" version = "0.1.1" @@ -1942,6 +2039,12 @@ dependencies = [ "digest", ] +[[package]] +name = "md5" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771" + [[package]] name = "memchr" version = "2.7.4" @@ -1973,6 +2076,15 @@ dependencies = [ "unicase", ] +[[package]] +name = "minidom" +version = "0.15.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f45614075738ce1b77a1768912a60c0227525971b03e09122a05b8a34a2a6278" +dependencies = [ + "rxml", +] + [[package]] name = "minimal-lexical" version = "0.2.1" @@ -2207,6 +2319,16 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "ordered-multimap" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49203cdcae0030493bad186b28da2fa25645fa276a51b6fec8010d281e02ef79" +dependencies = [ + "dlv-list", + "hashbrown 0.14.5", +] + [[package]] name = "overload" version = "0.1.1" @@ -2602,6 +2724,16 @@ version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a993555f31e5a609f617c12db6250dedcac1b0a85076912c436e6fc9b2c8e6a3" +[[package]] +name = "quick-xml" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d3a6e5838b60e0e8fa7a43f22ade549a37d61f8bdbe636d0d7816191de969c2" +dependencies = [ + "memchr", + "serde", +] + [[package]] name = "quinn" version = "0.11.5" @@ -2960,6 +3092,54 @@ dependencies = [ "thiserror", ] +[[package]] +name = "rust-ini" +version = "0.21.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e310ef0e1b6eeb79169a1171daf9abcb87a2e17c03bee2c4bb100b55c75409f" +dependencies = [ + "cfg-if", + "ordered-multimap", + "trim-in-place", +] + +[[package]] +name = "rust-s3" +version = "0.35.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3df3f353b1f4209dcf437d777cda90279c397ab15a0cd6fd06bd32c88591533" +dependencies = [ + "async-trait", + "aws-creds", + "aws-region", + "base64 0.22.1", + "bytes", + "cfg-if", + "futures", + "hex", + "hmac", + "http 0.2.12", + "hyper 0.14.31", + "hyper-tls", + "log", + "maybe-async", + "md5", + "minidom", + "native-tls", + "percent-encoding", + "quick-xml", + "serde", + "serde_derive", + "serde_json", + "sha2", + "thiserror", + "time", + "tokio", + "tokio-native-tls", + "tokio-stream", + "url", +] + [[package]] name = "rustc-demangle" version = "0.1.24" @@ -3046,6 +3226,23 @@ version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0e819f2bc632f285be6d7cd36e25940d45b2391dd6d9b939e79de557f7014248" +[[package]] +name = "rxml" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a98f186c7a2f3abbffb802984b7f1dfd65dac8be1aafdaabbca4137f53f0dff7" +dependencies = [ + "bytes", + "rxml_validation", + "smartstring", +] + +[[package]] +name = "rxml_validation" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22a197350ece202f19a166d1ad6d9d6de145e1d2a8ef47db299abe164dbd7530" + [[package]] name = "ryu" version = "1.0.18" @@ -3354,6 +3551,17 @@ version = "1.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" +[[package]] +name = "smartstring" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3fb72c633efbaa2dd666986505016c32c3044395ceaf881518399d2f4127ee29" +dependencies = [ + "autocfg", + "static_assertions", + "version_check", +] + [[package]] name = "snap" version = "1.1.1" @@ -3379,6 +3587,12 @@ dependencies = [ "lock_api", ] +[[package]] +name = "static_assertions" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" + [[package]] name = "steam-openid" version = "0.2.0" @@ -3591,6 +3805,15 @@ dependencies = [ "time-core", ] +[[package]] +name = "tiny-keccak" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c9d3793400a45f954c52e73d068316d76b6f4e36977e3fcebb13a2721e80237" +dependencies = [ + "crunchy", +] + [[package]] name = "tinyvec" version = "1.8.0" @@ -3680,6 +3903,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-stream" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f4e6ce100d0eb49a2734f8c0812bcd324cf357d21810932c5df6b96ef2b86f1" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-util" version = "0.7.12" @@ -3931,6 +4165,12 @@ dependencies = [ "syn", ] +[[package]] +name = "trim-in-place" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "343e926fc669bc8cde4fa3129ab681c63671bae288b1f1081ceee6d9d37904fc" + [[package]] name = "try-lock" version = "0.2.5" diff --git a/README.md b/README.md index cf87bde..19c9629 100644 --- a/README.md +++ b/README.md @@ -7,6 +7,13 @@ A self-hosted demo analysis tool - `STEAM_API_KEY` - `BASE_URL` +If using the 's3' storage backend +- `S3_ACCESS_KEY` +- `S3_SECRET_KEY` +- `S3_REGION` +- `S3_ENDPOINT` +- `S3_BUCKET` + ### Needed external Software - `postgresql` diff --git a/backend/Cargo.toml b/backend/Cargo.toml index 396ef5e..fac58bf 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -40,3 +40,5 @@ phf = { version = "0.11", features = ["macros"] } uuid = { version = "1.10", features = ["v7"] } chrono = { version = "0.4", features = ["serde"] } +rust-s3 = { version = "0.35.1", features = ["no-verify-ssl"] } +bytes = "1.8" diff --git a/backend/src/analysis.rs b/backend/src/analysis.rs index ec63ec4..3c1387b 100644 --- a/backend/src/analysis.rs +++ b/backend/src/analysis.rs @@ -5,31 +5,39 @@ pub mod base; pub mod heatmap; pub mod perround; +#[derive(Debug, Clone)] +pub enum AnalysisData { + MemMapped(std::sync::Arc), + Preloaded(std::sync::Arc<[u8]>), +} + #[derive(Debug, Clone)] pub struct AnalysisInput { pub steamid: String, pub demoid: String, - data: std::sync::Arc, + data: AnalysisData, } impl AnalysisInput { pub async fn load( steamid: String, demoid: String, - path: impl AsRef, + storage: &dyn crate::storage::DemoStorage, ) -> Result { - let file = std::fs::File::open(path.as_ref()).unwrap(); - let mmap = unsafe { memmap2::MmapOptions::new().map(&file).unwrap() }; + let data = storage.load(steamid.clone(), demoid.clone()).await.unwrap(); Ok(Self { steamid, demoid, - data: std::sync::Arc::new(mmap), + data, }) } pub fn data(&self) -> &[u8] { - &self.data + match &self.data { + AnalysisData::MemMapped(v) => &v, + AnalysisData::Preloaded(v) => &v, + } } } @@ -75,7 +83,7 @@ impl From for TaskError { } pub async fn poll_next_task( - upload_folder: impl Into, + storage: Box, db_con: &mut diesel_async::pg::AsyncPgConnection, action: A, ) -> Result<(), TaskError> @@ -96,10 +104,8 @@ where .for_update() .skip_locked(); - let upload_folder: std::path::PathBuf = upload_folder.into(); - loop { - let upload_folder = upload_folder.clone(); + let storage = storage.duplicate(); let action = action.clone(); let result = db_con @@ -115,9 +121,7 @@ where let input = AnalysisInput::load( task.steam_id.clone(), task.demo_id.clone(), - upload_folder - .join(&task.steam_id) - .join(format!("{}.dem", task.demo_id)), + storage.as_ref(), ) .await .unwrap(); diff --git a/backend/src/api.rs b/backend/src/api.rs index 7101f15..2da86aa 100644 --- a/backend/src/api.rs +++ b/backend/src/api.rs @@ -153,7 +153,7 @@ pub struct RouterConfig { pub steam_api_key: String, pub steam_callback_base_url: String, pub steam_callback_path: String, - pub upload_dir: std::path::PathBuf, + pub storage: Box, } pub fn router(config: RouterConfig) -> axum::Router { @@ -166,7 +166,7 @@ pub fn router(config: RouterConfig) -> axum::Router { config.steam_api_key, ), ) - .nest("/demos/", demos::router(config.upload_dir)) + .nest("/demos/", demos::router(config.storage)) .nest("/user/", user::router()) } diff --git a/backend/src/api/demos.rs b/backend/src/api/demos.rs index 68f3695..101390c 100644 --- a/backend/src/api/demos.rs +++ b/backend/src/api/demos.rs @@ -6,13 +6,10 @@ use diesel_async::RunQueryDsl; use std::sync::Arc; struct DemoState { - upload_folder: std::path::PathBuf, + storage: Box, } -pub fn router

(upload_folder: P) -> axum::Router -where - P: Into, -{ +pub fn router(storage: Box) -> axum::Router { axum::Router::new() .route("/list", axum::routing::get(list)) .route( @@ -25,9 +22,7 @@ where .route("/:id/analysis/scoreboard", axum::routing::get(scoreboard)) .route("/:id/analysis/perround", axum::routing::get(perround)) .route("/:id/analysis/heatmap", axum::routing::get(heatmap)) - .with_state(Arc::new(DemoState { - upload_folder: upload_folder.into(), - })) + .with_state(Arc::new(DemoState { storage })) } #[tracing::instrument(skip(session))] @@ -94,11 +89,11 @@ async fn upload( State(state): State>, session: crate::UserSession, mut form: axum::extract::Multipart, -) -> Result { +) -> Result { let steam_id = session .data() .steam_id - .ok_or_else(|| (axum::http::StatusCode::UNAUTHORIZED, "Not logged in"))?; + .ok_or_else(|| (axum::http::StatusCode::UNAUTHORIZED, "Not logged in".into()))?; tracing::info!("Upload for Session: {:?}", steam_id); @@ -107,11 +102,11 @@ async fn upload( Ok(Some(f)) => f, Ok(None) => { tracing::error!(""); - return Err((axum::http::StatusCode::BAD_REQUEST, "Missing Data")); + return Err((axum::http::StatusCode::BAD_REQUEST, "Missing Data".into())); } Err(e) => { tracing::error!(""); - return Err((axum::http::StatusCode::BAD_REQUEST, "")); + return Err((axum::http::StatusCode::BAD_REQUEST, "".into())); } }; @@ -123,16 +118,16 @@ async fn upload( let raw_demo_id = uuid::Uuid::now_v7(); let demo_id = raw_demo_id.to_string(); - let user_folder = std::path::Path::new(&state.upload_folder).join(format!("{}/", steam_id)); - if !tokio::fs::try_exists(&user_folder).await.unwrap_or(false) { - tokio::fs::create_dir_all(&user_folder).await.unwrap(); - } - - let demo_file_path = user_folder.join(format!("{}.dem", demo_id)); - - super::stream_to_file(demo_file_path, file_field) + use futures::stream::StreamExt; + state + .storage + .upload( + steam_id.to_string(), + demo_id.clone(), + file_field.filter_map(|b| async { b.ok() }).boxed(), + ) .await - .unwrap(); + .map_err(|e| (axum::http::StatusCode::INTERNAL_SERVER_ERROR, e))?; let mut db_con = crate::db_connection().await; diff --git a/backend/src/lib.rs b/backend/src/lib.rs index 8947b21..2abdc38 100644 --- a/backend/src/lib.rs +++ b/backend/src/lib.rs @@ -21,13 +21,13 @@ pub async fn db_connection() -> diesel_async::AsyncPgConnection { pub mod api; pub mod steam_api; -#[tracing::instrument(skip(upload_folder, steam_api_key))] +pub mod storage; + +#[tracing::instrument(skip(storage, steam_api_key))] pub async fn run_api( - upload_folder: impl Into, + storage: Box, steam_api_key: impl Into, ) { - let upload_folder: std::path::PathBuf = upload_folder.into(); - let session_store = crate::diesel_sessionstore::DieselStore::new(); let session_layer = tower_sessions::SessionManagerLayer::new(session_store) .with_secure(false) @@ -35,10 +35,6 @@ pub async fn run_api( 48, ))); - if !tokio::fs::try_exists(&upload_folder).await.unwrap_or(false) { - tokio::fs::create_dir_all(&upload_folder).await.unwrap(); - } - let serve_dir = option_env!("FRONTEND_DIST_DIR").unwrap_or("../frontend/dist/"); tracing::debug!("Serving static files from {:?}", serve_dir); @@ -53,7 +49,7 @@ pub async fn run_api( steam_api_key: steam_api_key.into(), steam_callback_base_url, steam_callback_path: "/api/steam/callback".into(), - upload_dir: upload_folder.clone(), + storage, }), ) .layer(session_layer) @@ -69,23 +65,26 @@ pub async fn run_api( axum::serve(listener, router).await.unwrap(); } -#[tracing::instrument(skip(upload_folder))] -pub async fn run_analysis(upload_folder: impl Into) { +#[tracing::instrument(skip(storage))] +pub async fn run_analysis(storage: Box) { use diesel::prelude::*; use diesel_async::RunQueryDsl; - let upload_folder: std::path::PathBuf = upload_folder.into(); loop { let mut db_con = db_connection().await; let res = crate::analysis::poll_next_task( - &upload_folder, + storage.duplicate(), &mut db_con, move |input: analysis::AnalysisInput, db_con: &mut diesel_async::AsyncPgConnection| { Box::pin(async move { let demo_id = input.demoid.clone(); + let _span = tracing::info_span!("Analysis", demo=?demo_id); + + tracing::info!("Starting analysis"); + let mut store_result_fns = Vec::new(); for analysis in analysis::ANALYSIS_METHODS.iter().map(|a| a.clone()) { let input = input.clone(); @@ -119,6 +118,8 @@ pub async fn run_analysis(upload_folder: impl Into) { } update_process_info.execute(db_con).await.map_err(|e| ())?; + tracing::info!("Completed analysis"); + Ok::<(), ()>(()) }) }, diff --git a/backend/src/main.rs b/backend/src/main.rs index 3e55f59..e5e08c9 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -7,11 +7,21 @@ async fn run_migrations(connection: &mut diesel_async::AsyncPgConnection) { MIGRATIONS.run_pending_migrations(connection).await.unwrap(); } +#[derive(clap::ValueEnum, Clone, Default, Debug)] +enum CliStorage { + #[default] + File, + S3, +} + #[derive(clap::Parser)] struct CliArgs { #[clap(long = "upload-folder", default_value = "uploads/")] upload_folder: std::path::PathBuf, + #[clap(long = "storage", default_value = "file")] + storage: CliStorage, + #[clap(long = "api", default_value_t = true)] api: bool, @@ -40,6 +50,26 @@ async fn main() { let mut component_set = tokio::task::JoinSet::new(); + let storage: Box = match args.storage { + CliStorage::File => Box::new(backend::storage::FileStorage::new( + args.upload_folder.clone(), + )), + CliStorage::S3 => { + let credentials = s3::creds::Credentials::from_env_specific( + Some("S3_ACCESS_KEY"), + Some("S3_SECRET_KEY"), + None, + None + ).unwrap(); + + let region = s3::Region::from_env("S3_REGION", Some("S3_ENDPOINT")).unwrap(); + + let bucket = std::env::var("S3_BUCKET").expect("Need 'S3_BUCKET' for using s3 storage backend"); + + Box::new(backend::storage::S3Storage::new(&bucket, region, credentials)) + }, + }; + tracing::info!("Starting modules"); if args.api { tracing::info!("Enabled API module"); @@ -52,12 +82,12 @@ async fn main() { } }; - component_set.spawn(backend::run_api(args.upload_folder.clone(), steam_api_key)); + component_set.spawn(backend::run_api(storage.duplicate(), steam_api_key)); } if args.analysis { tracing::info!("Enabled Analysis module"); - component_set.spawn(backend::run_analysis(args.upload_folder.clone())); + component_set.spawn(backend::run_analysis(storage)); } tracing::info!("Started modules"); diff --git a/backend/src/storage.rs b/backend/src/storage.rs new file mode 100644 index 0000000..9383036 --- /dev/null +++ b/backend/src/storage.rs @@ -0,0 +1,152 @@ +use futures::FutureExt; +use futures::StreamExt; + +pub trait DemoStorage: Send + Sync { + fn duplicate(&self) -> Box; + + fn upload<'f, 's, 'own>( + &'own self, + user_id: String, + demo_id: String, + stream: futures_util::stream::BoxStream<'s, axum::body::Bytes>, + ) -> futures::future::BoxFuture<'f, Result<(), String>> + where + 's: 'f, + 'own: 'f; + + fn load<'f, 'own>(&'own self, user_id: String, demo_id: String) -> futures::future::BoxFuture<'f, Result> where 'own: 'f; +} + +pub struct FileStorage { + folder: std::sync::Arc, +} + +impl FileStorage { + pub fn new

(folder: P) -> Self + where + P: Into, + { + Self { + folder: std::sync::Arc::new(folder.into()), + } + } +} + +impl DemoStorage for FileStorage { + fn duplicate(&self) -> Box { + Box::new(Self { + folder: self.folder.clone(), + }) + } + + fn upload<'f, 's, 'own>( + &'own self, + user_id: String, + demo_id: String, + stream: futures_util::stream::BoxStream<'s, axum::body::Bytes>, + ) -> futures::future::BoxFuture<'f, Result<(), String>> + where + 's: 'f,'own: 'f + { + let path = self.folder.clone(); + + async move { + let user_folder = std::path::Path::new(path.as_path()).join(format!("{}/", user_id)); + if !tokio::fs::try_exists(&user_folder).await.unwrap_or(false) { + tokio::fs::create_dir_all(&user_folder).await.unwrap(); + } + + let demo_file_path = user_folder.join(format!("{}.dem", demo_id)); + + async { + // Convert the stream into an `AsyncRead`. + let body_with_io_error = stream.map(|b| Ok::<_, std::io::Error>(b)); + let body_reader = tokio_util::io::StreamReader::new(body_with_io_error); + futures::pin_mut!(body_reader); + + // Create the file. `File` implements `AsyncWrite`. + let mut file = + tokio::io::BufWriter::new(tokio::fs::File::create(demo_file_path).await?); + + // Copy the body into the file. + tokio::io::copy(&mut body_reader, &mut file).await?; + + Ok::<_, std::io::Error>(()) + } + .await + .map_err(|err| err.to_string()) + } + .boxed() + } + + fn load<'f, 'own>(&'own self, user_id: String, demo_id: String) -> futures::future::BoxFuture<'f, Result> where 'own: 'f { + async move { + let user_folder = std::path::Path::new(self.folder.as_path()).join(format!("{}/", user_id)); + let demo_file_path = user_folder.join(format!("{}.dem", demo_id)); + let file = std::fs::File::open(demo_file_path.as_path()).unwrap(); + let mmap = unsafe { memmap2::MmapOptions::new().map(&file).unwrap() }; + + Ok(crate::analysis::AnalysisData::MemMapped(std::sync::Arc::new(mmap))) + }.boxed() + } +} + +pub struct S3Storage { + bucket: std::sync::Arc, +} + +impl S3Storage { + pub fn new(bucket_name: &str, region: s3::region::Region, credentials: s3::creds::Credentials) -> Self { + let mut bucket = s3::bucket::Bucket::new(bucket_name, region, credentials).unwrap(); + bucket.set_path_style(); + + Self { + bucket: bucket.into(), + } + } +} + +impl DemoStorage for S3Storage { + fn duplicate(&self) -> Box { + Box::new(Self { + bucket: self.bucket.clone(), + }) + } + + fn upload<'f, 's, 'own>( + &'own self, + user_id: String, + demo_id: String, + stream: futures_util::stream::BoxStream<'s, axum::body::Bytes>, + ) -> futures::future::BoxFuture<'f, Result<(), String>> + where + 's: 'f, 'own: 'f { + async move { + + let path = std::path::PathBuf::new().join(user_id).join(demo_id); + let path = path.to_str().unwrap(); + + // Convert the stream into an `AsyncRead`. + let body_with_io_error = stream.map(|b| Ok::<_, std::io::Error>(b)); + let body_reader = tokio_util::io::StreamReader::new(body_with_io_error); + futures::pin_mut!(body_reader); + + self.bucket.list(String::new(), None).await.unwrap(); + + self.bucket.put_object_stream(&mut body_reader, path).await.unwrap(); + + Ok(()) + }.boxed() + } + + fn load<'f, 'own>(&'own self, user_id: String, demo_id: String) -> futures::future::BoxFuture<'f, Result> where 'own: 'f { + async move { + let path = std::path::PathBuf::new().join(user_id).join(demo_id); + let path = path.to_str().unwrap(); + + let resp = self.bucket.get_object(path).await.unwrap(); + + Ok(crate::analysis::AnalysisData::Preloaded(resp.to_vec().into())) + }.boxed() + } +}