Improve error handling for queue elements
This commit is contained in:
@@ -111,6 +111,8 @@ where
|
|||||||
let result = db_con
|
let result = db_con
|
||||||
.build_transaction()
|
.build_transaction()
|
||||||
.run::<_, TaskError<AE>, _>(|conn| {
|
.run::<_, TaskError<AE>, _>(|conn| {
|
||||||
|
|
||||||
|
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
let mut results: Vec<crate::models::AnalysisTask> = query.load(conn).await?;
|
let mut results: Vec<crate::models::AnalysisTask> = query.load(conn).await?;
|
||||||
let task = match results.pop() {
|
let task = match results.pop() {
|
||||||
@@ -118,21 +120,28 @@ where
|
|||||||
None => return Ok(None),
|
None => return Ok(None),
|
||||||
};
|
};
|
||||||
|
|
||||||
let input = AnalysisInput::load(
|
let delete_query =
|
||||||
task.steam_id.clone(),
|
diesel::dsl::delete(crate::schema::analysis_queue::dsl::analysis_queue)
|
||||||
task.demo_id.clone(),
|
.filter(crate::schema::analysis_queue::dsl::demo_id.eq(task.demo_id.clone()))
|
||||||
|
.filter(crate::schema::analysis_queue::dsl::steam_id.eq(task.steam_id.clone()));
|
||||||
|
|
||||||
|
let input = match AnalysisInput::load(
|
||||||
|
task.steam_id,
|
||||||
|
task.demo_id,
|
||||||
storage.as_ref(),
|
storage.as_ref(),
|
||||||
)
|
)
|
||||||
.await
|
.await {
|
||||||
.unwrap();
|
Ok(i) => i,
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!("Loading Analysis Input: {:?}", e);
|
||||||
|
delete_query.execute(conn).await?;
|
||||||
|
return Ok(Some(()));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
let tmp = action(input, &mut *conn);
|
let tmp = action(input, &mut *conn);
|
||||||
tmp.await.map_err(|e| TaskError::RunningAction(e))?;
|
tmp.await.map_err(|e| TaskError::RunningAction(e))?;
|
||||||
|
|
||||||
let delete_query =
|
|
||||||
diesel::dsl::delete(crate::schema::analysis_queue::dsl::analysis_queue)
|
|
||||||
.filter(crate::schema::analysis_queue::dsl::demo_id.eq(task.demo_id))
|
|
||||||
.filter(crate::schema::analysis_queue::dsl::steam_id.eq(task.steam_id));
|
|
||||||
delete_query.execute(conn).await?;
|
delete_query.execute(conn).await?;
|
||||||
|
|
||||||
Ok(Some(()))
|
Ok(Some(()))
|
||||||
|
|||||||
@@ -154,12 +154,10 @@ impl DemoStorage for S3Storage {
|
|||||||
let body_reader = tokio_util::io::StreamReader::new(body_with_io_error);
|
let body_reader = tokio_util::io::StreamReader::new(body_with_io_error);
|
||||||
futures::pin_mut!(body_reader);
|
futures::pin_mut!(body_reader);
|
||||||
|
|
||||||
self.bucket.list(String::new(), None).await.unwrap();
|
|
||||||
|
|
||||||
self.bucket
|
self.bucket
|
||||||
.put_object_stream(&mut body_reader, path)
|
.put_object_stream(&mut body_reader, path)
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.map_err(|e| format!("Uploading Stream to bucket: {:?}", e))?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@@ -178,7 +176,7 @@ impl DemoStorage for S3Storage {
|
|||||||
let path = std::path::PathBuf::new().join(user_id).join(demo_id);
|
let path = std::path::PathBuf::new().join(user_id).join(demo_id);
|
||||||
let path = path.to_str().unwrap();
|
let path = path.to_str().unwrap();
|
||||||
|
|
||||||
let resp = self.bucket.get_object(path).await.unwrap();
|
let resp = self.bucket.get_object(path).await.map_err(|e| format!("Loading from Bucket: {:?}", e))?;
|
||||||
|
|
||||||
Ok(crate::analysis::AnalysisData::Preloaded(
|
Ok(crate::analysis::AnalysisData::Preloaded(
|
||||||
resp.to_vec().into(),
|
resp.to_vec().into(),
|
||||||
|
|||||||
Reference in New Issue
Block a user