diff --git a/upscaler/algorithms/__init__.py b/upscaler/algorithms/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/upscaler/algorithms/commons.py b/upscaler/algorithms/commons.py new file mode 100644 index 0000000000000000000000000000000000000000..844d5cf42ca74dbc1c8dab6190d6c52bfc68b6be --- /dev/null +++ b/upscaler/algorithms/commons.py @@ -0,0 +1,164 @@ +# commons.py: common data structures for algorithms +# +# Copyright (C) 2023 Upscaler Contributors +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, version 3. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# SPDX-License-Identifier: GPL-3.0-only + +from typing import Any, Callable, Optional +from enum import Enum +from gi.repository import Gtk, Adw + +class AlgorithmFailed(Exception): + """Raise when the algorithm has failed.""" + def __init__(self, result_code: int, output: str) -> None: + super().__init__() + self.result_code = result_code + self.output = output + + def __str__(self) -> str: + return f'Algorithm failed.\nResult code: {self.result_code}\nOutput: {self.output}' + +class AlgorithmWarning(Exception): + """Raise when the output could be damaged.""" + pass + +class Property: + def __init__(self, title: str, subtitle: Optional[str], tooltip: Optional[str]) -> None: + self._widget = Adw.ActionRow(title=title) + if subtitle: + self._widget.set_subtitle(subtitle) + if tooltip: + self._widget.set_tooltip_text(tooltip) + self._widget.add_css_class('property') + + def set_title(self, title: str) -> None: + self._widget.set_title(title) + + def set_subtitle(self, subtitle: str) -> None: + self._widget.set_subtitle(subtitle) + + def widget(self) -> Gtk.Widget: + return self._widget + +class Option: + """Option base class.""" + def __init__(self, + default_value: Any, + desc: str, + on_changed: Optional[Callable[[], None]]) -> None: + self._default_value = default_value + self._value = default_value + self._desc = desc + self._on_changed = on_changed + + def widget(self) -> Gtk.Widget: + raise NotImplementedError() + + def get_value(self) -> Any: + return self._value + + def set_value(self, value: Any) -> None: + self._value = value + if self._on_changed is not None: + self._on_changed() + + def reset(self) -> None: + self.set_value(self._default_value) + +class EnumOption(Option): + """String enum option.""" + def __init__(self, + enum: Enum, + desc: str, + on_changed: Optional[Callable[[], None]]) -> None: + super().__init__(enum, desc, on_changed) + + self.Enum = type(enum) + + self._strings = [e.value for e in self.Enum] + string_list = Gtk.StringList.new(self._strings) + + self._row = Adw.ComboRow(title=desc, + model=string_list) + self._row.connect('notify::selected', self._on_row_selected) + + def widget(self) -> Gtk.Widget: + return self._row + + def _on_row_selected(self, row: Adw.ComboRow, *_: Any) -> None: + self.set_value(self.Enum(self._strings[row.get_selected()])) + + def reset(self) -> None: + super().reset() + self._row.set_selected(self._strings.index(self.get_value().value)) + +class Algorithm: + """Algorithm base class.""" + def __init__(self, properties: dict[str, Property], options: dict[str, Option]) -> None: + self.input_file_path: Optional[str] = None + self.output_file_path: Optional[str] = None + self.cancelled = False + self._properties = properties + self._options = options + + def __del__(self) -> None: + # Cancel operation on destruction + self.cancel() + + def get_properties(self) -> list[Property]: + """Get the list of all properties.""" + return list(self._properties.values()) + + def get_property(self, name: str) -> Property: + """Get the property with 'name'.""" + if not name in self._properties: + raise ValueError(f'Unable to find {name} property') + + return self._properties[name] + + def get_options(self) -> list[Option]: + """Get the list of all the options.""" + return list(self._options.values()) + + def get_option(self, name: str) -> Any: + """Get the value of the option with 'name'.""" + if not name in self._options: + raise ValueError(f'Unable to find {name} option') + + return self._options[name].get_value() + + def reset_options(self) -> None: + """Reset all the options to the default value.""" + for opt in self.get_options(): + opt.reset() + + def set_input_file(self, file_path: str, *args: Any) -> None: + self.input_file_path = file_path + + def set_output_file(self, file_path: str) -> None: + self.output_file_path = file_path + + def default_output_file_name(self) -> str: + raise NotImplementedError() + + def start(self, + progress_callback: Callable[[float], None], + done_callback: Callable[[Any, Optional[Exception]], None]) -> None: + """Start the algorithm.""" + raise NotImplementedError() + + def cancel(self) -> None: + """Cancel the current operation.""" + raise NotImplementedError() diff --git a/upscaler/algorithms/realesrgan_ncnn_vulkan.py b/upscaler/algorithms/realesrgan_ncnn_vulkan.py new file mode 100644 index 0000000000000000000000000000000000000000..8e074de3153e9bd4e81717775baa9e59e55ead56 --- /dev/null +++ b/upscaler/algorithms/realesrgan_ncnn_vulkan.py @@ -0,0 +1,137 @@ +# realesrgan_ncnn_vulkan.py: Real-ESRGAN ncnn Vulkan implementation +# +# Copyright (C) 2023 Upscaler Contributors +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, version 3. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# SPDX-License-Identifier: GPL-3.0-only + +from os.path import basename, splitext +from enum import Enum +import re +import subprocess +from gi.repository import GLib, GdkPixbuf +from upscaler.algorithms.commons import (Algorithm, + Property, + EnumOption, + AlgorithmFailed, + AlgorithmWarning) +from upscaler.threading import RunAsync +from typing import Any, IO, Callable, Optional, cast +from gettext import gettext as _ + +ALG_WARNINGS = [ + 'vkQueueSubmit failed' +] + +# Real-ESRGAN ncnn Vulkan only properly supports 4x. +UPSCALE_FACTOR = 4 + +class ImageTypeEnum(Enum): + PHOTO = _("Photo") + CARTOON = _("Cartoon/Anime") + +MODEL_IMAGES = { + ImageTypeEnum.PHOTO: 'realesrgan-x4plus', + ImageTypeEnum.CARTOON: 'realesrgan-x4plus-anime', +} + +class RealesrganNcnnVulkan(Algorithm): + def __init__(self) -> None: + super().__init__( + { + 'input': Property(_('Image Size'), None, None), + 'output': Property(_('Image Size After Upscaling'), None, None), + }, + { + 'model': EnumOption(ImageTypeEnum.PHOTO, 'Type of Image', None), + }) + self._process: Optional[subprocess.Popen[str]] = None + self._default_output: Optional[str] = None + + def set_input_file(self, file_path: str, *args: Any) -> None: + pixbuf: GdkPixbuf.Pixbuf = args[0] + super().set_input_file(file_path, pixbuf) + property = self.get_property('input') + height = pixbuf.props.height + width = pixbuf.props.width + property.set_subtitle(f'{width} × {height}') + + property = self.get_property('output') + property.set_subtitle(f'{width * UPSCALE_FACTOR} × {height * UPSCALE_FACTOR}') + + base_path = basename(splitext(file_path)[0]) + self._default_output = f'{base_path}-{width * UPSCALE_FACTOR}x{height * UPSCALE_FACTOR}-upscaled.png' + + def default_output_file_name(self) -> str: + if self._default_output: + return self._default_output + else: + raise NotImplementedError + + def start(self, + progress_callback: Callable[[float], None], + done_callback: Callable[[Any, Optional[Exception]], None]) -> None: + """Start the algorithm.""" + + self.cancelled = False + model = MODEL_IMAGES[self.get_option('model')] + + def run() -> None: + if self.input_file_path is None or self.output_file_path is None: + return + + command = ['realesrgan-ncnn-vulkan', + '-i', self.input_file_path, + '-o', self.output_file_path, + '-n', model, + '-s', '4'] + self._process = subprocess.Popen(command, + stderr=subprocess.PIPE, + universal_newlines=True) + print('Running: ', end='') + print(*command) + + # Read each line, query the percentage and update the progress bar + output = '' + bad = False + for line in iter(cast(IO[str], self._process.stderr).readline, ''): + print(line, end='') + output += line + res = re.match('^(\\d*.\\d+)%$', line) + if res: + GLib.idle_add(progress_callback, float(res.group(1))) + else: + """ Check if this line is a warning. """ + if bad: continue + for warn in ALG_WARNINGS: + if re.match(warn, line) is not None: + bad = True + continue + + # Process algorithm output + if self.cancelled: + return + result = self._process.poll() + if result is None or result != 0: + if result is None: result = -1 + raise AlgorithmFailed(result, output) + if bad: + raise AlgorithmWarning + + RunAsync(run, done_callback) + + def cancel(self) -> None: + if self._process: + self.cancelled = True + self._process.kill() diff --git a/upscaler/gtk/window.blp b/upscaler/gtk/window.blp index 6535a1371f3e6f2de1f1702f2687f8e0d6e89992..05b5f308477f81e57489f5cb72db03edea3904e2 100644 --- a/upscaler/gtk/window.blp +++ b/upscaler/gtk/window.blp @@ -94,27 +94,11 @@ template UpscalerWindow : Adw.ApplicationWindow { height-request: 192; } - Adw.PreferencesGroup { + Adw.PreferencesGroup properties { title: _("Properties"); - - Adw.ActionRow action_image_size { - title: _("Image Size"); - - styles [ - "property", - ] - } - - Adw.ActionRow action_upscale_image_size { - title: _("Image Size After Upscaling"); - - styles [ - "property", - ] - } } - Adw.PreferencesGroup { + Adw.PreferencesGroup options { title: _("Options"); // Scaling is broken in Real-ESRGAN with values other than 4. @@ -138,14 +122,6 @@ template UpscalerWindow : Adw.ApplicationWindow { // } // } - Adw.ComboRow combo_models { - title: _("Type of Image"); - model: - StringList string_models {} - - ; - } - Adw.ActionRow { title: _("Save Location"); activatable-widget: button_output; diff --git a/upscaler/meson.build b/upscaler/meson.build index 096c57f6af484f4c12b835afe82673f143fb342e..25bad00bfdc28746e7dd2558fcb3f313cb56471b 100644 --- a/upscaler/meson.build +++ b/upscaler/meson.build @@ -51,4 +51,10 @@ upscaler_sources = [ 'app_profile.py', ] +algorithms_sources = [ + 'algorithms/commons.py', + 'algorithms/realesrgan_ncnn_vulkan.py', +] + install_data(upscaler_sources, install_dir: moduledir) +install_data(algorithms_sources, install_dir: join_paths(moduledir, 'algorithms')) diff --git a/upscaler/window.py b/upscaler/window.py index 3731205e8bbf2dbbfa93259f2f86e9ab51bcf3c4..dbb45f19c01a6d8d668fe31c10e1c75865aeb951 100644 --- a/upscaler/window.py +++ b/upscaler/window.py @@ -16,38 +16,17 @@ # # SPDX-License-Identifier: GPL-3.0-only -from os.path import basename, splitext -import subprocess -import re +from os.path import basename from gi.repository import Adw, Gtk, GLib, Gdk, Gio, Pango, GdkPixbuf from sys import exit -from upscaler.threading import RunAsync +from upscaler.algorithms.commons import Algorithm, AlgorithmWarning +from upscaler.algorithms.realesrgan_ncnn_vulkan import RealesrganNcnnVulkan from upscaler.file_chooser import FileChooser from filecmp import cmp import vulkan # type: ignore -from typing import Any, Callable, Optional, Literal +from typing import Any, Optional, Literal, Callable, cast from gettext import gettext as _ -ALG_WARNINGS = [ - 'vkQueueSubmit failed' -] - -UPSCALE_FACTOR = 4 - -class AlgorithmFailed(Exception): - """Raise when the algorithm has failed.""" - def __init__(self, result_code: int, output: str) -> None: - super().__init__() - self.result_code = result_code - self.output = output - - def __str__(self) -> str: - return f'Algorithm failed.\nResult code: {self.result_code}\nOutput: {self.output}' - -class AlgorithmWarning(Exception): - """Raise when the output could be damaged.""" - pass - @Gtk.Template(resource_path='/io/gitlab/theevilskeleton/Upscaler/gtk/window.ui') class UpscalerWindow(Adw.ApplicationWindow): __gtype_name__ = 'UpscalerWindow' @@ -56,15 +35,13 @@ class UpscalerWindow(Adw.ApplicationWindow): toast: Adw.ToastOverlay = Gtk.Template.Child() # type: ignore stack_upscaler: Gtk.Stack = Gtk.Template.Child() # type: ignore button_input: Gtk.Button = Gtk.Template.Child() # type: ignore - action_image_size: Adw.ActionRow = Gtk.Template.Child() # type: ignore - action_upscale_image_size: Adw.ActionRow = Gtk.Template.Child() # type: ignore + properties: Adw.PreferencesGroup = Gtk.Template.Child() # type: ignore button_upscale: Gtk.Button = Gtk.Template.Child() # type: ignore spinner_loading: Gtk.Spinner = Gtk.Template.Child() # type: ignore image: Gtk.Picture = Gtk.Template.Child() # type: ignore - # video = Gtk.Template.Child() # type: ignore - combo_models: Adw.ComboRow = Gtk.Template.Child() # type: ignore - string_models: Gtk.StringList = Gtk.Template.Child() # type: ignore - # spin_scale = Gtk.Template.Child() # type: ignore + # video = Gtk.Template.Child() + # spin_scale = Gtk.Template.Child() + options: Adw.PreferencesGroup = Gtk.Template.Child() # type: ignore button_output: Gtk.Button = Gtk.Template.Child() # type: ignore label_output: Gtk.Label = Gtk.Template.Child() # type: ignore button_cancel: Gtk.Button = Gtk.Template.Child() # type: ignore @@ -83,25 +60,19 @@ class UpscalerWindow(Adw.ApplicationWindow): raise ValueError("Application should be passed to UpscalerWindow") self.app: Gio.Application = app - # Declare default models and variables - self.model_images = { - 'realesrgan-x4plus': _('Photo'), - 'realesrgan-x4plus-anime': _('Cartoon/Anime'), - } - - self.process: Optional[subprocess.Popen[Any]] = None self.output_file_path: Optional[str] = None self.input_file_path: Optional[str] = None content = Gdk.ContentFormats.new_for_gtype(Gio.File) self.target = Gtk.DropTarget(formats=content, actions=Gdk.DragAction.COPY) - self.string_models.splice(0, 0, list(self.model_images.values())) self.previous_stack = 'stack_welcome_page' + self.algorithm: Optional[Algorithm] = None + self.__set_algorithm(RealesrganNcnnVulkan) + # Connect signals self.button_input.connect('clicked', self.open_file) self.button_upscale.connect('clicked', self.__upscale) self.button_output.connect('clicked', self.__output_location) - self.combo_models.connect('notify::selected', self.__set_model) self.button_cancel.connect('clicked', self.__cancel) self.target.connect('drop', self.__on_drop) self.target.connect('enter', self.__on_enter) @@ -116,22 +87,24 @@ class UpscalerWindow(Adw.ApplicationWindow): def on_file_open(self, input_file_path: str, pixbuf: GdkPixbuf.Pixbuf) -> None: """Open and display file.""" + if self.algorithm is None: + return + + # Set variables self.input_file_path = input_file_path - self.image_size = (pixbuf.get_width(), pixbuf.get_height()) + self.algorithm.set_input_file(input_file_path, pixbuf) # Display image - self.action_image_size.set_subtitle(f'{self.image_size[0]} × {self.image_size[1]}') - self.action_upscale_image_size.set_subtitle(f'{self.image_size[0] * UPSCALE_FACTOR} × {self.image_size[1] * UPSCALE_FACTOR}') self.image.set_pixbuf(pixbuf) # Reset widgets self.label_output.set_label(_('(None)')) self.button_upscale.set_sensitive(False) self.button_upscale.set_has_tooltip(True) - self.combo_models.set_selected(0) self.stack_upscaler.set_visible_child_name('stack_upscale') self.previous_stack = 'stack_upscale' self.spinner_loading.stop() + self.algorithm.reset_options() def __on_file_open_error(self, error: GLib.Error, file_path: str) -> None: """Display error if the format is incompatible.""" @@ -143,6 +116,22 @@ class UpscalerWindow(Adw.ApplicationWindow): """Open the file chooser to load the file.""" FileChooser.open_file(self) + def __set_algorithm(self, AlgorithmType: Any) -> None: + """ Build algorithm and options. """ + if self.algorithm: + # Cleanup algorithm widgets. + for property in self.algorithm.get_properties(): + self.properties.remove(property.widget()) + for opt in self.algorithm.get_options(): + self.options.remove(opt.widget()) + + self.algorithm = cast(Algorithm, AlgorithmType()) + for property in self.algorithm.get_properties(): + self.properties.add(property.widget()) + options = self.algorithm.get_options() + for opt in options: + self.options.add(opt.widget()) + def __output_location(self, *args: Any) -> None: """ Select output file location. @@ -152,6 +141,7 @@ class UpscalerWindow(Adw.ApplicationWindow): def good(output_file_path: str) -> None: # Set variables self.output_file_path = output_file_path + cast(Algorithm, self.algorithm).set_output_file(output_file_path) # Update widgets self.button_upscale.set_sensitive(True) @@ -165,13 +155,13 @@ class UpscalerWindow(Adw.ApplicationWindow): if message: self.toast.add_toast(Adw.Toast.new(message)) - if self.input_file_path is None: - return + try: + default_output = cast(Algorithm, self.algorithm).default_output_file_name() + except NotImplementedError: + default_output = 'output.png' - base_path = basename(splitext(self.input_file_path)[0]) - image_size = [x * UPSCALE_FACTOR for x in self.image_size] FileChooser.output_file(self, - f'{base_path}-{image_size[0]}x{image_size[1]}-upscaled.png', + default_output, good, bad) @@ -191,82 +181,35 @@ class UpscalerWindow(Adw.ApplicationWindow): def __upscale_progress(self, progress: float) -> None: """Updates upscale progress.""" if self.stack_upscaler.get_visible_child_name() == 'stack_upscaling': - self.set_progress(progress) + self.progressbar.set_text(str(progress) + " %") + self.progressbar.set_fraction(progress / 100) def __upscale(self, *args: Any) -> None: - """Initialize algorithm and updates widgets.""" - # Since GTK is not thread safe, prepare some data in the main thread - self.cancelled = False - # Appropriately close child windows def reset_widgets() -> None: self.button_upscale.set_sensitive(True) self.progressbar.set_text(_('Loading…')) self.progressbar.set_fraction(0) - self.cancelled = False - - # Run in a separate thread - def run() -> None: - if self.input_file_path is None or self.output_file_path is None: - raise AlgorithmFailed(0, _("Unexpected error while running the algorithm")) - - command: list[str] = ['realesrgan-ncnn-vulkan', - '-i', self.input_file_path, - '-o', self.output_file_path, - '-n', list(self.model_images)[self.combo_models.get_selected()], - '-s', '4', - ] - - self.process = subprocess.Popen(command, stderr=subprocess.PIPE, universal_newlines=True) - print('Running: ', end='') - print(*command) - - # Read each line, query the percentage and update the progress bar - output = '' - bad = False - if self.process.stderr is not None: - for line in iter(self.process.stderr.readline, ''): - print(line, end='') - output += line - res = re.match('^(\d*.\d+)%$', line) - if res: - GLib.idle_add(self.__upscale_progress, float(res.group(1))) - else: - # Check if this line is a warning - if bad: continue - for warn in ALG_WARNINGS: - if re.match(warn, line) is not None: - bad = True - continue - - # Process algorithm output - result = self.process.poll() - if result != 0 or result is None: - if result is None: - result = 0; - raise AlgorithmFailed(result, output) - - if bad: - raise AlgorithmWarning - - # Run after run() function finishes - def callback(result: Gio.AsyncResult, error: Optional[Exception]) -> None: - if self.cancelled == True: + + # Run after algorithm finishes. + def callback(result: Any, error: Optional[Exception]) -> None: + if cast(Algorithm, self.algorithm).cancelled: self.toast.add_toast(Adw.Toast.new(_('Upscaling Cancelled'))) else: - self.upscaling_completed_dialog(error) + self.__upscaling_completed_dialog(error) self.stack_upscaler.set_visible_child_name('stack_upscale') self.previous_stack = 'stack_upscale' reset_widgets() # Run functions asynchronously - RunAsync(run, callback) + cast(Algorithm, self.algorithm).start(self.__upscale_progress, + callback) self.stack_upscaler.set_visible_child_name('stack_upscaling') self.previous_stack = 'stack_upscaling' self.button_upscale.set_sensitive(False) - def upscaling_completed_dialog(self, error: Optional[Exception]) -> None: + def __upscaling_completed_dialog(self, error: Optional[Exception]) -> None: """Ask the user if they want to open the file.""" if self.output_file_path is None: return @@ -354,24 +297,7 @@ class UpscalerWindow(Adw.ApplicationWindow): if not self.props.is_active: self.app.send_notification('upscaling-done', notification) - def __set_model(self, *args: Any) -> None: - """Set model and print.""" - print(_('Model name: {}').format(list(self.model_images)[self.combo_models.get_selected()])) - - # Update post-upscale image size as the user adjusts the spinner - # def __update_post_upscale_image_size(self, *args): - # upscale_image_size = [ - # self.image_size[1] * int(self.spin_scale.get_value()), - # self.image_size[2] * int(self.spin_scale.get_value()), - # ] - # self.action_upscale_image_size.set_subtitle(f'{upscale_image_size[0]} × {upscale_image_size[1]}') - - def set_progress(self, progress: float) -> None: - """Update progress widget.""" - self.progressbar.set_text(str(progress) + " %") - self.progressbar.set_fraction(progress / 100) - - def close_dialog(self, function: Callable[[], None]) -> None: + def __close_dialog(self, function: Callable[[], None]) -> None: """Prompt the user to stop the algorithm when it is running.""" self.stop_upscaling_dialog = Adw.MessageDialog.new( self, @@ -394,12 +320,10 @@ class UpscalerWindow(Adw.ApplicationWindow): self.spinner_loading.start() def __cancel(self, *args: Any) -> None: - """Stop algorithm.""" def function() -> None: - self.cancelled = True - if self.process: - self.process.kill() - self.close_dialog(function) + if self.algorithm: + self.algorithm.cancel() + self.__close_dialog(function) def __load_image_done(self, _obj: Any, result: Gio.AsyncResult, input_file_path: str) -> None: """Attempt to load image.""" @@ -465,6 +389,6 @@ class UpscalerWindow(Adw.ApplicationWindow): if self.stack_upscaler.get_visible_child_name() == 'stack_upscaling': def function() -> None: exit() - self.close_dialog(function) + self.__close_dialog(function) return True return False