import pandas as pd
import numpy as np
import torch
import os
import pickle
import networkx as nx
from functools import reduce
from dgl.nn import GraphConv
import torch.nn as nn
# Pairwise DataFrame merge helper (suitable for chaining with functools.reduce)
def merge_dfs(left_df, right_df):
    """
    Merges two DataFrames on their indexes with an outer join.

    Parameters:
        left_df (pd.DataFrame): The left DataFrame to merge.
        right_df (pd.DataFrame): The right DataFrame to merge.

    Returns:
        pd.DataFrame: The resulting DataFrame after merging.
    """
    # Merge on the index; how='outer' keeps records present in either frame
    return pd.merge(left_df, right_df, left_index=True, right_index=True, how='outer')
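
# Example (illustrative): merge_dfs is written so it can be chained over a list
# of sample-indexed DataFrames with functools.reduce (imported above). The
# DataFrame names here are hypothetical:
#
#     combined = reduce(merge_dfs, [df_mrna, df_mirna, df_methylation])
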
def data_parsing(DATA_PATH, MODALITIES, TARGET, INDEX_COL, PROCESSED=True):
    """
    Parses multiple data modalities into a unified structure, handling metadata alignment and preprocessing steps.

    Parameters:
        DATA_PATH (str): The directory path where the data files are stored.
        MODALITIES (list): A list of modality names to parse.
        TARGET (str): The column name in metadata that indicates the target or label.
        INDEX_COL (str): The column used as the index for merging and aligning data.
        PROCESSED (bool, optional): Whether to load processed (True) or preprocessed (False) data.

    Returns:
        dict: A dictionary of DataFrames, one for each modality.
        pd.Series: Metadata series indexed by INDEX_COL and containing TARGET values.
    """
    datModalities = {}
    suffix = 'processed' if PROCESSED else 'preprocessed'
    modalities = list(MODALITIES)
    missing = [mod for mod in modalities
               if not os.path.exists(f'{DATA_PATH}/{mod}_{suffix}.pkl')]
    if missing:
        raise FileNotFoundError(f'Modalities {missing} not found in data path {DATA_PATH}')
    for i, mod in enumerate(modalities):
        with open(f'{DATA_PATH}/{mod}_{suffix}.pkl', 'rb') as file:
            loaded_data = pickle.load(file)
        # Accumulate the (INDEX_COL, TARGET) metadata across modalities with an outer join
        if i == 0:
            datMeta = loaded_data['datMeta'].reset_index()[[INDEX_COL, TARGET]]
        else:
            datMeta = pd.merge(datMeta, loaded_data['datMeta'].reset_index()[[INDEX_COL, TARGET]],
                               how='outer', on=[INDEX_COL, TARGET])
        datExpr = loaded_data['datExpr']
        # Transpose the expression matrix if samples sit on the columns rather than the rows
        if len(set(datExpr.index.astype(str)) & set(datMeta[INDEX_COL])) == 0:
            datExpr = datExpr.T
        # Impute any missing values with the per-feature mean
        if datExpr.isna().sum().sum() > 0:
            datExpr = datExpr.fillna(datExpr.mean())
        datModalities[mod] = datExpr.loc[sorted(datExpr.index)]
    meta = datMeta.set_index(INDEX_COL)[TARGET]
    return datModalities, meta
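
# Example (illustrative, with hypothetical paths and column names): each
# '{DATA_PATH}/{modality}_processed.pkl' file is assumed to hold a dict with
# 'datExpr' (expression matrix) and 'datMeta' (sample metadata) entries:
#
#     datModalities, meta = data_parsing(DATA_PATH='data',
#                                        MODALITIES=['mrna', 'mirna'],
#                                        TARGET='diagnosis',
#                                        INDEX_COL='sample_id')
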
def get_gpu_memory():
    """
    Retrieves and prints the current GPU memory usage statistics including total, reserved, and allocated memory.

    Returns:
        None
    """
    t = torch.cuda.get_device_properties(0).total_memory / 1e9
    r = torch.cuda.memory_reserved(0) / 1e9
    a = torch.cuda.memory_allocated(0) / 1e9
    print("Total = %1.1fGB \t Reserved = %1.1fGB \t Allocated = %1.1fGB" % (t, r, a))

def indices_removal_adjust(idx_to_swap, all_idx, new_idx):
    """
    Adjusts and filters indices after some have been removed, mapping old indices to their new positions post-removal.

    Parameters:
        idx_to_swap (array-like): Indices that potentially need adjustment after an update.
        all_idx (pd.Series): The complete set of original indices.
        new_idx (array-like): The updated list of indices after some removals.

    Returns:
        np.array: Adjusted indices reflecting the new index placement.
    """
    # Original indices that survived the removal, in their original order
    update_idx = all_idx[all_idx.isin(new_idx)].reset_index()['index']
    # Map each surviving original index to its new (0-based) position
    update_idx_swap = pd.Series(update_idx.index, index=update_idx.values)
    # Keep only the requested indices that survived, translated to new positions
    return update_idx_swap[list(set(update_idx) & set(idx_to_swap))].values
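
# Worked example (hypothetical values): if the original indices were 0..4 and
# index 2 was removed (new_idx = [0, 1, 3, 4]), then indices 3 and 4 shift
# down by one, so [1, 3] is remapped to [1, 2]:
#
#     all_idx = pd.Series(range(5))
#     indices_removal_adjust(idx_to_swap=[1, 3], all_idx=all_idx, new_idx=[0, 1, 3, 4])
#     # -> array([1, 2])
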
def network_from_csv(NETWORK_PATH, no_psn, weighted=False):
    """
    Constructs a NetworkX graph from a CSV file containing network data.

    Parameters:
        NETWORK_PATH (str): The file path to the CSV containing the network data.
        no_psn (bool): If True, no edges are added, leaving a graph of nodes only.
        weighted (bool): If True, edges are added together with their 'weight' values.

    Returns:
        nx.Graph: The constructed NetworkX graph.
    """
    # Read the network CSV as a pandas DataFrame
    network = pd.read_csv(NETWORK_PATH, index_col=0)
    # Collect node numbers and names (ids) from both endpoints of each edge
    node_from = network[['from', 'from_name']].copy()
    node_from.columns = ['node', 'id']
    node_to = network[['to', 'to_name']].copy()
    node_to.columns = ['node', 'id']
    # Create the networkx Graph object and add the nodes, resetting the index to begin from 0
    G = nx.Graph()
    nodes = pd.concat([node_from, node_to]).drop_duplicates().reset_index(drop=True)
    nodes['id'] = [str(i) for i in nodes['id']]  # Convert node names to strings
    G.add_nodes_from(nodes['id'])
    nx.set_node_attributes(G, nodes.reset_index().set_index('id')['index'], 'idx')
    # Add edges (and weights, if applicable), keyed by node name so they match the nodes added above
    edges = []
    if weighted:
        # Translate node numbers to node names before building the edge list
        id_map = nodes.drop_duplicates('node').set_index('node')['id']
        for edge1, edge2, weight in zip(network['from'], network['to'], network['weight']):
            edges.append((id_map[edge1], id_map[edge2], weight))
        G.add_weighted_edges_from(edges)
    elif no_psn:
        pass  # no_psn: skip adding edges entirely
    else:
        for edge1, edge2 in zip(network['from_name'], network['to_name']):
            edges.append((str(edge1), str(edge2)))
        G.add_edges_from(edges)
    return G
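
# Example (illustrative): the CSV is expected to contain 'from', 'to',
# 'from_name' and 'to_name' columns, plus 'weight' for weighted networks;
# 'network.csv' is a hypothetical path:
#
#     G = network_from_csv('network.csv', no_psn=False, weighted=True)
#     print(G.number_of_nodes(), G.number_of_edges())
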
def init_weights(m):
    """
    Initializes weights for PyTorch layers within a model.

    Parameters:
        m (torch.nn.Module): The model or layer to initialize.

    Effects:
        Applied in place: adjusts the weights of the module passed in, based on the type of layer.
    """
    if isinstance(m, GraphConv):
        # DGL's GraphConv ships its own reset logic
        m.reset_parameters()
    if isinstance(m, nn.Linear):
        # Normal initialisation scaled by fan-in, with zeroed biases
        nn.init.normal_(m.weight, mean=0.0, std=np.sqrt(1 / m.in_features))
        if m.bias is not None:
            nn.init.zeros_(m.bias)
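
# Example: (re)initialise all layers of a model in place with Module.apply,
# which calls init_weights recursively on every submodule; the model here is
# a hypothetical stand-in:
#
#     model = nn.Sequential(nn.Linear(16, 8), nn.ReLU(), nn.Linear(8, 2))
#     model.apply(init_weights)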