Commit 83a7de13 authored by Bilal Elmoussaoui's avatar Bilal Elmoussaoui Committed by GitHub

Refactor the code (#88)

* Code refactoring part 1

* refactor part 2

* refactor part 3

* prepare for 0.2.3
parent 4c82b8b4
[submodule "subprojects/libgd"]
path = subprojects/libgd
url = https://gitlab.gnome.org/GNOME/libgd.git
......@@ -19,10 +19,11 @@
from gettext import gettext as _
from gi import require_version
require_version("Gtk", "3.0")
from gi.repository import Gtk, GLib, Gio, Gdk, GObject
from gi.repository import Gtk, GLib, Gio, Gdk
from .widgets import Window, AboutDialog
from .models import Settings, Keyring, Clipboard, Logger
from .models import Settings, Clipboard, Logger
class Application(Gtk.Application):
......@@ -45,24 +46,12 @@ class Application(Gtk.Application):
Application.instance = Application()
return Application.instance
def setup_css(self):
"""Setup the CSS and load it."""
uri = 'resource:///com/github/bilelmoussaoui/Authenticator/style.css'
provider_file = Gio.File.new_for_uri(uri)
provider = Gtk.CssProvider()
screen = Gdk.Screen.get_default()
context = Gtk.StyleContext()
provider.load_from_file(provider_file)
context.add_provider_for_screen(screen, provider,
Gtk.STYLE_PROVIDER_PRIORITY_USER)
Logger.debug("Loading CSS")
def do_startup(self):
"""Startup the application."""
Gtk.Application.do_startup(self)
# Unlock the keyring
self.generate_menu()
self.setup_css()
self.__generate_menu()
Application.__setup_css()
# Set the default night mode
is_night_mode = Settings.get_default().is_night_mode
......@@ -70,52 +59,65 @@ class Application(Gtk.Application):
gtk_settings.set_property("gtk-application-prefer-dark-theme",
is_night_mode)
def generate_menu(self):
@staticmethod
def __setup_css():
"""Setup the CSS and load it."""
uri = 'resource:///com/github/bilelmoussaoui/Authenticator/style.css'
provider_file = Gio.File.new_for_uri(uri)
provider = Gtk.CssProvider()
screen = Gdk.Screen.get_default()
context = Gtk.StyleContext()
provider.load_from_file(provider_file)
context.add_provider_for_screen(screen, provider,
Gtk.STYLE_PROVIDER_PRIORITY_USER)
Logger.debug("Loading CSS")
def __generate_menu(self):
"""Generate application menu."""
settings = Settings.get_default()
# Help section
help_content = Gio.Menu.new()
# Main section
main_content = Gio.Menu.new()
# Night mode action
help_content.append_item(Gio.MenuItem.new(_("Night Mode"),
main_content.append_item(Gio.MenuItem.new(_("Night Mode"),
"app.night_mode"))
help_content.append_item(Gio.MenuItem.new(_("About"), "app.about"))
help_content.append_item(Gio.MenuItem.new(_("Quit"), "app.quit"))
help_section = Gio.MenuItem.new_section(None, help_content)
main_content.append_item(Gio.MenuItem.new(_("About"), "app.about"))
main_content.append_item(Gio.MenuItem.new(_("Quit"), "app.quit"))
help_section = Gio.MenuItem.new_section(None, main_content)
self._menu.append_item(help_section)
is_night_mode = settings.is_night_mode
gv_is_night_mode = GLib.Variant.new_boolean(is_night_mode)
action = Gio.SimpleAction.new_stateful("night_mode", None,
gv_is_night_mode)
action.connect("change-state", self.on_night_mode)
action.connect("change-state", self.__on_night_mode)
self.add_action(action)
action = Gio.SimpleAction.new("about", None)
action.connect("activate", self.on_about)
action.connect("activate", self.__on_about)
self.add_action(action)
action = Gio.SimpleAction.new("quit", None)
action.connect("activate", self.on_quit)
action.connect("activate", self.__on_quit)
self.add_action(action)
def do_activate(self, *args):
def do_activate(self, *_):
"""On activate signal override."""
resources_path = "/com/github/bilelmoussaoui/Authenticator/"
Gtk.IconTheme.get_default().add_resource_path(resources_path)
window = Window.get_default()
window.set_application(self)
window.set_menu(self._menu)
window.connect("delete-event", lambda x, y: self.on_quit())
window.connect("delete-event", lambda x, y: self.__on_quit())
self.add_window(window)
window.show_all()
window.present()
def set_use_qrscanner(self, state):
@staticmethod
def set_use_qrscanner(state):
Application.USE_QRSCANNER = state
def on_night_mode(self, action, *args):
def __on_night_mode(self, action, *_):
"""Switch night mode."""
settings = Settings.get_default()
is_night_mode = not settings.is_night_mode
......@@ -125,7 +127,8 @@ class Application(Gtk.Application):
gtk_settings.set_property("gtk-application-prefer-dark-theme",
is_night_mode)
def on_about(self, *args):
@staticmethod
def __on_about(*_):
"""
Shows about dialog
"""
......@@ -134,15 +137,7 @@ class Application(Gtk.Application):
dialog.run()
dialog.destroy()
def on_settings(self, *args):
"""
Shows settings window
"""
settings_window = SettingsWindow()
settings_window.set_attached_to(Window.get_default())
settings_window.show_window()
def on_quit(self, *args):
def __on_quit(self, *_):
"""
Close the application, stops all threads
and clear clipboard for safety reasons
......
......@@ -19,7 +19,7 @@
from .settings import Settings
from .qr_reader import QRReader
from .database import Database
from .code import Code
from .otp import OTP
from .account import Account
from .logger import Logger
from .keyring import Keyring
......
......@@ -23,66 +23,72 @@ from time import sleep
from gi.repository import GObject
from .clipboard import Clipboard
from .code import Code
from .database import Database
from .keyring import Keyring
from .logger import Logger
from .otp import OTP
class Account(GObject.GObject, Thread):
__gsignals__ = {
'code_updated': (GObject.SignalFlags.RUN_LAST, None, (str,)),
'otp_updated': (GObject.SignalFlags.RUN_LAST, None, (str,)),
'counter_updated': (GObject.SignalFlags.RUN_LAST, None, (str,)),
'removed': (GObject.SignalFlags.RUN_LAST, None, ()),
}
def __init__(self, _id, name, provider, secret_id):
def __init__(self, _id, username, provider, secret_id):
Thread.__init__(self)
GObject.GObject.__init__(self)
self.counter_max = 30
self._alive = True
self.counter = self.counter_max
self.id = _id
self.name = name
self.username = username
self.provider = provider
self.secret_id = secret_id
_secret = Keyring.get_by_id(self.secret_id)
if _secret:
self._code = Code(_secret)
token = Keyring.get_by_id(self.secret_id)
if token:
self.otp = OTP(token)
self._code_generated = True
else:
self._code = None
self.otp = None
self._code_generated = False
Logger.error("Could not read the secret code,"
"the keyring keys were reset manually")
self.start()
@staticmethod
def create(name, provider, secret_id):
encrypted_secret = sha256(secret_id.encode('utf-8')).hexdigest()
obj = Database.get_default().insert(name, provider, encrypted_secret)
Keyring.insert(encrypted_secret, provider, name, secret_id)
return Account(obj['id'], name, provider, encrypted_secret)
def create(username, provider, token):
"""
Create a new Account.
:param username: the account's username
:param provider: the account's provider
:param token: the OTP secret token
:return: Account object
"""
# Encrypt the token to create a secret_id
secret_id = sha256(token.encode('utf-8')).hexdigest()
# Save the account
obj = Database.get_default().insert(username, provider, secret_id)
Keyring.insert(secret_id, provider, username, token)
def update(self, **kwargs):
self.name = kwargs.get("name", self.name)
self.provider = kwargs.get("provider", self.provider)
Database.get_default().update(kwargs, self.id)
return Account(obj['id'], username, provider, secret_id)
@property
def secret_code(self):
if self._code_generated:
return self._code.secret_code
return None
def update(self, username, provider):
"""
Update the account name and/or provider.
:param username: the account's username
:param provider: the account's provider
"""
Database.get_default().update(username, provider, self.id)
def run(self):
while self._code_generated and self._alive:
self.counter -= 1
if self.counter == 0:
self.counter = self.counter_max
self._code.update()
self.emit("code_updated", self.secret_code)
self.otp.update()
self.emit("otp_updated", self.otp.pin)
self.emit("counter_updated", self.counter)
sleep(1)
......@@ -93,6 +99,9 @@ class Account(GObject.GObject, Thread):
self._alive = False
def remove(self):
"""
Remove the account.
"""
self.kill()
Database.get_default().remove(self.id)
Keyring.remove(self.secret_id)
......@@ -100,6 +109,6 @@ class Account(GObject.GObject, Thread):
Logger.debug("Account '{}' with id {} was removed".format(self.name,
self.id))
def copy_token(self):
"""Copy the secret token to the clipboard."""
Clipboard.set(self._code.secret_code)
def copy_pin(self):
"""Copy the OTP to the clipboard."""
Clipboard.set(self.otp.pin)
......@@ -18,6 +18,7 @@
"""
from gi import require_version
require_version("Gtk", "3.0")
from gi.repository import Gdk, Gtk
......@@ -25,17 +26,22 @@ from gi.repository import Gdk, Gtk
class Clipboard:
"""Clipboard handler."""
def __init__(self):
pass
@staticmethod
def clear():
"""Clear the clipboard"""
"""Clear the clipboard."""
clipboard = Gtk.Clipboard.get(Gdk.SELECTION_CLIPBOARD)
clipboard.clear()
@staticmethod
def set(value):
def set(string):
"""
Set a new value on the clipboard
:param value: string; the value to set
Copy a string to the clipboard.
:param string: the string to copy.
:type string: str
"""
clipboard = Gtk.Clipboard.get(Gdk.SELECTION_CLIPBOARD)
clipboard.set_text(value, -1)
clipboard.set_text(string, -1)
......@@ -16,11 +16,12 @@
You ould have received a copy of the GNU General Public License
along with Authenticator. If not, see <http://www.gnu.org/licenses/>.
"""
from collections import OrderedDict
import sqlite3
from collections import OrderedDict
from os import path, makedirs
from gi.repository import GLib
from .logger import Logger
......@@ -29,21 +30,19 @@ class Database:
# Default instance
instance = None
# Database version number
db_version = 3
table_name = "accounts"
primary_key = "id"
def __init__(self):
db_dir = path.join(GLib.get_user_config_dir(), 'Authenticator/')
db_file = path.join(
db_dir, 'database-{}.db'.format(str(Database.db_version)))
makedirs(path.dirname(db_dir), exist_ok=True)
if not path.exists(db_file):
with open(db_file, 'w') as file_obj:
file_obj.write('')
self.conn = sqlite3.connect(db_file)
if not self.is_table_exists():
Logger.debug("SQL: Table 'accounts' does not exist")
self.create_table()
Logger.debug("SQL: Table 'accounts' created successfully")
self.__create_database_file()
self.conn = sqlite3.connect(self.db_file)
if not self.__is_table_exists():
Logger.debug("[SQL]: Table 'accounts' does not exist")
self.__create_table()
Logger.debug("[SQL]: Table 'accounts' created successfully")
@staticmethod
def get_default():
......@@ -52,35 +51,41 @@ class Database:
Database.instance = Database()
return Database.instance
def insert(self, name, provider, secret):
@property
def db_file(self):
return path.join(GLib.get_user_config_dir(),
'Authenticator',
'database-{}.db'.format(str(Database.db_version))
)
def insert(self, username, provider, secret_id):
"""
Insert a new account to the database
:param name: Account name
:param username: Account name
:param provider: Service provider
:param secret: the secret code
:return: a dict with id, name and encrypted_secret
:param secret_id: the secret code
"""
query = "INSERT INTO accounts (name, provider, secret_code) VALUES (?, ?, ?)"
query = "INSERT INTO {table} (username, provider, secret_id) VALUES (?, ?, ?)".format(table=self.table_name)
try:
self.conn.execute(query, [name, provider, secret])
self.conn.execute(query, [username, provider, secret_id])
self.conn.commit()
return OrderedDict([
("id", self.latest_id),
("name", name),
("name", username),
("provider", provider),
("secret_id", secret)
("secret_id", secret_id)
])
except Exception as error:
Logger.error("[SQL] Couldn't add a new account")
Logger.error(str(error))
def get_secret_code(self, id_):
def get_secret_id(self, id_):
"""
Get the secret code by id
:param id_: int the account id
:return: the secret id
"""
query = "SELECT secret_code FROM accounts WHERE uid=?"
query = "SELECT secret_id FROM {table} WHERE {key}=?".format(key=self.primary_key, table=self.table_name)
try:
data = self.conn.cursor().execute(query, (id_,))
return data.fetchone()[0]
......@@ -91,35 +96,27 @@ class Database:
def remove(self, id_):
"""
Remove an account by id
:param id_: (int) account uid
Remove an account by ID.
:param id_: the account ID
:type id_: int
"""
query = "DELETE FROM accounts WHERE uid=?"
query = "DELETE FROM {table} WHERE {key}=?".format(key=self.primary_key, table=self.table_name)
try:
self.conn.execute(query, (id_,))
self.conn.commit()
except Exception as error:
Logger.error("[SQL] Couldn't remove account by uid")
Logger.error("[SQL] Couldn't remove the account '{}'".format(id_))
Logger.error(str(error))
def update(self, kwargs, id):
def update(self, username, provider, id_):
"""
Update an account by id
"""
query = "UPDATE accounts SET "
values = []
i = 0
for key, value in kwargs.items():
query += " {} = ?".format(key)
if i != len(kwargs.keys()) - 1:
query += ", "
i+=1
values.append(value)
query += " WHERE uid=?"
values.append(id)
query = "UPDATE {table} SET username=?, provider=? WHERE {key}=?".format(key=self.primary_key,
table=self.table_name)
try:
self.conn.execute(query, values)
self.conn.execute(query, (username, provider, id_))
self.conn.commit()
except Exception as error:
Logger.error("[SQL] Couldn't update account name by id")
......@@ -128,10 +125,11 @@ class Database:
@property
def count(self):
"""
Count number of accounts
:return: (int) count
Count the total number of existing accounts.
:return: int
"""
query = "SELECT COUNT(uid) AS count FROM accounts"
query = "SELECT COUNT({key}) AS count FROM {table}".format(key=self.primary_key, table=self.table_name)
try:
data = self.conn.cursor().execute(query)
return data.fetchone()[0]
......@@ -143,19 +141,20 @@ class Database:
@property
def accounts(self):
"""
Fetch list of accounts
:return: (tuple) list of accounts
Retrieve the list of accounts.
:return list
"""
query = "SELECT * FROM accounts ORDER BY provider ASC, name DESC"
query = "SELECT * FROM {table} ORDER BY provider ASC, username DESC".format(table=self.table_name)
try:
data = self.conn.cursor().execute(query)
accounts = data.fetchall()
return [OrderedDict([
("id", account[0]),
("name", account[1]),
("provider", account[2]),
("secret_id", account[3])
]) for account in accounts]
("id", account[0]),
("username", account[1]),
("provider", account[2]),
("secret_id", account[3])
]) for account in accounts]
except Exception as error:
Logger.error("[SQL] Couldn't fetch accounts list")
Logger.error(str(error))
......@@ -164,26 +163,39 @@ class Database:
@property
def latest_id(self):
"""
Get the latest uid on accounts table
:return: (int) latest uid
Retrieve the latest added ID from accounts.
:return: int
"""
query = "SELECT uid FROM accounts ORDER BY uid DESC LIMIT 1;"
query = "SELECT {key} FROM {table} ORDER BY {key} DESC LIMIT 1;".format(key=self.primary_key,
table=self.table_name)
try:
data = self.conn.cursor().execute(query)
return data.fetchone()[0]
except Exception as error:
Logger.error("[SQL] Couldn't fetch the latest uid")
Logger.error("[SQL] Couldn't fetch the latest id")
Logger.error(str(error))
return None
def create_table(self):
"""Create 'accounts' table."""
query = '''CREATE TABLE "accounts" (
"uid" INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL UNIQUE ,
"name" VARCHAR NOT NULL ,
def __create_database_file(self):
"""
Create an empty database file for the first start of the application.
"""
makedirs(path.dirname(self.db_file), exist_ok=True)
if not path.exists(self.db_file):
with open(self.db_file, 'w') as file_obj:
file_obj.write('')
def __create_table(self):
"""
Create the needed tables to run the application.
"""
query = '''CREATE TABLE "{table}" (
"{key}" INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL UNIQUE,
"username" VARCHAR NOT NULL,
"provider" VARCHAR NOT NULL,
"secret_code" VARCHAR NOT NULL UNIQUE
)'''
"secret_id" VARCHAR NOT NULL UNIQUE
)'''.format(table=self.table_name, key=self.primary_key)
try:
self.conn.execute(query)
self.conn.commit()
......@@ -191,12 +203,12 @@ class Database:
Logger.error("[SQL] Impossible to create table 'accounts'")
Logger.error(str(error))
def is_table_exists(self):
def __is_table_exists(self):
"""
Check if accounts table exists
:return: (bool)
Check if the used table are created or not.
:return bool
"""
query = "SELECT uid from accounts LIMIT 1"
query = "SELECT {key} from {table} LIMIT 1".format(key=self.primary_key, table=self.table_name)
try:
self.conn.cursor().execute(query)
return True
......
......@@ -41,43 +41,53 @@ class Keyring:
return Keyring.instance
@staticmethod
def get_by_id(id_):
"""Return the secret code"""
def get_by_id(secret_id):
"""
Return the OTP token based on a secret ID.
:param secret_id: the secret ID associated to an OTP token
:type secret_id: str
:return: the secret OTP token.
"""
schema = Keyring.get_default().schema
password = Secret.password_lookup_sync(schema, {"id": str(id_)}, None)
password = Secret.password_lookup_sync(schema, {"id": str(secret_id)}, None)
return password
@staticmethod
def insert(id_, provider, name, secret_code):
def insert(secret_id, provider, username, token):
"""
Insert a secret code to Keyring database
:param id_: the encrypted id
:param provider: the provider's name
:param name: the identity/username
:param secret_code: the secret code
Save a secret OTP token.
:param secret_id: The secret ID associated to the OTP token
:param provider: the provider name
:param username: the username
:param token: the secret OTP token.
"""
schema = Keyring.get_default().schema
data = {
"id": str(id_),
"name": str(name),
"id": str(secret_id),
"name": str(username),
}
Secret.password_store_sync(
schema,
data,
Secret.COLLECTION_DEFAULT,
"{provider} OTP ({name})".format(provider=provider, name=name),
secret_code,
"{provider} OTP ({username})".format(provider=provider, username=username),
token,
None
)
@staticmethod
def remove(id_):
def remove(secret_id):
"""
Remove an account from Gnome Keyring by secret id
:param id_: the encrypted secret code.
:return: bool
Remove a specific secret OTP token.
:param secret_id: the secret ID associated to the OTP token
:return bool: Either the token was removed successfully or not
"""
schema = Keyring.get_default().schema
removed = Secret.password_clear_sync(schema, {"id": str(id_)}, None)
return removed
success = Secret.password_clear_sync(schema, {"id": str(secret_id)}, None)
return success
......@@ -30,13 +30,16 @@ class Logger:
# Date format
DATE = "%Y-%m-%d %H:%M:%S"
def __init__(self):
pass
@staticmethod
def new():
"""Create a new instance of Logger."""
logger = logging.getLogger('authenticator')
handler = logging.StreamHandler()
formater = logging.Formatter(Logger.FORMAT, Logger.DATE)
handler.setFormatter(formater)
formatter = logging.Formatter(Logger.FORMAT, Logger.DATE)
handler.setFormatter(formatter)
logger.setLevel(logging.DEBUG)
logger.addHandler(handler)
return logger
......@@ -50,7 +53,7 @@ class Logger:
return Logger.instance
@staticmethod
def setLevel(level):
def set_level(level):
"""Set the logging level."""
Logger.get_default().setLevel(level)
......
......@@ -16,51 +16,50 @@
You should have received a copy of the GNU General Public License
along with Authenticator. If not, see <http://www.gnu.org/licenses/>.
"""
from .logger import Logger
import binascii
from .logger import Logger
try:
from pyotp import TOTP
except ImportError:
Logger.error("Impossible to import TOTP, please install PyOTP first")
class Code:
class OTP(TOTP):
"""
OTP (One-time password) handler using PyOTP.
"""
def __init__(self, token):
self._token = token
self._secret_code = None
self.create()
"""
:param token: the OTP token.
"""
TOTP.__init__(self, token)
self.pin = None
self.update()
@staticmethod
def is_valid(token):
"""Validate a token."""
"""
Validate a OTP token.
:param token: OTP token
:type token: str
:return: bool