Commit dcc6f0bc authored by Dimitri Podborski's avatar Dimitri Podborski
Browse files

add FF script to process output docs

parent a3b07438
"""
FileFormat tool to generate .md report page and to download latest output documents.
"""
import argparse
import csv
import os
import shutil
import sys
import threading
import zipfile
from time import sleep
import requests
from automation import mdms
__version__ = '0.1'
DATA_PATH = './data'
# DOCURL = 'https://raw.githubusercontent.com/MPEGGroup/FileFormat/document_status/DocumentStatus.csv'
DOCURL = 'https://raw.githubusercontent.com/MPEGGroup/FileFormat/master/DocumentStatus.csv'
MD_HEADER = '''# File Format group document status\n\n*generated using MPEG Automation script*\n\nThe following tables lists final and current documents produced by the File Format group of MPEG, as of the September 2022 meeting. Final documents are published by ISO, most of the time for a fee and sometimes free-of-charge (e.g. ISOBMFF used to be free of charge but is no longer unfortunately). MPEG Documents are usually internal documents only available to MPEG members, but some of them are public (e.g. working drafts, technologies under considerations, defect reports, etc.) and on [MPEG’s public site](https://www.mpeg.org/). The tables provide some links that are only accessible to MPEG members, and some that are public.'''
def download_url(url, save_path, chunk_size=128):
'''Download a file from the given URL'''
res = requests.get(url, auth=(mdms.MPEG_LOGIN, mdms.MPEG_PWD), stream=True)
with open(save_path, 'wb') as file_descriptor:
for chunk in res.iter_content(chunk_size=chunk_size):
file_descriptor.write(chunk)
def parse_csv(csv_file):
"""Parse document status CSV file"""
documents = []
with open(csv_file, 'r', encoding='utf-8-sig') as file_descriptor:
sample = file_descriptor.readline()
file_descriptor.seek(0)
dialect = csv.Sniffer().sniff(sample) # find out the delimeter type
has_header = csv.Sniffer().has_header(sample)
reader = csv.reader(file_descriptor, dialect)
if not has_header:
print('Error: Input CSV file has no header.')
sys.exit(-1)
header = next(reader)
for row in reader:
# skip empty lines
if len(' '.join(row).strip()) == 0:
continue
documents.append({
'ISONr': row[header.index('ISONr')].strip(),
'Name': row[header.index('Standard Name')].strip(),
'Title': row[header.index('Title')].strip(),
'PublicURL': row[header.index('PublicURL')].strip(),
'Date': row[header.index('Date')].strip(),
'URL': row[header.index('URL')].strip(),
'Status': row[header.index('Status')].strip(),
'Description': row[header.index('Description')].strip(),
'ISOURL': row[header.index('ISOURL')].strip(),
'EditorURL': row[header.index('EditorURL')].strip()
})
return documents
def generate_report(documents, output_path):
"""Generate markdown report and write it to output_path"""
if output_path is None:
output_path = './DocumentStatus.md'
else:
output_path = os.path.join(output_path, 'DocumentStatus.md')
print(f'\n* Generate report in {output_path}')
standards = set()
for document in documents:
standards.add(document['ISONr'])
standards = sorted(standards)
markdown = MD_HEADER + '\n'
for standard in standards:
standard_docs = [x for x in documents if x['ISONr'] == standard]
markdown += f'\n## {standard} | {standard_docs[0]["Name"]}\n'
markdown += '| Document Title | Link to latest public document (if any) | MPEG output date | MPEG output number<br>(MPEG-Members only) | ISO status<br>(public page and editor-only) | Description |\n'
markdown += '| --- | --- | --- | --- | --- | --- |\n'
for doc in standard_docs:
public_filename = os.path.basename(
doc['PublicURL']).replace('.zip', '')
internal_filename = os.path.basename(doc['URL'])
status_text = ''
if len(doc['Status']) > 0 or len(doc['ISOURL']) > 0:
status_text = f"[{doc['Status']}]({doc['ISOURL']})"
if len(doc['EditorURL']) > 0:
status_text += f"<br>[editor-link]({doc['EditorURL']})"
# add table entry
markdown += f"| {doc['Title']} | [{public_filename}]({doc['PublicURL']}) | {doc['Date']} | [{internal_filename}]({doc['URL']}) | {status_text} | {doc['Description']} |\n"
markdown += '\n'
with open(output_path, 'w', encoding='utf-8-sig') as file_descriptor:
file_descriptor.write(markdown)
file_descriptor.close()
def download_files(documents, output_path):
"""Download output documents and store them in output_path"""
if output_path is None:
output_path = './'
output_path = os.path.join(output_path, 'FileFormat')
print(f'\n* Download files to {output_path}')
for doc in documents:
if len(doc['URL']) == 0:
print(
f"WARNING: Document {doc['ISONr']}_{doc['Name']}: {doc['Title']} has no URL")
continue
folder_name = doc['ISONr'] + '_' + doc['Name']
spec_path = os.path.join(
output_path, folder_name.replace(' ', '').strip())
if not os.path.exists(spec_path):
os.makedirs(spec_path)
file_name = ''.join(s for s in doc['Title'] if s.isalnum())
file_name += '_' + os.path.basename(doc['URL'])
full_path = os.path.join(spec_path, file_name)
print(f"Download {doc['URL']}")
download_url(doc['URL'], full_path)
return output_path
def unzip_files(file_folder):
"""Unzip all files and remove original zips"""
print('\n* Unzip files and remove zip files')
for root, _subfolder, files in os.walk(file_folder):
for file in files:
if not file.endswith('.zip'):
continue
path = os.path.join(root, file)
prefix = file.split('_')[0]
with zipfile.ZipFile(path, 'r') as zip_file:
filenames_in_zip = zip_file.namelist()
filenames_in_zip = [
x for x in filenames_in_zip if '__MACOSX' not in x]
zip_file.extractall(root)
for temp in filenames_in_zip:
from_path = os.path.join(root, temp)
to_path = os.path.join(root, prefix + '_' + temp)
if os.path.exists(from_path) and not os.path.exists(to_path):
os.rename(from_path, to_path)
os.remove(path)
print('\n* Remove empty directories')
for root, subfolders, files in os.walk(file_folder):
for subfolder in subfolders:
if '__MACOSX' in subfolder:
shutil.rmtree(os.path.join(root, subfolder))
def run(args, documents):
"""Run the job based on input arguments"""
if args.update is not None:
interval_seconds = float(args.update) * 60.0 * 60.0
print(f'Update every {interval_seconds} seconds')
threading.Timer(interval_seconds, run, [args, documents]).start()
if args.report:
generate_report(documents, args.output)
if args.fetch:
file_folder = download_files(documents, args.output)
if args.unzip:
unzip_files(file_folder)
def main():
"""Entry point"""
print('*' * 35)
print('* FileFormat tool', __version__, '*')
print('*' * 35 + '\n')
# program options
usage_examples = '''Examples:
python fileformat.py -i <DocumentStatus.csv> --report --fetch # generate report and get files
python fileformat.py --fetch --update 60 # fetch files every 60 minutes
'''
parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter,
description='Keep the FileFormat documents up to date.',
epilog=usage_examples)
parser.add_argument(
'-i', '--input', help='Path to DocumentStatus.csv file. If not set, fetch from GitHub.')
parser.add_argument(
'-o', '--output', help='Path to output directory to store files in.')
parser.add_argument(
'-f', '--fetch', help='Download files.', action='store_true')
parser.add_argument(
'-r', '--report', help='Generate report .md file.', action='store_true')
parser.add_argument(
'-z', '--unzip', help='Can be used to unzip fetched files.', action='store_true')
parser.add_argument('-u', '--update', help='Update interval in hours')
args = parser.parse_args()
if args.output is not None:
if os.path.isdir(args.output) is False:
print(f'ERROR: Directory "{args.output}" does not exist!')
sys.exit(-1)
input_file = args.input
if not args.input:
print('* Fetch DocumentStatus.csv from GitHub')
input_file = os.path.join(DATA_PATH, 'DocumentStatus.csv')
download_url(DOCURL, input_file)
documents = parse_csv(input_file)
run(args, documents)
if __name__ == "__main__":
main()
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment