diff --git a/backend/src/analysis.rs b/backend/src/analysis.rs index 3c1387b..e125c63 100644 --- a/backend/src/analysis.rs +++ b/backend/src/analysis.rs @@ -111,6 +111,8 @@ where let result = db_con .build_transaction() .run::<_, TaskError, _>(|conn| { + + Box::pin(async move { let mut results: Vec = query.load(conn).await?; let task = match results.pop() { @@ -118,21 +120,28 @@ where None => return Ok(None), }; - let input = AnalysisInput::load( - task.steam_id.clone(), - task.demo_id.clone(), + let delete_query = + diesel::dsl::delete(crate::schema::analysis_queue::dsl::analysis_queue) + .filter(crate::schema::analysis_queue::dsl::demo_id.eq(task.demo_id.clone())) + .filter(crate::schema::analysis_queue::dsl::steam_id.eq(task.steam_id.clone())); + + let input = match AnalysisInput::load( + task.steam_id, + task.demo_id, storage.as_ref(), ) - .await - .unwrap(); + .await { + Ok(i) => i, + Err(e) => { + tracing::error!("Loading Analysis Input: {:?}", e); + delete_query.execute(conn).await?; + return Ok(Some(())); + } + }; let tmp = action(input, &mut *conn); tmp.await.map_err(|e| TaskError::RunningAction(e))?; - let delete_query = - diesel::dsl::delete(crate::schema::analysis_queue::dsl::analysis_queue) - .filter(crate::schema::analysis_queue::dsl::demo_id.eq(task.demo_id)) - .filter(crate::schema::analysis_queue::dsl::steam_id.eq(task.steam_id)); delete_query.execute(conn).await?; Ok(Some(())) diff --git a/backend/src/storage.rs b/backend/src/storage.rs index a35ea39..6c72aab 100644 --- a/backend/src/storage.rs +++ b/backend/src/storage.rs @@ -154,12 +154,10 @@ impl DemoStorage for S3Storage { let body_reader = tokio_util::io::StreamReader::new(body_with_io_error); futures::pin_mut!(body_reader); - self.bucket.list(String::new(), None).await.unwrap(); - self.bucket .put_object_stream(&mut body_reader, path) .await - .unwrap(); + .map_err(|e| format!("Uploading Stream to bucket: {:?}", e))?; Ok(()) } @@ -178,7 +176,7 @@ impl DemoStorage for S3Storage { let path = std::path::PathBuf::new().join(user_id).join(demo_id); let path = path.to_str().unwrap(); - let resp = self.bucket.get_object(path).await.unwrap(); + let resp = self.bucket.get_object(path).await.map_err(|e| format!("Loading from Bucket: {:?}", e))?; Ok(crate::analysis::AnalysisData::Preloaded( resp.to_vec().into(),