Commit fe2f8617 authored by Jordan Petridis's avatar Jordan Petridis 🌱

Merge branch '33-downloader-re-work' into 'master'

Resolve "Downloader Re-work"

Closes #33

See merge request alatiera/Hammond!12
parents 345d4b38 7f854437
Pipeline #1962 passed with stages
in 30 minutes and 8 seconds
......@@ -508,6 +508,11 @@ dependencies = [
"pkg-config 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "glob"
version = "0.2.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "gobject-sys"
version = "0.5.0"
......@@ -591,6 +596,7 @@ version = "0.1.0"
dependencies = [
"diesel 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"error-chain 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)",
"glob 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)",
"hammond-data 0.1.0",
"hyper 0.11.10 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)",
......@@ -1782,6 +1788,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum gio-sys 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a303bbf7a5e75ab3b627117ff10e495d1b9e97e1d68966285ac2b1f6270091bc"
"checksum glib 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "450247060df7d52fdad31e1d66f30d967e925c9d1d26a0ae050cfe33dcd00d08"
"checksum glib-sys 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d9693049613ff52b93013cc3d2590366d8e530366d288438724b73f6c7dc4be8"
"checksum glob 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)" = "8be18de09a56b60ed0edf84bc9df007e30040691af7acd1c41874faac5895bfb"
"checksum gobject-sys 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "60d507c87a71b1143c66ed21a969be9b99a76df234b342d733e787e6c9c7d7c2"
"checksum gtk 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "0847c507e52c1feaede13ef56fb4847742438602655449d5f1f782e8633f146f"
"checksum gtk-sys 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "905fcfbaaad1b44ec0b4bba9e4d527d728284c62bc2ba41fccedace2b096766f"
......
//! Database Setup. This is only public to help with some unit tests.
use r2d2_diesel::ConnectionManager;
use diesel::prelude::*;
use r2d2;
......@@ -35,7 +37,8 @@ lazy_static! {
static ref DB_PATH: PathBuf = TEMPDIR.path().join("hammond.db");
}
pub(crate) fn connection() -> Pool {
/// Get an r2d2 SqliteConnection.
pub fn connection() -> Pool {
POOL.clone()
}
......
......@@ -56,7 +56,7 @@ pub mod utils;
pub mod feed;
#[allow(missing_docs)]
pub mod errors;
pub(crate) mod database;
pub mod database;
pub(crate) mod models;
mod parser;
mod schema;
......
......@@ -215,6 +215,22 @@ pub struct EpisodeWidgetQuery {
podcast_id: i32,
}
impl From<Episode> for EpisodeWidgetQuery {
fn from(e: Episode) -> EpisodeWidgetQuery {
EpisodeWidgetQuery {
rowid: e.rowid,
title: e.title,
uri: e.uri,
local_uri: e.local_uri,
epoch: e.epoch,
length: e.length,
duration: e.duration,
played: e.played,
podcast_id: e.podcast_id,
}
}
}
impl EpisodeWidgetQuery {
/// Get the value of the sqlite's `ROW_ID`
pub fn rowid(&self) -> i32 {
......@@ -597,7 +613,6 @@ impl<'a> Source {
fn update_etag(&mut self, req: &reqwest::Response) -> Result<()> {
let headers = req.headers();
// let etag = headers.get_raw("ETag").unwrap();
let etag = headers.get::<ETag>();
let lmod = headers.get::<LastModified>();
......
......@@ -137,21 +137,14 @@ pub fn get_download_folder(pd_title: &str) -> Result<String> {
/// Removes all the entries associated with the given show from the database,
/// and deletes all of the downloaded content.
/// TODO: Write Tests
/// TODO: Return Result instead
pub fn delete_show(pd: &Podcast) {
let res = dbqueries::remove_feed(pd);
if res.is_ok() {
info!("{} was removed succesfully.", pd.title());
let dl_fold = get_download_folder(pd.title());
if let Ok(fold) = dl_fold {
let res3 = fs::remove_dir_all(&fold);
// TODO: Show errors?
if res3.is_ok() {
info!("All the content at, {} was removed succesfully", &fold);
}
};
}
pub fn delete_show(pd: &Podcast) -> Result<()> {
dbqueries::remove_feed(&pd)?;
info!("{} was removed succesfully.", pd.title());
let fold = get_download_folder(pd.title())?;
fs::remove_dir_all(&fold)?;
info!("All the content at, {} was removed succesfully", &fold);
Ok(())
}
#[cfg(test)]
......
......@@ -11,6 +11,7 @@ log = "0.3.8"
mime_guess = "1.8.3"
reqwest = "0.8.2"
tempdir = "0.3.5"
glob = "0.2.11"
[dependencies.diesel]
features = ["sqlite"]
......
......@@ -2,11 +2,13 @@ use reqwest;
use hyper::header::*;
use tempdir::TempDir;
use mime_guess;
use glob::glob;
use std::fs::{rename, DirBuilder, File};
use std::io::{BufWriter, Read, Write};
use std::path::Path;
use std::fs;
use std::sync::{Arc, Mutex};
use errors::*;
use hammond_data::{EpisodeWidgetQuery, PodcastCoverQuery};
......@@ -15,6 +17,11 @@ use hammond_data::xdg_dirs::HAMMOND_CACHE;
// TODO: Replace path that are of type &str with std::path.
// TODO: Have a convention/document absolute/relative paths, if they should end with / or not.
pub trait DownloadProgress {
fn set_downloaded(&mut self, downloaded: u64);
fn set_size(&mut self, bytes: u64);
}
// Adapted from https://github.com/mattgathu/rget .
// I never wanted to write a custom downloader.
// Sorry to those who will have to work with that code.
......@@ -22,7 +29,12 @@ use hammond_data::xdg_dirs::HAMMOND_CACHE;
// or bindings for a lib like youtube-dl(python),
// But cant seem to find one.
// TODO: Write unit-tests.
fn download_into(dir: &str, file_title: &str, url: &str) -> Result<String> {
fn download_into(
dir: &str,
file_title: &str,
url: &str,
progress: Option<Arc<Mutex<DownloadProgress>>>,
) -> Result<String> {
info!("GET request to: {}", url);
let client = reqwest::Client::builder().referer(false).build()?;
let mut resp = client.get(url).send()?;
......@@ -33,22 +45,28 @@ fn download_into(dir: &str, file_title: &str, url: &str) -> Result<String> {
}
let headers = resp.headers().clone();
let ct_len = headers.get::<ContentLength>().map(|ct_len| **ct_len);
let ct_type = headers.get::<ContentType>();
ct_len.map(|x| info!("File Lenght: {}", x));
ct_type.map(|x| info!("Content Type: {}", x));
let ext = get_ext(ct_type.cloned()).unwrap_or(String::from("unkown"));
let ext = get_ext(ct_type.cloned()).unwrap_or(String::from("unknown"));
info!("Extension: {}", ext);
// Construct a temp file to save desired content.
let tempdir = TempDir::new_in(dir, "")?;
// It has to be a `new_in` instead of new cause rename can't move cross filesystems.
let tempdir = TempDir::new_in(HAMMOND_CACHE.to_str().unwrap(), "temp_download")?;
let out_file = format!("{}/temp.part", tempdir.path().to_str().unwrap(),);
ct_len.map(|x| {
if let Some(p) = progress.clone() {
let mut m = p.lock().unwrap();
m.set_size(x);
}
});
// Save requested content into the file.
save_io(&out_file, &mut resp, ct_len)?;
save_io(&out_file, &mut resp, ct_len, progress)?;
// Construct the desired path.
let target = format!("{}/{}.{}", dir, file_title, ext);
......@@ -58,7 +76,7 @@ fn download_into(dir: &str, file_title: &str, url: &str) -> Result<String> {
Ok(target)
}
// Determine the file extension from the http content-type header.
/// Determine the file extension from the http content-type header.
fn get_ext(content: Option<ContentType>) -> Option<String> {
let cont = content.clone()?;
content
......@@ -73,8 +91,14 @@ fn get_ext(content: Option<ContentType>) -> Option<String> {
}
// TODO: Write unit-tests.
// TODO: Refactor... Somehow.
/// Handles the I/O of fetching a remote file and saving into a Buffer and A File.
fn save_io(file: &str, resp: &mut reqwest::Response, content_lenght: Option<u64>) -> Result<()> {
fn save_io(
file: &str,
resp: &mut reqwest::Response,
content_lenght: Option<u64>,
progress: Option<Arc<Mutex<DownloadProgress>>>,
) -> Result<()> {
info!("Downloading into: {}", file);
let chunk_size = match content_lenght {
Some(x) => x as usize / 99,
......@@ -89,6 +113,14 @@ fn save_io(file: &str, resp: &mut reqwest::Response, content_lenght: Option<u64>
buffer.truncate(bcount);
if !buffer.is_empty() {
writer.write_all(buffer.as_slice())?;
if let Some(prog) = progress.clone() {
// This sucks.
let len = writer.get_ref().metadata().map(|x| x.len());
if let Ok(l) = len {
let mut m = prog.lock().unwrap();
m.set_downloaded(l);
}
}
} else {
break;
}
......@@ -98,7 +130,11 @@ fn save_io(file: &str, resp: &mut reqwest::Response, content_lenght: Option<u64>
}
// TODO: Refactor
pub fn get_episode(ep: &mut EpisodeWidgetQuery, download_folder: &str) -> Result<()> {
pub fn get_episode(
ep: &mut EpisodeWidgetQuery,
download_folder: &str,
progress: Option<Arc<Mutex<DownloadProgress>>>,
) -> Result<()> {
// Check if its alrdy downloaded
if ep.local_uri().is_some() {
if Path::new(ep.local_uri().unwrap()).exists() {
......@@ -110,7 +146,12 @@ pub fn get_episode(ep: &mut EpisodeWidgetQuery, download_folder: &str) -> Result
ep.save()?;
};
let res = download_into(download_folder, ep.title(), ep.uri().unwrap());
let res = download_into(
download_folder,
&ep.rowid().to_string(),
ep.uri().unwrap(),
progress,
);
if let Ok(path) = res {
// If download succedes set episode local_uri to dlpath.
......@@ -136,42 +177,33 @@ pub fn cache_image(pd: &PodcastCoverQuery) -> Option<String> {
return None;
}
let download_fold = format!(
"{}{}",
HAMMOND_CACHE.to_str().unwrap(),
pd.title().to_owned()
);
let cache_download_fold = format!("{}{}", HAMMOND_CACHE.to_str()?, pd.title().to_owned());
// Hacky way
// TODO: make it so it returns the first cover.* file encountered.
// Use glob instead
let png = format!("{}/cover.png", download_fold);
let jpg = format!("{}/cover.jpg", download_fold);
let jpe = format!("{}/cover.jpe", download_fold);
let jpeg = format!("{}/cover.jpeg", download_fold);
if Path::new(&png).exists() {
return Some(png);
} else if Path::new(&jpe).exists() {
return Some(jpe);
} else if Path::new(&jpg).exists() {
return Some(jpg);
} else if Path::new(&jpeg).exists() {
return Some(jpeg);
// Weird glob magic.
if let Ok(mut foo) = glob(&format!("{}/cover.*", cache_download_fold)) {
// For some reason there is no .first() method so nth(0) is used
let path = foo.nth(0).and_then(|x| x.ok());
if let Some(p) = path {
return Some(p.to_str()?.into());
}
};
// Create the folders if they don't exist.
DirBuilder::new()
.recursive(true)
.create(&download_fold)
.unwrap();
.create(&cache_download_fold)
.ok()?;
let dlpath = download_into(&download_fold, "cover", &url);
if let Ok(path) = dlpath {
info!("Cached img into: {}", &path);
Some(path)
} else {
error!("Failed to get feed image.");
error!("Error: {}", dlpath.unwrap_err());
None
match download_into(&cache_download_fold, "cover", &url, None) {
Ok(path) => {
info!("Cached img into: {}", &path);
Some(path)
}
Err(err) => {
error!("Failed to get feed image.");
error!("Error: {}", err);
None
}
}
}
......
......@@ -3,6 +3,7 @@
extern crate diesel;
#[macro_use]
extern crate error_chain;
extern crate glob;
extern crate hammond_data;
extern crate hyper;
#[macro_use]
......
......@@ -120,10 +120,10 @@
</packing>
</child>
<child>
<object class="GtkLabel" id="size_label">
<object class="GtkLabel" id="local_size">
<property name="can_focus">False</property>
<property name="no_show_all">True</property>
<property name="label" translatable="yes">42 MB</property>
<property name="label" translatable="yes">0 MB</property>
<property name="single_line_mode">True</property>
<property name="track_visited_links">False</property>
<style>
......@@ -137,10 +137,10 @@
</packing>
</child>
<child>
<object class="GtkLabel" id="progress_label">
<object class="GtkLabel" id="prog_separator">
<property name="can_focus">False</property>
<property name="no_show_all">True</property>
<property name="label" translatable="yes">12 MB / 42 MB</property>
<property name="label" translatable="yes">/</property>
<property name="single_line_mode">True</property>
<property name="track_visited_links">False</property>
<style>
......@@ -153,6 +153,23 @@
<property name="position">5</property>
</packing>
</child>
<child>
<object class="GtkLabel" id="total_size">
<property name="can_focus">False</property>
<property name="no_show_all">True</property>
<property name="label" translatable="yes">XX MB</property>
<property name="single_line_mode">True</property>
<property name="track_visited_links">False</property>
<style>
<class name="dim-label"/>
</style>
</object>
<packing>
<property name="expand">False</property>
<property name="fill">True</property>
<property name="position">6</property>
</packing>
</child>
</object>
<packing>
<property name="expand">False</property>
......@@ -257,6 +274,7 @@
<object class="GtkProgressBar" id="progress_bar">
<property name="can_focus">False</property>
<property name="no_show_all">True</property>
<property name="pulse_step">0</property>
</object>
<packing>
<property name="expand">False</property>
......
......@@ -11,14 +11,19 @@ use headerbar::Header;
use content::Content;
use utils;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Arc;
use std::sync::mpsc::{channel, Receiver, Sender};
#[derive(Clone, Debug)]
pub enum Action {
UpdateSources(Option<Source>),
RefreshViews,
RefreshAllViews,
RefreshEpisodesView,
RefreshEpisodesViewBGR,
RefreshShowsView,
RefreshWidget,
RefreshWidgetIfVis,
RefreshWidgetIfSame(i32),
HeaderBarShowTile(String),
HeaderBarNormal,
HeaderBarHideUpdateIndicator,
......@@ -134,12 +139,17 @@ impl App {
utils::refresh_feed(headerbar.clone(), Some(vec![s]), sender.clone())
}
}
Ok(Action::RefreshViews) => content.update(),
Ok(Action::RefreshAllViews) => content.update(),
Ok(Action::RefreshShowsView) => content.update_shows_view(),
Ok(Action::RefreshWidget) => content.update_widget(),
Ok(Action::RefreshWidgetIfVis) => content.update_widget_if_visible(),
Ok(Action::RefreshWidgetIfSame(id)) => content.update_widget_if_same(id),
Ok(Action::RefreshEpisodesView) => content.update_episode_view(),
Ok(Action::RefreshEpisodesViewBGR) => content.update_episode_view_if_baground(),
Ok(Action::HeaderBarShowTile(title)) => headerbar.switch_to_back(&title),
Ok(Action::HeaderBarNormal) => headerbar.switch_to_normal(),
Ok(Action::HeaderBarHideUpdateIndicator) => headerbar.hide_update_notification(),
_ => (),
Err(_) => (),
}
Continue(true)
......
......@@ -41,8 +41,9 @@ impl Content {
}
pub fn update(&self) {
self.update_shows_view();
self.update_episode_view();
self.update_shows_view();
self.update_widget()
}
pub fn update_episode_view(&self) {
......@@ -56,7 +57,23 @@ impl Content {
}
pub fn update_shows_view(&self) {
self.shows.update();
self.shows.update_podcasts();
}
pub fn update_widget(&self) {
self.shows.update_widget();
}
pub fn update_widget_if_same(&self, pid: i32) {
self.shows.update_widget_if_same(pid);
}
pub fn update_widget_if_visible(&self) {
if self.stack.get_visible_child_name() == Some("shows".to_string())
&& self.shows.get_stack().get_visible_child_name() == Some("widget".to_string())
{
self.shows.update_widget();
}
}
pub fn get_stack(&self) -> gtk::Stack {
......@@ -100,15 +117,11 @@ impl ShowStack {
show
}
// fn is_empty(&self) -> bool {
// self.podcasts.is_empty()
// pub fn update(&self) {
// self.update_widget();
// self.update_podcasts();
// }
pub fn update(&self) {
self.update_podcasts();
self.update_widget();
}
pub fn update_podcasts(&self) {
let vis = self.stack.get_visible_child_name().unwrap();
......@@ -194,12 +207,12 @@ impl ShowStack {
let vis = self.stack.get_visible_child_name().unwrap();
let old = self.stack.get_child_by_name("widget").unwrap();
let id = WidgetExt::get_name(&old).unwrap();
if id == "GtkBox" {
let id = WidgetExt::get_name(&old);
if id == Some("GtkBox".to_string()) || id.is_none() {
return;
}
let pd = dbqueries::get_podcast_from_id(id.parse::<i32>().unwrap());
let pd = dbqueries::get_podcast_from_id(id.unwrap().parse::<i32>().unwrap());
if let Ok(pd) = pd {
self.replace_widget(&pd);
self.stack.set_visible_child_name(&vis);
......@@ -207,6 +220,17 @@ impl ShowStack {
}
}
// Only update widget if it's podcast_id is equal to pid.
pub fn update_widget_if_same(&self, pid: i32) {
let old = self.stack.get_child_by_name("widget").unwrap();
let id = WidgetExt::get_name(&old);
if id != Some(pid.to_string()) || id.is_none() {
return;
}
self.update_widget();
}
pub fn switch_podcasts_animated(&self) {
self.stack
.set_visible_child_full("podcasts", gtk::StackTransitionType::SlideRight);
......
......@@ -53,6 +53,7 @@ mod content;
mod app;
mod utils;
mod manager;
mod static_resource;
use app::App;
......
// use hammond_data::Episode;
use hammond_data::dbqueries;
use hammond_downloader::downloader::get_episode;
use hammond_downloader::downloader::DownloadProgress;
use app::Action;
use std::collections::HashMap;
use std::sync::{Arc, Mutex, RwLock};
use std::sync::mpsc::Sender;
// use std::sync::atomic::AtomicUsize;
// use std::path::PathBuf;
use std::thread;
#[derive(Debug)]
pub struct Progress {
total_bytes: u64,
downloaded_bytes: u64,
}
impl Progress {
pub fn get_fraction(&self) -> f64 {
let ratio = self.downloaded_bytes as f64 / self.total_bytes as f64;
debug!("{:?}", self);
debug!("Ratio completed: {}", ratio);
if ratio >= 1.0 {
return 1.0;
};
ratio
}
pub fn get_total_size(&self) -> u64 {
self.total_bytes
}
pub fn get_downloaded(&self) -> u64 {
self.downloaded_bytes
}
}
impl Default for Progress {
fn default() -> Self {
Progress {
total_bytes: 0,
downloaded_bytes: 0,
}
}
}
impl DownloadProgress for Progress {
fn set_downloaded(&mut self, downloaded: u64) {
self.downloaded_bytes = downloaded
}
fn set_size(&mut self, bytes: u64) {
self.total_bytes = bytes;
}
}
lazy_static! {
pub static ref ACTIVE_DOWNLOADS: Arc<RwLock<HashMap<i32, Arc<Mutex<Progress>>>>> = {
Arc::new(RwLock::new(HashMap::new()))
};
}
pub fn add(id: i32, directory: &str, sender: Sender<Action>) {
// Create a new `Progress` struct to keep track of dl progress.
let prog = Arc::new(Mutex::new(Progress::default()));
{
if let Ok(mut m) = ACTIVE_DOWNLOADS.write() {
m.insert(id, prog.clone());
}
}
let dir = directory.to_owned();
thread::spawn(move || {
if let Ok(episode) = dbqueries::get_episode_from_rowid(id) {
let pid = episode.podcast_id();
let id = episode.rowid();
get_episode(&mut episode.into(), dir.as_str(), Some(prog))
.err()
.map(|err| {
error!("Error while trying to download an episode");
error!("Error: {}", err);
});
{
if let Ok(mut m) = ACTIVE_DOWNLOADS.write() {
info!("Removed: {:?}", m.remove(&id));
}
}
// {
// if let Ok(m) = ACTIVE_DOWNLOADS.read() {
// debug!("ACTIVE DOWNLOADS: {:#?}", m);
// }
// }
sender.send(Action::RefreshEpisodesView).unwrap();
sender.send(Action::RefreshWidgetIfSame(pid)).unwrap();
}
});
}
#[cfg(test)]
mod tests {
use super::*;
use diesel::Identifiable;
use hammond_data::database;
use hammond_data::utils::get_download_folder;
use hammond_data::feed::*;
use hammond_data::{Episode, Source};
use hammond_data::dbqueries;
use std::path::Path;
use std::{thread, time};
use std::sync::mpsc::channel;
#[test]
// This test inserts an rss feed to your `XDG_DATA/hammond/hammond.db` so we make it explicit
// to run it.
#[ignore]
// THIS IS NOT A RELIABLE TEST
// Just quick sanity check
fn test_start_dl() {
let url = "http://www.newrustacean.com/feed.xml";
// Create and index a source
let source = Source::from_url(url).unwrap();
// Copy it's id
let sid = source.id().clone();
// Convert Source it into a Feed and index it
let feed = source.into_feed(true).unwrap();
index(&feed);
// Get the Podcast
let pd = dbqueries::get_podcast_from_source_id(sid).unwrap();
// Get an episode
let episode: Episode = {
let con = database::connection();
dbqueries::get_episode_from_pk(&*con.get().unwrap(), "e000: Hello, world!", *pd.id())
.unwrap()
};
let (sender, _rx) = channel();
let download_fold = get_download_folder(&pd.title()).unwrap();
add(episode.rowid(), download_fold.as_str(), sender);
// Give it soem time to download the file
thread::sleep(time::Duration::from_secs(40));
let final_path = format!("{}/{}.unknown", &download_fold, episode.rowid());
println!("{}", &final_path);
assert!(Path::new(&final_path).exists());
}
}
......@@ -28,7 +28,7 @@ pub fn refresh_feed(headerbar: Arc<Header>, source: Option<Vec<Source>>, sender:
};
sender.send(Action::HeaderBarHideUpdateIndicator).unwrap();
sender.send(Action::RefreshViews).unwrap();
sender.send(Action::RefreshAllViews).unwrap();
});
}
......
......@@ -77,7 +77,7 @@ impl Default for EpisodesView {
impl EpisodesView {
pub fn new(sender: Sender<Action>) -> Arc<EpisodesView> {
let view = EpisodesView::default();
let episodes = dbqueries::get_episodes_widgets_with_limit(100).unwrap();
let episodes = dbqueries::get_episodes_widgets_with_limit(50).unwrap();
let now_utc = Utc::now();
episodes.into_iter().for_each(|mut ep| {
......@@ -205,10 +205,7 @@ impl EpisodesViewWidget {
let image: gtk::Image = builder.get_object("cover").unwrap();
if let Ok(pd) = dbqueries::get_podcast_cover_from_id(episode.podcast_id()) {
let img = get_pixbuf_from_path(&pd, 64);
if let Some(i) = img {
image.set_from_pixbuf(&i);
}
get_pixbuf_from_path(&pd, 64).map(|img| image.set_from_pixbuf(&img));
}
let ep = EpisodeWidget::new(episode, sender.clone());
......
......@@ -11,14 +11,32 @@ use hammond_data::dbqueries;