mirror of
https://github.com/bashclub/check-unifi-controller.git
synced 2025-01-12 11:40:12 +01:00
655 lines
26 KiB
Python
655 lines
26 KiB
Python
#!/usr/bin/env python3
|
|
# -*- encoding: utf-8; py-indent-offset: 4 -*-
|
|
## MIT License
|
|
##
|
|
## Copyright (c) 2021 Bash Club
|
|
##
|
|
## Permission is hereby granted, free of charge, to any person obtaining a copy
|
|
## of this software and associated documentation files (the "Software"), to deal
|
|
## in the Software without restriction, including without limitation the rights
|
|
## to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
|
## copies of the Software, and to permit persons to whom the Software is
|
|
## furnished to do so, subject to the following conditions:
|
|
##
|
|
## The above copyright notice and this permission notice shall be included in all
|
|
## copies or substantial portions of the Software.
|
|
##
|
|
## THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
|
## IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
## FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
|
## AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
## LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
|
## OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
|
## SOFTWARE.
|
|
|
|
###
|
|
## Agent version number; printed as "Version:" in the <<<check_mk>>> section.
__VERSION__ = 0.87
|
|
|
|
import sys
|
|
import socket
|
|
import re
|
|
import json
|
|
import requests
|
|
from urllib3.exceptions import InsecureRequestWarning
|
|
from statistics import mean
|
|
from collections import defaultdict
|
|
|
|
from pprint import pprint
|
|
## Temp directory for this agent below the Checkmk tmp_dir.
## When the cmk package cannot be imported, no temp path is available (None).
try:
    import cmk.utils.paths
except ImportError:
    AGENT_TMP_PATH = None
else:
    AGENT_TMP_PATH = cmk.utils.paths.Path(cmk.utils.paths.tmp_dir, "agents/agent_unifi")
|
|
|
|
## Maps the model code found in a device's "model" field to a product name.
## Looked up with .get(), so unknown model codes resolve to None.
UNIFI_DEVICE_TABLE = {
    "BZ2": "UAP",
    "BZ2LR": "UAP-LR",
    "U2HSR": "UAP-Outdoor+",
    "U2IW": "UAP-IW",
    "U2L48": "UAP-AC-LR",
    "U2Lv2": "UAP-AC-LR",
    "U2M": "UAP-Mini",
    "U2O": "UAP-Outdoor",
    "U2S48": "UAP-AC",
    "U2Sv2": "UAP-AC",
    "U5O": "UAP-Outdoor5",
    "U7E": "UAP-AC",
    "U7EDU": "UAP-AC-EDU",
    "U7Ev2": "UAP-AC",
    "U7HD": "UAP-AC-HD",
    "U7SHD": "UAP-AC-SHD",
    "U7NHD": "UAP-nanoHD",
    "UFLHD": "UAP-FlexHD",
    "UHDIW": "UAP-IW-HD",
    "UAIW6": "U6-IW",
    "UAE6": "U6-Extender",
    "UAL6": "U6-Lite",
    "UAM6": "U6-Mesh",
    "UALR6": "U6-LR-EA",
    "UAP6": "U6-LR",
    "UALR6v2": "U6-LR",
    "UALR6v3": "U6-LR",
    "UCXG": "UAP-XG",
    "UXSDM": "UWB-XG",
    "UXBSDM": "UWB-XG-BK",
    "UCMSH": "UAP-XG-Mesh",
    "U7IW": "UAP-AC-IW",
    "U7IWP": "UAP-AC-IW-Pro",
    "U7MP": "UAP-AC-M-Pro",
    "U7LR": "UAP-AC-LR",
    "U7LT": "UAP-AC-Lite",
    "U7O": "UAP-AC-Outdoor",
    "U7P": "UAP-Pro",
    "U7MSH": "UAP-AC-M",
    "U7PG2": "UAP-AC-Pro",
    "p2N": "PICOM2HP",
    "UDMB": "UAP-BeaconHD",
    "USF5P": "USW-Flex",
    "US8": "US-8",
    "US8P60": "US-8-60W",
    "US8P150": "US-8-150W",
    "S28150": "US-8-150W",
    "USC8": "US-8",
    "USC8P60": "US-8-60W",
    "USC8P150": "US-8-150W",
    "US16P150": "US-16-150W",
    "S216150": "US-16-150W",
    "US24": "US-24-G1",
    "US24PRO": "USW-Pro-24-PoE",
    "US24PRO2": "USW-Pro-24",
    "US24P250": "US-24-250W",
    "US24PL2": "US-L2-24-PoE",
    "US24P500": "US-24-500W",
    "S224250": "US-24-250W",
    "S224500": "US-24-500W",
    "US48": "US-48-G1",
    "US48PRO": "USW-Pro-48-PoE",
    "US48PRO2": "USW-Pro-48",
    "US48P500": "US-48-500W",
    "US48PL2": "US-L2-48-PoE",
    "US48P750": "US-48-750W",
    "S248500": "US-48-500W",
    "S248750": "US-48-750W",
    "US6XG150": "US-XG-6PoE",
    "USMINI": "USW-Flex-Mini",
    "USXG": "US-16-XG",
    "USC8P450": "USW-Industrial",
    "UDC48X6": "USW-Leaf",
    "USL8A": "UniFi Switch Aggregation",
    "USAGGPRO": "UniFi Switch Aggregation Pro",
    "USL8LP": "USW-Lite-8-PoE",
    "USL8MP": "USW-Mission-Critical",
    "USL16P": "USW-16-PoE",
    "USL16LP": "USW-Lite-16-PoE",
    "USL24": "USW-24-G2",
    "USL48": "USW-48-G2",
    "USL24P": "USW-24-PoE",
    "USL48P": "USW-48-PoE",
    "UGW3": "USG-3P",
    "UGW4": "USG-Pro-4",
    "UGWHD4": "USG",
    "UGWXG": "USG-XG-8",
    "UDM": "UDM",
    "UDMSE": "UDM-SE",
    "UDMPRO": "UDM-Pro",
    "UP4": "UVP-X",
    "UP5": "UVP",
    "UP5t": "UVP-Pro",
    "UP7": "UVP-Executive",
    "UP5c": "UVP",
    "UP5tc": "UVP-Pro",
    "UP7c": "UVP-Executive",
    "UCK": "UCK",
    "UCK-v2": "UCK",
    "UCK-v3": "UCK",
    "UCKG2": "UCK-G2",
    "UCKP": "UCK-G2-Plus",
    "UASXG": "UAS-XG",
    "ULTE": "U-LTE",
    "ULTEPUS": "U-LTE-Pro",
    "ULTEPEU": "U-LTE-Pro",
    "UP1": "USP-Plug",
    "UP6": "USP-Strip",
    "USPPDUP": "USP - Power Distribution Unit Pro",
    "USPRPS": "USP-RPS",
    "US624P": "UniFi6 Switch 24",
    "UBB": "UBB",
    "UXGPRO": "UniFi NeXt-Gen Gateway PRO",
}
|
|
|
|
try:
|
|
from cmk.special_agents.utils.argument_parsing import create_default_argument_parser
|
|
#from check_api import LOGGER ##/TODO
|
|
except ImportError:
|
|
from argparse import ArgumentParser as create_default_argument_parser
|
|
|
|
class unifi_api_exception(Exception):
    """Error raised by the API client: API not found, login failed or a non-"ok" result."""
|
|
|
|
class unifi_object(object):
    """Base class that turns keyword arguments into instance attributes.

    Every keyword becomes an attribute ("-" in the key is replaced by "_",
    booleans are stored as 0/1). If the object given as ``_PARENT`` carries a
    ``_UNIFICONTROLLER`` attribute, that controller and the parent's ``_API``
    are copied onto this instance. Finally an optional ``_init()`` hook defined
    by the subclass is invoked.
    """

    def __init__(self, **kwargs):
        for _key, _value in kwargs.items():
            _attr = _key.replace("-", "_")
            ## booleans are stored as plain integers
            setattr(self, _attr, int(_value) if isinstance(_value, bool) else _value)

        ## without an explicit parent the ``object`` type itself is stored
        self._PARENT = kwargs.get("_PARENT", object)
        _parent = self._PARENT
        if hasattr(_parent, "_UNIFICONTROLLER"):
            self._UNIFICONTROLLER = _parent._UNIFICONTROLLER
            self._API = _parent._API
        if hasattr(self, "_init"):
            self._init()

    def __repr__(self):
        """Return the repr of all (name, value) pairs whose value is exactly int or str."""
        _simple = []
        for _name, _value in self.__dict__.items():
            if type(_value) in (int, str):
                _simple.append((_name, _value))
        return repr(_simple)
|
|
|
|
########################################
|
|
######
|
|
###### S S I D
|
|
######
|
|
########################################
|
|
class unifi_network_ssid(unifi_object):
    """One SSID entry of a device (created from the device's ``vap_table``)."""

    def _init(self):
        """Register with the controller and derive the per-radio attributes."""
        self._UNIFICONTROLLER._UNIFI_SSIDS.append(self)
        ## the parent is the device, the parent of the device is the site
        self._UNIFI_SITE = self._PARENT._PARENT
        ## flatten the entries of "reasons_bar_chart_now" into attributes
        for _name, _value in getattr(self, "reasons_bar_chart_now", {}).items():
            setattr(self, _name, _value)
        ## repeat the metrics under a "<radio>_<metric>" name
        for _metric in ("num_sta", "tcp_packet_loss", "wifi_retries", "wifi_latency", "avg_client_signal"):
            setattr(self, f"{self.radio}_{_metric}", getattr(self, _metric))

    def __str__(self):
        """Return one "essid|radio_key|value" line per printable attribute."""
        _unwanted = (
            "essid", "radio", "id", "t", "name", "radio_name", "wlanconf_id", "is_wep", "up", "site_id", "ap_mac", "state",
            "na_num_sta", "ng_num_sta", "ng_tcp_packet_loss", "na_tcp_packet_loss", "na_wifi_retries", "ng_wifi_retries",
            "na_wifi_latency", "ng_wifi_latency", "na_avg_client_signal", "ng_avg_client_signal",
        )
        return "\n".join(
            f"{self.essid}|{self.radio}_{_name}|{_value}"
            for _name, _value in self.__dict__.items()
            if not _name.startswith("_")
            and _name not in _unwanted
            and type(_value) in (str, int, float)
        )
|
|
|
|
########################################
|
|
######
|
|
###### R A D I O
|
|
######
|
|
########################################
|
|
class unifi_network_radio(unifi_object):
    """One radio of a device (created from the device's ``radio_table_stats``)."""

    def _update_stats(self, stats):
        """Copy entries of *stats* whose key starts with this radio's name.

        The radio name plus one separator character is cut off the key before
        it is used as attribute name; float values are truncated to int.
        """
        _offset = len(self.name) + 1
        for _key, _value in stats.items():
            if not _key.startswith(self.name):
                continue
            if type(_value) is float:
                _value = int(_value)
            setattr(self, _key[_offset:], _value)

    def __str__(self):
        """Return one "name|key|value" line per printable attribute."""
        _unwanted = ("name", "ast_be_xmit", "extchannel", "cu_total", "cu_self_rx", "cu_self_tx")
        return "\n".join(
            f"{self.name}|{_key}|{_value}"
            for _key, _value in self.__dict__.items()
            if not _key.startswith("_")
            and _key not in _unwanted
            and type(_value) in (str, int, float)
        )
|
|
|
|
########################################
|
|
######
|
|
###### P O R T
|
|
######
|
|
########################################
|
|
class unifi_network_port(unifi_object):
    """One network port of a device (created from the device's ``port_table``)."""

    def _init(self):
        """Derive status values, the port name, the port index and the port profile."""
        self.oper_status = self._get_state(getattr(self, "up", None))
        self.admin_status = self._get_state(getattr(self, "enable", None))

        if hasattr(self, "ifname"):  ## GW / UDM Names
            ## a parent without ethernet_overrides simply has no override to apply
            _overrides = getattr(self._PARENT, "ethernet_overrides", None) or []
            _override = next(
                (_entry for _entry in _overrides if _entry.get("ifname") == self.ifname),
                None,
            )
            if _override is not None:
                if getattr(self, "name", None) and _override.get("networkgroup") != "LAN":
                    self.name = _override.get("networkgroup", "unkn")
                else:
                    self.name = self.ifname

            if not hasattr(self, "port_idx"):
                ## ethX -> X+1; use all trailing digits so eth10 does not collide
                ## with eth0. Names without a trailing number keep the interface
                ## name as identifier instead of raising ValueError.
                _number = re.search(r"(\d+)$", self.ifname)
                self.port_idx = int(_number.group(1)) + 1 if _number else self.ifname

        ## name of the port profile, looked up in the site's _PORTCONFIGS (None if unknown)
        self.portconf = self._PARENT._PARENT._PORTCONFIGS.get(getattr(self, "portconf_id", None))

    def _get_state(self, state):
        """Translate a 1/0 state into 1 (up) / 2 (down); anything else is 4 (unknown)."""
        return {
            "1": 1,  ## up
            "0": 2,  ## down
        }.get(str(state), 4)  ## unknown

    def __str__(self):
        """Return one "port_idx|key|value" line per printable attribute."""
        _ret = []
        _unwanted = ["up", "enabled", "media", "anonymous_id", "www_gw_mac", "wan_gw_mac", "attr_hidden_id", "masked", "flowctrl_tx", "flowctrl_rx", "portconf_id", "speed_caps"]
        for _k, _v in self.__dict__.items():
            ## exact type match: private, unwanted and non-scalar values are skipped
            if _k.startswith("_") or _k in _unwanted or type(_v) not in (str, int, float):
                continue
            _ret.append(f"{self.port_idx}|{_k}|{_v}")
        return "\n".join(_ret)
|
|
|
|
########################################
|
|
######
|
|
###### D E V I C E
|
|
######
|
|
########################################
|
|
class unifi_device(unifi_object):
    """One device of a site, including its ports, radios and SSIDs."""

    ## Attributes that are left out of the <<<unifi_device>>> section.
    ## Keys are compared after "-" was replaced by "_" (see unifi_object), so
    ## every entry must use underscores.
    _UNWANTED_ATTRIBUTES = frozenset((
        "anon_id", "device_id", "site_id", "known_cfgversion", "cfgversion", "syslog_key", "has_speaker", "has_eth1",
        "next_interval", "next_heartbeat", "next_heartbeat_at", "guest_token", "connect_request_ip", "connect_request_port",
        "start_connected_millis", "start_disconnected_millis", "wlangroup_id_na", "wlangroup_id_ng", "uplink_down_timeout",
        "unsupported_reason", "connected_at", "provisioned_at", "fw_caps", "hw_caps", "manufacturer_id", "use_custom_config",
        "led_override", "led_override_color", "led_override_color_brightness", "sys_error_caps", "adoptable_when_upgraded",
        "mesh_uplink_1", "considered_lost_at", "outdoor_mode_override", "architecture",
        "kernel_version", "required_version", "prev_non_busy_state", "has_fan", "has_temperature", "flowctrl_enabled", "hash_id",
        "speedtest_status_saved", "usg_caps", "two_phase_adopt", "rollupgrade", "locating", "dot1x_portctrl_enabled",
        "lcm_idle_timeout_override", "lcm_brightness_override", "uplink_depth", "mesh_sta_vap_enabled", "mesh_uplink_2",
        "lcm_tracker_enabled", "model_incompatible", "model_in_lts", "model_in_eol", "country_code", "wifi_caps",
        "meshv3_peer_mac", "element_peer_mac", "vwireEnabled", "hide_ch_width", "x_authkey", "x_ssh_hostkey_fingerprint",
        "x_fingerprint", "x_inform_authkey", "op_mode", "uptime",
    ))

    def _init(self):
        """Complete the device data and build the port, radio and SSID objects."""
        if not hasattr(self, "name"):
            ## unnamed device: use "<model>:<last four hex digits of the mac>"
            _mac_end = self.mac.replace(":", "")[-4:]
            self.name = f"{self.model}:{_mac_end}"
        self._piggy_back = True
        self._PARENT._SITE_DEVICES.append(self)
        self._NETWORK_PORTS = []
        self._NETWORK_RADIO = []
        self._NETWORK_SSIDS = []

        ## flatten sys_stats into attributes
        for _k, _v in getattr(self, "sys_stats", {}).items():
            _k = _k.replace("-", "_")
            setattr(self, _k, _v)
        self.model_name = UNIFI_DEVICE_TABLE.get(self.model)
        if self.type in ("ugw", "udm"):
            ## change ip to local ip
            self.wan_ip = self.ip
            self.ip = self.connect_request_ip

        if getattr(self, "speedtest_status_saved", False):
            _speedtest = getattr(self, "speedtest_status", {})
            self.speedtest_time = int(_speedtest.get("rundate", "0"))
            ## replaces the raw speedtest_status dict by its numeric summary
            self.speedtest_status = int(_speedtest.get("status_summary", "0"))
            self.speedtest_ping = round(_speedtest.get("latency", -1), 1)
            self.speedtest_download = round(_speedtest.get("xput_download", 0.0), 1)
            self.speedtest_upload = round(_speedtest.get("xput_upload", 0.0), 1)

        ## mean of all reported temperature values, formatted with one decimal
        _temp = [_entry.get("value", 0) for _entry in getattr(self, "temperatures", [])]
        if _temp:
            self.general_temperature = "{0:.1f}".format(mean(_temp))

        for _port in getattr(self, "port_table", []):
            self._NETWORK_PORTS.append(unifi_network_port(_PARENT=self, **_port))

        for _radio in getattr(self, "radio_table_stats", []):
            _radio_obj = unifi_network_radio(_PARENT=self, **_radio)
            _radio_obj._update_stats(getattr(self, "stat", {}).get("ap", {}))
            self._NETWORK_RADIO.append(_radio_obj)

        for _ssid in getattr(self, "vap_table", []):
            self._NETWORK_SSIDS.append(unifi_network_ssid(_PARENT=self, **_ssid))

    def _get_uplink(self):
        """Extract the uplink attributes; does nothing when ``uplink`` is not a dict."""
        if isinstance(getattr(self, "uplink", None), dict):
            self.uplink_up = int(self.uplink.get("up", "0"))
            ## name of the device owning the uplink mac (None if not known)
            self.uplink_device = self._UNIFICONTROLLER._get_device_by_mac(self.uplink.get("uplink_mac"))
            self.uplink_remote_port = self.uplink.get("uplink_remote_port")
            self.uplink_type = self.uplink.get("type")

    def _get_short_info(self):
        """Return "name|key|value" lines for the small set of wanted attributes."""
        _ret = []
        _wanted = ["version", "ip", "mac", "serial", "model", "model_name", "uptime", "upgradeable", "num_sta", "adopted", "state"]
        for _k, _v in self.__dict__.items():
            if _k.startswith("_") or _k not in _wanted or type(_v) not in (str, int, float):
                continue
            _ret.append(f"{self.name}|{_k}|{_v}")
        return "\n".join(_ret)

    def __str__(self):
        """Return the agent output of this device.

        With ``_piggy_back`` set the output starts with a "<<<<name>>>>" header
        line, where name is the attribute selected by the API's
        PIGGYBACK_ATTRIBUT (fallback: the device name).
        """
        if self._piggy_back:
            _piggybackname = getattr(self, self._API.PIGGYBACK_ATTRIBUT, self.name)
            _ret = [f"<<<<{_piggybackname}>>>>"]
        else:
            _ret = []
        _ret.append("<<<unifi_device:sep(124)>>>")
        for _k, _v in self.__dict__.items():
            ## exact type match: private, unwanted and non-scalar values are skipped
            if _k.startswith("_") or _k in self._UNWANTED_ATTRIBUTES or type(_v) not in (str, int, float):
                continue
            _ret.append(f"{_k}|{_v}")

        _ret.append("<<<labels:sep(0)>>>")
        _ret.append(f"{{\"unifi_device\":\"unifi-{self.type}\"}}")
        _uptime = getattr(self, "uptime", None)
        if _uptime:
            _ret.append("<<<uptime>>>")
            _ret.append(str(_uptime))
        if self._NETWORK_PORTS:
            _ret += ["", "<<<unifi_network_ports:sep(124)>>>"] + [str(_port) for _port in self._NETWORK_PORTS]
        if self._NETWORK_RADIO:
            _ret += ["", "<<<unifi_network_radios:sep(124)>>>"] + [str(_radio) for _radio in self._NETWORK_RADIO]

        if self._NETWORK_SSIDS:
            _ret += ["", "<<<unifi_network_ssids:sep(124)>>>"] + [str(_ssid) for _ssid in sorted(self._NETWORK_SSIDS, key=lambda x: x.essid)]
        return "\n".join(_ret)
|
|
|
|
########################################
|
|
######
|
|
###### S I T E
|
|
######
|
|
########################################
|
|
class unifi_site(unifi_object):
    """One site of the controller together with its port profiles and devices."""

    def _init(self):
        """Flatten the health data, load port profiles and devices, compute satisfaction."""
        for _subsystem in self.health:
            _prefix = _subsystem.get("subsystem")
            for _key, _value in _subsystem.items():
                _key = _key.replace("-", "_")
                ## keep scalar values only, stored as "<subsystem>_<key>"
                if _key != "subsystem" and type(_value) in (str, int, float):
                    setattr(self, f"{_prefix}_{_key}", _value)

        self._SITE_DEVICES = []
        self._PORTCONFIGS = {}
        self._get_portconfig()
        self._get_devices()

        ## site satisfaction: truncated mean over all devices reporting one, never below 0
        _scores = []
        for _dev in self._SITE_DEVICES:
            _score = getattr(_dev, "satisfaction", None)
            if _score is not None:
                _scores.append(_score)
        self.satisfaction = max(0, int(mean(_scores))) if _scores else 0

    def _get_portconfig(self):
        """Fill ``_PORTCONFIGS`` with an "_id" -> name entry per port profile of this site."""
        for _profile in self._API.get_portconfig(site=self.name):
            self._PORTCONFIGS[_profile["_id"]] = _profile.get("name")

    def _get_devices(self):
        """Create a unifi_device per device of this site and register it with the controller."""
        for _entry in self._API.get_devices(site=self.name):
            self._UNIFICONTROLLER._UNIFI_DEVICES.append(unifi_device(_PARENT=self, **_entry))

    def __str__(self):
        """Return the <<<unifi_sites>>> section with "name|key|value" lines."""
        _unwanted = ("name", "anonymous_id", "www_gw_mac", "wan_gw_mac", "attr_hidden_id", "attr_no_delete", "")
        _lines = ["<<<unifi_sites:sep(124)>>>"]
        _lines.extend(
            f"{self.name}|{_key}|{_value}"
            for _key, _value in self.__dict__.items()
            if not _key.startswith("_")
            and _key not in _unwanted
            and type(_value) in (str, int, float)
        )
        return "\n".join(_lines)
|
|
|
|
########################################
|
|
######
|
|
###### C O N T R O L L E R
|
|
######
|
|
########################################
|
|
class unifi_controller(unifi_object):
    """The controller itself: system info, sites, devices and SSIDs."""

    def _init(self):
        """Load system info and sites, resolve device uplinks and normalise versions."""
        self._UNIFICONTROLLER = self
        self._UNIFI_SITES = []
        self._UNIFI_DEVICES = []
        self._UNIFI_SSIDS = []
        self._get_systemhealth()
        self._get_sites()
        ## uplinks are resolved last because they refer to other devices by mac
        for _dev in self._UNIFI_DEVICES:
            _dev._get_uplink()
        if hasattr(self, "cloudkey_version"):
            ## reduce the version string to the "N.N.N.xxx" part following a "v"
            self.cloudkey_version = re.sub(r".*?v(\d+\.\d+\.\d+\.[a-z0-9]+).*", r"\1", self.cloudkey_version)
        self.type = getattr(self, "ubnt_device_type", "unifi-sw-controller")
        ## publish the version as controller_version instead of version
        self.controller_version = self.version
        delattr(self, "version")

    def _get_systemhealth(self):
        """Copy the wanted keys of the first sysinfo entry onto this object."""
        _data = self._API.get_sysinfo()
        _wanted = ["timezone", "autobackup", "version", "previous_version", "update_available", "hostname", "name", "uptime", "cloudkey_update_available", "cloudkey_update_version", "cloudkey_version", "ubnt_device_type", "udm_version", "udm_update_version", "udm_update_available"]
        if _data:
            for _k, _v in _data[0].items():
                if _k in _wanted:
                    if isinstance(_v, bool):
                        _v = int(_v)
                    setattr(self, _k, _v)

    def _get_device_by_mac(self, mac):
        """Return the name of the first device with the given mac, or None."""
        return next((_dev.name for _dev in self._UNIFI_DEVICES if _dev.mac == mac), None)

    def _get_sites(self):
        """Create a unifi_site for every site, honouring the optional site filter.

        A site passes the filter when its name or its description matches one
        of the (lower-cased) entries in the API's SITES list.
        """
        _data = self._API.get_sites()
        _filter = self._API.SITES
        for _site in _data:
            if _filter:
                ## compare case-insensitively; a missing name/desc never matches
                _name = (_site.get("name") or "").lower()
                _desc = (_site.get("desc") or "").lower()
                if _name not in _filter and _desc not in _filter:
                    continue
            self._UNIFI_SITES.append(unifi_site(_PARENT=self, **_site))

    def _get_ssidlist(self):
        """Return "essid@site|key|value" lines aggregated over all radios/devices."""
        _grouped = defaultdict(list)
        for _ssid in self._UNIFI_SSIDS:
            _grouped[f"{_ssid.essid}@{_ssid._UNIFI_SITE.desc}"].append(_ssid)

        _ret = []
        for _ssid, _obj in _grouped.items():
            ## plain sums over every entry of the SSID
            for _key in ("num_sta", "ng_num_sta", "na_num_sta", "ng_tcp_packet_loss", "na_tcp_packet_loss", "ng_wifi_retries", "na_wifi_retries", "ng_wifi_latency", "na_wifi_latency"):
                _ret.append("|".join([_ssid, _key, str(sum(getattr(_entry, _key, 0) for _entry in _obj))]))

            ## mean client signal per radio; 0 when the SSID is not on that radio
            for _radio in ("ng", "na"):
                _signals = [getattr(_entry, f"{_radio}_avg_client_signal", 0) for _entry in _obj if _entry.radio == _radio]
                _ret.append("|".join([_ssid, f"{_radio}_avg_client_signal", str(mean(_signals if _signals else [0]))]))

            ## distinct channels in numeric order
            _channels = sorted({str(getattr(_entry, "channel", "0")) for _entry in _obj}, key=int)
            _ret.append("|".join([_ssid, "channels", ",".join(_channels)]))
            _ret.append("|".join([_ssid, "avg_client_signal", str(mean([getattr(_entry, "avg_client_signal", 0) for _entry in _obj]))]))
        return _ret

    def __str__(self):
        """Return the complete agent output (controller, sites, device list, SSIDs, devices)."""
        _ret = ["<<<unifi_controller:sep(124)>>>"]
        for _k, _v in self.__dict__.items():
            if _k.startswith("_") or type(_v) not in (str, int, float):
                continue
            _ret.append(f"{_k}|{_v}")

        ## check udm
        ## a device carrying the controller's own name is printed inline
        ## (without piggyback header); without a controller name nothing matches
        _own_name = getattr(self, "name", None)
        _has_udm = [_dev for _dev in self._UNIFI_DEVICES if _dev.name == _own_name]
        if _has_udm:
            _udm = _has_udm[0]
            _udm._piggy_back = False
            _ret.append(str(_udm))

        _ret.append("<<<labels:sep(0)>>>")
        _ret.append(f"{{\"unifi_device\":\"unifi-{self.type}\"}}")

        ## SITES ##
        for _site in self._UNIFI_SITES:
            _ret.append(str(_site))

        ## device list
        _ret.append("<<<unifi_device_shortlist:sep(124)>>>")
        for _device in self._UNIFI_DEVICES:
            if _device._piggy_back:
                _ret.append(_device._get_short_info())

        ## ssid list
        _ret.append("<<<unifi_ssid_list:sep(124)>>>")
        _ret += self._get_ssidlist()

        if self._API.PIGGYBACK_ATTRIBUT.lower() != "none":
            ## PIGGYBACK DEVICES ##
            for _device in self._UNIFI_DEVICES:
                if _device._piggy_back and _device.adopted:
                    _ret.append(str(_device))
        return "\n".join(_ret)
|
|
|
|
|
|
########################################
|
|
######
|
|
###### A P I
|
|
###### https://ubntwiki.com/products/software/unifi-controller/api
|
|
########################################
|
|
class unifi_controller_api(object):
    """HTTP client for the controller API; logs in on construction.

    Parameters:
        host, port: address of the controller (the port is left out of the
            URL when it is 443).
        username, password: login credentials.
        site: optional comma separated list of site names/descriptions.
        verify_cert: passed as ``verify`` to requests; when false the
            InsecureRequestWarning is silenced as well.
        rawapi: when true every successful JSON response is pretty-printed.
        piggybackattr: device attribute used as piggyback host name.
        **kwargs: ignored (allows passing the whole argparse namespace).

    Raises:
        unifi_api_exception: API not found (HTTP 404) or login failed.
    """

    def __init__(self, host, username, password, port, site, verify_cert, rawapi, piggybackattr, **kwargs):
        self.host = host
        self.url = f"https://{host}"
        if port != 443:
            self.url = f"https://{host}:{port}"

        self._verify_cert = verify_cert
        if not verify_cert:
            requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)
        self.RAW_API = rawapi
        self.PIGGYBACK_ATTRIBUT = piggybackattr
        ## lower-cased site filter; blanks around the commas are ignored
        self.SITES = [_s.strip() for _s in site.lower().split(",") if _s.strip()] if site else None
        self._session = requests.Session()
        self.check_unifi_os()
        self.login(username, password)

    def check_unifi_os(self):
        """Set ``is_unifios``: base URL answers 200 and sends an x-csrf-token header."""
        _response = self.request("GET", url=self.url, allow_redirects=False)
        self.is_unifios = bool(_response.status_code == 200 and _response.headers.get("x-csrf-token"))

    def get_sysinfo(self):
        """Return the data of /stat/sysinfo (site "default")."""
        return self.get_data("/stat/sysinfo")

    def get_sites(self):
        """Return the data of /stat/sites (no site in the URL)."""
        return self.get_data("/stat/sites", site=None)

    def get_portconfig(self, site):
        """Return the data of /rest/portconf for *site*."""
        return self.get_data("/rest/portconf", site=site)

    def get_devices(self, site):
        """Return the data of /stat/device for *site*."""
        return self.get_data("/stat/device", site=site)

    def login(self, username, password):
        """Log in; raises unifi_api_exception on HTTP 404 or a rejected/unreadable answer."""
        if self.is_unifios:
            url = f"{self.url}/api/auth/login"
        else:
            url = f"{self.url}/api/login"
        auth = {
            "username": username,
            "password": password,
            "remember": True,
        }
        _response = self.request("POST", url=url, json=auth)
        if _response.status_code == 404:
            raise unifi_api_exception("API not Found try other Port or IP")
        try:
            _json = _response.json()
        except ValueError as _err:
            ## an answer that is not JSON cannot be a successful login
            raise unifi_api_exception("Login failed") from _err
        if _json.get("meta", {}).get("rc") == "ok" or _json.get("status") == "ACTIVE":
            return
        raise unifi_api_exception("Login failed")

    def get_data(self, path, site="default", method="GET", **kwargs):
        """Request *path* and return the "data" list of the answer.

        Raises unifi_api_exception (with the API's message or errors) when the
        answer's meta.rc is not "ok".
        """
        _json = self.request(method=method, path=path, site=site, **kwargs).json()
        _meta = _json.get("meta", {})
        if _meta.get("rc") == "ok":
            return _json.get("data", [])
        raise unifi_api_exception(_meta.get("msg", _json.get("errors", repr(_json))))

    def request(self, method, url=None, path=None, site=None, json=None, **kwargs):
        """Send a request through the session and return the response object.

        Without *url* the API URL is assembled from the base URL, the optional
        "/s/<site>" part and *path*. Additional keyword arguments are passed on
        to ``Session.send``. The timeout is fixed to 10 seconds.
        """
        if not url:
            _api_root = "/proxy/network/api" if self.is_unifios else "/api"
            url = f"{self.url}{_api_root}"
            if site is not None:
                url += f"/s/{site}"
            if path is not None:
                url += f"{path}"
            _prepped_request = self._session.prepare_request(requests.Request(method, url, json=json))
        else:
            ## explicit URLs are prepared without the session state
            _prepped_request = requests.Request(method, url, json=json).prepare()
        _response = self._session.send(_prepped_request, verify=self._verify_cert, timeout=10, **kwargs)
        if self.RAW_API and _response.status_code == 200:
            try:
                pprint(_response.json())
            except ValueError:
                ## raw dump is best effort: bodies that are not JSON are not printed
                pass
        return _response
|
|
|
|
########################################
|
|
######
|
|
###### M A I N
|
|
######
|
|
########################################
|
|
if __name__ == '__main__':
    ## command line interface; the parsed namespace is handed to the API client as keywords
    parser = create_default_argument_parser(description=__doc__)
    parser.add_argument('-u', '--user', dest='username', required=True,
                        help='User to access the Unifi Controller.')
    parser.add_argument('-p', '--password', dest='password', required=True,
                        help='Password to access the Unifi Controller.')
    parser.add_argument('--ignore-cert', dest='verify_cert', action='store_false',
                        help='Do not verify the SSL cert')
    parser.add_argument('-s', '--site', dest='site', required=False,
                        help='Site')
    parser.add_argument('--port', dest='port', type=int, default=443)
    parser.add_argument('--piggyback', dest='piggybackattr', type=str, default='name')
    parser.add_argument('--rawapi', dest='rawapi', action='store_true')
    parser.add_argument("host", type=str,
                        help="""Host name or IP address of Unifi Controller""")
    args = parser.parse_args()
    print("<<<check_mk>>>")
    print(f"Version: {__VERSION__}")
    try:
        ## connects and logs in
        _api = unifi_controller_api(**args.__dict__)
    except socket.error as e:
        pprint(e)
        sys.exit(1)

    if _api.is_unifios:
        print("AgentOS: UnifiOS")
    _controller = unifi_controller(_API=_api)
    ## with --rawapi the responses were already dumped by the API client
    if not args.rawapi:
        print(_controller)
|