Commit f4e6e4fc authored by Felix Häcker's avatar Felix Häcker

Refactor player/gstreamerbackend, improve auto recorder

parent ee8b98a1
This diff is collapsed.
......@@ -20,11 +20,11 @@ pretty_env_logger = "0.3.0"
rusqlite = "0.13"
quick-error = "1.2.2"
restson = "0.4"
stopwatch = "0.0.7"
uuid = { version = "0.7", features = ["v4"] }
chrono = "0.4.6"
rust_cast = "0.14.0"
mdns = "0.3.1"
serde = "1.0.89"
serde_json = "1.0.39"
serde_derive = "1.0.89"
lazy_static = "1.3.0"
......@@ -31,7 +31,7 @@
"--share=network"
],
"env" : {
"RUSTFLAGS" : "--remap-path-prefix =../",
"RUSTFLAGS" : "--remap-path-prefix =../ --error-format=short",
"CARGO_HOME" : "/run/build/Shortwave/cargo",
"RUST_BACKTRACE" : "1",
"RUST_LOG" : "shortwave=debug"
......
max_width=200
\ No newline at end of file
max_width=200
edition = "2018"
#[macro_use]
extern crate log;
extern crate pretty_env_logger;
#[macro_use]
extern crate quick_error;
#[macro_use]
extern crate glib;
#[macro_use]
extern crate lazy_static;
#[macro_use]
extern crate serde_derive;
mod model;
mod player;
......
mod object_wrapper;
mod song_model;
mod station_model;
//mod song_model;
pub use object_wrapper::ObjectWrapper;
pub use song_model::SongModel;
pub use station_model::Order;
pub use station_model::Sorting;
pub use station_model::StationModel;
use gio::prelude::ListStoreExtManual;
use gio::prelude::*;
use glib::prelude::*;
use std::fs;
use crate::model::ObjectWrapper;
use crate::song::Song;
use crate::song_object::SongObject;
#[derive(Clone, Debug)]
#[derive(Clone)]
pub struct SongModel {
pub model: gio::ListStore,
max_len: u32,
current_song: Option<Song>,
current_song_timestamp: Option<u32>,
}
impl SongModel {
pub fn new() -> Self {
let model = gio::ListStore::new(SongObject::static_type());
pub fn new(max_len: u32) -> Self {
let model = gio::ListStore::new(ObjectWrapper::static_type());
let current_song = None;
let current_song_timestamp = None;
Self { model, sorting, order }
Self {
model,
max_len,
current_song,
current_song_timestamp,
}
}
pub fn add_song(&mut self, song: Song) {
// Check if song does not exist yet
if !self.index(&song).is_some() {
let object = SongObject::new(song.clone());
let sorting = self.sorting.clone();
let order = self.order.clone();
self.model.insert(&object);
// Ensure max length
if self.model.get_n_items() >= self.max_len {
let rm_song = self.get_song(self.model.get_n_items() - 1);
self.remove_song(&rm_song).unwrap();
}
let object = ObjectWrapper::new(song.clone());
self.model.insert(0, &object);
}
}
pub fn remove_song(&mut self, song: &Song) {
pub fn remove_song(&mut self, song: &Song) -> std::io::Result<()> {
fs::remove_file(&song.path)?;
self.index(song).map(|index| self.model.remove(index));
Ok(())
}
fn index(&self, song: &Song) -> Option<u32> {
for i in 0..self.model.get_n_items() {
let gobject = self.model.get_object(i).unwrap();
let song_object = gobject.downcast_ref::<SongObject>().expect("SongObject is of wrong type");
let s = song_object.to_song();
let s = self.get_song(i);
if &s == song {
return Some(i);
......@@ -43,7 +61,21 @@ impl SongModel {
None
}
pub fn clear(&mut self) {
pub fn get_song(&self, index: u32) -> Song {
let gobject = self.model.get_object(index).unwrap();
let song_object = gobject.downcast_ref::<ObjectWrapper>().expect("ObjectWrapper is of wrong type");
song_object.deserialize()
}
pub fn clear(&mut self) -> std::io::Result<()> {
// Remove saved songs from disk
for i in 0..self.model.get_n_items() {
let s: Song = self.get_song(i);
fs::remove_file(&s.path)?
}
self.model.remove_all();
Ok(())
}
}
use glib::Sender;
use gstreamer::prelude::*;
use gio::prelude::*;
use glib::{Receiver, Sender};
use gtk::prelude::*;
use rustio::{Client, Station};
use std::cell::RefCell;
use std::fs;
use std::path::PathBuf;
use std::rc::Rc;
use std::sync::{Arc, Mutex};
use std::thread;
use crate::app::Action;
use crate::config;
use crate::player::controller::{GtkController, MprisController};
use crate::song::Song;
use crate::player::gstreamer_backend::GstreamerMessage;
use crate::widgets::song_listbox::SongListBox;
////////////////////////////////////////////////////////////////////////////////////////////
......@@ -44,21 +47,29 @@ pub use controller::Controller;
pub use gstreamer_backend::PlayerBackend;
pub use playback_state::PlaybackState;
use crate::model::SongModel;
pub struct Player {
pub widget: gtk::Box,
controller: Rc<Vec<Box<Controller>>>,
backend: Arc<Mutex<PlayerBackend>>,
song_listbox: Rc<RefCell<SongListBox>>,
song_model: Rc<RefCell<SongModel>>,
song_listbox: SongListBox,
}
impl Player {
pub fn new(sender: Sender<Action>) -> Self {
let builder = gtk::Builder::new_from_resource("/de/haeckerfelix/Shortwave/gtk/player.ui");
let widget: gtk::Box = builder.get_object("player").unwrap();
let backend = Arc::new(Mutex::new(PlayerBackend::new()));
let song_listbox = Rc::new(RefCell::new(SongListBox::new()));
widget.add(&song_listbox.borrow().widget);
let song_model = Rc::new(RefCell::new(SongModel::new(5)));
let song_listbox = SongListBox::new(sender.clone());
song_listbox.bind_model(&song_model.borrow());
widget.add(&song_listbox.widget);
let (gst_sender, gst_receiver) = glib::MainContext::channel(glib::PRIORITY_DEFAULT);
let backend = Arc::new(Mutex::new(PlayerBackend::new(gst_sender)));
let mut controller: Vec<Box<Controller>> = Vec::new();
......@@ -78,18 +89,19 @@ impl Player {
widget,
controller,
backend,
song_model,
song_listbox,
};
player.setup_signals();
player.setup_signals(gst_receiver);
player
}
pub fn set_station(&self, station: Station) {
// discard old song, because it's not completely recorded
self.song_listbox.borrow_mut().discard_current_song();
self.song_listbox.borrow_mut().current_station = Some(station.clone());
// TODO: discard old song (because it's not completely recorded),
// stop recording and stop playback
let song = self.backend.lock().unwrap().stop_recording().unwrap();
self.song_model.borrow_mut().remove_song(&song);
self.set_playback(PlaybackState::Stopped);
for con in &*self.controller {
......@@ -112,97 +124,101 @@ impl Player {
}
PlaybackState::Stopped => {
let _ = self.backend.lock().unwrap().set_state(gstreamer::State::Null);
// We need to set it manually, because we don't receive a gst message when the playback stops
for con in &*self.controller {
con.set_playback_state(&PlaybackState::Stopped);
}
}
_ => (),
};
}
}
pub fn shutdown(&self) {
self.set_playback(PlaybackState::Stopped);
self.song_listbox.borrow_mut().delete_everything();
// Clear song model and remove all saved songs
self.song_model.borrow_mut().clear();
fs::remove_dir_all(Self::get_song_path("".to_string())).expect("Could not remove recording folder");
}
fn parse_bus_message(message: &gstreamer::Message, controller: Rc<Vec<Box<Controller>>>, backend: Arc<Mutex<PlayerBackend>>, song_listbox: Rc<RefCell<SongListBox>>) {
match message.view() {
gstreamer::MessageView::Tag(tag) => {
tag.get_tags().get::<gstreamer::tags::Title>().map(|t| {
let new_song = Song::new(t.get().unwrap());
// Check if song have changed
if song_listbox.borrow_mut().set_new_song(new_song.clone()) {
// set new song
debug!("New song: {:?}", new_song.clone().title);
for con in &*controller {
con.set_song_title(new_song.clone().title.as_ref());
}
debug!("Block the dataflow ...");
backend.lock().unwrap().block_dataflow();
}
});
fn setup_signals(&self, receiver: Receiver<GstreamerMessage>) {
// Wait for new messages from the Gstreamer backend
let controller = self.controller.clone();
let song_model = self.song_model.clone();
let backend = self.backend.clone();
receiver.attach(None, move |message| Self::process_gst_message(message, controller.clone(), song_model.clone(), backend.clone()));
// Show song listbox if a song gets added
let listbox = self.song_listbox.widget.clone();
self.song_model.borrow().model.connect_items_changed(move |_, _, _, added| {
if added == 1 {
listbox.set_visible(true);
}
gstreamer::MessageView::StateChanged(sc) => {
let playback_state = match sc.get_current() {
gstreamer::State::Playing => PlaybackState::Playing,
gstreamer::State::Paused => PlaybackState::Loading,
gstreamer::State::Ready => PlaybackState::Loading,
_ => PlaybackState::Stopped,
};
});
}
fn process_gst_message(message: GstreamerMessage, controller: Rc<Vec<Box<Controller>>>, song_model: Rc<RefCell<SongModel>>, backend: Arc<Mutex<PlayerBackend>>) -> glib::Continue {
match message {
GstreamerMessage::SongTitleChanged(title) => {
debug!("Song title has changed: \"{}\"", title);
for con in &*controller {
con.set_playback_state(&playback_state);
con.set_song_title(&title);
}
// Song have changed -> stop recording
if backend.lock().unwrap().is_recording() {
let song = backend.lock().unwrap().stop_recording().unwrap();
song_model.borrow_mut().add_song(song);
} else {
// Get current/new song title
let title = backend.lock().unwrap().get_current_song_title();
// Nothing needs to be stopped, so we can start directly recording.
backend.lock().unwrap().start_recording(Self::get_song_path(title));
}
}
gstreamer::MessageView::Element(element) => {
let structure = element.get_structure().unwrap();
if structure.get_name() == "GstBinForwarded" {
let message: gstreamer::message::Message = structure.get("message").unwrap();
if let gstreamer::MessageView::Eos(_) = &message.view() {
debug!("muxsinkbin got EOS...");
if song_listbox.borrow().current_song.is_some() {
// Old song got saved correctly (cause we got the EOS message),
// so we can start with the new song now
let song = song_listbox.borrow_mut().current_song.clone().unwrap();
debug!("Cache song \"{}\" under \"{}\"", song.title, song.path);
backend.lock().unwrap().new_filesink_location(&song.path);
} else {
// Or just redirect the stream to /dev/null
backend.lock().unwrap().new_filesink_location("/dev/null");
}
}
GstreamerMessage::RecordingStopped => {
// Recording successfully stopped.
debug!("Recording stopped.");
// Get current/new song title
let title = backend.lock().unwrap().get_current_song_title();
// Start recording new song
if title != "" {
backend.lock().unwrap().start_recording(Self::get_song_path(title));
}
}
gstreamer::MessageView::Error(err) => {
let msg = err.get_error().to_string();
warn!("Gstreamer Error: {:?}", msg);
GstreamerMessage::PlaybackStateChanged(state) => {
for con in &*controller {
con.set_playback_state(&PlaybackState::Failure(msg.clone()));
con.set_playback_state(&state);
}
}
_ => (),
};
}
glib::Continue(true)
}
fn setup_signals(&self) {
// new backend (pipeline) bus messages
let bus = self.backend.lock().unwrap().get_pipeline_bus();
let controller = self.controller.clone();
let backend = self.backend.clone();
let song_listbox = self.song_listbox.clone();
gtk::timeout_add(250, move || {
while bus.have_pending() {
bus.pop().map(|message| {
//debug!("new message {:?}", message);
Self::parse_bus_message(&message, controller.clone(), backend.clone(), song_listbox.clone());
});
}
Continue(true)
});
fn get_song_path(mut title: String) -> PathBuf {
// remove special chars from title
// if anybody knows a better way to do this, feel free to open a MR on GitLab :)
title = title.replace("/", "");
title = title.replace("\\", "");
title = title.replace(":", "");
title = title.replace("<", "");
title = title.replace(">", "");
title = title.replace("\"", "");
title = title.replace("|", "");
title = title.replace("?", "");
title = title.replace("*", "");
let mut path = glib::get_user_cache_dir().unwrap();
path.push(config::NAME);
path.push("recording");
// Make sure that the path exists
fs::create_dir_all(path.clone()).expect("Could not create path for recording");
if title != "" {
path.push(title);
path.set_extension("ogg");
}
path
}
}
......@@ -58,11 +58,11 @@ impl GtkController {
error_label,
};
controller.connect_signals();
controller.setup_signals();
controller
}
fn connect_signals(&self) {
fn setup_signals(&self) {
// start_playback_button
let sender = self.sender.clone();
self.start_playback_button.connect_clicked(move |_| {
......
......@@ -33,7 +33,7 @@ impl MprisController {
station: Cell::new(None),
};
controller.connect_signals();
controller.setup_signals();
controller
}
......@@ -57,7 +57,7 @@ impl MprisController {
self.mpris.set_metadata(metadata);
}
fn connect_signals(&self) {
fn setup_signals(&self) {
// mpris raise
let sender = self.sender.clone();
self.mpris.connect_raise(move || {
......
use glib::Sender;
use gstreamer::prelude::*;
use gstreamer::{Bin, Bus, Element, ElementFactory, Pad, PadProbeId, Pipeline, State};
use gstreamer::{Bin, Element, ElementFactory, Pad, PadProbeId, Pipeline, State};
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use crate::player::playback_state::PlaybackState;
use crate::song::Song;
//////////////////////////////////////////////////////////////////////////////////////////////////////////
// //
......@@ -27,6 +34,13 @@ use gstreamer::{Bin, Bus, Element, ElementFactory, Pad, PadProbeId, Pipeline, St
// //
//////////////////////////////////////////////////////////////////////////////////////////////////////////
#[derive(Clone)]
pub enum GstreamerMessage {
SongTitleChanged(String),
PlaybackStateChanged(PlaybackState),
RecordingStopped,
}
#[allow(dead_code)]
pub struct PlayerBackend {
pipeline: Pipeline,
......@@ -43,10 +57,13 @@ pub struct PlayerBackend {
muxsinkbin: Option<Bin>,
file_srcpad: Pad,
file_blockprobe_id: Option<PadProbeId>,
current_title: Arc<Mutex<String>>,
sender: Sender<GstreamerMessage>,
}
impl PlayerBackend {
pub fn new() -> Self {
pub fn new(sender: Sender<GstreamerMessage>) -> Self {
// create gstreamer pipeline
let pipeline = Pipeline::new("recorder_pipeline");
......@@ -94,7 +111,24 @@ impl PlayerBackend {
}
});
let mut pipeline = Self {
// Current song title. We need this variable to check if the title have changed.
let current_title = Arc::new(Mutex::new(String::new()));
// listen for new pipeline / bus messages
let ct = current_title.clone();
let bus = pipeline.get_bus().expect("Unable to get pipeline bus");
let s = sender.clone();
gtk::timeout_add(250, move || {
while bus.have_pending() {
bus.pop().map(|message| {
//debug!("new message {:?}", message);
Self::parse_bus_message(&message, s.clone(), ct.clone());
});
}
Continue(true)
});
let pipeline = Self {
pipeline,
uridecodebin,
audioconvert,
......@@ -106,39 +140,19 @@ impl PlayerBackend {
muxsinkbin: None,
file_srcpad,
file_blockprobe_id: None,
current_title,
sender,
};
pipeline.create_muxsinkbin("/dev/null");
pipeline
}
pub fn set_state(&self, state: gstreamer::State) {
let _ = self.pipeline.set_state(state);
}
pub fn get_pipeline_bus(&self) -> Bus {
self.pipeline.get_bus().expect("Unable to get pipeline bus")
}
if state == gstreamer::State::Null {
self.sender.send(GstreamerMessage::PlaybackStateChanged(PlaybackState::Stopped)).unwrap();
}
pub fn block_dataflow(&mut self) {
// File branch
let muxsinkbin = self.muxsinkbin.clone();
let file_id = self
.file_srcpad
.add_probe(gstreamer::PadProbeType::BLOCK_DOWNSTREAM, move |_, _| {
// Dataflow is blocked
debug!("Pad is blocked now.");
debug!("Push EOS into muxsinkbin sinkpad...");
let sinkpad = muxsinkbin.clone().unwrap().get_static_pad("sink").unwrap();
sinkpad.send_event(gstreamer::Event::new_eos().build());
gstreamer::PadProbeReturn::Ok
})
.unwrap();
// We need the padprobe id later to remove the block probe
self.file_blockprobe_id = Some(file_id);
let _ = self.pipeline.set_state(state);
}
pub fn new_source_uri(&mut self, source: &str) {
......@@ -152,24 +166,80 @@ impl PlayerBackend {
let _ = self.pipeline.set_state(State::Playing);
}
pub fn new_filesink_location(&mut self, location: &str) {
debug!("Update filesink location to \"{}\"...", location);
pub fn start_recording(&mut self, path: PathBuf) {
debug!("Start recording to \"{:?}\"...", path);
debug!("Destroy old muxsinkbin");
let muxsinkbin = self.muxsinkbin.take().unwrap();
let _ = muxsinkbin.set_state(State::Null);
self.pipeline.remove(&muxsinkbin).unwrap();
debug!("Destroy old muxsinkbin...");
if self.muxsinkbin.is_some() {
let muxsinkbin = self.muxsinkbin.take().unwrap();
let _ = muxsinkbin.set_state(State::Null);
self.pipeline.remove(&muxsinkbin).unwrap();
} else {
debug!("No muxsinkbin available - nothing to destroy");
}
debug!("Create new muxsinkbin");
self.create_muxsinkbin(location);
self.create_muxsinkbin(path);
debug!("Remove block probe..."); //TODO: Fix crash here... (called `Option::unwrap()` on a `None) 169
self.file_srcpad.remove_probe(self.file_blockprobe_id.take().unwrap());
// Remove block probe id, if available
debug!("Remove block probe...");
match self.file_blockprobe_id.take() {
Some(id) => self.file_srcpad.remove_probe(id),
None => (),
}
debug!("Everything ok.");
}
fn create_muxsinkbin(&mut self, location: &str) {
pub fn stop_recording(&mut self) -> Option<Song> {
debug!("Stop recording... ({:?})", self.muxsinkbin);
let muxsinkbin = self.muxsinkbin.clone();
if muxsinkbin.is_some() {
// Get location property from muxsinkbin
let path_value = muxsinkbin.clone().unwrap().get_by_name("filesink").unwrap().get_property("location").unwrap();
let path_string: String = path_value.get().unwrap();
let path: PathBuf = PathBuf::from(path_string);
// Remove file extension (.ogg) from filename
let mut title_path = path.clone();
title_path.set_extension("");
let title = title_path.file_name().unwrap().to_str().unwrap();
let file_id = self
.file_srcpad
.add_probe(gstreamer::PadProbeType::BLOCK_DOWNSTREAM, move |_, _| {
// Dataflow is blocked
debug!("Push EOS into muxsinkbin sinkpad...");
let sinkpad = muxsinkbin.clone().unwrap().get_static_pad("sink").unwrap();
sinkpad.send_event(gstreamer::Event::new_eos().build());
gstreamer::PadProbeReturn::Ok
})
.unwrap();
// We need the padprobe id later to remove the block probe
self.file_blockprobe_id = Some(file_id);
// Create song and return it
let song = Song::new(title, path.clone());
return Some(song);
} else {
debug!("No muxsinkbin available - nothing to stop");
}
None
}
pub fn is_recording(&self) -> bool {
self.muxsinkbin.is_some()
}
pub fn get_current_song_title(&self) -> String {
self.current_title.lock().unwrap().clone()
}
fn create_muxsinkbin(&mut self, path: PathBuf) {
// Create vorbisenc
let vorbisenc = ElementFactory::make("vorbisenc", "vorbisenc").unwrap();
......@@ -178,8 +248,7 @@ impl PlayerBackend {
// Create filesink
let filesink = ElementFactory::make("filesink", "filesink").unwrap();
filesink.set_property("location", &location).unwrap();
filesink.set_property("location", &path.to_str().unwrap()).unwrap();
// Create bin
let bin = Bin::new("bin");
bin.set_property("message-forward", &true).unwrap();
......@@ -204,64 +273,47 @@ impl PlayerBackend {
self.muxsinkbin = Some(bin);
}
}
/////////////////////////////////////////////////////////////////////////////////////
// //
// # Gstreamer Pipeline //
// //
// --------- ----------- ----------- -------- ---------- //
// | filesrc | -> | decodebin | -> | vorbisenc | -> | oggmux | -> | filesink | //
// --------- ----------- ----------- -------- ---------- //
// //
// We need a separate pipeline for exporting a song, otherwise the duration would //
// be wrong. For reference: //
// //
// http://gstreamer-devel.966125.n4.nabble.com/Dynamically-change-filesink-File-duration-problem-td4689353.html
// //
/////////////////////////////////////////////////////////////////////////////////////
#[derive(Clone)]
pub struct ExportBackend {
pipeline: Pipeline,
path: String,
export_path: String,
}
impl ExportBackend {
pub fn new(path: &str, export_path: &str) -> Self {
let pipeline = Pipeline::new("export_pipeline");
let filesrc = ElementFactory::make("filesrc", "filesrc").unwrap();
let decodebin = ElementFactory::make("decodebin", "decodebin").unwrap();
let vorbisenc = ElementFactory::make("vorbisenc", "vorbisenc").unwrap();
let oggmux = ElementFactory::make("oggmux", "oggmux").unwrap();
let filesink = ElementFactory::make("filesink", "filesink").unwrap();
pipeline.add_many(&[&filesrc, &decodebin, &vorbisenc, &oggmux, &filesink]).unwrap();
Element::link_many(&[&vorbisenc, &oggmux, &filesink]).unwrap();
Element::link_many(&[&filesrc, &decodebin]).unwrap();
let vorbis = vorbisenc.clone();
decodebin.connect_pad_added(move |_, src_pad| {
let sink_pad = vorbis.get_static_pad("sink").expect("Failed to get static sink pad from convert");
let _ = src_pad.link(&sink_pad);
});
filesrc.set_property("location", &path).unwrap();
filesink.set_property("location", &export_path).unwrap();
Self {
pipeline,
path: path.to_string(),
export_path: export_path.to_string(),
}
}
pub fn start(&self) {
debug!("* Export song **");
debug!("Cached song path: {}", self.path);
debug!("Export song path: {}", self.export_path);
let _ = self.pipeline.set_state(State::Playing);
fn parse_bus_message(message: &gstreamer::Message, sender: Sender<GstreamerMessage>, current_title: Arc<Mutex<String>>)