Improve async handling.

Avoid blockig the entire server on image scaling, by using tokio
spawn_blocking.
This commit is contained in:
Rasmus Kaj 2020-06-30 20:51:40 +02:00
parent 91a1698890
commit cca8c61ac2
6 changed files with 113 additions and 67 deletions

View File

@ -1,6 +1,6 @@
use super::result::Error;
use crate::models::{Photo, SizeTag};
use crate::photosdir::PhotosDir;
use crate::photosdir::{get_scaled_jpeg, PhotosDir};
use crate::schema::photos::dsl::{date, is_public};
use crate::{CacheOpt, DbOpt, DirOpt};
use diesel::prelude::*;
@ -31,7 +31,7 @@ impl Args {
/// overwhelm the host while precaching.
/// The images are handled in public first, new first order, to have
/// the probably most requested images precached as soon as possible.
pub fn run(&self) -> Result<(), Error> {
pub async fn run(&self) -> Result<(), Error> {
let max_time = Duration::from_secs(self.max_time);
let timer = Instant::now();
let mut cache = Client::connect(self.cache.memcached_url.as_ref())?;
@ -46,14 +46,16 @@ impl Args {
n += 1;
let key = &photo.cache_key(size);
if cache.get::<Vec<u8>>(key)?.is_none() {
let path = pd.get_raw_path(&photo);
let size = size.px();
let data =
pd.scale_image(&photo, size, size).map_err(|e| {
Error::Other(format!(
"Failed to scale #{} ({}): {}",
photo.id, photo.path, e,
))
})?;
let data = get_scaled_jpeg(path, photo.rotation, size)
.await
.map_err(|e| {
Error::Other(format!(
"Failed to scale #{} ({}): {:?}",
photo.id, photo.path, e,
))
})?;
cache.set(key, &data[..], no_expire)?;
debug!("Cache: stored {} for {}", key, photo.path);
n_stored += 1;

View File

@ -105,7 +105,7 @@ async fn run(args: &RPhotos) -> Result<(), Error> {
RPhotos::Userlist { db } => users::list(&db.connect()?),
RPhotos::Userpass { db, user } => users::passwd(&db.connect()?, user),
RPhotos::Fetchplaces(cmd) => cmd.run().await,
RPhotos::Precache(cmd) => cmd.run(),
RPhotos::Precache(cmd) => cmd.run().await,
RPhotos::Storestatics { dir } => storestatics::to_dir(dir),
RPhotos::Runserver(ra) => server::run(ra).await,
}

View File

@ -6,6 +6,7 @@ use log::{debug, info, warn};
use std::ffi::OsStr;
use std::path::{Path, PathBuf};
use std::{fs, io};
use tokio::task::{spawn_blocking, JoinError};
pub struct PhotosDir {
basedir: PathBuf,
@ -18,49 +19,14 @@ impl PhotosDir {
}
}
#[allow(dead_code)]
pub fn scale_image(
&self,
photo: &Photo,
width: u32,
height: u32,
) -> Result<Vec<u8>, ImageError> {
let path = self.basedir.join(&photo.path);
info!("Should open {:?}", path);
let img = image::open(path)?;
let img = if 3 * width <= img.width() || 3 * height <= img.height() {
img.thumbnail(width, height)
} else if width < img.width() || height < img.height() {
img.resize(width, height, FilterType::CatmullRom)
} else {
img
};
let img = match photo.rotation {
_x @ 0..=44 | _x @ 315..=360 => img,
_x @ 45..=134 => img.rotate90(),
_x @ 135..=224 => img.rotate180(),
_x @ 225..=314 => img.rotate270(),
x => {
warn!("Should rotate photo {} deg, which is unsupported", x);
img
}
};
let mut buf = Vec::new();
img.write_to(&mut buf, ImageFormat::Jpeg)?;
Ok(buf)
}
#[allow(dead_code)]
pub fn get_raw_path(&self, photo: &Photo) -> PathBuf {
self.basedir.join(&photo.path)
}
#[allow(dead_code)]
pub fn has_file<S: AsRef<OsStr> + ?Sized>(&self, path: &S) -> bool {
self.basedir.join(Path::new(path)).is_file()
}
#[allow(dead_code)]
pub fn find_files(
&self,
dir: &Path,
@ -119,3 +85,73 @@ fn actual_image_size(path: &Path) -> Result<(u32, u32), ImageError> {
let image = image::open(&path)?;
Ok((image.width(), image.height()))
}
#[derive(Debug)]
pub enum ImageLoadFailed {
File(io::Error),
Image(image::ImageError),
Join(JoinError),
}
impl std::error::Error for ImageLoadFailed {}
impl std::fmt::Display for ImageLoadFailed {
fn fmt(&self, out: &mut std::fmt::Formatter) -> std::fmt::Result {
match &self {
ImageLoadFailed::File(e) => e.fmt(out),
ImageLoadFailed::Image(e) => e.fmt(out),
ImageLoadFailed::Join(e) => e.fmt(out),
}
}
}
impl From<io::Error> for ImageLoadFailed {
fn from(e: io::Error) -> ImageLoadFailed {
ImageLoadFailed::File(e)
}
}
impl From<image::ImageError> for ImageLoadFailed {
fn from(e: image::ImageError) -> ImageLoadFailed {
ImageLoadFailed::Image(e)
}
}
impl From<JoinError> for ImageLoadFailed {
fn from(e: JoinError) -> ImageLoadFailed {
ImageLoadFailed::Join(e)
}
}
pub async fn get_scaled_jpeg(
path: PathBuf,
rotation: i16,
size: u32,
) -> Result<Vec<u8>, ImageLoadFailed> {
spawn_blocking(move || {
info!("Should open {:?}", path);
let img = image::open(path)?;
let img = if 3 * size <= img.width() || 3 * size <= img.height() {
info!("T-nail from {}x{} to {}", img.width(), img.height(), size);
img.thumbnail(size, size)
} else if size < img.width() || size < img.height() {
info!("Scaling from {}x{} to {}", img.width(), img.height(), size);
img.resize(size, size, FilterType::CatmullRom)
} else {
img
};
let img = match rotation {
_x @ 0..=44 | _x @ 315..=360 => img,
_x @ 45..=134 => img.rotate90(),
_x @ 135..=224 => img.rotate180(),
_x @ 225..=314 => img.rotate270(),
x => {
warn!("Should rotate photo {} deg, which is unsupported", x);
img
}
};
let mut buf = Vec::new();
img.write_to(&mut buf, ImageFormat::Jpeg)?;
Ok(buf)
})
.await?
}

View File

@ -7,6 +7,7 @@ use log::{debug, warn};
use medallion::{Header, Payload, Token};
use r2d2_memcache::r2d2::Error;
use r2d2_memcache::MemcacheConnectionManager;
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;
use warp::filters::{cookie, header, BoxedFilter};
@ -127,13 +128,14 @@ impl Context {
pub fn path_without_query(&self) -> &str {
self.path.as_str()
}
pub fn cached_or<F, E>(
pub async fn cached_or<F, R, E>(
&self,
key: &str,
calculate: F,
) -> Result<Vec<u8>, E>
where
F: FnOnce() -> Result<Vec<u8>, E>,
F: FnOnce() -> R,
R: Future<Output = Result<Vec<u8>, E>>,
{
match self.global.cache() {
Ok(mut client) => {
@ -149,7 +151,7 @@ impl Context {
warn!("Cache: get {} failed: {:?}", key, err);
}
}
let data = calculate()?;
let data = calculate().await?;
match client.set(key, &data[..], 7 * 24 * 60 * 60) {
Ok(()) => debug!("Cache: stored {}", key),
Err(err) => warn!("Cache: Error storing {}: {}", key, err),
@ -158,7 +160,7 @@ impl Context {
}
Err(err) => {
warn!("Error connecting to memcache: {}", err);
calculate()
calculate().await
}
}
}

View File

@ -1,17 +1,21 @@
use super::BuilderExt;
use super::{error_response, not_found, Context};
use crate::models::{Photo, SizeTag};
use crate::photosdir::{get_scaled_jpeg, ImageLoadFailed};
use diesel::prelude::*;
use std::str::FromStr;
use warp::http::response::Builder;
use warp::http::{header, StatusCode};
use warp::reply::Response;
use warp::Rejection;
pub fn show_image(img: ImgName, context: Context) -> Response {
pub async fn show_image(
img: ImgName,
context: Context,
) -> Result<Response, Rejection> {
use crate::schema::photos::dsl::photos;
if let Ok(tphoto) =
photos.find(img.id).first::<Photo>(&context.db().unwrap())
{
let tphoto = photos.find(img.id).first::<Photo>(&context.db().unwrap());
if let Ok(tphoto) = tphoto {
if context.is_authorized() || tphoto.is_public() {
if img.size == SizeTag::Large {
if context.is_authorized() {
@ -24,7 +28,7 @@ pub fn show_image(img: ImgName, context: Context) -> Response {
.map(|mut f| f.read_to_end(&mut buf))
.is_ok()
{
return Builder::new()
return Ok(Builder::new()
.status(StatusCode::OK)
.header(
header::CONTENT_TYPE,
@ -32,27 +36,28 @@ pub fn show_image(img: ImgName, context: Context) -> Response {
)
.far_expires()
.body(buf.into())
.unwrap();
.unwrap());
} else {
return error_response(
return Ok(error_response(
StatusCode::INTERNAL_SERVER_ERROR,
)
.unwrap();
.unwrap());
}
}
} else {
let data = get_image_data(&context, &tphoto, img.size)
.await
.expect("Get image data");
return Builder::new()
return Ok(Builder::new()
.status(StatusCode::OK)
.header(header::CONTENT_TYPE, mime::IMAGE_JPEG.as_ref())
.far_expires()
.body(data.into())
.unwrap();
.unwrap());
}
}
}
not_found(&context)
Ok(not_found(&context))
}
/// A client-side / url file name for a file.
@ -104,13 +109,14 @@ fn parse_bad_imgname_2() {
assert_eq!("blurgel".parse::<ImgName>(), Err(BadImgName {}))
}
fn get_image_data(
async fn get_image_data(
context: &Context,
photo: &Photo,
size: SizeTag,
) -> Result<Vec<u8>, image::ImageError> {
context.cached_or(&photo.cache_key(size), || {
let size = size.px();
context.photos().scale_image(photo, size, size)
})
) -> Result<Vec<u8>, ImageLoadFailed> {
let p = context.photos().get_raw_path(photo);
let r = photo.rotation;
context
.cached_or(&photo.cache_key(size), || get_scaled_jpeg(p, r, size.px()))
.await
}

View File

@ -88,7 +88,7 @@ pub async fn run(args: &Args) -> Result<(), Error> {
.or(path("logout").and(end()).and(s()).map(login::logout))
.or(get().and(end()).and(s()).map(all_years))
.or(get().and(path("img")).and(param()).and(end()).and(s()).map(photo_details))
.or(get().and(path("img")).and(param()).and(end()).and(s()).map(image::show_image))
.or(get().and(path("img")).and(param()).and(end()).and(s()).and_then(image::show_image))
.or(get().and(path("0")).and(end()).and(s()).map(all_null_date))
.or(get().and(param()).and(end()).and(s()).map(months_in_year))
.or(get().and(param()).and(param()).and(end()).and(s()).map(days_in_month))