Source code for illumio.util.functions

# -*- coding: utf-8 -*-

"""This module provides helper functions and decorators for common use-cases.

Copyright:
    © 2022 Illumio

License:
    Apache2, see LICENSE for more details.
"""
import functools
import re
import socket
import sys
import typing
import warnings
from dataclasses import dataclass
from urllib.parse import urlparse

from illumio._version import version
from illumio.exceptions import IllumioException, IllumioIntegerValidationException
from .constants import ACTIVE, DRAFT, PCE_APIS


[docs]def ignore_empty_keys(o: dict): """Removes keys with None-type values from the provided dict. Used for JSON encoding to avoid schema errors due to empty or invalid parameters. """ return {k: v for (k, v) in o if v is not None}
[docs]def convert_draft_href_to_active(href: str) -> str: """Given an HREF string, converts policy version to active. If an active HREF is provided, this function has no effect. Args: href (str): PCE object HREF. Returns: str: active policy version HREF. """ return href.replace('/{}/'.format(DRAFT), '/{}/'.format(ACTIVE))
[docs]def convert_active_href_to_draft(href: str) -> str: """Given an HREF string, converts policy version to draft. If a draft HREF is provided, this function has no effect. Args: href (str): PCE object HREF. Returns: str: draft policy version HREF. """ return href.replace('/{}/'.format(ACTIVE), '/{}/'.format(DRAFT))
[docs]def deprecated(deprecated_in, message=None): """ Deprecation decorator, adapted from https://stackoverflow.com/a/30253848 Will emit a warning when the decorated function is called. """ def _deprecated(func): @functools.wraps(func) def wrapper(*args, **kwargs): default_message = "Call to deprecated function {}. Deprecated in version {}, current version is {}.".format( func.__name__, deprecated_in, version ) warning_message = message or default_message warnings.warn(warning_message, category=DeprecationWarning, stacklevel=2) return func(*args, **kwargs) return wrapper return _deprecated
[docs]def pce_api(name: str, endpoint: str = None, is_sec_policy=False, is_global=False): """Decorates an IllumioObject subclass to denote it as a PCE API object type. This registers the type in the PCE_APIS mapping used to determine whether a given name corresponds to an API-accessible type. We can then leverage __getattr__ to instantiate a generic API interface for any registered type (with some caveats, see the _PCEObjectAPI documentation). By default, registers the endpoint as /{name}, but the endpoint can also be specified in the decorator function call. For example: >>> @pce_api('labels', endpoint='/labels') >>> class Label(IllumioObject): ... ... >>> pce = PolicyComputeEngine(...) >>> # the 'labels' name is registered, and so we can >>> # call /labels endpoints through the _PCEObjectAPI interface >>> labels = pce.labels.get() >>> labels [ Label( href='/orgs/1/labels/1', key='role', value='R-DB', ... ), ... ] Args: name (str): the name of the API. used as a PolicyComputeEngine attribute name to generate the API interface. endpoint (str, optional): _description_. Defaults to None. is_sec_policy (bool, optional): whether or not the object reflects a security policy API with the sec_policy/{pversion} prefix. Defaults to False. is_global (bool, optional): whether or not the object reflects a global API, such as /health or /users. These APIs operate on the entire PCE rather than a single tenant, and don't need the /orgs/{org_id} prefix. """ def _decorator(cls): @dataclass class __PCEApi: name: str endpoint: str object_class: object is_sec_policy: bool is_global: bool PCE_APIS[name] = __PCEApi( name=name, endpoint=endpoint or '/{}'.format(name), object_class=cls, is_sec_policy=is_sec_policy, is_global=is_global ) return cls return _decorator
[docs]def parse_url(url: str) -> tuple: """Parses given URL into its scheme and hostname, stripping port and path. Args: url (str): URL to parse. Returns: tuple: parsed (scheme, hostname) """ pattern = re.compile('^\w+://') if not re.match(pattern, url): url = 'https://{}'.format(url) parsed = urlparse(url) scheme = parsed.scheme if scheme not in ('http', 'https'): scheme = 'https' # only support http(s) return scheme, parsed.hostname
[docs]def convert_protocol(protocol: str) -> int: """Given a protocol name, returns the integer ID of that protocol. Usage: >>> convert_protocol('tcp') 6 Args: protocol (str): case-insensitive protocol string, e.g. 'tcp', 'UDP' Raises: IllumioException: if an invalid protocol name is provided. Returns: int: the integer ID of the given protocol, e.g. 17 for 'udp' """ try: return socket.getprotobyname(protocol) except: raise IllumioException('Invalid protocol name: {}'.format(protocol))
def validate_int(val: typing.Any, minimum: int=0, maximum: int=sys.maxsize) -> None: """Validates a given value is an integer and is within min <= val <= max. Args: val (Any): value to validate. min (int, optional): validation lower bound. Defaults to 0. max (int, optional): validation upper bound. Defaults to sys.maxsize. Raises: IllumioIntegerValidationException: if an invalid value is provided. """ try: valid = minimum <= int(val) <= maximum except: # catch the ValueError for invalid values valid = False if not valid: raise IllumioIntegerValidationException(val, minimum, maximum) if hasattr(typing, 'get_origin'): # python 3.8+ - introduces the get_origin function def isunion(type_): return typing.get_origin(type_) is typing.Union def islist(type_): if type_ is list: return True return typing.get_origin(type_) is list elif hasattr(typing, '_GenericAlias'): # python 3.7 - changes meta types to be based off the # _GenericAlias supertype. __extra__ is now __origin__ def isunion(type_): if isinstance(type_, typing._GenericAlias): return type_.__origin__ is typing.Union return False def islist(type_): if type_ is list: return True if isinstance(type_, typing._GenericAlias): return type_.__origin__ is list return False else: # python 3.6 def isunion(type_): return isinstance(type_, typing._Union) def islist(type_): if type_ is list: return True # in 3.6, List's type is GenericMeta, so we # instead check the __extra__ param if hasattr(type_, '__extra__'): return type_.__extra__ is list return False __all__ = [ 'ignore_empty_keys', 'convert_draft_href_to_active', 'convert_active_href_to_draft', 'deprecated', 'pce_api', 'parse_url', 'convert_protocol', 'validate_int', 'isunion', 'islist', ]