Verified Commit 0b8a0695 authored by Jordan Petridis's avatar Jordan Petridis 🌱

Pipeline: Use tokio threadpool to index feeds

parent a7c95d57
......@@ -1505,6 +1505,8 @@ dependencies = [
"rss 1.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
"tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-core 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-executor 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-threadpool 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
"url 1.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
"xdg 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"xml-rs 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
......
......@@ -19,6 +19,8 @@ xml-rs = "0.8.0"
futures = "0.1.23"
hyper = "0.11.27"
tokio-core = "0.1.17"
tokio-threadpool = "0.1.6"
tokio-executor = "0.1"
hyper-tls = "0.1.3"
native-tls = "0.1.5"
num_cpus = "1.8.0"
......
......@@ -4,6 +4,7 @@ use diesel_migrations::RunMigrationsError;
use hyper;
use native_tls;
use rss;
use tokio_executor;
use url;
use xml;
......@@ -46,6 +47,8 @@ pub enum DataError {
R2D2PoolError(#[cause] r2d2::PoolError),
#[fail(display = "Hyper Error: {}", _0)]
HyperError(#[cause] hyper::Error),
#[fail(display = "Tokio Spawn Error: {}", _0)]
SpawnError(#[cause] tokio_executor::SpawnError),
#[fail(display = "Failed to parse a url: {}", _0)]
UrlError(#[cause] url::ParseError),
#[fail(display = "TLS Error: {}", _0)]
......@@ -101,6 +104,12 @@ impl From<hyper::Error> for DataError {
}
}
impl From<tokio_executor::SpawnError> for DataError {
fn from(err: tokio_executor::SpawnError) -> Self {
DataError::SpawnError(err)
}
}
impl From<url::ParseError> for DataError {
fn from(err: url::ParseError) -> Self {
DataError::UrlError(err)
......
......@@ -83,6 +83,8 @@ extern crate rayon;
extern crate rfc822_sanitizer;
extern crate rss;
extern crate tokio_core;
extern crate tokio_executor;
extern crate tokio_threadpool;
extern crate url;
extern crate xdg;
extern crate xml;
......
......@@ -9,6 +9,7 @@ use hyper::client::HttpConnector;
use hyper::Client;
use hyper_tls::HttpsConnector;
use tokio_core::reactor::Core;
use tokio_threadpool::{self, ThreadPool};
use num_cpus;
......@@ -47,13 +48,18 @@ type HttpsClient = Client<HttpsConnector<HttpConnector>>;
pub fn pipeline<'a, S>(
sources: S,
client: &HttpsClient,
pool: &tokio_threadpool::Sender,
) -> impl Future<Item = Vec<()>, Error = DataError> + 'a
where
S: Stream<Item = Source, Error = DataError> + 'a,
{
sources
.and_then(clone!(client => move |s| s.into_feed(client.clone())))
.and_then(|feed| feed.index())
.and_then(clone!(pool => move |feed| {
pool.spawn(lazy(|| {
feed.index().map_err(|err| error!("Error: {}", err))
})).map_err(From::from)
}))
// the stream will stop at the first error so
// we ensure that everything will succeded regardless.
.map_err(|err| error!("Error: {}", err))
......@@ -67,6 +73,7 @@ pub fn run<S>(sources: S) -> Result<(), DataError>
where
S: IntoIterator<Item = Source>,
{
let pool = ThreadPool::new();
let mut core = Core::new()?;
let handle = core.handle();
let client = Client::configure()
......@@ -74,8 +81,14 @@ where
.build(&handle);
let stream = iter_ok::<_, DataError>(sources);
let p = pipeline(stream, &client);
core.run(p).map(|_| ())
let p = {
let sender = pool.sender();
pipeline(stream, &client, &sender)
};
core.run(p)?;
pool.shutdown_on_idle().wait().unwrap();
Ok(())
}
#[cfg(test)]
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment