Source code for revive.utils.common_utils

''''''
"""
    POLIXIR REVIVE, copyright (C) 2021-2024 Polixir Technologies Co., Ltd., is 
    distributed under the GNU Lesser General Public License (GNU LGPL). 
    POLIXIR REVIVE is free software; you can redistribute it and/or
    modify it under the terms of the GNU Lesser General Public
    License as published by the Free Software Foundation; either
    version 3 of the License, or (at your option) any later version.

    This library is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
    Lesser General Public License for more details.
"""
import ot
import os
import sys
import ray
import gym
import yaml
import h5py
import torch
import math
import random
import urllib
import pickle
import argparse
import warnings
import importlib
import multiprocessing
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib.font_manager import fontManager
plt.switch_backend('Agg')

from io import StringIO
from uuid import uuid1
from sklearn import tree
from cairosvg import svg2pdf
from dtreeviz.trees import dtreeviz
from tempfile import TemporaryDirectory
from PyPDF2 import PdfFileReader,PdfFileMerger
from tqdm import tqdm
from loguru import logger
from copy import deepcopy
from functools import partial
from torch.utils.data.dataloader import DataLoader
from typing import Any, Dict, List
from collections import defaultdict, deque
import matplotlib.gridspec as gridspec
import matplotlib.patches as mpatches
from scipy import stats

from revive.computation.graph import DesicionGraph
from revive.computation.inference import *
from revive.computation.utils import *
from revive.computation.modules import *
from revive.data.batch import Batch
from revive.computation.funs_parser import parser
# from revive.utils.common_utils import load_data

try:
    import cupy as cp
    CUPY_READY = True
except:
    # warnings.warn("Warning: CuPy is not installed, metric computing is going to be slow!")
    CUPY_READY = False


[docs] class CollectPrints: def __enter__(self): self.stdout = sys.stdout self.stream = StringIO() sys.stdout = self.stream return self def __exit__(self, exc_type, exc_value, traceback): sys.stdout = self.stdout self.output = self.stream.getvalue()
[docs] def get_output(self): return self.output
[docs] def update_env_vars(key: str, value: Any): """ update env vars in os Args: key (str): name of the key. value (str): value for the key Returns: update os.environ['env_vars'] """ env_vars = os.environ.get('env_vars') if env_vars: try: env_vars_dict = eval(env_vars) if not isinstance(env_vars_dict, dict): raise ValueError('env_vars is not a dictionary') except (NameError, SyntaxError, ValueError) as e: return False else: env_vars_dict = {} env_vars_dict[key] = value os.environ['env_vars'] = str(env_vars_dict)
[docs] def get_env_var(key: str, default=None): """ get env vars in os Args: key (str): name of the key. default (str): None Returns: update os.environ['env_vars'] """ env_vars = os.environ.get('env_vars') if env_vars: try: env_vars_dict = eval(env_vars) if not isinstance(env_vars_dict, dict): raise ValueError('vnv_vars is not a dictionary') except (NameError, SyntaxError, ValueError) as e: return default else: return default
# return env_vars_dict.get(key, default)
[docs] class BreakpointContext: def __enter__(self): return self def __exit__(self, exc_type, exc_value, traceback): if exc_type is not None: breakpoint() return False
[docs] class AttributeDict(dict): """ define a new class for using get and set variables esily """ __getattr__ = dict.__getitem__ __setattr__ = dict.__setitem__
[docs] def setup_seed(seed: int): """ Seting random seed in REVIVE. Args: seed: random seed """ np.random.seed(seed) torch.manual_seed(seed) random.seed(seed)
[docs] def load_npz(filename: str): """ Loading npz file Args: filename(str): *.npz file path Return: Dict of data in format of keys:values """ data = np.load(filename) return {k: v for k, v in data.items()}
[docs] def load_h5(filename: str): """ Loading npz file Args: filename(str): *.h5 file path Return: Dict of data in format of keys:values """ f = h5py.File(filename, 'r', libver="latest", swmr=True) data = {k: f[k][:] for k in f.keys()} f.close() return data
[docs] def save_h5(filename: str, data: Dict[str, np.ndarray]): """ Loading npz file Args: filename(str): output *.h5 file path Return: output file """ with h5py.File(filename, 'w') as f: for k, v in data.items(): f[k] = v
[docs] def npz2h5(npz_filename: str, h5_filename: str): """ Transforming npz file to h5 file Args: npz_filename (str): *.npz file path h5_filename (str): output *.h5 file path Return: output file """ data = load_npz(npz_filename) save_h5(h5_filename, data)
[docs] def h52npz(h5_filename: str, npz_filename: str): """ Transforming h5 file to npz file Args: h5_filename (str): input *.h5 file path npz_filename (str): output *.npz file path Return: output file """ data = load_h5(h5_filename) np.savez_compressed(npz_filename, **data)
[docs] def load_data(data_file: str): """ Loading data file Only support h5 and npz file as data files in REVIVE Args: data_file (str): input *.h5 or *.npz file path Return: Dict of data in format of keys:values """ if data_file.endswith('.h5'): raw_data = load_h5(data_file) elif data_file.endswith('.npz'): raw_data = load_npz(data_file) else: raise ValueError(f'Try to load {data_file}, but get unknown data format!') return raw_data
[docs] def find_policy_index(graph: DesicionGraph, policy_name: str): """ Find index of policy node in the whole decision flow graph Args: graph (DesicionGraph): decision flow graph in REVIVE policy_name (str): the policy node name be indexed Return: index of the policy node in decision graph Notice: only the first policy node name is supported TODO: multi policy indexes """ for i, k in enumerate(graph.keys()): if k == policy_name: break return i
[docs] def load_policy(filename: str, policy_name: str = None): """ Load policy file for REVIVE in the format of torch or .pkl of VirturalEnv VirtualEnvDev or PolicyModelEv Args: filename (str): file path policy_name (str): the policy node name be indexed Return: Policy model """ try: model = torch.load(filename, map_location='cpu') except: with open(filename, 'rb') as f: model = pickle.load(f) if isinstance(model, VirtualEnv): model = model._env if isinstance(model, VirtualEnvDev): node = model.graph.get_node(policy_name) model = PolicyModelDev(node) if isinstance(model, PolicyModelDev): model = PolicyModel(model) return model
[docs] def download_helper(url: str, filename: str): """ Download file from given url. Modified from `torchvision.dataset.utils Args: url (str): donwloading path filename (str): output file path Return: Output path """ def gen_bar_updater(): pbar = tqdm(total=None) def bar_update(count, block_size, total_size): if pbar.total is None and total_size: pbar.total = total_size progress_bytes = count * block_size pbar.update(progress_bytes - pbar.n) return bar_update try: print('Downloading ' + url + ' to ' + filename) urllib.request.urlretrieve( url, filename, reporthook=gen_bar_updater() ) except (urllib.error.URLError, IOError) as e: if url[:5] == 'https': url = url.replace('https:', 'http:') print('Failed download. Trying https -> http instead.', ' Downloading ' + url + ' to ' + filename) urllib.request.urlretrieve( url, filename, reporthook=gen_bar_updater() ) else: raise e
[docs] def import_module_from_file(file_path: str, module_name="module.name"): """ import expert function from file Args: file_path (str): file path of the expert function module_name (str): function name in the file Return: treat the expert function as an useable funtion in REVIVE """ spec = importlib.util.spec_from_file_location(module_name, file_path) foo = importlib.util.module_from_spec(spec) spec.loader.exec_module(foo) return foo
[docs] def get_reward_fn(reward_file_path: str, config_file: str): """ import user defined reward function only for Matcher reward Args: reward_file_path (str): file path of the expert function config_file (str): decision flow *.yml file Return: treat the reward function as an useable funtion in REVIVE """ if reward_file_path: logger.info(f'import reward function from {reward_file_path}!') # parse function reward_file_path_parsed = reward_file_path[:-3]+"_parsed.py" if not parser(reward_file_path, reward_file_path_parsed, config_file): reward_file_path_parsed = reward_file_path source_file = import_module_from_file(reward_file_path_parsed) try: reward_func = source_file.reward except: reward_func = source_file.get_reward else: logger.info(f'No reward function is defined!') reward_func = None return reward_func
[docs] def get_module(function_file_path, config_file): """ import user defined function Args: function_file_path (str): file path of the expert function config_file (str): decision flow *.yml file Return: treat the reward function as an useable funtion in REVIVE """ if function_file_path: logger.info(f'import module from {function_file_path}!') # parse function function_file_path_parsed = function_file_path[:-3]+"_parsed.py" if not parser(function_file_path,function_file_path_parsed,config_file): function_file_path_parsed = function_file_path module = import_module_from_file(function_file_path_parsed) else: logger.info(f'No module is defined!') module = None return module
[docs] def create_env(task: str): """ initiating gym environment as testing env for trainning Args: task (str): gym mujoco task name Return: gym env """ try: if task in ["HalfCheetah-v3", "Hopper-v3", "Walker2d-v3", "ib", "finance", "citylearn"]: import neorl env = neorl.make(task) elif task in ['halfcheetah-meidum-v0', 'hopper-medium-v0', 'walker2d-medium-v0']: import d4rl env = gym.make(task) else: env = gym.make(task) except: warnings.warn(f'Warning: task {task} can not be created!') env = None return env
[docs] def test_one_trail(env: gym.Env, policy: PolicyModel): """ testing revive policy on gym env Args: env (str): initialized gym mujoco env policy (str): revive policy used for testing on the env Return: reward and running length of the policy """ env = deepcopy(env) policy = deepcopy(policy) obs = env.reset() reward = 0 length = 0 while True: action = policy.infer({'obs': obs[np.newaxis]})[0] obs, r, done, info = env.step(action) reward += r length += 1 if done: break return (reward, length)
[docs] def test_on_real_env(env: gym.Env, policy: PolicyModel, number_of_runs: int = 10): """ testing revive policy on multiple gym envs Args: env (str): initialized gym mujoco env policy (str): revive policy used for testing on the env number_of_runs (int): the number of trails to testing Return: mean value of reward and running length of the policy """ rewards = [] episode_lengths = [] test_func = ray.remote(test_one_trail) results = ray.get([test_func.remote(env, policy) for _ in range(number_of_runs)]) rewards = [result[0] for result in results] episode_lengths = [result[1] for result in results] return np.mean(rewards), np.mean(episode_lengths)
[docs] def get_input_dim_from_graph(graph: DesicionGraph, node_name: str, total_dims: dict): """ return the total number of dims used to compute the given node on the graph Args: graph (DecisionGraph): decision flow with user setting nodes node_name (str): name of the node to get total dimensions total_dims (dict): dict of input and output dims of all nodes Return: total number of dimensions of the node_name """ input_names = graph[node_name] input_dim = 0 for _name in input_names: input_dim += total_dims[_name]['input'] return input_dim
[docs] def remove_string(string): if 'ts_' in string: string = string.replace('ts_', '') if 'next_' in string: string = string.replace('next_', '') return string
[docs] def dict_ts_conv(graph: DesicionGraph, node_name: dict, total_dims: dict, config:dict, net_hidden_features: int=128): """ return the total number of dims as dictused to compute the given node on the graph Args: graph (DecisionGraph): decision flow with user setting nodes node_name (str): name of the node to get total dimensions total_dims (dict): dict of input and output dims of all nodes Return: total number of dimensions as dict for all input of the node_name """ input_names = list(node_name.keys()) #graph[node_name] input_dim_dict = {'no_ts_input': np.array([False]), 'with_ts_input':np.array([False])} input_dim_config = {'no_ts_input': [] , 'with_ts_input':{}} ts_conv_net_config = {'other_net_input': 0, 'conv_net_input': 0} for _name in input_names: if 'ts' in _name: temp_name = remove_string(_name) input_dim_dict['with_ts_input'] += True get_config = {'ts': total_dims[_name]['input']}#config['raw_graph_config']['metadata']['nodes'][temp_name] # assert get_config['endpoint'] == True input_dim_config['with_ts_input'][_name] = get_config #the now step of ts node ts_conv_net_config['other_net_input'] += total_dims[temp_name]['input'] #the history step of ts node ts_conv_net_config['conv_net_input'] += total_dims[temp_name]['input'] else: input_dim_dict['no_ts_input'] += True input_dim_config['no_ts_input'].append(_name) ts_conv_net_config['other_net_input'] += total_dims[_name]['input'] for k, v in input_dim_dict.items(): input_dim_dict[k] = (v*net_hidden_features).item() return input_dim_dict, input_dim_config, ts_conv_net_config
[docs] def get_input_dim_dict_from_graph(graph: DesicionGraph, node_name: str, total_dims: dict): """ return the total number of dims as dictused to compute the given node on the graph Args: graph (DecisionGraph): decision flow with user setting nodes node_name (str): name of the node to get total dimensions total_dims (dict): dict of input and output dims of all nodes Return: total number of dimensions as dict for all input of the node_name """ input_names = graph[node_name] input_dim_dict = dict() for _name in input_names: input_dim_dict[_name] = total_dims[_name]['input'] return input_dim_dict
[docs] def normalize(data: np.ndarray): """ normalization of data using mean and std Args: data (np.ndarray): numpy array Return: normalized data """ flatten_data = data.reshape((-1, data.shape[-1])) mean = flatten_data.mean(axis=0) std = flatten_data.std(axis=0) std[np.isclose(std, 0)] = 1 data = (data - mean) / std return data
[docs] def plot_traj(traj: dict): """ plot all dims of data into color map along trajectory Args: traj (dict): data stored in dict Return: plot show with x axis as dims and y axis as traj-step """ traj = np.concatenate([*traj.values()], axis=-1) max_value = traj.max(axis=0) min_value = traj.min(axis=0) interval = max_value - min_value interval[interval == 0] = 1 traj = (traj - min_value) / interval plt.imshow(traj) plt.show()
[docs] def check_weight(network: torch.nn.Module): """ Check whether network parameters are nan or inf. Args: network (torch.nn.Module): torch.nn.Module Print: nan of inf in network params """ for k, v in network.state_dict().items(): v_mean = v.mean() if torch.isnan(v_mean) or torch.isinf(v_mean): if torch.any(torch.isnan(v)): print(k + 'has nan') if torch.any(torch.isinf(v)): print(k + 'has inf')
[docs] def get_models_parameters(*models): """ return all the parameters of input models in a list Args: models (torch.nn.Module): all models inputed for getting parameters Return: list of parameters for all models inputted """ parameters = [] for model in models: parameters += list(model.parameters()) return parameters
[docs] def get_grad_norm(parameters, norm_type: float = 2): """ return all gradient of the parameters Args: models : parameters of the a model Return: L2 norm of the gradient """ parameters = [p for p in parameters if p.grad is not None] total_norm = torch.norm( torch.stack([torch.norm(p.grad.detach(), norm_type) \ for p in parameters]), norm_type) return total_norm
[docs] def get_concat_traj(batch_data: Batch, node_names: List[str]): """ concatenate the data from node_names Args: batch_data (Batch): Batch of data node_names (List): list of node names to get data Return: data to get """ return torch.cat(get_list_traj(batch_data, node_names), dim=-1)
[docs] def get_list_traj(batch_data: Batch, node_names: List[str], nodes_fit_index: dict = None) -> list: """ return all data of node_names from batch_data Args: batch_data (Batch): Batch of data node_names (List): list of node names to get data nodes_fit_index (Dict): dict of fixed index for nodel_names Return: data to get """ datas = [] for name in node_names: if nodes_fit_index: datas.append(batch_data[name][..., nodes_fit_index[name]]) else: datas.append(batch_data[name]) return datas
[docs] def generate_rewards(traj: Batch, reward_fn): """ Add rewards for batch trajectories. Args: traj: batch trajectories. reward_fn: how the rewards generate. Return: batch trajectories with rewards. """ head_shape = traj.shape[:-1] traj.reward = reward_fn(traj).view(*head_shape, 1) return traj
[docs] def generate_rollout(expert_data: Batch, graph: DesicionGraph, traj_length: int, sample_fn=lambda dist: dist.sample(), adapt_stds=None, clip: Union[bool, float] = False, use_target: bool = False, mix_sample: bool = False, mix_sample_ratio: float = 0.5, replace_with_expert: bool = False, replace_ratio: float = 0.0, field='mail', return_dist=False, detach=False, mode="test"): """ Generate trajectories based on current policy. Args: expert_data: samples from the dataset. graph: the computation graph traj_length: trajectory length sample_fn: sample from a distribution. Return: batch trajectories. NOTE: this function will mantain the last dimension even if it is 1 """ if traj_length > expert_data.shape[0]: logger.warning(f'Cannot generate trajectory beyond expert data') traj_length = expert_data.shape[0] expert_data = deepcopy(expert_data) batch_size = expert_data.shape[1] if adapt_stds is None: adapt_stds = [None] * (len(graph)) graph.reset() generated_data = [] current_batch = expert_data[0] network_node_names = [] if return_dist: dist_dict = {} for i in range(traj_length): for node_name, adapt_std in zip(list(graph.keys()), adapt_stds): if graph.get_node(node_name).node_type == 'network': network_node_names.append(node_name) node_input_type = getattr(graph.get_node(node_name), "node_input_type", "generate") # if node_name == 'net_trans' and mode == "train": # node_input_type = "expert" if mode == "train" and node_input_type == "expert": input_data = expert_data[i] else: input_data = current_batch.detach() action_dist = graph.compute_node(node_name, input_data, adapt_std=adapt_std, use_target=use_target, field=field, current_expert_data=expert_data[i], history_expert_data=expert_data[:i], history_generate_data=Batch.stack(generated_data)) if mix_sample: sample_ids = np.random.choice(batch_size, size=int(mix_sample_ratio * batch_size), replace=False) action = action_dist.mode action_sample = action_dist.sample() action[sample_ids, :] = action_sample[sample_ids, :] else: action = sample_fn(action_dist) if isinstance(clip, bool) and clip: action = torch.clamp(action, -1, 1) elif isinstance(clip, float): action = torch.clamp(action, -clip, clip) else: pass if replace_with_expert: replace_ids = np.random.choice(batch_size, size=int(replace_ratio*batch_size), replace=False) action[replace_ids, :] = expert_data[node_name][i, replace_ids, :] action_log_prob = action_dist.log_prob(action).unsqueeze(dim=-1) if detach: action = action.detach() action_log_prob = action_log_prob.detach() current_batch[node_name] = action current_batch[node_name + "_log_prob"] = action_log_prob if return_dist: if node_name not in dist_dict.keys(): dist_dict[node_name] = [] dist_dict[node_name].append(action_dist) else: action = graph.compute_node(node_name, current_batch) current_batch[node_name] = action # check the generated current_batch # NOTE: this will make the rollout a bit slower. # Remove it if you are sure no explosion will happend. for k, v in current_batch.items(): if "dist" in k: continue v_mean = v.mean() if torch.isnan(v_mean) or torch.isinf(v_mean): has_inf = torch.any(torch.isinf(v)) has_nan = torch.any(torch.isnan(v)) if has_inf or has_nan: logger.warning(f'During rollout detect anomaly data: key {k}, \ has inf {has_inf}, has nan {has_nan}') logger.warning(f'Should generated rollout with \ length {traj_length}, \ early stop for only length {i}') break generated_data.append(current_batch) if i == traj_length - 1: break else: # Clone to the next Batch current_batch = expert_data[i+1] current_batch.update(graph.state_transition(generated_data[-1])) generated_data = Batch.stack(generated_data) if return_dist: return generated_data, dist_dict return generated_data
[docs] def generate_rollout_bc(expert_data: Batch, graph: DesicionGraph, traj_length: int, sample_fn=lambda dist: dist.sample(), adapt_stds=None, clip: Union[bool, float] = False, use_target: bool = False): """ Generate trajectories based on current policy. Args: expert_data: samples from the dataset. graph: the computation graph traj_length: trajectory length sample_fn: sample from a distribution. Return: batch trajectories. NOTE: this function will mantain the last dimension even if it is 1 """ assert traj_length <= expert_data.shape[0], \ 'cannot generate trajectory beyond expert data' expert_data = deepcopy(expert_data) if adapt_stds is None: adapt_stds = [None] * (len(graph)) graph.reset() generated_data = [] current_batch = expert_data[0] for i in range(traj_length): for node_name, adapt_std in zip(list(graph.keys()), adapt_stds): if graph.get_node(node_name).node_type == 'network': action_dist = graph.compute_node(node_name, current_batch, adapt_std=adapt_std, use_target=use_target, field='bc') action = sample_fn(action_dist) if isinstance(clip, bool) and clip: action = torch.clamp(action, -1, 1) elif isinstance(clip, float): action = torch.clamp(action, -clip, clip) else: pass current_batch[node_name] = action action_log_prob = \ action_dist.log_prob(action).unsqueeze(dim=-1).detach() # TODO: do we need this detach? current_batch[node_name + "_log_prob"] = action_log_prob current_batch[node_name + "_dist" + f"_{i}"] = action_dist else: action = graph.compute_node(node_name, current_batch) current_batch[node_name] = action # check the generated current_batch # NOTE: this will make the rollout a bit slower. # Remove it if you are sure no explosion will happend. for k, v in current_batch.items(): if "dist" in k: continue v_mean = v.mean() has_inf = torch.isinf(v_mean) has_nan = torch.isnan(v_mean) if has_inf or has_nan: logger.warning(f'During rollout detect anomaly data: key {k}, \ has inf {has_inf}, has nan {has_nan}') logger.warning(f'Should generated rollout with \ length {traj_length}, \ early stop for only length {i}') break generated_data.append(current_batch) if i == traj_length - 1: break # clone to new Batch current_batch = expert_data[i+1] current_batch.update(graph.state_transition(generated_data[-1])) generated_data = Batch.stack(generated_data) return generated_data
[docs] def compute_lambda_return(rewards, values, bootstrap=None, _gamma=0.9, _lambda=0.98): """ Generate lambda return for svg in REVIVE env learning Args: rewards: reward data for current stated values: values derived from value net bootstrap: bootstrap for the last time step of next_values _gamma: discounted factor _lambda: factor for balancing future or current return Return: discounted return for the input rewards. """ next_values = values[1:] if bootstrap is None: bootstrap = torch.zeros_like(values[-1]) next_values = torch.cat([next_values, bootstrap.unsqueeze(0)], dim=0) g = [rewards[i] + _gamma * (1 - _lambda) * next_values[i] for i in range(rewards.shape[0])] lambda_returns = [] last = next_values[-1] for i in reversed(list(range(len(rewards)))): last = g[i] + _gamma * _lambda * last lambda_returns.append(last) return torch.stack(list(reversed(lambda_returns)))
[docs] def sinkhorn_gpu(cuda_id): """ Specifically setting running device Args: cuda_id: cuda device id Return: sinkhorn function """ cp.cuda.Device(cuda_id).use() import ot.gpu return ot.gpu.sinkhorn
[docs] def wasserstein_distance(X, Y, cost_matrix, method='sinkhorn', niter=50000, cuda_id=0): """ Calculate wasserstein distance Args: X & Y : two arrays cost_matrix: cost matrix between two arrays method: method for calculating w_distance niter: number of iteration cuda_id: device for calculating w_distance Return: wasserstein distance """ if method == 'sinkhorn_gpu': sinkhorn_fn = sinkhorn_gpu(cuda_id) transport_plan = sinkhorn_fn(X, Y, cost_matrix, reg=1, enumItermax=niter) # (GPU) Get the transport plan for regularized OT elif method == 'sinkhorn': transport_plan = ot.sinkhorn(X, Y, cost_matrix, reg=1, numItermax=niter) # (CPU) Get the transport plan for regularized OT elif method == 'emd': transport_plan = ot.emd(X, Y, cost_matrix, numItermax=niter) # (CPU) Get the transport plan for OT with no regularisation elif method == 'emd2': distance = ot.emd2(X, Y, cost_matrix) # (CPU) Get the transport loss return distance else: raise NotImplementedError("The method is not implemented!") # Calculate Wasserstein by summing diagonals, i.e., W=Trace[MC^T] distance = np.sum(np.diag(np.matmul(transport_plan, cost_matrix.T))) return distance
[docs] def compute_w2_dist_to_expert(policy_trajectorys, expert_trajectorys, scaler=None, data_is_standardscaler=False, max_expert_sampes=20000, dist_metric="euclidean", emd_method="emd", processes=None, use_cuda=False, cuda_id_list=None): """ Computes Wasserstein 2 distance to expert demonstrations. Calculate wasserstein distance Args: policy_trajectorys: data generated by policy expert_trajectorys: expert data scaler: scale the data data_is_standardscaler: whether the data is standard scaled or not max_expert_sampes: number of data to use dist_metric: distance type, emd_method: using cpu for computing processes: multi-processing setting use_cuda: using gpu for computing cuda_id_lis: duda device as list Return: wasserstein distance """ policy_trajectorys = policy_trajectorys.copy() expert_trajectorys = expert_trajectorys.reshape(-1, expert_trajectorys.shape[-1]).copy() policy_trajectorys_shape = policy_trajectorys.shape policy_trajectorys = policy_trajectorys.reshape(-1, policy_trajectorys_shape[-1]) expert_trajectorys_index = np.arange(expert_trajectorys.shape[0]) if expert_trajectorys.shape[0] < max_expert_sampes: max_expert_sampes = expert_trajectorys.shape[0] if not data_is_standardscaler: if scaler is None: from sklearn import preprocessing scaler = preprocessing.StandardScaler() scaler.fit(expert_trajectorys) policy_trajectorys = scaler.transform(policy_trajectorys) expert_trajectorys = scaler.transform(expert_trajectorys) policy_trajectorys = policy_trajectorys.reshape(policy_trajectorys_shape) expert_trajectory_weights = \ 1./max_expert_sampes * np.ones(max_expert_sampes) policy_trajectory_weights = \ 1./policy_trajectorys_shape[1] * np.ones(policy_trajectorys_shape[1]) # fallback to cpu mode if not CUPY_READY: emd_method = 'emd' w2_dist_list = [] if use_cuda and "gpu" in emd_method: if cuda_id_list is None: cuda_id_list = list(range(cp.cuda.runtime.getDeviceCount())) assert len(cuda_id_list) > 0 for i, policy_trajectory in enumerate(policy_trajectorys): cuda_id = cuda_id_list[i%len(cuda_id_list)] cost_matrix = ot.dist(policy_trajectory, expert_trajectorys[expert_trajectorys_index[:max_expert_sampes]], metric=dist_metric) w2_dist_list.append(wasserstein_distance(policy_trajectory_weights, expert_trajectory_weights, cost_matrix, emd_method, cuda_id)) else: pool = multiprocessing.Pool(processes = processes if processes is not None else multiprocessing.cpu_count()) for policy_trajectory in policy_trajectorys: cost_matrix = ot.dist(policy_trajectory, expert_trajectorys[expert_trajectorys_index[:max_expert_sampes]], metric=dist_metric) np.random.shuffle(expert_trajectorys_index) w2_dist_list.append(pool.apply_async(wasserstein_distance, (policy_trajectory_weights, expert_trajectory_weights, cost_matrix, emd_method))) pool.close() pool.join() w2_dist_list = [res.get() for res in w2_dist_list] return np.mean(w2_dist_list)
[docs] def dict2parser(config: dict): """ transform dict as operation setting as parser Args: config: dict of operation Return: parser as command """ parser = argparse.ArgumentParser() def get_type(value): if type(value) is bool: return lambda x: [False, True][int(x)] return type(value) for k, v in config.items(): parser.add_argument(f'--{k}', type=get_type(v), default=v) return parser
[docs] def list2parser(config: List[Dict]): """ transform list of dict as operation setting as parser Args: config: list of operation Return: parser as command """ parser = argparse.ArgumentParser() def get_type(type_name): type_name = eval(type_name) if isinstance(type_name, str) else type_name if type_name is bool: return lambda x: [False, True][int(x)] return type_name for d in config: names = ['--' + d['name']] data_type = get_type(d['type']) default_value = d['default'] addition_args = {} if data_type is list: data_type = get_type(type(default_value[0])) if type(default_value) is list else get_type(type(default_value)) addition_args['nargs'] = '+' if 'abbreviation' in d.keys(): names.append('-' + d['abbreviation']) parser.add_argument(*names, type=data_type, default=default_value, help=d.get('description', ''), **addition_args) return parser
[docs] def set_parameter_value(config: List[Dict], name: str, value: Any): """ change value of the name in config file Args: config: list of dict of variables name: the value of the keys to be changed value: the value to be chanbed into Return: resetting default values to the original config """ for param in config: if param['name'] == name: param['default'] = value break return config
[docs] def update_description(default_description, custom_description): ''' update in-place the default description with a custom description. Args: default_description: custom_description: Return: ''' names_to_indexes = {description['name']: i for i, description in enumerate(default_description)} for description in custom_description: name = description['name'] index = names_to_indexes.get(name, None) if index is None: warnings.warn(f'parameter name `{name}` \ is not in the default description, skip.') else: default_description[index] = description
[docs] def find_later(path: str, keyword: str) -> List[str]: ''' find all the later folder after the given keyword Args: path: a file path to get list of folder keyword: the name of the folder which as the last folder at the path Return: a list of folder as path ''' later = [] while len(path) > 0: path, word = os.path.split(path) later.append(word) if keyword == word: break return list(reversed(later))
[docs] def get_node_dim_from_dist_configs(dist_configs: dict, node_name: str): """ return the total number of dims of the node_name Args: dist_configs (dict): decision flow with user setting nodes node_name (str): name of the node to get total dimensions Return: total number of dimensions of the node_name """ node_dim = 0 for dist_config in dist_configs[node_name]: node_dim += dist_config["dim"] return node_dim
[docs] def set_default_font(): available_fonts = [font.name for font in fontManager.ttflist] preferred_fonts = ['Arial', 'Helvetica'] selected_font = next((font for font in preferred_fonts if font in available_fonts), None) if selected_font: plt.rcParams['font.family'] = selected_font
set_default_font()
[docs] def save_histogram(histogram_path: str, graph: DesicionGraph, data_loader: DataLoader, device: str, scope: str, *args, **kwargs): """ save the histogram Args: histogram_path (str): the path to save histogram graph (DesicionGraph): DesicionGraph data_loader (DataLoader): torch data loader device (str): generate data on which device scope (str): 'train' or 'val' related to the file-saving name. Return: Saving the histogram as png file to the histogram_path """ processor = graph.processor expert_data = [] generated_data = [] for expert_batch in iter(data_loader): if 'rnn_dataset' in kwargs and kwargs['rnn_dataset']: expert_batch, loss_mask = expert_batch traj_length = expert_batch.shape[0] expert_batch.to_torch(device=device) generated_batch = generate_rollout(expert_batch, graph, traj_length, lambda dist: dist.mode, clip=True) expert_batch.to_numpy() generated_batch.to_numpy() expert_data.append(expert_batch) generated_data.append(generated_batch) # expert_data = {node_name : np.concatenate([batch[node_name] for batch in expert_data], axis=1) for node_name in graph.keys()} expert_data_tmp_dict = {} for node_name in graph.keys(): try: expert_data_tmp_dict[node_name] = np.concatenate([batch[node_name] for batch in expert_data], axis=1) except Exception as e: logger.warning(f"{e} not in dataset when plotting histogram") continue expert_data = expert_data_tmp_dict generated_data = {node_name: np.concatenate([batch[node_name] for batch in generated_data], axis=1) for node_name in graph.keys()} expert_data = processor.deprocess(expert_data) generated_data = processor.deprocess(generated_data) set_default_font() fig = plt.figure(figsize=(15, 7), dpi=150) for node_name in graph.keys(): index_name = node_name[5:] if node_name in graph.transition_map.values() else node_name for i, dimension in enumerate(graph.descriptions[index_name]): dimension_name = list(dimension.keys())[0] generated_dimension_data = generated_data[node_name][..., i].reshape((-1)) try: expert_dimension_data = expert_data[node_name][..., i].reshape((-1)) assert expert_dimension_data.shape == generated_dimension_data.shape except: expert_dimension_data = None if dimension[dimension_name]['type'] == 'continuous': bins = 100 elif dimension[dimension_name]['type'] == 'discrete': bins = min(dimension[dimension_name]['num'], 100) else: bins = None title = f'{node_name}.{dimension_name}' if expert_dimension_data is not None: plt.hist([expert_dimension_data, generated_dimension_data], bins=bins, label=['History_Data', 'Generated_Data'], log=True) else: plt.hist(generated_dimension_data, bins=bins, label='Generated_Data', log=True) plt.legend(loc='upper left') plt.xlabel(title) plt.ylabel("frequency") plt.title(title + f"-histogram-{scope}") plt.savefig(os.path.join(histogram_path, title + f"-{scope}.png")) fig.clf() plt.close(fig)
[docs] def save_histogram_after_stop(traj_length: int, traj_dir: str, train_dataset, val_dataset): """ save the histogram after the training is stopped Args: traj_length (int): length of the horizon traj_dir (str): saving derectory train_dataset: torch data loader val_dataset: generate data on which device Return: Saving the histogram as png file to the histogram_path """ histogram_path = os.path.join(traj_dir, 'histogram') if not os.path.exists(histogram_path): os.makedirs(histogram_path) from revive.data.dataset import collect_data device = 'cuda' if torch.cuda.is_available() else 'cpu' train_graph = torch.load(os.path.join(traj_dir, 'venv_train.pt'), map_location=device).graph train_dataset = train_dataset.trajectory_mode_(traj_length) train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=256, shuffle=False, collate_fn=partial(collect_data, graph=train_dataset.graph), pin_memory=True) save_histogram(histogram_path, train_graph, train_loader, device=device, scope='train') val_graph = torch.load(os.path.join(traj_dir, 'venv_val.pt'), map_location=device).graph val_dataset = val_dataset.trajectory_mode_(traj_length) val_loader = torch.utils.data.DataLoader(train_dataset, batch_size=256, shuffle=False, collate_fn=partial(collect_data, graph=val_dataset.graph), pin_memory=True) save_histogram(histogram_path, val_graph, val_loader, device=device, scope='val')
[docs] def tb_data_parse(tensorboard_log_dir: str, keys: list = []): """ parse data from tensorboard logdir Args: tensorboard_log_dir (str): length of the horizon keys (list): list of keys to get from tb logdir Return: geting a dict of result including value of keys """ from tensorboard.backend.event_processing import event_accumulator ea = event_accumulator.EventAccumulator(tensorboard_log_dir) ea.Reload() ea_keys = ea.scalars.Keys() ea_keys = [k[9:] if k.startswith('ray/tune/') else k for k in ea_keys] parse_data = lambda key: [(i.step, i.value) for i in ea.scalars.Items(key)] if keys: if set(keys) < set(ea_keys): logger.info(f"Keys missing: there are some keys not in tensorboard logs!") res = {key: parse_data(key) for key in keys} else: res = {key: parse_data(key) for key in ea_keys} return res
[docs] def double_venv_validation(reward_logs, data_reward={}, img_save_path=""): """ policy double venv validation to the img path Args: reward_logs (str): dict of different rewards data_reward (dict): dataset mean reward of train and val dataset img_save_path (str): path of saving img Return: saving double venv validation img to the setting path """ reward_trainPolicy_on_trainEnv = np.array(reward_logs["reward_trainPolicy_on_trainEnv"]) reward_valPolicy_on_trainEnv = np.array(reward_logs["reward_valPolicy_on_trainEnv"]) reward_valPolicy_on_valEnv = np.array(reward_logs["reward_valPolicy_on_valEnv"]) reward_trainPolicy_on_valEnv = np.array(reward_logs["reward_trainPolicy_on_valEnv"]) set_default_font() fig, axs = plt.subplots(2, 1, figsize=(15, 10)) fig.suptitle("Double Venv Validation", fontsize=26) x = np.arange(reward_trainPolicy_on_trainEnv[:, 0].shape[0]) axs[0].plot(x,reward_trainPolicy_on_trainEnv[:, 1], 'r--', label='reward_trainPolicy_on_trainEnv') axs[0].plot(x,reward_valPolicy_on_trainEnv[:, 1], 'g--', label='reward_valPolicy_on_trainEnv') if "reward_train" in data_reward.keys(): axs[0].plot(x, np.ones_like(reward_trainPolicy_on_trainEnv[:, 0]) * data_reward["reward_train"], 'b--', label='reward_train') axs[0].set_ylabel('Reward') axs[0].set_xlabel('Epoch') axs[0].legend() axs[1].plot(np.arange(reward_valPolicy_on_valEnv[:, 1].shape[0]), reward_valPolicy_on_valEnv[:,1], 'r--', label='reward_valPolicy_on_valEnv') axs[1].plot(np.arange(reward_trainPolicy_on_valEnv[:, 1].shape[0]), reward_trainPolicy_on_valEnv[:,1], 'g--', label='reward_trainPolicy_on_valEnv') if "reward_val" in data_reward.keys(): axs[1].plot(np.arange(reward_valPolicy_on_valEnv[:,1].shape[0]), np.ones_like(reward_valPolicy_on_valEnv[:, 0]) * data_reward["reward_val"], 'b--', label='reward_val') axs[1].set_ylabel('Reward') axs[1].set_xlabel('Epoch') axs[1].legend() fig.savefig(img_save_path) plt.close(fig)
[docs] def plt_double_venv_validation(tensorboard_log_dir, reward_train, reward_val, img_save_path): """ Drawing double_venv_validation images Args: tensorboard_log_dir (str): path of tb infomation reward_train : dataset mean reward of train dataset reward_val: dataset mean reward of val dataset img_save_path (str): path of saving img Return: saving double venv validation img to the setting path """ reward_logs = tb_data_parse(tensorboard_log_dir, ['reward_trainPolicy_on_valEnv', 'reward_trainPolicy_on_trainEnv', 'reward_valPolicy_on_trainEnv', 'reward_valPolicy_on_valEnv']) data_reward = {"reward_train": reward_train, "reward_val": reward_val} double_venv_validation(reward_logs, data_reward, img_save_path)
def _plt_node_rollout(expert_datas, generated_datas, node_name, data_dims, img_save_dir, **kwargs): """ Drawing rollout plot for every node Args: expert_datas: expert data generated_datas: generate data node_name: name of the graph node data_dims: dimensions of the data img_save_path (str): path of saving img Return: saving double venv validation img to the setting path """ set_default_font() sub_fig_num = len(data_dims) train_type = kwargs['train_type'] if expert_datas is None: expert_datas = deepcopy(generated_datas) expert_datas_flag = False else: expert_datas_flag = True # import time # start = time.time() def my_function(trj_index, expert_data, generated_data): img_save_path = os.path.join(img_save_dir, f"{trj_index}_{node_name}") fig = plt.figure(figsize=(15, 5 * sub_fig_num)) gs = fig.add_gridspec(sub_fig_num + 1, 1, height_ratios=[0.05] + [1] * sub_fig_num, hspace=0.3) # 创建第一个子图,占据第一行的所有列 ax_title = fig.add_subplot(gs[0, :], frameon=False) ax_title.set_title(f"{train_type} Rollout", fontsize=26) ax_title.axis('off') # 关闭坐标轴 ax_title.text(0, -1, f"Node name: {node_name}\nDimensions: {sub_fig_num}\nTraj index: {trj_index}", ha='left', va='center',fontsize=12) for index, dim in enumerate(data_dims): ax = fig.add_subplot(gs[index + 1, :]) if expert_datas_flag: ax.plot(expert_data[:, index], 'r--', label='History Expert Data') ax.plot(generated_data[:, index], 'g--', label=f'{train_type} Rollout Data') ax.set_ylabel(dim) ax.set_xlabel('Step') ax.legend() fig.savefig(img_save_path, bbox_inches='tight', pad_inches=0.5) plt.close(fig) pass # 做一些任务 processes = [] for trj_index, (expert_data, generated_data) in enumerate(zip(expert_datas, generated_datas)): process = multiprocessing.Process(target=my_function, args=(trj_index, expert_data, generated_data)) process.start() processes.append(process) for process in processes: process.join() # for trj_index, (expert_data, generated_data) in enumerate(zip(expert_datas, generated_datas)): # img_save_path = os.path.join(img_save_dir, f"{trj_index}_{node_name}") # # if sub_fig_num > 1: # fig = plt.figure(figsize=(15, 5 * sub_fig_num)) # gs = fig.add_gridspec(sub_fig_num + 1, 1, height_ratios=[0.05] + [1] * sub_fig_num, hspace=0.3) # # 创建第一个子图,占据第一行的所有列 # ax_title = fig.add_subplot(gs[0, :], frameon=False) # ax_title.set_title(f"{train_type} Rollout", fontsize=26) # ax_title.axis('off') # 关闭坐标轴 # ax_title.text(0, -1, f"Node name: {node_name}\nDimensions: {sub_fig_num}\nTraj index: {trj_index}", # ha='left', va='center',fontsize=12) # for index, dim in enumerate(data_dims): # ax = fig.add_subplot(gs[index + 1, :]) # if expert_datas_flag: # ax.plot(expert_data[:, index], 'r--', label='History Expert Data') # ax.plot(generated_data[:, index], 'g--', label=f'{train_type} Rollout Data') # ax.set_ylabel(dim) # ax.set_xlabel('Step') # ax.legend() # fig.savefig(img_save_path, bbox_inches='tight', pad_inches=0.5) # plt.close(fig) # print(time.time()-start)
[docs] def save_rollout_action(rollout_save_path: str, graph: DesicionGraph, device: str, dataset, nodes_map, horizion_num = 10, rollout_plt_length = None, pre_horzion = 0, **kwargs): """ save the Trj rollout Args: rollout_save_path: path of saving img data graph: decision graph device: device dataset: dimensions of the data nodes_map: graph nodes horizion_num (int): length to generate data Return: save Trj rollout """ if not os.path.exists(rollout_save_path): os.makedirs(rollout_save_path) graph = graph.to(device) from revive.data.dataset import collect_data, pad_collect_data collate_fn = partial(pad_collect_data, graph=graph) if 'rnn_dataset' in kwargs and kwargs['rnn_dataset'] else partial(collect_data, graph=graph) data_loader = torch.utils.data.DataLoader(dataset, batch_size=horizion_num, shuffle=True, collate_fn=collate_fn, pin_memory=True) processor = graph.processor expert_data = [] generated_data = [] for expert_batch in iter(data_loader): if 'rnn_dataset' in kwargs and kwargs['rnn_dataset']: expert_batch, loss_mask = expert_batch if rollout_plt_length is None: traj_length = expert_batch.shape[0] else: traj_length = rollout_plt_length expert_batch.to_torch(device=device) generated_batch = generate_rollout(expert_batch, graph, traj_length, lambda dist: dist.mode, clip=True) expert_batch.to_numpy() generated_batch.to_numpy() expert_batch = {k:v[:traj_length] for k,v in expert_batch.items()} expert_data.append(expert_batch) generated_data.append(generated_batch) break # expert_data = {node_name : np.concatenate([batch[node_name] for batch in expert_data if node_name in batch.keys()], axis=1) for node_name in nodes_map.keys()} expert_data_tmp_dict = {} for node_name in nodes_map.keys(): try: expert_data_tmp_dict[node_name] = np.concatenate([batch[node_name] for batch in expert_data], axis=1) except Exception as e: logger.warning(f"{e} not in dataset when plotting rollout") continue expert_data = expert_data_tmp_dict generated_data = {node_name: np.concatenate([batch[node_name] for batch in generated_data], axis=1) for node_name in nodes_map.keys()} # deprocess all data expert_data = processor.deprocess(expert_data) generated_data = processor.deprocess(generated_data) if pre_horzion > 0: expert_data = expert_data[pre_horzion:] generated_data = generated_data[pre_horzion:] # del ts_node in nodes_map if dataset.ts_node_frames: ts_nodes_map = {} for k, v in dataset.ts_node_frames.items(): ts_nodes_map[k] = nodes_map[k][-len(nodes_map[k])//v:] if 'next_' + k in nodes_map.keys(): ts_nodes_map['next_' + k] = nodes_map['next_' + k][-len(nodes_map['next_' + k])//v:] nodes_map.update(ts_nodes_map) if graph.ts_nodes: for ts_node,node in graph.ts_nodes.items(): if ts_node in nodes_map.keys(): if node in nodes_map.keys(): expert_data.pop(ts_node) generated_data.pop(ts_node) nodes_map.pop(ts_node) else: nodes_map[node] = [c[c.index("_")+1:]for c in nodes_map[ts_node]] expert_data[node] = expert_data[ts_node][...,-len(nodes_map[node]):] generated_data[node] = generated_data[ts_node][...,-len(nodes_map[node]):] nodes_map.pop(ts_node) if "next_" + ts_node in nodes_map.keys(): if "next_" + node in nodes_map.keys(): expert_data.pop("next_" + ts_node) generated_data.pop("next_" + ts_node) nodes_map.pop("next_" + ts_node) else: nodes_map["next_" + node] = [c[c.index("_")+1:]for c in nodes_map["next_" + ts_node]] expert_data["next_" + node] = expert_data["next_" +ts_node][..., -len(nodes_map["next_" + node]):] generated_data["next_" + node] = generated_data["next_" +ts_node][..., -len(nodes_map["next_" + node]):] nodes_map.pop("next_" + ts_node) # select_indexs = np.random.choice(np.arange(expert_data[list(nodes.keys())[0]].shape[1]), size=10, replace=False) horizion_num = min(horizion_num, expert_data.shape[1]) for node_name, node_dims in nodes_map.items(): if node_name in ["step_node_", "traj"]: continue try: expert_action_data = expert_data[node_name] select_expert_action_data = [expert_action_data[:,index] for index in range(horizion_num)] except Exception as e: logger.warning(f"{e} not in dataset when plotting rollout") select_expert_action_data = None generated_action_data = generated_data[node_name] select_generated_action_data = [generated_action_data[: , index] for index in range(horizion_num)] node_rollout_save_path = os.path.join(rollout_save_path, node_name) if not os.path.exists(node_rollout_save_path): os.makedirs(node_rollout_save_path) _plt_node_rollout(select_expert_action_data, select_generated_action_data, node_name, node_dims, node_rollout_save_path, **kwargs)
[docs] def data_to_dtreeviz(data: pd.DataFrame, target: pd.DataFrame, target_type: (List[str], str), orientation: ('TD', 'LR') = "TD", fancy: bool = True, max_depth: int = 3, output: (str) = None): """ pd data to decision tree Args: data: dataset in pandas form target: target in pandas form target_type: continuous or discrete orientation: Left to right or top to down fancy: true or false for dtreeviz function max_depth (int): depth of the tree output: whether to output dtreeviz result in the path Return: save Trj rollout """ # orange = '#F46d43' # red = '#FF0018' # blue ='#0000F9' # black = '#000000' # custom_color = {'scatter_edge': '#225eab', 'scatter_marker': '#225eab', 'scatter_marker_alpha':0.3, 'wedge': '#F46d43', #orange 'split_line': '#F46d43', #orange 'mean_line': '#F46d43', #orange 'axis_label': '#000000', #black 'title': '#000000', #black, 'legend_title': '#000000', #black, 'legend_edge': '#000000', #black, 'edge': '#000000', #black, 'color_map_min': '#c7e9b4', 'color_map_max': '#081d58', # 'classes': color_blind_friendly_colors, 'rect_edge': '#000000', #black, 'text': '#000000', #black, # 'highlight': 'k', 'text_wedge': '#000000', #black, 'arrow': '#000000', #black, 'node_label': '#000000', #black, 'tick_label': '#000000', #black, 'leaf_label': '#000000', #black, 'pie': '#000000', #black, } if isinstance(target_type, str) and len(target.columns) > 1: target_type = [target_type, ] * len(target.columns) _tmp_pdf_paths = [] with TemporaryDirectory() as dirname: for _target_type, target_name in zip(target_type,target.columns): if _target_type == "Classification" or _target_type == "C": _target_type = "Classification" decisiontree = tree.DecisionTreeClassifier(max_depth=max_depth) elif _target_type == "Regression" or _target_type == "R": _target_type = "Regression" decisiontree = tree.DecisionTreeRegressor(max_depth=max_depth, random_state=1) else: raise NotImplementedError _target = target[[target_name, ]] decisiontree.fit(data, _target) temp_size = min(data.values.shape[0], 5000) np.random.seed(2020) random_index = np.random.choice(np.arange(data.values.shape[0]), size=(temp_size,)) if _target_type == "Classification": _orientation = "TD" class_names = list(set(list(_target.values.reshape(-1)))) _target = _target.values[random_index,:].reshape(-1) else: _target = _target.values[random_index,:] class_names = None _orientation = "LR" viz = dtreeviz(decisiontree, data.values[random_index,:], _target, target_name=target_name, feature_names=data.columns, class_names=class_names, show_root_edge_labels = True, show_node_labels = False, title = _target_type + " Decision Tree of "+ '<'+ target_name + '>', orientation = _orientation, label_fontsize = 15, ticks_fontsize = 10, title_fontsize = 15, fancy=fancy, scale = 2, colors = custom_color, cmap = "cool",) _tmp_pdf_path = os.path.join(dirname, str(uuid1())+".pdf") _tmp_pdf_paths.append(_tmp_pdf_path) try: svg2pdf(url=viz.save_svg(), output_width=1000, output_height=1000, write_to=_tmp_pdf_path) except Exception as e: logger.warning(f"{e}") try: # os.system('sudo apt install graphviz') svg2pdf(url=viz.save_svg(), output_width=1000, output_height=1000, write_to=_tmp_pdf_path) except Exception as e: logger.warning(f"{e}") logger.warning(f"if Graphviz show Error, please try to use <sudo apt install graphviz> in operation system") return if output is None: if len(vizs.keys()) == 1: return viz return vizs assert output, f"output should be not None" if len(_tmp_pdf_paths)==len(target_type): merger = PdfFileMerger() for in_pdf in _tmp_pdf_paths: with open(in_pdf, 'rb') as pdf: merger.append(PdfFileReader(pdf)) merger.write(output) return
[docs] def net_to_tree(tree_save_path: str, graph: DesicionGraph, device: str, dataset, nodes, *args, **kwargs): """ deriving the net model to decision tree Args: tree_save_path: result saving path graph: decision flow in DesicionGraph type device: device to generate data dataset: dataset for deriving decision tree nodes: nodes in decision flow to derive decision tree Return: save decision tree """ if not os.path.exists(tree_save_path): os.makedirs(tree_save_path) graph = graph.to(device) from revive.data.dataset import collect_data data_loader = torch.utils.data.DataLoader(dataset, batch_size=256, shuffle=True, collate_fn=partial(collect_data, graph=graph), pin_memory=True) processor = graph.processor expert_data = [] generated_data = [] data_num = 0 for expert_batch in iter(data_loader): if 'rnn_dataset' in kwargs and kwargs['rnn_dataset']: expert_batch, loss_mask = expert_batch traj_length = expert_batch.shape[0] expert_batch.to_torch(device=device) generated_batch = generate_rollout(expert_batch, graph, traj_length, lambda dist: dist.mode, clip=True) expert_batch.to_numpy() generated_batch.to_numpy() expert_data.append(expert_batch) generated_data.append(generated_batch) data_num += traj_length*256 if data_num > 20000: break data_keys = list(graph.keys()) + graph.leaf # expert_data = {node_name : np.concatenate([batch[node_name] for batch in expert_data], axis=1) for node_name in data_keys} expert_data_tmp_dict = {} for node_name in data_keys: try: val = np.concatenate([batch[node_name] for batch in expert_data], axis=1) expert_data_tmp_dict[node_name] = val.reshape(-1, val.shape[-1]) except: continue expert_data = expert_data_tmp_dict generated_data = {node_name: np.concatenate([batch[node_name] for batch in generated_data], axis=1) for node_name in data_keys} generated_data = {node_name: node_data.reshape(-1,node_data.shape[-1]) for node_name, node_data in generated_data.items()} expert_data = processor.deprocess(expert_data) generated_data = processor.deprocess(generated_data) sample_num = expert_data[list(nodes.keys())[0]].shape[0] size = min(sample_num, 100000) select_indexs = np.random.choice(np.arange(sample_num), size=size, replace=False) for output_node, output_node_dims in nodes.items(): input_node_dims = [] input_nodes = graph[output_node] for input_node in input_nodes: for obs_dim in graph.descriptions[input_node]: input_node_dims.append(list(obs_dim.keys())[0]) input_data = np.concatenate([generated_data[node] for node in input_nodes], axis=-1) output_data = generated_data[output_node] input_data = input_data[select_indexs] output_data = output_data[select_indexs] X = pd.DataFrame(input_data, columns=input_node_dims) Y = pd.DataFrame(output_data, columns=output_node_dims) # begin bug fix tuzuolin 2022-1010 # Y_type = "R" from collections import ChainMap # tuzuolin 2022 1011 temp_dict = dict(ChainMap(*graph.descriptions[output_node])) Y_type = [] for i_type in map(lambda x: temp_dict[x]['type'], output_node_dims): Y_type.append("C" if i_type == 'category' else "R") result = data_to_dtreeviz(X, Y, Y_type, output=os.path.join(tree_save_path,output_node+".pdf"))
[docs] def generate_response_inputs(expert_data: Batch, dataset: Batch, graph: DesicionGraph, obs_sample_num=16): expert_data = deepcopy(expert_data) all_input_nodes = set([item for sub_ls in graph.graph_dict.values() for item in sub_ls]) """ Output: generated_inputs: Dict[str: Dict[tuple: np.ndarray]] generated_inputs.keys(): ['obs', 'action', 'door_open', ...] --> the nodes appear in the input of any network node generated_inputs['obs'].keys(): [(0, 0), ..., (0, 15), (1, 0), ..., (1, 15), ..., (obs_dim, 0), ..., (obs_dim, 15)] --> (data_dim_index, sub_graph_index) generated_inputs['obs'][(0, 0)]: np.ndarray of shape (dim_sample_num, obs_dim) --> perturbated data dim_perturbations: Dict[str: Dict[int: np.ndarray]] dim_perturbations.keys(): ['obs', 'action', 'door_open', ...] --> the nodes appear in the input of any network node dim_perturbations['obs'].keys(): [0, 1, ..., obs_dim] dim_perturbations['obs'][0]: np.ndarray of shape (dim_sample_num, ) --> dim_perturbation --> as x-axis in plotting """ generated_inputs = defaultdict(dict) dim_perturbations = defaultdict(dict) for node_name in all_input_nodes: descriptions = graph.descriptions[node_name] for dim, description in enumerate(descriptions): # generate fake data # info = list(description.values())[0] for _, info in description.items(): if info['type'] == 'continuous': dim_sample_num = 100 dim_perturbation = np.linspace(info.get('min') if info.get('min') is not None else dataset[node_name][:, dim].min(), info.get('max') if info.get('max') is not None else dataset[node_name][:, dim].max(), num=dim_sample_num) elif info['type'] == 'category': values = info['values'] dim_sample_num = len(values) dim_perturbation = np.array(values) elif info['type'] == 'discrete': assert isinstance(info, dict), "in discrete, assert isinstance(info, dict)" dim_sample_num = info['num'] dim_perturbation = np.linspace(info['min'], info['max'], num=dim_sample_num) else: raise NotImplementedError("data dim_type not match") for index in range(obs_sample_num): inputs = deepcopy(expert_data[node_name][index]) # (dim, ) inputs = np.repeat(np.expand_dims(inputs, axis=0), repeats=dim_sample_num, axis=0) # (dim_sample_num, dim) inputs[:, dim] = dim_perturbation generated_inputs[node_name][(dim, index)] = inputs dim_perturbations[node_name][dim] = dim_perturbation return generated_inputs, dim_perturbations
[docs] def generate_response_outputs(generated_inputs: defaultdict, expert_data: Batch, venv_train: VirtualEnvDev, venv_val: VirtualEnvDev): generated_inputs = deepcopy(generated_inputs) """ Output: generated_outputs: Dict[str: Dict[tuple: np.ndarray]] generated_outputs.keys(): ['action', 'next_obs', ...] --> the nodes in graph.keys() whose node_type == 'network' generated_outputs['next_obs'].keys(): [(0, 0, input_1), ..., (0, 0, input_n), (0, 1, input_1), ..., (0, 1, input_n), ..., (0, 15, input_1), ..., (0, 15, input_n), (1, 0, input_1), ..., (1, 0, input_n), (1, 1, input_1), ..., (1, 1, input_n), ..., (1, 15, input_1), ..., (1, 15, input_n), ..., (next_obs_dim, 0, input_1), ..., (next_obs_dim, 0, input_n), (next_obs_dim, 1, input_1), ..., (next_obs_dim, 1, input_n), ..., (next_obs_dim, 15, input_1), ..., (next_obs_dim, 15, input_n)] --> (data_dim_index, sub_graph_index, each_input_name) generated_outputs['next_obs'][(0, 0, input_1)]: np.ndarray of shape (dim_sample_num, next_obs_dim) --> outputs corresponding to each perturbated input """ graph = venv_train.graph generated_outputs_train = defaultdict(dict) generated_outputs_val = defaultdict(dict) with torch.no_grad(): for node_name in list(graph.keys()): if graph.get_node(node_name).node_type == 'network': input_names = graph.get_node(node_name).input_names inputs_dict = graph.get_node(node_name).get_inputs(generated_inputs) # Dict[str: Dict[tuple: np.ndarray]] for input_name in input_names: for dim, index in list(inputs_dict[input_name].keys()): state_dict = {} state_dict[input_name] = inputs_dict[input_name][(dim, index)] state_dict.update({name: np.repeat(np.expand_dims(expert_data[name][index], axis=0), repeats=state_dict[input_name].shape[0], axis=0) for name in input_names if name != input_name}) venv_train.reset() venv_val.reset() generated_outputs_train[node_name][(dim, index, input_name)] = venv_train.node_infer(node_name, state_dict) generated_outputs_val[node_name][(dim, index, input_name)] = venv_val.node_infer(node_name, state_dict) return generated_outputs_train, generated_outputs_val
[docs] def plot_response_curve(response_curve_path, graph_train, graph_val, dataset, device, obs_sample_num=16): set_default_font() if not os.path.exists(response_curve_path): os.makedirs(response_curve_path) dataset = deepcopy(dataset) graph_train.reset() graph_train = deepcopy(graph_train) venv_train = VirtualEnvDev(graph_train) venv_train.to(device) graph_val.reset() graph_val = deepcopy(graph_val) venv_val = VirtualEnvDev(graph_val) venv_val.to(device) indexes = np.random.choice(np.arange(dataset.shape[0]), size=(obs_sample_num, ), replace=False) expert_data = dataset[indexes] # numpy array generated_inputs, dim_perturbations = generate_response_inputs(expert_data, dataset, graph_train, obs_sample_num) generated_outputs_train, generated_outputs_val = generate_response_outputs(generated_inputs, expert_data, venv_train, venv_val) obs_sample_num_per_dim = int(np.sqrt(obs_sample_num)) with torch.no_grad(): for node_name in list(graph_train.keys()): if graph_train.get_node(node_name).node_type == 'network': input_names = graph_train.get_node(node_name).input_names output_dims = dataset[node_name].shape[-1] for input_name in input_names: input_dims = dataset[input_name].shape[-1] # plot fig = plt.figure(figsize=(8 * input_dims, 10 * output_dims)) # (width, height) red_patch = mpatches.Patch(color='red', label='venv_train') blue_patch = mpatches.Patch(color='blue', label='venv_val') outer = gridspec.GridSpec(output_dims, input_dims, wspace=0.2, hspace=0.2) for output_dim in range(output_dims): for input_dim in range(input_dims): dim_perturbation = dim_perturbations[input_name][input_dim] outer_index = output_dim * input_dims + input_dim inner = gridspec.GridSpecFromSubplotSpec(obs_sample_num_per_dim, obs_sample_num_per_dim, subplot_spec=outer[outer_index], wspace=0.2, hspace=0.2) outer_ax = plt.Subplot(fig, outer[outer_index]) outer_ax.axis('off') fig.add_subplot(outer_ax) mae = [] corr = [] for index in range(obs_sample_num): ax = plt.Subplot(fig, inner[index]) if output_dim == 0 and input_dim == 0 and index == 0: ax.legend(handles=[red_patch, blue_patch], loc="upper left") output_train = generated_outputs_train[node_name][(input_dim, index, input_name)][:, output_dim] output_val = generated_outputs_val[node_name][(input_dim, index, input_name)][:, output_dim] line1, = ax.plot(dim_perturbation, output_train, color='red') line2, = ax.plot(dim_perturbation, output_val, color='blue') fig.add_subplot(ax) mae.append(np.abs(output_train - output_val) / dim_perturbation.shape[0]) corr.append(stats.spearmanr(output_train, output_val).correlation) outer_ax.set_title(f"{node_name}_dim: {output_dim}, {input_name}_dim: {input_dim}\nMAE: {np.sum(mae) / obs_sample_num:.2f}\nCorr: {np.sum(corr) / obs_sample_num:.2f}", ) response_curve_node_path = os.path.join(response_curve_path, node_name) if not os.path.exists(response_curve_node_path): os.makedirs(response_curve_node_path) plt.savefig(os.path.join(response_curve_node_path, f"{node_name}_on_{input_name}.png"), bbox_inches='tight')
[docs] def response_curve(response_curve_path, venv, dataset, device="cuda" if torch.cuda.is_available() else "cpu", obs_sample_num=16): set_default_font() def generate_response_outputs(generated_inputs: defaultdict, expert_data: Batch, venv: VirtualEnvDev): generated_inputs = deepcopy(generated_inputs) graph = venv.graph generated_outputs_train = defaultdict(dict) with torch.no_grad(): for node_name in list(graph.keys()): if graph.get_node(node_name).node_type == 'network': input_names = graph.get_node(node_name).input_names inputs_dict = graph.get_node(node_name).get_inputs(generated_inputs) # Dict[str: Dict[tuple: np.ndarray]] for input_name in input_names: for dim, index in list(inputs_dict[input_name].keys()): state_dict = {} state_dict[input_name] = inputs_dict[input_name][(dim, index)] state_dict.update({name: np.repeat(np.expand_dims(expert_data[name][index], axis=0), repeats=state_dict[input_name].shape[0], axis=0) for name in input_names if name != input_name}) venv.reset() generated_outputs_train[node_name][(dim, index, input_name)] = venv.node_infer(node_name, state_dict) return generated_outputs_train if not os.path.exists(response_curve_path): os.makedirs(response_curve_path) dataset = deepcopy(dataset) venv.to(device) graph = venv.graph graph.reset() indexes = np.random.choice(np.arange(dataset.shape[0]), size=(obs_sample_num, ), replace=False) expert_data = dataset[indexes] # numpy array generated_inputs, dim_perturbations = generate_response_inputs(expert_data, dataset, graph, obs_sample_num) generated_outputs = generate_response_outputs(generated_inputs, expert_data, venv) obs_sample_num_per_dim = int(np.sqrt(obs_sample_num)) with torch.no_grad(): for node_name in list(graph.keys()): if graph.get_node(node_name).node_type == 'network': input_names = graph.get_node(node_name).input_names output_dims = dataset[node_name].shape[-1] for input_name in input_names: input_dims = dataset[input_name].shape[-1] # plot fig = plt.figure(figsize=(8 * input_dims, 10 * output_dims)) # (width, height) blue_patch = mpatches.Patch(color='blue', label='venv') outer = gridspec.GridSpec(output_dims, input_dims, wspace=0.2, hspace=0.2) for output_dim in range(output_dims): for input_dim in range(input_dims): dim_perturbation = dim_perturbations[input_name][input_dim] outer_index = output_dim * input_dims + input_dim inner = gridspec.GridSpecFromSubplotSpec(obs_sample_num_per_dim, obs_sample_num_per_dim, subplot_spec=outer[outer_index], wspace=0.2, hspace=0.2) outer_ax = plt.Subplot(fig, outer[outer_index]) outer_ax.axis('off') fig.add_subplot(outer_ax) for index in range(obs_sample_num): ax = plt.Subplot(fig, inner[index]) if output_dim == 0 and input_dim == 0 and index == 0: ax.legend(handles=[blue_patch, ], loc="upper left") output = generated_outputs[node_name][(input_dim, index, input_name)][:, output_dim] line1, = ax.plot(dim_perturbation, output, color='blue') fig.add_subplot(ax) outer_ax.set_title(f"{node_name}_dim: {output_dim}, {input_name}_dim: {input_dim}", ) response_curve_node_path = os.path.join(response_curve_path, node_name) if not os.path.exists(response_curve_node_path): os.makedirs(response_curve_node_path) plt.savefig(os.path.join(response_curve_node_path, f"{node_name}_on_{input_name}.png"), bbox_inches='tight')
[docs] def create_unit_vector(d_model): """ Normalization of unit vector Args: size: tuple, (num_vecs, dim) / (dim, ) Return: a random vector of which the sum eaquals 1 """ x = np.random.uniform(low=-1, high=1, size=d_model) x -= x.mean() return x / np.linalg.norm(x)
[docs] def generate_bin_encoding(traj_num): """ transform trajectory id into binary code in form of list Args: traj_num: the number of traj in data Return: binary vector of traj id """ traj_str = "{0:b}".format(traj_num) max_len = len(traj_str) traj_encodings = [] for i in range(traj_num): traj_encodings.append(np.array(list(map(int, list("{0:b}".format(i).zfill(max_len)))))) return np.vstack(traj_encodings)
[docs] class PositionalEncoding: def __init__(self, d_model: int, max_len: int = 5000): position = np.expand_dims(np.arange(max_len), axis=1) div_term = np.exp(np.arange(0, d_model, 2) * (-math.log(10000.0) / d_model)) pe = np.zeros((max_len, 1, d_model)) pe[:, 0, 0::2] = np.sin(position * div_term) pe[:, 0, 1::2] = np.cos(position * div_term) self.pe = pe
[docs] def encode(self, pos) -> np.ndarray: """ Args: pos: Array, shape [seq_len, 1] / scalar Output: pe: Array, shape [seq_len, d_model] / [d_model, ] """ pe = self.pe[pos].squeeze() # [seq_len, d_model] return pe
[docs] def func_read(parse_file_path): file_name, function_name = parse_file_path.split('.') file_name = os.path.split(os.path.splitext(parse_file_path)[0])[-1] source_file = importlib.import_module(f'{file_name}') func = eval(f'source_file.{function_name}') return func
[docs] def rollout_response_curve(dataset, venv, expert_func_dict: dict, node_name: str, plot_path: str, length: int=None): set_default_font() output_dim = dataset.data[node_name].shape[-1] _random_index = np.random.choice(range(dataset._start_indexes.shape[0])) _start = dataset._start_indexes[_random_index] _end = dataset._end_indexes[_random_index] if length is None else _start + length expert_data = dataset.data[_start:_end] # 开始数据生成 for k in expert_func_dict.keys(): expert_func_dict[k]['func'] = func_read(expert_func_dict[k]['func']) _temp_data = deepcopy(expert_data) _temp_data[expert_func_dict[k]['node']] = expert_func_dict[k]['func'](_temp_data) _temp_data.pop(node_name) expert_func_dict[k]['result'] = venv.node_infer(node_name, _temp_data) print('expert_func:',k,' finished generate data!') # 开始画图 plt.rcdefaults() plt.rc('axes', unicode_minus=False) fig, axes = plt.subplots(output_dim, 1, figsize=(8, 4 * output_dim)) # 在每个子图中绘制示例内容(这里用一个简单的示例) for dim_index in range(output_dim): ax = axes[dim_index] # 获取当前子图的轴对象 ax.plot(expert_data[node_name][..., dim_index], alpha=1, lw=0.5, label=f'history_data') ax.set_title(f'{node_name}-dim:{dim_index}') # 设置子图标题 for key in expert_func_dict.keys(): ax.plot(expert_func_dict[key]['result'][...,dim_index], alpha=1, lw=0.5, label=f'{key}') ax.legend() # 显示图例 plt.tight_layout() plt.savefig(f'{plot_path}.png', dpi=150) print('rollous response curve is finished and save to : ',f'{plot_path}.png')