# -*- coding: utf-8 -*-
'''
Interface to the MPEG Document Management System (MDMS).
It requests data from MDMS and parses the returned HTML.
'''
import os
import re
from datetime import datetime
from enum import Enum, unique
from urllib.parse import urljoin, parse_qs, urlparse

import bs4
import requests
BASE_URL = 'https://dms.mpeg.expert/doc_end_user/'
MEETINGS_URL = urljoin(BASE_URL, 'all_meeting.php')
CURRENT_MEETING_URL = urljoin(BASE_URL, 'current_meeting.php')
SEARCH_URL = urljoin(BASE_URL, 'searchAcross.php')
DOCUMENT_URL = urljoin(BASE_URL, 'current_document.php')
MPEG_LOGIN = os.environ.get('MPEG_LOGIN')
MPEG_PWD = os.environ.get('MPEG_PWD')
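# Credentials for HTTP basic auth are read from the environment; both variables
# must be set before calling any of the request helpers below, e.g.
# (hypothetical values):
#   export MPEG_LOGIN=jane.doe@example.com
#   export MPEG_PWD=secret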
class MDMSParser:
def parse_meetings(self, html):
meetings = []
        soup = bs4.BeautifulSoup(html, features='lxml')
tables = soup.find('body').find_all('table')
        if len(tables) != 1:
            print('Error: Exactly one table should be present in the "All Meetings" frame. Did the MDMS layout change?')
            return []
rows = tables[0].find_all('tr')
        for n, row in enumerate(rows):
            if n == 0:  # check the header first
                header = ['number', 'name', 'start date', 'end date', 'last input document', 'last output document']
                if not self.check_table_header(header, row):
                    print('Error: Wrong table header. Did the MDMS layout change?')
                    return []
                continue
            cols = row.find_all('td', recursive=False)
try:
last_input = None
last_output = None
if len(cols[4].text) > 0:
last_input = int(cols[4].text)
if len(cols[5].text) > 0:
last_output = int(cols[5].text)
parsed_href = urlparse(cols[1].a['href'])
meeting_id = int(parse_qs(parsed_href.query)['id_meeting'][0])
meetings.append({
'number': int(cols[0].text),
'id': meeting_id,
'name': cols[1].text.strip(),
'start_date': datetime.strptime(cols[2].text, '%Y-%m-%d'),
'end_date': datetime.strptime(cols[3].text, '%Y-%m-%d'),
'last_input': last_input,
'last_output': last_output
})
            except (AttributeError, IndexError, KeyError, TypeError, ValueError):
                print('Error: Could not parse the meeting data. Did the MDMS layout change?')
                return []
return meetings
def parse_author_entry(self, author_entry):
        '''
        Search the entry string for an email address, remove it from the name, and clean up.
        Return a tuple (name, email)
        '''
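        # Example (hypothetical input):
        #   parse_author_entry('Jane Doe (ACME) jane.doe@example.com')
        #   returns ('Jane Doe', 'jane.doe@example.com')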
author_entry = author_entry.strip()
if len(author_entry) == 0:
return None
email = None
match = re.search(r'[\w\.-]+@[\w\.-]+', author_entry)
if match: # email found
email = match.group(0)
author_entry = author_entry.replace(email, '') # remove email from the name
        # remove everything inside () or []
        author_entry = re.sub(r'[\(\[].*?[\)\]]', '', author_entry)
        # remove all non-ASCII characters
        author_entry = re.sub(r'[^\x00-\x7F]+', '', author_entry)
author_entry = author_entry.strip()
return (author_entry, email)
def try_parsing_date(self, text):
        '''
        Try parsing a timestamp string; return None if no known format matches
        '''
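        # For example:
        #   try_parsing_date('2023-01-16 09:30:00') -> datetime(2023, 1, 16, 9, 30)
        #   try_parsing_date('not a date')          -> None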
for fmt in ('%Y-%m-%d %H:%M:%S', '%Y-%m-%d'):
try:
return datetime.strptime(text.strip(), fmt)
except ValueError:
pass
return None
def parse_input_docs(self, html):
docs = []
        soup = bs4.BeautifulSoup(html, features='lxml')
        for i in soup.select('br'):  # replace <br> with a space, it makes checking the headers easier
            i.replace_with(' ')
form = soup.find('body').find('form', id='documents')
        if not form:
            print('Error: No form with id="documents" found. Did the MDMS layout change?')
            return []
        table = form.find('table')
        if not table:
            print('Error: No table found in the form. Did the MDMS layout change?')
            return []
rows = table.find_all('tr', recursive=False)
        for n, row in enumerate(rows):
            if n == 0:  # check the header first
                header = ['number', 'created', 'uploaded', 'Group Working Group / SubGroup', 'title', 'source', 'download']
                if not self.check_table_header(header, row):
                    print('Error: Wrong table header. Did the MDMS layout change?')
                    return []
                continue
            cols = row.find_all('td', recursive=False)
try:
if len(cols[0].text) == 0:
continue
# get document ID on MDMS
parsed_href = urlparse(cols[0].a['href'])
mdms_id = int(parse_qs(parsed_href.query)['id'][0])
container_url = urljoin(DOCUMENT_URL, '?id={}'.format(mdms_id))
# get timestamp of the last uploaded version
last_version_uploaded = self.try_parsing_date(cols[2].text)
created_timestamp = self.try_parsing_date(cols[1].text)
# get authors
authors = []
for entry in cols[5].contents:
if isinstance(entry, bs4.Tag):
                        email = None
                        name = None
try:
parsed_href = urlparse(entry['href'])
email = parsed_href.path
author_data = self.parse_author_entry(entry.text)
name = entry.text
if author_data:
name = author_data[0] # clean version of the name
                                # sometimes the name and email are entered in the wrong fields in MDMS; swap them back
                                if '@' not in email and author_data[1]:
                                    name = email
                                    email = author_data[1]
                        except KeyError:
                            # sometimes the Authors field is formatted with fake HTML tags
                            print('Bad HTML format in the Authors field: ', entry)
                            name = entry.text
authors.append({
'name': name,
'email': email
})
else:
for author_entry in entry.string.replace(' and ', ',').split(','):
author_data = self.parse_author_entry(author_entry)
if author_data:
authors.append({
'name': author_data[0],
'email': author_data[1]
})
# get latest document link (if available)
                latest_url = None
                if len(cols) == 7 and cols[6].find('a') is not None:
                    latest_url = urljoin(CURRENT_MEETING_URL, cols[6].find('a')['href'])
docs.append({
'mdms_id': mdms_id,
'document': cols[0].text,
'created': created_timestamp,
'last_version_uploaded': last_version_uploaded,
'sub_group_text': cols[3].text,
'title': cols[4].text.strip(),
'authors': authors,
'latest_version_url': latest_url,
'container': container_url
})
            except (AttributeError, IndexError, KeyError, TypeError, ValueError):
                print('Error: Could not parse the input document data. Did the MDMS layout change?')
                return []
return docs
def parse_document_details(self, html):
details = {
'submitted_by': None,
'title': None,
'authors_string': None,
'organizations': None,
'abstract': None,
'related_docs': None,
'ahg': None,
'sub_group': None,
'group': None,
'standard': None,
'activity': None,
'documents': []
}
        soup = bs4.BeautifulSoup(html, features='lxml')
        for i in soup.select('br'):  # replace <br> with a space, it makes checking the headers easier
            i.replace_with(' ')
# do some checks if format is ok
table_main = soup.find('body').find('table')
if not table_main:
print('Error: No main table element found. Did MDMS layout change?')
return None
rows_main = table_main.find_all('tr', recursive=False)
        if len(rows_main) != 3:
            print('Error: The main table should have 3 rows. Did the MDMS layout change?')
            return None
        tables = rows_main[0].find_all('table')
        if len(tables) != 2:
            print('Error: The first row of the main table should contain exactly 2 tables. Did the MDMS layout change?')
            return None
rows = tables[1].find_all('tr', recursive=False)
# parse
        for row in rows:
            cols = row.find_all('td', recursive=False)
attribute = cols[0].text.strip().lower()
entry = cols[1].text.strip()
if len(entry) == 0:
continue # skip all empty fields
if 'submitted by' in attribute:
parsed_href = urlparse(cols[1].a['href'])
details['submitted_by'] = {'name': entry, 'email': parsed_href.path}
elif 'title' in attribute:
details['title'] = entry
elif 'authors' in attribute:
details['authors_string'] = entry
elif 'organizations' in attribute:
details['organizations'] = entry
elif 'abstract' in attribute:
details['abstract'] = entry
elif 'related contributions' in attribute:
details['related_docs'] = entry
elif 'ahg' in attribute:
details['ahg'] = entry
elif 'sub group' in attribute:
details['sub_group'] = entry
elif 'group' in attribute:
details['group'] = entry
elif 'standard' in attribute:
details['standard'] = entry
elif 'activity' in attribute:
details['activity'] = entry
elif 'document' in attribute:
rel_path = None
version = None
timestamp = None
for entry in cols[1].contents:
if isinstance(entry, bs4.Tag):
try:
parsed_href = urlparse(entry['href'])
rel_path = parsed_href.path
except KeyError:
continue
else:
entry = entry.string.strip()
if len(entry) == 0:
details['documents'].append({
'path': rel_path,
'version': version,
'timestamp': timestamp
})
rel_path = None
version = None
timestamp = None
continue
                        # version entries look like '(version N - date YYYY-MM-DD HH:MM:SS)';
                        # the offsets 8 and 6 are the lengths of '(version' and '- date'
                        pos1 = entry.find('(version')
                        pos2 = entry.find('- date', pos1+8)
                        pos3 = entry.find(')', pos2+6)
                        if pos1 < 0 or pos2 < 0 or pos3 < 0:
                            continue
                        version = int(entry[pos1+8:pos2].strip())
                        timestamp = self.try_parsing_date(entry[pos2+6:pos3])
return details
def check_table_header(self, template, header_row):
'''
Check if header_row contains the same data as the template
'''
        cols = header_row.find_all('td', recursive=False)
        if len(template) != len(cols):
            print('Error: The table header should have {} columns but it has {}.'.format(len(template), len(cols)))
            return False
        for expected, col in zip(template, cols):
            if col.text.strip().lower() != expected.strip().lower():
                print('Error: Table header entry mismatch: "{}" != "{}".'.format(col.text.strip().lower(), expected.strip().lower()))
                return False
return True
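# values of the search_category query parameter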
@unique
class SearchCategory(Enum):
ALL = ''
INPUT = 'm'
OUTPUT = 'w'
# values of the search_id_group query parameter
@unique
class Standard(Enum):
ALL = 1
MPEG_DASH = 38
MPEG_G = 43
MPEG_IOMT = 44
MPEG_N = 49
MPEG_1 = 4
MPEG_2 = 5
MPEG_4 = 6
MPEG_7 = 7
MPEG_21 = 8
MPEG_A = 9
MPEG_B = 10
MPEG_C = 11
MPEG_D = 12
MPEG_E = 13
MPEG_M = 14
MPEG_U = 15
MPEG_V = 16
MPEG_H = 26
MPEG_UD = 39
MPEG_GREEN = 40
MPEG_I = 41
MPEG_CICP = 42
MPEG_5 = 50
EXPLORATIONS = 45
MAR_REFERENCE_MODEL = 46
# values of the search_sub_group query parameter
@unique
class Subgroup(Enum):
ALL = 1
JVET = 41
WG2 = 43
WG3 = 44
WG4 = 45
WG5 = 46
WG6 = 47
WG7 = 48
WG8 = 49
AG2 = 50
AG3 = 51
AG4 = 52
AG5 = 53
JCTVC = 38
JCT3V = 39
# other query parameters of current_meeting.php:
#   id_meeting=<meeting id>
#   type_order=0  # 0 - increasing, 1 - decreasing order (input + output documents)
#   sql_type=document_number | document_date_time | upload_document_date_time | document.id_sub_group | title | authors
def _get_query_string(meeting_id,
                      category=SearchCategory.INPUT,
                      group=Standard.ALL,
                      subgroup=Subgroup.ALL):
    return '?id_meeting={}' \
           '&search_category={}' \
           '&search_id_group={}' \
           '&search_sub_group={}'.format(meeting_id, category.value, group.value, subgroup.value)
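# For example, _get_query_string(137, SearchCategory.INPUT) yields
# '?id_meeting=137&search_category=m&search_id_group=1&search_sub_group=1'
# (the meeting id 137 is only an illustrative value).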
# --------------------------------------------------------------------------------------------------
# Interfaces
# --------------------------------------------------------------------------------------------------
def get_meetings():
    '''
    Get data for all meetings. Returns a list:
    [{ 'number', 'id', 'name', 'start_date', 'end_date', 'last_input', 'last_output' }, ...]
    '''
response = requests.get(MEETINGS_URL, auth=(MPEG_LOGIN, MPEG_PWD))
    if response.status_code != 200:
print('HTTP response {} != 200'.format(response.status_code))
print('\t{}'.format(response.text.replace('\n', '\n\t')))
return []
parser = MDMSParser()
return parser.parse_meetings(response.text)
def get_current_meeting():
    '''
    Returns data of the latest meeting:
    { 'number', 'id', 'name', 'start_date', 'end_date', 'last_input', 'last_output' }
    '''
meetings = get_meetings()
if len(meetings) == 0:
return None
return max(meetings, key=lambda x:x['number'])
def get_input_documents(meeting_id, standard=Standard.ALL, subgroup=Subgroup.ALL):
    '''
    Returns data for all input documents of a given meeting:
    [{'mdms_id', 'document', 'created', 'last_version_uploaded', 'sub_group_text', 'title', 'authors', 'latest_version_url', 'container'}, ...]
    '''
query = _get_query_string(meeting_id, SearchCategory.INPUT, standard, subgroup)
url = urljoin(CURRENT_MEETING_URL, query)
response = requests.get(url, auth=(MPEG_LOGIN, MPEG_PWD))
    if response.status_code != 200:
print('HTTP response {} != 200'.format(response.status_code))
print('\t{}'.format(response.text.replace('\n', '\n\t')))
return []
parser = MDMSParser()
return parser.parse_input_docs(response.text)
def get_document_details(document_id):
    '''
    Get more details about a document:
    {'submitted_by': {'name', 'email'}, 'title', 'authors_string', 'organizations', 'abstract', 'related_docs', 'ahg', 'sub_group', 'group', 'standard', 'activity', 'documents': [{'path', 'version', 'timestamp'}, ... ]}
    '''
query = '?id={}'.format(document_id)
url = urljoin(DOCUMENT_URL, query)
response = requests.post(url, auth=(MPEG_LOGIN, MPEG_PWD))
    if response.status_code != 200:
print('HTTP response {} != 200'.format(response.status_code))
print('\t{}'.format(response.text.replace('\n', '\n\t')))
return None
parser = MDMSParser()
    details = parser.parse_document_details(response.text)
    if not details:
        return None
    for doc in details['documents']:
        doc['path'] = urljoin(DOCUMENT_URL, doc['path'])  # relative to absolute
    return details
def find_documents(title='',
                   number='',
                   author='',
                   category=SearchCategory.ALL,
                   group=Standard.ALL,
                   subgroup=Subgroup.ALL):
'''
Find documents using the search URL.
TODO: Fire a POST request to SEARCH_URL and parse the result
'''
return None
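
# Minimal usage sketch: fetch the current meeting and list its input documents.
# This assumes MPEG_LOGIN and MPEG_PWD are set in the environment and the MDMS
# server is reachable.
if __name__ == '__main__':
    meeting = get_current_meeting()
    if meeting:
        print('Current meeting: {} ({:%Y-%m-%d} - {:%Y-%m-%d})'.format(
            meeting['name'], meeting['start_date'], meeting['end_date']))
        docs = get_input_documents(meeting['id'])
        print('{} input documents'.format(len(docs)))
        if docs:
            details = get_document_details(docs[0]['mdms_id'])
            if details:
                print('First document: {}'.format(details['title']))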