diff --git a/Cargo.lock b/Cargo.lock index 0dbabae..5c69d8c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -297,6 +297,7 @@ dependencies = [ "diesel", "diesel-async", "diesel_async_migrations", + "futures", "futures-util", "image", "memmap2", @@ -307,6 +308,7 @@ dependencies = [ "steam-openid", "time", "tokio", + "tokio-util", "tower-http", "tower-sessions", "tracing", diff --git a/backend/Cargo.toml b/backend/Cargo.toml index cf94c0f..396ef5e 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -10,11 +10,13 @@ serde = { version = "1.0.210", features = ["derive"] } steam-openid = "0.2.0" time = { version = "0.3.36", features = ["formatting", "parsing"] } tokio = { version = "1.40.0", features = ["rt", "macros", "net", "mio", "rt-multi-thread"] } +tokio-util = { version = "0.7", features = ["io"]} tower-sessions = "0.13.0" tower-http = { version = "0.6", features = ["fs"] } tracing = { version = "0.1.40", features = ["async-await"] } tracing-subscriber = "0.3.18" futures-util = "0.3" +futures = "0.3" diesel = { version = "2.2", features = ["serde_json", "chrono"] } diesel-async = { version = "0.5", features = ["postgres"] } diff --git a/backend/src/api.rs b/backend/src/api.rs index fa52519..55a5676 100644 --- a/backend/src/api.rs +++ b/backend/src/api.rs @@ -169,3 +169,31 @@ pub fn router(config: RouterConfig) -> axum::Router { .nest("/demos/", demos::router(config.upload_dir)) .nest("/user/", user::router()) } + +// Save a `Stream` to a file +async fn stream_to_file(path: impl Into, stream: S) -> Result<(), (axum::http::StatusCode, String)> +where + S: futures::Stream>, + E: Into>, +{ + use futures::{Stream, TryStreamExt}; + + let path: std::path::PathBuf = path.into(); + + async { + // Convert the stream into an `AsyncRead`. + let body_with_io_error = stream.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)); + let body_reader = tokio_util::io::StreamReader::new(body_with_io_error); + futures::pin_mut!(body_reader); + + // Create the file. `File` implements `AsyncWrite`. + let mut file = tokio::io::BufWriter::new(tokio::fs::File::create(path).await?); + + // Copy the body into the file. + tokio::io::copy(&mut body_reader, &mut file).await?; + + Ok::<_,std::io::Error>(()) + } + .await + .map_err(|err| (axum::http::StatusCode::INTERNAL_SERVER_ERROR, err.to_string())) +} diff --git a/backend/src/api/demos.rs b/backend/src/api/demos.rs index 12e695d..67ec9a2 100644 --- a/backend/src/api/demos.rs +++ b/backend/src/api/demos.rs @@ -93,7 +93,7 @@ async fn list( async fn upload( State(state): State>, session: crate::UserSession, - form: axum::extract::Multipart, + mut form: axum::extract::Multipart, ) -> Result { let steam_id = session .data() @@ -102,22 +102,27 @@ async fn upload( tracing::info!("Upload for Session: {:?}", steam_id); - let file_content = match crate::get_demo_from_upload("demo", form).await { - Some(c) => c, - None => { - tracing::error!("Getting File content from request"); - return Err(( - axum::http::StatusCode::BAD_REQUEST, - "Failed to get file-content from upload", - )); + let file_field = loop { + let field = match form.next_field().await { + Ok(Some(f)) => f, + Ok(None) => { + tracing::error!(""); + return Err((axum::http::StatusCode::BAD_REQUEST, "Missing Data")); + } + Err(e) => { + tracing::error!(""); + return Err((axum::http::StatusCode::BAD_REQUEST, "")); + } + }; + + if field.name().map(|n| n == "demo").unwrap_or(false) { + break field; } }; let raw_demo_id = uuid::Uuid::now_v7(); let demo_id = raw_demo_id.to_string(); - tracing::debug!(?demo_id, "Upload Size: {:?}", file_content.len()); - let user_folder = std::path::Path::new(&state.upload_folder).join(format!("{}/", steam_id)); if !tokio::fs::try_exists(&user_folder).await.unwrap_or(false) { tokio::fs::create_dir_all(&user_folder).await.unwrap(); @@ -125,9 +130,7 @@ async fn upload( let demo_file_path = user_folder.join(format!("{}.dem", demo_id)); - tokio::fs::write(&demo_file_path, file_content) - .await - .unwrap(); + super::stream_to_file(demo_file_path, file_field).await.unwrap(); let mut db_con = crate::db_connection().await; diff --git a/backend/src/lib.rs b/backend/src/lib.rs index 94fbf9e..8947b21 100644 --- a/backend/src/lib.rs +++ b/backend/src/lib.rs @@ -18,28 +18,6 @@ pub async fn db_connection() -> diesel_async::AsyncPgConnection { .unwrap_or_else(|e| panic!("Error connecting to {} - {:?}", database_url, e)) } -pub async fn get_demo_from_upload( - name: &str, - mut form: axum::extract::Multipart, -) -> Option { - while let Ok(field) = form.next_field().await { - let field = match field { - Some(f) => f, - None => continue, - }; - - if field.name().map(|n| n != name).unwrap_or(false) { - continue; - } - - if let Ok(data) = field.bytes().await { - return Some(data); - } - } - - None -} - pub mod api; pub mod steam_api;