From c4957d128399570bef4cc9e6a34d95dff25f26f9 Mon Sep 17 00:00:00 2001 From: Janos SUTO Date: Sun, 30 Sep 2018 12:01:13 +0000 Subject: [PATCH] Added support for RT indexes Signed-off-by: Janos SUTO --- config.php.in | 1 + etc/cron.jobs.in | 2 - etc/sphinx.conf.in | 156 ++++------------------------- util/Makefile.in | 1 + util/rtindex.py | 110 ++++++++++++++++++++ webui/controller/search/helper.php | 4 +- webui/model/search/search.php | 5 +- 7 files changed, 137 insertions(+), 142 deletions(-) create mode 100755 util/rtindex.py diff --git a/config.php.in b/config.php.in index b557a4df..db73a935 100644 --- a/config.php.in +++ b/config.php.in @@ -223,6 +223,7 @@ $config['SPHINX_MAIN_INDEX'] = 'main1,dailydelta1,delta1'; $config['SPHINX_ATTACHMENT_INDEX'] = 'att1'; $config['SPHINX_TAG_INDEX'] = 'tag1'; $config['SPHINX_NOTE_INDEX'] = 'note1'; +$config['FROM_TOKEN'] = '@sender'; $config['RELOAD_COMMAND'] = 'sudo -n /etc/init.d/rc.piler reload'; $config['PILERIMPORT_IMAP_COMMAND'] = '/usr/local/bin/pilerimport -d /var/piler/imap -q -r'; diff --git a/etc/cron.jobs.in b/etc/cron.jobs.in index fcbb1c62..0b3a273b 100644 --- a/etc/cron.jobs.in +++ b/etc/cron.jobs.in @@ -1,6 +1,4 @@ ### PILERSTART -5,35 * * * * LIBEXECDIR/piler/indexer.delta.sh -30 2 * * * LIBEXECDIR/piler/indexer.main.sh */15 * * * * /usr/bin/indexer --config SYSCONFDIR/piler/sphinx.conf --quiet tag1 --rotate */15 * * * * /usr/bin/indexer --config SYSCONFDIR/piler/sphinx.conf --quiet note1 --rotate */5 * * * * /usr/bin/find LOCALSTATEDIR/piler/www/tmp -type f -name i.\* -exec rm -f {} \; diff --git a/etc/sphinx.conf.in b/etc/sphinx.conf.in index 0af92020..8cee9756 100644 --- a/etc/sphinx.conf.in +++ b/etc/sphinx.conf.in @@ -15,47 +15,6 @@ source base sql_attr_uint = attachments } -source delta : base -{ - sql_query_pre = SET NAMES utf8mb4 - sql_query_pre = REPLACE INTO sph_counter SELECT 1, IFNULL(MAX(id), 0) FROM sph_index - sql_query_post_index = DELETE FROM sph_index WHERE id<=(SELECT max_doc_id FROM sph_counter WHERE counter_id=1) - sql_query = SELECT id, `from`, `to`, `fromdomain`, `todomain`, `subject`, `arrived`, `sent`, `body`, `size`, `direction`, `folder`, `attachments`, `attachment_types` FROM sph_index \ - WHERE id <= (SELECT max_doc_id FROM sph_counter WHERE counter_id=1) - - sql_query_killlist = SELECT `id` FROM `metadata` WHERE `deleted`=1 -} - -source main1 : base -{ - sql_query_pre = SET NAMES utf8mb4 - sql_query = SELECT id, `from`, `to`, `fromdomain`, `todomain`, `subject`, `arrived`, `sent`, `body`, `size`, `direction`, `folder`, `attachments`, `attachment_types` FROM sph_index WHERE id=-1 -} - -source main2 : base -{ - sql_query_pre = SET NAMES utf8mb4 - sql_query = SELECT id, `from`, `to`, `fromdomain`, `todomain`, `subject`, `arrived`, `sent`, `body`, `size`, `direction`, `folder`, `attachments`, `attachment_types` FROM sph_index WHERE id=-1 -} - -source main3 : base -{ - sql_query_pre = SET NAMES utf8mb4 - sql_query = SELECT id, `from`, `to`, `fromdomain`, `todomain`, `subject`, `arrived`, `sent`, `body`, `size`, `direction`, `folder`, `attachments`, `attachment_types` FROM sph_index WHERE id=-1 -} - -source main4 : base -{ - sql_query_pre = SET NAMES utf8mb4 - sql_query = SELECT id, `from`, `to`, `fromdomain`, `todomain`, `subject`, `arrived`, `sent`, `body`, `size`, `direction`, `folder`, `attachments`, `attachment_types` FROM sph_index WHERE id=-1 -} - -source dailydelta : base -{ - sql_query_pre = SET NAMES utf8mb4 - sql_query = SELECT id, `from`, `to`, `fromdomain`, `todomain`, `subject`, `arrived`, `sent`, `body`, `size`, `direction`, `folder`, `attachments`, `attachment_types` FROM sph_index WHERE id=-1 -} - source tag : base { sql_query_pre = SET NAMES utf8mb4 @@ -76,90 +35,6 @@ source note : base } -source att : base -{ - - sql_query_pre = SET NAMES utf8mb4 - sql_query = select a.id as aid, m.id as mid, a.name AS aname, a.size, REPLACE(REPLACE(m.`from`, '@', 'X'), '.', 'X') as `from`, REPLACE(REPLACE((select group_concat(rcpt.`to` ORDER BY `to` ASC SEPARATOR ' ') from rcpt where rcpt.id=mid group by rcpt.id), '@', 'X'), '.', 'X') as `to` from attachment a, metadata m where m.piler_id=a.piler_id - - sql_attr_uint = size - sql_attr_uint = mid -} - - -index main1 -{ - source = main1 - path = LOCALSTATEDIR/piler/sphinx/main1 - docinfo = extern - dict = keywords - min_prefix_len = 5 - min_word_len = 1 - #ngram_len = 1 - #ngram_chars = U+3000..U+2FA1F -} - -index main2 -{ - source = main2 - path = LOCALSTATEDIR/piler/sphinx/main2 - docinfo = extern - dict = keywords - min_prefix_len = 5 - min_word_len = 1 - #ngram_len = 1 - #ngram_chars = U+3000..U+2FA1F -} - -index main3 -{ - source = main3 - path = LOCALSTATEDIR/piler/sphinx/main3 - docinfo = extern - dict = keywords - min_prefix_len = 5 - min_word_len = 1 - #ngram_len = 1 - #ngram_chars = U+3000..U+2FA1F -} - -index main4 -{ - source = main4 - path = LOCALSTATEDIR/piler/sphinx/main4 - docinfo = extern - dict = keywords - min_prefix_len = 5 - min_word_len = 1 - #ngram_len = 1 - #ngram_chars = U+3000..U+2FA1F -} - -index dailydelta1 -{ - source = dailydelta - path = LOCALSTATEDIR/piler/sphinx/dailydelta1 - docinfo = extern - dict = keywords - min_prefix_len = 5 - min_word_len = 1 - #ngram_len = 1 - #ngram_chars = U+3000..U+2FA1F -} - -index delta1 -{ - source = delta - path = LOCALSTATEDIR/piler/sphinx/delta1 - docinfo = extern - dict = keywords - min_prefix_len = 5 - min_word_len = 1 - #ngram_len = 1 - #ngram_chars = U+3000..U+2FA1F -} - - index tag1 { source = tag @@ -186,16 +61,25 @@ index note1 } -index att1 +index rt1 { - source = att - path = /var/piler/sphinx/att1 - docinfo = extern - dict = keywords - min_prefix_len = 6 - min_word_len = 1 - ngram_len = 1 - ngram_chars = U+1100..U+2FA1F + type = rt + path = /var/piler/sphinx/rt1 + rt_mem_limit = 512M + + rt_field = sender + rt_field = fromdomain + rt_field = to + rt_field = todomain + rt_field = subject + rt_field = body + rt_field = attachment_types + + rt_attr_uint = size + rt_attr_uint = direction + rt_attr_uint = attachments + + rt_attr_timestamp = sent } @@ -210,7 +94,9 @@ searchd listen = 127.0.0.1:9312 listen = 127.0.0.1:9306:mysql41 log = /dev/null - binlog_path = + binlog_path = /var/piler/sphinx + binlog_flush = 2 + binlog_max_log_size = 64M ##query_log = read_timeout = 5 max_children = 30 diff --git a/util/Makefile.in b/util/Makefile.in index 31deefa8..f5ab9a31 100644 --- a/util/Makefile.in +++ b/util/Makefile.in @@ -43,6 +43,7 @@ install: $(INSTALL) -m 0755 $(srcdir)/import.sh $(DESTDIR)$(libexecdir)/piler $(INSTALL) -m 0755 $(srcdir)/purge.sh $(DESTDIR)$(libexecdir)/piler $(INSTALL) -m 0755 $(srcdir)/pilerpurge.py $(DESTDIR)$(libexecdir)/piler + $(INSTALL) -m 0755 $(srcdir)/rtindex.py $(DESTDIR)$(libexecdir)/piler $(INSTALL) -m 0755 $(srcdir)/postinstall.sh $(DESTDIR)$(libexecdir)/piler $(INSTALL) -m 0755 $(srcdir)/watch_sphinx_main_index.sh $(DESTDIR)$(libexecdir)/piler diff --git a/util/rtindex.py b/util/rtindex.py new file mode 100755 index 00000000..79a13de3 --- /dev/null +++ b/util/rtindex.py @@ -0,0 +1,110 @@ +#!/usr/bin/python + +import ConfigParser +import MySQLdb as dbapi +import StringIO +import argparse +import getpass +import os +import sys +import syslog +import time + +SQL_SELECT_INDEX_QUERY = "SELECT id, `from`, fromdomain, `to`, todomain, subject, body, attachment_types, size, direction, attachments, sent FROM sph_index ORDER BY id ASC" +SQL_INSERT_QUERY = "INSERT INTO rt1 VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)" +SQL_DELETE_QUERY = "DELETE FROM sph_index WHERE id IN (%s)" +SLEEP_DELAY = 5 + +opts = {} + + +def read_options(filename="", opts={}): + syslog.syslog("Reading %s" % (filename)) + + s = "[piler]\n" + open(filename, 'r').read() + fp = StringIO.StringIO(s) + config = ConfigParser.RawConfigParser() + config.readfp(fp) + + opts['username'] = config.get('piler', 'mysqluser') + opts['password'] = config.get('piler', 'mysqlpwd') + opts['database'] = config.get('piler', 'mysqldb') + + +def process_batch(opts={}): + try: + opts['db'] = dbapi.connect("localhost", opts['username'], + opts['password'], opts['database']) + + cursor = opts['db'].cursor() + cursor.execute(SQL_SELECT_INDEX_QUERY) + + while True: + rows = cursor.fetchmany(opts['batch_size']) + if rows == (): + time.sleep(SLEEP_DELAY) + break + + ids = [x[0] for x in rows] + + # Push data to sphinx + opts['sphx'] = dbapi.connect(host=opts['sphinx'], port=opts['port']) + sphx_cursor = opts['sphx'].cursor() + sphx_cursor.executemany(SQL_INSERT_QUERY, rows) + opts['sphx'].commit() + opts['sphx'].close() + + syslog.syslog("%d record inserted" % (sphx_cursor.rowcount)) + + # Delete rows from sph_index table + format = ", ".join(['%s'] * len(ids)) + cursor.execute(SQL_DELETE_QUERY % (format), ids) + opts['db'].commit() + + except dbapi.DatabaseError, e: + print "Error %s" % e + syslog.syslog("Error %s" % e) + time.sleep(SLEEP_DELAY) + + if opts['db']: + opts['db'].close() + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("-c", "--config", type=str, help="piler.conf path", + default="/etc/piler/piler.conf") + parser.add_argument("-b", "--batch-size", type=int, help="batch size " + + "to delete", default=1000) + parser.add_argument("-s", "--sphinx", type=str, help="sphinx server", + default="127.0.0.1") + parser.add_argument("-p", "--port", type=int, help="sphinx sql port", + default=9306) + parser.add_argument("-d", "--dry-run", help="dry run", action='store_true') + parser.add_argument("-v", "--verbose", help="verbose mode", + action='store_true') + + args = parser.parse_args() + + if getpass.getuser() not in ['root', 'piler']: + print "Please run me as user 'piler'" + sys.exit(1) + + opts['sphinx'] = args.sphinx + opts['port'] = args.port + opts['dry_run'] = args.dry_run + opts['verbose'] = args.verbose + opts['batch_size'] = args.batch_size + opts['db'] = None + opts['sphx'] = None + + syslog.openlog(logoption=syslog.LOG_PID, facility=syslog.LOG_MAIL) + + read_options(args.config, opts) + + while True: + process_batch(opts) + + +if __name__ == "__main__": + main() diff --git a/webui/controller/search/helper.php b/webui/controller/search/helper.php index ea162c27..7329b369 100644 --- a/webui/controller/search/helper.php +++ b/webui/controller/search/helper.php @@ -93,7 +93,7 @@ class ControllerSearchHelper extends Controller { private function fixup_post_simple_request() { $match = ''; - if(isset($this->request->post['from']) && $this->request->post['from']) { $match .= "@from " . $this->request->post['from'] . ' '; } + if(isset($this->request->post['from']) && $this->request->post['from']) { $match .= FROM_TOKEN . ' ' . $this->request->post['from'] . ' '; } if(isset($this->request->post['to']) && $this->request->post['to']) { $match .= "@to " . $this->request->post['to'] . ' '; } if(isset($this->request->post['subject']) && $this->request->post['subject']) { $match .= "@subject " . $this->request->post['subject'] . ' '; } if(isset($this->request->post['body']) && $this->request->post['body']) { $match .= "@body " . $this->request->post['body'] . ' '; } @@ -144,7 +144,7 @@ class ControllerSearchHelper extends Controller { } else if(strchr($v, '@')) { $prev_token_is_email = 1; - if($from == '') { $from = "@from"; } + if($from == '') { $from = FROM_TOKEN; } $from .= " $v"; } else { diff --git a/webui/model/search/search.php b/webui/model/search/search.php index ec269fae..091076c3 100644 --- a/webui/model/search/search.php +++ b/webui/model/search/search.php @@ -96,8 +96,7 @@ class ModelSearchSearch extends Model { $id = ""; $offset = 0; $total_sphx_hits = $num_rows = 0; - $fields = array("@(subject,body)", "@from", "@to", "@subject", "@body", "@attachment_types"); - + $fields = array("@(subject,body)", FROM_TOKEN, "@to", "@subject", "@body", "@attachment_types"); $pagelen = get_page_length(); $offset = $page * $pagelen; @@ -128,7 +127,7 @@ class ModelSearchSearch extends Model { if(substr($v, 0, 1) == "@") { $v = substr($v, 1, strlen($v)-1); - if($data['match'][$i-1] == "@from") { $data['match'][$i-1] = "@fromdomain"; } + if($data['match'][$i-1] == FROM_TOKEN) { $data['match'][$i-1] = "@fromdomain"; } if($data['match'][$i-1] == "@to") { $data['match'][$i-1] = "@todomain"; } }