Switch to streaming the uploaded file to disk

This commit is contained in:
Lol3rrr
2024-10-18 19:46:56 +02:00
parent 201cead3f1
commit 966bd4f413
5 changed files with 49 additions and 36 deletions

View File

@@ -169,3 +169,31 @@ pub fn router(config: RouterConfig) -> axum::Router {
.nest("/demos/", demos::router(config.upload_dir))
.nest("/user/", user::router())
}
// Save a `Stream` to a file
async fn stream_to_file<S, E>(path: impl Into<std::path::PathBuf>, stream: S) -> Result<(), (axum::http::StatusCode, String)>
where
S: futures::Stream<Item = Result<axum::body::Bytes, E>>,
E: Into<Box<dyn std::error::Error + Send + Sync + 'static>>,
{
use futures::{Stream, TryStreamExt};
let path: std::path::PathBuf = path.into();
async {
// Convert the stream into an `AsyncRead`.
let body_with_io_error = stream.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err));
let body_reader = tokio_util::io::StreamReader::new(body_with_io_error);
futures::pin_mut!(body_reader);
// Create the file. `File` implements `AsyncWrite`.
let mut file = tokio::io::BufWriter::new(tokio::fs::File::create(path).await?);
// Copy the body into the file.
tokio::io::copy(&mut body_reader, &mut file).await?;
Ok::<_,std::io::Error>(())
}
.await
.map_err(|err| (axum::http::StatusCode::INTERNAL_SERVER_ERROR, err.to_string()))
}

View File

@@ -93,7 +93,7 @@ async fn list(
async fn upload(
State(state): State<Arc<DemoState>>,
session: crate::UserSession,
form: axum::extract::Multipart,
mut form: axum::extract::Multipart,
) -> Result<axum::response::Redirect, (axum::http::StatusCode, &'static str)> {
let steam_id = session
.data()
@@ -102,22 +102,27 @@ async fn upload(
tracing::info!("Upload for Session: {:?}", steam_id);
let file_content = match crate::get_demo_from_upload("demo", form).await {
Some(c) => c,
None => {
tracing::error!("Getting File content from request");
return Err((
axum::http::StatusCode::BAD_REQUEST,
"Failed to get file-content from upload",
));
let file_field = loop {
let field = match form.next_field().await {
Ok(Some(f)) => f,
Ok(None) => {
tracing::error!("");
return Err((axum::http::StatusCode::BAD_REQUEST, "Missing Data"));
}
Err(e) => {
tracing::error!("");
return Err((axum::http::StatusCode::BAD_REQUEST, ""));
}
};
if field.name().map(|n| n == "demo").unwrap_or(false) {
break field;
}
};
let raw_demo_id = uuid::Uuid::now_v7();
let demo_id = raw_demo_id.to_string();
tracing::debug!(?demo_id, "Upload Size: {:?}", file_content.len());
let user_folder = std::path::Path::new(&state.upload_folder).join(format!("{}/", steam_id));
if !tokio::fs::try_exists(&user_folder).await.unwrap_or(false) {
tokio::fs::create_dir_all(&user_folder).await.unwrap();
@@ -125,9 +130,7 @@ async fn upload(
let demo_file_path = user_folder.join(format!("{}.dem", demo_id));
tokio::fs::write(&demo_file_path, file_content)
.await
.unwrap();
super::stream_to_file(demo_file_path, file_field).await.unwrap();
let mut db_con = crate::db_connection().await;

View File

@@ -18,28 +18,6 @@ pub async fn db_connection() -> diesel_async::AsyncPgConnection {
.unwrap_or_else(|e| panic!("Error connecting to {} - {:?}", database_url, e))
}
pub async fn get_demo_from_upload(
name: &str,
mut form: axum::extract::Multipart,
) -> Option<axum::body::Bytes> {
while let Ok(field) = form.next_field().await {
let field = match field {
Some(f) => f,
None => continue,
};
if field.name().map(|n| n != name).unwrap_or(false) {
continue;
}
if let Ok(data) = field.bytes().await {
return Some(data);
}
}
None
}
pub mod api;
pub mod steam_api;