import json
from ibmcloud_python_sdk.config import params
from ibmcloud_python_sdk.auth import get_headers as headers
from ibmcloud_python_sdk.utils.common import query_wrapper as qw
from ibmcloud_python_sdk.vpc import vpc
from ibmcloud_python_sdk.utils.common import resource_not_found
from ibmcloud_python_sdk.utils.common import resource_deleted
from ibmcloud_python_sdk.utils.common import check_args
from ibmcloud_python_sdk.resource import resource_group
from ibmcloud_python_sdk.vpc import instance
[docs]class Security():
def __init__(self):
self.cfg = params()
self.vpc = vpc.Vpc()
self.rg = resource_group.ResourceGroup()
self.instance = instance.Instance()
[docs] def get_security_groups(self):
"""Retrieve security group list
:return: List of security groups
:rtype: list
"""
try:
# Connect to api endpoint for security_groups
path = ("/v1/security_groups?version={}&generation={}".format(
self.cfg["version"], self.cfg["generation"]))
# Return data
return qw("iaas", "GET", path, headers())["data"]
except Exception as error:
print("Error fetching security groups. {}".format(error))
raise
[docs] def get_security_group(self, security_group):
"""Retrieve specific security group
:param security_group: Security group name or ID
:type security_group: str
:return: Security group information
:rtype: dict
"""
by_name = self.get_security_group_by_name(security_group)
if "errors" in by_name:
for key_name in by_name["errors"]:
if key_name["code"] == "not_found":
by_id = self.get_security_group_by_id(security_group)
if "errors" in by_id:
return by_id
return by_id
else:
return by_name
else:
return by_name
[docs] def get_security_group_by_id(self, id):
"""Retrieve specific security group by ID
:param id: Security group ID
:type id: str
:return: Security group information
:rtype: dict
"""
try:
# Connect to api endpoint for security_groups
path = ("/v1/security_groups/{}?version={}&generation={}".format(
id, self.cfg["version"], self.cfg["generation"]))
# Return data
return qw("iaas", "GET", path, headers())["data"]
except Exception as error:
print("Error fetching security group with ID {}. {}".format(
id, error))
raise
[docs] def get_security_group_by_name(self, name):
"""Retrieve specific security group by name
:param name: Security group name
:type name: str
:return: Security group information
:rtype: dict
"""
try:
# Retrieve security groups
data = self.get_security_groups()
if "errors" in data:
return data
# Loop over security groups until filter match
for sg in data['security_groups']:
if sg["name"] == name:
# Return data
return sg
# Return error if no security group is found
return resource_not_found()
except Exception as error:
print("Error fetching security group with name {}. {}".format(
name, error))
raise
[docs] def get_security_group_interfaces(self, security_group):
"""Retrieve network interfaces associated to a security group
:param security_group: Security group name or ID
:type security_group: str
:return: List of network interfaces
:rtype: list
"""
# Retrieve security group information to get the ID
# (mostly useful if a name is provided)
sg_info = self.get_security_group(security_group)
if "errors" in sg_info:
return sg_info
try:
# Connect to api endpoint for security_groups
path = ("/v1/security_groups/{}/network_interfaces?version={}"
"&generation={}".format(sg_info["id"], self.cfg["version"],
self.cfg["generation"]))
# Return data
return qw("iaas", "GET", path, headers())["data"]
except Exception as error:
print("Error fetching network interfaces associated to security"
" group {}. {}".format(security_group, error))
raise
[docs] def get_security_group_interface(self, security_group, interface):
"""Retrieve specific network interface associated to a security group
:param security_group: Security group name or ID
:type security_group: str
:param interface: Network interface name or ID
:type interface: str
:return: Network interface information
:rtype: dict
"""
by_name = self.get_security_group_interface_by_name(security_group,
interface)
if "errors" in by_name:
for key_name in by_name["errors"]:
if key_name["code"] == "not_found":
by_id = self.get_security_group_interface_by_id(
security_group, interface)
if "errors" in by_id:
return by_id
return by_id
else:
return by_name
else:
return by_name
[docs] def get_security_group_interface_by_id(self, security_group, id):
"""Retrieve specific network interface associated to a security group
by ID
:param security_group: Security group name or ID
:type security_group: str
:param id: Network interface ID
:type id: str
:return: Network interface information
:rtype: dict
"""
# Retrieve security group information to get the ID
# (mostly useful if a name is provided)
sg_info = self.get_security_group(security_group)
if "errors" in sg_info:
return sg_info
try:
# Connect to api endpoint for security_groups
path = ("/v1/security_groups/{}/network_interfaces/{}?version={}"
"&generation={}".format(sg_info["id"], id,
self.cfg["version"],
self.cfg["generation"]))
# Return data
return qw("iaas", "GET", path, headers())["data"]
except Exception as error:
print("Error fetching network interface with ID {} associated to"
" security group {}. {}".format(id, security_group, error))
raise
[docs] def get_security_group_interface_by_name(self, security_group, name):
"""Retrieve specific network interface associated to a security group
by name
:param security_group: Security group name or ID
:type security_group: str
:param name: Network interface name
:type name: str
:return: Network interface information
:rtype: dict
"""
# Retrieve security group information to get the ID
# (mostly useful if a name is provided)
sg_info = self.get_security_group(security_group)
if "errors" in sg_info:
return sg_info
try:
# Retrieve network interfaces
data = self.get_security_group_interfaces(sg_info["id"])
if "errors" in data:
return data
# Loop over network interfaces until filter match
for nic in data['network_interfaces']:
if nic["name"] == name:
# Return data
return nic
# Return error if no interface is found
return resource_not_found()
except Exception as error:
print("Error fetching network interface with name {} associated to"
" security group {}. {}".format(name, security_group, error))
raise
[docs] def get_security_group_rules(self, security_group):
"""Retrieve rules from a security group
:param security_group: Security group name or ID
:type security_group: str
:return: List of rules
:rtype: list
"""
# Retrieve security group information to get the ID
# (mostly useful if a name is provided)
sg_info = self.get_security_group(security_group)
if "errors" in sg_info:
return sg_info
try:
# Connect to api endpoint for security_groups
path = ("/v1/security_groups/{}/rules?version={}"
"&generation={}".format(sg_info["id"],
self.cfg["version"],
self.cfg["generation"]))
# Return data
return qw("iaas", "GET", path, headers())["data"]
except Exception as error:
print("Error fetching rules for security group with"
" ID {}. {}".format(id, error))
raise
[docs] def get_security_group_rule(self, security_group, rule):
"""Retrieve specific rule from a security group
:param security_group: Security group name or ID
:type security_group: str
:param rule: Rule ID
:type rule: str
:return: Rule information
:rtype: dict
"""
by_id = self.get_security_group_rule_by_id(security_group, rule)
if "errors" in by_id:
return by_id
return by_id
[docs] def get_security_group_rule_by_id(self, security_group, id):
"""Retrieve specific rule from a security group by ID
:param security_group: Security group name or ID
:type security_group: str
:param id: Rule ID
:type id: str
:return: Rule information
:rtype: dict
"""
# Retrieve security group information to get the ID
# (mostly useful if a name is provided)
sg_info = self.get_security_group(security_group)
if "errors" in sg_info:
return sg_info
try:
# Connect to api endpoint for security_groups
path = ("/v1/security_groups/{}/rules/{}?version={}"
"&generation={}".format(sg_info["id"], id,
self.cfg["version"],
self.cfg["generation"]))
# Return data
return qw("iaas", "GET", path, headers())["data"]
except Exception as error:
print("Error fetching rule with ID {} for security group"
" {}. {}".format(id, security_group, error))
raise
[docs] def add_interface_security_group(self, **kwargs):
"""Add network interface to a security group
:param interface: The network interface ID
:type interface: str
:param security_group: The security group name or ID
:type security_group: str
:param instance: Instance name or ID
:type instance: str, optional
"""
args = ["interface", "security_group"]
check_args(args, **kwargs)
# Build dict of argument and assign default value when needed
args = {
'interface': kwargs.get('interface'),
'security_group': kwargs.get('security_group'),
'instance': kwargs.get('instance'),
}
target = args["interface"]
# Retrieve security group information to get the ID
# (mostly useful if a name is provided)
sg_info = self.get_security_group(args["security_group"])
if "errors" in sg_info:
return sg_info
if args["instance"]:
# Retrieve network interface information to get the ID
# (mostly useful if a name is provided)
nic_info = self.instance.get_instance_interface(
args["instance"], args["interface"])
if "errors" in nic_info:
return nic_info
target = nic_info["id"]
try:
# Connect to api endpoint for security_groups
path = ("/v1/security_groups/{}/network_interfaces/{}?version={}"
"&generation={}".format(sg_info["id"],
target,
self.cfg["version"],
self.cfg["generation"]))
# Return data
return qw("iaas", "PUT", path, headers(), None)["data"]
except Exception as error:
print("Error adding network interface {} to security group"
" {}. {}".format(target, args["security_group"], error))
raise
[docs] def create_security_group(self, **kwargs):
"""Create security group
:param name: The user-defined name for this security group
:type name: str, optional
:param resource_group: The resource group to use
:type security_group: str, optional
:param vpc: The VPC this security group is to be a part of
:type vpc: str
:param rules: Array of rule prototype objects for rules to be created
for this security group
:type rules: list, optional
"""
args = ["vpc"]
check_args(args, **kwargs)
# Build dict of argument and assign default value when needed
args = {
'name': kwargs.get('name'),
'resource_group': kwargs.get('resource_group'),
'vpc': kwargs.get('vpc'),
'rules': kwargs.get('rules'),
}
# Construct payload
payload = {}
for key, value in args.items():
if value is not None:
if key == "vpc":
vpc_info = self.vpc.get_vpc(args["vpc"])
if "errors" in vpc_info:
return vpc_info
payload["vpc"] = {"id": vpc_info["id"]}
elif key == "resource_group":
rg_info = self.rg.get_resource_group(
args["resource_group"])
if "errors" in rg_info:
return rg_info
payload["resource_group"] = {"id": rg_info["id"]}
elif key == "rules":
rs = []
for sg_rules in args["rules"]:
rs.append(sg_rules)
payload["rules"] = rs
else:
payload[key] = value
try:
# Connect to api endpoint for security_groups
path = ("/v1/security_groups?version={}&generation={}".format(
self.cfg["version"], self.cfg["generation"]))
# Return data
return qw("iaas", "POST", path, headers(),
json.dumps(payload))["data"]
except Exception as error:
print("Error creating security group. {}".format(error))
raise
[docs] def create_security_group_rule(self, **kwargs):
"""Create security group rule
:param sg: Security group name or ID where to add the rule
:type sg: str
:param direction: The direction of traffic to enforce, either inbound
or outbound
:type direction: str
:param ip_version: The IP version to enforce
:type ip_version: str
:param protocol: The protocol to enforce
:type protocol: str
:param port_min: The inclusive lower bound of TCP/UDP port range
:type port_min: int
:param port_max: The inclusive upper bound of TCP/UDP port range
:type port_max: int
:param code: The ICMP traffic code to allow
:type code: int
:param type: The ICMP traffic type to allow
:type type: str
:param cidr_block: The CIDR block
:type cidr_block: str
:param address: The IP address
:type address: str
:param security_group: The unique identifier for this security group
:type security_group: str
"""
args = ["sg", "direction"]
check_args(args, **kwargs)
# Build dict of argument and assign default value when needed
args = {
"sg": kwargs.get('sg'),
'direction': kwargs.get('direction'),
'ip_version': kwargs.get('ip_version'),
'protocol': kwargs.get('protocol'),
'port_min': kwargs.get('port_min'),
'port_max': kwargs.get('port_max'),
'code': kwargs.get('code'),
'type': kwargs.get('type'),
'cidr_block': kwargs.get('cidr_block'),
'address': kwargs.get('address'),
'security_group': kwargs.get('security_group'),
}
# Retrieve security group information to get the ID
# (mostly useful if a name is provided)
sg_info = self.get_security_group(args["sg"])
if "errors" in sg_info:
return sg_info
# Construct payload
payload = {}
for key, value in args.items():
# sg argument should not be in the payload
if key != "sg" and value is not None:
if key == "security_group":
payload["remote"] = {"id": sg_info["id"]}
elif key == "address":
payload["remote"] = {"address": args["address"]}
elif key == "cidr_block":
payload["remote"] = {"cidr_block": args["cidr_block"]}
else:
payload[key] = value
try:
# Connect to api endpoint for security_groups
path = ("/v1/security_groups/{}/rules?version={}"
"&generation={}".format(sg_info["id"],
self.cfg["version"],
self.cfg["generation"]))
# Return data
return qw("iaas", "POST", path, headers(),
json.dumps(payload))["data"]
except Exception as error:
print("Error creating security group rule. {}".format(error))
raise
[docs] def delete_security_group(self, security_group):
"""Delete security group
:param security_group: Security group name or ID
:type security_group: str
:return: Delete status
:rtype: dict
"""
try:
# Check if security group exists
sg_info = self.get_security_group_by_name(security_group)
if "errors" in sg_info:
return sg_info
# Connect to api endpoint for security_groups
path = ("/v1/security_groups/{}?version={}&generation={}".format(
sg_info["id"], self.cfg["version"], self.cfg["generation"]))
data = qw("iaas", "DELETE", path, headers())
# Return data
if data["response"].status != 204:
return data["data"]
# Return status
return resource_deleted()
except Exception as error:
print("Error deleting security group {}. {}".format(
security_group, error))
raise
[docs] def remove_interface_security_group(self, security_group, interface):
"""Remove network interface from a security group
:param security_group: Security group name or ID
:type security_group: str
:parem interface: Interface ID
:type interface: str
:return: Delete status
:rtype: dict
"""
try:
# Check if security group exists
sg_info = self.get_security_group(security_group)
if "errors" in sg_info:
return sg_info
# Connect to api endpoint for security_groups
path = ("/v1/security_groups/{}/network_interfaces/{}?version={}"
"&generation={}".format(sg_info["id"], interface,
self.cfg["version"],
self.cfg["generation"]))
data = qw("iaas", "DELETE", path, headers())
# Return data
if data["response"].status != 204:
return data["data"]
# Return status
return resource_deleted()
except Exception as error:
print("Error removing network interface {} from security group"
" {}. {}".format(interface, security_group, error))
raise
[docs] def delete_security_group_rule(self, security_group, rule):
"""Delete rule from security group
:param security_group: Security group name or ID
:type security_group: str
:parem rule: Rule name or ID
:type rule: str
:return: Delete status
:rtype: dict
"""
try:
# Check if security group exists
sg_info = self.get_security_group(security_group)
if "errors" in sg_info:
return sg_info
# Check if rule exists
rule_info = self.get_security_group_rule(sg_info["id"], rule)
if "errors" in rule_info:
return rule_info
# Connect to api endpoint for security_groups
path = ("/v1/security_groups/{}/rules/{}?version={}"
"&generation={}".format(sg_info["id"], rule_info["id"],
self.cfg["version"],
self.cfg["generation"]))
data = qw("iaas", "DELETE", path, headers())
# Return data
if data["response"].status != 204:
return data["data"]
# Return status
return resource_deleted()
except Exception as error:
print("Error deleting rule {} from security group {}. {}".format(
rule, security_group, error))
raise