# -*- coding: utf-8 -*-
#------------------------------------------------------------------#
"""
.. inheritance-diagram:: xtd.core.stat.counter
:parts: 1
"""
__author__ = "Xavier MARCELET <xavier@marcelet.com>"
#------------------------------------------------------------------#
import threading
import sys
import time
import multiprocessing
from ..error import XtdError
#------------------------------------------------------------------#
class BaseCounter(object):
    """Abstract base class of all counters.

    This thin shell only guarantees that the two public entry points,
    :py:meth:`visit` and :py:meth:`update`, run while holding the
    object's lock. The actual work is delegated to the ``_visit_safe``
    and ``_update_safe`` hooks, which raise ``NotImplementedError``
    here and are meant to be overridden.

    Args:
      p_name (str): object name
    """

    def __init__(self, p_name):
        self.m_name = p_name
        self.m_lock = multiprocessing.Lock()

    def _visit_safe(self, p_visitor):
        """ Hook run by :py:meth:`visit` with the lock held """
        raise NotImplementedError

    def _update_safe(self):
        """ Hook run by :py:meth:`update` with the lock held """
        raise NotImplementedError

    def update(self):
        """ Refresh the object's data while holding the lock

        Generally called by the user right before visiting the object,
        so that the visitor collects "fresh" data.
        """
        with self.m_lock:
            self._update_safe()

    def visit(self, p_visitor):
        """ Walk the object tree with ``p_visitor`` while holding the lock """
        with self.m_lock:
            self._visit_safe(p_visitor)
#------------------------------------------------------------------#
class Value(BaseCounter):
    """Thread-safe numeric value holder

    The value is stored in a :py:class:`multiprocessing.Value` next to
    an "unset" flag. While the flag is raised the value reads as
    ``None`` through :py:attr:`val` and as ``"NaN"`` through visitors.

    Args:
      p_name  (str): Object's name
      p_value (numeric): Initial value, type depends on ``p_type``;
        ``None`` creates an unset value
      p_type  (str) : One character type spec, see :py:class:`multiprocessing.Value`

    Raises:
      TypeError: invalid ``p_type``
      TypeError: invalid ``p_value`` for given ``p_type``

    **Visitors**

    Value visitor must follow the following prototype:
    ``function(p_name, p_value)``

    - **p_name** (str): name of visited Value
    - **p_value** (numeric|str): internal value or ``NaN`` if unset
    """

    def __init__(self, p_name, p_value=None, p_type='i'):
        super(Value, self).__init__(p_name)
        # Identity test: None is the "unset" marker, whereas 0 is a
        # perfectly valid initial value.
        self.m_unset = p_value is None
        l_initial = 0 if self.m_unset else p_value
        self.m_value = multiprocessing.Value(p_type, l_initial)

    def unset(self):
        """ Make the current value undefined """
        with self.m_lock:
            self.m_unset = True

    # pylint: disable=invalid-name
    @property
    def val(self):
        """ (Property) internal value

        If set to None, the current value is ``undefined``

        Returns:
          (numeric) : current internal value, None if unset

        Raises:
          TypeError: assigned value doesn't match constructor type
        """
        with self.m_lock:
            if self.m_unset:
                return None
            return self.m_value.value

    @val.setter
    def val(self, p_val):
        with self.m_lock:
            if p_val is None:
                self.m_unset = True
            else:
                # Assign first: if the value is rejected, the unset flag
                # is left untouched instead of exposing a stale value.
                self.m_value.value = p_val
                self.m_unset = False

    def incr(self, p_val=1):
        """ Increments the current value

        Only the stored number is modified; the unset flag is left
        untouched, so incrementing an unset value keeps it unset.

        Args:
          p_val (numeric): add p_val to current internal value

        Raises:
          TypeError: given value doesn't match constructor type
        """
        with self.m_lock:
            self.m_value.value += p_val

    def decr(self, p_val=1):
        """ Decrements the current value

        Args:
          p_val (numeric): subtract p_val from current internal value

        Raises:
          TypeError: given value doesn't match constructor type
        """
        self.incr(-p_val)

    def _visit_safe(self, p_visitor):
        """ Apply visitor to internal value

        The visitor receives the string ``"NaN"`` when the value is unset.

        Raises:
          TypeError: visitor has an invalid prototype
        """
        l_val = "NaN" if self.m_unset else self.m_value.value
        p_visitor(self.m_name, l_val)

    def _update_safe(self):
        """ Noop """
        pass
#------------------------------------------------------------------#
class Int32(Value):
    """ :py:class:`Value` specialized for signed 32 bits integers """

    #: :py:class:`multiprocessing.Value` type spec
    TYPE = 'i'

    def __init__(self, p_name, p_value=None):
        super(Int32, self).__init__(p_name, p_value=p_value, p_type=self.TYPE)
class Int64(Value):
    """ :py:class:`Value` specialized for signed 64 bits integers """

    #: :py:class:`multiprocessing.Value` type spec
    # NOTE(review): 'l' selects a C ``long``, whose width is platform
    # dependent -- confirm it is 64 bits wide on every supported target.
    TYPE = 'l'

    def __init__(self, p_name, p_value=None):
        super(Int64, self).__init__(p_name, p_value=p_value, p_type=self.TYPE)
class UInt32(Value):
    """ :py:class:`Value` specialized for unsigned 32 bits integers """

    #: :py:class:`multiprocessing.Value` type spec
    TYPE = "I"

    def __init__(self, p_name, p_value=None):
        super(UInt32, self).__init__(p_name, p_value=p_value, p_type=self.TYPE)
class UInt64(Value):
    """ :py:class:`Value` specialized for unsigned 64 bits integers """

    #: :py:class:`multiprocessing.Value` type spec
    # NOTE(review): 'L' selects a C ``unsigned long``, whose width is
    # platform dependent -- confirm it is 64 bits wide on every
    # supported target.
    TYPE = "L"

    def __init__(self, p_name, p_value=None):
        super(UInt64, self).__init__(p_name, p_value=p_value, p_type=self.TYPE)
class Float(Value):
    """ :py:class:`Value` specialized for floats """

    #: :py:class:`multiprocessing.Value` type spec
    TYPE = 'f'

    def __init__(self, p_name, p_value=None):
        super(Float, self).__init__(p_name, p_value=p_value, p_type=self.TYPE)
class Double(Value):
    """ :py:class:`Value` specialized for doubles """

    #: :py:class:`multiprocessing.Value` type spec
    TYPE = 'd'

    def __init__(self, p_name, p_value=None):
        super(Double, self).__init__(p_name, p_value=p_value, p_type=self.TYPE)
#------------------------------------------------------------------#
class Composed(BaseCounter):
    """ Container that forwards visits and updates to its child counters """

    def __init__(self, p_name):
        super(Composed, self).__init__(p_name)
        # Children in registration order
        self.m_childs = []

    def register(self, p_counter):
        """ Add a child counter to the collection

        When the current object has a non-empty name, the child is
        renamed to ``<parent-name>.<child-name>`` before being stored.

        Args:
          p_counter (BaseCounter): child counter to add
        """
        with self.m_lock:
            if self.m_name:
                p_counter.m_name = ".".join((self.m_name, p_counter.m_name))
            self.m_childs.append(p_counter)

    def _update_safe(self):
        """ Update every registered child, in registration order """
        for c_counter in self.m_childs:
            c_counter.update()

    def _visit_safe(self, p_visitor):
        """ Visit every registered child, in registration order """
        for c_counter in self.m_childs:
            c_counter.visit(p_visitor)
#------------------------------------------------------------------#
class TimedSample(Composed):
    """ Holds the min, max and average value of collected items over a fixed period of time

    When no items are available for the last ``p_timeMs``, the 3 sub counters are
    undefined, thus, collected by visitors as ``NaN``.

    The three sub counters are registered as children named ``min``,
    ``max`` and ``avg``; they are refreshed by :py:meth:`update`.

    Args:
      p_name (str) : counter name
      p_timeMs (int) : maximum amount of time (millisecond) to keep collected values
      p_maxSamples (int) : maximum amount of values to keep
      p_type (str) : internal type representation, see :py:class:`multiprocessing.Value`
    """

    def __init__(self, p_name, p_timeMs=10000, p_maxSamples=20000, p_type=Int32.TYPE):
        super(TimedSample, self).__init__(p_name)
        # List of (timestamp, value) tuples, oldest first
        self.m_samples = []
        self.m_timeMs = p_timeMs
        self.m_maxSize = p_maxSamples
        self.m_rttMin = Value("min", None, p_type)
        self.m_rttMax = Value("max", None, p_type)
        self.m_rttAvg = Value("avg", None, p_type)
        self.register(self.m_rttMin)
        self.register(self.m_rttMax)
        self.register(self.m_rttAvg)

    def push(self, p_val):
        """ Add a value in collection

        The value is converted with ``int()`` and stored along with the
        current time. When the collection grows past ``p_maxSamples``,
        the oldest entries are dropped.

        Args:
          p_val (int): value to add

        Raises:
          TypeError: ``p_val`` can't be converted to an integer
        """
        # Convert before taking the lock so a bad value never holds it
        try:
            l_val = int(p_val)
        except ValueError:
            raise TypeError("invalid sample value %r" % (p_val,))
        with self.m_lock:
            self.m_samples.append((time.time(), l_val))
            # Explicit deletion of the excess: a ``[-max:]`` slice would
            # keep everything when the maximum size is 0.
            l_excess = len(self.m_samples) - self.m_maxSize
            if l_excess > 0:
                del self.m_samples[:l_excess]

    # pylint: disable=invalid-name
    def _update_safe(self):
        """ Drop expired samples and refresh the min, max and avg sub counters

        The three sub counters are unset when no sample is left.
        """
        l_oldest = time.time() - self.m_timeMs / 1000.0
        self.m_samples = [x for x in self.m_samples if x[0] >= l_oldest]
        if not self.m_samples:
            self.m_rttMin.unset()
            self.m_rttMax.unset()
            self.m_rttAvg.unset()
            return
        # Builtins instead of hand-seeded accumulators: seeding the max
        # with 0 gave a wrong result when every sample is negative.
        l_values = [x[1] for x in self.m_samples]
        self.m_rttMin.val = int(min(l_values))
        self.m_rttMax.val = int(max(l_values))
        self.m_rttAvg.val = int(sum(l_values) / len(l_values))
#------------------------------------------------------------------#
class Perf(TimedSample):
    """ Designed to monitor the min, max and average time of an event

    At event start call :py:meth:`work_begin` to store the current time.
    At event end, call :py:meth:`work_end` to calculate the time delta and add it
    to the base class samples that monitor the min, max and average values.

    The time resolution is the microsecond (10^-6 second).

    Note:
      Events beginnings and ends can't be interleaved in the same thread.
      Pending start times are keyed by the name of the calling thread.
    """

    def __init__(self, p_name, p_timeMs=10000, p_maxSamples=20000):
        super(Perf, self).__init__(p_name, p_timeMs, p_maxSamples, p_type=UInt64.TYPE)
        # Pending start time of each thread, keyed by thread name
        self.m_startTimes = {}

    def work_begin(self):
        """ Record begin time of an event

        Raises:
          CounterError: called twice from same thread with no :py:meth:`work_end` between them
        """
        l_name = threading.current_thread().name
        if self.m_startTimes.get(l_name) is not None:
            # CounterError renders its message with str.format, hence the
            # '{0}' placeholder (a '%s' would be emitted verbatim).
            raise CounterError(__name__, self.m_name,
                               "missing work_end for thread '{0}'", l_name)
        self.m_startTimes[l_name] = time.time()

    def work_end(self):
        """ Record end time of an event and push it into base class

        The elapsed time is pushed in microseconds, then the pending
        start time of the calling thread is cleared.

        Raises:
          CounterError: called without calling :py:meth:`work_begin` first from same thread
        """
        l_name = threading.current_thread().name
        l_start = self.m_startTimes.get(l_name)
        if l_start is None:
            # Same str.format placeholder convention as in work_begin
            raise CounterError(__name__, self.m_name,
                               "missing work_begin for thread '{0}'", l_name)
        self.push((time.time() - l_start) * 1000000)
        del self.m_startTimes[l_name]
class CounterError(XtdError):
    """ Generic counter error class

    The final message is built by rendering ``p_msg`` with
    ``str.format(*p_args, **p_kwds)`` and prefixing it with the name
    of the counter.

    Args:
      p_module (str) : module name
      p_name (str) : counter name
      p_msg (str) : error message, a ``str.format`` template
      p_args (tuple) : generic message format arguments
      p_kwds (dict) : generic message format keywords

    Attributes:
      m_name (str) : name of counter that raised the error
    """

    def __init__(self, p_module, p_name, p_msg, *p_args, **p_kwds):
        self.m_name = p_name
        l_fields = {
            "name" : p_name,
            "msg" : p_msg.format(*p_args, **p_kwds)
        }
        l_text = "error with counter '%(name)s' : %(msg)s" % l_fields
        super(CounterError, self).__init__(p_module, l_text)
#------------------------------------------------------------------#
# Local Variables:
# ispell-local-dictionary: "american"
# End: