Source code for xtd.core.config.checkers

# -*- coding: utf-8
#------------------------------------------------------------------#

__author__ = "Xavier MARCELET <xavier@marcelet.com>"

#------------------------------------------------------------------#
try:
  from urlparse import urlparse
except ImportError:
  from urllib.parse import urlparse

import json
import os
import re
import socket
from functools import partial

from ..error import ConfigValueFileError, ConfigValueFileModeError
from ..error import ConfigValueDirError, ConfigValueDirModeError
from ..error import ConfigValueTypeError, ConfigValueLimitsError
from ..error import ConfigValueEnumError, ConfigValueError
from ..tools import url

#------------------------------------------------------------------#


[docs]def check_file(p_section, p_name, p_value, p_read=False, p_write=False, p_execute=False): """check that given config parameter is a valid file with given rwx attributes If p_value does not exists and only write attribute is requested, the function checks that the file can be created in its parent directory Args: p_section (str): parameter section name p_name (str): parameter name p_value (str): parameter value p_read (str): target file should be readable, default ``False`` p_write (str): target file should be writable, default ``False`` p_execute (str): target file should be executable, default ``False`` Returns: bool: file absolute path Raises: ConfigValueFileError: p_value is a directory ConfigValueFileModeError: p_value dosen't meet requested rwx attributes """ l_absFilePath = os.path.expanduser(p_value) l_absFilePath = os.path.abspath(l_absFilePath) if os.path.isdir(l_absFilePath): raise ConfigValueFileError(p_section, p_name, l_absFilePath) if not os.path.exists(l_absFilePath): if p_read or not _check_mode(os.path.dirname(l_absFilePath), p_write=True): raise ConfigValueFileModeError(p_section, p_name, p_value, p_read, p_write, p_execute) else: if not _check_mode(l_absFilePath, p_read, p_write, p_execute): raise ConfigValueFileModeError(p_section, p_name, p_value, p_read, p_write, p_execute) return l_absFilePath
# ------------------------------------------------------------------------- #
[docs]def check_dir(p_section, p_name, p_value, p_read=False, p_write=False, p_execute=False): """check that given value is a valid directory for given rwx attributes Args: p_section (str): parameter section name p_name (str): parameter name p_value (str): target directory path p_read (str): target file should be readable, default ``False`` p_write (str): target file should be writable, default ``False`` p_execute (str): target file should be executable, default ``False`` Returns: bool: directory absolute path Raises: ConfigValueDirError: p_value is not a directory ConfigValueDirModeError: directory doesn't meet requested rwx attributes """ l_absDirPath = os.path.expanduser(p_value) l_absDirPath = os.path.abspath(l_absDirPath) if not os.path.isdir(l_absDirPath): raise ConfigValueDirError(p_section, p_name, l_absDirPath) if not _check_mode(l_absDirPath, p_read, p_write, p_execute): raise ConfigValueDirModeError(p_section, p_name, p_value, p_read, p_write, p_execute) return l_absDirPath
# ------------------------------------------------------------------------- #
[docs]def check_int(p_section, p_name, p_value, p_min=None, p_max=None): """check that given value is a valid integer If not None, the function will insure that value fits the requested minimum and maximum parameters Args: p_section (str): parameter section name p_name (str): parameter name p_value (str): target value p_min (int): minimum accepted value, default : ``None`` p_max (int): maximum accepted value, default : ``None`` Returns: int: integer converted value Raises: ConfigValueTypeError: value is not an integer, nor a int-convertible string ConfigValueLimitsError: value doesn't match requested min and max constraints """ if isinstance(p_value, int) and not isinstance(p_value, bool): l_value = p_value else: if isinstance(p_value, str): try: l_value = int(p_value) except ValueError: raise ConfigValueTypeError(p_section, p_name, p_value, ConfigValueTypeError.INT) else: raise ConfigValueTypeError(p_section, p_name, p_value, ConfigValueTypeError.INT) if (p_min != None) and (l_value < p_min): raise ConfigValueLimitsError(p_section, p_name, l_value, p_min, p_max) if (p_max != None) and (l_value > p_max): raise ConfigValueLimitsError(p_section, p_name, l_value, p_min, p_max) return l_value
# ------------------------------------------------------------------------- #
[docs]def check_float(p_section, p_name, p_value, p_min=None, p_max=None): """check that given value is a valid float Same as :py:func:`check_int` Args: p_section (str): parameter section name p_name (str): parameter name p_value (str): target value p_min (float): minimum accepted value, default : None p_max (float): maximum accepted value, default : None Raises: ConfigValueTypeError: value is not an integer, nor a int-convertible string ConfigValueLimitsError: value doesn't match requested min and max constraints Returns: float: float converted value """ if isinstance(p_value, float): l_value = p_value else: if isinstance(p_value, str): try: l_value = float(p_value) except ValueError: raise ConfigValueTypeError(p_section, p_name, p_value, ConfigValueTypeError.FLOAT) else: raise ConfigValueTypeError(p_section, p_name, p_value, ConfigValueTypeError.FLOAT) if (p_min != None) and (l_value < p_min): raise ConfigValueLimitsError(p_section, p_name, l_value, p_min, p_max) if (p_max != None) and (l_value > p_max): raise ConfigValueLimitsError(p_section, p_name, l_value, p_min, p_max) return l_value
# ------------------------------------------------------------------------- #
[docs]def check_bool(p_section, p_name, p_value): """check that given value is a valid boolean Valid boolean are : * native bool object * str object in the following list : ``["on", "yes", "true", "off", "no", "false"]`` (case insensitive) Args: p_section (str): parameter section name p_name (str): parameter name p_value (str): target value Raises: ConfigValueTypeError: invalid input boolean Returns: bool: converted value """ if isinstance(p_value, bool): return p_value if not isinstance(p_value, str): raise ConfigValueTypeError(p_section, p_name, p_value, ConfigValueTypeError.BOOL) if ((p_value.lower() == 'true') or (p_value.lower() == 'yes') or (p_value.lower() == 'on')): return True if ((p_value.lower() == 'false') or (p_value.lower() == 'no') or (p_value.lower() == 'off')): return False raise ConfigValueTypeError(p_section, p_name, p_value, ConfigValueTypeError.BOOL)
# ------------------------------------------------------------------------- #
[docs]def check_enum(p_section, p_name, p_value, p_values): """ check that given value matches a set of possible values Args: p_section (str): parameter section name p_name (str): parameter name p_value (str): input value p_values (list): set of possible authorized values Raises: ConfigValueEnumError: value not found in possible values Returns: bool: input value """ if not p_value in p_values: raise ConfigValueEnumError(p_section, p_name, p_value, p_values) return p_value
# ------------------------------------------------------------------------- # def _check_mode(p_path, p_read=False, p_write=False, p_execute=False): if not os.path.exists(p_path): return False l_mode = os.F_OK if p_read: l_mode = l_mode | os.R_OK if p_write: l_mode = l_mode | os.W_OK if p_execute: l_mode = l_mode | os.X_OK if os.access(p_path, l_mode): return True return False # ------------------------------------------------------------------------- #
[docs]def check_mail(p_section, p_name, p_value): """ check that given value is a syntactical valid mail address Args: p_section (str): parameter section name p_name (str): parameter name p_value (str): input value Raises: ConfigValueValueError: value not an email Returns: bool: input value """ l_mailRgx = r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,4}" if not re.match("^%s$" % l_mailRgx, p_value): if not re.match("^[^<]*<%s>$" % l_mailRgx, p_value): l_message = "value '%s' is not an email address" % p_value raise ConfigValueError(p_section, p_name, l_message) return p_value
# ------------------------------------------------------------------------- #
[docs]def check_array(p_section, p_name, p_value, p_check=None, p_delim=","): """ check that given value is convertible to array A str value is converted to array by splitting each comma separated elements Additionally, the function checks that each elements meets ``p_check`` function requirement. Example: .. code-block:: python l_value = "1,2,3,4,5" l_value = check_array(l_value, check_float) print(l_value) # [1.0, 2.0, 3.0, 4.0] l_value = "on,on;off,no;true,false" l_value = check_array(l_value, is_array(p_check=check_bool), p_delim=";") print(l_value) # [[True, True], [False, False], [True, False]] Args: p_section (str): parameter section name p_name (str): parameter name p_value (str): input value p_check (function): sub-checker to call on each elements, default ``None`` p_delim (str) : used as delimiter to split given str value Raises: ConfigValueValueError: value not an email Returns: list: array-converted value """ l_res = [] l_value = p_value if not isinstance(l_value, list): l_value = l_value.split(p_delim) for c_val in l_value: if p_check: l_res.append(p_check(p_section, p_name, c_val)) else: l_res.append(c_val) return l_res
# ------------------------------------------------------------------------- #
[docs]def check_host(p_section, p_name, p_value): """ check that value is locally resolvable hostname Args: p_section (str): parameter section name p_name (str): parameter name p_value (str): input value Raises: ConfigValueValueError: value not an hostname Returns: str: input value """ try: socket.gethostbyname(p_value) except socket.gaierror: l_message = "host '%s' is not valid" % p_value raise ConfigValueError(p_section, p_name, l_message) return p_value
# ------------------------------------------------------------------------- #
[docs]def check_json(p_section, p_name, p_value): """ check that value is a valid json if value is str, performs a :py:func:json.loads Args: p_section (str): parameter section name p_name (str): parameter name p_value (str): input value Raises: ConfigValueValueError: value not a json Returns: dict: dict-converted value """ if isinstance(p_value, dict): return p_value try: p_value = json.loads(p_value) except (ValueError, TypeError) as l_error: raise ConfigValueError(p_section, p_name, "invalid json : %s" % str(l_error)) return p_value
# ------------------------------------------------------------------------- #
[docs]def check_socket(p_section, p_name, p_value, p_schemes=None, p_checkUnix=False): """ check that value is a valid socket url Args: p_section (str): parameter section name p_name (str): parameter name p_value (str): input value p_scheme (list) : valid url schemes p_checkUnix (bool) : authorize "unix+//<file>/path" based url Raises: ConfigValueValueError: value a valid socket url Returns: str: input value """ if p_schemes is None: p_schemes = [] l_parts = urlparse(p_value) if len(p_schemes) and (not l_parts[0] in p_schemes): l_format = "invalid url '%s', scheme '%s' not in '%s'" l_message = l_format % (p_value, l_parts[0], str(p_schemes)) raise ConfigValueError(p_section, p_name, l_message) l_result = url.parse_unix(p_value) if p_checkUnix and l_result[1]: check_file(p_section, p_name, l_result[1], p_read=True, p_write=True) return p_value
# ------------------------------------------------------------------------- #
[docs]def is_file(*p_args, **p_kwds): """Currified version of :py:func:`check_file`""" return partial(check_file, *p_args, **p_kwds) # pragma: no cover
[docs]def is_dir(*p_args, **p_kwds): """Currified version of :py:func:`check_dir`""" return partial(check_dir, *p_args, **p_kwds) # pragma: no cover
[docs]def is_int(*p_args, **p_kwds): """Currified version of :py:func:`check_int`""" return partial(check_int, *p_args, **p_kwds) # pragma: no cover
[docs]def is_float(*p_args, **p_kwds): """Currified version of :py:func:`check_float`""" return partial(check_float, *p_args, **p_kwds) # pragma: no cover
[docs]def is_bool(*p_args, **p_kwds): """Currified version of :py:func:`check_bool`""" return partial(check_bool, *p_args, **p_kwds) # pragma: no cover
[docs]def is_enum(*p_args, **p_kwds): """Currified version of :py:func:`check_enum`""" return partial(check_enum, *p_args, **p_kwds) # pragma: no cover
[docs]def is_mail(*p_args, **p_kwds): """Currified version of :py:func:`check_mail`""" return partial(check_mail, *p_args, **p_kwds) # pragma: no cover
[docs]def is_array(*p_args, **p_kwds): """Currified version of :py:func:`check_array`""" return partial(check_array, *p_args, **p_kwds) # pragma: no cover
[docs]def is_host(*p_args, **p_kwds): """Currified version of :py:func:`check_host`""" return partial(check_host, *p_args, **p_kwds) # pragma: no cover
[docs]def is_json(*p_args, **p_kwds): """Currified version of :py:func:`check_json`""" return partial(check_json, *p_args, **p_kwds) # pragma: no cover
[docs]def is_socket(*p_args, **p_kwds): """Currified version of :py:func:`check_socket`""" return partial(check_socket, *p_args, **p_kwds) # pragma: no cover
# ------------------------------------------------------------------------- # # Local Variables: # ispell-local-dictionary: "american" # End: