# -*- coding: utf-8 -*-
"""This module provides helper functions and decorators for common use-cases.
Copyright:
© 2022 Illumio
License:
Apache2, see LICENSE for more details.
"""
import functools
import re
import socket
import sys
import typing
import warnings
from dataclasses import dataclass
from urllib.parse import urlparse
from illumio._version import version
from illumio.exceptions import IllumioException, IllumioIntegerValidationException
from .constants import ACTIVE, DRAFT, PCE_APIS
[docs]def ignore_empty_keys(o: dict):
"""Removes keys with None-type values from the provided dict.
Used for JSON encoding to avoid schema errors due to empty or invalid parameters.
"""
return {k: v for (k, v) in o if v is not None}
[docs]def convert_draft_href_to_active(href: str) -> str:
"""Given an HREF string, converts policy version to active.
If an active HREF is provided, this function has no effect.
Args:
href (str): PCE object HREF.
Returns:
str: active policy version HREF.
"""
return href.replace('/{}/'.format(DRAFT), '/{}/'.format(ACTIVE))
[docs]def convert_active_href_to_draft(href: str) -> str:
"""Given an HREF string, converts policy version to draft.
If a draft HREF is provided, this function has no effect.
Args:
href (str): PCE object HREF.
Returns:
str: draft policy version HREF.
"""
return href.replace('/{}/'.format(ACTIVE), '/{}/'.format(DRAFT))
[docs]def deprecated(deprecated_in, message=None):
"""
Deprecation decorator, adapted from https://stackoverflow.com/a/30253848
Will emit a warning when the decorated function is called.
"""
def _deprecated(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
default_message = "Call to deprecated function {}. Deprecated in version {}, current version is {}.".format(
func.__name__, deprecated_in, version
)
warning_message = message or default_message
warnings.warn(warning_message, category=DeprecationWarning, stacklevel=2)
return func(*args, **kwargs)
return wrapper
return _deprecated
[docs]def pce_api(name: str, endpoint: str = None, is_sec_policy=False, is_global=False):
"""Decorates an IllumioObject subclass to denote it as a PCE API object type.
This registers the type in the PCE_APIS mapping used to determine whether
a given name corresponds to an API-accessible type.
We can then leverage __getattr__ to instantiate a generic API interface for
any registered type (with some caveats, see the _PCEObjectAPI documentation).
By default, registers the endpoint as /{name}, but the endpoint can also be
specified in the decorator function call.
For example:
>>> @pce_api('labels', endpoint='/labels')
>>> class Label(IllumioObject):
... ...
>>> pce = PolicyComputeEngine(...)
>>> # the 'labels' name is registered, and so we can
>>> # call /labels endpoints through the _PCEObjectAPI interface
>>> labels = pce.labels.get()
>>> labels
[
Label(
href='/orgs/1/labels/1',
key='role',
value='R-DB',
...
),
...
]
Args:
name (str): the name of the API. used as a PolicyComputeEngine attribute name
to generate the API interface.
endpoint (str, optional): _description_. Defaults to None.
is_sec_policy (bool, optional): whether or not the object reflects a security
policy API with the sec_policy/{pversion} prefix. Defaults to False.
is_global (bool, optional): whether or not the object reflects a global API,
such as /health or /users. These APIs operate on the entire PCE rather
than a single tenant, and don't need the /orgs/{org_id} prefix.
"""
def _decorator(cls):
@dataclass
class __PCEApi:
name: str
endpoint: str
object_class: object
is_sec_policy: bool
is_global: bool
PCE_APIS[name] = __PCEApi(
name=name,
endpoint=endpoint or '/{}'.format(name),
object_class=cls,
is_sec_policy=is_sec_policy,
is_global=is_global
)
return cls
return _decorator
[docs]def parse_url(url: str) -> tuple:
"""Parses given URL into its scheme and hostname, stripping port and path.
Args:
url (str): URL to parse.
Returns:
tuple: parsed (scheme, hostname)
"""
pattern = re.compile('^\w+://')
if not re.match(pattern, url):
url = 'https://{}'.format(url)
parsed = urlparse(url)
scheme = parsed.scheme
if scheme not in ('http', 'https'):
scheme = 'https' # only support http(s)
return scheme, parsed.hostname
[docs]def convert_protocol(protocol: str) -> int:
"""Given a protocol name, returns the integer ID of that protocol.
Usage:
>>> convert_protocol('tcp')
6
Args:
protocol (str): case-insensitive protocol string, e.g. 'tcp', 'UDP'
Raises:
IllumioException: if an invalid protocol name is provided.
Returns:
int: the integer ID of the given protocol, e.g. 17 for 'udp'
"""
try:
return socket.getprotobyname(protocol)
except:
raise IllumioException('Invalid protocol name: {}'.format(protocol))
def validate_int(val: typing.Any, minimum: int=0, maximum: int=sys.maxsize) -> None:
"""Validates a given value is an integer and is within min <= val <= max.
Args:
val (Any): value to validate.
min (int, optional): validation lower bound. Defaults to 0.
max (int, optional): validation upper bound. Defaults to sys.maxsize.
Raises:
IllumioIntegerValidationException: if an invalid value is provided.
"""
try:
valid = minimum <= int(val) <= maximum
except: # catch the ValueError for invalid values
valid = False
if not valid:
raise IllumioIntegerValidationException(val, minimum, maximum)
if hasattr(typing, 'get_origin'):
# python 3.8+ - introduces the get_origin function
def isunion(type_):
return typing.get_origin(type_) is typing.Union
def islist(type_):
if type_ is list:
return True
return typing.get_origin(type_) is list
elif hasattr(typing, '_GenericAlias'):
# python 3.7 - changes meta types to be based off the
# _GenericAlias supertype. __extra__ is now __origin__
def isunion(type_):
if isinstance(type_, typing._GenericAlias):
return type_.__origin__ is typing.Union
return False
def islist(type_):
if type_ is list:
return True
if isinstance(type_, typing._GenericAlias):
return type_.__origin__ is list
return False
else:
# python 3.6
def isunion(type_):
return isinstance(type_, typing._Union)
def islist(type_):
if type_ is list:
return True
# in 3.6, List's type is GenericMeta, so we
# instead check the __extra__ param
if hasattr(type_, '__extra__'):
return type_.__extra__ is list
return False
__all__ = [
'ignore_empty_keys',
'convert_draft_href_to_active',
'convert_active_href_to_draft',
'deprecated',
'pce_api',
'parse_url',
'convert_protocol',
'validate_int',
'isunion',
'islist',
]