Source code for graphical_model_learning.algorithms.dag.pcalg

from graphical_models import PDAG, UndirectedGraph
from conditional_independence import CI_Tester
import itertools as itr

def skeleton(nodes: set, ci_tester: CI_Tester, max_cond_set: int=None, verbose=False):
    Estimate the skeleton of an underlying DAG using the order-independent skeleton estimation method of
    Colombo and Maathius (2014).

        Labels of nodes in the graph.
        A conditional independence tester, which has a method is_ci taking two sets A and B, and a conditioning set C,
        and returns True/False.
        Maximum size of conditioning set tested to separate nodes.
        If True, print edges as they are removed, along with the separating set responsible for removing them.

    See Also

    (skeleton, sepset)
    nnodes = len(nodes)
    ug = UndirectedGraph(edges=set(itr.combinations(nodes, 2)))
    sepset = {}
    max_cond_set = max_cond_set if max_cond_set is not None else nnodes-2
    for c_size in range(max_cond_set+1):
        adjacencies = ug.neighbors
        for i, j in itr.permutations(nodes, 2):
            if ug.has_edge(i, j) and len(adjacencies[i] - {j}) >= c_size:
                for cond_set in itr.combinations(adjacencies[i] - {j}, c_size):
                    if ci_tester.is_ci(i, j, cond_set):
                        if verbose: print(f"Removing {i}-{j}, separated by {cond_set}")
                        ug.delete_edge(i, j)
                        sepset[frozenset({i, j})] = cond_set
    return ug, sepset

[docs]def pcalg( nodes, ci_tester: CI_Tester=None, skel=None, sepset=None, solve_conflict: bool=False, max_cond_set: int=None, verbose: bool=False ) -> PDAG: """ Use the PC (Peters-Clark) algorithm to estimate the Markov equivalence class of the data-generating DAG. Parameters ---------- nodes: Labels of nodes in the graph. ci_tester: A conditional independence tester, which has a method is_ci taking two sets A and B, and a conditioning set C, and returns True/False. skel: An estimated skeleton. If not provided, uses the `skeleton` method to estimate. sepset: The separating sets for non-adjacent nodes in the estimated skeleton. solve_conflict: If False, any disagreements on v-structures are simply overwritten. If True, allow both orientations (represented by a bidirected edge). verbose: If True, print decisions made by the algorithm. See Also -------- gsp Returns ------- est_dag """ if solve_conflict: raise NotImplementedError if ci_tester is None: if skel is None or sepset is None: raise ValueError("Must provide either ci_tester or skeleton and sepset dictionary") if ci_tester is not None: skel, sepset = skeleton(nodes, ci_tester, max_cond_set=max_cond_set, verbose=verbose) adjacencies = skel.neighbors arcs = set() for i, k in itr.combinations(nodes, 2): if not skel.has_edge(i, k): for j in adjacencies[i] & adjacencies[k]: if j not in sepset[frozenset({i, k})]: if not solve_conflict: arcs.discard((j, k)) arcs.discard((j, k)) arcs.add((i, j)) arcs.add((k, j)) cpdag = PDAG(nodes=nodes, arcs=arcs, edges=skel.edges-{frozenset({*arc}) for arc in arcs}) cpdag.to_complete_pdag(verbose=verbose, solve_conflict=solve_conflict) return cpdag
if __name__ == '__main__': import causaldag as cd from conditional_independence import MemoizedCI_Tester, dsep_test import numpy as np import random np.random.seed(9890142) random.seed(9890142) nnodes = 15 ngraphs = 10 exp_nbrs = 3 dags = cd.rand.directed_erdos(nnodes, exp_nbrs/(nnodes-1), ngraphs) nodes = set(range(nnodes)) ci_testers = [MemoizedCI_Tester(dsep_test, d) for d in dags] est_skels = [skeleton(set(range(nnodes)), ci_tester)[0] for ci_tester in ci_testers] false_positives = [est_skel.edges - d.skeleton for est_skel, d in zip(est_skels, dags)] false_negatives = [d.skeleton - est_skel.edges for est_skel, d in zip(est_skels, dags)] print(false_positives) print(false_negatives) est_cpdags = [pcalg(nodes, ci_tester) for ci_tester in ci_testers] true_cpdags = [d.cpdag() for d in dags] correct_cpdag = [true_cpdag == est_cpdag for true_cpdag, est_cpdag in zip(true_cpdags, est_cpdags)] shds = [true_cpdag.shd(est_cpdag) for true_cpdag, est_cpdag in zip(true_cpdags, est_cpdags)] print(correct_cpdag)