#!/usr/bin/python3.6 import sys import requests import json import warnings import time import logging import getpass from bravado.client import SwaggerClient from bravado.requests_client import RequestsClient print ("*******************************************") input_values = "false" if len(sys.argv) > 1 : if sys.argv[1] == "inputvalues": print ("User will provide configuration values.") input_values = "true" else: print ("Default configuration values will be used.") else: print ("Default configuration values will be used.") print ("*******************************************") #logging.basicConfig(level=logging.DEBUG) class FTDClient: '''This class acts as an FTD REST API client with a series of wrapper methods available along with a raw Bravado REST client if desired. ''' headers = { "Content-Type": "application/json", "Accept": "application/json" } def __init__(self, address, port, username, password): ''' Constructor used to initialize the bravado_client address: IP or hostname of the device to connect to port: Port number to connect to username: username to use (default 'admin') password: password to use (default 'Admin123') ''' # stash connectivity info in bravado_client self.server_address = address self.server_port = port self.username = username self.password = password # WARNINGS requests.packages.urllib3.disable_warnings() # swagger doesn't like 'also_return_response' sent from FDM warnings.filterwarnings('ignore', 'config also_return_response is not a recognized config key') # after we auth we get an access token note that we have both normal (short session logins and custom where you can extend the length of the session) # both will leverage this same variable for now self.access_token = None self.bravado_client = None def login(self): ''' This is the normal login which will give you a ~30 minute session with no refresh. Should be fine for short lived work. Do not use for sessions that need to last longer than 30 minutes. ''' payload = '{{"grant_type": "password", "username": "{}", "password": "{}"}}'.format(self.username, self.password) auth_headers = {**FTDClient.headers, 'Authorization': 'Bearer '} print ('Authentication Headers: %s', auth_headers) print ('Authentication Payload is: %s', payload) r = requests.post("https://{}:{}/api/fdm/v3/fdm/token".format(self.server_address, self.server_port), data=payload, verify=False, headers=auth_headers) if r.status_code == 400: raise Exception("Error logging in: {}".format(r.content)) try: self.access_token = r.json()['access_token'] print ("**********************************") print ("Access Token:") print (self.access_token) print ("**********************************") except: raise def login_custom(self, session_length=120): ''' This is a custom login where you will by default get a 1 day session and can customize and create an even longer session session_length: number of seconds for the session to last (default 1 day of seconds) ''' payload = '{{"grant_type": "custom_token", "access_token": "{}", "desired_expires_in": {}, "desired_refresh_expires_in":{}, "desired_subject":"python_client{}", "desired_refresh_count":3}}'.format(self.access_token, session_length, (session_length*2), int(time.time())) auth_headers = {**FTDClient.headers, 'Authorization': 'Bearer '} r = requests.post("https://{}:{}/api/fdm/v3/fdm/token".format(self.server_address, self.server_port), data=payload, verify=False, headers=auth_headers) if r.status_code == 400: raise Exception("Error logging in: {}".format(r.content)) try: self.access_token = r.json()['access_token'] except: raise return r.json() def logout(self): ''' Used for explicit session logout ''' logout_payload = {'grant_type': 'revoke_token', 'access_token': self.access_token, 'token_to_revoke': self.access_token} requests.post("https://{}:{}/api/fdm/v3/fdm/token".format(self.server_address, self.server_port), data=json.dumps(logout_payload), verify=False, headers=FTDClient.headers) self.access_token = None def provision(self): prov_headers = {**FTDClient.headers, 'Authorization': 'Bearer {}'.format(self.access_token)} provision_resp = requests.get("https://{}:{}/api/fdm/v3/devices/default/action/provision/default".format(self.server_address, self.server_port), headers=prov_headers, verify=False) resp_json = provision_resp.json() resp_json['acceptEULA'] = 'true' print ("**************************") print ("**** Provisioning ******") print ("**************************") resp = requests.post("https://{}:{}/api/fdm/v3/devices/default/action/provision".format(self.server_address, self.server_port), verify=False, headers=prov_headers, data=json.dumps(resp_json)) if resp.status_code == 200: print("Provisioning Successful") else: print(resp.text) def deploy(self): deploy_headers = {**FTDClient.headers, 'Authorization': 'Bearer {}'.format(self.access_token)} init_deploy_resp = requests.post("https://{}:{}/api/fdm/v3/operational/deploy".format(self.server_address, self.server_port), verify=False, headers=deploy_headers) init_deploy_resp_json = init_deploy_resp.json() print("Deployment Started... This may take several minutes.") loop_counter = 0 while True: loop_counter = loop_counter + 1 deploy_resp = requests.get("https://{}:{}/api/fdm/v3/operational/deploy/{}".format(self.server_address, self.server_port, init_deploy_resp_json['id']), verify=False, headers=deploy_headers) deploy_resp_json = deploy_resp.json() time.sleep(10) print("Deployment in Progress [" + str(loop_counter) + "]") if deploy_resp_json['state'] == "DEPLOYED": print("Deployment Successful") break def get_client(self): ''' Returns a raw bravado_client to interact with Bravado using the Open API generated API ''' if self.bravado_client: return self.bravado_client # retrieve it if we don't already have it http_client = RequestsClient() http_client.session.verify = False http_client.session.headers = {**FTDClient.headers, 'Authorization': 'Bearer {}'.format(self.access_token)} # bravado will validate field type if it's in the JSON self.bravado_client = SwaggerClient.from_url('https://{}:{}/apispec/ngfw.json'.format(self.server_address, self.server_port), http_client=http_client, config={'validate_responses': False, 'validate_swagger_spec': False}) return self.bravado_client # ---------------- def update_interface_with_static_ip(client, interface_name, ip, netmask): # remove default DHCP server entry from inside interface for dhcpserver_container in client.DHCPServerContainer.getDHCPServerContainerList().result()['items']: dhcpserver_container.servers = [server for server in dhcpserver_container.servers if server.interface.name != interface_name] client.DHCPServerContainer.editDHCPServerContainer(objId=dhcpserver_container.id, body=dhcpserver_container).result() # find the interface by name interfaces = client.Interface.getPhysicalInterfaceList(filter="name:%s" % interface_name).result()['items'] if interfaces: data_interface = interfaces[0] # update the interface with static ip ipv4address = client.get_model("HAIPv4Address")(type="haipv4address", ipAddress=ip, netmask=netmask) interfaceipv4 = client.get_model("InterfaceIPv4")(type="interfaceipv4", ipType="STATIC", ipAddress=ipv4address) data_interface.ipv4 = interfaceipv4 client.Interface.editPhysicalInterface(objId=data_interface.id, body=data_interface).result() def set_default_gateway(client,dg): # Add object for default gateway dg_object_body = {'name': 'Default_Gateway', 'subType': 'HOST', 'value': dg, 'isSystemDefined': False, 'dnsResolution': 'IPV4_ONLY', 'type': 'networkobject'} client.NetworkObject.addNetworkObject(body=dg_object_body).response().result dg_object = client.NetworkObject.getNetworkObjectList(filter="name:%s" % "Default_Gateway").response().result['items'][0] #Get static route container details static_route_container = client.StaticRouteEntryContainer.getStaticRouteEntryContainerList().response().result['items'][0] static_route_container_id = static_route_container['id'] #Get outside interface details outside_interface = client.Interface.getPhysicalInterfaceList(filter="name:%s" % "outside").result()['items'][0] #Get details of the 0.0.0.0 ipv4 object ipv4_network_wrapper = client.NetworkObject.getNetworkObjectList(filter="name:%s" % "any-ipv4").response().result['items'][0] # Create default route default_route_body = { 'iface': {'id': outside_interface['id'], 'type': 'physicalinterface', 'version': outside_interface['version'], 'name': 'outside'}, 'networks': [{'id': ipv4_network_wrapper['id'], 'type': 'networkobject', 'version': ipv4_network_wrapper['version'], 'name': 'any-ipv4'}], 'gateway': {'id': dg_object['id'], 'type': 'networkobject', 'version': dg_object['version'], 'name': 'Default_Gateway'}, 'metricValue': 1, 'ipType': 'IPv4', 'type': 'staticrouteentry'} client.StaticRouteEntry.addStaticRouteEntry(parentId=static_route_container_id, body=default_route_body).response().result def set_DNS_server(client, dns_server_ip, search_domain): #Create DNS Server Group dns_server_group_body = {'name': 'MgmtDNSServer', 'dnsServers': [{'ipAddress': dns_server_ip, 'type': 'dnsserver'}], 'timeout': 2, 'retries': 2, 'searchDomain': search_domain, 'systemDefined': False, 'type': 'dnsservergroup'} mgmt_DNS_Server = client.DNS.addDNSServerGroup(body=dns_server_group_body).response().result #Get Mgmt DNS settings mgmt_DNS_Setting_wrapper = client.DNS.getDeviceDNSSettingsList().response().result['items'][0] #Apply DNS server to mgmt settings mgmt_DNS_setting_body = {'version': mgmt_DNS_Setting_wrapper['version'], 'name': 'DeviceDNSSettings', 'dnsServerGroup': {'id': mgmt_DNS_Server['id'], 'type': 'dnsservergroup', 'version': mgmt_DNS_Server['version'], 'name': 'MgmtDNSServer'}, 'id': mgmt_DNS_Setting_wrapper['id'], 'type': 'devicednssettings'} client.DNS.editDeviceDNSSettings(objId=mgmt_DNS_Setting_wrapper['id'], body=mgmt_DNS_setting_body).response().result def set_NTP_server(client, ntp_ip): #Get NTP server list ntp_settings = client.NTP.getNTPList().response().result['items'][0] #Assign new NTP server ntp_body = {'version': ntp_settings['version'], 'enabled': True, 'ntpServers': [ntp_ip], 'id': ntp_settings['id'], 'type': 'ntp'} client.NTP.editNTP(objId=ntp_settings['id'], body=ntp_body).response().result def set_Device_hostname(client, hostname): #Get current hostname device_hostname = client.DeviceHostname.getDeviceHostnameList().response().result['items'][0] #Assign new hostname device_hostname_body = {'version': device_hostname['version'], 'hostname': hostname, 'id': device_hostname['id'], 'type': 'devicehostname'} client.DeviceHostname.editDeviceHostname(objId=device_hostname['id'], body=device_hostname_body).response().result def create_Default_policy(client): #Get Security zone info inside_zone = client.SecurityZone.getSecurityZoneList().response().result['items'][0] outside_zone = client.SecurityZone.getSecurityZoneList().response().result['items'][1] #Get interface info for inside and outside interfaces inside_interface = client.Interface.getPhysicalInterfaceList(filter="name:%s" % "inside").result()['items'][0] outside_interface = client.Interface.getPhysicalInterfaceList(filter="name:%s" % "outside").result()['items'][0] #Assign inside interface to inside_zone and outside interface to outside_zone ReferenceModel = client.get_model("ReferenceModel") inside_zone.interfaces = [ReferenceModel(id=inside_interface.id, type=inside_interface.type)] outside_zone.interfaces = [ReferenceModel(id=outside_interface.id, type=outside_interface.type)] client.SecurityZone.editSecurityZone(objId=inside_zone['id'], body=inside_zone).response().result client.SecurityZone.editSecurityZone(objId=outside_zone['id'], body=outside_zone).response().result #Get 0.0.0.0 object for Internat PAT any_ipv4_object = client.NetworkObject.getNetworkObjectList(filter="name:%s" % "any-ipv4").response().result['items'][0] #Create default internet PAT statement object_nat_container = client.NAT.getObjectNatRuleContainerList().response().result['items'][0] default_nat = client.get_model("ObjectNatRule")() default_nat.name = "InsideOutsideNatRule" default_nat.sourceInterface = ReferenceModel(id=inside_interface.id, type=inside_interface.type) default_nat.destinationInterface = ReferenceModel(id=outside_interface.id, type=outside_interface.type) default_nat.natType = "DYNAMIC" default_nat.originalNetwork = ReferenceModel(id=any_ipv4_object.id, type=any_ipv4_object.type) default_nat.enabled = True default_nat.interfaceInTranslatedNetwork = True default_nat.type = "objectnatrule" client.NAT.addObjectNatRule(parentId=object_nat_container.id, body=default_nat).response().result #client.NAT.addObjectNatRule(parentId=object_nat_container['id'], body=object_nat_rule_body).response().result access_policy = client.AccessPolicy.getAccessPolicyList().response().result['items'][0] access_rule = client.get_model("AccessRule")(type="accessrule", ruleAction="TRUST", eventLogAction="LOG_NONE") access_rule.name = "Inside_Outside_Rule" access_rule.sourceZones = [ReferenceModel(id=inside_zone.id, type=inside_zone.type)] access_rule.destinationZones = [ReferenceModel(id=outside_zone.id, type=outside_zone.type)] client.AccessPolicy.addAccessRule(body=access_rule, parentId=access_policy['id']).result() def enable_Feature_license(client): #Enable Threat, URL Filtering and Malware licenses threat_lic_body = {'count': 1, 'compliant': True, 'licenseType': 'THREAT', 'type': 'license'} malware_lic_body = {'count': 1, 'compliant': True, 'licenseType': 'MALWARE', 'type': 'license'} url_lic_body = {'count': 1, 'compliant': True, 'licenseType': 'URLFILTERING', 'type': 'license'} client.SmartLicensing.addLicense(body=threat_lic_body).response().result client.SmartLicensing.addLicense(body=malware_lic_body).response().result client.SmartLicensing.addLicense(body=url_lic_body).response().result if __name__ == '__main__': # # DEFAULT INPUTS BEGIN # ngfw_number = 0 while ngfw_number != '1' and ngfw_number != '2' and ngfw_number != '3': ngfw_number = str(input( "Specify firewall to setup.\n Enter 1 for NGFW1.\n Enter 2 for NGFW2.\n Enter 3 for NGFW3.\nWhich firewall do you want to setup? ")) hostname = 'NGFW' + ngfw_number outside_netmask = "18" inside_netmask = "24" if ngfw_number == "1": outside_ip = "198.18.133.81" inside_ip = "198.19.10.1" mgmt_ip = "198.19.10.81" if ngfw_number == "2": outside_ip = "198.18.133.82" inside_ip = "198.19.10.2" mgmt_ip = "198.19.10.82" if ngfw_number == "3": outside_ip = "198.18.133.83" inside_ip = "198.19.10.3" mgmt_ip = "198.19.10.83" mgmt_port = "443" default_gateway = "198.18.128.1" dns = "198.19.10.100" search_domain = "dcloud.local" ntp = "198.19.10.100" username = "admin" password = "C1sco12345" # # DEFAULT INPUTS END # if input_values == "true": hostname = input("Enter Device Hostname [" + hostname + "]: ") or hostname print("Hostname: " + hostname) if input_values == "true": mgmt_ip = input("Enter Firewall Management IP [" + mgmt_ip + "]: ") or mgmt_ip print("Firewall Management IP: " + mgmt_ip + "/24") if input_values == "true": mgmt_port = input("Enter Management Port [" + mgmt_port + "]: ") or mgmt_port print("Management Port: " + mgmt_port) if input_values == "true": username = input("Enter Username [" + username + "]: ") or username print("Username: " + username) if input_values == "true": password = input("Enter Password [" + password + "]: ") or password print("Password: " + password) http_client = FTDClient(mgmt_ip, mgmt_port, username, password) http_client.login() http_client.provision() client = http_client.get_client() set_Device_hostname(client, hostname) if input_values == "true": outside_ip = input("Enter Outside IP Address [" + outside_ip + "]: ") or outside_ip print("Outside IP Address: " + outside_ip) if input_values == "true": outside_netmask = input("Enter Outside IP Subnet Mask [" + outside_netmask + "]: ") or outside_netmask print("Outside IP Subnet Mask: " + outside_netmask) update_interface_with_static_ip(client, "outside", outside_ip, outside_netmask) if input_values == "true": inside_ip = input("Enter Inside IP Address [" + inside_ip + "]: ") or inside_ip print("Inside IP Address: " + inside_ip) if input_values == "true": inside_netmask = input("Enter Inside IP Subnet Mask [" + inside_netmask + "]: ") or inside_netmask print("Inside IP Subnet Mask: " + inside_netmask) update_interface_with_static_ip(client, "inside", inside_ip, inside_netmask) if input_values == "true": default_gateway = input("Enter Default Gateway IP Address [" + default_gateway + "]: ") or default_gateway print("Default Gateway IP Address: " + default_gateway) set_default_gateway(client, default_gateway) if input_values == "true": dns = input("Enter DNS Server IP Address [" + dns + "]: ") or dns print("DNS Server IP Address: " + dns) if input_values == "true": search_domain = input("Enter Search Domain [" + search_domain + "]: ") or search_domain print("DNS Server Search Domain: " + search_domain) set_DNS_server(client, dns, search_domain) if input_values == "true": ntp = input("Enter NTP Server IP Address [" + ntp + "]: ") or ntp print("NTP Server IP Address: " + ntp) set_NTP_server(client, ntp) # Create default NAT and access policy create_Default_policy(client) # Enable feature licenses enable_Feature_license(client) http_client.deploy() print("Setup Complete") http_client.logout()