Verified Commit 674b3b54 authored by Jordan Petridis's avatar Jordan Petridis 🌱

Pipeline: reuse the prexisting runtime executor

Instead of creating our own threadpool, we should reuse the executor
of the tokio::runtime::Runtime that backs the tokio::reactor::Core.
parent 09948845
......@@ -1504,6 +1504,7 @@ dependencies = [
"rfc822_sanitizer 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
"rss 1.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
"tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-core 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-executor 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-threadpool 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
......
......@@ -18,6 +18,7 @@ xdg = "2.1.0"
xml-rs = "0.8.0"
futures = "0.1.23"
hyper = "0.11.27"
tokio = "0.1.8"
tokio-core = "0.1.17"
tokio-threadpool = "0.1.6"
tokio-executor = "0.1"
......
......@@ -82,6 +82,7 @@ extern crate num_cpus;
extern crate rayon;
extern crate rfc822_sanitizer;
extern crate rss;
extern crate tokio;
extern crate tokio_core;
extern crate tokio_executor;
extern crate tokio_threadpool;
......
// FIXME:
//! Docs.
use futures::future::*;
use futures::prelude::*;
use futures::stream::*;
use futures::{future::ok, lazy, stream::iter_ok};
use hyper::client::HttpConnector;
use hyper::Client;
use hyper_tls::HttpsConnector;
use tokio::runtime::TaskExecutor;
use tokio_core::reactor::Core;
use tokio_threadpool::{self, ThreadPool};
use num_cpus;
......@@ -27,7 +26,7 @@ type HttpsClient = Client<HttpsConnector<HttpConnector>>;
pub fn pipeline<'a, S>(
sources: S,
client: HttpsClient,
pool: tokio_threadpool::Sender,
executor: TaskExecutor,
) -> impl Future<Item = Vec<()>, Error = DataError> + 'a
where
S: Stream<Item = Source, Error = DataError> + 'a,
......@@ -35,9 +34,9 @@ where
sources
.and_then(move |s| s.into_feed(client.clone()))
.and_then(move |feed| {
pool.spawn(lazy(|| {
feed.index().map_err(|err| error!("Error: {}", err))
})).map_err(From::from)
let fut = lazy(|| feed.index().map_err(|err| error!("Error: {}", err)));
executor.spawn(fut);
Ok(())
})
// the stream will stop at the first error so
// we ensure that everything will succeded regardless.
......@@ -52,20 +51,16 @@ pub fn run<S>(sources: S) -> Result<(), DataError>
where
S: IntoIterator<Item = Source>,
{
let pool = ThreadPool::new();
let sender = pool.sender().clone();
let mut core = Core::new()?;
let executor = core.runtime().executor();
let handle = core.handle();
let client = Client::configure()
.connector(HttpsConnector::new(num_cpus::get(), &handle)?)
.build(&handle);
let stream = iter_ok::<_, DataError>(sources);
let p = pipeline(stream, client, sender);
core.run(p)?;
pool.shutdown_on_idle().wait().unwrap();
Ok(())
let p = pipeline(stream, client, executor);
core.run(p).map(|_| ())
}
#[cfg(test)]
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment