Commit bdbc6749 authored by Dimitri Podborski's avatar Dimitri Podborski
Browse files

Merge branch 'file-format' into 'master'

File format goodies

See merge request !6
parents 689d85ef dcc6f0bc
FileFormat tool to generate .md report page and to download latest output documents.
import argparse
import csv
import os
import shutil
import sys
import threading
import zipfile
from time import sleep
import requests
from automation import mdms
__version__ = '0.1'
DATA_PATH = './data'
# URL of the DocumentStatus.csv used when no --input file is given (see main()).
# NOTE(review): the original URL was lost in extraction — restore it before relying
# on the default fetch path; an empty string makes download_url fail loudly instead
# of the module raising NameError.
DOCURL = ''
# NOTE(review): the public-site URL in this text was truncated in extraction;
# confirm the link target before publishing the report.
MD_HEADER = '''# File Format group document status\n\n*generated using MPEG Automation script*\n\nThe following tables lists final and current documents produced by the File Format group of MPEG, as of the September 2022 meeting. Final documents are published by ISO, most of the time for a fee and sometimes free-of-charge (e.g. ISOBMFF used to be free of charge but is no longer unfortunately). MPEG Documents are usually internal documents only available to MPEG members, but some of them are public (e.g. working drafts, technologies under considerations, defect reports, etc.) and on [MPEG’s public site](https://www.mpeg.org/). The tables provide some links that are only accessible to MPEG members, and some that are public.'''
def download_url(url, save_path, chunk_size=128):
    '''Download a file from the given URL and store it at save_path.

    Authenticates with the MPEG credentials from the automation module and
    streams the response in chunks so large documents are never held fully
    in memory.

    :param url: URL to fetch
    :param save_path: local path the payload is written to
    :param chunk_size: size in bytes of each streamed chunk
    '''
    res = requests.get(url, auth=(mdms.MPEG_LOGIN, mdms.MPEG_PWD), stream=True)
    with open(save_path, 'wb') as file_descriptor:
        for chunk in res.iter_content(chunk_size=chunk_size):
            # restored: the loop body was lost in extraction — without this
            # write the downloaded file would always be empty
            file_descriptor.write(chunk)
def parse_csv(csv_file):
    """Parse a document status CSV file into a list of document dicts.

    Each dict carries the columns the report/download steps use
    (ISONr, Name, Title, PublicURL, Date, URL, Status, Description,
    ISOURL, EditorURL), with surrounding whitespace stripped.

    :param csv_file: path to the DocumentStatus.csv file
    :return: list of document dicts; empty if the file has no header row
    """
    documents = []
    with open(csv_file, 'r', encoding='utf-8-sig') as file_descriptor:
        sample = file_descriptor.readline()
        dialect = csv.Sniffer().sniff(sample)  # find out the delimiter type
        has_header = csv.Sniffer().has_header(sample)
        # restored: rewind so the reader sees the header line that the
        # sample readline() above consumed
        file_descriptor.seek(0)
        reader = csv.reader(file_descriptor, dialect)
        if not has_header:
            print('Error: Input CSV file has no header.')
            return documents
        header = next(reader)
        for row in reader:
            # skip empty lines
            if len(' '.join(row).strip()) == 0:
                continue
            documents.append({
                'ISONr': row[header.index('ISONr')].strip(),
                'Name': row[header.index('Standard Name')].strip(),
                'Title': row[header.index('Title')].strip(),
                'PublicURL': row[header.index('PublicURL')].strip(),
                'Date': row[header.index('Date')].strip(),
                'URL': row[header.index('URL')].strip(),
                'Status': row[header.index('Status')].strip(),
                'Description': row[header.index('Description')].strip(),
                'ISOURL': row[header.index('ISOURL')].strip(),
                'EditorURL': row[header.index('EditorURL')].strip()
            })
    return documents
def generate_report(documents, output_path):
    """Generate a markdown status report from the parsed documents.

    One table is written per standard (grouped by 'ISONr'), listing the
    public download, MPEG output number/date, ISO status links and the
    description of each document.

    :param documents: list of document dicts as produced by parse_csv()
    :param output_path: directory the report is written into; defaults to cwd
    """
    if output_path is None:
        output_path = './'
    # NOTE(review): the report filename was lost in extraction — 'FileFormat.md'
    # matches the tool naming used by download_files(); confirm.
    report_path = os.path.join(output_path, 'FileFormat.md')
    print(f'\n* Generate report in {output_path}')
    standards = set()
    for document in documents:
        # restored: collect the distinct standards; this loop body was lost
        standards.add(document['ISONr'])
    standards = sorted(standards)
    markdown = MD_HEADER + '\n'
    for standard in standards:
        standard_docs = [x for x in documents if x['ISONr'] == standard]
        markdown += f'\n## {standard} | {standard_docs[0]["Name"]}\n'
        markdown += '| Document Title | Link to latest public document (if any) | MPEG output date | MPEG output number<br>(MPEG-Members only) | ISO status<br>(public page and editor-only) | Description |\n'
        markdown += '| --- | --- | --- | --- | --- | --- |\n'
        for doc in standard_docs:
            public_filename = os.path.basename(
                doc['PublicURL']).replace('.zip', '')
            internal_filename = os.path.basename(doc['URL'])
            status_text = ''
            if len(doc['Status']) > 0 or len(doc['ISOURL']) > 0:
                status_text = f"[{doc['Status']}]({doc['ISOURL']})"
            if len(doc['EditorURL']) > 0:
                status_text += f"<br>[editor-link]({doc['EditorURL']})"
            # add table entry
            markdown += f"| {doc['Title']} | [{public_filename}]({doc['PublicURL']}) | {doc['Date']} | [{internal_filename}]({doc['URL']}) | {status_text} | {doc['Description']} |\n"
        markdown += '\n'
    with open(report_path, 'w', encoding='utf-8-sig') as file_descriptor:
        file_descriptor.write(markdown)
def download_files(documents, output_path):
    """Download output documents and store them in output_path.

    Files are grouped into one sub-folder per standard
    ('<ISONr>_<Name>' with spaces removed) under '<output_path>/FileFormat'.
    Documents without a URL are reported and skipped.

    :param documents: list of document dicts as produced by parse_csv()
    :param output_path: base directory; defaults to cwd when None
    :return: the 'FileFormat' folder all files were downloaded into
    """
    if output_path is None:
        output_path = './'
    output_path = os.path.join(output_path, 'FileFormat')
    print(f'\n* Download files to {output_path}')
    for doc in documents:
        if len(doc['URL']) == 0:
            # restored: warn and skip entries with no download URL
            print(
                f"WARNING: Document {doc['ISONr']}_{doc['Name']}: {doc['Title']} has no URL")
            continue
        folder_name = doc['ISONr'] + '_' + doc['Name']
        spec_path = os.path.join(
            output_path, folder_name.replace(' ', '').strip())
        if not os.path.exists(spec_path):
            # restored: create the per-standard folder before downloading
            os.makedirs(spec_path)
        file_name = ''.join(s for s in doc['Title'] if s.isalnum())
        file_name += '_' + os.path.basename(doc['URL'])
        full_path = os.path.join(spec_path, file_name)
        print(f"Download {doc['URL']}")
        download_url(doc['URL'], full_path)
    return output_path
def unzip_files(file_folder):
    """Unzip all downloaded archives in-place and remove the original zips.

    Extracted members are prefixed with the part of the zip filename before
    the first underscore so files from different archives in the same folder
    do not collide. macOS resource folders ('__MACOSX') are dropped.

    :param file_folder: root folder to walk for .zip files
    """
    print('\n* Unzip files and remove zip files')
    for root, _subfolder, files in os.walk(file_folder):
        for file in files:
            # restored: only process zip archives — skip everything else
            if not file.endswith('.zip'):
                continue
            path = os.path.join(root, file)
            prefix = file.split('_')[0]
            with zipfile.ZipFile(path, 'r') as zip_file:
                filenames_in_zip = zip_file.namelist()
                # restored: actually extract the archive next to it
                zip_file.extractall(root)
            # restored: drop the archive once its contents are extracted
            os.remove(path)
            filenames_in_zip = [
                x for x in filenames_in_zip if '__MACOSX' not in x]
            for temp in filenames_in_zip:
                from_path = os.path.join(root, temp)
                to_path = os.path.join(root, prefix + '_' + temp)
                if os.path.exists(from_path) and not os.path.exists(to_path):
                    os.rename(from_path, to_path)
    print('\n* Remove empty directories')
    for root, subfolders, files in os.walk(file_folder):
        for subfolder in subfolders:
            if '__MACOSX' in subfolder:
                shutil.rmtree(os.path.join(root, subfolder))
def run(args, documents):
    """Run the job based on input arguments.

    Optionally re-schedules itself via threading.Timer when --update is set,
    then performs the requested report/fetch/unzip steps.

    :param args: parsed argparse namespace (update, report, fetch, unzip, output)
    :param documents: list of document dicts as produced by parse_csv()
    """
    if args.update is not None:
        interval_seconds = float(args.update) * 60.0 * 60.0
        print(f'Update every {interval_seconds} seconds')
        threading.Timer(interval_seconds, run, [args, documents]).start()
    # restored: the report step is opt-in like fetch/unzip
    if args.report:
        generate_report(documents, args.output)
    if args.fetch:
        file_folder = download_files(documents, args.output)
        # unzip only makes sense on freshly fetched files — file_folder is
        # only defined on the fetch path
        if args.unzip:
            unzip_files(file_folder)
def main():
    """Entry point: parse CLI options and kick off the requested job."""
    print('*' * 35)
    print('* FileFormat tool', __version__, '*')
    print('*' * 35 + '\n')
    # program options
    usage_examples = '''Examples:
  python -i <DocumentStatus.csv> --report --fetch # generate report and get files
  python --fetch --update 60 # fetch files every 60 minutes'''
    parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter,
                                     description='Keep the FileFormat documents up to date.',
                                     epilog=usage_examples)
    parser.add_argument(
        '-i', '--input', help='Path to DocumentStatus.csv file. If not set, fetch from GitHub.')
    parser.add_argument(
        '-o', '--output', help='Path to output directory to store files in.')
    parser.add_argument(
        '-f', '--fetch', help='Download files.', action='store_true')
    parser.add_argument(
        '-r', '--report', help='Generate report .md file.', action='store_true')
    parser.add_argument(
        '-z', '--unzip', help='Can be used to unzip fetched files.', action='store_true')
    parser.add_argument('-u', '--update', help='Update interval in hours')
    args = parser.parse_args()
    if args.output is not None:
        if os.path.isdir(args.output) is False:
            print(f'ERROR: Directory "{args.output}" does not exist!')
            # restored: abort instead of writing into a missing directory
            sys.exit(1)
    input_file = args.input
    if not args.input:
        # NOTE(review): DOCURL is commented out at module level in this
        # revision — the default fetch path needs it restored.
        print('* Fetch DocumentStatus.csv from GitHub')
        input_file = os.path.join(DATA_PATH, 'DocumentStatus.csv')
        download_url(DOCURL, input_file)
    documents = parse_csv(input_file)
    run(args, documents)


if __name__ == "__main__":
    main()
......@@ -31,7 +31,7 @@ import requests
from automation import gitlab, helpers, mdms
__version__ = '1.4'
__version__ = '1.5'
DATA_PATH = './data'
GITLAB_PROJECTS_PATH = os.path.join(DATA_PATH, 'gitlab_projects.json')
......@@ -39,18 +39,17 @@ GITLAB_USERS_PATH = os.path.join(DATA_PATH, 'gitlab_users.json')
MEETINGS_PATH = os.path.join(DATA_PATH, 'meetings.json')
SYSTEMS_GROUP_ID = 727 # GitLab Group ID for Systems Subgroup
PROJECTS_FF = ['isobmff', 'HEIF', 'NALuFF', 'FFConformanceRefSoft', 'rawvideo', 'Text',
'eventmessage', 'General', 'DerivedVis', 'CENC', 'Metrics', 'PartialFF', 'MP4FF', 'Audio']
def download_url(url, save_path, chunk_size=128):
r = requests.get(url, auth=(mdms.MPEG_LOGIN, mdms.MPEG_PWD), stream=True)
with open(save_path, 'wb') as fd:
for chunk in r.iter_content(chunk_size=chunk_size):
'''Download a file from the given URL'''
res = requests.get(url, auth=(mdms.MPEG_LOGIN, mdms.MPEG_PWD), stream=True)
with open(save_path, 'wb') as file_descriptor:
for chunk in res.iter_content(chunk_size=chunk_size):
def fetch_contributions(table_entries):
'''Download all files from the entries'''
print('\nDownload contributions')
for entry in table_entries:
path = os.path.join(DATA_PATH, 'contributions')
......@@ -67,13 +66,14 @@ def fetch_contributions(table_entries):
path = os.path.join(path, project['path_with_namespace'])
if not os.path.exists(path) and len(path) > 0:
folder, filename = os.path.split(url)
_folder, filename = os.path.split(url)
filename = os.path.join(path, filename)
print(document['document'], ' -> ', filename)
download_url(url, filename)
def print_infos(table_entries, project_url, gitlab_projects):
'''Print information about contributions and issues'''
print('\nDump information')
for entry in table_entries:
document = entry['document']
......@@ -142,6 +142,7 @@ def print_infos(table_entries, project_url, gitlab_projects):
def open_new_issue(project_id, document, test, meeting_start, gitlab_members):
'''Open a new GitLab issue'''
usernames = helpers.find_gitlab_users(gitlab_members, document)
issue_title = helpers.create_issue_title(document)
document_details = mdms.get_document_details(document['mdms_id'])
......@@ -169,25 +170,25 @@ def open_new_issue(project_id, document, test, meeting_start, gitlab_members):
gitlab.open_issue(project_id, issue_title,
issue_description, issue_lables)
return True
print(' * {}: Test open issue with title "{}" | Lables={}'.format(document['document'], issue_title,
print(' * {}: Test open issue with title "{}" | Lables={}'.format(
document['document'], issue_title, issue_lables))
return False
def close_issue(issue, test, document):
'''Close issue'''
if not test:
' * {}: Close issue: {}'.format(document['document'], issue.web_url))
return True
' * {}: Test close issue: {}'.format(document['document'], issue.web_url))
' * {}: Test close issue: {}'.format(document['document'], issue.web_url))
return False
def open_issues(table_entries, test, gitlab_members, meeting_start):
'''open issues using table entries'''
print('\nOpen {} issues. TestMode={}'.format(len(table_entries), test))
counter = 0
for entry in table_entries:
......@@ -428,31 +429,34 @@ def parse_cli(docs, project_url, close_flag, gitlab_projects, input_docs):
def derive_fileformat(gitlab_projects, input_docs):
"""return table_entries based on already opened issues in the FIleFormat group"""
proj_urls = []
table_entries = []
for proj in PROJECTS_FF:
'', proj))
ff_projects = [
p for p in gitlab_projects if "MPEG/Systems/FileFormat" in p["path_with_namespace"]]
for project_url in proj_urls:
print(f'gather contributions from {project_url}')
project = helpers.find_project(gitlab_projects, project_url)
if project is not None:
issues = gitlab.get_issues(project['id'])
for issue in issues:
meta = helpers.get_issue_metadata(issue.description)
if meta is not None:
document = helpers.find_document(
input_docs, meta['document'])
if not document:
f'WARNING: Document "{meta["document"]}" not found. Try updating the database (-U) or select another meeting (--meeting).')
for project in ff_projects:
print(f'gather contributions from {project["path_with_namespace"]}')
issues = gitlab.get_issues(project['id'])
for issue in issues:
meta = helpers.get_issue_metadata(issue.description)
if meta is not None:
document = helpers.find_document(input_docs, meta['document'])
if not document:
# document is probably from the previous meeting, search for it
doc_nr = meta['document'].replace('m', '')
print(f'Search for m{doc_nr}')
search_result = mdms.find_documents(number=doc_nr, category=mdms.SearchCategory.INPUT)
if len(search_result) == 0:
print(f'WARNING: Document m{doc_nr} not found.')
'project': project,
'document': document,
'close': False
document = search_result[0]
'project': project,
'document': document,
'close': False
print(f'Got {len(table_entries)} issues.')
return table_entries
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment