Verified Commit 09948845 authored by Jordan Petridis's avatar Jordan Petridis 🌱

Pipeline: Remove use of clone! macro

parent 0b8a0695
...@@ -16,27 +16,6 @@ use num_cpus; ...@@ -16,27 +16,6 @@ use num_cpus;
use errors::DataError; use errors::DataError;
use Source; use Source;
// use std::sync::{Arc, Mutex};
// http://gtk-rs.org/tuto/closures
#[macro_export]
macro_rules! clone {
(@param _) => ( _ );
(@param $x:ident) => ( $x );
($($n:ident),+ => move || $body:expr) => (
{
$( let $n = $n.clone(); )+
move || $body
}
);
($($n:ident),+ => move |$($p:tt),+| $body:expr) => (
{
$( let $n = $n.clone(); )+
move |$(clone!(@param $p),)+| $body
}
);
}
type HttpsClient = Client<HttpsConnector<HttpConnector>>; type HttpsClient = Client<HttpsConnector<HttpConnector>>;
/// The pipline to be run for indexing and updating a Podcast feed that originates from /// The pipline to be run for indexing and updating a Podcast feed that originates from
...@@ -47,19 +26,19 @@ type HttpsClient = Client<HttpsConnector<HttpConnector>>; ...@@ -47,19 +26,19 @@ type HttpsClient = Client<HttpsConnector<HttpConnector>>;
/// Convert `rss::Channel` into `Feed` -> Index Podcast -> Index Episodes. /// Convert `rss::Channel` into `Feed` -> Index Podcast -> Index Episodes.
pub fn pipeline<'a, S>( pub fn pipeline<'a, S>(
sources: S, sources: S,
client: &HttpsClient, client: HttpsClient,
pool: &tokio_threadpool::Sender, pool: tokio_threadpool::Sender,
) -> impl Future<Item = Vec<()>, Error = DataError> + 'a ) -> impl Future<Item = Vec<()>, Error = DataError> + 'a
where where
S: Stream<Item = Source, Error = DataError> + 'a, S: Stream<Item = Source, Error = DataError> + 'a,
{ {
sources sources
.and_then(clone!(client => move |s| s.into_feed(client.clone()))) .and_then(move |s| s.into_feed(client.clone()))
.and_then(clone!(pool => move |feed| { .and_then(move |feed| {
pool.spawn(lazy(|| { pool.spawn(lazy(|| {
feed.index().map_err(|err| error!("Error: {}", err)) feed.index().map_err(|err| error!("Error: {}", err))
})).map_err(From::from) })).map_err(From::from)
})) })
// the stream will stop at the first error so // the stream will stop at the first error so
// we ensure that everything will succeded regardless. // we ensure that everything will succeded regardless.
.map_err(|err| error!("Error: {}", err)) .map_err(|err| error!("Error: {}", err))
...@@ -74,6 +53,7 @@ where ...@@ -74,6 +53,7 @@ where
S: IntoIterator<Item = Source>, S: IntoIterator<Item = Source>,
{ {
let pool = ThreadPool::new(); let pool = ThreadPool::new();
let sender = pool.sender().clone();
let mut core = Core::new()?; let mut core = Core::new()?;
let handle = core.handle(); let handle = core.handle();
let client = Client::configure() let client = Client::configure()
...@@ -81,10 +61,7 @@ where ...@@ -81,10 +61,7 @@ where
.build(&handle); .build(&handle);
let stream = iter_ok::<_, DataError>(sources); let stream = iter_ok::<_, DataError>(sources);
let p = { let p = pipeline(stream, client, sender);
let sender = pool.sender();
pipeline(stream, &client, &sender)
};
core.run(p)?; core.run(p)?;
pool.shutdown_on_idle().wait().unwrap(); pool.shutdown_on_idle().wait().unwrap();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment