mirror of
				https://github.com/bashclub/check-unifi-controller.git
				synced 2025-11-03 22:32:26 +01:00 
			
		
		
		
	
		
			
				
	
	
		
			652 lines
		
	
	
		
			26 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			652 lines
		
	
	
		
			26 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
#!/usr/bin/env python3
 | 
						|
# -*- encoding: utf-8; py-indent-offset: 4 -*-
 | 
						|
##  MIT License
 | 
						|
##  
 | 
						|
##  Copyright (c) 2021 Bash Club
 | 
						|
##  
 | 
						|
##  Permission is hereby granted, free of charge, to any person obtaining a copy
 | 
						|
##  of this software and associated documentation files (the "Software"), to deal
 | 
						|
##  in the Software without restriction, including without limitation the rights
 | 
						|
##  to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 | 
						|
##  copies of the Software, and to permit persons to whom the Software is
 | 
						|
##  furnished to do so, subject to the following conditions:
 | 
						|
##  
 | 
						|
##  The above copyright notice and this permission notice shall be included in all
 | 
						|
##  copies or substantial portions of the Software.
 | 
						|
##  
 | 
						|
##  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 | 
						|
##  IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 | 
						|
##  FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 | 
						|
##  AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 | 
						|
##  LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 | 
						|
##  OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 | 
						|
##  SOFTWARE.
 | 
						|
 | 
						|
### 
 | 
						|
# Agent version; printed as "Version: ..." in the <<<check_mk>>> section when run as a script.
__VERSION__ = 0.83
 | 
						|
 | 
						|
import sys
 | 
						|
import socket
 | 
						|
import re
 | 
						|
import json
 | 
						|
import requests
 | 
						|
from urllib3.exceptions import InsecureRequestWarning
 | 
						|
from statistics import mean
 | 
						|
from collections import defaultdict
 | 
						|
 | 
						|
from pprint import pprint
 | 
						|
# Temp directory for this agent when running inside a Checkmk site.
# Outside of Checkmk (cmk package not importable) no temp path is available.
try:
    import cmk.utils.paths
    AGENT_TMP_PATH = cmk.utils.paths.Path(cmk.utils.paths.tmp_dir, "agents/agent_unifi")
except ImportError:
    AGENT_TMP_PATH = None
 | 
						|
 | 
						|
# Maps the model code reported by the controller to a product name.
# Looked up with .get(), so unknown model codes yield None.
UNIFI_DEVICE_TABLE = {
    'BZ2'       : 'UAP',
    'BZ2LR'     : 'UAP-LR',
    'U2HSR'     : 'UAP-Outdoor+',
    'U2IW'      : 'UAP-IW',
    'U2L48'     : 'UAP-AC-LR',
    'U2Lv2'     : 'UAP-AC-LR',
    'U2M'       : 'UAP-Mini',
    'U2O'       : 'UAP-Outdoor',
    'U2S48'     : 'UAP-AC',
    'U2Sv2'     : 'UAP-AC',
    'U5O'       : 'UAP-Outdoor5',
    'U7E'       : 'UAP-AC',
    'U7EDU'     : 'UAP-AC-EDU',
    'U7Ev2'     : 'UAP-AC',
    'U7HD'      : 'UAP-AC-HD',
    'U7SHD'     : 'UAP-AC-SHD',
    'U7NHD'     : 'UAP-nanoHD',
    'UFLHD'     : 'UAP-FlexHD',
    'UHDIW'     : 'UAP-IW-HD',
    'UAIW6'     : 'U6-IW',
    'UAE6'      : 'U6-Extender',
    'UAL6'      : 'U6-Lite',
    'UAM6'      : 'U6-Mesh',
    'UALR6'     : 'U6-LR-EA',
    'UAP6'      : 'U6-LR',
    'UALR6v2'   : 'U6-LR',
    'UALR6v3'   : 'U6-LR',
    'UCXG'      : 'UAP-XG',
    'UXSDM'     : 'UWB-XG',
    'UXBSDM'    : 'UWB-XG-BK',
    'UCMSH'     : 'UAP-XG-Mesh',
    'U7IW'      : 'UAP-AC-IW',
    'U7IWP'     : 'UAP-AC-IW-Pro',
    'U7MP'      : 'UAP-AC-M-Pro',
    'U7LR'      : 'UAP-AC-LR',
    'U7LT'      : 'UAP-AC-Lite',
    'U7O'       : 'UAP-AC-Outdoor',
    'U7P'       : 'UAP-Pro',
    'U7MSH'     : 'UAP-AC-M',
    'U7PG2'     : 'UAP-AC-Pro',
    'p2N'       : 'PICOM2HP',
    'UDMB'      : 'UAP-BeaconHD',
    'USF5P'     : 'USW-Flex',
    'US8'       : 'US-8',
    'US8P60'    : 'US-8-60W',
    'US8P150'   : 'US-8-150W',
    'S28150'    : 'US-8-150W',
    'USC8'      : 'US-8',
    'USC8P60'   : 'US-8-60W',
    'USC8P150'  : 'US-8-150W',
    'US16P150'  : 'US-16-150W',
    'S216150'   : 'US-16-150W',
    'US24'      : 'US-24-G1',
    'US24PRO'   : 'USW-Pro-24-PoE',
    'US24PRO2'  : 'USW-Pro-24',
    'US24P250'  : 'US-24-250W',
    'US24PL2'   : 'US-L2-24-PoE',
    'US24P500'  : 'US-24-500W',
    'S224250'   : 'US-24-250W',
    'S224500'   : 'US-24-500W',
    'US48'      : 'US-48-G1',
    'US48PRO'   : 'USW-Pro-48-PoE',
    'US48PRO2'  : 'USW-Pro-48',
    'US48P500'  : 'US-48-500W',
    'US48PL2'   : 'US-L2-48-PoE',
    'US48P750'  : 'US-48-750W',
    'S248500'   : 'US-48-500W',
    'S248750'   : 'US-48-750W',
    'US6XG150'  : 'US-XG-6PoE',
    'USMINI'    : 'USW-Flex-Mini',
    'USXG'      : 'US-16-XG',
    'USC8P450'  : 'USW-Industrial',
    'UDC48X6'   : 'USW-Leaf',
    'USL8A'     : 'UniFi Switch Aggregation',
    'USAGGPRO'  : 'UniFi Switch Aggregation Pro',
    'USL8LP'    : 'USW-Lite-8-PoE',
    'USL8MP'    : 'USW-Mission-Critical',
    'USL16P'    : 'USW-16-PoE',
    'USL16LP'   : 'USW-Lite-16-PoE',
    'USL24'     : 'USW-24-G2',
    'USL48'     : 'USW-48-G2',
    'USL24P'    : 'USW-24-PoE',
    'USL48P'    : 'USW-48-PoE',
    'UGW3'      : 'USG-3P',
    'UGW4'      : 'USG-Pro-4',
    'UGWHD4'    : 'USG',
    'UGWXG'     : 'USG-XG-8',
    'UDM'       : 'UDM',
    'UDMSE'     : 'UDM-SE',
    'UDMPRO'    : 'UDM-Pro',
    'UP4'       : 'UVP-X',
    'UP5'       : 'UVP',
    'UP5t'      : 'UVP-Pro',
    'UP7'       : 'UVP-Executive',
    'UP5c'      : 'UVP',
    'UP5tc'     : 'UVP-Pro',
    'UP7c'      : 'UVP-Executive',
    'UCK'       : 'UCK',
    'UCK-v2'    : 'UCK',
    'UCK-v3'    : 'UCK',
    'UCKG2'     : 'UCK-G2',
    'UCKP'      : 'UCK-G2-Plus',
    'UASXG'     : 'UAS-XG',
    'ULTE'      : 'U-LTE',
    'ULTEPUS'   : 'U-LTE-Pro',
    'ULTEPEU'   : 'U-LTE-Pro',
    'UP1'       : 'USP-Plug',
    'UP6'       : 'USP-Strip',
    'USPPDUP'   : 'USP - Power Distribution Unit Pro',
    'USPRPS'    : 'USP-RPS',
    'US624P'    : 'UniFi6 Switch 24',
    'UBB'       : 'UBB',
    'UXGPRO'    : 'UniFi NeXt-Gen Gateway PRO',
}
 | 
						|
 | 
						|
# Prefer the Checkmk argument parser factory; fall back to plain argparse
# (under the same name) when the agent is run outside of a Checkmk site.
try:
    from cmk.special_agents.utils.argument_parsing import create_default_argument_parser
except ImportError:
    from argparse import ArgumentParser as create_default_argument_parser
 | 
						|
 | 
						|
class unifi_api_exception(Exception):
    """Raised when the controller API is not found, login fails or a call returns an error."""
 | 
						|
 | 
						|
class unifi_object(object):
    """Base class that exposes keyword arguments as instance attributes.

    Every keyword becomes an attribute; dashes in the key are replaced by
    underscores and bool values are stored as int (0/1). If the ``_PARENT``
    object carries a ``_UNIFICONTROLLER`` attribute, the controller and API
    handles are copied from it. Finally the subclass hook ``_init()`` is
    called when the subclass defines one.
    """

    def __init__(self, **kwargs):
        for _k, _v in kwargs.items():
            _k = _k.replace("-", "_")
            if isinstance(_v, bool):
                _v = int(_v)
            setattr(self, _k, _v)

        ## without an explicit parent the builtin `object` is used as placeholder
        self._PARENT = kwargs.get("_PARENT", object)
        if hasattr(self._PARENT, "_UNIFICONTROLLER"):
            self._UNIFICONTROLLER = self._PARENT._UNIFICONTROLLER
            self._API = self._PARENT._API
        if hasattr(self, "_init"):
            self._init()

    def __repr__(self):
        """Return the repr of a list of (name, value) pairs for all int and str attributes."""
        return repr([(_k, _v) for _k, _v in self.__dict__.items() if type(_v) in (int, str)])
 | 
						|
 | 
						|
########################################
 | 
						|
######
 | 
						|
######      S S I D
 | 
						|
######
 | 
						|
########################################
 | 
						|
class unifi_network_ssid(unifi_object):
    """One SSID entry of a device's ``vap_table`` (one SSID on one radio)."""

    def _init(self):
        """Register at the controller and add radio-prefixed copies of the main counters."""
        self._UNIFICONTROLLER._UNIFI_SSIDS.append(self)
        ## parent is the device, its parent is the site
        self._UNIFI_SITE = self._PARENT._PARENT
        for _k, _v in getattr(self, "reasons_bar_chart_now", {}).items():
            setattr(self, _k, _v)
        ## e.g. ng_num_sta / na_num_sta, summed up per radio in the controller ssid list
        setattr(self, f"{self.radio}_num_sta", self.num_sta)
        setattr(self, f"{self.radio}_tcp_packet_loss", self.tcp_packet_loss)
        setattr(self, f"{self.radio}_wifi_retries", self.wifi_retries)
        setattr(self, f"{self.radio}_wifi_latency", self.wifi_latency)
        setattr(self, f"{self.radio}_avg_client_signal", self.avg_client_signal)

    def __str__(self):
        """Return ``essid|radio_key|value`` lines for all plain str/int/float attributes."""
        _ret = []
        _unwanted = [
            "essid", "radio", "id", "t", "name", "radio_name", "wlanconf_id", "is_wep", "up", "site_id", "ap_mac", "state",
            "na_num_sta", "ng_num_sta", "ng_tcp_packet_loss", "na_tcp_packet_loss", "na_wifi_retries", "ng_wifi_retries",
            "na_wifi_latency", "ng_wifi_latency", "na_avg_client_signal", "ng_avg_client_signal",
        ]
        for _k, _v in self.__dict__.items():
            if _k.startswith("_") or _k in _unwanted or type(_v) not in (str, int, float):
                continue
            _ret.append(f"{self.essid}|{self.radio}_{_k}|{_v}")
        return "\n".join(_ret)
 | 
						|
 | 
						|
########################################
 | 
						|
######
 | 
						|
######      R A D I O 
 | 
						|
######
 | 
						|
########################################
 | 
						|
class unifi_network_radio(unifi_object):
    """One radio entry of a device's ``radio_table_stats``."""

    def _update_stats(self, stats):
        """Copy all entries of *stats* whose key starts with the radio name onto this object.

        The radio name plus one separator character is stripped from the key;
        float values are truncated to int.
        """
        _prefixlen = len(self.name) + 1
        for _k, _v in stats.items():
            if not _k.startswith(self.name):
                continue
            if isinstance(_v, float):
                _v = int(_v)
            setattr(self, _k[_prefixlen:], _v)

    def __str__(self):
        """Return ``name|key|value`` lines for all plain str/int/float attributes."""
        _ret = []
        _unwanted = ["name", "ast_be_xmit", "extchannel", "cu_total", "cu_self_rx", "cu_self_tx"]
        for _k, _v in self.__dict__.items():
            if _k.startswith("_") or _k in _unwanted or type(_v) not in (str, int, float):
                continue
            _ret.append(f"{self.name}|{_k}|{_v}")
        return "\n".join(_ret)
 | 
						|
        
 | 
						|
########################################
 | 
						|
######
 | 
						|
######      P O R T
 | 
						|
######
 | 
						|
########################################
 | 
						|
class unifi_network_port(unifi_object):
    """One entry of a device's ``port_table``."""

    def _init(self):
        """Derive status codes, name, port index and port profile name."""
        self.oper_status = self._get_state(getattr(self, "up", None))
        self.admin_status = self._get_state(getattr(self, "enable", None))
        if hasattr(self, "ifname"):  ## GW / UDM Names
            ## a device without ethernet_overrides is treated like "no override found"
            _overrides = getattr(self._PARENT, "ethernet_overrides", None) or []
            _name = [x for x in _overrides if x.get("ifname") == self.ifname]
            if _name:
                _name = _name[0]
                if getattr(self, "name", None) and _name.get("networkgroup") != "LAN":
                    self.name = _name.get("networkgroup", "unkn")
            else:
                self.name = self.ifname
        if not hasattr(self, "port_idx") and hasattr(self, "ifname"):
            ## ethX -> X+1; use the complete numeric suffix so eth10 becomes 11 and not 1.
            ## Without a numeric suffix the last character is parsed, which raises
            ## ValueError exactly like before.
            _suffix = re.search(r"\d+$", self.ifname)
            self.port_idx = int(_suffix.group() if _suffix else self.ifname[-1]) + 1

        ## name of the port profile, None if the port has no (known) portconf_id
        self.portconf = self._PARENT._PARENT._PORTCONFIGS.get(getattr(self, "portconf_id", None))

    def _get_state(self, state):
        """Map 1/0 (int or str) to the status codes 1 (up) / 2 (down), anything else to 4 (unknown)."""
        return {
            "1": 1,  ## up
            "0": 2,  ## down
        }.get(str(state), 4)  ## unknown

    def __str__(self):
        """Return ``port_idx|key|value`` lines for all plain str/int/float attributes."""
        _ret = []
        _unwanted = [
            "up", "enabled", "media", "anonymous_id", "www_gw_mac", "wan_gw_mac", "attr_hidden_id",
            "masked", "flowctrl_tx", "flowctrl_rx", "portconf_id", "speed_caps",
        ]
        for _k, _v in self.__dict__.items():
            if _k.startswith("_") or _k in _unwanted or type(_v) not in (str, int, float):
                continue
            _ret.append(f"{self.port_idx}|{_k}|{_v}")
        return "\n".join(_ret)
 | 
						|
 | 
						|
########################################
 | 
						|
######
 | 
						|
######      D E V I C E
 | 
						|
######
 | 
						|
########################################
 | 
						|
class unifi_device(unifi_object):
    """One device returned by the site's device list, including its ports, radios and SSIDs."""

    def _init(self):
        """Normalize device attributes and build the port / radio / SSID child objects."""
        if not hasattr(self, "name"):
            ## unnamed device: model plus the last four hex digits of the mac
            _mac_end = self.mac.replace(":", "")[-4:]
            self.name = f"{self.model}:{_mac_end}"
        self._piggy_back = True
        self._PARENT._SITE_DEVICES.append(self)
        self._NETWORK_PORTS = []
        self._NETWORK_RADIO = []
        self._NETWORK_SSIDS = []

        ## flatten sys_stats into plain attributes
        for _k, _v in getattr(self, "sys_stats", {}).items():
            setattr(self, _k.replace("-", "_"), _v)
        self.model_name = UNIFI_DEVICE_TABLE.get(self.model)
        if self.type in ("ugw", "udm"):
            ## change ip to local ip
            self.wan_ip = self.ip
            self.ip = self.connect_request_ip

        if getattr(self, "speedtest_status_saved", False):
            ## replace the speedtest_status dict by flat speedtest_* values
            _speedtest = getattr(self, "speedtest_status", {})
            self.speedtest_time = int(_speedtest.get("rundate", "0"))
            self.speedtest_status = int(_speedtest.get("status_summary", "0"))
            self.speedtest_ping = round(_speedtest.get("latency", -1), 1)
            self.speedtest_download = round(_speedtest.get("xput_download", 0.0), 1)
            self.speedtest_upload = round(_speedtest.get("xput_upload", 0.0), 1)

        ## average of all reported temperature values, formatted with one decimal
        _temp = [x.get("value", 0) for x in getattr(self, "temperatures", [])]
        if _temp:
            self.general_temperature = "{0:.1f}".format(mean(_temp))

        for _port in getattr(self, "port_table", []):
            self._NETWORK_PORTS.append(unifi_network_port(_PARENT=self, **_port))

        for _radio in getattr(self, "radio_table_stats", []):
            _radio_obj = unifi_network_radio(_PARENT=self, **_radio)
            _radio_obj._update_stats(getattr(self, "stat", {}).get("ap", {}))
            self._NETWORK_RADIO.append(_radio_obj)

        for _ssid in getattr(self, "vap_table", []):
            self._NETWORK_SSIDS.append(unifi_network_ssid(_PARENT=self, **_ssid))

    def _get_uplink(self):
        """Flatten the ``uplink`` dict into uplink_* attributes.

        Called by the controller after all devices are known, because the
        uplink device name is resolved by mac over the complete device list.
        """
        if isinstance(getattr(self, "uplink", None), dict):
            self.uplink_up = int(self.uplink.get("up", "0"))
            self.uplink_device = self._UNIFICONTROLLER._get_device_by_mac(self.uplink.get("uplink_mac"))
            self.uplink_remote_port = self.uplink.get("uplink_remote_port")
            self.uplink_type = self.uplink.get("type")

    def _get_short_info(self):
        """Return ``name|key|value`` lines for a fixed set of summary attributes."""
        _ret = []
        _wanted = ["version", "ip", "mac", "serial", "model", "model_name", "uptime", "upgradeable", "num_sta", "adopted", "state"]
        for _k, _v in self.__dict__.items():
            if _k.startswith("_") or _k not in _wanted or type(_v) not in (str, int, float):
                continue
            _ret.append(f"{self.name}|{_k}|{_v}")
        return "\n".join(_ret)

    def __str__(self):
        """Return the agent sections of this device.

        With ``_piggy_back`` set the output starts with a ``<<<<name>>>>``
        piggyback header, the name being the attribute selected by the API
        object's PIGGYBACK_ATTRIBUT (falling back to the device name).
        """
        if self._piggy_back:
            _piggybackname = getattr(self, self._API.PIGGYBACK_ATTRIBUT, self.name)
            _ret = [f"<<<<{_piggybackname}>>>>"]
        else:
            _ret = []
        _ret.append("<<<unifi_device:sep(124)>>>")
        ## NOTE: a missing comma after "uplink_down_timeout" used to glue it to the
        ## following "unsupported_reason", so uplink_down_timeout was never filtered.
        ## NOTE(review): "speedtest-status-saved" never matches, attribute names use
        ## underscores - confirm whether speedtest_status_saved should be hidden.
        _unwanted = [
            "anon_id", "device_id", "site_id", "known_cfgversion", "cfgversion", "syslog_key", "has_speaker", "has_eth1",
            "next_interval", "next_heartbeat", "next_heartbeat_at", "guest_token", "connect_request_ip", "connect_request_port",
            "start_connected_millis", "start_disconnected_millis", "wlangroup_id_na", "wlangroup_id_ng", "uplink_down_timeout",
            "unsupported_reason", "connected_at", "provisioned_at", "fw_caps", "hw_caps", "manufacturer_id", "use_custom_config",
            "led_override", "led_override_color", "led_override_color_brightness", "sys_error_caps", "adoptable_when_upgraded",
            "mesh_uplink_1", "mesh_uplink_1", "considered_lost_at", "outdoor_mode_override", "unsupported_reason", "architecture",
            "kernel_version", "required_version", "prev_non_busy_state", "has_fan", "has_temperature", "flowctrl_enabled", "hash_id",
            "speedtest-status-saved", "usg_caps", "two_phase_adopt", "rollupgrade", "locating", "dot1x_portctrl_enabled",
            "lcm_idle_timeout_override", "lcm_brightness_override", "uplink_depth", "mesh_sta_vap_enabled", "mesh_uplink_2",
            "lcm_tracker_enabled", "model_incompatible", "model_in_lts", "model_in_eol", "country_code", "wifi_caps",
            "meshv3_peer_mac", "element_peer_mac", "vwireEnabled", "hide_ch_width", "x_authkey", "x_ssh_hostkey_fingerprint",
            "x_fingerprint", "x_inform_authkey", "op_mode",
        ]
        for _k, _v in self.__dict__.items():
            if _k.startswith("_") or _k in _unwanted or type(_v) not in (str, int, float):
                continue
            _ret.append(f"{_k}|{_v}")

        _ret.append("<<<labels:sep(0)>>>")
        _ret.append(f"{{\"unifi_device\":\"unifi-{self.type}\"}}")

        if self._NETWORK_PORTS:
            _ret += ["", "<<<unifi_network_ports:sep(124)>>>"] + [str(_port) for _port in self._NETWORK_PORTS]
        if self._NETWORK_RADIO:
            _ret += ["", "<<<unifi_network_radios:sep(124)>>>"] + [str(_radio) for _radio in self._NETWORK_RADIO]

        if self._NETWORK_SSIDS:
            _ret += ["", "<<<unifi_network_ssids:sep(124)>>>"] + [str(_ssid) for _ssid in sorted(self._NETWORK_SSIDS, key=lambda x: x.essid)]
        return "\n".join(_ret)
 | 
						|
 | 
						|
########################################
 | 
						|
######
 | 
						|
######      S I T E
 | 
						|
######
 | 
						|
########################################
 | 
						|
class unifi_site(unifi_object):
    """One site of the controller, holding its health values, port profiles and devices."""

    def _init(self):
        """Flatten the health subsystems, then load port profiles and devices of the site."""
        for _subsys in self.health:
            _name = _subsys.get("subsystem")
            for _k, _v in _subsys.items():
                _k = _k.replace("-", "_")
                if _k == "subsystem" or type(_v) not in (str, int, float):
                    continue
                ## e.g. wlan_num_ap, wan_status ...
                setattr(self, f"{_name}_{_k}", _v)

        self._SITE_DEVICES = []
        self._PORTCONFIGS = {}
        self._get_portconfig()
        self._get_devices()
        ## site satisfaction: mean over all devices reporting one, never below 0
        _satisfaction = [
            _value
            for _value in (getattr(_dev, "satisfaction", None) for _dev in self._SITE_DEVICES)
            if _value is not None
        ]
        self.satisfaction = max(0, int(mean(_satisfaction)) if _satisfaction else 0)

    def _get_portconfig(self):
        """Fill ``_PORTCONFIGS`` with a port profile id -> profile name mapping."""
        _data = self._API.get_portconfig(site=self.name)
        for _config in _data:
            self._PORTCONFIGS[_config["_id"]] = _config.get("name")

    def _get_devices(self):
        """Create a unifi_device per device of this site and register it at the controller."""
        _data = self._API.get_devices(site=self.name)
        for _device in _data:
            self._UNIFICONTROLLER._UNIFI_DEVICES.append(unifi_device(_PARENT=self, **_device))

    def __str__(self):
        """Return the ``unifi_sites`` section with ``name|key|value`` lines."""
        _ret = ["<<<unifi_sites:sep(124)>>>"]
        _unwanted = ["name", "anonymous_id", "www_gw_mac", "wan_gw_mac", "attr_hidden_id", "attr_no_delete", ""]
        for _k, _v in self.__dict__.items():
            if _k.startswith("_") or _k in _unwanted or type(_v) not in (str, int, float):
                continue
            _ret.append(f"{self.name}|{_k}|{_v}")
        return "\n".join(_ret)
 | 
						|
 | 
						|
########################################
 | 
						|
######
 | 
						|
######      C O N T R O L L E R 
 | 
						|
######
 | 
						|
########################################
 | 
						|
class unifi_controller(unifi_object):
    """Root object: loads system info, sites and devices and renders the complete agent output."""

    def _init(self):
        """Load all data from the API and normalize the controller attributes."""
        self._UNIFICONTROLLER = self
        self._UNIFI_SITES = []
        self._UNIFI_DEVICES = []
        self._UNIFI_SSIDS = []
        self._get_systemhealth()
        self._get_sites()
        ## uplinks are resolved by mac, so all devices have to be loaded first
        for _dev in self._UNIFI_DEVICES:
            _dev._get_uplink()
        if hasattr(self, "cloudkey_version"):
            ## reduce the version string to the part following the "v"
            self.cloudkey_version = re.sub(r".*?v(\d+\.\d+\.\d+\.[a-z0-9]+).*", "\\1", self.cloudkey_version)
        self.type = getattr(self, "ubnt_device_type", "unifi-sw-controller")
        ## publish the version as controller_version; guarded because sysinfo may be empty
        if hasattr(self, "version"):
            self.controller_version = self.version
            delattr(self, "version")

    def _get_systemhealth(self):
        """Copy the wanted keys of the first sysinfo entry onto this object (bool as int)."""
        _data = self._API.get_sysinfo()
        _wanted = [
            "timezone", "autobackup", "version", "previous_version", "update_available", "hostname", "name", "uptime",
            "cloudkey_update_available", "cloudkey_update_version", "cloudkey_version", "ubnt_device_type",
            "udm_version", "udm_update_version", "udm_update_available",
        ]
        if _data:
            for _k, _v in _data[0].items():
                if _k in _wanted:
                    if isinstance(_v, bool):
                        _v = int(_v)
                    setattr(self, _k, _v)

    def _get_device_by_mac(self, mac):
        """Return the name of the first device with the given mac, or None."""
        try:
            return next(filter(lambda x: x.mac == mac, self._UNIFI_DEVICES)).name
        except StopIteration:
            return None

    def _get_sites(self):
        """Create a unifi_site per site; with a site filter only sites matching by name or description."""
        _data = self._API.get_sites()
        for _site in _data:
            ## a site without description must not crash the filter
            _desc = (_site.get("desc") or "").lower()
            if self._API.SITES and _site.get("name") not in self._API.SITES and _desc not in self._API.SITES:
                continue
            self._UNIFI_SITES.append(unifi_site(_PARENT=self, **_site))

    def _get_ssidlist(self):
        """Return ``essid@site|key|value`` lines aggregated over all radios/devices of an SSID."""
        _dict = defaultdict(list)
        for _ssid in self._UNIFI_SSIDS:
            _dict[f"{_ssid.essid}@{_ssid._UNIFI_SITE.desc}"].append(_ssid)

        _ret = []
        for _ssid, _obj in _dict.items():
            ## no debug printing here: stdout is the agent output
            for _key in ("num_sta", "ng_num_sta", "na_num_sta", "ng_tcp_packet_loss", "na_tcp_packet_loss", "ng_wifi_retries", "na_wifi_retries", "ng_wifi_latency", "na_wifi_latency"):
                _ret.append("|".join([_ssid, _key, str(sum(map(lambda x: getattr(x, _key, 0), _obj)))]))

            ## per band mean signal, 0 if the SSID is not served on that band
            _signals = list(map(lambda x: getattr(x, "ng_avg_client_signal", 0), filter(lambda x: x.radio == "ng", _obj)))
            _ret.append("|".join([_ssid, "ng_avg_client_signal", str(mean(_signals if _signals else [0]))]))
            _signals = list(map(lambda x: getattr(x, "na_avg_client_signal", 0), filter(lambda x: x.radio == "na", _obj)))
            _ret.append("|".join([_ssid, "na_avg_client_signal", str(mean(_signals if _signals else [0]))]))
            ## distinct channels, sorted numerically
            _ret.append("|".join([_ssid, "channels", ",".join(
                sorted(
                    set(map(lambda x: str(getattr(x, "channel", "0")), _obj)),
                    key=lambda x: int(x),
                )
            )]))
            _ret.append("|".join([_ssid, "avg_client_signal", str(mean(map(lambda x: getattr(x, "avg_client_signal", 0), _obj)))]))
        return _ret

    def __str__(self):
        """Return the complete agent output: controller, sites, device shortlist, SSIDs, piggyback devices."""
        _ret = ["<<<unifi_controller:sep(124)>>>"]
        for _k, _v in self.__dict__.items():
            if _k.startswith("_") or type(_v) not in (str, int, float):
                continue
            _ret.append(f"{_k}|{_v}")

        ## check udm: a device named like the controller is reported on the
        ## controller host itself instead of as piggyback host
        _own_name = getattr(self, "name", None)
        _has_udm = list(filter(lambda x: x.name == _own_name, self._UNIFI_DEVICES))
        if _has_udm:
            _udm = _has_udm[0]
            _udm._piggy_back = False
            _ret.append(str(_udm))

        _ret.append("<<<labels:sep(0)>>>")
        _ret.append(f"{{\"unifi_device\":\"unifi-{self.type}\"}}")

        ## SITES ##
        for _site in self._UNIFI_SITES:
            _ret.append(str(_site))

        ## device list
        _ret.append("<<<unifi_device_shortlist:sep(124)>>>")
        for _device in self._UNIFI_DEVICES:
            if _device._piggy_back:
                _ret.append(_device._get_short_info())

        ## ssid list
        _ret.append("<<<unifi_ssid_list:sep(124)>>>")
        _ret += self._get_ssidlist()

        if self._API.PIGGYBACK_ATTRIBUT.lower() != "none":
            ## PIGGYBACK DEVICES ##
            for _device in self._UNIFI_DEVICES:
                if _device._piggy_back and _device.adopted:
                    _ret.append(str(_device))
        return "\n".join(_ret)
 | 
						|
 | 
						|
 | 
						|
########################################
 | 
						|
######
 | 
						|
######      A P I
 | 
						|
######      https://ubntwiki.com/products/software/unifi-controller/api
 | 
						|
########################################
 | 
						|
class unifi_controller_api(object):
    """HTTP client for the controller API; detects UniFi OS and logs in on construction."""

    def __init__(self, host, username, password, port, site, verify_cert, rawapi, piggybackattr, **kwargs):
        """Store the connection settings, open a session, detect UniFi OS and log in.

        ``site`` is an optional comma separated list used as site filter,
        ``rawapi`` enables dumping of all JSON responses, ``piggybackattr``
        names the device attribute used as piggyback host name. Additional
        keyword arguments are ignored.
        """
        self.host = host
        self.url = f"https://{host}"
        if port != 443:
            self.url = f"https://{host}:{port}"

        self._verify_cert = verify_cert
        if not verify_cert:
            requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)
        self.RAW_API = rawapi
        self.PIGGYBACK_ATTRIBUT = piggybackattr
        self.SITES = site.lower().split(",") if site else None
        self._session = requests.Session()
        self.check_unifi_os()
        self.login(username, password)

    def check_unifi_os(self):
        """Set ``is_unifios``: True if the base url answers 200 with an x-csrf-token header."""
        _response = self.request("GET", url=self.url, allow_redirects=False)
        self.is_unifios = bool(_response.status_code == 200 and _response.headers.get("x-csrf-token"))

    def get_sysinfo(self):
        """Return the data of /stat/sysinfo (site "default")."""
        return self.get_data("/stat/sysinfo")

    def get_sites(self):
        """Return the data of /stat/sites (not site specific)."""
        return self.get_data("/stat/sites", site=None)

    def get_portconfig(self, site):
        """Return the data of /rest/portconf for the given site."""
        return self.get_data("/rest/portconf", site=site)

    def get_devices(self, site):
        """Return the data of /stat/device for the given site."""
        return self.get_data("/stat/device", site=site)

    def login(self, username, password):
        """Log in to the controller.

        Raises unifi_api_exception if the login url answers 404 or the
        response does not signal success.
        """
        if self.is_unifios:
            url = f"{self.url}/api/auth/login"
        else:
            url = f"{self.url}/api/login"
        auth = {
            "username"  : username,
            "password"  : password,
            "remember"  : True
        }
        _response = self.request("POST", url=url, json=auth)
        if _response.status_code == 404:
            raise unifi_api_exception("API not Found try other Port or IP")
        _json = _response.json()
        ## success is signalled either by meta.rc == "ok" or by status == "ACTIVE"
        if _json.get("meta", {}).get("rc") == "ok" or _json.get("status") == "ACTIVE":
            return
        raise unifi_api_exception("Login failed")

    def get_data(self, path, site="default", method="GET", **kwargs):
        """Call an API path and return its ``data`` list.

        Raises unifi_api_exception with the API message if meta.rc is not "ok".
        """
        _json = self.request(method=method, path=path, site=site, **kwargs).json()
        _meta = _json.get("meta", {})
        if _meta.get("rc") == "ok":
            return _json.get("data", [])
        raise unifi_api_exception(_meta.get("msg", _json.get("errors", repr(_json))))

    def request(self, method, url=None, path=None, site=None, json=None, **kwargs):
        """Send a request and return the response object.

        Without ``url`` the API url is built from the base url (with the
        /proxy/network prefix on UniFi OS), the optional site and the path,
        and the request is prepared by the session. With an explicit ``url``
        the request is prepared standalone. Remaining keyword arguments are
        passed to the session's send(). With RAW_API set, JSON bodies of
        successful responses are pretty-printed.
        """
        if not url:
            if self.is_unifios:
                url = f"{self.url}/proxy/network/api"
            else:
                url = f"{self.url}/api"
            if site is not None:
                url += f"/s/{site}"
            if path is not None:
                url += f"{path}"
            _request = requests.Request(method, url, json=json)
            _prepped_request = self._session.prepare_request(_request)
        else:
            _request = requests.Request(method, url, json=json)
            _prepped_request = _request.prepare()
        _response = self._session.send(_prepped_request, verify=self._verify_cert, timeout=10, **kwargs)
        if self.RAW_API and _response.status_code == 200:
            try:
                pprint(_response.json())
            except ValueError:
                ## body is not JSON (e.g. the html start page) - nothing to dump
                pass
        return _response
 | 
						|
 | 
						|
########################################
 | 
						|
######
 | 
						|
######      M A I N
 | 
						|
######
 | 
						|
########################################
 | 
						|
if __name__ == '__main__':
    ## command line: credentials, optional site filter / port / piggyback attribute and the host
    parser = create_default_argument_parser(description=__doc__)
    parser.add_argument('-u', '--user', dest='username', required=True,
                        help='User to access the Unifi Controller.')
    parser.add_argument('-p', '--password', dest='password', required=True,
                        help='Password to access the Unifi Controller.')
    parser.add_argument('--ignore-cert', dest='verify_cert', action='store_false',
                        help='Do not verify the SSL cert')
    parser.add_argument('-s', '--site', dest='site', required=False,
                        help='Site')
    parser.add_argument('--port', dest='port', type=int, default=443)
    parser.add_argument('--piggyback', dest='piggybackattr', type=str, default='name')
    parser.add_argument('--rawapi', dest='rawapi', action='store_true')
    parser.add_argument("host", type=str,
                        help="""Host name or IP address of Unifi Controller""")
    args = parser.parse_args()
    print("<<<check_mk>>>")
    print(f"Version: {__VERSION__}")
    try:
        _api = unifi_controller_api(**vars(args))
    except (socket.error, unifi_api_exception) as e:
        ## connection problems and API/login errors end the agent without a traceback
        pprint(e)
        sys.exit(1)

    if _api.is_unifios:
        print("AgentOS: UnifiOS")
    _controller = unifi_controller(_API=_api)
    ## with --rawapi only the raw API responses are dumped
    if not args.rawapi:
        print(_controller)
 |