Verified Commit 7181c46e authored by Christopher Davis's avatar Christopher Davis Committed by Jordan Petridis

podcasts-data: Upgrade hyper to 12.X

This also allows us to bump hyper_tls and native_tls, bringing
support for openssl 1.1.
parent afa7e693
This diff is collapsed.
......@@ -17,12 +17,13 @@ url = "1.7.1"
xdg = "2.1.0"
xml-rs = "0.8.0"
futures = "0.1.23"
hyper = "0.11.27"
hyper = "0.12.11"
http = "0.1.13"
tokio-core = "0.1.17"
tokio-threadpool = "0.1.6"
tokio-executor = "0.1"
hyper-tls = "0.1.3"
native-tls = "0.1.5"
hyper-tls = "0.3.0"
native-tls = "0.2.1"
num_cpus = "1.8.0"
failure = "0.1.2"
failure_derive = "0.1.2"
......
use diesel;
use diesel::r2d2;
use diesel_migrations::RunMigrationsError;
use http;
use hyper;
use native_tls;
use rss;
......@@ -47,6 +48,8 @@ pub enum DataError {
R2D2PoolError(#[cause] r2d2::PoolError),
#[fail(display = "Hyper Error: {}", _0)]
HyperError(#[cause] hyper::Error),
#[fail(display = "ToStr Error: {}", _0)]
HttpToStr(#[cause] http::header::ToStrError),
#[fail(display = "Tokio Spawn Error: {}", _0)]
SpawnError(#[cause] tokio_executor::SpawnError),
#[fail(display = "Failed to parse a url: {}", _0)]
......@@ -106,6 +109,12 @@ impl From<hyper::Error> for DataError {
}
}
impl From<http::header::ToStrError> for DataError {
fn from(err: http::header::ToStrError) -> Self {
DataError::HttpToStr(err)
}
}
impl From<tokio_executor::SpawnError> for DataError {
fn from(err: tokio_executor::SpawnError) -> Self {
DataError::SpawnError(err)
......
......@@ -76,6 +76,7 @@ extern crate log;
extern crate ammonia;
extern crate chrono;
extern crate futures;
extern crate http;
extern crate hyper;
extern crate hyper_tls;
extern crate native_tls;
......
......@@ -4,12 +4,14 @@ use rss::Channel;
use url::Url;
use hyper::client::HttpConnector;
use hyper::header::{
ETag, EntityTag, HttpDate, IfModifiedSince, IfNoneMatch, LastModified, Location, UserAgent,
};
use hyper::{Client, Method, Request, Response, StatusCode, Uri};
use hyper::{Client, Body};
use hyper_tls::HttpsConnector;
use http::{Request, Response, StatusCode, Uri};
use http::header::{
ETAG, IF_MODIFIED_SINCE, IF_NONE_MATCH, LAST_MODIFIED, LOCATION, USER_AGENT as USER_AGENT_HEADER,
HeaderValue,
};
// use futures::future::ok;
use futures::future::{loop_fn, Future, Loop};
use futures::prelude::*;
......@@ -91,11 +93,11 @@ impl Source {
/// Extract Etag and LastModifier from res, and update self and the
/// corresponding db row.
fn update_etag(mut self, res: &Response) -> Result<Self, DataError> {
fn update_etag(mut self, res: &Response<Body>) -> Result<Self, DataError> {
let headers = res.headers();
let etag = headers.get::<ETag>().map(|x| x.tag());
let lmod = headers.get::<LastModified>().map(|x| format!("{}", x));
let etag = headers.get(ETAG).and_then(|h| h.to_str().ok()).map(From::from);
let lmod = headers.get(LAST_MODIFIED).and_then(|h| h.to_str().ok()).map(From::from);
if (self.http_etag() != etag) || (self.last_modified != lmod) {
self.set_http_etag(etag);
......@@ -134,51 +136,57 @@ impl Source {
// 408: Timeout
// 410: Feed deleted
// TODO: Rething this api,
fn match_status(mut self, res: Response) -> Result<Response, DataError> {
fn match_status(mut self, res: Response<Body>) -> Result<Response<Body>, DataError> {
let code = res.status();
match code {
if code.is_success() {
// If request is succesful save the etag
StatusCode::NotModified | StatusCode::Ok => self = self.update_etag(&res)?,
// Clear the Etag/lmod else
_ => self = self.clear_etags()?,
self = self.update_etag(&res)?
} else {
match code.as_u16() {
// Save etags if it returns NotModified
304 => self = self.update_etag(&res)?,
// Clear the Etag/lmod else
_ => self = self.clear_etags()?,
};
};
match code {
StatusCode::NotModified => {
match code.as_u16() {
304 => {
info!("304: Source, (id: {}), is up to date", self.id());
return Err(DataError::FeedNotModified(self));
}
StatusCode::MovedPermanently | StatusCode::Found | StatusCode::PermanentRedirect => {
301 | 302 | 308 => {
warn!("Feed was moved permanently.");
self = self.update_url(&res)?;
return Err(DataError::FeedRedirect(self));
}
StatusCode::TemporaryRedirect => {
307 => {
warn!("307: Temporary Redirect.");
// FIXME: How is it actually handling the redirect?
return Err(DataError::FeedRedirect(self));
}
StatusCode::Unauthorized => return Err(self.make_err("401: Unauthorized.", code)),
StatusCode::Forbidden => return Err(self.make_err("403: Forbidden.", code)),
StatusCode::NotFound => return Err(self.make_err("404: Not found.", code)),
StatusCode::RequestTimeout => return Err(self.make_err("408: Request Timeout.", code)),
StatusCode::Gone => return Err(self.make_err("410: Feed was deleted..", code)),
401 => return Err(self.make_err("401: Unauthorized.", code)),
403 => return Err(self.make_err("403: Forbidden.", code)),
404 => return Err(self.make_err("404: Not found.", code)),
408 => return Err(self.make_err("408: Request Timeout.", code)),
410 => return Err(self.make_err("410: Feed was deleted..", code)),
_ => info!("HTTP StatusCode: {}", code),
};
Ok(res)
}
fn update_url(mut self, res: &Response) -> Result<Self, DataError> {
fn update_url(mut self, res: &Response<Body>) -> Result<Self, DataError> {
let code = res.status();
let headers = res.headers();
info!("HTTP StatusCode: {}", code);
debug!("Headers {:#?}", headers);
if let Some(url) = headers.get::<Location>() {
if let Some(url) = headers.get(LOCATION) {
debug!("Previous Source: {:#?}", &self);
self.set_uri(url.to_string());
self.set_uri(url.to_str()?.into());
self = self.clear_etags()?;
debug!("Updated Source: {:#?}", &self);
......@@ -239,23 +247,22 @@ impl Source {
fn request_constructor(
self,
client: &Client<HttpsConnector<HttpConnector>>,
) -> impl Future<Item = Response, Error = DataError> {
) -> impl Future<Item = Response<Body>, Error = DataError> {
// FIXME: remove unwrap somehow
let uri = Uri::from_str(self.uri()).unwrap();
let mut req = Request::new(Method::Get, uri);
let mut req = Request::get(uri)
.body(Body::empty())
.unwrap();
// Set the UserAgent cause ppl still seem to check it for some reason...
req.headers_mut().set(UserAgent::new(USER_AGENT));
req.headers_mut().insert(USER_AGENT_HEADER, HeaderValue::from_static(USER_AGENT));
if let Some(etag) = self.http_etag() {
let tag = vec![EntityTag::new(true, etag.to_owned())];
req.headers_mut().set(IfNoneMatch::Items(tag));
req.headers_mut().insert(IF_NONE_MATCH, HeaderValue::from_str(etag).unwrap());
}
if let Some(lmod) = self.last_modified() {
if let Ok(date) = lmod.parse::<HttpDate>() {
req.headers_mut().set(IfModifiedSince(date));
}
req.headers_mut().insert(IF_MODIFIED_SINCE, HeaderValue::from_str(lmod).unwrap());
}
client
......@@ -265,9 +272,8 @@ impl Source {
}
}
#[allow(needless_pass_by_value)]
fn response_to_channel(res: Response) -> impl Future<Item = Channel, Error = DataError> + Send {
res.body()
fn response_to_channel(res: Response<Body>) -> impl Future<Item = Channel, Error = DataError> + Send {
res.into_body()
.concat2()
.map(|x| x.into_iter())
.map_err(From::from)
......@@ -281,6 +287,7 @@ mod tests {
use super::*;
use failure::Error;
use tokio_core::reactor::Core;
use num_cpus;
use database::truncate_db;
use utils::get_feed;
......@@ -290,9 +297,8 @@ mod tests {
truncate_db()?;
let mut core = Core::new()?;
let client = Client::configure()
.connector(HttpsConnector::new(4, &core.handle())?)
.build(&core.handle());
let https = HttpsConnector::new(num_cpus::get())?;
let client = Client::builder().build::<_, Body>(https);
let url = "https://web.archive.org/web/20180120083840if_/https://feeds.feedburner.\
com/InterceptedWithJeremyScahill";
......
......@@ -6,7 +6,7 @@ use tokio_core::reactor::Core;
use tokio_threadpool::{self, ThreadPool};
use hyper::client::HttpConnector;
use hyper::Client;
use hyper::{Client, Body};
use hyper_tls::HttpsConnector;
use num_cpus;
......@@ -57,10 +57,8 @@ where
let pool = ThreadPool::new();
let sender = pool.sender().clone();
let mut core = Core::new()?;
let handle = core.handle();
let client = Client::configure()
.connector(HttpsConnector::new(num_cpus::get(), &handle)?)
.build(&handle);
let https = HttpsConnector::new(num_cpus::get())?;
let client = Client::builder().build::<_, Body>(https);
let stream = iter_ok::<_, DataError>(sources);
let p = pipeline(stream, client, sender);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment