// FIXME:
//! The data pipeline for indexing and updating Podcast feeds.

use futures::future::*;
use futures::prelude::*;
use futures::stream::*;

use hyper::client::HttpConnector;
use hyper::Client;
use hyper_tls::HttpsConnector;
use tokio_core::reactor::Core;
use tokio_threadpool::{self, ThreadPool};

use num_cpus;

use errors::DataError;
use Source;

type HttpsClient = Client<HttpsConnector<HttpConnector>>;

/// The pipeline to be run for indexing and updating a Podcast feed that originates from
/// `Source.uri`.
///
/// Rough diagram of the stages:
/// Source -> GET Request -> Update Etags -> Check Status -> Parse `xml/Rss` ->
/// Convert `rss::Channel` into `Feed` -> Index Podcast -> Index Episodes.
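///
/// A minimal sketch of driving the pipeline by hand, assuming the `Source`
/// rows already exist in the database (the `run` function below does this
/// exact wiring):
///
/// ```ignore
/// let pool = ThreadPool::new();
/// let mut core = Core::new()?;
/// let handle = core.handle();
/// let client = Client::configure()
///     .connector(HttpsConnector::new(num_cpus::get(), &handle)?)
///     .build(&handle);
///
/// let stream = iter_ok::<_, DataError>(dbqueries::get_sources()?);
/// core.run(pipeline(stream, client, pool.sender().clone()))?;
/// ```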
pub fn pipeline<'a, S>(
    sources: S,
    client: HttpsClient,
    pool: tokio_threadpool::Sender,
) -> impl Future<Item = Vec<()>, Error = DataError> + 'a
where
    S: Stream<Item = Source, Error = DataError> + 'a,
{
    sources
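        // GET the feed, update its etag, check the status, and parse the response into a `Feed`.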
        .and_then(move |s| s.into_feed(client.clone()))
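        // Index the Podcast and its Episodes on the thread pool.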
        .and_then(move |feed| {
            pool.spawn(lazy(|| {
                feed.index().map_err(|err| error!("Error: {}", err))
            })).map_err(From::from)
        })
        // A stream stops at the first error, so
        // we ensure that every item resolves to `Ok` regardless.
        .map_err(|err| error!("Error: {}", err))
        .then(|_| ok::<(), DataError>(()))
        .collect()
}

/// Creates a tokio `reactor::Core` and a `hyper::Client`, and
/// runs the pipeline to completion. The `reactor::Core` is dropped afterwards.
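///
/// A sketch of the intended call-site, assuming `Source` rows were already
/// inserted (e.g. with `Source::from_url`):
///
/// ```ignore
/// let sources = dbqueries::get_sources()?;
/// run(sources)?;
/// ```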
pub fn run<S>(sources: S) -> Result<(), DataError>
where
    S: IntoIterator<Item = Source>,
{
    let pool = ThreadPool::new();
    let sender = pool.sender().clone();
    let mut core = Core::new()?;
    let handle = core.handle();
    let client = Client::configure()
        .connector(HttpsConnector::new(num_cpus::get(), &handle)?)
        .build(&handle);

    let stream = iter_ok::<_, DataError>(sources);
    let p = pipeline(stream, client, sender);
    core.run(p)?;

    pool.shutdown_on_idle().wait().unwrap();
    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;
    use database::truncate_db;
    use dbqueries;
    use failure::Error;
    use Source;

    // Feed URLs, pinned to Wayback Machine snapshots.
    const URLS: &[&str] = &[
        "https://web.archive.org/web/20180120083840if_/https://feeds.feedburner.\
         com/InterceptedWithJeremyScahill",
        "https://web.archive.org/web/20180120110314if_/https://feeds.feedburner.com/linuxunplugged",
        "https://web.archive.org/web/20180120110727if_/https://rss.acast.com/thetipoff",
        "https://web.archive.org/web/20180120104957if_/https://rss.art19.com/steal-the-stars",
        "https://web.archive.org/web/20180120104741if_/https://www.greaterthancode.\
         com/feed/podcast",
    ];

    #[test]
    /// Insert feeds and update/index them.
    fn test_pipeline() -> Result<(), Error> {
        truncate_db()?;
        let bad_url = "https://gitlab.gnome.org/World/podcasts.atom";
        // If a stream returns an error or `None` it stops,
        // but we want to parse all feeds regardless of whether one fails.
        Source::from_url(bad_url)?;

        URLS.iter().for_each(|url| {
            // Index the URLs into the source table.
            Source::from_url(url).unwrap();
        });

        let sources = dbqueries::get_sources()?;
        run(sources)?;

        let sources = dbqueries::get_sources()?;
        // Run again to cover unique-constraint errors.
        run(sources)?;

        // Assert that the indexed rows match the expected counts.
        assert_eq!(dbqueries::get_sources()?.len(), 6);
        assert_eq!(dbqueries::get_podcasts()?.len(), 5);
        assert_eq!(dbqueries::get_episodes()?.len(), 354);
        Ok(())
    }
}