Source code for illumio.rules.rule

# -*- coding: utf-8 -*-

"""This module provides classes related to policy rules.

Copyright:
    © 2022 Illumio

License:
    Apache2, see LICENSE for more details.
"""
from dataclasses import dataclass
from typing import List, Union

from illumio.util import (
    JsonObject,
    Reference,
    MutableObject,
    pce_api,
    RESOLVE_AS_WORKLOADS
)
from illumio.policyobjects import Service, ServicePort

from .actor import Actor


@dataclass
class BaseRule(Reference):
    ingress_services: List[Union[Service, ServicePort]] = None
    providers: List[Actor] = None
    consumers: List[Actor] = None

    @classmethod
    def build(cls, providers: List[Union[str, Reference, dict]], consumers: List[Union[str, Reference, dict]],
            ingress_services: List[Union[JsonObject, dict, str]], **kwargs) -> 'BaseRule':
        services = []
        for service in ingress_services:
            if isinstance(service, JsonObject):
                services.append(service)
            elif type(service) is str:
                services.append(Service(href=service))
            else:
                service_type = Service if 'href' in service else ServicePort
                services.append(service_type.from_json(service))
        return cls(
            providers=[Actor.from_reference(provider) for provider in providers],
            consumers=[Actor.from_reference(consumer) for consumer in consumers],
            ingress_services=services,
            **kwargs
        )

    def _decode_complex_types(self):
        decoded_ingress_services = []
        if self.ingress_services:
            for service in self.ingress_services:
                service_type = Service if 'href' in service else ServicePort
                decoded_ingress_services.append(service_type.from_json(service))
        self.ingress_services = decoded_ingress_services
        super()._decode_complex_types()


@dataclass
class LabelResolutionBlock(JsonObject):
    providers: List[str] = None
    consumers: List[str] = None


[docs]@dataclass @pce_api('rules', endpoint='/sec_rules') class Rule(BaseRule, MutableObject): """Represents a security rule in the PCE. Each security rule defines one or more services on which traffic will be allowed from the defined providers to the defined consumers. Providers and consumers can be defined using static (workload HREF) or dynamic (label, IP list) references. By default, providers and consumers are resolved as workloads. See https://docs.illumio.com/core/21.5/Content/Guides/security-policy/create-security-policy/rules.htm Usage: >>> import illumio >>> pce = illumio.PolicyComputeEngine('pce.company.com', port=443, org_id=1) >>> pce.set_credentials('api_key', 'api_secret') >>> any_ip_list = pce.get_default_ip_list() >>> role_label = pce.labels.create({'key': 'role', 'value': 'R-Web'}) >>> app_label = pce.labels.create({'key': 'app', 'value': 'A-App'}) >>> env_label = pce.labels.create({'key': 'env', 'value': 'E-Prod'}) >>> loc_label = pce.labels.create({'key': 'loc', 'value': 'L-AWS'}) >>> ruleset = illumio.RuleSet( ... name='RS-LAB-ALLOWLIST', ... scopes=[ ... illumio.LabelSet( ... labels=[app_label, env_label, loc_label] ... ) ... ] ... ) >>> ruleset = pce.rule_sets.create(ruleset) >>> rule = illumio.Rule.build( ... providers=[role_label], ... consumers=[any_ip_list], ... ingress_services=[ ... {'port': 80, 'proto': 'tcp'}, ... {'port': 443, 'proto': 'tcp'} ... ], ... unscoped_consumers=True # creates an extra-scope rule ... ) >>> rule = pce.rules.create(rule, parent=ruleset) >>> rule Rule( href='/orgs/1/sec_policy/rule_sets/19/rules/sec_rules/34', enabled=True, providers=[ Actor( label=Reference( href='/orgs/1/labels/21' ), ... ) ], consumers=[ Actor( ip_list=Reference( href='/orgs/1/sec_policy/draft/ip_lists/1' ), ... ) ], ingress_services=[ ServicePort(port=80, proto=6, ...), ServicePort(port=443, proto=6, ...) ], resolve_labels_as=LabelResolutionBlock( providers=['workloads'], consumers=['workloads'] ), unscoped_consumers=True, ... ) """ enabled: bool = None resolve_labels_as: LabelResolutionBlock = None sec_connect: bool = None stateless: bool = None machine_auth: bool = None consuming_security_principals: List[Reference] = None unscoped_consumers: bool = None network_type: str = None @classmethod def build(cls, providers: List[Union[str, Reference, dict]], consumers: List[Union[str, Reference, dict]], ingress_services: List[Union[JsonObject, dict, str]], resolve_providers_as: List[str]=None, resolve_consumers_as: List[str]=None, enabled=True, **kwargs) -> 'Rule': resolve_labels_as = LabelResolutionBlock( providers=resolve_providers_as or [RESOLVE_AS_WORKLOADS], consumers=resolve_consumers_as or [RESOLVE_AS_WORKLOADS] ) return super().build(providers, consumers, ingress_services, resolve_labels_as=resolve_labels_as, enabled=enabled, **kwargs)
__all__ = [ 'BaseRule', 'Rule', 'LabelResolutionBlock', ]