Skip to content

API Reference

This section is auto-generated from the docstrings of the sfa package using mkdocstrings.

sfa.base

sfa.base

Algorithm

Bases: ContainerItem

The base class of Algorithm classes.

Attributes:

Name Type Description
abbr str
name str
data Data
params ParameterSet
result Result

Examples:

>>> class AnAlgorithm(sfa.base.Algorithm):
        # Definition of algorithm ...
        ...

>>> alg = AnAlgorithm()
>>> alg.params = params_obj # Parameters of the algorithm
>>> alg.data = data_obj # Data to be analyzed by the algorithm
>>> alg.initialize()
>>> res = alg.compute()
Source code in sfa/base.py
class Algorithm(ContainerItem):
    """The base class of Algorithm classes.

    Attributes
    ----------
    abbr : str
    name : str
    data : sfa.base.Data
    params : sfa.base.ParameterSet
    result : sfa.base.Result

    Examples
    --------
        >>> class AnAlgorithm(sfa.base.Algorithm):
                # Definition of algorithm ...
                ...

        >>> alg = AnAlgorithm()
        >>> alg.params = params_obj # Parameters of the algorithm
        >>> alg.data = data_obj # Data to be analyzed by the algorithm
        >>> alg.initialize()
        >>> res = alg.compute()

    """
    def __init__(self, abbr):
        super().__init__(abbr)
        self._data = None
        self._params = None
        self._result = None

    def copy(self, is_deep=False):
        """

        """
        if is_deep:
            return copy.deepcopy(self)
        else:
            return copy.copy(self)

    # Read-only properties
    @property
    def result(self):
        """The object of ``sfa.base.Result``.
           The result of computing the batch.
        """
        return self._result

    # Read & write properties
    @property
    def params(self):
        """The object of ``sfa.base.ParameterSet``.
           Parameters of the algorithm can accessed
           through this member.
        """
        return self._params

    @params.setter
    def params(self, obj):
        self._params = obj    

    @property
    def data(self):
        """The object of ``sfa.base.Data``.
            Data to be processed based on the algorithm
            can accessed through this member.
        """
        return self._data

    @data.setter
    def data(self, obj):
        self._data = obj

    def initialize(self, network=True, ba=True):
        """
        """
        if network:
            self.initialize_network()

        if ba:
            self.initialize_basal_activity()

    def initialize_network(self):
        """Initialize the data structures related to network.
        """
        pass

    def initialize_basal_activity(self):
        """Initialize the basal activity, :math:`b`.
        """
        pass

    @abc.abstractmethod
    def compute(self, b):
        r"""Process the assigned data
            with the given basal activity, :math:`b`.

        Parameters
        ----------
        b : numpy.ndarray
            1D array of basal activity.


        Returns
        -------
        x : numpy.ndarray
            1D-array object of activity at steady-state.
        """
        raise NotImplementedError("compute() should be implemented")

    @abc.abstractmethod
    def compute_batch(self):
        """Process the assigned data that contains a batch data.
           The result is stored in ``result`` member.
        """
        raise NotImplementedError("compute_batch() should be implemented")

data property writable

The object of sfa.base.Data. Data to be processed based on the algorithm can accessed through this member.

params property writable

The object of sfa.base.ParameterSet. Parameters of the algorithm can accessed through this member.

result property

The object of sfa.base.Result. The result of computing the batch.

compute(b) abstractmethod

Process the assigned data with the given basal activity, :math:b.

Parameters:

Name Type Description Default
b ndarray

1D array of basal activity.

required

Returns:

Name Type Description
x ndarray

1D-array object of activity at steady-state.

Source code in sfa/base.py
@abc.abstractmethod
def compute(self, b):
    r"""Process the assigned data
        with the given basal activity, :math:`b`.

    Parameters
    ----------
    b : numpy.ndarray
        1D array of basal activity.


    Returns
    -------
    x : numpy.ndarray
        1D-array object of activity at steady-state.
    """
    raise NotImplementedError("compute() should be implemented")

compute_batch() abstractmethod

Process the assigned data that contains a batch data. The result is stored in result member.

Source code in sfa/base.py
@abc.abstractmethod
def compute_batch(self):
    """Process the assigned data that contains a batch data.
       The result is stored in ``result`` member.
    """
    raise NotImplementedError("compute_batch() should be implemented")

copy(is_deep=False)

Source code in sfa/base.py
def copy(self, is_deep=False):
    """

    """
    if is_deep:
        return copy.deepcopy(self)
    else:
        return copy.copy(self)

initialize(network=True, ba=True)

Source code in sfa/base.py
def initialize(self, network=True, ba=True):
    """
    """
    if network:
        self.initialize_network()

    if ba:
        self.initialize_basal_activity()

initialize_basal_activity()

Initialize the basal activity, :math:b.

Source code in sfa/base.py
def initialize_basal_activity(self):
    """Initialize the basal activity, :math:`b`.
    """
    pass

initialize_network()

Initialize the data structures related to network.

Source code in sfa/base.py
def initialize_network(self):
    """Initialize the data structures related to network.
    """
    pass

ContainerItem

The base class that defines the item object of sfa.containers.Container.

Source code in sfa/base.py
class ContainerItem(metaclass=abc.ABCMeta):
    """
    The base class that defines the item object of
    ``sfa.containers.Container``.

    """
    def __init__(self, abbr=None, name=None):
        self._abbr = abbr
        self._name = name

    def __str__(self):
        return self._abbr

    def __repr__(self):
        class_name = self.__class__.__name__
        return "%s object" % (class_name)

    @property
    def abbr(self):
        """Abbreviation or symbol representing this item.
        """
        return self._abbr

    @abbr.setter
    def abbr(self, val):
        self._abbr =val

    @property
    def name(self):
        """Full name or description of this item.
        """
        return self._name

    @name.setter
    def name(self, val):
        self._name = val

abbr property writable

Abbreviation or symbol representing this item.

name property writable

Full name or description of this item.

ParameterSet

Bases: FrozenClass

The base class of ParameterSet objects.

Source code in sfa/base.py
class ParameterSet(sfa.utils.FrozenClass):
    """The base class of ParameterSet objects.
    """

    def __init__(self, abbr):
        """
        """
        super().__init__(abbr)

__init__(abbr)

Source code in sfa/base.py
def __init__(self, abbr):
    """
    """
    super().__init__(abbr)

sfa.containers

sfa.containers

AlgorithmSet

Bases: Container

Source code in sfa/containers.py
@Singleton
class AlgorithmSet(Container):
    _instance = None
    def __new__(cls, *args, **kwargs):
        return super().__new__(cls, *args, **kwargs)

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        """
        sfa.algorithms.__file__ represents a directory path
        containing sfa.algorithms's init module (__init__.py).
        """
        self._dpath = os.path.dirname(sfa.algorithms.__file__)

    # end of def __init__

    def get_all_keys(self):
        if not self._all_keys:
            self._all_keys = []
            for entity in os.listdir(self._dpath):
                if re.match(r"[^_]\w+\.py$", entity) \
                   and entity not in excluded:
                    mod_name = entity.split('.')[0]  # Module name
                    key = mod_name.upper()
                    self._all_keys.append(key)
                    yield key
            # end of for
        else:
            return iter(self._all_keys)

    def _create_single(self, key):
        if key in self._map:
            return self._map[key]


        key_low = key.lower()
        fstr_module_path = "%s.%s" % (sfa.algorithms.__name__,
                                      key_low)

        _key = key.upper()  # We use captital characters for the key.
        if _key in self._map:  # Avoid redundant importing
            return

        mod = importlib.import_module(fstr_module_path)
        # The following is of test purpose (removable in the future).
        if "this_should_be_imported" in mod.__name__:
            return

        alg = mod.create_algorithm(_key)
        self._map[_key] = alg
        return alg

    def _create_all(self):
        """
        Import all algorithms, based on file names
        """
        for entity in os.listdir(self._dpath):
            if re.match(r"[^_]\w+\.py", entity) \
               and entity not in excluded:
                mod_name = entity.split('.')[0]  # Module name
                self._create_single(mod_name)

Container

Bases: MutableMapping

A simple container class, which handles multiple objects with its hashable functionality (using dictionary).

Source code in sfa/containers.py
class Container(MutableMapping, metaclass=abc.ABCMeta):
    """
    A simple container class, which handles multiple objects with
    its hashable functionality (using dictionary).
    """
    def __init__(self, *args, **kwargs):
        """
        _dict: internal data structure, which is hashable.
        _dpath: Path of the directory containing algorithms, data, etc.
        """
        self._map = dict()
        self._dpath = None
        self._all_keys = None
        self.update(dict(*args, **kwargs))

    def __getitem__(self, key):
        return self._map[key]

    def __setitem__(self, key, value):
        if isinstance(value, sfa.base.ContainerItem):
            self._map[key] = value
        else:
            raise TypeError("Container.__setitem__ only accepts "
                            "sfa.base.ContainerItem type object.")

    def __delitem__(self, key):
        del self._map[key]

    def __iter__(self):
        return iter(self._map)

    def __len__(self):
        return len(self._map)

    def __str__(self):
        return str(self._map)

    def keys(self):
        return self._map.keys()

    def values(self):
        return self._map.values()

    def create(self, keys=None):
        """
            Create a single or multiple objects according to keys.
            keys: a single string or multiple strings in an iterable object.
                  All related objects are created if 'keys' is None.
        """
        if keys is not None:
            if type(keys) is str:
                return self._create_single(keys)
            elif hasattr(keys, '__iter__'):
                mult = []  # To return multiple elements
                # An iterable object contains multiple keys.
                for elem in keys:
                    mult.append(self._create_single(elem))
                return mult
        else:
            self._create_all()
            return self
    # end of def create

    @abc.abstractmethod
    def get_all_keys(self):
        """"""
    # end of def

    @abc.abstractmethod
    def _create_single(self, key):
        """Create a single object"""
    # end of def

    @abc.abstractmethod
    def _create_all(self):
        """Create all objects"""

__init__(*args, **kwargs)

_dict: internal data structure, which is hashable. _dpath: Path of the directory containing algorithms, data, etc.

Source code in sfa/containers.py
def __init__(self, *args, **kwargs):
    """
    _dict: internal data structure, which is hashable.
    _dpath: Path of the directory containing algorithms, data, etc.
    """
    self._map = dict()
    self._dpath = None
    self._all_keys = None
    self.update(dict(*args, **kwargs))

create(keys=None)

Create a single or multiple objects according to keys. keys: a single string or multiple strings in an iterable object. All related objects are created if 'keys' is None.

Source code in sfa/containers.py
def create(self, keys=None):
    """
        Create a single or multiple objects according to keys.
        keys: a single string or multiple strings in an iterable object.
              All related objects are created if 'keys' is None.
    """
    if keys is not None:
        if type(keys) is str:
            return self._create_single(keys)
        elif hasattr(keys, '__iter__'):
            mult = []  # To return multiple elements
            # An iterable object contains multiple keys.
            for elem in keys:
                mult.append(self._create_single(elem))
            return mult
    else:
        self._create_all()
        return self

get_all_keys() abstractmethod

Source code in sfa/containers.py
@abc.abstractmethod
def get_all_keys(self):
    """"""

DataSet

Bases: Container

The name of this class is similar to that of 'DataSet' in C#. The instance of this class handles multiple sfa.base.Data objects.

Source code in sfa/containers.py
@Singleton
class DataSet(Container):
    """
    The name of this class is similar to that of 'DataSet' in C#.
    The instance of this class handles multiple sfa.base.Data objects.
    """
    _instance = None
    def __new__(cls, *args, **kwargs):
        return super().__new__(cls, *args, **kwargs)

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        """
        sfa.data.__file__ represents a directory path
        containing sfa.data's init module (__init__.py).
        """
        self._dpath = os.path.dirname(sfa.data.__file__)

    # end of def __init__

    def get_all_keys(self):
        if not self._all_keys:
            self._all_keys = []
            for entity in os.listdir(self._dpath):
                dpath = os.path.join(self._dpath, entity)
                if not entity.startswith('_') and os.path.isdir(dpath):
                    key = entity.upper()
                    self._all_keys.append(key)
                    yield key
        else:
            return iter(self._all_keys)

    def _create_single(self, key):
        if key in self._map:
            return self._map[key]

        key_items = key.split("_")
        key_1st, key_2nd = key_items[:2]
        mod_name = "%s_%s"%(key_1st.lower(), key_2nd.lower())
        fstr_module_path = "%s.%s" % (sfa.data.__name__, mod_name)

        _key = key.upper()
        if _key in self._map:  # Avoid redundant importing
            return
        elif mod_name.upper() in self._map:
            # All data object in this directory has been created before.
            return

        mod = importlib.import_module(fstr_module_path)
        if len(key_items) > 2:  # Create the specified single data object
            data = mod.create_data(key)
        else:  # Create all data objects from this directory
            data = mod.create_data()

        if isinstance(data, dict):
            self._map[_key] = data
        elif isinstance(data, list):
            self._map[_key] = {obj.abbr.upper(): obj for obj in data}
        elif isinstance(data, sfa.base.Data):
            _key = data.abbr.upper()
            self._map[_key] = data
        else:
            err_msg = "%s.create_data() returns unsupported type."\
                      % (fstr_module_path)
            raise TypeError(err_msg)
        # end of if

        return self._map[_key]
    # end of def _create_single

    def _create_all(self):
        """
        Import all data, based on the directory names of data modules
        """
        for entity in os.listdir(self._dpath):
            dpath = os.path.join(self._dpath, entity)
            if not entity.startswith('_') and os.path.isdir(dpath):
                self._create_single(entity)

sfa.fileio

sfa.fileio

create_from_sif(fpath, abbr=None, inputs=None, outputs=None)

Create sfv.base.Data object from SIF file.

Parameters:

Name Type Description Default
fpath str

Absolute path of SIF file

required
abbr str

Abbreviation to denote this data object for the network.

None
inputs dict

Input information with default values

None
outputs sequence

Output information.

None

Returns:

Name Type Description
obj Data

Data object with the information of network topology.

Source code in sfa/fileio.py
def create_from_sif(fpath, abbr=None, inputs=None, outputs=None):
    """Create sfv.base.Data object from SIF file.

    Parameters
    ----------
    fpath : str
        Absolute path of SIF file
    abbr : str
        Abbreviation to denote this data object for the network.
    inputs : dict, optional
        Input information with default values
    outputs : sequence, optional
        Output information.

    Returns
    -------
    obj : sfv.base.Data
        Data object with the information of network topology.

    """
    class __Data(Data):
        def __init__(self):
            if abbr:
                self._abbr = abbr
            else:
                self._abbr = os.path.basename(fpath)

            self._name = self._abbr
            A, n2i, dg = read_sif(fpath, as_nx=True)
            self._A = A
            self._n2i = n2i
            self._i2n = {idx: name for name, idx in n2i.items()}
            self._dg = dg
            self._inputs = inputs

            if outputs:
                self._outputs = outputs

            # The following members are not defined due to the lack of data.
            self._df_conds = None
            self._df_exp = None
            self._df_ptb = None
            self._has_link_perturb = False
            self._names_ptb = None
            self._iadj_to_idf = None
            # end of def __init__
    # end of def class

    fname, ext = os.path.splitext(os.path.basename(fpath))
    fname = ''.join([c for c in fname.title() if c.isalnum()])
    fname += "Data"
    __Data.__name__ = fname
    return __Data()

sfa.utils

sfa.utils

get_akey(d)

Get a key from a given dictionary. It returns the first key in d.keys().

Parameters:

Name Type Description Default
d dict

Dictionary of objects.

required

Returns:

Name Type Description
obj object

First item of iter(d.keys()).

Source code in sfa/utils.py
def get_akey(d):
    """Get a key from a given dictionary.
    It returns the first key in d.keys().

    Parameters
    ----------
    d : dict
        Dictionary of objects.

    Returns
    -------
    obj : object
        First item of iter(d.keys()).
    """
    return next(iter(d.keys()))

get_avalue(d)

Get a value from a given dictionary. It returns the value designated by sfa.get_akey().

Parameters:

Name Type Description Default
d dict

Dictionary of objects.

required

Returns:

Name Type Description
obj object

First item of d[iter(d.keys())].

Source code in sfa/utils.py
def get_avalue(d):
    """Get a value from a given dictionary.
    It returns the value designated by sfa.get_akey().

    Parameters
    ----------
    d : dict
        Dictionary of objects.

    Returns
    -------
    obj : object
        First item of d[iter(d.keys())].
    """
    akey = next(iter(d.keys()))
    return d[akey]

rand_flip(A, nsamp=10, pivots=None, inplace=False)

Randomly flip the signs of connections.

Parameters:

Name Type Description Default
A ndarray

Adjacency matrix (connection matrix).

required
nsamp int

Number of sampled connections to be flipped.

10
pivots list

Indices of pivot nodes

None
inplace bool

Modify the given adjacency matrix for rewiring.

False

Returns:

Name Type Description
B ndarray

The randomized matrix. The reference of the given W is returned, when inplace is True.

Source code in sfa/utils.py
def rand_flip(A, nsamp=10, pivots=None, inplace=False):
    """Randomly flip the signs of connections.

    Parameters
    ----------
    A : numpy.ndarray
        Adjacency matrix (connection matrix).
    nsamp : int, optional
        Number of sampled connections to be flipped.
    pivots : list, optional
        Indices of pivot nodes
    inplace : bool, optional
        Modify the given adjacency matrix for rewiring.

    Returns
    -------
    B : numpy.ndarray
        The randomized matrix.
        The reference of the given W is returned, when inplace is True.
    """
    if not inplace:
        B = A.copy()
    else:
        B = A

    ir, ic = B.nonzero()
    if pivots:
        iflip = np.random.choice(pivots, nsamp)
    else:
        iflip = np.random.randint(0, ir.size, nsamp)

    B[ir[iflip], ic[iflip]] *= -1
    return B

rand_swap(A, nsamp=10, noself=True, pivots=None, inplace=False)

Randomly rewire the network connections by swapping.

Parameters:

Name Type Description Default
A ndarray

Adjacency matrix (connection matrix).

required
nsamp int

Number of sampled connections to rewire

10
noself bool

Whether to allow self-loop link.

True
pivots list

Indices of pivot nodes

None
inplace bool

Modify the given adjacency matrix for rewiring.

False

Returns:

Name Type Description
B ndarray

The randomized matrix. The reference of the given W is returned, when inplace is True.

Source code in sfa/utils.py
def rand_swap(A, nsamp=10, noself=True, pivots=None, inplace=False):
    """Randomly rewire the network connections by swapping.

    Parameters
    ----------
    A : numpy.ndarray
        Adjacency matrix (connection matrix).
    nsamp : int, optional
        Number of sampled connections to rewire
    noself : bool, optional
        Whether to allow self-loop link.
    pivots : list, optional
        Indices of pivot nodes
    inplace : bool, optional
        Modify the given adjacency matrix for rewiring.


    Returns
    -------
    B : numpy.ndarray
        The randomized matrix.
        The reference of the given W is returned, when inplace is True.
    """


    if not inplace:
        A_org = A
        B = A.copy()
    else:
        A_org = A.copy()
        B = A

    cnt = 0
    while cnt < nsamp:
        ir, ic = B.nonzero()
        if pivots:
            if np.random.uniform() < 0.5:
                isrc1 = np.random.choice(pivots)
                nz = B[:, isrc1].nonzero()[0]
                if len(nz) == 0:
                    continue
                itrg1 = np.random.choice(nz)
            else:
                itrg1 = np.random.choice(pivots)
                nz = B[itrg1, :].nonzero()[0]
                if len(nz) == 0:
                    continue
                isrc1 = np.random.choice(nz)
            # if-else

            itrg2, isrc2 = itrg1, isrc1
            while isrc1 == isrc2 and itrg1 == itrg2:
                i2 = np.random.randint(0, ir.size)
                itrg2, isrc2 = ir[i2], ic[i2]
        else:
            i1, i2 = 0, 0
            while i1 == i2:
                i1, i2 = np.random.randint(0, ir.size, 2)

            itrg1, isrc1 = ir[i1], ic[i1]
            itrg2, isrc2 = ir[i2], ic[i2]

        if noself:
            if itrg2 == isrc1 or itrg1 == isrc2:
                continue

        # Are the swapped links new?
        if B[itrg2, isrc1] == 0 and B[itrg1, isrc2] == 0:
            a, b = B[itrg1, isrc1], B[itrg2, isrc2]

            # Are the swapped links in the original network?
            if A_org[itrg2, isrc1] == a and A_org[itrg1, isrc2] == b:
                continue

            B[itrg2, isrc1], B[itrg1, isrc2] = a, b
            B[itrg1, isrc1], B[itrg2, isrc2] = 0, 0
            cnt += 1
        else:
            continue
    # end of while

    if not inplace:
        return B

rand_weights(W, lb=-3, ub=3, inplace=False)

Randomly sample the weights of connections in W from 10^(lb, ub).

Parameters:

Name Type Description Default
W ndarray

Adjacency (connection) or weight matrix.

required
lb float

The 10's exponent for lower bound

-3
inplace bool

Modify the given adjacency matrix for rewiring.

False

Returns:

Name Type Description
B ndarray

The randomly sampled weight matrix. The reference of the given W is returned, when inplace is True.

Source code in sfa/utils.py
def rand_weights(W, lb=-3, ub=3, inplace=False):
    """ Randomly sample the weights of connections in W from 10^(lb, ub).

    Parameters
    ----------
    W : numpy.ndarray
        Adjacency (connection) or weight matrix.
    lb : float, optional
        The 10's exponent for lower bound
    inplace : bool, optional
        Modify the given adjacency matrix for rewiring.

    Returns
    -------
    B : numpy.ndarray
        The randomly sampled weight matrix.
        The reference of the given W is returned, when inplace is True.
    """
    if not inplace:
        B = np.array(W, dtype=np.float64)
    else:
        if not np.issubdtype(W.dtype, np.floating):
            raise ValueError("W.dtype given to rand_weights should be "
                             "a float type, not %s" % (W.dtype))

        B = W
    # end of if-else

    ir, ic = B.nonzero()
    weights_rand = 10 ** np.random.uniform(lb, ub,
                                           size=(ir.size,))

    B[ir, ic] = weights_rand * np.sign(B[ir, ic])
    return B

sfa.stats

sfa.stats

calc_accuracy(df1, df2, get_cons=False)

Count the same sign of each element between df1 and df2

df1: pandas.DataFrame or numpy.ndarray to be compared df2: pandas.DataFrame or numpy.ndarray to be compared getcons: decide whether to return consensus array in DataFrame or not

Source code in sfa/stats.py
def calc_accuracy(df1, df2, get_cons=False):
    """
    Count the same sign of each element between df1 and df2

    df1: pandas.DataFrame or numpy.ndarray to be compared
    df2: pandas.DataFrame or numpy.ndarray to be compared
    getcons: decide whether to return consensus array in DataFrame or not
    """

    if df1.ndim == 1:
        num_total = df1.shape[0]
    elif df1.ndim == 2:
        num_total = df1.shape[0] * df1.shape[1]
    diff_abs = np.abs(np.sign(df1) - np.sign(df2))
    consensus = (diff_abs == 0)

    if isinstance(consensus, pd.DataFrame):
        num_cons = consensus.values.sum()
    else:
        num_cons = consensus.sum()

    acc = num_cons / float(num_total)  # Accuracy
    if get_cons:
        return acc, consensus
    else:
        return acc

sfa.topology

sfa.topology

max_spl(nxdg)

Find the maximum length of the shortest path

Source code in sfa/topology.py
def max_spl(nxdg):
    """Find the maximum length of the shortest path
    """
    all_spl = nxsp.all_pairs_shortest_path_length(nxdg)
    max_spl = 0
    for src, targets in all_spl:
        for tgt, spl in targets.items():
            if spl > max_spl:
                max_spl = spl
        # end of for
    # end of for
    return max_spl

splo(nxdg, sources, outputs, rtype='df')

Calculate the shortest path length from each source node to the outputs. SPLO represents 'shortest path length to output'.

Parameters:

Name Type Description Default
nxdg

A directed network in NetworkX.

required
sources

Names of source nodes in nxdg.

required
outputs

Names of output nodes in nxdg.

required
rtype

Return object type: 'df' or 'dict'.

'df'

Returns:

Name Type Description
splo dict

All the shortest path lengths to the outputs.

Source code in sfa/topology.py
def splo(nxdg, sources, outputs, rtype='df'):
    """Calculate the shortest path length
       from each source node to the outputs.
       SPLO represents
       'shortest path length to output'.

       Parameters
       ----------
       nxdg: NetworkX.DiGraph
           A directed network in NetworkX.
       sources: list (or iterable) of str
           Names of source nodes in nxdg.
       outputs: list (or iterable) of str
           Names of output nodes in nxdg.
       rtype: str (optional)
           Return object type: 'df' or 'dict'.

       Returns
       -------
       splo: dict
           All the shortest path lengths
           to the outputs.

    """
    if isinstance(outputs, str):
        outputs = [outputs]

    dict_splo = {}
    for trg in outputs:
        dict_splo[trg] = {}
        for src in sources:
            try:
                splo = nxsp.shortest_path_length(nxdg,
                                                 src,
                                                 trg)
            except nx.NetworkXNoPath:
                continue  # splo = np.inf

            dict_splo[trg][src] = splo

    if rtype == 'df':
        df = pd.DataFrame(dict_splo)
        df.index.name = 'Source'
        return df

    return dict_splo

sfa.algorithms.np

sfa.algorithms.np

NetworkPropagation

Bases: Algorithm

A base class that defines the basic functionality of network propagation algorithms.

Source code in sfa/algorithms/np.py
class NetworkPropagation(sfa.base.Algorithm):
    """A base class that defines the basic functionality of
       network propagation algorithms.

    Attributes
    ----------


    """

    def __init__(self, abbr):
        """Constructor of NetworkPropagation.
        """
        super().__init__(abbr)
        self._name = "Abstract base class for network propagation algorithms"
        self._params = NetworkPropagationParameterSet()

        # The following members are assigned the instances in initialize()
        self._W = None
        self._b = None

        self._exsol_avail = False  # The exact solution is available.
        self._M = None  # A matrix for getting the exact solution.
        self._weight_matrix_invalidated = True

        self._result = sfa.base.Result()

    # end of def __init__

    @property
    def b(self):
        return self._b

    @b.setter
    def b(self, vec):
        self._b = vec

    @property
    def W(self):
        return self._W

    @W.setter
    def W(self, mat):
        self._W = mat
        self._weight_matrix_invalidated = True

    # end of _W.setter

    def initialize_network(self):

        # Matrix normalization for getting transition matrix
        if self._params.apply_weight_norm:
            self.W = sfa.utils.normalize(self.data.A)
        else:
            self.W = np.array(self.data.A, dtype=np.float64)

        self._check_dimension(self.W, "transition matrix")

        if not self.params.exsol_forbidden:
            # Try to prepare the exact solution
            try:
                self.prepare_exact_solution()
                self._check_dimension(self._M, "exact solution matrix")
                self._exsol_avail = True
            except np.linalg.LinAlgError:
                pass

        if not self._exsol_avail:
            self.prepare_iterative_solution()
            self._exsol_avail = False

    # end of def _initialize_network

    def _check_dimension(self, mat, mat_name):
        """Check whether a given matrix is a square matrix.
        """
        if mat.shape[0] != mat.shape[1]:
            raise ValueError("The %s should be square matrix." % (mat_name))
    # end of def _check_dimension

    def initialize_basal_activity(self):
        N = self.data.A.shape[0]  # Number of state variables
        self._b = np.zeros(N)
    # end of def

    def apply_inputs(self, inds, vals):
        """

        Parameters
        ----------

        """
        if self._params.no_inputs:
            return

        # Input condition
        if hasattr(self.data, 'inputs') and self.data.inputs:
            inds_inputs = [self.data.n2i[inp] for inp in self.data.inputs]
            vals_inputs = [val for val in self.data.inputs.values()]
            inds.extend(inds_inputs)
            vals.extend(vals_inputs)
            # end of if

    # end of def apply_inputs

    def apply_perturbations(self, targets, inds, vals, W_ptb=None):
        if self.data.has_link_perturb and W_ptb is None:
            raise ValueError("Weight matrix for perturbation is necessary for "
                             "the data including link type perturbations.")

        for target in targets:
            if self.data.df_ptb is not None:
                type_ptb = self.data.df_ptb.loc[target, "Type"]
                val_ptb = self.data.df_ptb.loc[target, "Value"]
            else:
                type_ptb = 'node'
                val_ptb = -1

            if type_ptb == 'node':
                inds.append(self.data.n2i[target])
                vals.append(val_ptb)
            elif type_ptb == 'link':
                idx = self.data.n2i[target]
                W_ptb[:, idx] *= val_ptb
            elif type_ptb == 'isolation':
                idx = self.data.n2i[target]
                W_ptb[:, idx] *= val_ptb
                W_ptb[idx, :] *= val_ptb
            else:
                raise ValueError("Undefined perturbation type: %s" % (type_ptb))

    # end of def apply_perturbations

    def compute_batch(self):

        df_exp = self.data.df_exp  # Result of experiment

        # Simulation result
        sim_result = np.zeros(df_exp.shape, dtype=np.float64)

        b = self._b

        if self._params.use_rel_change:
            inds_ba = []  # Indices of nodes to be perturbed
            vals_ba = []  # Basal activity
            self.apply_inputs(inds_ba, vals_ba)
            b[inds_ba] = vals_ba
            x_cnt = self.compute(b)

        W_cnt = self.W

        # Main loop of the simulation
        for i, targets_ptb in enumerate(self.data.names_ptb):
            inds_ba = []  # Indices of nodes to be perturbed
            vals_ba = []  # Basal activity
            self.apply_inputs(inds_ba, vals_ba)  # Apply the input condition

            if self.data.has_link_perturb:
                W_ptb = W_cnt.copy()
                self.apply_perturbations(targets_ptb, inds_ba, vals_ba, W_ptb)
                self.W = W_ptb
            else:
                self.apply_perturbations(targets_ptb, inds_ba, vals_ba)

            b_store = b[inds_ba]
            b[inds_ba] = vals_ba
            x_exp = self.compute(b)

            # Result of a single condition
            if self._params.use_rel_change:  # Use relative change
                x_diff = (x_exp - x_cnt)
                rel_change = x_diff
                res_single = rel_change[self.data.iadj_to_idf]
            else:
                res_single = x_exp[self.data.iadj_to_idf]

            sim_result[i, :] = res_single
            b[inds_ba] = b_store
        # end of for

        self.W = W_cnt

        df_sim = pd.DataFrame(sim_result,
                              index=df_exp.index,
                              columns=df_exp.columns)

        # Get the result of elements in the columns of df_exp.
        self._result.df_sim = df_sim[df_exp.columns]

    # end of def compute_batch

    def prepare_exact_solution(self):
        """Prepare to get the matrix for the exact solution.
        """
    # end of def _prepare_exact_solution

    def prepare_iterative_solution(self):
        """Prepare to get the solution from the iterative method.
        """
    # end of def _prepare_iterative_solution

    def compute(self, b):
        if self.params.exsol_forbidden is True \
                or self._exsol_avail is False:
            alpha = self._params.alpha
            W = self.W
            lim_iter = self._params.lim_iter
            x_ss, _ = self.propagate_iterative(W, b, b, a=alpha,
                                               lim_iter=lim_iter)
            return x_ss  # x at steady-state (i.e., stationary state)
        else:
            return self.propagate_exact(b)

    # end of def compute

    def propagate_exact(self, b):
        """Obtain the activity at steady-state
        based on the exact solution of network propagation.

        Parameters
        ----------
        b : numpy.ndarray
            1D array of basal activity.

        Returns
        -------
        x : numpy.ndarray
            The exact solution of the activity at steady-state
            in 1D array.
        """
        raise NotImplementedError("propagate_exact is not implemented")

    def propagate_iterative(self,
                            W,
                            xi,
                            b,
                            a=0.5,
                            lim_iter=1000,
                            tol=1e-5,
                            get_trj=False):

        r"""Compute network propagation based on the iterative method.
        This method should be used if we want to obtain the trajectory.

        Parameters
        ----------
        W: numpy.ndarray
            2D array for weight matrix.
        xi: numpy.ndarray
            1D array for initial state.
        b: numpy.ndarray
            1D array for basal activity.
        a: real number, optional
            Hyperparameter, :math:`\alpha`, ~ (0, 1).
            The default value is 0.5.
        lim_iter: int, optional
            Number of maximum iterations in the iterative method.
            Computation terminates when the iteration reaches ``lim_iter``.
            The default value is 1000.
        tol: float, optional
            Tolerance for terminating iteration.
            Iteration continues, if Frobenius norm of
            :math:`x(t+1)-x(t)` is greater than ``tol``.
            The default value is 1e-5.
        get_trj: bool, optional
            Determine whether the trajectory of the state is returned.
            If get_trj is true, the trajectory is returned.

        Returns
        -------
        x : numpy.ndarray
            1D array of the activity after the computation.
        trj : numpy.ndarray
            2D array where the row represents a state of the activity.

        See also
        --------
        """
        raise NotImplementedError("propagate_iterative is not implemented")

__init__(abbr)

Constructor of NetworkPropagation.

Source code in sfa/algorithms/np.py
def __init__(self, abbr):
    """Constructor of NetworkPropagation.
    """
    super().__init__(abbr)
    self._name = "Abstract base class for network propagation algorithms"
    self._params = NetworkPropagationParameterSet()

    # The following members are assigned the instances in initialize()
    self._W = None
    self._b = None

    self._exsol_avail = False  # The exact solution is available.
    self._M = None  # A matrix for getting the exact solution.
    self._weight_matrix_invalidated = True

    self._result = sfa.base.Result()

apply_inputs(inds, vals)

Source code in sfa/algorithms/np.py
def apply_inputs(self, inds, vals):
    """

    Parameters
    ----------

    """
    if self._params.no_inputs:
        return

    # Input condition
    if hasattr(self.data, 'inputs') and self.data.inputs:
        inds_inputs = [self.data.n2i[inp] for inp in self.data.inputs]
        vals_inputs = [val for val in self.data.inputs.values()]
        inds.extend(inds_inputs)
        vals.extend(vals_inputs)

prepare_exact_solution()

Prepare to get the matrix for the exact solution.

Source code in sfa/algorithms/np.py
def prepare_exact_solution(self):
    """Prepare to get the matrix for the exact solution.
    """

prepare_iterative_solution()

Prepare to get the solution from the iterative method.

Source code in sfa/algorithms/np.py
def prepare_iterative_solution(self):
    """Prepare to get the solution from the iterative method.
    """

propagate_exact(b)

Obtain the activity at steady-state based on the exact solution of network propagation.

Parameters:

Name Type Description Default
b ndarray

1D array of basal activity.

required

Returns:

Name Type Description
x ndarray

The exact solution of the activity at steady-state in 1D array.

Source code in sfa/algorithms/np.py
def propagate_exact(self, b):
    """Obtain the activity at steady-state
    based on the exact solution of network propagation.

    Parameters
    ----------
    b : numpy.ndarray
        1D array of basal activity.

    Returns
    -------
    x : numpy.ndarray
        The exact solution of the activity at steady-state
        in 1D array.
    """
    raise NotImplementedError("propagate_exact is not implemented")

propagate_iterative(W, xi, b, a=0.5, lim_iter=1000, tol=1e-05, get_trj=False)

Compute network propagation based on the iterative method. This method should be used if we want to obtain the trajectory.

Parameters:

Name Type Description Default
W

2D array for weight matrix.

required
xi

1D array for initial state.

required
b

1D array for basal activity.

required
a

Hyperparameter, :math:\alpha, ~ (0, 1). The default value is 0.5.

0.5
lim_iter

Number of maximum iterations in the iterative method. Computation terminates when the iteration reaches lim_iter. The default value is 1000.

1000
tol

Tolerance for terminating iteration. Iteration continues, if Frobenius norm of :math:x(t+1)-x(t) is greater than tol. The default value is 1e-5.

1e-05
get_trj

Determine whether the trajectory of the state is returned. If get_trj is true, the trajectory is returned.

False

Returns:

Name Type Description
x ndarray

1D array of the activity after the computation.

trj ndarray

2D array where the row represents a state of the activity.

See also
Source code in sfa/algorithms/np.py
def propagate_iterative(self,
                        W,
                        xi,
                        b,
                        a=0.5,
                        lim_iter=1000,
                        tol=1e-5,
                        get_trj=False):

    r"""Compute network propagation based on the iterative method.
    This method should be used if we want to obtain the trajectory.

    Parameters
    ----------
    W: numpy.ndarray
        2D array for weight matrix.
    xi: numpy.ndarray
        1D array for initial state.
    b: numpy.ndarray
        1D array for basal activity.
    a: real number, optional
        Hyperparameter, :math:`\alpha`, ~ (0, 1).
        The default value is 0.5.
    lim_iter: int, optional
        Number of maximum iterations in the iterative method.
        Computation terminates when the iteration reaches ``lim_iter``.
        The default value is 1000.
    tol: float, optional
        Tolerance for terminating iteration.
        Iteration continues, if Frobenius norm of
        :math:`x(t+1)-x(t)` is greater than ``tol``.
        The default value is 1e-5.
    get_trj: bool, optional
        Determine whether the trajectory of the state is returned.
        If get_trj is true, the trajectory is returned.

    Returns
    -------
    x : numpy.ndarray
        1D array of the activity after the computation.
    trj : numpy.ndarray
        2D array where the row represents a state of the activity.

    See also
    --------
    """
    raise NotImplementedError("propagate_iterative is not implemented")

NetworkPropagationParameterSet

Bases: ParameterSet

An object that deals with the parameters of sfa.algorithms.np.NetworkPropagation base algorithm.

Attributes:

Name Type Description
alpha float
lim_iter int
apply_weight_norm bool
use_rel_change bool
exsol_forbidden bool
no_inputs bool
Source code in sfa/algorithms/np.py
class NetworkPropagationParameterSet(sfa.base.ParameterSet):
    """
    An object that deals with the parameters
    of ``sfa.algorithms.np.NetworkPropagation`` base algorithm.

    Attributes
    ----------
    alpha : float
    lim_iter : int
    apply_weight_norm : bool
    use_rel_change : bool
    exsol_forbidden : bool
    no_inputs : bool
    """

    def __init__(self):
        self.initialize()
        self._freeze()

    def initialize(self):
        """Initialize the parameters with default values.
        """
        self._alpha = 0.5  # float value in (0, 1). The default value is 0.5.
        self._lim_iter = 1000
        self._apply_weight_norm = False
        self._use_rel_change = False
        self._exsol_forbidden = False
        self._no_inputs = False

    @property
    def alpha(self):
        r"""Hyperparameter, :math:`\alpha` ~ (0, 1).
         It controls the portion of signal flow
         in determining the activity.
         The default value is 0.5.
        """
        return self._alpha

    @alpha.setter
    def alpha(self, val):
        if not isinstance(val, float):
            raise TypeError("alpha should be a float type value in (0,1).")
        elif (val <= 0.0) or (val >= 1.0):
            raise ValueError("alpha should be within (0,1).")
        else:
            self._alpha = val

    @property
    def lim_iter(self):
        """Number of maximum iterations in the iterative method.
        """
        return self._lim_iter

    @lim_iter.setter
    def lim_iter(self, val):
        if not isinstance(val, int):
            raise TypeError("lim_iter is a integer type value.")
        elif val < 0:
            raise ValueError("lim_iter should be greater than 0.")
        else:
            self._lim_iter = val

    @property
    def apply_weight_norm(self):
        """Apply the link weight normalization.
        """
        return self._apply_weight_norm

    @apply_weight_norm.setter
    def apply_weight_norm(self, val):
        if not isinstance(val, bool):
            raise TypeError(
                "apply_weight_norm should be a bool type value.")
        self._apply_weight_norm = val

    @property
    def use_rel_change(self):
        """Use relative change for prediction.
        """

        return self._use_rel_change

    @use_rel_change.setter
    def use_rel_change(self, val):
        if not isinstance(val, bool):
            raise TypeError("use_rel_change should be a bool type value.")
        self._use_rel_change = val

    @property
    def exsol_forbidden(self):
        """Forbid the propagation computation
           based on the exact solution.
           In other words, use the iterative method.
        """
        return self._exsol_forbidden

    @exsol_forbidden.setter
    def exsol_forbidden(self, val):
        if not isinstance(val, bool):
            raise TypeError("exsol_forbidden should be boolean type.")

        self._exsol_forbidden = val

    @property
    def no_inputs(self):
        """Do not apply the effects of inputs in a given network.
        """
        return self._no_inputs

    @no_inputs.setter
    def no_inputs(self, val):
        if not isinstance(val, bool):
            raise TypeError("no_inputs is bool type.")
        self._no_inputs = val

alpha property writable

Hyperparameter, :math:\alpha ~ (0, 1). It controls the portion of signal flow in determining the activity. The default value is 0.5.

apply_weight_norm property writable

Apply the link weight normalization.

exsol_forbidden property writable

Forbid the propagation computation based on the exact solution. In other words, use the iterative method.

lim_iter property writable

Number of maximum iterations in the iterative method.

no_inputs property writable

Do not apply the effects of inputs in a given network.

use_rel_change property writable

Use relative change for prediction.

initialize()

Initialize the parameters with default values.

Source code in sfa/algorithms/np.py
def initialize(self):
    """Initialize the parameters with default values.
    """
    self._alpha = 0.5  # float value in (0, 1). The default value is 0.5.
    self._lim_iter = 1000
    self._apply_weight_norm = False
    self._use_rel_change = False
    self._exsol_forbidden = False
    self._no_inputs = False

sfa.algorithms.sp

sfa.algorithms.sp

SignalPropagation

Bases: NetworkPropagation

Source code in sfa/algorithms/sp.py
class SignalPropagation(NetworkPropagation):

    def __init__(self, abbr):
        super().__init__(abbr)
        self._name = "Signal propagation algorithm"
    # end of def __init__

    def prepare_exact_solution(self):
        """
        Prepare to get the matrix for the exact solution:

        .. :math
            x(t+1) = a*_W.dot(x(t)) + (1-a)*b, where $a$ is alpha.

        When $t -> inf$, both $x(t+1)$ and $x(t)$
        converges to the stationary state.


        Then, s = aW*s + (1-a)b
              (I-aW)*s = (1-a)b
              s = (I-aW)^-1 * (1-a)b
              s = M*b, where M is (1-a)(I-aW)^-1.

        This method is to get the matrix, M for preparing the exact solution
        """
        W = self._W
        a = self._params.alpha
        M0 = np.eye(W.shape[0]) - a*W
        self._M = (1-a)*np.linalg.inv(M0)
    # end of def _prepare_exact_solution

    def prepare_iterative_solution(self):
        pass  # Nothing...
    # end of def prepare_iterative_solution

    def propagate_exact(self, b):
        if self._weight_matrix_invalidated:
            self.prepare_exact_solution()

        return self._M.dot(b)
    # end of def propagate_exact

    def propagate_iterative(self,
                            W,
                            xi,
                            b,
                            a=0.5,
                            lim_iter=1000,
                            tol=1e-5,
                            get_trj=False):


        n = W.shape[0]
        # Initial values
        x0 = np.array(xi, dtype=np.float64)

        x_t1 = x0.copy()

        if get_trj:
            # Record the initial states
            trj_x = []
            trj_x.append(x_t1.copy())

        # Main loop
        num_iter = 0
        for i in range(lim_iter):
            # Main formula
            x_t2 = a*W.dot(x_t1) + (1-a)*b
            num_iter += 1
            # Check termination condition
            if np.linalg.norm(x_t2 - x_t1) <= tol:
                break

            # Add the current state to the trajectory
            if get_trj:
                trj_x.append(x_t2)

            # Update the state
            x_t1 = x_t2.copy()
        # end of for

        if get_trj is False:
            return x_t2, num_iter
        else:
            return x_t2, np.array(trj_x)

prepare_exact_solution()

Prepare to get the matrix for the exact solution:

.. :math x(t+1) = a_W.dot(x(t)) + (1-a)b, where \(a\) is alpha.

When \(t -> inf\), both \(x(t+1)\) and \(x(t)\) converges to the stationary state.

Then, s = aWs + (1-a)b (I-aW)s = (1-a)b s = (I-aW)^-1 * (1-a)b s = M*b, where M is (1-a)(I-aW)^-1.

This method is to get the matrix, M for preparing the exact solution

Source code in sfa/algorithms/sp.py
def prepare_exact_solution(self):
    """
    Prepare to get the matrix for the exact solution:

    .. :math
        x(t+1) = a*_W.dot(x(t)) + (1-a)*b, where $a$ is alpha.

    When $t -> inf$, both $x(t+1)$ and $x(t)$
    converges to the stationary state.


    Then, s = aW*s + (1-a)b
          (I-aW)*s = (1-a)b
          s = (I-aW)^-1 * (1-a)b
          s = M*b, where M is (1-a)(I-aW)^-1.

    This method is to get the matrix, M for preparing the exact solution
    """
    W = self._W
    a = self._params.alpha
    M0 = np.eye(W.shape[0]) - a*W
    self._M = (1-a)*np.linalg.inv(M0)

sfa.analysis.perturb

sfa.analysis.perturb

analyze_perturb(alg, data, targets, b=None, get_trj=False)

Perform signal flow analysis under perturbations.

Parameters:

Name Type Description Default
alg Algorithm

Algorithm object.

required
data Data

Data object which has perturbation data.

required
targets list

List of node names, which are the keys of data.n2i.

required
b ndarray

Basic vector for signaling sources or basal activities.

None
get_trj bool(optional)

Decide to get the trajectory of activity change.

False

Returns:

Name Type Description
act ndarray

Change in the activities. It is usually calculated as x2 - x1, where x is the a vector of activities at steady-state.

F ndarray

A matrix of signal flows.
It is usually calculated as W2x1 - W1x1, where W is weight matrix and x is a vector of activities at steady-state.

trj ndarray(optional)

Trajectory of activity change, which is returned if get_trj is True.

Source code in sfa/analysis/perturb.py
def analyze_perturb(alg, data, targets, b=None, get_trj=False):
    """Perform signal flow analysis under perturbations.

    Parameters
    ----------
    alg : sfa.Algorithm
        Algorithm object.

    data : sfa.Data
        Data object which has perturbation data.

    targets : list
        List of node names, which are the keys of data.n2i.

    b : numpy.ndarray
        Basic vector for signaling sources or basal activities.

    get_trj : bool (optional)
        Decide to get the trajectory of activity change.

    Returns
    -------
    act : numpy.ndarray
        Change in the activities. It is usually calculated
        as x2 - x1, where x is
        the a vector of activities at steady-state.

    F : numpy.ndarray
        A matrix of signal flows.        
        It is usually calculated as W2*x1 - W1*x1,
        where W is weight matrix and
        x is a vector of activities at steady-state. 

    trj : numpy.ndarray (optional)
        Trajectory of activity change, which is returned
        if get_trj is True.
    """
    N = data.A.shape[0]

    if b is None:
        b = np.zeros((N,), dtype=np.float64)
    elif b.size != N:
        raise TypeError("The size of b should be equal to %d"%(N))

    inds = []
    vals = []
    alg.apply_inputs(inds, vals)
    b[inds] = vals

    W_ctrl = alg.W.copy()
    x_ctrl, trj_ctrl = alg.propagate_iterative(
                                W_ctrl,
                                b,
                                b,
                                alg.params.alpha,
                                get_trj=get_trj)

    if data.has_link_perturb:
        W_pert = W_ctrl.copy()
        alg.apply_perturbations(targets, inds, vals, W_pert)
        alg.W = W_pert
    else:
        W_pert = W_ctrl
        alg.apply_perturbations(targets, inds, vals)

    b[inds] = vals
    x_pert, trj_pert = alg.propagate_iterative(
                                W_pert,
                                b,
                                b,
                                alg.params.alpha,
                                get_trj=get_trj)

    act_change = x_pert - x_ctrl

    if data.has_link_perturb:
        F = W_pert*x_pert - W_ctrl*x_ctrl
    else:
        F = W_ctrl*act_change

    ret = [act_change, F]  # return objects
    if get_trj:
        if trj_pert.shape[0] != trj_ctrl.shape[0]:
            trj_ctrl, trj_pert = resize_trj(trj_ctrl, trj_pert)

        trj_change = trj_pert - trj_ctrl
        ret.append(trj_change)

    return tuple(ret)

sfa.analysis.random.base

sfa.analysis.random.base

BaseRandomSimulator

Bases: object

Source code in sfa/analysis/random/base.py
class BaseRandomSimulator(object):

    def __init__(self):
        # Consider RuntimeWarning from NumPy as an error
        np.seterr(all='raise')

    def _initialize(self, alg):
        self._S = np.sign(alg.data.A)  # Sign matrix
        self._ir, self._ic = alg.data.A.nonzero()
        self._num_links = self._ir.size
        self._A = np.array(alg.data.A)
        self._W = np.zeros_like(self._A, dtype=np.float64)

    def _randomize(self):
        raise NotImplementedError()

    def _apply_norm(self, alg, use_norm):
        """
        Apply normalization
        """
        if use_norm:
            alg.W = sfa.normalize(self._W)
        else:
            alg.W = self._W

    # end of def

    def _need_to_print(self, use_print, freq_print, cnt):
        return use_print and (cnt % freq_print == 0)

    def simulate_single(*args, **kwargs):
        raise NotImplementedError()

    def simulate_multiple(*args, **kwargs):
        raise NotImplementedError()

    """
    def _simulate_single(self, args):
        num_samp = args[0]
        alg = args[1]
        data = args[2]
        use_norm = args[3]
        use_print = args[4]
        freq_print = args[5]

        alg.data = data
        alg.initialize(network=False)

        results = np.zeros((num_samp,), dtype=np.float64)
        cnt = 0

        if self._need_to_print(use_print, freq_print, cnt):
            print("%s simulation for %s starts..." % (alg.abbr, data.abbr))

        while cnt < num_samp:
            self._randomize()
            self._apply_norm(alg, use_norm)
            try:
                alg.compute_batch()
                acc = sfa.calc_accuracy(self._alg.result.df_sim,
                                        self._alg.data.df_exp)
            except FloatingPointError as pe:
                # Skip this condition
                if self._need_to_print(use_print, freq_print, cnt):
                    print("%s: skipped..." % (pe))
                continue
            except RuntimeWarning as rw:
                # Skip these weights
                if self._need_to_print(use_print, freq_print, cnt):
                    print("%s: skipped..." % (rw))
                continue

                # Skip these weights assuming acc cannot be exactly 0.
            if acc == 0:
                if self._need_to_print(use_print, freq_print, cnt):
                    print("Zero accuracy: skipped...")
                continue

            results[cnt] = acc
            cnt += 1
            if self._need_to_print(use_print, freq_print, cnt):
                print("[Iteration #%d] acc: %f" % (cnt, acc))
        # end of loop
        df = pd.DataFrame(results)
        df.index = range(1, num_samp + 1)
        df.columns = [data.abbr]
        return df
    # end of def

    def simulate_single(self, num_samp, alg, data, use_norm=False,
                        use_print=False, freq_print=100):
        self._alg = alg
        self._alg.data = data
        self._alg.initialize()
        self._initialize(alg)

        df = self._simulate_single((num_samp, alg, data, use_norm,
                                    use_print, freq_print))
        return df

    # end of def

    def simulate_multiple(self, num_samp, alg, mdata, use_norm=False,
                          use_print=False, freq_print=100,
                          max_workers=1):
        self._alg = alg

        # Initialize network information only
        self._alg.data = sfa.get_avalue(mdata)
        self._alg.initialize()
        self._initialize(alg)

        if isinstance(mdata, list):
            list_data = [(data.abbr, data) for data in mdata]
        elif isinstance(mdata, dict):
            list_data = [(abbr, mdata[abbr]) for abbr in mdata]

        dfs = []
        if max_workers == 1:
            for (abbr, data) in list_data:
                df = self._simulate_single(num_samp, alg, data, use_norm,
                                           use_print, freq_print)
                dfs.append(df)
                # end of for
        elif max_workers > 1:
            args = ((num_samp, alg, data, use_norm, use_print, freq_print)
                    for (abbr, data) in list_data)
            pool = Pool(processes=max_workers)
            dfs = list(pool.map(self._simulate_single, args))
            pool.close()
            pool.join()
        else:
            raise ValueError("max_workers should be a positive integer.")

        df_res = pd.concat(dfs, axis=1)
        return df_res
    # end of def
    """

sfa.analysis.random.structure

sfa.analysis.random.structure

sfa.analysis.random.weight

sfa.analysis.random.weight

sfa.control.influence

sfa.control.influence

compute_influence(W, alpha=0.9, beta=0.1, S=None, rtype='df', outputs=None, n2i=None, max_iter=1000, tol=1e-07, get_iter=False, device='cpu', sparse=False)

Compute the influence. It estimates the effects of a node to the other nodes, by calculating partial derivative with respect to source nodes, based on a simple iterative method.

Based on the below difference equation,

x(t+1) = alphaW.dot(x(t)) + (1-alpha)b

The influence matrix, S, is computed using chain rule of partial derivative as follows.

\begin{align} S_{ij} &= \frac{\partial{x_i}}{\partial{x_j}} \ &= (I + \alpha W + \alpha^2 W^2 + ... + \alpha^{\infty}W^{\infty}){ij} \ &\approx (I + \alpha W + \alpha^2 W^2 + ... + \alpha^{l}W^{l}) \ \end{align}

This is the summation of the weight multiplications along all paths including cycles. \(S_{ij}\) denotes the influence of node (j) on node (i).

An iterative method for an approximated solution is as follows.

S(t+1) = \alpha WS(t) + I,

where \(S(0) = \beta I\) and \(S(1) = \beta(I + \alpha W)\) \((t>1)\).

The iteration continues until \(||S(t+1) - S(t)|| \leq tol\).

Parameters:

Name Type Description Default
W ndarray

Weight matrix.

required
alpha float

Hyperparameter for adjusting the effect of signal flow.

0.9
beta float

Hyperparameter for adjusting the effect of basal activity.

0.1
S ndarray

Initial influence matrix.

None
rtype

Return object type: 'df' or 'array'.

'df'
outputs

Names of output nodes, which is necessary for 'df' rtype.

None
n2i

Name to index dict, which is necessary for 'df' rtype.

None
max_iter int

The maximum iteration number for the estimation.

1000
tol float

Tolerance for terminating the iteration.

1e-07
get_iter bool

Determine whether the actual iteration number is returned.

False
device (str, optional, {CPU, 'GPU:0', 'GPU:1', ...})

Select which device to use. 'CPU' is default.

'cpu'
sparse bool

Use sparse matrices for the computation.

False

Returns:

Name Type Description
S (ndarray, optional)

2D array of influence.

df (DataFrame, optional)

Influences for each output in DataFrame.

num_iter (int, optional)

The actual number of iteration.

Source code in sfa/control/influence.py
def compute_influence(W,
                      alpha=0.9,
                      beta=0.1,
                      S=None,
                      rtype='df',
                      outputs=None,
                      n2i=None,
                      max_iter=1000,
                      tol=1e-7,
                      get_iter=False,
                      device="cpu",
                      sparse=False):
    r"""Compute the influence.
       It estimates the effects of a node to the other nodes,
       by calculating partial derivative with respect to source nodes,
       based on a simple iterative method.

       Based on the below difference equation,

       x(t+1) = alpha*W.dot(x(t)) + (1-alpha)*b

       The influence matrix, S, is computed using chain rule of
       partial derivative as follows.

       \begin{align}
        S_{ij} &= \frac{\partial{x_i}}{\partial{x_j}} \\
               &= (I + \alpha W + \alpha^2 W^2 +  ... + \alpha^{\infty}W^{\infty})_{ij} \\
               &\approx (I + \alpha W + \alpha^2 W^2 +  ... + \alpha^{l}W^{l})_{ij} \\
       \end{align}

       This is the summation of the weight multiplications along all paths
       including cycles. $S_{ij}$ denotes the influence of node (j) on node (i).

       An iterative method for an approximated solution is as follows.

        S(t+1) = \alpha WS(t) + I,

       where $S(0) = \beta I$ and $S(1) = \beta(I + \alpha W)$ $(t>1)$.

       The iteration continues until $||S(t+1) - S(t)|| \leq tol$.


    Parameters
    ----------
    W : numpy.ndarray
        Weight matrix.
    alpha : float, optional
        Hyperparameter for adjusting the effect of signal flow.
    beta : float, optional
        Hyperparameter for adjusting the effect of basal activity.
    S : numpy.ndarray, optional
        Initial influence matrix.
    rtype: str (optional)
        Return object type: 'df' or 'array'.
    outputs: list (or iterable) of str, optional
        Names of output nodes, which is necessary for 'df' rtype.
    n2i: dict, optional
        Name to index dict, which is necessary for 'df' rtype.
    max_iter : int, optional
        The maximum iteration number for the estimation.
    tol : float, optional
        Tolerance for terminating the iteration.
    get_iter : bool, optional
        Determine whether the actual iteration number is returned.
    device : str, optional, {'CPU', 'GPU:0', 'GPU:1', ...}
        Select which device to use. 'CPU' is default.
    sparse : bool, optional
        Use sparse matrices for the computation.

    Returns
    -------
    S : numpy.ndarray, optional
        2D array of influence.
    df : pd.DataFrame, optional
        Influences for each output in DataFrame.
    num_iter : int, optional
        The actual number of iteration.
    """
    # TODO: Test rendering the above mathematical expressions in LaTeX form.

    if max_iter < 2:
        raise ValueError("max_iter should be greater than 2.")

    device = device.lower()

    if 'cpu' in device:
        if sparse:
            ret = _compute_influence_cpu_sparse(W, alpha, beta, S,
                                               max_iter, tol, get_iter)
        else:
            ret = _compute_influence_cpu(W, alpha, beta, S,
                                        max_iter, tol, get_iter)
    elif 'gpu'in device:
        _, id_device = device.split(':')
        ret = _compute_influence_gpu(W, alpha, beta, S,
                                  max_iter, tol, get_iter, id_device)

        if rtype == 'df':
            import cupy as cp
            if isinstance(ret, cp.ndarray):
                ret = cp.asnumpy(ret)

    if get_iter:
        S_ret, num_iter = ret
    else:
        S_ret = ret

    if rtype == 'array':
        return ret
    elif rtype == 'df':             
        if not outputs:
            err_msg = "outputs should be designated for 'df' return type."
            raise ValueError(err_msg)

        df = pd.DataFrame(columns=outputs)

        for trg in outputs:
            for src in n2i:
                if src == trg:
                    df.loc[src, trg] = np.inf

                idx_src = n2i[src]
                idx_trg = n2i[trg]
                df.loc[src, trg] = S_ret[idx_trg, idx_src]

        if get_iter:
            return df, num_iter
        else:
            return df
    else:
        raise ValueError("Unknown return type: %s"%(rtype))

prioritize(df_splo, df_inf, output, dac, thr_rank=3, min_group_size=0, min_splo=None, max_splo=None, thr_inf=1e-10)

Prioritize target candiates.

Parameters:

Name Type Description Default
df_splo DataFrame

Dataframe for SPLO information.

required
df_inf DataFrame

Dataframe for influence information.

required
output str

Names of output node, which is necessary for 'df_inf'.

required
dac int

Direction of activity change (DAC) of the output.

required
thr_rank int or float

Rank to filter out the entities. The entities whose ranks are greater than thr_rank survive.

3
min_group_size int

Minimum group size to be satisfied.

0
Source code in sfa/control/influence.py
def prioritize(df_splo,
               df_inf,
               output,
               dac,
               thr_rank=3,
               min_group_size=0,
               min_splo=None,
               max_splo=None,
               thr_inf=1e-10,
):
    """Prioritize target candiates.

    Parameters
    ----------
    df_splo : pandas.DataFrame
        Dataframe for SPLO information.
    df_inf : pandas.DataFrame
        Dataframe for influence information.
    output : str
        Names of output node, which is necessary for 'df_inf'.
    dac : int
        Direction of activity change (DAC) of the output.
    thr_rank : int or float
        Rank to filter out the entities.
        The entities whose ranks are greater than thr_rank survive.
    min_group_size : int
        Minimum group size to be satisfied.
    """
    ascending = True if dac < 0 else False

    df_inf_dac = df_inf[np.sign(df_inf[output]) == dac]
    si = arrange_si(df_splo,
                    df_inf_dac,
                    output,
                    min_splo, 
                    max_splo,
                    thr_inf,
                    ascending)
    targets = []
    for splo in si:
        # Get the group of this SPLO.
        df_sub = si[splo]  

        if df_sub.shape[0] < min_group_size:
           continue       

        # Get the entities that have the designated dac.
        df_sub = df_sub[np.sign(df_sub[output]) == dac]

        # Get the enetities whose rank exceeds the threshods.
        if 0 < thr_rank < 1:
            ix_max_rank = int(thr_rank * df_sub.shape[0])
            if ix_max_rank == 0:
                ix_max_rank = df_sub.shape[0] 
        else:
            ix_max_rank = thr_rank

        #print(ix_max_rank)
        df_top = df_sub.iloc[:ix_max_rank, :]

        targets.extend(df_top['Source'].tolist())
    # end of for
    return targets