From 821ba44e1a14c685c7a2738f60f6ae364ad07f97 Mon Sep 17 00:00:00 2001 From: Janos SUTO Date: Mon, 6 Jun 2022 14:29:16 +0200 Subject: [PATCH] Added support for sphinx rt index Signed-off-by: Janos SUTO --- config.php.in | 1 + etc/sphinx.conf.in | 34 +++++++++++++- util/rtindex.py | 109 +++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 142 insertions(+), 2 deletions(-) create mode 100755 util/rtindex.py diff --git a/config.php.in b/config.php.in index a9981beb..27903c0c 100644 --- a/config.php.in +++ b/config.php.in @@ -244,6 +244,7 @@ $config['SPHINX_ATTACHMENT_INDEX'] = 'att1'; $config['SPHINX_TAG_INDEX'] = 'tag1'; $config['SPHINX_NOTE_INDEX'] = 'note1'; $config['SPHINX_STRICT_SCHEMA'] = 1; +$config['RT'] = 0; $config['MAX_EMAIL_LEN'] = 41; $config['RELOAD_COMMAND'] = 'sudo -n /etc/init.d/rc.piler reload'; diff --git a/etc/sphinx.conf.in b/etc/sphinx.conf.in index c4eadc38..b20da9dc 100644 --- a/etc/sphinx.conf.in +++ b/etc/sphinx.conf.in @@ -46,6 +46,8 @@ source base sql_attr_uint = attachments } + + source delta : base { sql_query_pre = SET NAMES utf8mb4 @@ -89,6 +91,7 @@ source dailydelta : base sql_query_pre = SET NAMES utf8mb4 sql_query = SELECT FROM sph_index WHERE id=-1 } +?> source tag : base { @@ -110,7 +113,7 @@ source note : base } - + index main1 { source = main1 @@ -198,6 +201,7 @@ index delta1 } + index tag1 { @@ -228,6 +232,27 @@ index note1 } + +index piler1 +{ + type = rt + path = /var/piler/sphinx/piler1 + rt_mem_limit = 128M + rt_field = sender + rt_field = rcpt + rt_field = senderdomain + rt_field = rcptdomain + rt_field = subject + rt_field = body + rt_field = attachment_types + rt_attr_bigint = arrived + rt_attr_bigint = sent + rt_attr_uint = size + rt_attr_uint = direction + rt_attr_uint = folder + rt_attr_uint = attachments +} + indexer { @@ -240,7 +265,12 @@ searchd listen = 127.0.0.1:9312 listen = 127.0.0.1:9306:mysql41 log = /dev/null - binlog_path = + binlog_path = /var/piler/sphinx + binlog_max_log_size = 16M + 
# ---------------------------------------------------------------------------
# Patch residue that shared this (collapsed) line with the script, kept as-is:
# the tail of the etc/sphinx.conf.in hunk and the diff header of the new file.
#
#     binlog_flush = 2
#   +
#   + rt_flush_period = 300
#   +
#     ##query_log =
#     read_timeout = 5
#     max_children = 30
#   diff --git a/util/rtindex.py b/util/rtindex.py
#   new file mode 100755
#   index 00000000..bd5b6e38
#   --- /dev/null
#   +++ b/util/rtindex.py
#   @@ -0,0 +1,109 @@
# ---------------------------------------------------------------------------
#!/usr/bin/python3
"""Feed queued piler messages from MySQL into the Sphinx real-time index.

Rows are read from the ``sph_index`` queue table, inserted into the
``piler1`` RT index through searchd's MySQL-protocol listener, and then
deleted from the queue table.
"""

import configparser
import MySQLdb as dbapi
import argparse
import getpass
import os
import sys
import syslog
import time

SQL_SELECT_QUERY = "SELECT id, `from`, `to`, fromdomain, todomain, subject, " + \
    "arrived, sent, body, size, direction, folder, attachments, " + \
    "attachment_types FROM sph_index"

# Column order must match SQL_SELECT_QUERY: rows are passed through unchanged.
SQL_INSERT_QUERY = """
    INSERT INTO piler1 (id, sender, rcpt, senderdomain, rcptdomain,
    subject, arrived, sent, body, size, direction, folder, attachments,
    attachment_types) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
    %s, %s, %s, %s)"""

opts = {}


def _log(opts, message):
    """Send *message* to syslog and, in verbose mode, to stdout as well."""
    syslog.syslog(message)
    if opts.get('verbose'):
        print(message)


def read_options(filename="", opts=None):
    """Load the database settings from a piler.conf style file into *opts*.

    The file has no section header, so a synthetic ``[piler]`` header is
    prepended before it is handed to configparser.

    Args:
        filename: path of the configuration file.
        opts: dict updated in place; a fresh dict is created when omitted.

    Returns:
        The updated options dict.

    Raises:
        OSError: the file cannot be opened or read.
        configparser.NoOptionError: ``mysqluser``, ``mysqlpwd`` or
            ``mysqldb`` is missing from the file.
    """
    if opts is None:
        opts = {}

    with open(filename, 'r') as conf:
        s = "[piler]\n" + conf.read()

    # Values are taken literally: with the default interpolation a '%' in
    # the password would raise an InterpolationSyntaxError on get().
    config = configparser.ConfigParser(interpolation=None)
    config.read_string(s)

    opts['dbhost'] = config.get('piler', 'mysqlhost', fallback='localhost')
    opts['username'] = config.get('piler', 'mysqluser')
    opts['password'] = config.get('piler', 'mysqlpwd')
    opts['database'] = config.get('piler', 'mysqldb')

    # FIXME: searchd address is hard-coded (matches the mysql41 listener
    # in sphinx.conf)
    opts['sphx_host'] = "127.0.0.1"
    opts['sphx_port'] = 9306

    return opts


def process_messages(opts):
    """Move one batch of queued messages from MySQL into the RT index.

    Opens a connection to MySQL and to searchd, reads up to
    ``opts['batch_size']`` rows from ``sph_index``, inserts them into the
    ``piler1`` index and deletes them from the queue. When there is nothing
    to do (or in dry-run mode) it sleeps for ``opts['sleep']`` seconds.

    Args:
        opts: options dict holding the connection settings written by
            read_options() plus ``batch_size`` and ``sleep``; the optional
            ``dry_run`` and ``verbose`` flags are honoured when present.
            The ``db`` and ``sphx`` slots hold the live connections while
            the call runs and are reset to None afterwards.

    Returns:
        Number of rows moved to the index (0 when idle or in dry-run mode).

    Raises:
        Any MySQLdb error from connecting or querying; both connections
        are closed before the error propagates.
    """
    batch_size = opts['batch_size']
    processed = 0
    idle = False

    try:
        opts['db'] = dbapi.connect(opts['dbhost'], opts['username'],
                                   opts['password'], opts['database'])

        opts['sphx'] = dbapi.connect(host=opts['sphx_host'],
                                     port=opts['sphx_port'])

        cursor = opts['db'].cursor()
        s_cursor = opts['sphx'].cursor()

        # LIMIT keeps the server from shipping the whole queue table when
        # only one batch is consumed per call.
        cursor.execute(SQL_SELECT_QUERY + " LIMIT %s", (batch_size,))
        rows = cursor.fetchmany(batch_size)

        if not rows:
            idle = True
        elif opts.get('dry_run'):
            # Nothing is written or dequeued, so the same rows would be
            # read again immediately: treat the pass as idle.
            _log(opts, "Dry run: would process %d items" % (len(rows)))
            idle = True
        else:
            ids = [row[0] for row in rows]

            s_cursor.executemany(SQL_INSERT_QUERY, rows)
            # Make the inserts durable before the rows leave the queue;
            # harmless if the searchd session is already in autocommit mode.
            opts['sphx'].commit()

            placeholders = ", ".join(['%s'] * len(ids))
            cursor.execute("DELETE FROM sph_index WHERE id IN (%s)" %
                           (placeholders), ids)
            opts['db'].commit()

            processed = len(ids)
            _log(opts, "Processed %d items" % (processed))
    finally:
        # Always release both connections, even when a query failed, and
        # clear the slots so a stale handle is never reused.
        for key in ('db', 'sphx'):
            conn = opts.get(key)
            opts[key] = None
            if conn:
                conn.close()

    if idle:
        time.sleep(opts['sleep'])

    return processed


def main():
    """Parse the command line, load piler.conf and index batches forever."""
    parser = argparse.ArgumentParser()
    parser.add_argument("-c", "--config", type=str, help="piler.conf path",
                        default="/etc/piler/piler.conf")
    parser.add_argument("-b", "--batch-size", type=int, help="batch size " +
                        "to process", default=1000)
    parser.add_argument("-s", "--sleep", type=int, help="sleep after no data " +
                        "to index", default=5)
    parser.add_argument("-d", "--dry-run", help="dry run", action='store_true')
    parser.add_argument("-v", "--verbose", help="verbose mode",
                        action='store_true')

    args = parser.parse_args()

    if args.batch_size < 1:
        parser.error("batch size must be a positive integer")

    if getpass.getuser() not in ['root', 'piler']:
        print("Please run me as user 'piler'")
        sys.exit(1)

    opts['dry_run'] = args.dry_run
    opts['verbose'] = args.verbose
    opts['batch_size'] = args.batch_size
    opts['sleep'] = args.sleep
    opts['db'] = None
    opts['sphx'] = None

    syslog.openlog(logoption=syslog.LOG_PID, facility=syslog.LOG_MAIL)

    read_options(args.config, opts)

    while True:
        process_messages(opts)


if __name__ == "__main__":
    main()