Switch memcache implementation.

Use pooled r2d2-memcache instead of the old memcached-rust.
This commit is contained in:
Rasmus Kaj 2019-05-03 01:00:02 +02:00
parent a305d461f0
commit f59f018d33
5 changed files with 37 additions and 39 deletions

View File

@ -23,9 +23,9 @@ kamadak-exif = "~0.3.0"
diesel = { version = "1.4.0", features = ["r2d2", "chrono", "postgres"] } diesel = { version = "1.4.0", features = ["r2d2", "chrono", "postgres"] }
dotenv = "0.13.0" dotenv = "0.13.0"
djangohashers = "*" djangohashers = "*"
r2d2-memcache = "0.3.0"
rand = "0.6.5" rand = "0.6.5"
rust-crypto = "0.2.36" rust-crypto = "0.2.36"
memcached-rs = "0.4.1"
flate2 = "^1.0.0" flate2 = "^1.0.0"
brotli2 = "*" brotli2 = "*"
mime = "0.3.0" mime = "0.3.0"

View File

@ -6,8 +6,7 @@ use crate::server::SizeTag;
use diesel::pg::PgConnection; use diesel::pg::PgConnection;
use diesel::prelude::*; use diesel::prelude::*;
use log::{debug, info}; use log::{debug, info};
use memcached::proto::{Operation, ProtoType}; use r2d2_memcache::memcache::Client;
use memcached::Client;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
/// Make sure all photos are stored in the cache. /// Make sure all photos are stored in the cache.
@ -23,8 +22,7 @@ pub fn precache(
) -> Result<(), Error> { ) -> Result<(), Error> {
let max_time = Duration::from_secs(max_secs); let max_time = Duration::from_secs(max_secs);
let timer = Instant::now(); let timer = Instant::now();
let mut cache = let mut cache = Client::connect("memcache://127.0.0.1:11211")?;
Client::connect(&[("tcp://127.0.0.1:11211", 1)], ProtoType::Binary)?;
let size = SizeTag::Small; let size = SizeTag::Small;
let (mut n, mut n_stored) = (0, 0); let (mut n, mut n_stored) = (0, 0);
let photos = Photo::query(true) let photos = Photo::query(true)
@ -34,7 +32,7 @@ pub fn precache(
for photo in photos { for photo in photos {
n += 1; n += 1;
let key = &photo.cache_key(size); let key = &photo.cache_key(size);
if cache.get(key.as_bytes()).is_err() { if cache.get::<Vec<u8>>(key)?.is_none() {
let size = size.px(); let size = size.px();
let data = pd.scale_image(&photo, size, size).map_err(|e| { let data = pd.scale_image(&photo, size, size).map_err(|e| {
Error::Other(format!( Error::Other(format!(
@ -42,7 +40,7 @@ pub fn precache(
photo.id, photo.path, e, photo.id, photo.path, e,
)) ))
})?; })?;
cache.set(key.as_bytes(), &data, 0, no_expire)?; cache.set(key, &data[..], no_expire)?;
debug!("Cache: stored {} for {}", key, photo.path); debug!("Cache: stored {} for {}", key, photo.path);
n_stored += 1; n_stored += 1;
if timer.elapsed() > max_time { if timer.elapsed() > max_time {

View File

@ -3,7 +3,7 @@ use chrono::ParseError as ChronoParseError;
use diesel::prelude::ConnectionError; use diesel::prelude::ConnectionError;
use diesel::result::Error as DieselError; use diesel::result::Error as DieselError;
use exif; use exif;
use memcached::proto::Error as MemcachedError; use r2d2_memcache::memcache::MemcacheError;
use std::convert::From; use std::convert::From;
use std::num::ParseIntError; use std::num::ParseIntError;
use std::str::Utf8Error; use std::str::Utf8Error;
@ -17,7 +17,7 @@ pub enum Error {
UnknownOrientation(u32), UnknownOrientation(u32),
BadTimeFormat(ChronoParseError), BadTimeFormat(ChronoParseError),
BadIntFormat(ParseIntError), BadIntFormat(ParseIntError),
Cache(MemcachedError), Cache(MemcacheError),
MissingWidth, MissingWidth,
MissingHeight, MissingHeight,
PlacesFailed(fetch_places::Error), PlacesFailed(fetch_places::Error),
@ -46,8 +46,8 @@ impl fmt::Display for Error {
} }
} }
impl From<MemcachedError> for Error { impl From<MemcacheError> for Error {
fn from(e: MemcachedError) -> Self { fn from(e: MemcacheError) -> Self {
Error::Cache(e) Error::Cache(e)
} }
} }

View File

@ -5,11 +5,9 @@ use diesel::pg::PgConnection;
use diesel::r2d2::{ConnectionManager, Pool, PooledConnection}; use diesel::r2d2::{ConnectionManager, Pool, PooledConnection};
use jwt::{Claims, Header, Registered, Token}; use jwt::{Claims, Header, Registered, Token};
use log::{debug, error, warn}; use log::{debug, error, warn};
use memcached::proto::{Error as MprotError, Operation, ProtoType}; use r2d2_memcache::r2d2::Error;
use memcached::Client; use r2d2_memcache::MemcacheConnectionManager;
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::error::Error;
use std::io;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use warp::filters::{cookie, BoxedFilter}; use warp::filters::{cookie, BoxedFilter};
@ -17,16 +15,18 @@ use warp::path::{self, FullPath};
use warp::reject::custom; use warp::reject::custom;
use warp::{self, Filter}; use warp::{self, Filter};
type PooledPg = PooledConnection<ConnectionManager<PgConnection>>;
type PgPool = Pool<ConnectionManager<PgConnection>>; type PgPool = Pool<ConnectionManager<PgConnection>>;
type PooledPg = PooledConnection<ConnectionManager<PgConnection>>;
type MemcachePool = Pool<MemcacheConnectionManager>;
type PooledMemcache = PooledConnection<MemcacheConnectionManager>;
pub fn create_session_filter( pub fn create_session_filter(
db_url: &str, db_url: &str,
memcached_server: String, memcache_server: &str,
jwt_secret: String, jwt_secret: String,
) -> BoxedFilter<(Context,)> { ) -> BoxedFilter<(Context,)> {
let global = let global =
Arc::new(GlobalContext::new(db_url, memcached_server, jwt_secret)); Arc::new(GlobalContext::new(db_url, memcache_server, jwt_secret));
warp::any() warp::any()
.and(path::full()) .and(path::full())
.and(cookie::optional("EXAUTH")) .and(cookie::optional("EXAUTH"))
@ -48,22 +48,24 @@ pub fn create_session_filter(
struct GlobalContext { struct GlobalContext {
db_pool: PgPool, db_pool: PgPool,
photosdir: PhotosDir, photosdir: PhotosDir,
memcached_server: String, // TODO: Use a connection pool! memcache_pool: MemcachePool,
jwt_secret: String, jwt_secret: String,
} }
impl GlobalContext { impl GlobalContext {
fn new( fn new(db_url: &str, memcache_server: &str, jwt_secret: String) -> Self {
db_url: &str,
memcached_server: String,
jwt_secret: String,
) -> Self {
let db_manager = ConnectionManager::<PgConnection>::new(db_url); let db_manager = ConnectionManager::<PgConnection>::new(db_url);
let mc_manager = MemcacheConnectionManager::new(memcache_server);
GlobalContext { GlobalContext {
db_pool: Pool::new(db_manager) db_pool: Pool::builder()
.expect("Postgres connection pool could not be created"), .connection_timeout(Duration::from_secs(1))
.build(db_manager)
.expect("Posgresql pool"),
photosdir: PhotosDir::new(photos_dir()), photosdir: PhotosDir::new(photos_dir()),
memcached_server, memcache_pool: Pool::builder()
.connection_timeout(Duration::from_secs(1))
.build(mc_manager)
.expect("Memcache pool"),
jwt_secret, jwt_secret,
} }
} }
@ -101,8 +103,8 @@ impl GlobalContext {
Err(format!("Invalid token {:?}", token)) Err(format!("Invalid token {:?}", token))
} }
} }
fn cache(&self) -> Result<Client, io::Error> { fn cache(&self) -> Result<PooledMemcache, Error> {
Client::connect(&[(&self.memcached_server, 1)], ProtoType::Binary) Ok(self.memcache_pool.get()?)
} }
} }
@ -153,14 +155,12 @@ impl Context {
{ {
match self.global.cache() { match self.global.cache() {
Ok(mut client) => { Ok(mut client) => {
match client.get(key.as_bytes()) { match client.get(key) {
Ok((data, _flags)) => { Ok(Some(data)) => {
debug!("Cache: {} found", key); debug!("Cache: {} found", key);
return Ok(data); return Ok(data);
} }
Err(MprotError::BinaryProtoError(ref err)) Ok(None) => {
if err.description() == "key not found" =>
{
debug!("Cache: {} not found", key); debug!("Cache: {} not found", key);
} }
Err(err) => { Err(err) => {
@ -168,22 +168,22 @@ impl Context {
} }
} }
let data = calculate()?; let data = calculate()?;
match client.set(key.as_bytes(), &data, 0, 7 * 24 * 60 * 60) { match client.set(key, &data[..], 7 * 24 * 60 * 60) {
Ok(()) => debug!("Cache: stored {}", key), Ok(()) => debug!("Cache: stored {}", key),
Err(err) => warn!("Cache: Error storing {}: {}", key, err), Err(err) => warn!("Cache: Error storing {}: {}", key, err),
} }
Ok(data) Ok(data)
} }
Err(err) => { Err(err) => {
warn!("Error connecting to memcached: {}", err); warn!("Error connecting to memcache: {}", err);
calculate() calculate()
} }
} }
} }
pub fn clear_cache(&self, key: &str) { pub fn clear_cache(&self, key: &str) {
if let Ok(mut client) = self.global.cache() { if let Ok(mut client) = self.global.cache() {
match client.delete(key.as_bytes()) { match client.delete(key) {
Ok(()) => debug!("Cache: deleted {}", key), Ok(flag) => debug!("Cache: deleted {}: {:?}", key, flag),
Err(e) => warn!("Cache: Failed to delete {}: {}", key, e), Err(e) => warn!("Cache: Failed to delete {}: {}", key, e),
} }
} }

View File

@ -153,7 +153,7 @@ pub fn run(args: &ArgMatches) -> Result<(), Error> {
} }
let session_filter = create_session_filter( let session_filter = create_session_filter(
&dburl(), &dburl(),
env_or("MEMCACHED_SERVER", "tcp://127.0.0.1:11211"), &env_or("MEMCACHED_SERVER", "memcache://127.0.0.1:11211"),
jwt_key(), jwt_key(),
); );
let s = move || session_filter.clone(); let s = move || session_filter.clone();