From 46d203a32dffc6e5a8c141b930bb563ca6749d28 Mon Sep 17 00:00:00 2001 From: Thorsten Spille Date: Mon, 12 Aug 2024 11:36:03 +0200 Subject: [PATCH] Update opnsense_checkmk_agent.py 1.2.2 --- opnsense_checkmk_agent.py | 682 ++++++++++++++++++++++++++++++++++++-- 1 file changed, 660 insertions(+), 22 deletions(-) diff --git a/opnsense_checkmk_agent.py b/opnsense_checkmk_agent.py index 58c3dc0..3713b98 100644 --- a/opnsense_checkmk_agent.py +++ b/opnsense_checkmk_agent.py @@ -27,7 +27,12 @@ ## * smartdisk - install the mkp from https://github.com/bashclub/checkmk-smart plugins os-smart ## * squid - install the mkp from https://exchange.checkmk.com/p/squid and forwarder -> listen on loopback active -__VERSION__ = "1.0.8" +## task types2 +## speedtest|proxy|ssh|nmap|domain|blocklist +## + + +__VERSION__ = "1.2.3" import sys import os @@ -62,14 +67,33 @@ from collections import Counter,defaultdict from pprint import pprint from socketserver import TCPServer,StreamRequestHandler +import unbound +unbound.RR_TYPE_TLSA = 52 +unbound.RR_TYPE_SPF = unbound.RR_TYPE_TXT +from OpenSSL import SSL,crypto +from binascii import b2a_hex +from datetime import datetime + SCRIPTPATH = os.path.abspath(__file__) SYSHOOK_METHOD = re.findall("rc\.syshook\.d\/(start|stop)/",SCRIPTPATH) BASEDIR = "/usr/local/check_mk_agent" +VARDIR = "/var/lib/check_mk_agent" CHECKMK_CONFIG = "/usr/local/etc/checkmk.conf" MK_CONFDIR = os.path.dirname(CHECKMK_CONFIG) LOCALDIR = os.path.join(BASEDIR,"local") PLUGINSDIR = os.path.join(BASEDIR,"plugins") -SPOOLDIR = os.path.join(BASEDIR,"spool") +SPOOLDIR = os.path.join(VARDIR,"spool") +TASKDIR = os.path.join(BASEDIR,"tasks") +TASKFILE_KEYS = "service|type|interval|interface|disabled|ipaddress|hostname|domain|port|piggyback|sshoptions|options|tenant" +TASKFILE_REGEX = re.compile(f"^({TASKFILE_KEYS}):\s*(.*?)(?:\s+#|$)",re.M) +MAX_SIMULATAN_THREADS = 2 + +for _dir in (BASEDIR, VARDIR, LOCALDIR, PLUGINSDIR, SPOOLDIR, TASKDIR): + if not os.path.exists(_dir): + try: + os.mkdir(_dir) + except: + pass os.environ["MK_CONFDIR"] = MK_CONFDIR os.environ["MK_LIBDIR"] = BASEDIR @@ -131,8 +155,11 @@ class NginxConnectionPool(HTTPConnectionPool): return NginxConnection() class NginxAdapter(HTTPAdapter): + ## deprecated def get_connection(self, url, proxies=None): return NginxConnectionPool() + def get_connection_with_tls_context(self, request, verify, proxies=None, cert=None): + return NginxConnectionPool() def check_pid(pid): try: @@ -167,7 +194,8 @@ class checkmk_checker(object): KEY_LENGTH = 32 IV_LENGTH = 16 PBKDF2_CYCLES = 10_000 - SALT = b"Salted__" + #SALT = b"Salted__" + SALT = os.urandom(SALT_LENGTH) _backend = crypto_default_backend() _kdf_key = PBKDF2HMAC( algorithm = hashes.SHA256(), @@ -187,6 +215,34 @@ class checkmk_checker(object): _encrypted_message = _encryptor.update(message) + _encryptor.finalize() return pad_pkcs7(b"03",10) + SALT + _encrypted_message + def decrypt_msg(self,message,password='secretpassword'): + SALT_LENGTH = 8 + KEY_LENGTH = 32 + IV_LENGTH = 16 + PBKDF2_CYCLES = 10_000 + message = message[10:] # strip header + SALT = message[:SALT_LENGTH] + message = message[SALT_LENGTH:] + _backend = crypto_default_backend() + _kdf_key = PBKDF2HMAC( + algorithm = hashes.SHA256(), + length = KEY_LENGTH + IV_LENGTH, + salt = SALT, + iterations = PBKDF2_CYCLES, + backend = _backend + ).derive(password.encode("utf-8")) + _key, _iv = _kdf_key[:KEY_LENGTH],_kdf_key[KEY_LENGTH:] + _decryptor = Cipher( + algorithms.AES(_key), + modes.CBC(_iv), + backend = _backend + ).decryptor() + _decrypted_message = _decryptor.update(message) + try: + return _decrypted_message.decode("utf-8").strip() + except UnicodeDecodeError: + return ("invalid key") + def do_checks(self,debug=False,remote_ip=None,**kwargs): self._getosinfo() _errors = [] @@ -195,6 +251,15 @@ class checkmk_checker(object): _lines.append("AgentOS: {os}".format(**self._info)) _lines.append(f"Version: {__VERSION__}") _lines.append("Hostname: {hostname}".format(**self._info)) + ## only tenant data + if remote_ip in self.tenants.keys(): + _secret = self.tenants.get(remote_ip,None) + _lines += self.taskrunner.get_data(tenant=remote_ip)[1:] ## remove number of tasks + _lines.append("") + if _secret: + return self.encrypt_msg("\n".join(_lines),password=_secret) + return "\n".join(_lines).encode("utf-8") + if self.onlyfrom: _lines.append("OnlyFrom: {0}".format(",".join(self.onlyfrom))) @@ -269,6 +334,7 @@ class checkmk_checker(object): with open(_filename) as _f: _lines.append(_f.read()) + _lines += self.taskrunner.get_data() _lines.append("") if debug: sys.stdout.write("\n".join(_errors)) @@ -327,9 +393,11 @@ class checkmk_checker(object): #raise _latest_firmware = {} _current_firmware = {} + _mayor_upgrade = None try: _upgrade_json = json.load(open("/tmp/pkg_upgrade.json","r")) _upgrade_packages = dict(map(lambda x: (x.get("name"),x),_upgrade_json.get("upgrade_packages"))) + _mayor_upgrade = _upgrade_json.get("upgrade_major_version") _current_firmware["version"] = _upgrade_packages.get("opnsense").get("current_version") _latest_firmware["version"] = _upgrade_packages.get("opnsense").get("new_version") except: @@ -342,7 +410,7 @@ class checkmk_checker(object): "config_age" : int(time.time() - _config_modified) , "last_configchange" : time.strftime("%H:%M %d.%m.%Y",time.localtime(_config_modified)), "product_series" : _info.get("product_series"), - "latest_version" : _latest_firmware.get("version","unknown"), + "latest_version" : (_mayor_upgrade if _mayor_upgrade else _latest_firmware.get("version","unknown")), "latest_date" : _latest_firmware.get("date",""), "hostname" : self._run_prog("hostname").strip(" \n") } @@ -380,8 +448,8 @@ class checkmk_checker(object): try: _certpem = base64.b64decode(_cert.get("crt")) _x509cert = x509.load_pem_x509_certificate(_certpem,crypto_default_backend()) - _cert["not_valid_before"] = _x509cert.not_valid_before.timestamp() - _cert["not_valid_after"] = _x509cert.not_valid_after.timestamp() + _cert["not_valid_before"] = _x509cert.not_valid_before_utc.timestamp() + _cert["not_valid_after"] = _x509cert.not_valid_after_utc.timestamp() _cert["serial"] = _x509cert.serial_number _cert["common_name"] = self.get_common_name(_x509cert.subject) _cert["issuer"] = self.get_common_name(_x509cert.issuer) @@ -596,6 +664,7 @@ class checkmk_checker(object): return [f"0 Services running_services={_num_running:.0f}|stopped_service={_num_stopped:.0f} All Services running"] def checklocal_carpstatus(self): + #sysctl net.inet.carp.demotion #TODO _ret = [] _virtual = self._config_reader().get("virtualip") if not _virtual: @@ -722,7 +791,7 @@ class checkmk_checker(object): try: _sock.connect(_path) _data = _sock.recv(1024).decode("utf-8").strip() - _name, _rtt, _rttsd, _loss = re.findall("(\w+)\s(\d+)\s(\d+)\s(\d+)$",_data)[0] + _name, _rtt, _rttsd, _loss = re.findall("(\S+)\s(\d+)\s(\d+)\s(\d+)$",_data)[0] assert _name.strip() == gateway return int(_rtt)/1_000_000.0,int(_rttsd)/1_000_000.0, int(_loss) except: @@ -940,7 +1009,7 @@ class checkmk_checker(object): _json_data = {} for _phase1 in _phase1config: if _phase1 == None: - continue + continue _ikeid = _phase1.get("ikeid") _name = _phase1.get("descr") if len(_name.strip()) < 1: @@ -976,7 +1045,6 @@ class checkmk_checker(object): _con["life-time"] = max(_con["life-time"],_install_time) _con["status"] = 0 if _con["status"] != 1 else 1 - ## QuickHack #FIXME remote-id/local-id translate type to ip, set and check if sas and config is same count #_required_phase2 = len(list(filter(lambda x: x.get("ikeid") == _ikeid,_phase2config))) #if _phase2_up >= _required_phase2: @@ -989,6 +1057,7 @@ class checkmk_checker(object): _ret.append("{status} \"IPsec Tunnel: {remote-name}\" if_in_octets=0|if_out_octets=0|lifetime=0 not running".format(**_con)) else: _con["status"] = max(_con["status"],1) + #_con["phase2"] = f"{_phase2_up}/{_required_phase2}" _con["phase2"] = f"{_phase2_up}" _ret.append("{status} \"IPsec Tunnel: {remote-name}\" if_in_octets={bytes-received}|if_out_octets={bytes-sent}|lifetime={life-time} {phase2} {state} {local-id} - {remote-id}({remote-host})".format(**_con)) return _ret @@ -1089,7 +1158,7 @@ class checkmk_checker(object): def _read_nginx_socket(self): session = requests.Session() - session.mount("http://nginx/", NginxAdapter()) + session.mount("http://nginx/vts", NginxAdapter()) response = session.get("http://nginx/vts") return response.json() @@ -1448,20 +1517,33 @@ class checkmk_cached_process(object): return _data class checkmk_server(TCPServer,checkmk_checker): - def __init__(self,port,pidfile,onlyfrom=None,encrypt=None,skipcheck=None,**kwargs): + def __init__(self,port,pidfile,onlyfrom=None,encrypt=None,skipcheck=None,tenants=None,**kwargs): + self.tcp_port = port self.pidfile = pidfile self.onlyfrom = onlyfrom.split(",") if onlyfrom else None self.skipcheck = skipcheck.split(",") if skipcheck else [] + self.tenants = self._get_tenants(tenants) if type(tenants) == str else {} self._available_sysctl_list = self._run_prog("sysctl -aN").split() self._available_sysctl_temperature_list = list(filter(lambda x: x.lower().find("temperature") > -1 and x.lower().find("cpu") == -1,self._available_sysctl_list)) self.encrypt = encrypt self._mutex = threading.Lock() self.user = pwd.getpwnam("root") self.allow_reuse_address = True + self.taskrunner = checkmk_taskrunner(self) TCPServer.__init__(self,("",port),checkmk_handler,bind_and_activate=False) + def _get_tenants(self,tenants): + _ret = {} + for _tenant in tenants.split(","): + if _tenant.find("#") > -1: + _addr,_secret = _tenant.split("#",1) + _ret[_addr] = _secret + else: + _ret[_addr] = None + return _ret + def verify_request(self, request, client_address): - if self.onlyfrom and client_address[0] not in self.onlyfrom: + if self.onlyfrom and client_address[0] not in self.onlyfrom and client_address[0] not in self.tenants.keys(): log("Client {0} not allowed".format(*client_address),"warn") return False return True @@ -1474,7 +1556,7 @@ class checkmk_server(TCPServer,checkmk_checker): def server_start(self): log("starting checkmk_agent") - + self.taskrunner.start() signal.signal(signal.SIGTERM, self._signal_handler) signal.signal(signal.SIGINT, self._signal_handler) signal.signal(signal.SIGHUP, self._signal_handler) @@ -1492,6 +1574,24 @@ class checkmk_server(TCPServer,checkmk_checker): sys.stdout.write("\n") pass + def cmkclient(self,host="127.0.0.1",port=None,enryptionkey=None): + if port == None: + port = self.tcp_port + if host == "127.0.0.1" and enryptionkey == None: + enryptionkey = self.encrypt + _sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM) + _sock.connect((host,port)) + _msg = b"" + while True: + _data = _sock.recv(2048) + if not _data: + break + _msg += _data + + if enryptionkey: + return self.decrypt_msg(_msg,enryptionkey) + return _msg + def _signal_handler(self,signum,*args): if signum in (signal.SIGTERM,signal.SIGINT): log("stopping checkmk_agent") @@ -1547,6 +1647,512 @@ class checkmk_server(TCPServer,checkmk_checker): def __del__(self): pass ## todo +BLACKLISTS = [ + 'all.s5h.net', + 'aspews.ext.sorbs.net', + 'b.barracudacentral.org', + 'bl.nordspam.com', + 'blackholes.five-ten-sg.com', + 'blacklist.woody.ch', + 'bogons.cymru.com', + 'cbl.abuseat.org', + 'combined.abuse.ch', + 'combined.rbl.msrbl.net', + 'db.wpbl.info', + 'dnsbl-2.uceprotect.net', + 'dnsbl-3.uceprotect.net', + 'dnsbl.cyberlogic.net', + 'dnsbl.sorbs.net', + 'drone.abuse.ch', + 'dul.ru', + 'images.rbl.msrbl.net', + 'ips.backscatterer.org', + 'ix.dnsbl.manitu.net', + 'korea.services.net', + 'matrix.spfbl.net', + 'phishing.rbl.msrbl.net', + 'proxy.bl.gweep.ca', + 'proxy.block.transip.nl', + 'psbl.surriel.com', + 'rbl.interserver.net', + 'relays.bl.gweep.ca', + 'relays.bl.kundenserver.de', + 'relays.nether.net', + 'residential.block.transip.nl', + 'singular.ttk.pte.hu', + 'spam.dnsbl.sorbs.net', + 'spam.rbl.msrbl.net', + 'spambot.bls.digibase.ca', + 'spamlist.or.kr', + 'spamrbl.imp.ch', + 'spamsources.fabel.dk', + 'ubl.lashback.com', + 'virbl.bit.nl', + 'virus.rbl.msrbl.net', + 'virus.rbl.jp', + 'wormrbl.imp.ch', + 'z.mailspike.net', + 'zen.spamhaus.org' +] + + +class checkmk_resolver(object): + def __init__(self,nameserver="127.0.0.1"): + self._ub_ctx = unbound.ub_ctx() + self._ub_ctx.add_ta_file("/var/unbound/root.key") + self._ub_ctx.set_fwd(nameserver) + self._ub_ctx.set_option("qname-minimisation:", "yes") + + def dns_reverseip(self,ipaddr,tld=False): + _addr = ipaddress.ip_address(ipaddr) + if tld: + return _addr.reverse_pointer + return ".".join(_addr.reverse_pointer.split(".")[:-2]) + + def resolve(self,hostname,rrtype="A",secure=True): + rrtype = rrtype.upper() + _rrtype_code = getattr(unbound,f"RR_TYPE_{rrtype}",None) + if not _rrtype_code: + return False,None + if _rrtype_code == 12: #PTR + hostname = self.dns_reverseip(str(hostname),tld=True) + _status, _results = self._ub_ctx.resolve(hostname,_rrtype_code) + _is_secure = bool(_results.secure) + if _status == 0 and _results.havedata: + if _rrtype_code == 1: # A + return _is_secure, list(map(lambda x: ipaddress.IPv4Address(x),_results.data.address_list)) + if _rrtype_code in (2,12): # NS, PTR + return _is_secure, list(map(lambda x: x.strip("."),_results.data.domain_list)) + if _rrtype_code == 16: #TXT/SPF + _data = list(map(lambda x: x[1:].decode("ascii",errors="ignore"),_results.data.raw)) + if rrtype == "SPF": + return _is_secure,list(filter(lambda x: x.startswith("v=spf1"),_data)) + return _is_secure, _data + if _rrtype_code == 15: # MX + return _is_secure, _results.data.mx_list + if _rrtype_code == 28: # AAAA + return _is_secure, list(map(lambda x: ipaddress.IPv6Address(int(b2a_hex(x),16)),_results.data.raw)) + if _rrtype_code == 52: # TLSA + _parsed_results = [] + for _hexresult in map(lambda x: b2a_hex(x),_results.data.raw): + _parsed_results.append((int(_hexresult[0:2],16),int(_hexresult[2:4],16),int(_hexresult[4:6],16), _hexresult[6:].decode("ascii"))) + return _is_secure, _parsed_results + + return _is_secure,_results.data.raw + return _is_secure,[] + + +class checkmk_task(object): + def __init__(self,id,config): + self._mutex = threading.RLock() + self.id = id + self.config = config + self.lastmodified = time.time() + self.piggyback = config.get("piggyback","") + self.type = config.get("type") + _tenant = config.get("tenant") + self.tenant = _tenant.split(",") if type(_tenant) == str else [] + self.interval = int(config.get("interval","3600")) + self.nextrun = time.time() + self._data = "" + self._thread = None + + @property + def get_piggyback(self): ## namen mit host prefixen + with self._mutex: + sys.stderr.write(f"getPiggyback-{self.id}\n") + sys.stderr.flush() + return self.piggyback + + def update(self,config): + with self._mutex: + self.interval = int(config.get("interval","3600")) + self.piggyback = config.get("piggyback") + self.tenant = config.get("tenant") + self.config = config + _now = time.time() + self.lastmodified = _now + if self.nextrun < _now: + self.nextrun = _now + self.interval + + def run(self): + _t = None + with self._mutex: + if self._thread == None: + _t = threading.Thread(target=self._run,name=self.id) + _t.daemon = True + self._thread = _t + self.nextrun = time.time() + self.interval + if _t: + _t.start() + + def _run(self): + try: + _function = getattr(self,f"_{self.type}") + if _function: + _data = _function() + with self._mutex: + self._data = _data + finally: + with self._mutex: + self._thread = None + + def _nmap(self): + host = self.config.get("hostname") + service = self.config.get("service","") + if not host: + return "<<>>\n\n" + port = self.config.get("port","") + if port: + port = port + "," + scanoptions = f"-sS -sU -p{port}U:53,67,123,111,137,138,161,427,500,623,1645,1646,1812,1813,4500,5060,5353,T:21,22,23,25,53,80,88,135,139,389,443,444,445,465,485,514,593,623,636,902,1433,1720,3128,3129,3268,3269,3389,5060,5900,5988,5989,6556,8000,8006,8010,8080,8084,8300,8443" + _proc_args = (["nmap","-Pn","-R","--disable-arp-ping","--open","--noninteractive","-oX","-"] + shlex.split(scanoptions) + [host]) + try: + _data = subprocess.check_output(_proc_args,shell=False,encoding="utf-8",stderr=subprocess.DEVNULL,timeout=300) + except subprocess.CalledProcessError as e: + if self._ignore_error: + _data = e.stdout + else: + _data = "" + except subprocess.TimeoutExpired: + _data = "" + _now = int(time.time()) + _results = [] + for _port in re.finditer("tcp|udp)\"\sportid=\"(?P\d+)\".*?state=\"(?P[\w|]+)\"\sreason=\"(?P[\w-]+)\"(?:.*?name=\"(?P[\w-]+)\")*.*",_data): + _results.append(_port.groupdict()) + + _ret = { + "host" : host, + "service" : service, + "ports" : _results + } + return f"<<>>\n" + json.dumps(_ret) + "\n<<<>>>" + + def _dummy(self): + _now = int(time.time()) + return f"<<>>\ncached({_now},{self.interval*2}) 0 Dummy - Test\n<<<>>>" + + def _cmk(self): + _now = int(time.time()) + return f"<<>>\nAgentOS: Task\nVersion: {__VERSION__}\n<<<>>>" + + def _proxy(self): + host = self.config.get("hostname") + port = self.config.get("port","6556") + if not self.piggyback: + self.piggyback = host + _data = "" + + + def _speedtest(self): + pass + + def _ssh(self): + pass + + def _domain(self): + _domain = self.config.get("domain") + _dns = checkmk_resolver() + _dnssec,_soa = _dns.resolve(_domain,"SOA") + _resultcheck = { + "DOMAIN": _domain, + "DNSSEC": _dnssec, + "SOA" : repr(_soa), + "NS" : [], + "MX" : [], + "TLSA" : [] + } + for _ns in _dns.resolve(_domain,"NS")[1]: + for _ipversion in ("A","AAAA"): + _ips = _dns.resolve(_ns,_ipversion)[1] + if not _ips: + continue + _ip = str(_ips[0]) + _resultcheck["NS"].append([_ns,_ip]) + + for _prio,_mx in _dns.resolve(_domain,"MX")[1]: + _mx = _mx.strip(".") + _, _tlsa = _dns.resolve(f"_25._tcp.{_mx}","TLSA") + for _ipversion in ("A","AAAA"): + _ips = _dns.resolve(_mx,_ipversion)[1] + if not _ips: + continue + _ip = str(_ips[0]) + _,_ptr = _dns.resolve(_ip,"PTR") + _sock,_banner = self.smtpconnect(_mx,_ip) + _cert_chain = ["",""] + if hasattr(_sock,"get_peer_cert_chain"): + _cert_chain = _sock.get_peer_cert_chain() + _resultcheck["MX"].append([ + _mx, + _prio, + _ip, + "".join(_ptr), + self.getcertinfo(_cert_chain[0],tlsa=_tlsa,usage=3), + self.getcertinfo(_cert_chain[1:],tlsa=_tlsa,usage=2), + _banner + ]) + _resultcheck["TLSA"].append([_mx,_tlsa]) + + _now = int(time.time()) + return f"<<>>\n" + json.dumps(_resultcheck) + "\n<<<>>>" + + def _blocklist(self): + _ipaddr = self.config.get("ipaddress","").split(",") + _dns = checkmk_resolver() + _service = self.config.get("service","") + if _ipaddr == [""]: + _hostname = self.config.get("hostname") + _ipaddr = map(str,_dns.resolve(_hostname,"A")[1]) + if not _service: + _service = _hostname + if not _service: + _service = _ipaddr[0] + _listed = [] + _ipchecklist = set(filter(lambda x: len(x) > 0,_ipaddr)) + if len(_ipchecklist) == 0: + return "" + for _ip in _ipchecklist: + _reverse_ip = _dns.dns_reverseip(str(_ip)) + for _blacklist in BLACKLISTS: + if _dns.resolve(f"{_reverse_ip}.{_blacklist}")[1]: + _listed.append((_ip,_blacklist)) + + _total_listed = len(_listed) + _status = 2 if _total_listed > 0 else 0 + _message = " ".join(_ipaddr) + "not blocked" + if _total_listed > 0: + _message = ",".join([f"{_ip} is on {_bl}" for _ip,_bl in _listed]) + _legacy = "{0} 'Blocklist {1}' blocklist={2}|blocked={3} {4}".format(_status,_service,len(BLACKLISTS),_total_listed,_message) + _now = int(time.time()) + return f"<<>>\ncached({_now},{self.interval*2}) " + _legacy + "\n<<<>>>" + + def __str__(self): + with self._mutex: + return self._data + + def __repr__(self): + _next = self.nextrun - time.time() + return f"{self.id}: {_next}" + + def __lt__(self,other): + return self.nextrun < other.nextrun + + def cachecontent(self,content): + _now = int(time.time()) + _cache=f"cache({_now},{self.interval})" + for _section in re.findall("^<<<(.*?)>>>\s*$",content,re.M): + if _section.group(1).startswith("local"): + continue + + @staticmethod + def smtpconnect(hostname,ipaddr=None,port=25,starttls=True): + _banner = "" + if ipaddr == None: + ipaddr = hostname + try: + _sock = socket.create_connection((str(ipaddr),port),timeout=5) + _sock.settimeout(20) + _banner = _sock.recv(2048) + while not _banner.startswith(b"220 "): + _banner = _sock.recv(2048) + _banner = _banner.decode("utf-8") + _sock.send(b"EHLO checkmkopnSenseAgent\r\n") + if starttls: + if _sock.recv(4096).find(b"250-STARTTLS") == -1: + _sock.close() + return None,_banner.strip("\r\n") + _sock.send(b"STARTTLS\r\n") + _sock.recv(1024) + _ctx = SSL.Context(SSL.SSLv23_METHOD) + _ssl = SSL.Connection(_ctx,_sock) + _ssl.set_connect_state() + _ssl.set_tlsext_host_name(hostname.encode("ascii")) + _ssl.setblocking(1) + try: + _ssl.do_handshake() + except SSL.WantReadError: + pass + _ssl.sock_shutdown(socket.SHUT_RDWR) + _sock.close() + _sock = _ssl + else: + _sock.recv(1024) + except socket.error: + return None,"" + return _sock,_banner.strip("\r\n") + + def getcertinfo(self,x509cert,tlsa=None,usage=0): + if x509cert == None: + return {} + elif type(x509cert) == list: + return [self.getcertinfo(x,tlsa,usage) for x in x509cert] + else: + try: + _tlsa = [] + if tlsa: + for _entry in filter(lambda x: x[0] == usage,tlsa): + _certhash, _tlsa_rr = self.get_tlsa_record(x509cert,usage,selector=_entry[1],mtype=_entry[2]) + _tlsa.append([_entry[3] == _certhash,_tlsa_rr]) + _dns_altnames = [] + for _count in range(x509cert.get_extension_count()): + _extension = x509cert.get_extension(_count) + if _extension.get_short_name() == b"subjectAltName": + for _san in str(_extension).split(", "): + if _san.startswith("DNS:"): + _dns_altnames.append(_san[4:]) + + return { + "cn" : x509cert.get_subject().commonName, + "notAfter" : x509cert.get_notAfter().decode("ascii"), + "notBefore" : x509cert.get_notBefore().decode("ascii"), + "keysize" : x509cert.get_pubkey().bits(), + "algo" : x509cert.get_signature_algorithm().decode("ascii"), + "san" : _dns_altnames, + "tlsa" : _tlsa + } + except: + return {} + + @staticmethod + def get_tlsa_record(x509cert,usage,selector,mtype): + if selector == 0: + _cert = crypto.dump_certificate(crypto.FILETYPE_ASN1,x509cert) + else: + _cert = crypto.dump_publickey(crypto.FILETYPE_ASN1,x509cert.get_pubkey()) + if mtype == 0: + _hash = _cert.hex() + elif mtype == 1: + _hash = hashlib.sha256(_cert).hexdigest() + elif mtype == 2: + _hash = hashlib.sha512(_cert).hexdigest() + else: + _hash = _cert.hex() ## todo unknown + return _hash,f"{usage} {selector} {mtype} {_hash}" + + +class checkmk_taskrunner(object): + def __init__(self,cmkserver): + self._mutex = threading.RLock() + self.isrunning = True + self._queue = [] + self.err = None + self._event = threading.Event() + + def start(self): + _t = threading.Thread(target=self._run_forever,name="cmk_taskrunner") + _t.daemon = True + _t.start() + + def get_data(self,tenant=None): + _data = [] + _fails = 0 + _task_running_count = len(self._get_running_task_threads()) + sys.stderr.write("GetDATA\n") + sys.stderr.flush() + with self._mutex: + sys.stderr.write("GetDATA-Mutex\n") + sys.stderr.flush() + if self.err: + for _line in str(self.err).split(): + sys.stderr.write(_line) + self.err = None + _task_count = len(self._queue) + if _task_count == 0: + return [] + _piggyback = "" + _data = [] + for _task in sorted(self._queue,key=lambda x: x.get_piggyback): + #if tenant in (None,_task.tenant): + if len(_task.tenant) == 0 or tenant in _task.tenant: + if _task.get_piggyback != _piggyback: + if _piggyback != "": + _data += ["<<<<>>>>"] + _piggyback = _task.get_piggyback + if _piggyback: + _data += [f"<<<<{_piggyback}>>>>"] + _out = str(_task) + if len(_out.strip()) > 0: + _data += _out.split("\n") + if _piggyback != "": + _data += ["<<<<>>>>"] + + _task_service = "{0} 'CMK Tasks' tasks={1}|tasks_running={2},tasks_failed={3} OK".format(0 if _fails == 0 else 1,_task_count,_task_running_count,_fails) + return [_task_service] + _data + + def check_taskdir(self): + _ids = [] + for _file in glob.glob(f"{TASKDIR}/*.task"): + _id = os.path.basename(_file) + _task = list(filter(lambda x: x.id == _id,self._queue)) + if _task: + if _task[0].lastmodified >= os.stat(_file).st_mtime: + sys.stderr.write(f"{_id} not modified\n") + sys.stderr.flush() + continue + with open(_file,"r",encoding="utf-8") as _f: + _options = dict(TASKFILE_REGEX.findall(_f.read())) + _type = _options.get("type","").strip() + if _type not in ("nmap","speedtest","proxy","ssh","domain","blocklist","cmk","dummy"): + sys.stderr.write(f"unknown {_type}\n") + sys.stderr.flush() + continue + #pprint(_options) + if _task: + _task[0].update(_options) + else: + _task = checkmk_task(_id,_options) + self._queue.append(_task) + _ids.append(_id) + + # remove old tasks + for _task in self._queue: + if _task.id not in _ids: + self._queue.remove(_task) + #self._queue = list(filter(lambda x: x.id in _ids,self._queue)) ## remove config if file removed or disabled + #pprint(self._queue) + + def _get_running_task_threads(self): + return list(filter(lambda x: x.name.endswith(".task"),threading.enumerate())) + + def _run_forever(self): + _check = 0 + PREEXEC = 120 + while self.isrunning: + try: + _now = time.time() + if _check + 600 < _now: + with self._mutex: + self.check_taskdir() + _check = _now + continue + + next_task = None + with self._mutex: + if self._queue: + self._queue.sort() + next_task = self._queue[0] + + if next_task: + wait_time = max(0, next_task.nextrun - _now - PREEXEC) + if wait_time > 0: + self._event.wait(min(30, wait_time)) + self._event.clear() + else: + self._run_task(next_task) + else: + self._event.wait(30) + self._event.clear() + + except Exception as err: + sys.stderr.write(str(err) + "\n") + sys.stderr.flush() + + def _run_task(self, task): + running_tasks = self._get_running_task_threads() + if len(running_tasks) < MAX_SIMULATAN_THREADS: + task.run() + REGEX_SMART_VENDOR = re.compile(r"^\s*(?P\d+)\s(?P[-\w]+).*\s{2,}(?P[\w\/() ]+)$",re.M) REGEX_SMART_DICT = re.compile(r"^(.*?):\s*(.*?)$",re.M) class smart_disc(object): @@ -1665,13 +2271,17 @@ if __name__ == "__main__": help=_("show help message")) _parser.add_argument("--start",action="store_true", help=_("start the daemon")) + _parser.add_argument("--restart",action="store_true", + help=_("stop and restart the daemon")) _parser.add_argument("--stop",action="store_true", help=_("stop the daemon")) _parser.add_argument("--status",action="store_true", help=_("show daemon status")) _parser.add_argument("--nodaemon",action="store_true", help=_("run in foreground")) - _parser.add_argument("--update",nargs="?",const="main",type=str,choices=["main","testing"], + _parser.add_argument("--checkoutput",nargs="?",const="127.0.0.1",type=str,metavar="hostname", + help=_("connect to [hostname]port and decrypt if needed")) + _parser.add_argument("--update",nargs="?",const="main",type=str,metavar="branch/commitid", help=_("check for update")) _parser.add_argument("--config",type=str,dest="configfile",default=CHECKMK_CONFIG, help=_("path to config file")) @@ -1706,12 +2316,14 @@ if __name__ == "__main__": for _k,_v in re.findall(f"^(\w+):\s*(.*?)(?:\s+#|$)",open(args.configfile,"rt").read(),re.M): if _k == "port": args.port = int(_v) - if _k == "encrypt": + if _k == "encrypt" and args.encrypt == None: args.encrypt = _v if _k == "onlyfrom": args.onlyfrom = _v if _k == "skipcheck": args.skipcheck = _v + if _k == "tenants": + args.tenants = _v if _k.lower() == "localdir": LOCALDIR = _v if _k.lower() == "plugindir": @@ -1730,9 +2342,9 @@ if __name__ == "__main__": _pid = int(re.findall("\s(\d+)\s",_out.split("\n")[1])[0]) except (IndexError,ValueError): pass - _active_methods = [getattr(args,x,False) for x in ("start","stop","status","zabbix","nodaemon","debug","update","help")] - - if SYSHOOK_METHOD and any(_active_methods) == False: + _active_methods = [getattr(args,x,False) for x in ("start","stop","restart","status","zabbix","nodaemon","debug","update","checkoutput","help")] + if SYSHOOK_METHOD and not any(_active_methods): + #print(f"SYSHOOK {SYSHOOK_METHOD} - {repr(_active_methods)}") log(f"using syshook {SYSHOOK_METHOD[0]}") setattr(args,SYSHOOK_METHOD[0],True) if args.start: @@ -1756,17 +2368,29 @@ if __name__ == "__main__": except OSError: print("not running") - elif args.stop: + elif args.stop or args.restart: if _pid == 0: sys.stderr.write("not running\n") sys.stderr.flush() - sys.exit(1) + if args.stop: + sys.exit(1) try: + print("stopping") os.kill(_pid,signal.SIGTERM) except ProcessLookupError: if os.path.exists(args.pidfile): os.remove(args.pidfile) + if args.restart: + print("starting") + time.sleep(3) + _server.daemonize() + + elif args.checkoutput: + sys.stdout.write(_server.cmkclient(args.checkoutput)) + sys.stdout.write("\n") + sys.stdout.flush() + elif args.debug: sys.stdout.write(_server.do_checks(debug=True).decode(sys.stdout.encoding)) sys.stdout.flush() @@ -1863,15 +2487,29 @@ if __name__ == "__main__": print("\t* squid - install the mkp from https://exchange.checkmk.com/p/squid and forwarder -> listen on loopback active\n") _parser.print_help() print("\n") - print(f"The CHECKMK_BASEDIR is under {BASEDIR} (local,plugin,spool).") + if "start" in SYSHOOK_METHOD: + print("The agent will start automatic on system boot") + else: + print("to start the agent on boot, copy the file to /usr/local/etc/rc.syshook.d/start/") + print(f"The CHECKMK_BASEDIR is under {BASEDIR} (local,plugin,spool,tasks).") print(f"Default config file location is {args.configfile}, create it if it doesn't exist.") print("Config file options port,encrypt,onlyfrom,skipcheck with a colon and the value like the commandline option\n") print("active config:") print("-"*35) - for _opt in ("port","encrypt","onlyfrom","skipcheck"): + for _opt in ("port","encrypt","onlyfrom","skipcheck","tenants"): _val = getattr(args,_opt,None) if _val: print(f"{_opt}: {_val}") + print("\n") + print("the following tasks are found") + try: + _taskrunner = checkmk_taskrunner(None) + _taskrunner.check_taskdir() + for _task in _taskrunner._queue: + print(" * [{type}]{id} ({interval} sec)".format(**_task.__dict__)) + except: + raise + print("") else: