# -*- coding: utf-8 -*-
'''
This is the interface to the MPEG Document Management System (MDMS).
It requests the data from MDMS and parses the returned HTML.
'''
import os
import re
from datetime import datetime
from enum import Enum, unique
from urllib.parse import urljoin, parse_qs, urlparse

import bs4
import requests

BASE_URL = 'https://dms.mpeg.expert/doc_end_user/'
MEETINGS_URL = urljoin(BASE_URL, 'all_meeting.php')
CURRENT_MEETING_URL = urljoin(BASE_URL, 'current_meeting.php')
SEARCH_URL = urljoin(BASE_URL, 'searchAcross.php')
DOCUMENT_URL = urljoin(BASE_URL, 'current_document.php')

MPEG_LOGIN = os.environ.get('MPEG_LOGIN')
MPEG_PWD = os.environ.get('MPEG_PWD')


class MDMSParser:

    def parse_meetings(self, html):
        meetings = []
        soup = bs4.BeautifulSoup(html, features='lxml')
        tables = soup.find('body').find_all('table')
        if len(tables) != 1:
            print('Error: Only a single table should be present in the "All Meetings" frame. Did the MDMS layout change?')
            return []
        rows = tables[0].find_all('tr')
        for n, row in enumerate(rows):
            if n == 0:
                # check the header first
                header = ['number', 'name', 'start date', 'end date',
                          'last input document', 'last output document']
                if not self.check_table_header(header, row):
                    print('Error: Wrong table header. Did the MDMS layout change?')
                    return []
                continue
            cols = row.find_all('td', recursive=False)
            try:
                last_input = None
                last_output = None
                if len(cols[4].text) > 0:
                    last_input = int(cols[4].text)
                if len(cols[5].text) > 0:
                    last_output = int(cols[5].text)
                parsed_href = urlparse(cols[1].a['href'])
                meeting_id = int(parse_qs(parsed_href.query)['id_meeting'][0])
                meetings.append({
                    'number': int(cols[0].text),
                    'id': meeting_id,
                    'name': cols[1].text.strip(),
                    'start_date': datetime.strptime(cols[2].text, '%Y-%m-%d'),
                    'end_date': datetime.strptime(cols[3].text, '%Y-%m-%d'),
                    'last_input': last_input,
                    'last_output': last_output
                })
            except (AttributeError, IndexError, KeyError, TypeError, ValueError):
                print('Error: Could not parse meeting data. Did the MDMS layout change?')
                return []
        return meetings

    def parse_author_entry(self, author_entry):
        '''
        Search the entry string for an email, remove it from the name and clean up.
        Returns a tuple (name, email).
        '''
        author_entry = author_entry.strip()
        if len(author_entry) == 0:
            return None
        email = None
        match = re.search(r'[\w\.-]+@[\w\.-]+', author_entry)
        if match:  # email found
            email = match.group(0)
            author_entry = author_entry.replace(email, '')  # remove the email from the name
        # remove everything inside () or []
        author_entry = re.sub(r'[\(\[].*?[\)\]]', '', author_entry)
        # remove all non-ASCII characters
        author_entry = re.sub(r'[^\x00-\x7F]+', '', author_entry)
        author_entry = author_entry.strip()
        return (author_entry, email)
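    # Illustration of parse_author_entry with hypothetical inputs (not taken
    # from actual MDMS data):
    #   parse_author_entry('Jane Doe (ACME) jane.doe@example.com')
    #     -> ('Jane Doe', 'jane.doe@example.com')
    #   parse_author_entry('John Smith') -> ('John Smith', None)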
    def try_parsing_date(self, text):
        '''
        Try parsing the timestamp; if not possible, return None.
        '''
        for fmt in ('%Y-%m-%d %H:%M:%S', '%Y-%m-%d'):
            try:
                return datetime.strptime(text.strip(), fmt)
            except ValueError:
                pass
        return None

    def parse_input_docs(self, html):
        docs = []
        soup = bs4.BeautifulSoup(html, features='lxml')
        for i in soup.select('br'):
            # replace <br> with a space, it makes checking the headers easier
            i.replace_with(' ')
        form = soup.find('body').find('form', id='documents')
        if not form:
            print('Error: No form with id="documents" found. Did the MDMS layout change?')
            return []
        table = form.find('table')
        if not table:
            print('Error: No table found in the form. Did the MDMS layout change?')
            return []
        rows = table.find_all('tr', recursive=False)
        for n, row in enumerate(rows):
            if n == 0:
                # check the header first
                header = ['number', 'created', 'uploaded',
                          'Group Working Group / SubGroup',
                          'title', 'source', 'download']
                if not self.check_table_header(header, row):
                    print('Error: Wrong table header. Did the MDMS layout change?')
                    return []
                continue
            cols = row.find_all('td', recursive=False)
            try:
                if len(cols[0].text) == 0:
                    continue
                # get the document ID on MDMS
                parsed_href = urlparse(cols[0].a['href'])
                mdms_id = int(parse_qs(parsed_href.query)['id'][0])
                container_url = urljoin(DOCUMENT_URL, '?id={}'.format(mdms_id))
                # get the creation timestamp and the timestamp of the last uploaded version
                created_timestamp = self.try_parsing_date(cols[1].text)
                last_version_uploaded = self.try_parsing_date(cols[2].text)
                # get the authors
                authors = []
                for entry in cols[5].contents:
                    if isinstance(entry, bs4.Tag):
                        email = None
                        name = None
                        try:
                            parsed_href = urlparse(entry['href'])
                            email = parsed_href.path
                            author_data = self.parse_author_entry(entry.text)
                            name = entry.text
                            if author_data:
                                name = author_data[0]  # cleaned-up version of the name
                                # sometimes name and email are typed into MDMS flipped
                                if '@' not in email and author_data[1]:
                                    name = email
                                    email = author_data[1]
                        except KeyError:
                            # sometimes the authors field is formatted with fake HTML tags
                            print('Bad HTML format in the authors field: ', entry)
                            name = entry.text
                        authors.append({'name': name, 'email': email})
                    else:
                        for author_entry in entry.string.replace(' and ', ',').split(','):
                            author_data = self.parse_author_entry(author_entry)
                            if author_data:
                                authors.append({'name': author_data[0],
                                                'email': author_data[1]})
                # get the latest document link (if available)
                latest_url = None
                if len(cols) == 7:
                    if cols[6].find('a') is not None:
                        latest_url = urljoin(CURRENT_MEETING_URL, cols[6].find('a')['href'])
                docs.append({
                    'mdms_id': mdms_id,
                    'document': cols[0].text,
                    'created': created_timestamp,
                    'last_version_uploaded': last_version_uploaded,
                    'sub_group_text': cols[3].text,
                    'title': cols[4].text.strip(),
                    'authors': authors,
                    'latest_version_url': latest_url,
                    'container': container_url
                })
            except (AttributeError, IndexError, KeyError, TypeError, ValueError):
                print('Error: Could not parse the input documents data. Did the MDMS layout change?')
                return []
        return docs
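    # Note: parse_document_details below pulls version info out of text that
    # looks roughly like '(version 2 - date 2021-01-11 09:15:00)'; the exact
    # surrounding wording is an assumption inferred from the substring searches.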
    def parse_document_details(self, html):
        details = {
            'submitted_by': None,
            'title': None,
            'authors_string': None,
            'organizations': None,
            'abstract': None,
            'related_docs': None,
            'ahg': None,
            'sub_group': None,
            'group': None,
            'standard': None,
            'activity': None,
            'documents': []
        }
        soup = bs4.BeautifulSoup(html, features='lxml')
        for i in soup.select('br'):
            # replace <br> with a space, it makes checking the headers easier
            i.replace_with(' ')
        # do some checks whether the format is ok
        table_main = soup.find('body').find('table')
        if not table_main:
            print('Error: No main table element found. Did the MDMS layout change?')
            return None
        rows_main = table_main.find_all('tr', recursive=False)
        if len(rows_main) != 3:
            print('Error: The main table should have 3 rows. Did the MDMS layout change?')
            return None
        tables = rows_main[0].find_all('table')
        if len(tables) != 2:
            print('Error: The first row of the main table should contain exactly 2 tables. Did the MDMS layout change?')
            return None
        rows = tables[1].find_all('tr', recursive=False)
        # parse
        for row in rows:
            cols = row.find_all('td', recursive=False)
            attribute = cols[0].text.strip().lower()
            entry = cols[1].text.strip()
            if len(entry) == 0:
                continue  # skip all empty fields
            if 'submitted by' in attribute:
                parsed_href = urlparse(cols[1].a['href'])
                details['submitted_by'] = {'name': entry, 'email': parsed_href.path}
            elif 'title' in attribute:
                details['title'] = entry
            elif 'authors' in attribute:
                details['authors_string'] = entry
            elif 'organizations' in attribute:
                details['organizations'] = entry
            elif 'abstract' in attribute:
                details['abstract'] = entry
            elif 'related contributions' in attribute:
                details['related_docs'] = entry
            elif 'ahg' in attribute:
                details['ahg'] = entry
            elif 'sub group' in attribute:
                details['sub_group'] = entry
            elif 'group' in attribute:
                details['group'] = entry
            elif 'standard' in attribute:
                details['standard'] = entry
            elif 'activity' in attribute:
                details['activity'] = entry
            elif 'document' in attribute:
                rel_path = None
                version = None
                timestamp = None
                for item in cols[1].contents:
                    if isinstance(item, bs4.Tag):
                        try:
                            parsed_href = urlparse(item['href'])
                            rel_path = parsed_href.path
                        except KeyError:
                            continue
                    else:
                        item = item.string.strip()
                        if len(item) == 0:
                            details['documents'].append({
                                'path': rel_path,
                                'version': version,
                                'timestamp': timestamp
                            })
                            rel_path = None
                            version = None
                            timestamp = None
                            continue
                        pos1 = item.find('(version')
                        pos2 = item.find('- date', pos1 + 8)
                        pos3 = item.find(')', pos2 + 6)
                        if pos1 < 0 or pos2 < 0 or pos3 < 0:
                            continue
                        version = int(item[pos1 + 8:pos2].strip())
                        timestamp = self.try_parsing_date(item[pos2 + 6:pos3])
        return details

    def check_table_header(self, template, header_row):
        '''
        Check whether header_row contains the same data as the template.
        '''
        cols = header_row.find_all('td', recursive=False)
        if len(template) != len(cols):
            print('Error: The table header should have {} columns but it has {}.'.format(len(template), len(cols)))
            return False
        for n in range(len(template)):
            if cols[n].text.strip().lower() != template[n].strip().lower():
                print('Error: Table header entry mismatch: "{}" != "{}".'.format(
                    cols[n].text.strip().lower(), template[n].strip().lower()))
                return False
        return True


# search_category
@unique
class SearchCategory(Enum):
    ALL = ''
    INPUT = 'm'
    OUTPUT = 'w'


# search_id_group
@unique
class Standard(Enum):
    ALL = 1
    MPEG_DASH = 38
    MPEG_G = 43
    MPEG_IOMT = 44
    MPEG_N = 49
    MPEG_1 = 4
    MPEG_2 = 5
    MPEG_4 = 6
    MPEG_7 = 7
    MPEG_21 = 8
    MPEG_A = 9
    MPEG_B = 10
    MPEG_C = 11
    MPEG_D = 12
    MPEG_E = 13
    MPEG_M = 14
    MPEG_U = 15
    MPEG_V = 16
    MPEG_H = 26
    MPEG_UD = 39
    MPEG_GREEN = 40
    MPEG_I = 41
    MPEG_CICP = 42
    MPEG_5 = 50
    EXPLORATIONS = 45
    MAR_REFERENCE_MODEL = 46


# search_sub_group
@unique
class Subgroup(Enum):
    ALL = 1
    JVET = 41
    WG2 = 43
    WG3 = 44
    WG4 = 45
    WG5 = 46
    WG6 = 47
    WG7 = 48
    WG8 = 49
    AG2 = 50
    AG3 = 51
    AG4 = 52
    AG5 = 53
    JCTVC = 38
    JCT3V = 39


# Additional query parameters understood by current_meeting.php:
#   id_meeting=<meeting id>
#   type_order=0|1  (0 - increasing, 1 - decreasing order of input + output documents)
#   sql_type=document_number | document_date_time | upload_document_date_time |
#            document.id_sub_group | title | authors
def _get_query_string(meeting_id,
                      category=SearchCategory.INPUT,
                      group=Standard.ALL,
                      subgroup=Subgroup.ALL):
    return '?id_meeting={}' \
           '&search_category={}' \
           '&search_id_group={}' \
           '&search_sub_group={}'.format(meeting_id,
                                         category.value,
                                         group.value,
                                         subgroup.value)
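# Example with a hypothetical meeting id: _get_query_string(137,
# SearchCategory.INPUT, Standard.MPEG_DASH, Subgroup.WG3) returns
# '?id_meeting=137&search_category=m&search_id_group=38&search_sub_group=44'.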
# --------------------------------------------------------------------------------------------------
# Interfaces
# --------------------------------------------------------------------------------------------------
def get_meetings():
    '''
    Get the data of all meetings.
    Returns: [{ 'number', 'id', 'name', 'start_date', 'end_date',
                'last_input', 'last_output' }, ...]
    '''
    response = requests.get(MEETINGS_URL, auth=(MPEG_LOGIN, MPEG_PWD))
    if response.status_code != 200:
        print('HTTP response {} != 200'.format(response.status_code))
        print('\t{}'.format(response.text.replace('\n', '\n\t')))
        return []
    parser = MDMSParser()
    return parser.parse_meetings(response.text)


def get_current_meeting():
    '''
    Returns the data of the latest meeting.
    { 'number', 'id', 'name', 'start_date', 'end_date', 'last_input', 'last_output' }
    '''
    meetings = get_meetings()
    if len(meetings) == 0:
        return None
    return max(meetings, key=lambda x: x['number'])


def get_input_documents(meeting_id, standard=Standard.ALL, subgroup=Subgroup.ALL):
    '''
    Returns the data of all input documents of a certain meeting.
    [{'mdms_id', 'document', 'created', 'last_version_uploaded', 'sub_group_text',
      'title', 'authors', 'latest_version_url', 'container'}, ...]
    '''
    query = _get_query_string(meeting_id, SearchCategory.INPUT, standard, subgroup)
    url = urljoin(CURRENT_MEETING_URL, query)
    response = requests.get(url, auth=(MPEG_LOGIN, MPEG_PWD))
    if response.status_code != 200:
        print('HTTP response {} != 200'.format(response.status_code))
        print('\t{}'.format(response.text.replace('\n', '\n\t')))
        return []
    parser = MDMSParser()
    return parser.parse_input_docs(response.text)


def get_document_details(document_id):
    '''
    Get more details about a document.
    {'submitted_by': {'name', 'email'}, 'title', 'authors_string', 'organizations',
     'abstract', 'related_docs', 'ahg', 'sub_group', 'group', 'standard', 'activity',
     'documents': [{'path', 'version', 'timestamp'}, ...]}
    '''
    query = '?id={}'.format(document_id)
    url = urljoin(DOCUMENT_URL, query)
    response = requests.post(url, auth=(MPEG_LOGIN, MPEG_PWD))
    if response.status_code != 200:
        print('HTTP response {} != 200'.format(response.status_code))
        print('\t{}'.format(response.text.replace('\n', '\n\t')))
        return None
    parser = MDMSParser()
    details = parser.parse_document_details(response.text)
    if details is None:
        return None
    for doc in details['documents']:
        doc['path'] = urljoin(DOCUMENT_URL, doc['path'])  # relative to absolute
    return details


def find_documents(title='', number='', author='',
                   category=SearchCategory.ALL,
                   group=Standard.ALL,
                   subgroup=Subgroup.ALL):
    '''
    Find documents using the search URL.
    TODO: Fire a POST request to SEARCH_URL and parse the result.
    '''
    return None
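
# Minimal usage sketch (assumes valid MPEG_LOGIN / MPEG_PWD environment
# variables and network access to dms.mpeg.expert):
if __name__ == '__main__':
    meeting = get_current_meeting()
    if meeting is not None:
        print('Meeting {}: {}'.format(meeting['number'], meeting['name']))
        docs = get_input_documents(meeting['id'])
        print('Found {} input documents'.format(len(docs)))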