Verified Commit 0887789f authored by Jordan Petridis's avatar Jordan Petridis 🌱

Pipeline: Complete the move to Tokio Runtime

parent 3c4574f2
Pipeline #33061 failed with stages
in 48 minutes and 7 seconds
......@@ -921,7 +921,7 @@ dependencies = [
"log 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)",
"net2 0.2.33 (registry+https://github.com/rust-lang/crates.io-index)",
"time 0.1.40 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-executor 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-io 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-reactor 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
......@@ -1524,9 +1524,7 @@ dependencies = [
"rfc822_sanitizer 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
"rss 1.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
"tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-core 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-executor 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-threadpool 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)",
"url 1.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
"xdg 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"xml-rs 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
......@@ -1783,7 +1781,7 @@ dependencies = [
"serde 1.0.78 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_json 1.0.31 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_urlencoded 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-io 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
"url 1.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
"uuid 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
......@@ -1849,11 +1847,6 @@ dependencies = [
"antidote 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "scoped-tls"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "scopeguard"
version = "0.3.3"
......@@ -2127,13 +2120,14 @@ dependencies = [
[[package]]
name = "tokio"
version = "0.1.8"
version = "0.1.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"bytes 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.24 (registry+https://github.com/rust-lang/crates.io-index)",
"mio 0.6.16 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-codec 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-current-thread 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-current-thread 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-executor 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-fs 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-io 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
......@@ -2155,27 +2149,9 @@ dependencies = [
"tokio-io 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "tokio-core"
version = "0.1.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"bytes 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.24 (registry+https://github.com/rust-lang/crates.io-index)",
"iovec 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)",
"mio 0.6.16 (registry+https://github.com/rust-lang/crates.io-index)",
"scoped-tls 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-executor 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-io 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-reactor 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-timer 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "tokio-current-thread"
version = "0.1.1"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"futures 0.1.24 (registry+https://github.com/rust-lang/crates.io-index)",
......@@ -2655,7 +2631,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum safemem 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e27a8b19b835f7aea908818e871f5cc3a5a186550c30773be987e155e8163d8f"
"checksum schannel 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)" = "dc1fabf2a7b6483a141426e1afd09ad543520a77ac49bd03c286e7696ccfd77f"
"checksum scheduled-thread-pool 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "1a2ff3fc5223829be817806c6441279c676e454cc7da608faf03b0ccc09d3889"
"checksum scoped-tls 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "332ffa32bf586782a3efaeb58f127980944bbc8c4d6913a86107ac2a5ab24b28"
"checksum scopeguard 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "94258f53601af11e6a49f722422f6e3425c52b06245a5cf9bc09908b174f5e27"
"checksum security-framework 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "697d3f3c23a618272ead9e1fb259c1411102b31c6af8b93f1d64cca9c3b0e8e0"
"checksum security-framework-sys 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "ab01dfbe5756785b5b4d46e0289e5a18071dfa9a7c2b24213ea00b9ef9b665bf"
......@@ -2687,10 +2662,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum termion 1.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "689a3bdfaab439fd92bc87df5c4c78417d3cbe537487274e9b0b2dce76e92096"
"checksum thread_local 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)" = "c6b53e329000edc2b34dbe8545fd20e55a333362d0a321909685a19bd28c3f1b"
"checksum time 0.1.40 (registry+https://github.com/rust-lang/crates.io-index)" = "d825be0eb33fda1a7e68012d51e9c7f451dc1a69391e7fdc197060bb8c56667b"
"checksum tokio 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "fbb6a6e9db2702097bfdfddcb09841211ad423b86c75b5ddaca1d62842ac492c"
"checksum tokio 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)" = "6e93c78d23cc61aa245a8acd2c4a79c4d7fa7fb5c3ca90d5737029f043a84895"
"checksum tokio-codec 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "881e9645b81c2ce95fcb799ded2c29ffb9f25ef5bef909089a420e5961dd8ccb"
"checksum tokio-core 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)" = "aeeffbbb94209023feaef3c196a41cbcdafa06b4a6f893f68779bb5e53796f71"
"checksum tokio-current-thread 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "8fdfb899688ac16f618076bd09215edbfda0fd5dfecb375b6942636cb31fa8a7"
"checksum tokio-current-thread 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "f90fcd90952f0a496d438a976afba8e5c205fb12123f813d8ab3aa1c8436638c"
"checksum tokio-executor 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "c117b6cf86bb730aab4834f10df96e4dd586eff2c3c27d3781348da49e255bde"
"checksum tokio-fs 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "b5cbe4ca6e71cb0b62a66e4e6f53a8c06a6eefe46cc5f665ad6f274c9906f135"
"checksum tokio-io 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "8d6cc2de7725863c86ac71b0b9068476fec50834f055a243558ef1655bbd34cb"
......
......@@ -19,9 +19,7 @@ xml-rs = "0.8.0"
futures = "0.1.23"
hyper = "0.12.11"
http = "0.1.13"
tokio-core = "0.1.17"
tokio-threadpool = "0.1.7"
tokio-executor = "0.1.5"
tokio = "0.1.11"
hyper-tls = "0.3.0"
native-tls = "0.2.1"
num_cpus = "1.8.0"
......
......@@ -5,7 +5,6 @@ use http;
use hyper;
use native_tls;
use rss;
use tokio_executor;
use url;
use xml;
......@@ -50,8 +49,6 @@ pub enum DataError {
HyperError(#[cause] hyper::Error),
#[fail(display = "ToStr Error: {}", _0)]
HttpToStr(#[cause] http::header::ToStrError),
#[fail(display = "Tokio Spawn Error: {}", _0)]
SpawnError(#[cause] tokio_executor::SpawnError),
#[fail(display = "Failed to parse a url: {}", _0)]
UrlError(#[cause] url::ParseError),
#[fail(display = "TLS Error: {}", _0)]
......@@ -115,12 +112,6 @@ impl From<http::header::ToStrError> for DataError {
}
}
impl From<tokio_executor::SpawnError> for DataError {
fn from(err: tokio_executor::SpawnError) -> Self {
DataError::SpawnError(err)
}
}
impl From<url::ParseError> for DataError {
fn from(err: url::ParseError) -> Self {
DataError::UrlError(err)
......
......@@ -122,7 +122,7 @@ fn batch_insert_episodes(episodes: &[NewEpisode]) {
mod tests {
use failure::Error;
use rss::Channel;
use tokio_core::reactor::Core;
use tokio::{self, prelude::*};
use database::truncate_db;
use dbqueries;
......@@ -176,10 +176,9 @@ mod tests {
get_feed(path, s.id())
}).collect();
let mut core = Core::new()?;
// Index the channes
let list: Vec<_> = feeds.into_iter().map(|x| x.index()).collect();
let _foo = core.run(join_all(list));
let stream_ = stream::iter_ok(feeds).for_each(|x| x.index());
tokio::run(stream_.map_err(|_| ()));
// Assert the index rows equal the controlled results
assert_eq!(dbqueries::get_sources()?.len(), 5);
......
......@@ -84,9 +84,7 @@ extern crate num_cpus;
extern crate rayon;
extern crate rfc822_sanitizer;
extern crate rss;
extern crate tokio_core;
extern crate tokio_executor;
extern crate tokio_threadpool;
extern crate tokio;
extern crate url;
extern crate xdg;
extern crate xml;
......
......@@ -286,8 +286,8 @@ fn response_to_channel(res: Response<Body>) -> impl Future<Item = Channel, Error
mod tests {
use super::*;
use failure::Error;
use tokio_core::reactor::Core;
use num_cpus;
use tokio;
use database::truncate_db;
use utils::get_feed;
......@@ -296,7 +296,7 @@ mod tests {
fn test_into_feed() -> Result<(), Error> {
truncate_db()?;
let mut core = Core::new()?;
let mut rt = tokio::runtime::Runtime::new()?;
let https = HttpsConnector::new(num_cpus::get())?;
let client = Client::builder().build::<_, Body>(https);
......@@ -304,9 +304,8 @@ mod tests {
com/InterceptedWithJeremyScahill";
let source = Source::from_url(url)?;
let id = source.id();
let feed = source.into_feed(client);
let feed = core.run(feed)?;
let feed = rt.block_on(feed)?;
let expected = get_feed("tests/feeds/2018-01-20-Intercepted.xml", id);
assert_eq!(expected, feed);
......
// FIXME:
//! Docs.
use futures::{lazy, prelude::*, stream::iter_ok};
use tokio_core::reactor::Core;
use tokio_threadpool::{self, ThreadPool};
use futures::{lazy, prelude::*, future::ok, stream::FuturesOrdered};
use tokio;
use hyper::client::HttpConnector;
use hyper::{Client, Body};
......@@ -14,6 +13,8 @@ use num_cpus;
use errors::DataError;
use Source;
use std::iter::FromIterator;
type HttpsClient = Client<HttpsConnector<HttpConnector>>;
/// The pipline to be run for indexing and updating a Podcast feed that originates from
......@@ -25,7 +26,6 @@ type HttpsClient = Client<HttpsConnector<HttpConnector>>;
pub fn pipeline<'a, S>(
sources: S,
client: HttpsClient,
pool: tokio_threadpool::Sender,
) -> impl Future<Item = (), Error = ()> + 'a
where
S: Stream<Item = Source, Error = DataError> + 'a,
......@@ -41,7 +41,7 @@ where
})
.for_each(move |feed| {
let fut = lazy(|| feed.index().map_err(|err| error!("Error: {}", err)));
pool.spawn(fut).map_err(|_| ())
tokio::spawn(fut)
})
}
......@@ -51,17 +51,14 @@ pub fn run<S>(sources: S) -> Result<(), DataError>
where
S: IntoIterator<Item = Source>,
{
let pool = ThreadPool::new();
let sender = pool.sender().clone();
let mut core = Core::new()?;
let https = HttpsConnector::new(num_cpus::get())?;
let client = Client::builder().build::<_, Body>(https);
let stream = iter_ok::<_, DataError>(sources);
let p = pipeline(stream, client, sender);
let _ = core.run(p);
let foo = sources.into_iter().map(ok::<_, _>);
let stream = FuturesOrdered::from_iter(foo);
let p = pipeline(stream, client);
tokio::run(p);
pool.shutdown_on_idle().wait().unwrap();
Ok(())
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment