Add support for S3 storage

This commit is contained in:
Lol3rrr
2024-11-01 04:03:35 +01:00
parent f25eac83e1
commit 809ae0479e
9 changed files with 484 additions and 53 deletions

View File

@@ -40,3 +40,5 @@ phf = { version = "0.11", features = ["macros"] }
uuid = { version = "1.10", features = ["v7"] }
chrono = { version = "0.4", features = ["serde"] }
rust-s3 = { version = "0.35.1", features = ["no-verify-ssl"] }
bytes = "1.8"

View File

@@ -5,31 +5,39 @@ pub mod base;
pub mod heatmap;
pub mod perround;
#[derive(Debug, Clone)]
pub enum AnalysisData {
MemMapped(std::sync::Arc<memmap2::Mmap>),
Preloaded(std::sync::Arc<[u8]>),
}
#[derive(Debug, Clone)]
pub struct AnalysisInput {
pub steamid: String,
pub demoid: String,
data: std::sync::Arc<memmap2::Mmap>,
data: AnalysisData,
}
impl AnalysisInput {
pub async fn load(
steamid: String,
demoid: String,
path: impl AsRef<std::path::Path>,
storage: &dyn crate::storage::DemoStorage,
) -> Result<Self, ()> {
let file = std::fs::File::open(path.as_ref()).unwrap();
let mmap = unsafe { memmap2::MmapOptions::new().map(&file).unwrap() };
let data = storage.load(steamid.clone(), demoid.clone()).await.unwrap();
Ok(Self {
steamid,
demoid,
data: std::sync::Arc::new(mmap),
data,
})
}
pub fn data(&self) -> &[u8] {
&self.data
match &self.data {
AnalysisData::MemMapped(v) => &v,
AnalysisData::Preloaded(v) => &v,
}
}
}
@@ -75,7 +83,7 @@ impl<AE> From<diesel::result::Error> for TaskError<AE> {
}
pub async fn poll_next_task<A, AE>(
upload_folder: impl Into<std::path::PathBuf>,
storage: Box<dyn crate::storage::DemoStorage>,
db_con: &mut diesel_async::pg::AsyncPgConnection,
action: A,
) -> Result<(), TaskError<AE>>
@@ -96,10 +104,8 @@ where
.for_update()
.skip_locked();
let upload_folder: std::path::PathBuf = upload_folder.into();
loop {
let upload_folder = upload_folder.clone();
let storage = storage.duplicate();
let action = action.clone();
let result = db_con
@@ -115,9 +121,7 @@ where
let input = AnalysisInput::load(
task.steam_id.clone(),
task.demo_id.clone(),
upload_folder
.join(&task.steam_id)
.join(format!("{}.dem", task.demo_id)),
storage.as_ref(),
)
.await
.unwrap();

View File

@@ -153,7 +153,7 @@ pub struct RouterConfig {
pub steam_api_key: String,
pub steam_callback_base_url: String,
pub steam_callback_path: String,
pub upload_dir: std::path::PathBuf,
pub storage: Box<dyn crate::storage::DemoStorage>,
}
pub fn router(config: RouterConfig) -> axum::Router {
@@ -166,7 +166,7 @@ pub fn router(config: RouterConfig) -> axum::Router {
config.steam_api_key,
),
)
.nest("/demos/", demos::router(config.upload_dir))
.nest("/demos/", demos::router(config.storage))
.nest("/user/", user::router())
}

View File

@@ -6,13 +6,10 @@ use diesel_async::RunQueryDsl;
use std::sync::Arc;
struct DemoState {
upload_folder: std::path::PathBuf,
storage: Box<dyn crate::storage::DemoStorage + Send + Sync>,
}
pub fn router<P>(upload_folder: P) -> axum::Router
where
P: Into<std::path::PathBuf>,
{
pub fn router(storage: Box<dyn crate::storage::DemoStorage>) -> axum::Router {
axum::Router::new()
.route("/list", axum::routing::get(list))
.route(
@@ -25,9 +22,7 @@ where
.route("/:id/analysis/scoreboard", axum::routing::get(scoreboard))
.route("/:id/analysis/perround", axum::routing::get(perround))
.route("/:id/analysis/heatmap", axum::routing::get(heatmap))
.with_state(Arc::new(DemoState {
upload_folder: upload_folder.into(),
}))
.with_state(Arc::new(DemoState { storage }))
}
#[tracing::instrument(skip(session))]
@@ -94,11 +89,11 @@ async fn upload(
State(state): State<Arc<DemoState>>,
session: crate::UserSession,
mut form: axum::extract::Multipart,
) -> Result<axum::response::Redirect, (axum::http::StatusCode, &'static str)> {
) -> Result<axum::response::Redirect, (axum::http::StatusCode, String)> {
let steam_id = session
.data()
.steam_id
.ok_or_else(|| (axum::http::StatusCode::UNAUTHORIZED, "Not logged in"))?;
.ok_or_else(|| (axum::http::StatusCode::UNAUTHORIZED, "Not logged in".into()))?;
tracing::info!("Upload for Session: {:?}", steam_id);
@@ -107,11 +102,11 @@ async fn upload(
Ok(Some(f)) => f,
Ok(None) => {
tracing::error!("");
return Err((axum::http::StatusCode::BAD_REQUEST, "Missing Data"));
return Err((axum::http::StatusCode::BAD_REQUEST, "Missing Data".into()));
}
Err(e) => {
tracing::error!("");
return Err((axum::http::StatusCode::BAD_REQUEST, ""));
return Err((axum::http::StatusCode::BAD_REQUEST, "".into()));
}
};
@@ -123,16 +118,16 @@ async fn upload(
let raw_demo_id = uuid::Uuid::now_v7();
let demo_id = raw_demo_id.to_string();
let user_folder = std::path::Path::new(&state.upload_folder).join(format!("{}/", steam_id));
if !tokio::fs::try_exists(&user_folder).await.unwrap_or(false) {
tokio::fs::create_dir_all(&user_folder).await.unwrap();
}
let demo_file_path = user_folder.join(format!("{}.dem", demo_id));
super::stream_to_file(demo_file_path, file_field)
use futures::stream::StreamExt;
state
.storage
.upload(
steam_id.to_string(),
demo_id.clone(),
file_field.filter_map(|b| async { b.ok() }).boxed(),
)
.await
.unwrap();
.map_err(|e| (axum::http::StatusCode::INTERNAL_SERVER_ERROR, e))?;
let mut db_con = crate::db_connection().await;

View File

@@ -21,13 +21,13 @@ pub async fn db_connection() -> diesel_async::AsyncPgConnection {
pub mod api;
pub mod steam_api;
#[tracing::instrument(skip(upload_folder, steam_api_key))]
pub mod storage;
#[tracing::instrument(skip(storage, steam_api_key))]
pub async fn run_api(
upload_folder: impl Into<std::path::PathBuf>,
storage: Box<dyn crate::storage::DemoStorage>,
steam_api_key: impl Into<String>,
) {
let upload_folder: std::path::PathBuf = upload_folder.into();
let session_store = crate::diesel_sessionstore::DieselStore::new();
let session_layer = tower_sessions::SessionManagerLayer::new(session_store)
.with_secure(false)
@@ -35,10 +35,6 @@ pub async fn run_api(
48,
)));
if !tokio::fs::try_exists(&upload_folder).await.unwrap_or(false) {
tokio::fs::create_dir_all(&upload_folder).await.unwrap();
}
let serve_dir = option_env!("FRONTEND_DIST_DIR").unwrap_or("../frontend/dist/");
tracing::debug!("Serving static files from {:?}", serve_dir);
@@ -53,7 +49,7 @@ pub async fn run_api(
steam_api_key: steam_api_key.into(),
steam_callback_base_url,
steam_callback_path: "/api/steam/callback".into(),
upload_dir: upload_folder.clone(),
storage,
}),
)
.layer(session_layer)
@@ -69,23 +65,26 @@ pub async fn run_api(
axum::serve(listener, router).await.unwrap();
}
#[tracing::instrument(skip(upload_folder))]
pub async fn run_analysis(upload_folder: impl Into<std::path::PathBuf>) {
#[tracing::instrument(skip(storage))]
pub async fn run_analysis(storage: Box<dyn crate::storage::DemoStorage>) {
use diesel::prelude::*;
use diesel_async::RunQueryDsl;
let upload_folder: std::path::PathBuf = upload_folder.into();
loop {
let mut db_con = db_connection().await;
let res = crate::analysis::poll_next_task(
&upload_folder,
storage.duplicate(),
&mut db_con,
move |input: analysis::AnalysisInput, db_con: &mut diesel_async::AsyncPgConnection| {
Box::pin(async move {
let demo_id = input.demoid.clone();
let _span = tracing::info_span!("Analysis", demo=?demo_id);
tracing::info!("Starting analysis");
let mut store_result_fns = Vec::new();
for analysis in analysis::ANALYSIS_METHODS.iter().map(|a| a.clone()) {
let input = input.clone();
@@ -119,6 +118,8 @@ pub async fn run_analysis(upload_folder: impl Into<std::path::PathBuf>) {
}
update_process_info.execute(db_con).await.map_err(|e| ())?;
tracing::info!("Completed analysis");
Ok::<(), ()>(())
})
},

View File

@@ -7,11 +7,21 @@ async fn run_migrations(connection: &mut diesel_async::AsyncPgConnection) {
MIGRATIONS.run_pending_migrations(connection).await.unwrap();
}
#[derive(clap::ValueEnum, Clone, Default, Debug)]
enum CliStorage {
#[default]
File,
S3,
}
#[derive(clap::Parser)]
struct CliArgs {
#[clap(long = "upload-folder", default_value = "uploads/")]
upload_folder: std::path::PathBuf,
#[clap(long = "storage", default_value = "file")]
storage: CliStorage,
#[clap(long = "api", default_value_t = true)]
api: bool,
@@ -40,6 +50,26 @@ async fn main() {
let mut component_set = tokio::task::JoinSet::new();
let storage: Box<dyn backend::storage::DemoStorage> = match args.storage {
CliStorage::File => Box::new(backend::storage::FileStorage::new(
args.upload_folder.clone(),
)),
CliStorage::S3 => {
let credentials = s3::creds::Credentials::from_env_specific(
Some("S3_ACCESS_KEY"),
Some("S3_SECRET_KEY"),
None,
None
).unwrap();
let region = s3::Region::from_env("S3_REGION", Some("S3_ENDPOINT")).unwrap();
let bucket = std::env::var("S3_BUCKET").expect("Need 'S3_BUCKET' for using s3 storage backend");
Box::new(backend::storage::S3Storage::new(&bucket, region, credentials))
},
};
tracing::info!("Starting modules");
if args.api {
tracing::info!("Enabled API module");
@@ -52,12 +82,12 @@ async fn main() {
}
};
component_set.spawn(backend::run_api(args.upload_folder.clone(), steam_api_key));
component_set.spawn(backend::run_api(storage.duplicate(), steam_api_key));
}
if args.analysis {
tracing::info!("Enabled Analysis module");
component_set.spawn(backend::run_analysis(args.upload_folder.clone()));
component_set.spawn(backend::run_analysis(storage));
}
tracing::info!("Started modules");

152
backend/src/storage.rs Normal file
View File

@@ -0,0 +1,152 @@
use futures::FutureExt;
use futures::StreamExt;
/// Abstraction over the backing store that holds uploaded demo files.
///
/// Implementations must be usable from multiple tasks (`Send + Sync`);
/// because the trait is used as a boxed object, [`DemoStorage::duplicate`]
/// serves as an object-safe replacement for `Clone`.
pub trait DemoStorage: Send + Sync {
    /// Returns a new boxed handle to the same underlying storage.
    fn duplicate(&self) -> Box<dyn DemoStorage>;

    /// Stores the raw demo bytes for `demo_id`, owned by `user_id`.
    ///
    /// On failure returns `Err` with a human-readable message. The lifetime
    /// bounds (`'s: 'f`, `'own: 'f`) only say that the returned future may
    /// borrow both the input stream and `self`.
    fn upload<'f, 's, 'own>(
        &'own self,
        user_id: String,
        demo_id: String,
        stream: futures_util::stream::BoxStream<'s, axum::body::Bytes>,
    ) -> futures::future::BoxFuture<'f, Result<(), String>>
    where
        's: 'f,
        'own: 'f;

    /// Loads a previously uploaded demo, in whatever representation the
    /// backend provides (memory-mapped file or fully buffered bytes).
    fn load<'f, 'own>(
        &'own self,
        user_id: String,
        demo_id: String,
    ) -> futures::future::BoxFuture<'f, Result<crate::analysis::AnalysisData, String>>
    where
        'own: 'f;
}
/// Demo storage backed by the local filesystem.
///
/// All demos live below a single root directory. The root path is shared via
/// an `Arc`, so duplicated handles refer to the same location without
/// copying the path.
pub struct FileStorage {
    folder: std::sync::Arc<std::path::PathBuf>,
}

impl FileStorage {
    /// Creates a storage handle rooted at `folder`.
    pub fn new<P>(folder: P) -> Self
    where
        P: Into<std::path::PathBuf>,
    {
        let root: std::path::PathBuf = folder.into();
        Self {
            folder: std::sync::Arc::new(root),
        }
    }
}
impl DemoStorage for FileStorage {
    /// Cheap copy: clones the shared root-folder `Arc`.
    fn duplicate(&self) -> Box<dyn DemoStorage> {
        Box::new(Self {
            folder: self.folder.clone(),
        })
    }

    /// Streams the demo to `<root>/<user_id>/<demo_id>.dem`.
    ///
    /// All I/O failures are reported as `Err(String)` instead of panicking,
    /// matching the trait's error contract.
    fn upload<'f, 's, 'own>(
        &'own self,
        user_id: String,
        demo_id: String,
        stream: futures_util::stream::BoxStream<'s, axum::body::Bytes>,
    ) -> futures::future::BoxFuture<'f, Result<(), String>>
    where
        's: 'f,
        'own: 'f,
    {
        let root = self.folder.clone();
        async move {
            let user_folder = root.join(format!("{}/", user_id));
            // `create_dir_all` succeeds if the directory already exists, so
            // the previous racy `try_exists` check is unnecessary; errors are
            // propagated instead of `unwrap`ing.
            tokio::fs::create_dir_all(&user_folder)
                .await
                .map_err(|e| format!("creating user folder {:?}: {}", user_folder, e))?;

            let demo_file_path = user_folder.join(format!("{}.dem", demo_id));

            // Convert the byte stream into an `AsyncRead`.
            let body_with_io_error = stream.map(|b| Ok::<_, std::io::Error>(b));
            let body_reader = tokio_util::io::StreamReader::new(body_with_io_error);
            futures::pin_mut!(body_reader);

            // Create the file; `File` implements `AsyncWrite`.
            let file = tokio::fs::File::create(&demo_file_path)
                .await
                .map_err(|e| format!("creating demo file {:?}: {}", demo_file_path, e))?;
            let mut writer = tokio::io::BufWriter::new(file);

            // Copy the body into the file (`copy` flushes the writer on
            // completion).
            tokio::io::copy(&mut body_reader, &mut writer)
                .await
                .map_err(|e| format!("writing demo file {:?}: {}", demo_file_path, e))?;
            Ok(())
        }
        .boxed()
    }

    /// Memory-maps the stored demo file, so the demo bytes are paged in
    /// lazily rather than read up front.
    fn load<'f, 'own>(
        &'own self,
        user_id: String,
        demo_id: String,
    ) -> futures::future::BoxFuture<'f, Result<crate::analysis::AnalysisData, String>>
    where
        'own: 'f,
    {
        async move {
            let demo_file_path = self
                .folder
                .join(format!("{}/", user_id))
                .join(format!("{}.dem", demo_id));

            // NOTE(review): these are blocking filesystem calls on the async
            // executor; demos are loaded once per analysis task, so this is
            // tolerable — confirm before reusing on a hot path.
            let file = std::fs::File::open(demo_file_path.as_path())
                .map_err(|e| format!("opening demo file {:?}: {}", demo_file_path, e))?;

            // SAFETY: the mapping is read-only; the file must not be
            // truncated or modified while the analysis holds the map.
            let mmap = unsafe { memmap2::MmapOptions::new().map(&file) }
                .map_err(|e| format!("memory-mapping demo file {:?}: {}", demo_file_path, e))?;

            Ok(crate::analysis::AnalysisData::MemMapped(
                std::sync::Arc::new(mmap),
            ))
        }
        .boxed()
    }
}
/// Demo storage backed by an S3(-compatible) bucket.
pub struct S3Storage {
    // Shared bucket handle so `duplicate` is just a refcount bump.
    bucket: std::sync::Arc<s3::Bucket>,
}

impl S3Storage {
    /// Creates a storage handle for `bucket_name` in `region` using the
    /// given credentials.
    ///
    /// Path-style addressing is enabled, which self-hosted S3-compatible
    /// services (e.g. MinIO) typically require.
    ///
    /// # Panics
    /// Panics if the bucket handle cannot be constructed from the given
    /// name/region/credentials.
    pub fn new(bucket_name: &str, region: s3::region::Region, credentials: s3::creds::Credentials) -> Self {
        let mut bucket = s3::bucket::Bucket::new(bucket_name, region, credentials).unwrap();
        bucket.set_path_style();
        Self {
            bucket: bucket.into(),
        }
    }
}
impl DemoStorage for S3Storage {
    /// Cheap copy: clones the shared bucket handle.
    fn duplicate(&self) -> Box<dyn DemoStorage> {
        Box::new(Self {
            bucket: self.bucket.clone(),
        })
    }

    /// Streams the demo into the bucket under the key `<user_id>/<demo_id>`.
    ///
    /// All S3 failures are reported as `Err(String)` instead of panicking,
    /// matching the trait's error contract.
    fn upload<'f, 's, 'own>(
        &'own self,
        user_id: String,
        demo_id: String,
        stream: futures_util::stream::BoxStream<'s, axum::body::Bytes>,
    ) -> futures::future::BoxFuture<'f, Result<(), String>>
    where
        's: 'f,
        'own: 'f,
    {
        async move {
            // Object key "<user_id>/<demo_id>". NOTE(review): built via
            // `PathBuf`, so on Windows the separator would be '\\' — confirm
            // the service only runs on Unix-like hosts.
            let path = std::path::PathBuf::new().join(user_id).join(demo_id);
            let path = path
                .to_str()
                .ok_or_else(|| "object key is not valid UTF-8".to_string())?;

            // Convert the byte stream into an `AsyncRead` for the streaming
            // upload.
            let body_with_io_error = stream.map(|b| Ok::<_, std::io::Error>(b));
            let body_reader = tokio_util::io::StreamReader::new(body_with_io_error);
            futures::pin_mut!(body_reader);

            // The previous full-bucket `list` round-trip before every upload
            // was dead code (its result was discarded) and has been removed.
            self.bucket
                .put_object_stream(&mut body_reader, path)
                .await
                .map_err(|e| format!("uploading object '{}': {}", path, e))?;
            Ok(())
        }
        .boxed()
    }

    /// Downloads the whole object and returns it fully buffered; S3 objects
    /// cannot be memory-mapped.
    fn load<'f, 'own>(
        &'own self,
        user_id: String,
        demo_id: String,
    ) -> futures::future::BoxFuture<'f, Result<crate::analysis::AnalysisData, String>>
    where
        'own: 'f,
    {
        async move {
            let path = std::path::PathBuf::new().join(user_id).join(demo_id);
            let path = path
                .to_str()
                .ok_or_else(|| "object key is not valid UTF-8".to_string())?;

            let resp = self
                .bucket
                .get_object(path)
                .await
                .map_err(|e| format!("fetching object '{}': {}", path, e))?;

            Ok(crate::analysis::AnalysisData::Preloaded(
                resp.to_vec().into(),
            ))
        }
        .boxed()
    }
}