Start work on #34
Some checks failed
Testing/Linting / test (analysis) (push) Has been cancelled
Testing/Linting / test (backend) (push) Has been cancelled
Testing/Linting / lint (analysis) (push) Has been cancelled
Testing/Linting / lint (backend) (push) Has been cancelled
build-docker-image / docker (push) Has been cancelled
Some checks failed
Testing/Linting / test (analysis) (push) Has been cancelled
Testing/Linting / test (backend) (push) Has been cancelled
Testing/Linting / lint (analysis) (push) Has been cancelled
Testing/Linting / lint (backend) (push) Has been cancelled
build-docker-image / docker (push) Has been cancelled
Started work on the garbage-collection module. This for now just loads the stored demo ids and checks if they are also found in the database, logging every demo that could not be found and should at some point delete these demos
This commit is contained in:
36
backend/src/gc.rs
Normal file
36
backend/src/gc.rs
Normal file
@@ -0,0 +1,36 @@
|
|||||||
|
use diesel::prelude::*;
|
||||||
|
use diesel_async::RunQueryDsl;
|
||||||
|
|
||||||
|
#[tracing::instrument(skip(storage))]
|
||||||
|
pub async fn run_gc(storage: &mut dyn crate::storage::DemoStorage) -> Result<(), ()> {
|
||||||
|
let stored_demos = match storage.list_demos().await {
|
||||||
|
Ok(ds) => ds,
|
||||||
|
Err(e) => return Err(()),
|
||||||
|
};
|
||||||
|
|
||||||
|
tracing::info!("Found {} demos in storage", stored_demos.len());
|
||||||
|
|
||||||
|
let mut db_con = crate::db_connection().await;
|
||||||
|
|
||||||
|
let db_res = db_con
|
||||||
|
.build_transaction()
|
||||||
|
.run(move |con| {
|
||||||
|
Box::pin(async move {
|
||||||
|
for demo in stored_demos {
|
||||||
|
let query = crate::schema::demos::dsl::demos
|
||||||
|
.filter(crate::schema::demos::dsl::demo_id.eq(&demo)).select(crate::models::Demo::as_select());
|
||||||
|
|
||||||
|
let matching: Vec<crate::models::Demo> = query.load(con).await?;
|
||||||
|
|
||||||
|
if matching.is_empty() {
|
||||||
|
tracing::debug!("Should delete old demo {:?}", demo);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok::<(), diesel::result::Error>(())
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
|
||||||
|
db_res.map_err(|e| ())
|
||||||
|
}
|
||||||
@@ -8,6 +8,8 @@ pub mod diesel_sessionstore;
|
|||||||
|
|
||||||
pub mod analysis;
|
pub mod analysis;
|
||||||
|
|
||||||
|
mod gc;
|
||||||
|
|
||||||
pub async fn db_connection() -> diesel_async::AsyncPgConnection {
|
pub async fn db_connection() -> diesel_async::AsyncPgConnection {
|
||||||
use diesel_async::AsyncConnection;
|
use diesel_async::AsyncConnection;
|
||||||
|
|
||||||
@@ -132,3 +134,16 @@ pub async fn run_analysis(storage: Box<dyn crate::storage::DemoStorage>) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tracing::instrument(skip(storage))]
|
||||||
|
pub async fn run_garbage_collection(mut storage: Box<dyn crate::storage::DemoStorage>) {
|
||||||
|
loop {
|
||||||
|
tracing::info!("Running Garbage Collection");
|
||||||
|
|
||||||
|
if let Err(e) = gc::run_gc(storage.as_mut()).await {
|
||||||
|
tracing::error!("Running GC {:?}", e);
|
||||||
|
}
|
||||||
|
|
||||||
|
tokio::time::sleep(std::time::Duration::from_secs(15 * 60)).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -27,6 +27,9 @@ struct CliArgs {
|
|||||||
|
|
||||||
#[clap(long = "analysis", default_value_t = true)]
|
#[clap(long = "analysis", default_value_t = true)]
|
||||||
analysis: bool,
|
analysis: bool,
|
||||||
|
|
||||||
|
#[clap(long = "gc", default_value_t = true)]
|
||||||
|
garbage_collection: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::main(flavor = "multi_thread")]
|
#[tokio::main(flavor = "multi_thread")]
|
||||||
@@ -93,7 +96,12 @@ async fn main() {
|
|||||||
if args.analysis {
|
if args.analysis {
|
||||||
tracing::info!("Enabled Analysis module");
|
tracing::info!("Enabled Analysis module");
|
||||||
|
|
||||||
component_set.spawn(backend::run_analysis(storage));
|
component_set.spawn(backend::run_analysis(storage.duplicate()));
|
||||||
|
}
|
||||||
|
if args.garbage_collection {
|
||||||
|
tracing::info!("Enabled Garbage-Collection module");
|
||||||
|
|
||||||
|
component_set.spawn(backend::run_garbage_collection(storage));
|
||||||
}
|
}
|
||||||
tracing::info!("Started modules");
|
tracing::info!("Started modules");
|
||||||
|
|
||||||
|
|||||||
@@ -21,6 +21,9 @@ pub trait DemoStorage: Send + Sync {
|
|||||||
) -> futures::future::BoxFuture<'f, Result<crate::analysis::AnalysisData, String>>
|
) -> futures::future::BoxFuture<'f, Result<crate::analysis::AnalysisData, String>>
|
||||||
where
|
where
|
||||||
'own: 'f;
|
'own: 'f;
|
||||||
|
|
||||||
|
fn list_demos<'own, 'f>(&'own self) -> futures::future::BoxFuture<'f, Result<Vec<String>, String>>
|
||||||
|
where 'own: 'f;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct FileStorage {
|
pub struct FileStorage {
|
||||||
@@ -107,6 +110,14 @@ impl DemoStorage for FileStorage {
|
|||||||
}
|
}
|
||||||
.boxed()
|
.boxed()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn list_demos<'own, 'f>(&'own self) -> futures::future::BoxFuture<'f, Result<Vec<String>, String>>
|
||||||
|
where 'own: 'f {
|
||||||
|
async move {
|
||||||
|
// TODO
|
||||||
|
Ok(Vec::new())
|
||||||
|
}.boxed()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct S3Storage {
|
pub struct S3Storage {
|
||||||
@@ -184,4 +195,20 @@ impl DemoStorage for S3Storage {
|
|||||||
}
|
}
|
||||||
.boxed()
|
.boxed()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn list_demos<'own, 'f>(&'own self) -> futures::future::BoxFuture<'f, Result<Vec<String>, String>>
|
||||||
|
where 'own: 'f {
|
||||||
|
async move {
|
||||||
|
let listed = match self.bucket.list("".to_string(), None).await {
|
||||||
|
Ok(l) => l,
|
||||||
|
Err(e) => return Err(format!("{:?}", e)),
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(listed.into_iter().flat_map(|rs| {
|
||||||
|
rs.contents.into_iter().map(|entry| {
|
||||||
|
entry.key
|
||||||
|
})
|
||||||
|
}).collect())
|
||||||
|
}.boxed()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user