Big refactor. Reorganize, make the code cleaner.

This commit is contained in:
0x80 2021-12-30 00:00:00 +00:00
parent 05d844b337
commit 5cc99dd432
Signed by: 0x80
GPG Key ID: 68368BCBC000EF51
34 changed files with 177 additions and 208 deletions

View File

@ -1 +1 @@
scripts/rsstube.py
src/rsstube.py

1
rsstube-gtk Symbolic link
View File

@ -0,0 +1 @@
src/rsstube-gtk.py

View File

@ -1,173 +0,0 @@
#!/usr/bin/python3
import sys,importlib
from utils import debug,notify,warn,error,success
import opml
# Default settings; they may be overridden by the config file and then by
# command-line options further down in the script.
network = True
verbosity = 3
args = {}
arg_count = 0
output_format = "url"
output_opml = None
output_filename = None
output = None

# Prefer the per-user config file and fall back to the system-wide one.
config = None
try:
    from pathlib import Path
    home = str(Path.home())
    local_config_path = home + "/.config/rsstube/config"
    config = open(local_config_path, "r")
except FileNotFoundError:
    try:
        global_config_path = "/etc/rsstube/config"
        config = open(global_config_path, "r")
    except FileNotFoundError:
        # no config file available; keep the defaults
        config = None

# Join every non-blank, non-comment line into one space-separated string.
file_params = ""
if config is not None:
    # the context manager closes the handle once the file has been read
    with config:
        for line in config:
            line = line.strip()
            # comment lines should begin with # after stripping
            if line != "" and line[0] != "#":
                file_params += " " + line
from options import options
def process_args (
    network,
    verbosity,
    args,
    arg_count,
    output_format,
    output_filename,
    network_new,
    verbosity_new,
    args_new,
    arg_count_new,
    output_format_new,
    output_filename_new
):
    """Overlay freshly parsed settings on top of the current ones.

    A new value replaces the current one only when it was actually set:
    ``network_new`` must equal False, ``verbosity_new`` and
    ``output_filename_new`` must not be None, and ``output_format_new`` must
    not be the empty string. ``args`` is updated in place with the entries
    of ``args_new``; ``arg_count`` is always replaced by ``arg_count_new``.

    Returns the tuple
    ``(network, verbosity, args, arg_count, output_format, output_filename)``.
    """
    # same mapping object, extended with the new entries
    args.update(args_new)

    merged_network = network_new if network_new == False else network
    merged_verbosity = verbosity if verbosity_new is None else verbosity_new
    merged_format = output_format if output_format_new == "" else output_format_new
    merged_filename = (
        output_filename if output_filename_new is None else output_filename_new
    )
    return (
        merged_network,
        merged_verbosity,
        args,
        arg_count_new,
        merged_format,
        merged_filename,
    )
# config file options
# These are applied first; the command-line options below are merged
# afterwards and therefore take precedence.
if not file_params == "":
network_new,verbosity_new,args_new,arg_count_new,output_format_new,output_filename_new = options(file_params.split())
network,verbosity,args,arg_count,output_format,output_filename = process_args(
network,
verbosity,
args,
arg_count,
output_format,
output_filename,
network_new,
verbosity_new,
args_new,
arg_count_new,
output_format_new,
output_filename_new
)
# command-line options
# arg_count is overwritten here with the count from the command line; it is
# used below to find where the URLs start in sys.argv.
network_new,verbosity_new,args_new,arg_count_new,output_format_new,output_filename_new = options(sys.argv[1:])
network,verbosity,args,arg_count,output_format,output_filename = process_args(
network,
verbosity,
args,
arg_count,
output_format,
output_filename,
network_new,
verbosity_new,
args_new,
arg_count_new,
output_format_new,
output_filename_new
)
# In OPML mode feeds are collected in output_opml and emitted once at the end.
if output_format == "opml":
debug ("Formatting output as OPML.", verbosity)
output_opml = opml.Opml("rsstube feeds")
# Open the output file if one was requested; otherwise output stays None.
if not output_filename is None and output_filename != "":
debug ("Output will be saved in " + output_filename, verbosity)
output = open(output_filename, "w")
# Nothing is left in sys.argv after the counted option arguments.
if len(sys.argv) == arg_count+1:
error ("Please provide one or more URL.", verbosity)
# Everything after the counted option arguments is treated as a URL.
for url in sys.argv[arg_count+1:]:
from determine_site import determine_site
debug ("Attempting to determine site...", verbosity)
site = determine_site (url)
if not site is None:
debug ("Site identified as " + site, verbosity)
notify ("Trying " + site + " extractor...", verbosity)
# get appropriate extractor
extractor = importlib.import_module("extractors." + site)
# no page is passed (None) to the site-specific extractor
feed = extractor.extract(url, None, network, verbosity, args)
if feed is None:
error ("Unable to get RSS feed for " + url, verbosity, site)
else:
if not output_opml is None:
output_opml.add_feed (feed, site + ": " + url, url)
else:
success (feed, output)
# site not recognized: download the page, but only if network use is allowed
elif network:
from download_page import download
page = download (None, url, args, verbosity)
if page is None:
error ("Failed to download " + url, verbosity)
continue
# try to get feed for common software like PeerTube
debug ("Attempting to determine software from page...", verbosity)
from determine_software import determine_software
software = determine_software (page)
if not software is None:
debug ("Software identified as " + software, verbosity)
notify ("Trying " + software + " extractor...", verbosity)
extractor = importlib.import_module("extractors." + software)
feed = extractor.extract(url, page, network, verbosity, args)
if feed is None:
notify ("Unable to get RSS feed for " + url + " with " + software + " extractor", verbosity, software)
else:
if not output_opml is None:
output_opml.add_feed (feed, software + ": " + url, url)
else:
success (feed, output)
# NOTE(review): presumably this continue belongs to the feed-found branch,
# so the generic extractor below is skipped only on success - confirm
# against the original indentation.
continue
# try generic extractor even if software is known
debug ("Trying generic extractor...", verbosity)
extractor = importlib.import_module("extractors.generic")
feed = extractor.extract(url, page, network, verbosity, args)
if feed is None:
error ("Unable to get RSS feed for " + url, verbosity, "generic")
else:
if not output_opml is None:
output_opml.add_feed (feed, url, url)
else:
success (feed, output)
else:
error ("Unable to get RSS feed for " + url + " without downloading page", verbosity)
# Emit the collected OPML document, if OPML output was requested.
if not output_opml is None:
success (output_opml.get_opml(), output)
# Close the output file if one was opened above.
if not output is None:
output.close()

View File

@ -15,10 +15,11 @@ def download (platform, url, args, verbosity, return_http_code=False, follow_loc
c.setopt(c.FOLLOWLOCATION, follow_location)
# TODO: handle possible arguments
if "user_agent" in args:
c.setopt(pycurl.USERAGENT, args["user_agent"])
if "header" in args:
c.setopt(pycurl.HTTPHEADER, args["header"])
if not args is None:
if "user_agent" in args:
c.setopt(pycurl.USERAGENT, args["user_agent"])
if "header" in args:
c.setopt(pycurl.HTTPHEADER, args["header"])
notify ("Downloading " + url + "...", verbosity, platform)
try:
c.perform()

View File

@ -41,14 +41,11 @@ def update():
else:
print("rsstube appears to have been manually downloaded or installed with a package manager. Use that same method to update.")
def options(params):
def parse_options(params):
import sys,getopt,glob
from utils import debug,notify,warn,error
# general settings
network = True
output_format = ""
output_filename = None
options = dict()
## verbosity: in addition to the feed, print...
## 0: no messages (suppress errors)
@ -56,10 +53,11 @@ def options(params):
## 2: error messages and warnings
## 3: [default] errors, warnings, and info
## 4: all messages, including debugging info
verbosity = None
# pycurl args
d = dict()
options["curl_args"] = dict()
# user may submit multiple HTTP headers, so they're stored as a list
header = []
# count number of arguments
@ -91,10 +89,10 @@ def options(params):
sys.exit(2)
for opt, arg in opts:
if opt in ("-A", "--user-agent"):
d["user_agent"] = arg
options["curl_args"]["user_agent"] = arg
arg_count += 2
elif opt == "--ciphers":
d["ciphers"] = arg
options["curl_args"]["ciphers"] = arg
arg_count += 2
elif opt in ("-h", "--help"):
print ("Usage: rsstube [OPTIONS] URL")
@ -110,20 +108,20 @@ def options(params):
arg_count += 1
sys.exit()
elif opt in ("-n", "--non-network"):
network = False
options["network"] = False
arg_count += 1
elif opt in ("-o", "--output"):
output_filename = arg
options["output_filename"] = arg
arg_count += 2
elif opt == "--output-format":
if str.lower(arg) in ("opml", "url"):
output_format = str.lower(arg)
options["output_format"] = str.lower(arg)
arg_count += 2
elif opt in ("-p", "--proxy"):
d["proxy"] = arg
options["curl_args"]["proxy"] = arg
arg_count += 2
elif opt in ("-q", "--quiet"):
verbosity = 1
options["verbosity"] = 1
arg_count += 1
elif opt in ("--sites"):
print ("Site-specific support:")
@ -138,18 +136,18 @@ def options(params):
arg_count += 1
sys.exit()
elif opt in ("--suppress-errors"):
verbosity = 0
options["verbosity"] = 0
arg_count += 1
elif opt == "--tls-max":
d["tls_max"] = arg
options["curl_args"]["tls_max"] = arg
arg_count += 2
elif opt == "--tls13-ciphers":
d["tls13_ciphers"] = arg
options["curl_args"]["tls13_ciphers"] = arg
arg_count += 2
elif opt == "--unbreak":
# attempt to unbreak hostile websites (e.g., Cloudflare)
# based on Tor Browser cURL request
d["user_agent"] = 'Mozilla/5.0 (Windows NT 10.0; rv:91.0) Gecko/20100101 Firefox/91.0'
options["curl_args"]["user_agent"] = 'Mozilla/5.0 (Windows NT 10.0; rv:91.0) Gecko/20100101 Firefox/91.0'
header = [
'Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language: en-US,en;q=0.5',
@ -167,12 +165,12 @@ def options(params):
arg_count += 1
sys.exit()
elif opt in ("-v", "--verbose"):
verbosity = 4
options["verbosity"] = 4
arg_count += 1
elif opt == "--verbosity":
v = int(arg)
if v >= 0 and v <= 4:
verbosity = v
options["verbosity"] = v
else:
print ("Invalid verbosity: " + arg)
arg_count += 2
@ -184,5 +182,6 @@ def options(params):
arg_count += 1
sys.exit()
d["header"] = header
return network,verbosity,d,arg_count,output_format,output_filename
if len(header) > 0:
options["curl_args"]["header"] = header
return options, arg_count

View File

@ -12,6 +12,7 @@ import gi,subprocess,sys,pycurl
gi.require_version('Gtk', '3.0')
from gi.repository import Gtk
from rsstube import get_feed
class RsstubeGtk(Gtk.Window):
def __init__(self):
@ -26,7 +27,7 @@ class RsstubeGtk(Gtk.Window):
self.btn = Gtk.Button()
self.btn.set_label("Get Feed")
self.btn.connect("clicked",self.get_feed)
self.btn.connect("clicked",self.display_feed)
self.output_label = Gtk.Label()
self.output_label.set_text("")
@ -45,17 +46,14 @@ class RsstubeGtk(Gtk.Window):
self.connect("destroy", Gtk.main_quit)
# runs rsstube
def get_feed(self,widget):
def display_feed(self,widget):
error_color = "#B3589A"
url = self.entry.get_text()
feed = str(subprocess.check_output([sys.executable, "scripts/rsstube.py", url]))
feed = feed[:feed.rindex("\\n")]
feed = feed[feed.rindex("\\n")+2:]
feed = get_feed(url)[0]
# this color is used for error output
if "\\x1b[1;31m[" in feed:
errmsg = feed[feed.index(']')+2:]
errmsg = errmsg[:errmsg.rindex("\\x1b[0m")]
if feed is None:
errmsg = "Unable to get feed."
self.output_label.set_markup('<span foreground="' + error_color + '">' + errmsg + '</span>')
else:
self.output_label.set_markup('<a href="' + feed + '">' + feed + '</a>')

143
src/rsstube.py Executable file
View File

@ -0,0 +1,143 @@
#!/usr/bin/python3
import sys,importlib
from utils import debug,notify,warn,error,success
import opml
# enter a URL and attempt to return a feed URL
def get_feed (url, verbosity=3, network=True, curl_args=None):
    """Try to find a feed URL for ``url``.

    When ``determine_site`` recognizes the URL, only the matching
    site-specific extractor is tried. Otherwise, if ``network`` is true, the
    page is downloaded, the extractor for the software detected on the page
    (if any) is tried, and the generic extractor is used as a fallback.

    Args:
        url: address to find a feed for.
        verbosity: level passed to the message helpers and the extractors.
        network: whether the page may be downloaded; also passed to extractors.
        curl_args: passed through to ``download`` and to the extractors.

    Returns:
        A ``(feed, source)`` tuple, where ``source`` is the site name, the
        software name or ``"generic"``. ``(None, None)`` is returned on every
        failure path, so callers can always unpack the result.
    """
    from determine_site import determine_site

    debug("Attempting to determine site...", verbosity)
    site = determine_site(url)

    if site is not None:
        debug("Site identified as " + site, verbosity)
        notify("Trying " + site + " extractor...", verbosity)
        # get appropriate extractor; no page is passed to it
        extractor = importlib.import_module("extractors." + site)
        feed = extractor.extract(url, None, network, verbosity, curl_args)
        if feed is not None:
            return feed, site
        error("Unable to get RSS feed for " + url, verbosity, site)
        return None, None

    if not network:
        error("Unable to get RSS feed for " + url + " without downloading page", verbosity)
        return None, None

    from download_page import download
    page = download(None, url, curl_args, verbosity)
    if page is None:
        error("Failed to download " + url, verbosity)
        return None, None

    # try to get feed for common software like PeerTube
    debug("Attempting to determine software from page...", verbosity)
    from determine_software import determine_software
    software = determine_software(page)
    if software is not None:
        debug("Software identified as " + software, verbosity)
        notify("Trying " + software + " extractor...", verbosity)
        extractor = importlib.import_module("extractors." + software)
        feed = extractor.extract(url, page, network, verbosity, curl_args)
        if feed is not None:
            return feed, software
        notify("Unable to get RSS feed for " + url + " with " + software + " extractor", verbosity, software)

    # try generic extractor even if software is known
    debug("Trying generic extractor...", verbosity)
    extractor = importlib.import_module("extractors.generic")
    feed = extractor.extract(url, page, network, verbosity, curl_args)
    if feed is not None:
        return feed, "generic"
    error("Unable to get RSS feed for " + url, verbosity, "generic")
    return None, None
def process_args (options, options_new):
    """Merge ``options_new`` into ``options`` in place.

    Every entry of ``options_new`` except ``"curl_args"`` overwrites the entry
    of the same name in ``options``. ``"curl_args"`` is a nested dictionary and
    is merged key by key, so values from the config file and the command line
    can be combined. If ``options`` has no ``"curl_args"`` entry yet, an empty
    one is created first instead of raising ``KeyError``.

    Args:
        options: dictionary of current settings; modified in place.
        options_new: dictionary of newly parsed settings.

    Returns:
        None.
    """
    for opt, value in options_new.items():
        # curl_args is handled as a special case below
        if opt != "curl_args":
            options[opt] = value
    # may need to merge dictionaries from config file and command line
    if "curl_args" in options_new:
        options.setdefault("curl_args", {}).update(options_new["curl_args"])
# Command-line entry point: read options from the config file and the command
# line, then look up a feed for every URL argument and print or save it.
if __name__ == "__main__":
    from pathlib import Path
    from parse_options import parse_options

    # set default options
    options = {
        "network": True,
        "verbosity": 3,
        "curl_args": dict(),
        "output_format": "url",
        "output_filename": "",
    }

    # count of command-line arguments
    arg_count = 0
    # object to output feeds as OPML
    output_opml = None
    # output file
    output_file = None

    # Prefer the per-user config file and fall back to the system-wide one.
    config_paths = (
        str(Path.home()) + "/.config/rsstube/config",
        "/etc/rsstube/config",
    )
    file_params = ""
    for config_path in config_paths:
        try:
            config = open(config_path, "r")
        except FileNotFoundError:
            continue
        # the context manager closes the handle once the file has been read
        with config:
            for line in config:
                line = line.strip()
                # comment lines should begin with # after stripping
                if line != "" and line[0] != "#":
                    file_params += " " + line
        # only the first config file that exists is used
        break

    # config file options
    if file_params != "":
        # throw away arg_count
        config_file_options = parse_options(file_params.split())[0]
        process_args(options, config_file_options)

    # command-line options (merged last, so they override the config file)
    command_line_options, arg_count = parse_options(sys.argv[1:])
    process_args(options, command_line_options)

    if options["output_format"] == "opml":
        debug("Formatting output as OPML.", options["verbosity"])
        output_opml = opml.Opml("rsstube feeds")

    if options["output_filename"] != "":
        debug("Output will be saved in " + options["output_filename"], options["verbosity"])
        output_file = open(options["output_filename"], "w")

    try:
        if len(sys.argv) == arg_count+1:
            error("Please provide one or more URL.", options["verbosity"])

        # everything after the counted option arguments is treated as a URL
        for url in sys.argv[arg_count+1:]:
            feed, site = get_feed(url, options["verbosity"], options["network"], options["curl_args"])
            if feed is not None:
                if output_opml is not None:
                    output_opml.add_feed(feed, site + ": " + url, url)
                else:
                    success(feed, output_file)

        # in OPML mode the collected feeds are emitted once, at the end
        if output_opml is not None:
            success(output_opml.get_opml(), output_file)
    finally:
        # close the output file even if an extractor raised
        if output_file is not None:
            output_file.close()