Verified Commit 14a90e71 authored by Jordan Petridis 🌱

Remove Futures_Cpupool.

The performance boost is not good enough to justify the
code complexity it adds and the memory overhead of
yet another threadpool.

We will start refactoring the whole pipeline implementation
and might transition to either rayon-futures or tokio-runtime.
parent dd2366a1
Pipeline #8231 failed with stages in 41 minutes and 48 seconds
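For context (not part of the commit): the change replaces "build the future, then ship it to a futures-cpupool worker" with "run the same combinator chain directly on the existing tokio reactor::Core". Below is a minimal sketch of that pattern, assuming futures 0.1 and tokio-core 0.1 as used by this crate; `fetch` and the `and_then` step are hypothetical stand-ins for `Source::into_feed` and `Feed::index`, not the actual code.

extern crate futures;
extern crate tokio_core;

use futures::future;
use futures::prelude::*;
use tokio_core::reactor::Core;

fn main() {
    let mut core = Core::new().expect("failed to create the reactor");

    // Stand-in for `Source::into_feed`; the real code returns a boxed
    // future driven by hyper requests on this same reactor.
    let fetch = future::ok::<u32, ()>(42);

    // Previously this step would have been spawned onto a CpuPool
    // (`pool.spawn(feed.index())`); now the work simply stays in the chain
    // and is polled by the reactor itself.
    let work = fetch.and_then(|feed| future::ok(feed + 1)).map(|_| ());

    // One event loop drives everything; no extra threadpool is created.
    core.run(work).expect("pipeline failed");
}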
@@ -656,7 +656,6 @@ dependencies = [
 "failure 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
 "failure_derive 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
 "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)",
-"futures-cpupool 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
 "hyper 0.11.24 (registry+https://github.com/rust-lang/crates.io-index)",
 "hyper-tls 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
 "itertools 0.7.8 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -21,7 +21,6 @@ hyper = "0.11.24"
 tokio-core = "0.1.16"
 hyper-tls = "0.1.3"
 native-tls = "0.1.5"
-futures-cpupool = "0.1.8"
 num_cpus = "1.8.0"
 failure = "0.1.1"
 failure_derive = "0.1.1"
@@ -42,7 +42,6 @@ extern crate log;
 extern crate ammonia;
 extern crate chrono;
 extern crate futures;
-extern crate futures_cpupool;
 extern crate hyper;
 extern crate hyper_tls;
 extern crate itertools;
@@ -11,7 +11,6 @@ use hyper_tls::HttpsConnector;
 // use futures::future::ok;
 use futures::prelude::*;
-use futures_cpupool::CpuPool;
 use database::connection;
 use errors::DataError;
@@ -179,12 +178,11 @@ impl Source {
     pub fn into_feed(
         self,
         client: &Client<HttpsConnector<HttpConnector>>,
-        pool: CpuPool,
         ignore_etags: bool,
     ) -> Box<Future<Item = Feed, Error = DataError>> {
         let id = self.id();
         let feed = self.request_constructor(client, ignore_etags)
-            .and_then(move |(_, res)| response_to_channel(res, pool))
+            .and_then(move |(_, res)| response_to_channel(res))
             .and_then(move |chan| {
                 FeedBuilder::default()
                     .channel(chan)
@@ -237,10 +235,7 @@ impl Source {
 }
 #[allow(needless_pass_by_value)]
-fn response_to_channel(
-    res: Response,
-    pool: CpuPool,
-) -> Box<Future<Item = Channel, Error = DataError> + Send> {
+fn response_to_channel(res: Response) -> Box<Future<Item = Channel, Error = DataError> + Send> {
     let chan = res.body()
         .concat2()
         .map(|x| x.into_iter())
@@ -249,8 +244,7 @@ fn response_to_channel(
         .map(|utf_8_bytes| String::from_utf8_lossy(&utf_8_bytes).into_owned())
         .and_then(|buf| Channel::from_str(&buf).map_err(From::from));
-    let cpu_chan = pool.spawn(chan);
-    Box::new(cpu_chan)
+    Box::new(chan)
 }
 #[cfg(test)]
@@ -2,7 +2,6 @@
 //! Docs.
 use futures::future::*;
-use futures_cpupool::CpuPool;
 // use futures::prelude::*;
 use hyper::Client;
@@ -20,23 +19,6 @@ use models::{IndexState, NewEpisode, NewEpisodeMinimal};
 // use std::sync::{Arc, Mutex};
-macro_rules! clone {
-    (@param _) => ( _ );
-    (@param $x:ident) => ( $x );
-    ($($n:ident),+ => move || $body:expr) => (
-        {
-            $( let $n = $n.clone(); )+
-            move || $body
-        }
-    );
-    ($($n:ident),+ => move |$($p:tt),+| $body:expr) => (
-        {
-            $( let $n = $n.clone(); )+
-            move |$(clone!(@param $p),)+| $body
-        }
-    );
-}
 /// The pipline to be run for indexing and updating a Podcast feed that originates from
 /// `Source.uri`.
 ///
@@ -47,13 +29,12 @@ pub fn pipeline<S: IntoIterator<Item = Source>>(
     sources: S,
     ignore_etags: bool,
     tokio_core: &mut Core,
-    pool: &CpuPool,
     client: Client<HttpsConnector<HttpConnector>>,
 ) -> Result<(), DataError> {
     let list: Vec<_> = sources
         .into_iter()
-        .map(clone!(pool => move |s| s.into_feed(&client, pool.clone(), ignore_etags)))
-        .map(|fut| fut.and_then(clone!(pool => move |feed| pool.clone().spawn(feed.index()))))
+        .map(move |s| s.into_feed(&client, ignore_etags))
+        .map(|fut| fut.and_then(|feed| feed.index()))
         .map(|fut| fut.map(|_| ()).map_err(|err| error!("Error: {}", err)))
         .collect();
@@ -67,26 +48,24 @@ pub fn pipeline<S: IntoIterator<Item = Source>>(
     Ok(())
 }
-/// Creates a tokio `reactor::Core`, a `CpuPool`, and a `hyper::Client` and
+/// Creates a tokio `reactor::Core`, and a `hyper::Client` and
 /// runs the pipeline.
 pub fn run(sources: Vec<Source>, ignore_etags: bool) -> Result<(), DataError> {
     if sources.is_empty() {
         return Ok(());
     }
-    let pool = CpuPool::new_num_cpus();
     let mut core = Core::new()?;
     let handle = core.handle();
     let client = Client::configure()
         .connector(HttpsConnector::new(num_cpus::get(), &handle)?)
         .build(&handle);
-    pipeline(sources, ignore_etags, &mut core, &pool, client)
+    pipeline(sources, ignore_etags, &mut core, client)
 }
 /// Docs
 pub fn index_single_source(s: Source, ignore_etags: bool) -> Result<(), DataError> {
-    let pool = CpuPool::new_num_cpus();
     let mut core = Core::new()?;
     let handle = core.handle();
@@ -94,8 +73,8 @@ pub fn index_single_source(s: Source, ignore_etags: bool) -> Result<(), DataError> {
         .connector(HttpsConnector::new(num_cpus::get(), &handle)?)
         .build(&handle);
-    let work = s.into_feed(&client, pool.clone(), ignore_etags)
-        .and_then(clone!(pool => move |feed| pool.clone().spawn(feed.index())))
+    let work = s.into_feed(&client, ignore_etags)
+        .and_then(move |feed| feed.index())
         .map(|_| ());
     core.run(work)