Source code for mgrid.transformation

"""Function to convert planar graph to supra graph."""
from copy import deepcopy
from itertools import chain
from statistics import mean
from typing import Dict, Set, Tuple, Union

import networkx as nx
import pandas as pd

from mgrid.graph.geographic import COLUMNS, COLUMNS_DI, GeoGraph
from mgrid.grid import GeoGrid, SupraGrid
from mgrid.log import LOGGER

COLUMNS_DI_ORIGINAL = ["source_original", "target_original"]


def _planar2supra(g: GeoGrid) -> Tuple[SupraGrid, Dict[int, Set[str]]]:
    """Convert a planar grid to corresponding supra-grid.

    Args:
        g: a planar graph to be converted.

    Returns:
        Resulted supra-graph.
    """
    dg = nx.DiGraph(g)

    # Initiate dictionary for nodes in different layers.
    node_dict = {}
    inter_nodes = set(g.inter_nodes.index)
    for layer in g.layers:
        node_dict[layer] = set(g.layer_graph(layer).nodes) - inter_nodes

    # Initiate dataframe for inter-edges.
    inter_edges = deepcopy(g.inter_nodes)
    inter_edges["source"] = "default_"
    inter_edges["target"] = "default_"

    # Initiate dataframe for intra-edges.
    intra_edges = nx.to_pandas_edgelist(g)
    intra_edges[COLUMNS_DI_ORIGINAL[0]] = intra_edges[COLUMNS_DI[0]]
    intra_edges[COLUMNS_DI_ORIGINAL[1]] = intra_edges[COLUMNS_DI[1]]
    intra_edges.set_index(COLUMNS_DI_ORIGINAL, inplace=True)

    def update_intra_edges(edge: Tuple[str, str], edge_new: Tuple[str, str]):
        """Update intra-edges' relationship after splitting inter-nodes.

        Args:
            edge: intra-edges in planar graph.
            edge_new: corresponding edge in supra-graph.
        """
        edges_original = intra_edges.index[
            (intra_edges[COLUMNS_DI[0]] == edge[0])
            & (intra_edges[COLUMNS_DI[1]] == edge[1])
        ].tolist()
        if len(edges_original) != 1:
            LOGGER.error(f"The origin of edge {edge} is incorrect.")
        intra_edges.loc[edges_original[0], COLUMNS_DI] = edge_new

    # Split all the inter-nodes to inter-edges.
    for node, row in g.inter_nodes.iterrows():
        in_edges = list(dg.in_edges(nbunch=node, data=True))
        for u, _, data in in_edges:
            edge_new = (u, str(node) + f'_layer{data["layer"]}')
            dg.add_edge(*edge_new, **data)
            update_intra_edges((u, node), edge_new)
        out_edges = list(dg.out_edges(nbunch=node, data=True))
        for _, v, data in out_edges:
            edge_new = (str(node) + f'_layer{data["layer"]}', v)
            dg.add_edge(*edge_new, **data)
            update_intra_edges((node, v), edge_new)

        def init_node4inter(binary: int) -> str:
            """Init source or target for the inter-edge.

            Args:
                binary: 0 or 1, representing "source" and "target".

            Returns:
                Node name of source or target.
            """
            layer = row[COLUMNS[binary]]
            node_new = str(node) + f"_layer{layer}"
            inter_edges.loc[node, COLUMNS_DI[binary]] = node_new
            node_dict[layer].add(node_new)
            return node_new

        source = init_node4inter(0)
        target = init_node4inter(1)
        dg.add_edge(source, target, layer=mean([row["upper"], row["lower"]]))

        # Store layer and origin of inter-edge terminals as node attributes
        dg.nodes[source]["layer"] = row["upper"]
        dg.nodes[source]["origin"] = node
        dg.nodes[target]["layer"] = row["lower"]
        dg.nodes[target]["origin"] = node

        dg.remove_node(node)

    # Build supra-grid.
    res = SupraGrid(dg)
    res.intra_edges = intra_edges
    res.inter_edges = inter_edges
    res.inter_edges.index.name = "node"

    res.df_layers = g.df_layers.copy(deep=True)
    return res, node_dict


[docs]def planar2supra(g: Union[GeoGraph, GeoGrid]) -> SupraGrid: """Convert a planar grid to corresponding supra-grid. Args: g: a planar graph or grid to be converted. Returns: Resulted supra graph (for the grid). """ supra, node_dict = _planar2supra(g) if isinstance(g, GeoGrid): for node, row in g.inter_nodes.iterrows(): source = supra.inter_edges.loc[node, "source"] target = supra.inter_edges.loc[node, "target"] supra.edges[source, target]["element"] = row["element"] # Get conversion elements. conversions = deepcopy(g.conversions) conversions.reset_index(inplace=True) conversions = pd.merge( conversions, supra.nodes_new, how="left", left_on=["node", "layer"], right_index=True, ) conversions.set_index("name", inplace=True) conversions.drop(columns=["node", "layer"], inplace=True) supra.conversions = conversions supra.types = deepcopy(g.types) # Build a list of buses using dictionary for nodes in different layers. keys_sorted = sorted(node_dict) data = { "idx": chain(*[[key] * len(node_dict[key]) for key in keys_sorted]) } buses = pd.DataFrame( data, index=chain(*[node_dict[key] for key in keys_sorted]), ) buses["layer_name"] = buses["idx"].map(supra.df_layers["name"]) buses.index.name = "node" buses["voltage"] = buses["idx"].map(supra.df_layers["voltage"]) supra.buses = buses return supra