Source code for revive.computation.funs_parser

''''''
"""
    POLIXIR REVIVE, copyright (C) 2021-2023 Polixir Technologies Co., Ltd., is 
    distributed under the GNU Lesser General Public License (GNU LGPL). 
    POLIXIR REVIVE is free software; you can redistribute it and/or
    modify it under the terms of the GNU Lesser General Public
    License as published by the Free Software Foundation; either
    version 3 of the License, or (at your option) any later version.

    This library is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
    Lesser General Public License for more details.
"""
""" The function of this document is mainly for use on the SAAS platform, and maintenance will be temporarily suspended in the future."""
import re
import yaml
from shutil import copyfile
from loguru import logger

import revive.computation.operators as opt

OPERATORS_NAME = sorted(opt.__all__, reverse=False)

[docs]def get_nodes(yaml_file_path,): with open(yaml_file_path, 'r', encoding='UTF-8') as f: raw_config = yaml.load(f, Loader=yaml.FullLoader) data_config = raw_config['metadata']['columns'] nodes_name = set([list(d.values())[0]['dim'] for d in data_config]) nodes = { node_name : [list(d.keys())[0] for d in data_config if list(d.values())[0]['dim'] == node_name] for node_name in nodes_name} for node in raw_config['metadata']['graph'].keys(): if "next_" in node and node[5:] in nodes.keys(): nodes[node] = nodes[node[5:]] return nodes
[docs]def matching_bracket(string, idx, brackets=[]): a = { "(": ")", "[": "]", "{": "}", "<": ">" } if brackets: a = { k:v for k in a.items() for k in brackets} close_bracket = a[string[idx]] if idx != string.rindex(string[idx]): b = string.rindex(string[idx]) c = string[b:].index(close_bracket) str_list = list(string) str_list[b], str_list[b+c] = ".", "." return matching_bracket(''.join(str_list), idx) else: d = string[idx:].index(close_bracket) return idx + d
[docs]def strip(s): return re.sub("\s|\t|\n", "", s)
[docs]def find_all(s, sub_s): return [m.start() for m in re.finditer(sub_s, s)]
[docs]def find_node(line): nodes = {} for node_name in sorted(NODES.keys(), reverse=True): res = find_all(line, node_name) for index in res: bracket_start_index = index+len(node_name) if bracket_start_index==len(line): continue if line[bracket_start_index] != "[" or line[bracket_start_index+1:bracket_start_index+5] == "...,": continue bracket_stop_index = matching_bracket(line, bracket_start_index) + 1 nodes[bracket_start_index] = bracket_stop_index nodes = [[index, nodes[index]] for index in sorted(nodes.keys())] return nodes
[docs]def find_column(line): columns = {} for node_name in NODES.keys(): for column_name in NODES[node_name]: res = find_all(line, column_name) for index in res: if line[index-1:index+len(column_name)+1] == f'"{column_name}"' or \ line[index-1:index+len(column_name)+1] == f"'{column_name}'": columns[index] = [column_name,node_name] columns = [[index, columns[index]] for index in sorted(columns.keys())] return columns
[docs]def find_operator(line): operators = {} for operator_name in OPERATORS_NAME: res = find_all(line, operator_name) for index in res: if line[index-1] in [" ", "=", "(", ","] and not bool(re.match('[a-zA-Z0-9_]+', line[index+len(operator_name)])): operators[index] = operator_name operators = [[index, operators[index]] for index in sorted(operators.keys())] return operators
[docs]def convert_operator(oral_operator_name): return "opt." + oral_operator_name
[docs]def convert_column(oral_column,flag): column_name, node_name = oral_column return f"{NODES[node_name].index(column_name)}"
[docs]def convert_node(bracket_start_index): return f"{NODES[node_name].index(column_name)}"
[docs]def convert_operators(line): """ Convert operator Example: add -> opt.add """ operators = find_operator(line) if operators: first_operator_index, first_operator_name = operators[0] line = line[:first_operator_index] \ + convert_operator(first_operator_name) \ + line[first_operator_index+len(first_operator_name):] return convert_operators(line) else: return line
[docs]def convert_columns(line): """ Convert column name to column index Example: "obs_1" -> 1 """ columns = find_column(line) if columns: first_column_index, first_column_name = columns[0] if "[" == line[first_column_index-2]: flag = "L" elif "]" == line[first_column_index+len(first_column_name[0])+1]: flag = "R" else: flag = "C" line = line[:first_column_index-1] \ + convert_column(first_column_name,flag) \ + line[first_column_index+len(first_column_name[0])+1:] return convert_columns(line) else: return line
[docs]def convert_nodes(line): """ Convert node to tensor Example: obs[1,2] -> obs[...,[1,2]] """ nodes = find_node(line) if nodes: bracket_start_index, bracket_stop_index = nodes[0] if ":" in line[bracket_start_index:bracket_stop_index]: line = line[:bracket_start_index] \ + "[...," \ + line[bracket_start_index+1:bracket_stop_index-1] \ + "]" \ + line[bracket_stop_index:] else: line = line[:bracket_start_index] \ + "[...," \ + line[bracket_start_index:bracket_stop_index] \ + "]" \ + line[bracket_stop_index:] return convert_nodes(line) else: return line
[docs]def convert_line(line): line = strip(line) line = convert_operators(line) line = convert_columns(line) line = convert_nodes(line) return line
[docs]def checkt_convert(line): comvert_line_copy = line for fn in [convert_operators,convert_columns,convert_nodes]: comvert_line_copy = fn(comvert_line_copy) if comvert_line_copy == line: return False return True
[docs]def convert_fn_def(origin_code : list): ''' find the intent of original code ''' codes = [] index = 0 for i, code in enumerate(origin_code): if code.startswith('def '): bracket_start_index = code.index("(") codes.append(code[:bracket_start_index]+"(data: Dict[str, torch.Tensor]) -> torch.Tensor:\n") index = i if ":" in code: index = i break args = strip("".join(origin_code[:index+1])) bracket_start_index = args.index("(") bracket_stop_index = matching_bracket(args,bracket_start_index) args = args[bracket_start_index+1:bracket_stop_index].split(",") for arg in args: codes.append(f' {arg}=data["{arg}"]\n') other_codes = origin_code[index+1:] return codes, other_codes
[docs]def get_fn_list(origin_code_list): fn_start_index = [] fn_stop_index = [] for i,code in enumerate(origin_code_list): if code.startswith('def '): if fn_start_index: fn_stop_index.append(i) fn_start_index.append(i) fn_stop_index.append(i+1) return [origin_code_list[i:j] for i,j in zip(fn_start_index,fn_stop_index)]
[docs]def parser(input_file : str, output_file : str, yaml_file : str): global NODES NODES = get_nodes(yaml_file) with open(input_file, 'r') as f: origin_code_list = f.readlines() for code in origin_code_list: if code.startswith("import torch"): logger.info(f'Not parser function in {input_file}') # copyfile(input_file, output_file) return False logger.info(f'Parser function in {input_file}') output_codes = [] output_codes.append("import torch\n") output_codes.append("from typing import Dict\n") output_codes.append("\n") output_codes.append("import revive.computation.operators as opt\n") output_codes.append("\n") fn_list = get_fn_list(origin_code_list) for fn in fn_list: start_codes, other_codes = convert_fn_def(fn) output_codes += start_codes for code in other_codes: if not checkt_convert(code): output_codes.append(code) continue if " return " in code: if "(" not in code and "[" not in code: output_codes.append(code) else: sub_code = convert_line("return="+code[code.index("return")+len("return"):]) sub_code = sub_code.replace("return=", "return ") output_codes.append(" "*(len(code) - len(code.lstrip()))+sub_code+"\n") else: output_codes.append(" "*(len(code) - len(code.lstrip()))+convert_line(code)+"\n") with open(output_file, 'w') as f: f.writelines(output_codes) return True