diff --git a/backend/src/gc.rs b/backend/src/gc.rs new file mode 100644 index 0000000..69cdcdf --- /dev/null +++ b/backend/src/gc.rs @@ -0,0 +1,36 @@ +use diesel::prelude::*; +use diesel_async::RunQueryDsl; + +#[tracing::instrument(skip(storage))] +pub async fn run_gc(storage: &mut dyn crate::storage::DemoStorage) -> Result<(), ()> { + let stored_demos = match storage.list_demos().await { + Ok(ds) => ds, + Err(e) => return Err(()), + }; + + tracing::info!("Found {} demos in storage", stored_demos.len()); + + let mut db_con = crate::db_connection().await; + + let db_res = db_con + .build_transaction() + .run(move |con| { + Box::pin(async move { + for demo in stored_demos { + let query = crate::schema::demos::dsl::demos + .filter(crate::schema::demos::dsl::demo_id.eq(&demo)).select(crate::models::Demo::as_select()); + + let matching: Vec = query.load(con).await?; + + if matching.is_empty() { + tracing::debug!("Should delete old demo {:?}", demo); + } + } + + Ok::<(), diesel::result::Error>(()) + }) + }) + .await; + + db_res.map_err(|e| ()) +} diff --git a/backend/src/lib.rs b/backend/src/lib.rs index 13a3b1b..432201a 100644 --- a/backend/src/lib.rs +++ b/backend/src/lib.rs @@ -8,6 +8,8 @@ pub mod diesel_sessionstore; pub mod analysis; +mod gc; + pub async fn db_connection() -> diesel_async::AsyncPgConnection { use diesel_async::AsyncConnection; @@ -132,3 +134,16 @@ pub async fn run_analysis(storage: Box) { } } } + +#[tracing::instrument(skip(storage))] +pub async fn run_garbage_collection(mut storage: Box) { + loop { + tracing::info!("Running Garbage Collection"); + + if let Err(e) = gc::run_gc(storage.as_mut()).await { + tracing::error!("Running GC {:?}", e); + } + + tokio::time::sleep(std::time::Duration::from_secs(15 * 60)).await; + } +} diff --git a/backend/src/main.rs b/backend/src/main.rs index 15115cc..5424da7 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -27,6 +27,9 @@ struct CliArgs { #[clap(long = "analysis", default_value_t = true)] analysis: bool, + + #[clap(long = "gc", default_value_t = true)] + garbage_collection: bool, } #[tokio::main(flavor = "multi_thread")] @@ -93,7 +96,12 @@ async fn main() { if args.analysis { tracing::info!("Enabled Analysis module"); - component_set.spawn(backend::run_analysis(storage)); + component_set.spawn(backend::run_analysis(storage.duplicate())); + } + if args.garbage_collection { + tracing::info!("Enabled Garbage-Collection module"); + + component_set.spawn(backend::run_garbage_collection(storage)); } tracing::info!("Started modules"); diff --git a/backend/src/storage.rs b/backend/src/storage.rs index 6c72aab..207e4a7 100644 --- a/backend/src/storage.rs +++ b/backend/src/storage.rs @@ -21,6 +21,9 @@ pub trait DemoStorage: Send + Sync { ) -> futures::future::BoxFuture<'f, Result> where 'own: 'f; + + fn list_demos<'own, 'f>(&'own self) -> futures::future::BoxFuture<'f, Result, String>> + where 'own: 'f; } pub struct FileStorage { @@ -107,6 +110,14 @@ impl DemoStorage for FileStorage { } .boxed() } + + fn list_demos<'own, 'f>(&'own self) -> futures::future::BoxFuture<'f, Result, String>> + where 'own: 'f { + async move { + // TODO + Ok(Vec::new()) + }.boxed() + } } pub struct S3Storage { @@ -184,4 +195,20 @@ impl DemoStorage for S3Storage { } .boxed() } + + fn list_demos<'own, 'f>(&'own self) -> futures::future::BoxFuture<'f, Result, String>> + where 'own: 'f { + async move { + let listed = match self.bucket.list("".to_string(), None).await { + Ok(l) => l, + Err(e) => return Err(format!("{:?}", e)), + }; + + Ok(listed.into_iter().flat_map(|rs| { + rs.contents.into_iter().map(|entry| { + entry.key + }) + }).collect()) + }.boxed() + } }