#!/usr/bin/env python3 # -*- encoding: utf-8; py-indent-offset: 4 -*- ## MIT License ## ## Copyright (c) 2024 Bash Club ## ## Permission is hereby granted, free of charge, to any person obtaining a copy ## of this software and associated documentation files (the "Software"), to deal ## in the Software without restriction, including without limitation the rights ## to use, copy, modify, merge, publish, distribute, sublicense, and/or sell ## copies of the Software, and to permit persons to whom the Software is ## furnished to do so, subject to the following conditions: ## ## The above copyright notice and this permission notice shall be included in all ## copies or substantial portions of the Software. ## ## THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR ## IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, ## FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE ## AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER ## LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, ## OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE ## SOFTWARE. ### __VERSION__ = 2.01 import sys import socket import re import json import requests from urllib3.exceptions import InsecureRequestWarning from statistics import mean from collections import defaultdict from pprint import pprint try: import cmk.utils.paths AGENT_TMP_PATH = cmk.utils.paths.Path(cmk.utils.paths.tmp_dir, "agents/agent_unifi") except ImportError: AGENT_TMP_PATH = None UNIFI_DEVICE_TABLE = { 'BZ2' : 'UAP', 'BZ2LR' : 'UAP-LR', 'S216150' : 'US-16-150W', 'S224250' : 'US-24-250W', 'S224500' : 'US-24-500W', 'S248500' : 'US-48-500W', 'S248750' : 'US-48-750W', 'S28150' : 'US-8-150W', 'U2HSR' : 'UAP-Outdoor+', 'U2IW' : 'UAP-IW', 'U2L48' : 'UAP-LR', 'U2Lv2' : 'UAP-LRv2', 'U2M' : 'UAP-Mini', 'U2O' : 'UAP-Outdoor', 'U2S48' : 'UAP', 'U2Sv2' : 'UAPv2', 'U5O' : 'UAP-Outdoor5', 'U6ENT' : 'U6-Enterprise', 'U6EXT' : 'U6-Extender', 'U6IW' : 'U6-IW', 'U6M' : 'U6-Mesh', 'U7E' : 'UAP-AC', 'U7EDU' : 'UAP-AC-EDU', 'U7Ev2' : 'UAP-AC', 'U7HD' : 'UAP-AC-HD', 'U7IW' : 'UAP-AC-IW', 'U7IWP' : 'UAP-AC-IW-Pro', 'U7LR' : 'UAP-AC-LR', 'U7LT' : 'UAP-AC-Lite', 'U7MP' : 'UAP-AC-M-Pro', 'U7MSH' : 'UAP-AC-M', 'U7NHD' : 'UAP-nanoHD', 'U7O' : 'UAP-AC-Outdoor', 'U7P' : 'UAP-AC-Pro', 'U7PG2' : 'UAP-AC-Pro', 'U7SHD' : 'UAP-AC-SHD', 'UAE6' : 'U6-Extender-EA', 'UAIW6' : 'U6-IW-EA', 'UAL6' : 'U6-Lite', 'UALR6' : 'U6-LR-EA', 'UALR6v2' : 'U6-LR', 'UALR6v3' : 'U6-LR', 'UAM6' : 'U6-Mesh-EA', 'UAP6' : 'U6-LR', 'UAP6MP' : 'U6-Pro', 'UASXG' : 'UAS-XG', 'UBB' : 'UBB', 'UBBXG' : 'UBB-XG', 'UCK' : 'UCK', 'UCK-v2' : 'UCK', 'UCK-v3' : 'UCK', 'UCKG2' : 'UCK-G2', 'UCKP' : 'UCK-G2-Plus', 'UCMSH' : 'UAP-XG-Mesh', 'UCXG' : 'UAP-XG', 'UDC48X6' : 'USW-Leaf', 'UDM' : 'UDM', 'UDMB' : 'UAP-BeaconHD', 'UDMPRO' : 'UDM-Pro', 'UDMPROSE' : 'UDM-SE', 'UDR' : 'UDR', 'UDW' : 'UDW', 'UDWPRO' : 'UDWPRO', 'UFLHD' : 'UAP-FlexHD', 'UGW3' : 'USG-3P', 'UGW4' : 'USG-Pro-4', 'UGWHD4' : 'USG', 'UGWXG' : 'USG-XG-8', 'UHDIW' : 'UAP-IW-HD', 'ULTE' : 'U-LTE', 'ULTEPEU' : 'U-LTE-Pro', 'ULTEPUS' : 'U-LTE-Pro', 'UP1' : 'USP-Plug', 'UP4' : 'UVP-X', 'UP5' : 'UVP', 'UP5c' : 'UVP', 'UP5t' : 'UVP-Pro', 'UP5tc' : 'UVP-Pro', 'UP6' : 'USP-Strip', 'UP7' : 'UVP-Executive', 'UP7c' : 'UVP-Executive', 'US16P150' : 'US-16-150W', 'US24' : 'USW-24-G1', 'US24P250' : 'US-24-250W', 'US24P500' : 'US-24-500W', 'US24PL2' : 'US-L2-24-PoE', 'US24PRO' : 'USW-Pro-24-PoE', 'US24PRO2' : 'USW-Pro-24', 'US48' : 'US-48-G1', 'US48P500' : 'US-48-500W', 'US48P750' : 'US-48-750W', 'US48PL2' : 'US-L2-48-PoE', 'US48PRO' : 'USW-Pro-48-PoE', 'US48PRO2' : 'USW-Pro-48', 'US624P' : 'USW-Enterprise-24-PoE', 'US648P' : 'USW-Enterprise-48-PoE', 'US68P' : 'USW-Enterprise-8-PoE', 'US6XG150' : 'US-XG-6PoE', 'US8' : 'US-8', 'US8P150' : 'US-8-150W', 'US8P60' : 'US-8-60W', 'USAGGPRO' : 'USW-Pro-Aggregation', 'USC8' : 'US-8', 'USC8P150' : 'US-8-150W', 'USC8P450' : 'USW-Industrial', 'USC8P60' : 'US-8-60W', 'USF5P' : 'USW-Flex', 'USFXG' : 'USW-Flex-XG', 'USL16LP' : 'USW-Lite-16-PoE', 'USL16P' : 'USW-16-PoE', 'USL24' : 'USW-24-G2', 'USL24P' : 'USW-24-PoE', 'USL48' : 'USW-48-G2', 'USL48P' : 'USW-48-PoE', 'USL8A' : 'USW-Aggregation', 'USL8LP' : 'USW-Lite-8-PoE', 'USL8MP' : 'USW-Mission-Critical', 'USMINI' : 'USW-Flex-Mini', 'USPPDUP' : 'USP-PDU-Pro', 'USPRPS' : 'USP-RPS', 'USXG' : 'US-16-XG', 'USXG24' : 'USW-EnterpriseXG-24', 'UXBSDM' : 'UWB-XG-BK', 'UXGPRO' : 'UXG-Pro', 'UXSDM' : 'UWB-XG', 'p2N' : 'PICOM2HP' } try: from cmk.special_agents.utils.argument_parsing import create_default_argument_parser #from check_api import LOGGER ##/TODO except ImportError: from argparse import ArgumentParser as create_default_argument_parser class unifi_api_exception(Exception): pass class unifi_object(object): def __init__(self,**kwargs): for _k,_v in kwargs.items(): _k = _k.replace("-","_") if type(_v) == bool: _v = int(_v) setattr(self,_k,_v) self._PARENT = kwargs.get("_PARENT",object) if hasattr(self._PARENT,"_UNIFICONTROLLER"): self._UNIFICONTROLLER = self._PARENT._UNIFICONTROLLER self._API = self._PARENT._API if hasattr(self,"_init"): self._init() def __repr__(self): return repr([(_k,_v) for _k,_v in self.__dict__.items() if type(_v) in (int,str)]) ######################################## ###### ###### S S I D ###### ######################################## class unifi_network_ssid(unifi_object): def _init(self): self._UNIFICONTROLLER._UNIFI_SSIDS.append(self) self._UNIFI_SITE = self._PARENT._PARENT for _k,_v in getattr(self,"reasons_bar_chart_now",{}).items(): setattr(self,_k,_v) setattr(self,f"{self.radio}_num_sta",self.num_sta) setattr(self,f"{self.radio}_tcp_packet_loss",self.tcp_packet_loss) setattr(self,f"{self.radio}_wifi_retries",self.wifi_retries) setattr(self,f"{self.radio}_wifi_latency",self.wifi_latency) setattr(self,f"{self.radio}_avg_client_signal",self.avg_client_signal) def __str__(self): _ret = [] _unwanted = ["essid","radio","id","t","name","radio_name","wlanconf_id","is_wep","up","site_id","ap_mac","state", "na_num_sta","ng_num_sta","ng_tcp_packet_loss","na_tcp_packet_loss","na_wifi_retries","ng_wifi_retries", "na_wifi_latency","ng_wifi_latency","na_avg_client_signal","ng_avg_client_signal" ] for _k,_v in self.__dict__.items(): if _k.startswith("_") or _k in _unwanted or type(_v) not in (str,int,float): continue _ret.append(f"{self.essid}|{self.radio}_{_k}|{_v}") return "\n".join(_ret) ######################################## ###### ###### R A D I O ###### ######################################## class unifi_network_radio(unifi_object): def _update_stats(self,stats): _prefixlen = len(self.name) +1 for _k,_v in stats.items(): if _k.startswith(self.name): if type(_v) == float: _v = int(_v) setattr(self,_k[_prefixlen:],_v) def __str__(self): _ret = [] _unwanted = ["name","ast_be_xmit","extchannel","cu_total","cu_self_rx","cu_self_tx"] for _k,_v in self.__dict__.items(): if _k.startswith("_") or _k in _unwanted or type(_v) not in (str,int,float): continue _ret.append(f"{self.name}|{_k}|{_v}") return "\n".join(_ret) ######################################## ###### ###### P O R T ###### ######################################## class unifi_network_port(unifi_object): def _init(self): self.oper_status = self._get_state(getattr(self,"up",None)) self.admin_status = self._get_state(getattr(self,"enable",None)) if hasattr(self,"ifname"): ## GW / UDM Names _name = list(filter(lambda x: x.get("ifname") == self.ifname,self._PARENT.ethernet_overrides)) if _name: _name = _name[0] if getattr(self,"name",None) and _name.get("networkgroup") != "LAN": self.name = _name.get("networkgroup","unkn") else: self.name = self.ifname if not hasattr(self,"port_idx") and hasattr(self,"ifname"): self.port_idx = int(self.ifname[-1])+1 ## ethX self.portconf = self._PARENT._PARENT._PORTCONFIGS.get(getattr(self,"portconf_id",None)) def _get_state(self,state): return { "1" : 1, ## up "0" : 2 ## down }.get(str(state),4) ##unknown def __str__(self): _ret = [] _unwanted = ["up","enabled","media","anonymous_id","www_gw_mac","wan_gw_mac","attr_hidden_id","masked","flowctrl_tx","flowctrl_rx","portconf_id","speed_caps"] for _k,_v in self.__dict__.items(): if _k.startswith("_") or _k in _unwanted or type(_v) not in (str,int,float): continue _ret.append(f"{self.port_idx}|{_k}|{_v}") return "\n".join(_ret) ######################################## ###### ###### D E V I C E ###### ######################################## class unifi_device(unifi_object): def _init(self): if not hasattr(self,"name"): _mac_end = self.mac.replace(":","")[-4:] self.name = f"{self.model}:{_mac_end}" self._piggy_back = True self._PARENT._SITE_DEVICES.append(self) self._NETWORK_PORTS = [] self._NETWORK_RADIO = [] self._NETWORK_SSIDS = [] for _k,_v in getattr(self,"sys_stats",{}).items(): _k = _k.replace("-","_") setattr(self,_k,_v) self.model_name = UNIFI_DEVICE_TABLE.get(self.model) if self.type in ("ugw","udm") and hasattr(self,"connect_request_ip"): ## change ip to local ip self.wan_ip = self.ip self.ip = self.connect_request_ip if getattr(self,"speedtest_status_saved",False): _speedtest = getattr(self,"speedtest_status",{}) self.speedtest_time = int(_speedtest.get("rundate","0")) self.speedtest_status = int(_speedtest.get("status_summary","0")) self.speedtest_ping = round(_speedtest.get("latency",-1),1) self.speedtest_download = round(_speedtest.get("xput_download",0.0),1) self.speedtest_upload = round(_speedtest.get("xput_upload",0.0),1) _temp = list(map(lambda x: x.get("value",0),getattr(self,"temperatures",[]))) if _temp: self.general_temperature = "{0:.1f}".format(mean(_temp)) for _port in getattr(self,"port_table",[]): self._NETWORK_PORTS.append(unifi_network_port(_PARENT=self,**_port)) for _radio in getattr(self,"radio_table_stats",[]): _radio_obj = unifi_network_radio(_PARENT=self,**_radio) _radio_obj._update_stats(getattr(self,"stat",{}).get("ap",{})) self._NETWORK_RADIO.append(_radio_obj) for _ssid in getattr(self,"vap_table",[]): self._NETWORK_SSIDS.append(unifi_network_ssid(_PARENT=self,**_ssid)) def _get_uplink(self): if type(getattr(self,"uplink",None)) == dict: self.uplink_up = int(self.uplink.get("up","0")) self.uplink_device = self._UNIFICONTROLLER._get_device_by_mac(self.uplink.get("uplink_mac")) self.uplink_remote_port = self.uplink.get("uplink_remote_port") self.uplink_type = self.uplink.get("type") def _get_short_info(self): _ret = [] _wanted = ["version","ip","mac","serial","model","model_name","uptime","upgradeable","num_sta","adopted","state"] for _k,_v in self.__dict__.items(): if _k.startswith("_") or _k not in _wanted or type(_v) not in (str,int,float): continue _ret.append(f"{self.name}|{_k}|{_v}") return "\n".join(_ret) def __str__(self): if self._piggy_back: _piggybackname = getattr(self,self._API.PIGGYBACK_ATTRIBUT,self.name) _ret = [f"<<<<{_piggybackname}>>>>"] else: _ret = [] _ret.append("<<>>") _unwanted = ["anon_id","device_id","site_id","known_cfgversion","cfgversion","syslog_key","has_speaker","has_eth1", "next_interval","next_heartbeat","next_heartbeat_at","guest_token","connect_request_ip","connect_request_port", "start_connected_millis","start_disconnected_millis","wlangroup_id_na","wlangroup_id_ng","uplink_down_timeout" "unsupported_reason","connected_at","provisioned_at","fw_caps","hw_caps","manufacturer_id","use_custom_config", "led_override","led_override_color","led_override_color_brightness","sys_error_caps","adoptable_when_upgraded", "mesh_uplink_1","mesh_uplink_1","considered_lost_at","outdoor_mode_override","unsupported_reason","architecture", "kernel_version","required_version","prev_non_busy_state","has_fan","has_temperature","flowctrl_enabled","hash_id", "speedtest-status-saved","usg_caps","two_phase_adopt","rollupgrade","locating","dot1x_portctrl_enabled", "lcm_idle_timeout_override","lcm_brightness_override","uplink_depth","mesh_sta_vap_enabled","mesh_uplink_2", "lcm_tracker_enabled","model_incompatible","model_in_lts","model_in_eol","country_code","wifi_caps", "meshv3_peer_mac","element_peer_mac","vwireEnabled","hide_ch_width","x_authkey","x_ssh_hostkey_fingerprint", "x_fingerprint","x_inform_authkey","op_mode","uptime" ] for _k,_v in self.__dict__.items(): if _k.startswith("_") or _k in _unwanted or type(_v) not in (str,int,float): continue _ret.append(f"{_k}|{_v}") _ret.append("<<>>") _ret.append(f"{{\"unifi_device\":\"unifi-{self.type}\"}}") _uptime = getattr(self,"uptime",None) if _uptime: _ret.append("<<>>") _ret.append(str(_uptime)) if self._NETWORK_PORTS: _ret += ["","<<>>"] + [str(_port) for _port in self._NETWORK_PORTS] if self._NETWORK_RADIO: _ret += ["","<<>>"] + [str(_radio) for _radio in self._NETWORK_RADIO] if self._NETWORK_SSIDS: _ret += ["","<<>>"] + [str(_ssid) for _ssid in sorted(self._NETWORK_SSIDS,key=lambda x: x.essid)] return "\n".join(_ret) ######################################## ###### ###### S I T E ###### ######################################## class unifi_site(unifi_object): def _init(self): for _subsys in self.health: _name = _subsys.get("subsystem") for _k,_v in _subsys.items(): _k = _k.replace("-","_") if _k == "subsystem" or type(_v) not in (str,int,float): continue #print(f"{_k}:{_v}") setattr(self,f"{_name}_{_k}",_v) ##pprint(_api.get_data("/stat/rogueap")) self._SITE_DEVICES = [] self._PORTCONFIGS = {} self._get_portconfig() self._get_devices() _satisfaction = list(filter( lambda x: x != None,map( lambda x: getattr(x,"satisfaction",None),self._SITE_DEVICES ) )) self.satisfaction = max(0,int(mean(_satisfaction)) if _satisfaction else 0) def _get_portconfig(self): _data = self._API.get_portconfig(site=self.name) for _config in _data: self._PORTCONFIGS[_config["_id"]] = _config.get("name") def _get_devices(self): _data = self._API.get_devices(site=self.name) for _device in _data: self._UNIFICONTROLLER._UNIFI_DEVICES.append(unifi_device(_PARENT=self,**_device)) def __str__(self): _ret = ["<<>>"] _unwanted = ["name","anonymous_id","www_gw_mac","wan_gw_mac","attr_hidden_id","attr_no_delete",""] for _k,_v in self.__dict__.items(): if _k.startswith("_") or _k in _unwanted or type(_v) not in (str,int,float): continue _ret.append(f"{self.name}|{_k}|{_v}") return "\n".join(_ret) ######################################## ###### ###### C O N T R O L L E R ###### ######################################## class unifi_controller(unifi_object): def _init(self): self._UNIFICONTROLLER = self self._UNIFI_SITES = [] self._UNIFI_DEVICES = [] self._UNIFI_SSIDS = [] self._get_systemhealth() self._get_sites() for _dev in self._UNIFI_DEVICES: _dev._get_uplink() if hasattr(self,"cloudkey_version"): self.cloudkey_version = re.sub(".*?v(\d+\.\d+\.\d+\.[a-z0-9]+).*","\\1",self.cloudkey_version) self.type = getattr(self,"ubnt_device_type","unifi-sw-controller") self.controller_version = self.version delattr(self,"version") def _get_systemhealth(self): _data = self._API.get_sysinfo() _wanted = ["timezone","autobackup","version","previous_version","update_available","hostname","name","uptime","cloudkey_update_available","cloudkey_update_version","cloudkey_version","ubnt_device_type","udm_version","udm_update_version","udm_update_available"] if _data: for _k,_v in _data[0].items(): if _k in _wanted: if type(_v) == bool: _v = int(_v) setattr(self,_k,_v) def _get_device_by_mac(self,mac): try: return next(filter(lambda x: x.mac == mac,self._UNIFI_DEVICES)).name except StopIteration: return None def _get_sites(self): _data = self._API.get_sites() for _site in _data: if self._API.SITES and _site.get("name") not in self._API.SITES and _site.get("desc").lower() not in self._API.SITES: continue self._UNIFI_SITES.append(unifi_site(_PARENT=self,**_site)) def _get_ssidlist(self): _dict = defaultdict(list) for _ssid in self._UNIFI_SSIDS: _dict[f"{_ssid.essid}@{_ssid._UNIFI_SITE.desc}"].append(_ssid) _ret = [] for _ssid,_obj in _dict.items(): #pprint(_obj) for _key in ("num_sta","ng_num_sta","na_num_sta","ng_tcp_packet_loss","na_tcp_packet_loss","ng_wifi_retries","na_wifi_retries","ng_wifi_latency","na_wifi_latency"): _ret.append("|".join([_ssid,_key,str(sum(map(lambda x: getattr(x,_key,0),_obj)))])) _signals = list(map(lambda x: getattr(x,"ng_avg_client_signal",0),filter(lambda x: x.radio == "ng",_obj))) _ret.append("|".join([_ssid,"ng_avg_client_signal",str(mean(_signals if _signals else [0]))])) _signals = list(map(lambda x: getattr(x,"na_avg_client_signal",0),filter(lambda x: x.radio == "na",_obj))) _ret.append("|".join([_ssid,"na_avg_client_signal",str(mean(_signals if _signals else [0]))])) _ret.append("|".join([_ssid,"channels",",".join( sorted( set(map(lambda x: str(getattr(x,"channel","0")),_obj)) ,key = lambda x: int(x)) )])) _ret.append("|".join([_ssid,"avg_client_signal",str(mean(map(lambda x: getattr(x,"avg_client_signal",0),_obj))) ])) return _ret def __str__(self): _ret = ["<<>>"] for _k,_v in self.__dict__.items(): if _k.startswith("_") or type(_v) not in (str,int,float): continue _ret.append(f"{_k}|{_v}") ## check udm _has_udm = list(filter(lambda x: x.name == self.name,self._UNIFI_DEVICES)) if _has_udm: _udm = _has_udm[0] _udm._piggy_back = False _ret.append(str(_udm)) _ret.append("<<>>") _ret.append(f"{{\"unifi_device\":\"unifi-{self.type}\"}}") ## SITES ## for _site in self._UNIFI_SITES: _ret.append(str(_site)) _ret.append("<<>>") for _device in self._UNIFI_DEVICES: if _device._piggy_back: _ret.append(_device._get_short_info()) ## device list ## ssid list _ret.append("<<>>") _ret += self._get_ssidlist() if self._API.PIGGYBACK_ATTRIBUT.lower() != "none": ## PIGGYBACK DEVICES ## for _device in self._UNIFI_DEVICES: if _device._piggy_back and _device.adopted: _ret.append(str(_device)) return "\n".join(_ret) ######################################## ###### ###### A P I ###### https://ubntwiki.com/products/software/unifi-controller/api ######################################## class unifi_controller_api(object): def __init__(self,host,username,password,port,site,verify_cert,rawapi,piggybackattr,**kwargs): self.host = host self.url = f"https://{host}" if port != 443: self.url = f"https://{host}:{port}" self._verify_cert = verify_cert if not verify_cert: requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning) self.RAW_API = rawapi self.PIGGYBACK_ATTRIBUT = piggybackattr self.SITES = site.lower().split(",") if site else None self._session = requests.Session() self.check_unifi_os() self.login(username,password) def check_unifi_os(self): _response = self.request("GET",url=self.url,allow_redirects=False) _osid = re.findall('UNIFI_OS_MANIFEST.*?"id":"(\w+)"',_response.text) if _osid and _osid[0] in ("UCKP","UNVR","UDMPRO","UDMENT","UDM","UDR"): self.is_unifios = _osid[0] else: self.is_unifios = [] def get_sysinfo(self): return self.get_data("/stat/sysinfo") def get_sites(self): return self.get_data("/stat/sites",site=None) def get_portconfig(self,site): return self.get_data("/rest/portconf",site=site) def get_devices(self,site): return self.get_data("/stat/device",site=site) def login(self,username,password): if self.is_unifios: url=f"{self.url}/api/auth/login" else: url=f"{self.url}/api/login" auth = { "username" : username, "password" : password, "remember" : True } _response = self.request("POST",url=url,json=auth) if _response.status_code == 404: raise unifi_api_exception("API not Found try other Port or IP") _json = _response.json() if _json.get("meta",{}).get("rc") == "ok" or _json.get("status") == "ACTIVE": return raise unifi_api_exception("Login failed") def get_data(self,path,site="default",method="GET",**kwargs): _json = self.request(method=method,path=path,site=site,**kwargs).json() if type(_json) == dict: _meta = _json.get("meta",{}) if _meta.get("rc") == "ok": return _json.get("data",[]) if _json.get("modelKey") == "nvr": return _json if type(_json) == list: return _json raise unifi_api_exception(_meta.get("msg",_json.get("errors",repr(_json)))) def request(self,method,url=None,path=None,site=None,json=None,**kwargs): if not url: if self.is_unifios == "UNVR": url = f"{self.url}/proxy/protect/api" elif self.is_unifios: url = f"{self.url}/proxy/network/api" else: url = f"{self.url}/api" if site is not None: url += f"/s/{site}" if path is not None: url += f"{path}" _request = requests.Request(method,url,json=json) _prepped_request = self._session.prepare_request(_request) else: _request = requests.Request(method,url,json=json) _prepped_request = _request.prepare() _response = self._session.send(_prepped_request,verify=self._verify_cert,timeout=10,**kwargs) if _response.status_code == 200 and hasattr(_response,"json") and self.RAW_API: try: pprint(_response.json()) except: pass return _response ######################################## ###### ###### M A I N ###### ######################################## if __name__ == '__main__': parser = create_default_argument_parser(description=__doc__) parser.add_argument('-u', '--user', dest='username', required=True, help='User to access the DSM.') parser.add_argument('-p', '--password', dest='password', required=True, help='Password to access the DSM.') parser.add_argument('--ignore-cert', dest='verify_cert', action='store_false', help='Do not verify the SSL cert') parser.add_argument('-s','--site', dest='site', required=False, help='Site') parser.add_argument('--port', dest='port',type=int,default='443') parser.add_argument('--piggyback', dest='piggybackattr',type=str,default='name') parser.add_argument('--rawapi', dest='rawapi', action='store_true') parser.add_argument("host",type=str, help="""Host name or IP address of Unifi Controller""") args = parser.parse_args() try: _api = unifi_controller_api(**args.__dict__) except socket.error as e: pprint(e) sys.exit(1) if _api.is_unifios: labels = {"cmk/os_family": "UnifiOS"} print("<<>>") print(json.dumps(labels)) if _api.is_unifios == "UNVR": pprint(_api.get_data("/sensors",site=None)) pprint(_api.get_data("/cameras",site=None)) pprint(_api.get_data("/nvr",site=None)) sys.exit(0) ##pprint(_api.get_data("/stat/rogueap?within=4")) ##pprint(_api.get_data("/rest/user",site="default",method="GET")) ##pprint(_api.get_data("/stat/sta",site="default",method="GET")) ##sys.exit(0) _controller = unifi_controller(_API=_api) if args.rawapi == False: print(_controller)