Source code for armonic.lifecycle

import inspect
import logging
import copy
import sys
import re
from platform import uname

import armonic.common

from armonic.utils import IterContainer, DoesNotExist, OS_TYPE, OsTypeAll, get_subclasses
from armonic.common import ProvideError, format_input_variables
from armonic.provide import Provide
from armonic.variable import ValidationError

from xml_register import XMLRessource, XMLRegistery, Element, SubElement

XMLRegistery = XMLRegistery()
logger = logging.getLogger(__name__)
STATE_RESERVED_METHODS = ('enter', 'leave', 'cross')

class TransitionNotAllowed(Exception):

def Transition(s, d):
    return (s, d,)

class ProvideNotExist(Exception):

class ProvideNotInStack(Exception):

class ProvideAmbigous(Exception):

class RequireHasNotFuncArgs(Exception):

class StateNotApply(Exception):

class StateNotExist(Exception):

class StateFactory(type):
    _instances = {}

    def __call__(cls, *args, **kwargs):
        # States are Singletons
        if cls not in cls._instances:
            cls._instances[cls] = super(StateFactory, cls).__call__(*args, **kwargs)
        return cls._instances[cls]

    def __new__(cls, *args, **kwargs):
        state_class = super(StateFactory, cls).__new__(cls, *args, **kwargs)

        # init provides
        state_class._provides = IterContainer()

        # setup reserved provides
        for method_name in STATE_RESERVED_PROVIDES:
            method = getattr(state_class, method_name).__func__
            if not hasattr(method, "_provide"):
                provide_inst = Provide(method_name)
                provide_inst = copy.deepcopy(method._provide)
            setattr(state_class, "provide_%s" % method_name, provide_inst)

        # register custom provides
        funcs = inspect.getmembers(state_class, predicate=inspect.ismethod)
        for (fname, f) in funcs:
            if hasattr(f, '_provide') and fname not in STATE_RESERVED_METHODS:
                provide = copy.deepcopy(f._provide)

                # Format provide extra args with class attributes
                for key, value in provide.extra.items():
                    if isinstance(value, basestring):
                        matches ='{([^}]*)}', value)
                        if not matches:
                        format_dct = {}
                        for match in matches.groups():
                            format_dct[match] = getattr(state_class, match, "")
                        provide.extra[key] = value.format(**format_dct)

                logger.debug("Registered %s in state %s" % (f._provide, state_class.__name__))

        return state_class

class State(XMLRessource):
    __metaclass__ = StateFactory
    """A State describes a step during the life of a :class:`Lifecycle`.

    Each State can have some Requires. :class:`Require` objects
    defines the arguments required to enter the State.

    To define a new State, it is necessary to redefine methods:
    _lf_name = ""
    _instance = None
    supported_os_type = [OsTypeAll()]

    def _xml_tag(self):

    def _xml_children(self):
        provides = []
        for method_name in STATE_RESERVED_PROVIDES:
            provides.append(getattr(self, "provide_%s" % method_name))
        return provides + self.provides

    def _xml_ressource_name(self):
        return "state"

    def _xml_add_properties(self):
        acc = []
        for s in self.supported_os_type:
            t = Element("supported_os")
            name = SubElement(t, "name")
            name.text =
            r = SubElement(t, "release")
            r.text = s.release
        return acc

    def name(self):
        """Name of the state"""
        return self.__class__.__name__

    def lf_name(self):
        """Shity hack. This return the name of the lifecycle using this
        state. It is set by :py:meth:`Lifecycle._push_state` method.
        return self._lf_name

    def lf_name(self, name):
        self._lf_name = name

    def _provide_call(self, f_provide, provide, requires=[]):
        """Call a provide.
        :param f_provide: is the function to call
        :param provide: is the provide object
        :param requires: is the list of input requires
        if not armonic.common.DONT_VALIDATE_ON_CALL:
            if armonic.common.SIMULATION:
                logger.warning("Provide call %s is simulated" %
                ret = {}
                if provide:
                    ret = f_provide(provide)
                    ret = f_provide()

        except ValidationError:
        except Exception, e:
            raise ProvideError(provide, e.message, sys.exc_info())
        return ret

    def _enter_safe(self, requires=[]):
        """Check all state requires are satisfated and enter into State

        :param requires: variable values to fill the requires::

                ("//xpath/to/variable", {0: value}),
                ("//xpath/to/variable", {0: value})
             ], {'source' : xpath, 'id': uuid})

        :type requires: tuple of variable values and deployment info
        return self._provide_call(self.enter, self.provide_enter, requires)

    def enter(self):
        """Called when a state is applied"""
        logger.debug("Entering state %s" % self)

    def leave(self):
        """Called when a state is leaved"""
        logger.debug("Leaving state %s" % self)

    def cross(self, **kwargs):
        """Called when the state is traversed"""
        logger.debug("State %s crossed" % self)

    def enter_doc(self):
        By default, it returns doc string of enter method. You can
        override it to be more concise.

        TODO Need state to be built by LF in order to have an instance.
        return self.enter.__doc__

    def provides(self):
        Requires for all provides

        :rtype: IterContainer([:py:class:`Provide`])
        return self._provides

    def provide_by_name(self, provide_name):
        :param provide_name: name of a provide
        :rtype: Provide
        # Small hack for LifecycleManager.from_xpath
        if provide_name in STATE_RESERVED_PROVIDES:
            return getattr(self, 'provide_%s' % provide_name)

            return self.provides.get(provide_name)
        except DoesNotExist:
            raise ProvideNotExist("%s doesn't exist in state %s" %
                                  (provide_name, self))

    def __repr__(self):
        return "<%s:%s>" % (self.lf_name,

    def _clear_provides(self):
        """Reset variables to default values in all state provides"""
        for provide in self.provides:

    def _clear_provide(self, provide_name):
        """Reset variables to default values in state provide"""

    def to_primitive(self):
        return {"name":,
                "xpath": self.get_xpath_relative(),
                "supported_os_type": [t.to_primitive() for t in
                "provides": [r.to_primitive() for r in self.provides],
                "provide_enter": self.provide_enter.to_primitive()}

[docs]class MetaState(State): """Set by state.__new__ to add implementation of this metastate. Be careful, provides of states that implement a MetaState are _removed_. You then have to manually redefine them in the metastate if they are required. """ implementations = []
[docs]class Lifecycle(XMLRessource): """The Lifecycle of a service or application is represented by transitions between :class:`State` classes. The transitions list is specified in the class attribute :attr:`Lifecycle.transition`. Main operations on a Lifecycle are: * :meth:`Lifecycle.state_list` to list available states, * :meth:`Lifecycle.state_current` to know the current state, * :meth:`Lifecycle.state_goto` to go from current state to another state. * :meth:`Lifecycle.provide_call` to call a provide. States applied are recorded in a stack to be able to unapply them. The State stack does not contain the same State twice. """ _initialized = False os_type = OS_TYPE """To specify the current OS type. By default, OS type is automatically discovered but it is possible to override this attribute to manually specify one. """ abstract = False """If the Lifecycle is abstract it won't be loaded in the LifecycleManager and in the XML registery. """ initial_state = None """The initial state for this Lifecycle""" _persist = True def __new__(cls): instance = super(Lifecycle, cls).__new__(cls) # Update transitions to manage MetaState for ms in instance._state_list(): ms.lf_name = # For each MetaState ms if isinstance(ms, MetaState): # Find transitions which involve a MetaState # Ignore already done MetaState transitions transitions = [(s, i) for (s, i) in instance.transitions if i == ms and not "%s." % in] if not transitions: continue # We create new state suffixed by metaclass name This # permits to create specical path. If two metastate # has same implementation, we need to create special # implementations for each metastate. created_states = [type('%s.%s' % (ms.__class__.__name__, s.__name__), (s,), {}) for s in ms.implementations] for s in created_states: s.lf_name = logger.debug("State %s has been created from MetaState %s" % (s.__name__, # For each transtion to MetaState ms for t in transitions: update_transitions = [] # And for each state implementations for d in created_states: # We create transition to this implementation update_transitions += [(t[0], d())] # And from this implementation to metastate update_transitions += [(d(), ms)] # We also remove the provide list of an # implementation. # # TODO: we should implement a mecanism to # import provide from implemenations to # MetaState iif a provide is defined in all # implementations. del d._provides[:] # Finally, we remove useless transitions and add new ones. if update_transitions != []: instance.transitions.remove(t) instance.transitions += update_transitions return instance def __init__(self): XMLRessource.__init__(self) if self.initial_state: self.init(self.initial_state)
[docs] def init(self, state, requires=[]): """If it is not already initialized, push state in stack.""" self._stack = [] requires = format_input_variables(requires) if not self._initialized: self._push_state(state, requires) self._initialized = True
def _xml_tag(self): return def _xml_children(self): return self.state_list() def _xml_ressource_name(self): return "lifecycle" def _xml_add_properties(self): transitions = [] for (s, d) in self.transitions: t = Element("transition") src = SubElement(t, "source") src.text = dst = SubElement(t, "destination") dst.text = transitions.append(t) return transitions def _persist_primitive(self): return [ for state in self._stack] def _persist_load_primitive(self, stack): logger.debug("Loading %s previous states" % self) _stack = [] for state_name in stack: try: state = self.state_by_name(state_name) _stack.append(state) except DoesNotExist: logger.error("State %s in unknown in Lifecycle %s" % (state_name, self)) return False if _stack: self._stack = _stack return True return False @property def name(self): return self.__class__.__name__ @classmethod def _state_list(cls): return list(set([s for (s, d) in cls.transitions] + [d for (s, d) in cls.transitions]))
[docs] def doc(self): """Return docstring of this lifecycle.""" return self.__class__.__doc__
[docs] def state_list(self, reachable=False): """To get all available states. :param reachable: list only reachable states from the current state :type reachable: bool :rtype: [:class:`State`] """ states = self.__class__._state_list() if reachable: acc = [] for s in states: if (self._get_from_state_paths(self.state_current(), s) != [] or s == self.state_current()): acc.append(s) states = acc return states
[docs] def state_current(self): """Get current state. :rtype: :class:`State` """ if self._stack == []: return None else: return self._stack[-1]
def _is_state_in_stack(self, state): return self._get_state_class(state) in self._stack def _is_transition_allowed(self, s, d): """A transition is allowed if src and dst state support current os type.""" return (self.os_type in d.supported_os_type and self.os_type in s.supported_os_type and (s, d) in self.transitions) def _push_state(self, state, requires): """Go to a state if transition from current state to state is allowed TODO: verify that state is not in stack yet. You should never use this method. Use goto_state instead. """ if self._stack != []: if not self._is_transition_allowed(self.state_current(), state): raise TransitionNotAllowed("from %s to %s" % (self.state_current(), state)) logger.event({'event': 'state_appling', 'state':, 'lifecycle':}) ret = state._enter_safe(requires) logger.debug("push state %s" % state) self._stack.append(state) logger.event({'event': 'state_applied', 'state':, 'lifecycle':}) return ret def _pop_state(self): if self._stack != []: t = self._stack.pop() t.leave() def _get_from_state_paths(self, from_state, to_state): logger.debug("Find paths from %s to %s" % (from_state, to_state)) paths = [] def _find_next_state(state, paths, path=[]): for (src, dst) in self.transitions: if src == state and self._is_transition_allowed(src, dst): new_path = copy.copy(path) new_path.append((dst, 'enter')) paths.append(new_path) if not dst == to_state: _find_next_state(dst, paths, new_path) # we can't go further # should we keep this path ? if path and not (path[0] == from_state and path[-1] == to_state): # seems not! delete it for i, p in enumerate(paths): if p == path: del paths[i] # Check if we are going back in the stack # take the same path we took to go to to_state to go back to from_state if to_state in self._stack and from_state == self.state_current(): logger.debug("Using same path to go back to state %s" % to_state) rewind_path = [] for state in reversed(self._stack[:]): if state == to_state: break else: rewind_path.append((state, "leave")) paths.append(rewind_path) # trying to find a path from "to_state" to "from_state" # meaning we are going forward in the state machine else: _find_next_state(from_state, paths) logger.debug("Found paths:") # % pprint.pformat(paths)) for p in paths: logger.debug("\t%s" % p) return paths def _get_state_class(self, state): """From a string state name or a state class, try to find the state object. :rtype: the corresponding state class If state is not found, raise StateNotExist. """ if isinstance(state, type) and issubclass(state, State): state = state.__name__ elif isinstance(state, basestring): pass elif isinstance(state, State): state = else: raise AttributeError("state must be a subclass of State or a string") for s in self.state_list(): if == state: return s raise StateNotExist("%s is not a valid state" % state)
[docs] def state_by_name(self, name): """Get state from its name :param name: the name of a state :type name: str :rtype: :class:`State` """ for state in self.state_list(): if == name: return state raise DoesNotExist("State %s doesn't exists" % name)
[docs] def state_goto(self, state, requires=[], path_idx=0): """Go to state. :param state: the target state :type state: state_name | :class:`State` :param requires: variable values to fill the requires :: ([ ("//xpath/to/variable", {0: value}), ("//xpath/to/variable", {0: value}) ], {'source' : xpath, 'id': uuid}) :type requires: tuple of variable values and deployment info :param path_idx: the path to use when there is multiple paths to go to the target State :type path_idx: int :rtype: None """ requires = format_input_variables(requires) logger.debug("Goto state %s using path %i" % (state, path_idx)) path = self.state_goto_path(state, path_idx=path_idx) for (state, method) in path: if method == "enter": self._push_state(state, requires) elif method == "leave": if self.state_current() == state: self._pop_state() else: raise StateNotApply(self.state_current())
[docs] def state_goto_path_list(self, state): """Get the list of paths to go to State. :param state: the target state :type state: state_name | :class:`State` :rtype: [[(:class:`State`, method), (:class:`State`, method), ...], ...] """ state = self._get_state_class(state) logger.debug("Get paths to go to state %s" % state) return self._get_from_state_paths(self.state_current(), state)
[docs] def state_goto_path(self, state, func=None, path_idx=0): """Get one path to go to State. :param state: the target state :type state: state_name | :class:`State` :param func: function to apply on all States of the path :type func: function :param path_idx: the path to use when there is multiple paths to go to the target State :type path_idx: int :rtype: [(:class:`State`, method), (:class:`State`, method), ...] """ try: path = self.state_goto_path_list(state)[path_idx] except IndexError: raise StateNotApply("No path to go to state %s" % state) if func is not None: for state, method in path: func(state) return path
[docs] def state_goto_requires(self, state, path_idx=0): """Get Requires to go to State. :param state: the target state :type state: state_name | :class:`State` :param path_idx: the path to use when there is multiple paths to go to the target State :type path_idx: int :rtype: [:class:`Provide`] """ acc = IterContainer() for state, method in self.state_goto_path(state, path_idx=path_idx): if method == "enter": if state.provide_enter: acc.append(state.provide_enter) return acc
[docs] def provide_list(self, reachable=False): """Get all available provides :param reachable: list only reachable provides from the current state :type reachable: bool :rtype: [(:class:`State`, [:class:`Provide`])] """ return [(s, s.provides) for s in self.state_list(reachable=reachable) if s.provides != []]
[docs] def provide_call_requires(self, state, path_idx=0): """Get requires to call provide in state. :param state: the target state :type state: state_name | :class:`State` :param path_idx: the path to use when there is multiple paths to go to the target State :type path_idx: int """ state = self._get_state_class(state) if not self._is_state_in_stack(state): return self.state_goto_requires(state, path_idx) else: return []
[docs] def provide_call_args(self, state_name, provide_name): """From a provide_name, returns its needed arguments.""" state = self._get_state_class(state_name) return state.provide_by_name(provide_name)
[docs] def provide_call_path(self, state): """Get paths to call a provide in state. :param state: the target state :type state: state_name | :class:`State` """ state = self._get_state_class(state) if not self._is_state_in_stack(state): return self.state_goto_path_list(state) else: return []
[docs] def provide_call(self, state, provide_name, requires=[], path_idx=0): """Go to provide state and call provide. :param state: the target state :type state: state_name | :class:`State` :param provide_name: name of the provide :type provide_name: str :param requires: variable values to fill the requires :: ([ ("//xpath/to/variable", {0: value}), ("//xpath/to/variable", {0: value}) ], {'source' : xpath, 'id': uuid}) :type requires: tuple of variable values and deployment info :rtype: provide result """ # FIXME This is useless and should be removed ??? requires = format_input_variables(requires) state = self._get_state_class(state) # To be sure that the provide exists state.provide_by_name(provide_name) if not self._is_state_in_stack(state): self.state_goto(state, requires, path_idx) return self._provide_call_in_stack(state, provide_name, requires)
def _provide_call_in_stack(self, state, provide_name, requires=[]): """Call a provide by name. State which provides must be in the stack.""" state = self._get_state_class(state) state_index = self._stack.index(state) provide = state.provide_by_name(provide_name) provide_method = getattr(state, provide_name) ret = state._provide_call(provide_method, provide, requires) logger.debug("Provide %s returns values %s" % (provide_name, ret)) logger.debug("Propagate flags %s to upper states" % provide.flags) for s in self._stack[state_index:]: if armonic.common.SIMULATION: logger.warning("Cross state %s is simulated" % else: s.cross(**(provide.flags)) return ret
[docs] def to_dot(self, cross=False, enter_doc=False, leave_doc=False, reachable=False): """Return a dot string of lifecycle.""" def dotify(string): # To remove illegal character if string is not None: tmp = string.replace("{", "").replace("}", "").replace(":", "").replace("\n", "\l") tmp += '\l' return tmp else: return string def list_to_table(l): if l == []: acc = "" else: acc = dotify(str(l[0])) if len(l) > 1: for a in l[1:]: acc += "| %s" % dotify(str(a)) return acc def dot_provide(provide): if provide != []: return "%s | {%s}" % ( dotify(, list_to_table([ for r in provide])) else: return "" acc = "" acc += "digraph finite_state_machine {\n" acc += "node [shape = ellipse];\n" state_list = self.state_list(reachable=reachable) for s in state_list: acc += '"%s"[\n' % acc += 'shape = "record"\n' # requires = "" # provides = list_to_table([(, p.flags) for p in # s.provides]) # Begin of label acc += 'label = "{%s | %s ' % (, dotify(s.__doc__), ) # Enter doc if enter_doc: acc += " | Entry: %s" % (dotify(s.enter.__doc__)) if leave_doc: acc += " | Leave: %s" % (dotify(s.leave.__doc__)) # Cross method if cross: acc += "| {Cross: | {Doc: %s | Flags: %s}}" % ( dotify(s.cross.__doc__), inspect.getargspec(s.cross).args[1:]) # Enter Requires acc += "| { enter\l | {%s}}" % list_to_table([ for r in s.provide_enter]) for p in s.provides: acc += " | { %s }" % dot_provide(p) # End of label acc += '}"\n' acc += "];\n" for (s, d) in self.transitions: if s in state_list and d in state_list: acc += '"%s" -> "%s";\n' % (, acc += "}\n" return acc
def __repr__(self): return "<Lifecycle:%s>" % def _clear_states_provides(self): """Reset variables to default values in all states""" for s in self.state_list(): s._clear_provides() def _clear_state_provide(self, state, provide_name): """Reset variables to default values in all states""" state = self._get_state_class(state) state._clear_provide(provide_name) def to_primitive(self, reachable=False): state_list = self.state_list(reachable=reachable) return {'name':, 'xpath': self.get_xpath_relative(), 'states': [s.to_primitive() for s in state_list], "transitions": [(, for (s, d) in self.transitions if s in state_list and d in state_list]}
class LifecycleNotExist(Exception): pass
[docs]class LifecycleManager(XMLRessource): """The :class:`LifecyleManager` is used to manage :class:`Lifecyle` objects. It permits to interact with lifecycles by provinding xpaths. The full path to a variable is:: /hostname/lifecycle_name/state_name/provide_name/require_name/variable_name The xpath to get all states of the Mysql :class:`Lifecyle` would be:: //Mysql/* To get the ``add_database`` provide in the Mysql :class:`Lifecyle`:: //Mysql//add_database All methods of :class:`LifecyleManager` returns python objects. :param os_type: to specify which kind of os has to be used. If it is not specified, the os type is automatically discovered. :param public_ip: the public ip of the agent. This is used by clients to know how to contact services deployed by this agent. """ def __init__(self, os_type=None, autoload=True, public_ip="localhost"): XMLRessource.__init__(self) self.os_type = OS_TYPE if os_type: self.os_type = os_type self.public_ip = public_ip # Here we globally set PUBLIC_IP armonic.common.PUBLIC_IP = public_ip self.lf_loaded = {} self.lf = {} for lf in get_subclasses(Lifecycle): if not lf.abstract: logger.debug("Found Lifecycle %s" % lf) self.lf.update({lf.__name__: lf}) if autoload: self.load(lf.__name__) else: logger.debug("Ignoring abstract Lifecycle %s" % lf) self.register()
[docs] def register(self): """Register the manager in the XMLRegistery. """ XMLRegistery._xml_register(self)
@property def name(self): return uname()[1] def _xml_ressource_name(self): return "location" def _xml_tag(self): return def _xml_children(self): return [lf for lf_name, lf in self.lf_loaded.items()]
[docs] def info(self): """Get info of armonic agent :rtype: dict """ return {"os-type":, "os-release": self.os_type.release, "version": armonic.common.VERSION, "public-ip": self.public_ip}
[docs] def lifecycle(self, lifecycle_xpath): """List loaded lifecycle objects :param lifecycle_xpath: xpath that matches lifecycles :type lifecycle_xpath: str :return: list of :class:`Lifecycle` :rtype: [:class:`Lifecycle`] """ elts = XMLRegistery.find_all_elts(lifecycle_xpath) acc = [] for e in elts: lf_name = XMLRegistery.get_ressource(e, "lifecycle") lf = self.lifecycle_by_name(lf_name) acc.append(lf) return acc
[docs] def load(self, lf_name): """Load a :class:`Lifecycle` in the manager and register it in the XML register. :param lf_name: the :class:`Lifecycle` name to load :type lf_name: str :raises LifecycleNotExist: if the :class:`Lifecycle` isn't found :return: the loaded :class:`Lifecycle` :rtype: :class:`Lifecycle` """ try: lf = self.lf[lf_name]() # Reset variables values in all States # since States are Singleton lf._clear_states_provides() if self.os_type is not None: lf.os_type = self.os_type except KeyError: raise LifecycleNotExist("Lifecycle '%s' doesn't exist" % lf_name) self.lf_loaded.update({lf_name: lf}) return lf
def lifecycle_by_name(self, lf_name): try: self.lf_loaded[lf_name] except KeyError: self.load(lf_name) try: return self.lf_loaded[lf_name] except KeyError: raise LifecycleNotExist("%s is not loaded" % lf_name)
[docs] def state(self, state_xpath): """Return a list of states that matches state_xpath. :param state_xpath: xpath that can match multiple states :type state_xpath: str :return: list of :class:`State` :rtype: [:py:class:`State`] """ elts = XMLRegistery.find_all_elts(state_xpath) acc = [] for e in elts: lf_name = XMLRegistery.get_ressource(e, "lifecycle") state_name = XMLRegistery.get_ressource(e, "state") state = self.lifecycle_by_name(lf_name)._get_state_class(state_name) acc.append(state) return acc
[docs] def state_current(self, lifecycle_xpath): """Get the current state name of matched lifecycles. :param lifecyle_xpath: xpath that can match multiple :class:`Lifecycle` :rtype: [:class:`State`] """ # TODO return (Lifecycle, State) elts = XMLRegistery.find_all_elts(lifecycle_xpath) acc = [] for e in elts: lf_name = XMLRegistery.get_ressource(e, "lifecycle") lf = self.lifecycle_by_name(lf_name) acc.append(lf.state_current()) return acc
[docs] def state_goto_path(self, state_xpath): """From the current state, returns all paths to goto states that match state_xpath. :param state_xpath: xpath that can match multiple states :type state_xpath: str :return: list of paths for every state matched by state_xpath :rtype: [(:class:`State`, [path])] """ elts = XMLRegistery.find_all_elts(state_xpath) acc = [] for e in elts: lf_name = XMLRegistery.get_ressource(e, "lifecycle") state_name = XMLRegistery.get_ressource(e, "state") state = self.lifecycle_by_name(lf_name)._get_state_class(state_name) paths = self.lifecycle_by_name(lf_name).state_goto_path_list(state_name) acc.append((state, paths)) return acc
[docs] def state_goto_requires(self, state_xpath_uri, path_idx=0): """Return the list a special provide required to go from the current state to the state that match state_xpath_uri. :param state_xpath_uri: unique state xpath :type state_xpath_uri: str :param path_idx: path to use when there is multiple paths to go to the provide :type path_idx: int :rtype: [:py:class:`Provide`] """ lf_name = XMLRegistery.get_ressource(state_xpath_uri, "lifecycle") state_name = XMLRegistery.get_ressource(state_xpath_uri, "state") lf = self.lifecycle_by_name(lf_name) return lf.state_goto_requires(state_name)
[docs] def state_goto(self, state_xpath_uri, requires=[], path_idx=0): """From the current state go to state. :param xpath: unique xpath of a state :type xpath: str :param requires: variable values to fill the requires :: ([ ("//xpath/to/variable", {0: value}), ("//xpath/to/variable", {0: value}) ], {'source' : xpath, 'id': uuid}) :type requires: tuple of variable values and deployment info :rtype: None """ requires = format_input_variables(requires) lf_name = XMLRegistery.get_ressource(state_xpath_uri, "lifecycle") state_name = XMLRegistery.get_ressource(state_xpath_uri, "state") logger.debug("state-goto %s %s %s" % ( lf_name, state_name, requires)) return self.lifecycle_by_name(lf_name).state_goto(state_name, requires)
[docs] def provide(self, provide_xpath): """Return provides that match provide_xpath and that can be reached (OS_TYPE). :param provide_xpath: xpath to provide :type provide_xpath: str :return: list of provides that match provide_xpath :rtype: [:py:class:`Provide`] """ matches = XMLRegistery.find_all_elts(provide_xpath) acc = IterContainer() for m in matches: if XMLRegistery.is_ressource(m, "provide"): provide_name = XMLRegistery.get_ressource(m, "provide") if provide_name not in STATE_RESERVED_METHODS: lf_name = XMLRegistery.get_ressource(m, "lifecycle") lf = self.lifecycle_by_name(lf_name) state_name = XMLRegistery.get_ressource(m, "state") state = lf.state_by_name(state_name) if (lf._is_state_in_stack(state) or lf.provide_call_path(state) != []): acc.append(state.provide_by_name(provide_name)) return acc
[docs] def provide_call_requires(self, provide_xpath_uri, path_idx=0): """Requires for the provide. :param provide_xpath_uri: unique xpath to provide :type provide_xpath_uri: str :param path_idx: path to use when there is multiple paths to go to the provide :type path_idx: int :return: list of provides to call it order to call provide_xpath_uri :rtype: [:py:class:`Provide`] """ lf_name = XMLRegistery.get_ressource(provide_xpath_uri, "lifecycle") state_name = XMLRegistery.get_ressource(provide_xpath_uri, "state") return self.lifecycle_by_name(lf_name).provide_call_requires(state_name, path_idx)
[docs] def provide_call_path(self, provide_xpath): """Paths for provides that matches provide_xpath. :param provide_xpath: xpath to provide :type provide_xpath: str :return: list of paths to call provides that match provide_xpath :rtype: [(:py:class:`Provide`, [path, ...])] """ matches = XMLRegistery.find_all_elts(provide_xpath) acc = [] for m in matches: if XMLRegistery.is_ressource(m, "provide"): provide_name = XMLRegistery.get_ressource(m, "provide") if provide_name not in STATE_RESERVED_METHODS: lf_name = XMLRegistery.get_ressource(m, "lifecycle") lf = self.lifecycle_by_name(lf_name) state_name = XMLRegistery.get_ressource(m, "state") state = lf.state_by_name(state_name) provide = state.provide_by_name(provide_name) acc.append((provide, lf.provide_call_path(state_name))) return acc
[docs] def provide_call_validate(self, provide_xpath_uri, requires=[], path_idx=0): """Validate requires to call the provide :param xpath: unique xpath of the provide to call :type xpath: str :param requires: variable values to fill the requires :: ([ ("//xpath/to/variable", {0: value}), ("//xpath/to/variable", {0: value}) ], {'source' : xpath, 'id': uuid}) :type requires: tuple of variable values and deployment info :return: list of validated provides to call in order to call provide_xpath_uri :rtype: {'errors': bool, 'xpath': xpath, 'requires': [:class:`Provide`]} """ variables_values = format_input_variables(requires) logger.debug("Validating variables %s" % variables_values) # check that all requires are validated # copy requires we don't want to fill variables yet requires = copy.deepcopy(self.provide_call_requires(provide_xpath_uri)) try: requires.append(copy.deepcopy( self.from_xpath(provide_xpath_uri, "provide"))) except DoesNotExist: pass errors = False for provide in requires: try: provide.fill(variables_values) provide.validate() except ValidationError as e: logger.debug("Validation error on provide '%s'" % provide.get_xpath()) logger.debug(" on require '%s'" % e.require_name) logger.debug(" on variable '%s'" % e.variable_name) logger.debug(" with msg: %s" % e.msg) errors = True return {'xpath': provide_xpath_uri, 'errors': errors, 'requires': requires}
[docs] def provide_call(self, provide_xpath_uri, requires=[], path_idx=0): """Call a provide of a lifecycle and go to provider state if needed :param xpath: xpath of the provide to call :type xpath: str :param requires: variable values to fill the requires :: ([ ("//xpath/to/variable", {0: value}), ("//xpath/to/variable", {0: value}) ], {'source' : xpath, 'id': uuid}) :type requires: tuple of variable values and deployment info :return: provide_xpath_uri call result """ requires = format_input_variables(requires) logger.debug("Provide call %s" % provide_xpath_uri) # be sure that the provide can be validated # we don't want to change states # if we can't call the provide in the end if not armonic.common.DONT_VALIDATE_ON_CALL: errors = self.provide_call_validate(provide_xpath_uri, requires)['errors'] if errors: msg = ("Provided values doesn't met provide requires." + " Call provide_call_validate() to know errors.") logger.error(msg) raise ValidationError(msg=msg) requires = format_input_variables(requires) lf_name = XMLRegistery.get_ressource(provide_xpath_uri, "lifecycle") state_name = XMLRegistery.get_ressource(provide_xpath_uri, "state") provide_name = XMLRegistery.get_ressource(provide_xpath_uri, "provide") logger.debug("Calling provide %s" % provide_xpath_uri) return self.lifecycle_by_name(lf_name).provide_call(state_name, provide_name, requires, path_idx)
[docs] def to_dot(self, lf_name, reachable=False): """Return the dot string of a lifecyle object :param lf_name: name of the lifecycle object :type lf_name: str :rtype: dot file string""" return self.lifecycle_by_name(lf_name).to_dot(reachable=reachable)
[docs] def to_primitive(self, lf_name, reachable=False): """Return a serialized Lifecycle object :param lf_name: name of the :class:`Lifecycle` object :type lf_name: str :return: serialized :class:`Lifecycle` object :rtype: dict""" return self.lifecycle_by_name(lf_name).to_primitive(reachable=reachable)
[docs] def uri(self, xpath="//", relative=False, resource=None): """Return the list of xpath_uris that match this xpath. :type xpath: str :param xpath: an xpath string :type relative: bool :param relative: If true, returns relative xpath :type resource: str :param resource: Returns only xpath that describe this resource type :return: list of xpaths :rtype: [xpath_uri] """ ret = XMLRegistery.find_all_elts(xpath) if relative: acc = [] for x in ret: try: if (resource is None or XMLRegistery.is_ressource(x, resource)): acc.append(x.split("/", 2)[2]) except IndexError: pass return acc else: return ret
[docs] def from_xpath(self, xpath, ret="lifecycle"): """From a xpath try to get the object of type ``ret`` :param xpath: xpath to a ressource :type xpath: str :param ret: object type to return (lifecycle, state, provide, require, variable) :type ret: str :rtype: :class:`Lifecycle` | :class:`State` | :class:`Provide` | :class:`Require` | :class:`Variable` """ ressource_obj = self ressources_types = ("lifecycle", "state", "provide", "require", "variable") for ressource_type in ressources_types: ressource_name = XMLRegistery.get_ressource(xpath, ressource_type) ressource_obj = getattr(ressource_obj, "%s_by_name" % ressource_type)(ressource_name) if ressource_type == ret: return ressource_obj raise DoesNotExist("Can't find object")
def xpath(self, xpath): return XMLRegistery.xpath(xpath)
[docs] def to_xml(self, xpath=None): """Return the xml representation of the :class:`LifecyleManager`.""" return XMLRegistery.to_string(xpath)
def __repr__(self): return "<LifecyleManager:%s>" %