Source code for armonic.lifecycle

import inspect
import logging
import copy
import sys
import re
from platform import uname

import armonic.common

from armonic.utils import IterContainer, DoesNotExist, OS_TYPE, OsTypeAll, get_subclasses
from armonic.common import ProvideError, format_input_variables
from armonic.provide import Provide
from armonic.variable import ValidationError

from xml_register import XMLRessource, XMLRegistery, Element, SubElement

# Rebind the imported XMLRegistery class name to one module-wide instance;
# after this line the name refers to the instance, not to the class.
XMLRegistery = XMLRegistery()
logger = logging.getLogger(__name__)
# Method names that are never registered as custom provides of a State
# (see StateFactory.__new__) nor returned by LifecycleManager.provide().
STATE_RESERVED_METHODS = ('enter', 'leave', 'cross')
# Reserved methods for which StateFactory attaches a Provide to the state
# class under the attribute name ``provide_<method name>``.
STATE_RESERVED_PROVIDES = ('enter',)


class TransitionNotAllowed(Exception):
    """Raised when a lifecycle is asked to enter a state through a
    transition that is not allowed from its current state.
    """


def Transition(s, d):
    """Build a transition, i.e. the pair ``(source, destination)``.

    :param s: the source state
    :param d: the destination state
    :rtype: tuple
    """
    pair = (s, d)
    return pair


class ProvideNotExist(Exception):
    """Raised when a provide name can not be found in a state."""


class ProvideNotInStack(Exception):
    """Error about a provide whose state is not in the state stack.

    NOTE(review): meaning inferred from the name only, nothing in this
    module raises it -- confirm against callers.
    """


class ProvideAmbigous(Exception):
    """Error about a provide reference that is ambiguous.

    NOTE(review): meaning inferred from the name only, nothing in this
    module raises it -- confirm against callers.
    """


class RequireHasNotFuncArgs(Exception):
    """Error about a require that lacks function arguments.

    NOTE(review): meaning inferred from the name only, nothing in this
    module raises it -- confirm against callers.
    """


class StateNotApply(Exception):
    """Raised when no path leads to the requested state, or when the state
    that has to be left is not the current state of the lifecycle.
    """


class StateNotExist(Exception):
    """Raised when a state can not be found in a lifecycle."""


class StateFactory(type):
    """Metaclass of :class:`State`.

    It does two things:

    * instantiating a state class always returns the same object (one
      instance per class, cached in ``_instances``);
    * when a state class is created, it attaches a ``provide_<name>``
      attribute for every reserved provide and collects the provides
      declared on the other methods into ``_provides``.
    """
    # Maps a state class to its unique instance.
    _instances = {}

    def __call__(cls, *args, **kwargs):
        """Return the unique instance of ``cls``, creating it on first call.

        Arguments are only used the first time the class is instantiated.
        """
        # States are Singletons
        if cls not in cls._instances:
            cls._instances[cls] = super(StateFactory, cls).__call__(*args, **kwargs)
        return cls._instances[cls]

    def __new__(cls, *args, **kwargs):
        """Create the state class and register its provides."""
        state_class = super(StateFactory, cls).__new__(cls, *args, **kwargs)

        # init provides
        state_class._provides = IterContainer()

        # setup reserved provides
        for method_name in STATE_RESERVED_PROVIDES:
            method = getattr(state_class, method_name).__func__
            if not hasattr(method, "_provide"):
                # No provide declared on the method: use a bare one.
                provide_inst = Provide(method_name)
            else:
                # Copy so that each state class owns its provide object.
                provide_inst = copy.deepcopy(method._provide)
            setattr(state_class, "provide_%s" % method_name, provide_inst)

        # register custom provides
        funcs = inspect.getmembers(state_class, predicate=inspect.ismethod)
        for (fname, f) in funcs:
            if hasattr(f, '_provide') and fname not in STATE_RESERVED_METHODS:
                provide = copy.deepcopy(f._provide)

                # Format provide extra args with class attributes
                for key, value in provide.extra.items():
                    if not isinstance(value, basestring):
                        continue
                    # Collect *every* "{name}" placeholder. Looking only at
                    # the first one makes str.format() fail with KeyError as
                    # soon as a value holds two placeholders.
                    placeholders = re.findall(r'{([^}]*)}', value)
                    if not placeholders:
                        continue
                    format_dct = {}
                    for placeholder in placeholders:
                        # Unknown class attributes are replaced by "".
                        format_dct[placeholder] = getattr(state_class, placeholder, "")
                    provide.extra[key] = value.format(**format_dct)

                state_class._provides.append(provide)
                logger.debug("Registered %s in state %s" % (f._provide, state_class.__name__))

        return state_class


class State(XMLRessource):
    """A State describes a step during the life of a :class:`Lifecycle`.

    Each State can have some Requires. :class:`Require` objects
    defines the arguments required to enter the State.

    To define a new State, it is necessary to redefine methods:
     :py:meth:`State.enter`
     :py:meth:`State.leave`
     :py:meth:`State.cross`
    """
    # The docstring has to be the first statement of the class body,
    # otherwise ``State.__doc__`` stays None.
    __metaclass__ = StateFactory

    _lf_name = ""
    _instance = None
    supported_os_type = [OsTypeAll()]

    def _xml_tag(self):
        return self.name

    def _xml_children(self):
        # Reserved provides first, then the custom ones.
        reserved = [getattr(self, "provide_%s" % method_name)
                    for method_name in STATE_RESERVED_PROVIDES]
        return reserved + self.provides

    def _xml_ressource_name(self):
        return "state"

    def _xml_add_properties(self):
        """Return one ``supported_os`` element per supported OS type."""
        acc = []
        for s in self.supported_os_type:
            t = Element("supported_os")
            name = SubElement(t, "name")
            name.text = s.name
            r = SubElement(t, "release")
            r.text = s.release
            acc.append(t)
        return acc

    @property
    def name(self):
        """Name of the state"""
        return self.__class__.__name__

    @property
    def lf_name(self):
        """Name of the lifecycle using this state.

        This is a hack: the value is assigned from the outside, by
        :py:meth:`Lifecycle.__new__`, for every state of the lifecycle.
        """
        return self._lf_name

    @lf_name.setter
    def lf_name(self, name):
        self._lf_name = name

    def _provide_call(self, f_provide, provide, requires=None):
        """Call a provide.

        :param f_provide: is the function to call
        :param provide: is the provide object
        :param requires: is the list of input requires (defaults to an
            empty list)
        :raises ValidationError: propagated unchanged
        :raises ProvideError: wraps any other exception raised during the
            call or the finalization
        :return: the value returned by ``f_provide``, or ``{}`` when
            ``armonic.common.SIMULATION`` is set
        """
        if requires is None:
            requires = []
        provide.fill(requires)
        if not armonic.common.DONT_VALIDATE_ON_CALL:
            provide.validate()
        try:
            if armonic.common.SIMULATION:
                logger.warning("Provide call %s is simulated" % provide.name)
                ret = {}
            else:
                # The provide is only handed to the function when it is
                # truthy (presumably when it holds requires -- confirm
                # against Provide).
                if provide:
                    ret = f_provide(provide)
                else:
                    ret = f_provide()

            provide.finalize()
        except ValidationError:
            raise
        except Exception as e:
            # ``message`` is what has always been reported; only fall back
            # to str() for exceptions that do not carry that attribute.
            message = e.message if hasattr(e, "message") else str(e)
            raise ProvideError(provide, message, sys.exc_info())
        return ret

    def _enter_safe(self, requires=None):
        """Check all state requires are satisfied and enter into State

        :param requires: variable values to fill the requires::

            ([
                ("//xpath/to/variable", {0: value}),
                ("//xpath/to/variable", {0: value})
             ], {'source' : xpath, 'id': uuid})

        :type requires: tuple of variable values and deployment info
        """
        if requires is None:
            requires = []
        return self._provide_call(self.enter, self.provide_enter, requires)

    def enter(self):
        """Called when a state is applied"""
        logger.debug("Entering state %s" % self)

    def leave(self):
        """Called when a state is left"""
        logger.debug("Leaving state %s" % self)

    def cross(self, **kwargs):
        """Called when the state is traversed"""
        logger.debug("State %s crossed" % self)

    def enter_doc(self):
        """NOT YET IMPLEMENTED.
        By default, it returns doc string of enter method. You can
        override it to be more concise.

        TODO Need state to be built by LF in order to have an instance.
        """
        return self.enter.__doc__

    @property
    def provides(self):
        """
        Custom provides of the state (the reserved ``enter`` provide is
        not part of this list)

        :rtype: IterContainer([:py:class:`Provide`])
        """
        return self._provides

    def provide_by_name(self, provide_name):
        """
        :param provide_name: name of a provide
        :rtype: Provide
        :raises ProvideNotExist: if the state has no such provide
        """
        # Small hack for LifecycleManager.from_xpath
        if provide_name in STATE_RESERVED_PROVIDES:
            return getattr(self, 'provide_%s' % provide_name)

        try:
            return self.provides.get(provide_name)
        except DoesNotExist:
            raise ProvideNotExist("%s doesn't exist in state %s" %
                                  (provide_name, self))

    def __repr__(self):
        return "<%s:%s>" % (self.lf_name, self.name)

    def _clear_provides(self):
        """Reset variables to default values in all state provides"""
        for provide in self.provides:
            provide._clear()
        self.provide_enter._clear()

    def _clear_provide(self, provide_name):
        """Reset variables to default values in state provide"""
        self.provide_by_name(provide_name)._clear()

    def to_primitive(self):
        """Return a dict describing the state and its provides."""
        return {"name": self.name,
                "xpath": self.get_xpath_relative(),
                "supported_os_type": [t.to_primitive() for t in
                                      self.supported_os_type],
                "provides": [r.to_primitive() for r in self.provides],
                "provide_enter": self.provide_enter.to_primitive()}


class MetaState(State):
    """A state that is implemented by several other states.

    The implementing state classes are listed in ``implementations``
    (the original note says this list is set by ``state.__new__`` --
    TODO confirm, that code is not visible here).

    Be careful, provides of states that implement a MetaState are
    _removed_. You then have to manually redefine them in the metastate
    if they are required.
    """

    implementations = []
class Lifecycle(XMLRessource):
    """The Lifecycle of a service or application is represented by
    transitions between :class:`State` classes. The transitions list is
    specified in the class attribute :attr:`Lifecycle.transitions`.

    Main operations on a Lifecycle are:

    * :meth:`Lifecycle.state_list` to list available states,
    * :meth:`Lifecycle.state_current` to know the current state,
    * :meth:`Lifecycle.state_goto` to go from current state to another state.
    * :meth:`Lifecycle.provide_call` to call a provide.

    States applied are recorded in a stack to be able to unapply them.
    The State stack does not contain the same State twice.
    """
    _initialized = False
    os_type = OS_TYPE
    """To specify the current OS type. By default, OS type is
    automatically discovered but it is possible to override this
    attribute to manually specify one.
    """
    abstract = False
    """If the Lifecycle is abstract it won't be loaded in the
    LifecycleManager and in the XML registery.
    """
    initial_state = None
    """The initial state for this Lifecycle"""
    _persist = True

    def __new__(cls):
        """Create the lifecycle and expand the transitions to MetaStates.

        Every transition ``(s, ms)`` whose destination is a
        :class:`MetaState` is replaced by ``(s, impl)`` and ``(impl, ms)``
        for each implementation of the metastate.
        """
        instance = super(Lifecycle, cls).__new__(cls)
        # Update transitions to manage MetaState
        for ms in instance._state_list():
            ms.lf_name = instance.name
            # For each MetaState ms
            if isinstance(ms, MetaState):
                # Find transitions which involve a MetaState
                # Ignore already done MetaState transitions
                transitions = [(s, i) for (s, i) in instance.transitions
                               if i == ms and "%s." % i.name not in s.name]
                if not transitions:
                    continue
                # We create new states prefixed by the metastate name. This
                # permits to create special paths. If two metastates
                # have the same implementation, we need to create special
                # implementations for each metastate.
                created_states = [type('%s.%s' % (ms.__class__.__name__, s.__name__), (s,), {})
                                  for s in ms.implementations]
                for s in created_states:
                    s.lf_name = instance.name
                    logger.debug("State %s has been created from MetaState %s" % (s.__name__, ms.name))
                # For each transition to MetaState ms
                for t in transitions:
                    update_transitions = []
                    # And for each state implementations
                    for d in created_states:
                        # We create transition to this implementation
                        update_transitions += [(t[0], d())]
                        # And from this implementation to metastate
                        update_transitions += [(d(), ms)]
                        # We also remove the provide list of an
                        # implementation.
                        #
                        # TODO: we should implement a mechanism to
                        # import provides from implementations to
                        # MetaState iff a provide is defined in all
                        # implementations.
                        del d._provides[:]
                    # Finally, we remove useless transitions and add new ones.
                    if update_transitions != []:
                        instance.transitions.remove(t)
                        instance.transitions += update_transitions
        return instance

    def __init__(self):
        XMLRessource.__init__(self)
        if self.initial_state:
            self.init(self.initial_state)

    def init(self, state, requires=None):
        """If it is not already initialized, push state in stack.

        NOTE(review): the stack is reset on every call, even when the
        lifecycle is already initialized -- confirm this is intended.
        """
        self._stack = []
        requires = format_input_variables([] if requires is None else requires)
        if not self._initialized:
            self._push_state(state, requires)
            self._initialized = True

    def _xml_tag(self):
        return self.name

    def _xml_children(self):
        return self.state_list()

    def _xml_ressource_name(self):
        return "lifecycle"

    def _xml_add_properties(self):
        """Return one ``transition`` element per transition."""
        transitions = []
        for (s, d) in self.transitions:
            t = Element("transition")
            src = SubElement(t, "source")
            src.text = s.name
            dst = SubElement(t, "destination")
            dst.text = d.name
            transitions.append(t)
        return transitions

    def _persist_primitive(self):
        """Return the names of the states of the stack, bottom first."""
        return [state.name for state in self._stack]

    def _persist_load_primitive(self, stack):
        """Rebuild the state stack from a list of state names.

        :return: True if the stack has been restored, False if ``stack``
            is empty or contains an unknown state name
        """
        logger.debug("Loading %s previous states" % self)
        _stack = []
        for state_name in stack:
            try:
                state = self.state_by_name(state_name)
                _stack.append(state)
            except DoesNotExist:
                logger.error("State %s in unknown in Lifecycle %s" % (state_name, self))
                return False
        if _stack:
            self._stack = _stack
            return True
        return False

    @property
    def name(self):
        return self.__class__.__name__

    @classmethod
    def _state_list(cls):
        """Return every state used as source or destination of a transition."""
        return list(set([s for (s, d) in cls.transitions] + [d for (s, d) in cls.transitions]))

    def doc(self):
        """Return docstring of this lifecycle."""
        return self.__class__.__doc__

    def state_list(self, reachable=False):
        """To get all available states.

        :param reachable: list only reachable states from the current state
        :type reachable: bool
        :rtype: [:class:`State`]
        """
        states = self.__class__._state_list()
        if reachable:
            acc = []
            for s in states:
                if (self._get_from_state_paths(self.state_current(), s) != [] or
                        s == self.state_current()):
                    acc.append(s)
            states = acc
        return states

    def state_current(self):
        """Get current state.

        :rtype: :class:`State`, or None when the stack is empty
        """
        if self._stack == []:
            return None
        return self._stack[-1]

    def _is_state_in_stack(self, state):
        return self._get_state_class(state) in self._stack

    def _is_transition_allowed(self, s, d):
        """A transition is allowed if src and dst state support current
        os type and if it is declared in the transitions."""
        return (self.os_type in d.supported_os_type and
                self.os_type in s.supported_os_type and
                (s, d) in self.transitions)

    def _push_state(self, state, requires):
        """Go to a state if transition from current state to state is allowed

        TODO: verify that state is not in stack yet.

        You should never use this method. Use state_goto instead.

        :raises TransitionNotAllowed: if the stack is not empty and the
            transition from the current state is not allowed
        :return: the result of entering the state
        """
        if self._stack != []:
            if not self._is_transition_allowed(self.state_current(), state):
                raise TransitionNotAllowed("from %s to %s" % (self.state_current(), state))
        logger.event({'event': 'state_appling', 'state': state.name, 'lifecycle': self.name})
        ret = state._enter_safe(requires)
        logger.debug("push state %s" % state)
        self._stack.append(state)
        logger.event({'event': 'state_applied', 'state': state.name, 'lifecycle': self.name})
        return ret

    def _pop_state(self):
        """Remove the current state from the stack and leave it."""
        if self._stack != []:
            t = self._stack.pop()
            t.leave()

    def _get_from_state_paths(self, from_state, to_state):
        """Return the list of paths going from ``from_state`` to ``to_state``.

        A path is a list of ``(state, method)`` tuples where method is
        ``'enter'`` (going forward) or ``'leave'`` (rewinding the stack).
        """
        logger.debug("Find paths from %s to %s" % (from_state, to_state))
        paths = []

        def _find_next_state(state, paths, path=None):
            path = [] if path is None else path
            for (src, dst) in self.transitions:
                if src == state and self._is_transition_allowed(src, dst):
                    new_path = copy.copy(path)
                    new_path.append((dst, 'enter'))
                    paths.append(new_path)
                    if not dst == to_state:
                        _find_next_state(dst, paths, new_path)
            # we can't go further
            # should we keep this path ?
            # NOTE(review): path items are (state, 'enter') tuples, so the
            # comparisons with bare states below look like they can never
            # be true -- confirm.
            if path and not (path[0] == from_state and path[-1] == to_state):
                # seems not! delete it
                # Filter in place: deleting items of a list while
                # enumerating it skips the element following each deletion.
                paths[:] = [p for p in paths if p != path]

        # Check if we are going back in the stack
        # take the same path we took to go to to_state to go back to from_state
        if to_state in self._stack and from_state == self.state_current():
            logger.debug("Using same path to go back to state %s" % to_state)
            rewind_path = []
            for state in reversed(self._stack[:]):
                if state == to_state:
                    break
                else:
                    rewind_path.append((state, "leave"))
            paths.append(rewind_path)
        # trying to find a path from "to_state" to "from_state"
        # meaning we are going forward in the state machine
        else:
            _find_next_state(from_state, paths)
        logger.debug("Found paths:")
        # % pprint.pformat(paths))
        for p in paths:
            logger.debug("\t%s" % p)
        return paths

    def _get_state_class(self, state):
        """From a string state name or a state class, try to find the
        state object.

        :rtype: the corresponding state object

        If state is not found, raise StateNotExist.
        """
        if isinstance(state, type) and issubclass(state, State):
            state = state.__name__
        elif isinstance(state, basestring):
            pass
        elif isinstance(state, State):
            state = state.name
        else:
            raise AttributeError("state must be a subclass of State or a string")
        for s in self.state_list():
            if s.name == state:
                return s
        raise StateNotExist("%s is not a valid state" % state)

    def state_by_name(self, name):
        """Get state from its name

        :param name: the name of a state
        :type name: str
        :rtype: :class:`State`
        :raises DoesNotExist: if no state has this name
        """
        for state in self.state_list():
            if state.name == name:
                return state
        raise DoesNotExist("State %s doesn't exists" % name)

    def state_goto(self, state, requires=None, path_idx=0):
        """Go to state.

        :param state: the target state
        :type state: state_name | :class:`State`
        :param requires: variable values to fill the requires ::

            ([
                ("//xpath/to/variable", {0: value}),
                ("//xpath/to/variable", {0: value})
             ], {'source' : xpath, 'id': uuid})

        :type requires: tuple of variable values and deployment info
        :param path_idx: the path to use when there is multiple paths
            to go to the target State
        :type path_idx: int
        :raises StateNotApply: if a state to leave is not the current one
        :rtype: None
        """
        requires = format_input_variables([] if requires is None else requires)
        logger.debug("Goto state %s using path %i" % (state, path_idx))
        path = self.state_goto_path(state, path_idx=path_idx)
        for (state, method) in path:
            if method == "enter":
                self._push_state(state, requires)
            elif method == "leave":
                if self.state_current() == state:
                    self._pop_state()
                else:
                    raise StateNotApply(self.state_current())

    def state_goto_path_list(self, state):
        """Get the list of paths to go to State.

        :param state: the target state
        :type state: state_name | :class:`State`
        :rtype: [[(:class:`State`, method), (:class:`State`, method), ...], ...]
        """
        state = self._get_state_class(state)
        logger.debug("Get paths to go to state %s" % state)
        return self._get_from_state_paths(self.state_current(), state)

    def state_goto_path(self, state, func=None, path_idx=0):
        """Get one path to go to State.

        :param state: the target state
        :type state: state_name | :class:`State`
        :param func: function to apply on all States of the path
        :type func: function
        :param path_idx: the path to use when there is multiple paths
            to go to the target State
        :type path_idx: int
        :raises StateNotApply: if there is no path at index ``path_idx``
        :rtype: [(:class:`State`, method), (:class:`State`, method), ...]
        """
        try:
            path = self.state_goto_path_list(state)[path_idx]
        except IndexError:
            raise StateNotApply("No path to go to state %s" % state)
        if func is not None:
            for state, method in path:
                func(state)
        return path

    def state_goto_requires(self, state, path_idx=0):
        """Get Requires to go to State.

        :param state: the target state
        :type state: state_name | :class:`State`
        :param path_idx: the path to use when there is multiple paths
            to go to the target State
        :type path_idx: int
        :rtype: [:class:`Provide`]
        """
        acc = IterContainer()
        for state, method in self.state_goto_path(state, path_idx=path_idx):
            if method == "enter":
                if state.provide_enter:
                    acc.append(state.provide_enter)
        return acc

    def provide_list(self, reachable=False):
        """Get all available provides

        :param reachable: list only reachable provides from the current state
        :type reachable: bool
        :rtype: [(:class:`State`, [:class:`Provide`])]
        """
        return [(s, s.provides) for s in self.state_list(reachable=reachable) if s.provides != []]

    def provide_call_requires(self, state, path_idx=0):
        """Get requires to call provide in state.

        :param state: the target state
        :type state: state_name | :class:`State`
        :param path_idx: the path to use when there is multiple paths
            to go to the target State
        :type path_idx: int
        :return: the requires to reach the state, or an empty list when
            the state is already in the stack
        """
        state = self._get_state_class(state)
        if not self._is_state_in_stack(state):
            return self.state_goto_requires(state, path_idx)
        else:
            return []

    def provide_call_args(self, state_name, provide_name):
        """From a provide_name, returns its needed arguments."""
        state = self._get_state_class(state_name)
        return state.provide_by_name(provide_name)

    def provide_call_path(self, state):
        """Get paths to call a provide in state.

        :param state: the target state
        :type state: state_name | :class:`State`
        :return: the paths to reach the state, or an empty list when the
            state is already in the stack
        """
        state = self._get_state_class(state)
        if not self._is_state_in_stack(state):
            return self.state_goto_path_list(state)
        else:
            return []

    def provide_call(self, state, provide_name, requires=None, path_idx=0):
        """Go to provide state and call provide.

        :param state: the target state
        :type state: state_name | :class:`State`
        :param provide_name: name of the provide
        :type provide_name: str
        :param requires: variable values to fill the requires ::

            ([
                ("//xpath/to/variable", {0: value}),
                ("//xpath/to/variable", {0: value})
             ], {'source' : xpath, 'id': uuid})

        :type requires: tuple of variable values and deployment info
        :rtype: provide result
        """
        # FIXME This is useless and should be removed ???
        requires = format_input_variables([] if requires is None else requires)
        state = self._get_state_class(state)
        # To be sure that the provide exists
        state.provide_by_name(provide_name)
        if not self._is_state_in_stack(state):
            self.state_goto(state, requires, path_idx)
        return self._provide_call_in_stack(state, provide_name, requires)

    def _provide_call_in_stack(self, state, provide_name, requires=None):
        """Call a provide by name. State which provides must be in the stack.

        After the call, the provide flags are passed to the ``cross``
        method of the provider state and of every state above it in the
        stack.
        """
        if requires is None:
            requires = []
        state = self._get_state_class(state)
        state_index = self._stack.index(state)
        provide = state.provide_by_name(provide_name)
        provide_method = getattr(state, provide_name)
        ret = state._provide_call(provide_method, provide, requires)
        logger.debug("Provide %s returns values %s" % (provide_name, ret))
        logger.debug("Propagate flags %s to upper states" % provide.flags)
        for s in self._stack[state_index:]:
            if armonic.common.SIMULATION:
                logger.warning("Cross state %s is simulated" % s.name)
            else:
                s.cross(**(provide.flags))
        return ret

    def to_dot(self, cross=False, enter_doc=False, leave_doc=False, reachable=False):
        """Return a dot string of lifecycle.

        :param cross: add the doc and the flags of the cross methods
        :param enter_doc: add the doc of the enter methods
        :param leave_doc: add the doc of the leave methods
        :param reachable: only draw states reachable from the current state
        """

        def dotify(string):
            # To remove illegal characters; newlines become the literal
            # two-character sequence backslash + "l".
            if string is not None:
                tmp = string.replace("{", "").replace("}", "").replace(":", "").replace("\n", "\\l")
                tmp += '\\l'
                return tmp
            else:
                return string

        def list_to_table(l):
            if l == []:
                acc = ""
            else:
                acc = dotify(str(l[0]))
                if len(l) > 1:
                    for a in l[1:]:
                        acc += "| %s" % dotify(str(a))
            return acc

        def dot_provide(provide):
            if provide != []:
                return "%s | {%s}" % (
                    dotify(provide.name),
                    list_to_table([r.name for r in provide]))
            else:
                return ""

        acc = ""
        acc += "digraph finite_state_machine {\n"
        acc += "node [shape = ellipse];\n"
        state_list = self.state_list(reachable=reachable)
        for s in state_list:
            acc += '"%s"[\n' % s.name
            acc += 'shape = "record"\n'
            # Begin of label
            acc += 'label = "{%s | %s ' % (
                s.name,
                dotify(s.__doc__),
            )
            # Enter doc
            if enter_doc:
                acc += " | Entry: %s" % (dotify(s.enter.__doc__))
            if leave_doc:
                acc += " | Leave: %s" % (dotify(s.leave.__doc__))
            # Cross method
            if cross:
                acc += "| {Cross: | {Doc: %s | Flags: %s}}" % (
                    dotify(s.cross.__doc__),
                    inspect.getargspec(s.cross).args[1:])
            # Enter Requires
            acc += "| { enter\\l | {%s}}" % list_to_table([r.name for r in s.provide_enter])
            for p in s.provides:
                acc += " | { %s }" % dot_provide(p)
            # End of label
            acc += '}"\n'
            acc += "];\n"
        for (s, d) in self.transitions:
            if s in state_list and d in state_list:
                acc += '"%s" -> "%s";\n' % (s.name, d.name)
        acc += "}\n"
        return acc

    def __repr__(self):
        return "<Lifecycle:%s>" % self.name

    def _clear_states_provides(self):
        """Reset variables to default values in all states"""
        for s in self.state_list():
            s._clear_provides()

    def _clear_state_provide(self, state, provide_name):
        """Reset variables to default values in one provide of a state"""
        state = self._get_state_class(state)
        state._clear_provide(provide_name)

    def to_primitive(self, reachable=False):
        """Return a dict describing the lifecycle, its states and the
        transitions between the listed states."""
        state_list = self.state_list(reachable=reachable)
        return {'name': self.name,
                'xpath': self.get_xpath_relative(),
                'states': [s.to_primitive() for s in state_list],
                "transitions": [(s.name, d.name) for (s, d) in self.transitions
                                if s in state_list and d in state_list]}
class LifecycleNotExist(Exception):
    """Raised when a lifecycle name is unknown or can not be loaded."""
[docs]class LifecycleManager(XMLRessource): """The :class:`LifecyleManager` is used to manage :class:`Lifecyle` objects. It permits to interact with lifecycles by provinding xpaths. The full path to a variable is:: /hostname/lifecycle_name/state_name/provide_name/require_name/variable_name The xpath to get all states of the Mysql :class:`Lifecyle` would be:: //Mysql/* To get the ``add_database`` provide in the Mysql :class:`Lifecyle`:: //Mysql//add_database All methods of :class:`LifecyleManager` returns python objects. :param os_type: to specify which kind of os has to be used. If it is not specified, the os type is automatically discovered. :param public_ip: the public ip of the agent. This is used by clients to know how to contact services deployed by this agent. """ def __init__(self, os_type=None, autoload=True, public_ip="localhost"): XMLRessource.__init__(self) self.os_type = OS_TYPE if os_type: self.os_type = os_type self.public_ip = public_ip # Here we globally set PUBLIC_IP armonic.common.PUBLIC_IP = public_ip self.lf_loaded = {} self.lf = {} for lf in get_subclasses(Lifecycle): if not lf.abstract: logger.debug("Found Lifecycle %s" % lf) self.lf.update({lf.__name__: lf}) if autoload: self.load(lf.__name__) else: logger.debug("Ignoring abstract Lifecycle %s" % lf) self.register()
[docs] def register(self): """Register the manager in the XMLRegistery. """ XMLRegistery._xml_register(self)
@property def name(self): return uname()[1] def _xml_ressource_name(self): return "location" def _xml_tag(self): return self.name def _xml_children(self): return [lf for lf_name, lf in self.lf_loaded.items()]
[docs] def info(self): """Get info of armonic agent :rtype: dict """ return {"os-type": self.os_type.name, "os-release": self.os_type.release, "version": armonic.common.VERSION, "public-ip": self.public_ip}
[docs] def lifecycle(self, lifecycle_xpath): """List loaded lifecycle objects :param lifecycle_xpath: xpath that matches lifecycles :type lifecycle_xpath: str :return: list of :class:`Lifecycle` :rtype: [:class:`Lifecycle`] """ elts = XMLRegistery.find_all_elts(lifecycle_xpath) acc = [] for e in elts: lf_name = XMLRegistery.get_ressource(e, "lifecycle") lf = self.lifecycle_by_name(lf_name) acc.append(lf) return acc
[docs] def load(self, lf_name): """Load a :class:`Lifecycle` in the manager and register it in the XML register. :param lf_name: the :class:`Lifecycle` name to load :type lf_name: str :raises LifecycleNotExist: if the :class:`Lifecycle` isn't found :return: the loaded :class:`Lifecycle` :rtype: :class:`Lifecycle` """ try: lf = self.lf[lf_name]() # Reset variables values in all States # since States are Singleton lf._clear_states_provides() if self.os_type is not None: lf.os_type = self.os_type except KeyError: raise LifecycleNotExist("Lifecycle '%s' doesn't exist" % lf_name) self.lf_loaded.update({lf_name: lf}) return lf
def lifecycle_by_name(self, lf_name): try: self.lf_loaded[lf_name] except KeyError: self.load(lf_name) try: return self.lf_loaded[lf_name] except KeyError: raise LifecycleNotExist("%s is not loaded" % lf_name)
[docs] def state(self, state_xpath): """Return a list of states that matches state_xpath. :param state_xpath: xpath that can match multiple states :type state_xpath: str :return: list of :class:`State` :rtype: [:py:class:`State`] """ elts = XMLRegistery.find_all_elts(state_xpath) acc = [] for e in elts: lf_name = XMLRegistery.get_ressource(e, "lifecycle") state_name = XMLRegistery.get_ressource(e, "state") state = self.lifecycle_by_name(lf_name)._get_state_class(state_name) acc.append(state) return acc
[docs] def state_current(self, lifecycle_xpath): """Get the current state name of matched lifecycles. :param lifecyle_xpath: xpath that can match multiple :class:`Lifecycle` :rtype: [:class:`State`] """ # TODO return (Lifecycle, State) elts = XMLRegistery.find_all_elts(lifecycle_xpath) acc = [] for e in elts: lf_name = XMLRegistery.get_ressource(e, "lifecycle") lf = self.lifecycle_by_name(lf_name) acc.append(lf.state_current()) return acc
[docs] def state_goto_path(self, state_xpath): """From the current state, returns all paths to goto states that match state_xpath. :param state_xpath: xpath that can match multiple states :type state_xpath: str :return: list of paths for every state matched by state_xpath :rtype: [(:class:`State`, [path])] """ elts = XMLRegistery.find_all_elts(state_xpath) acc = [] for e in elts: lf_name = XMLRegistery.get_ressource(e, "lifecycle") state_name = XMLRegistery.get_ressource(e, "state") state = self.lifecycle_by_name(lf_name)._get_state_class(state_name) paths = self.lifecycle_by_name(lf_name).state_goto_path_list(state_name) acc.append((state, paths)) return acc
[docs] def state_goto_requires(self, state_xpath_uri, path_idx=0): """Return the list a special provide required to go from the current state to the state that match state_xpath_uri. :param state_xpath_uri: unique state xpath :type state_xpath_uri: str :param path_idx: path to use when there is multiple paths to go to the provide :type path_idx: int :rtype: [:py:class:`Provide`] """ lf_name = XMLRegistery.get_ressource(state_xpath_uri, "lifecycle") state_name = XMLRegistery.get_ressource(state_xpath_uri, "state") lf = self.lifecycle_by_name(lf_name) return lf.state_goto_requires(state_name)
[docs] def state_goto(self, state_xpath_uri, requires=[], path_idx=0): """From the current state go to state. :param xpath: unique xpath of a state :type xpath: str :param requires: variable values to fill the requires :: ([ ("//xpath/to/variable", {0: value}), ("//xpath/to/variable", {0: value}) ], {'source' : xpath, 'id': uuid}) :type requires: tuple of variable values and deployment info :rtype: None """ requires = format_input_variables(requires) lf_name = XMLRegistery.get_ressource(state_xpath_uri, "lifecycle") state_name = XMLRegistery.get_ressource(state_xpath_uri, "state") logger.debug("state-goto %s %s %s" % ( lf_name, state_name, requires)) return self.lifecycle_by_name(lf_name).state_goto(state_name, requires)
[docs] def provide(self, provide_xpath): """Return provides that match provide_xpath and that can be reached (OS_TYPE). :param provide_xpath: xpath to provide :type provide_xpath: str :return: list of provides that match provide_xpath :rtype: [:py:class:`Provide`] """ matches = XMLRegistery.find_all_elts(provide_xpath) acc = IterContainer() for m in matches: if XMLRegistery.is_ressource(m, "provide"): provide_name = XMLRegistery.get_ressource(m, "provide") if provide_name not in STATE_RESERVED_METHODS: lf_name = XMLRegistery.get_ressource(m, "lifecycle") lf = self.lifecycle_by_name(lf_name) state_name = XMLRegistery.get_ressource(m, "state") state = lf.state_by_name(state_name) if (lf._is_state_in_stack(state) or lf.provide_call_path(state) != []): acc.append(state.provide_by_name(provide_name)) return acc
[docs] def provide_call_requires(self, provide_xpath_uri, path_idx=0): """Requires for the provide. :param provide_xpath_uri: unique xpath to provide :type provide_xpath_uri: str :param path_idx: path to use when there is multiple paths to go to the provide :type path_idx: int :return: list of provides to call it order to call provide_xpath_uri :rtype: [:py:class:`Provide`] """ lf_name = XMLRegistery.get_ressource(provide_xpath_uri, "lifecycle") state_name = XMLRegistery.get_ressource(provide_xpath_uri, "state") return self.lifecycle_by_name(lf_name).provide_call_requires(state_name, path_idx)
[docs] def provide_call_path(self, provide_xpath): """Paths for provides that matches provide_xpath. :param provide_xpath: xpath to provide :type provide_xpath: str :return: list of paths to call provides that match provide_xpath :rtype: [(:py:class:`Provide`, [path, ...])] """ matches = XMLRegistery.find_all_elts(provide_xpath) acc = [] for m in matches: if XMLRegistery.is_ressource(m, "provide"): provide_name = XMLRegistery.get_ressource(m, "provide") if provide_name not in STATE_RESERVED_METHODS: lf_name = XMLRegistery.get_ressource(m, "lifecycle") lf = self.lifecycle_by_name(lf_name) state_name = XMLRegistery.get_ressource(m, "state") state = lf.state_by_name(state_name) provide = state.provide_by_name(provide_name) acc.append((provide, lf.provide_call_path(state_name))) return acc
[docs] def provide_call_validate(self, provide_xpath_uri, requires=[], path_idx=0): """Validate requires to call the provide :param xpath: unique xpath of the provide to call :type xpath: str :param requires: variable values to fill the requires :: ([ ("//xpath/to/variable", {0: value}), ("//xpath/to/variable", {0: value}) ], {'source' : xpath, 'id': uuid}) :type requires: tuple of variable values and deployment info :return: list of validated provides to call in order to call provide_xpath_uri :rtype: {'errors': bool, 'xpath': xpath, 'requires': [:class:`Provide`]} """ variables_values = format_input_variables(requires) logger.debug("Validating variables %s" % variables_values) # check that all requires are validated # copy requires we don't want to fill variables yet requires = copy.deepcopy(self.provide_call_requires(provide_xpath_uri)) try: requires.append(copy.deepcopy( self.from_xpath(provide_xpath_uri, "provide"))) except DoesNotExist: pass errors = False for provide in requires: try: provide.fill(variables_values) provide.validate() except ValidationError as e: logger.debug("Validation error on provide '%s'" % provide.get_xpath()) logger.debug(" on require '%s'" % e.require_name) logger.debug(" on variable '%s'" % e.variable_name) logger.debug(" with msg: %s" % e.msg) errors = True return {'xpath': provide_xpath_uri, 'errors': errors, 'requires': requires}
def provide_call(self, provide_xpath_uri, requires=[], path_idx=0):
    """Call a provide of a lifecycle and go to provider state if needed.

    Unless ``armonic.common.DONT_VALIDATE_ON_CALL`` is set, the values are
    first checked with :meth:`provide_call_validate` so that no state is
    changed when the provide could not be called in the end.

    :param provide_xpath_uri: xpath of the provide to call
    :type provide_xpath_uri: str
    :param requires: variable values to fill the requires ::

        ([
            ("//xpath/to/variable", {0: value}),
            ("//xpath/to/variable", {0: value})
        ], {'source' : xpath, 'id': uuid})

    :type requires: tuple of variable values and deployment info
    :param path_idx: forwarded unchanged to the lifecycle's ``provide_call``
    :return: provide_xpath_uri call result
    :raises ValidationError: when validation is enabled and reports errors
    """
    values = format_input_variables(requires)
    logger.debug("Provide call %s" % provide_xpath_uri)

    validate_first = not armonic.common.DONT_VALIDATE_ON_CALL
    if validate_first:
        report = self.provide_call_validate(provide_xpath_uri, values)
        if report['errors']:
            msg = ("Provided values doesn't met provide requires."
                   " Call provide_call_validate() to know errors.")
            logger.error(msg)
            raise ValidationError(msg=msg)

    # NOTE(review): the values are formatted a second time here, as in the
    # original code; presumably format_input_variables is idempotent --
    # confirm before removing.
    values = format_input_variables(values)

    lifecycle_name = XMLRegistery.get_ressource(provide_xpath_uri, "lifecycle")
    state_name = XMLRegistery.get_ressource(provide_xpath_uri, "state")
    provide_name = XMLRegistery.get_ressource(provide_xpath_uri, "provide")

    logger.debug("Calling provide %s" % provide_xpath_uri)
    lifecycle = self.lifecycle_by_name(lifecycle_name)
    return lifecycle.provide_call(state_name, provide_name, values, path_idx)
def to_dot(self, lf_name, reachable=False):
    """Return the dot string of a lifecycle object.

    The work is delegated to the ``to_dot`` method of the lifecycle found
    by ``self.lifecycle_by_name``.

    :param lf_name: name of the lifecycle object
    :type lf_name: str
    :param reachable: forwarded as-is to the lifecycle's ``to_dot``
    :rtype: dot file string
    """
    lifecycle = self.lifecycle_by_name(lf_name)
    dot_source = lifecycle.to_dot(reachable=reachable)
    return dot_source
def to_primitive(self, lf_name, reachable=False):
    """Return a serialized Lifecycle object.

    The work is delegated to the ``to_primitive`` method of the lifecycle
    found by ``self.lifecycle_by_name``.

    :param lf_name: name of the :class:`Lifecycle` object
    :type lf_name: str
    :param reachable: forwarded as-is to the lifecycle's ``to_primitive``
    :return: serialized :class:`Lifecycle` object
    :rtype: dict
    """
    lifecycle = self.lifecycle_by_name(lf_name)
    serialized = lifecycle.to_primitive(reachable=reachable)
    return serialized
def uri(self, xpath="//", relative=False, resource=None):
    """Return the list of xpath_uris that match this xpath.

    The ``resource`` filter is applied in both absolute and relative mode.
    Previously it was silently ignored when ``relative`` was false.

    :type xpath: str
    :param xpath: an xpath string
    :type relative: bool
    :param relative: If true, returns relative xpath, i.e. what follows
        the second ``/`` of each match. Matches too short to be made
        relative are skipped.
    :type resource: str
    :param resource: Returns only xpath that describe this resource type
    :return: list of xpaths
    :rtype: [xpath_uri]
    """
    matches = XMLRegistery.find_all_elts(xpath)

    if not relative:
        if resource is None:
            # No filter requested: hand back the registry result untouched.
            return matches
        return [match for match in matches
                if XMLRegistery.is_ressource(match, resource)]

    relative_uris = []
    for match in matches:
        try:
            if resource is None or XMLRegistery.is_ressource(match, resource):
                # Keep what follows the second "/"; presumably this drops
                # the leading location component -- confirm against the
                # registry's xpath layout.
                relative_uris.append(match.split("/", 2)[2])
        except IndexError:
            # Fewer than two "/" separators: nothing relative to return.
            pass
    return relative_uris
def from_xpath(self, xpath, ret="lifecycle"):
    """From a xpath try to get the object of type ``ret``.

    The resource hierarchy is walked from the manager downwards
    (lifecycle, state, provide, require, variable). At each level the name
    is read from ``xpath`` and resolved with the ``<type>_by_name`` method
    of the object found at the previous level.

    :param xpath: xpath to a ressource
    :type xpath: str
    :param ret: object type to return (lifecycle, state, provide, require,
        variable)
    :type ret: str
    :rtype: :class:`Lifecycle` | :class:`State` | :class:`Provide` |
        :class:`Require` | :class:`Variable`
    :raises DoesNotExist: when ``ret`` is not one of the known types
    """
    current = self
    for kind in ("lifecycle", "state", "provide", "require", "variable"):
        name = XMLRegistery.get_ressource(xpath, kind)
        resolver = getattr(current, "%s_by_name" % kind)
        current = resolver(name)
        if kind == ret:
            return current
    # Every level was walked without reaching the requested type.
    raise DoesNotExist("Can't find object")
def xpath(self, xpath):
    """Run ``xpath`` through the XML registry.

    :param xpath: an xpath string
    :type xpath: str
    :return: whatever ``XMLRegistery.xpath`` returns for this expression
    """
    result = XMLRegistery.xpath(xpath)
    return result
def to_xml(self, xpath=None):
    """Return the xml representation of the lifecycle manager.

    :param xpath: passed as-is to ``XMLRegistery.to_string``; presumably
        restricts the output to the matching subtree -- confirm against
        the registry implementation
    :return: the value returned by ``XMLRegistery.to_string``
    """
    xml_text = XMLRegistery.to_string(xpath)
    return xml_text
def __repr__(self):
    """Return a short debugging representation that embeds ``self.name``."""
    # The spelling of the tag is runtime output and is kept unchanged.
    template = "<LifecyleManager:%s>"
    return template % self.name