diff --git a/util/rtindex.py b/util/rtindex.py index bd5b6e38..41c1746b 100755 --- a/util/rtindex.py +++ b/util/rtindex.py @@ -12,6 +12,12 @@ import time SQL_SELECT_QUERY = "SELECT id, `from`, `to`, fromdomain, todomain, subject, " + \ "arrived, sent, body, size, direction, folder, attachments, " + \ "attachment_types FROM sph_index" +SQL_INSERT_QUERY = "INSERT INTO piler1 (id, sender, rcpt, senderdomain, " + \ + "rcptdomain, subject, arrived, sent, body, size, direction, " + \ + "folder, attachments, attachment_types) VALUES (%s, %s, %s, " + \ + "%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)" +SQL_DELETE_QUERY = "DELETE FROM sph_index WHERE id IN (%s)" +SLEEP_DELAY = 5 opts = {} @@ -30,47 +36,44 @@ def read_options(filename="", opts={}): opts['password'] = config.get('piler', 'mysqlpwd') opts['database'] = config.get('piler', 'mysqldb') - # FIXME - opts['sphx_host'] = "127.0.0.1" - opts['sphx_port'] = 9306 +def process_batch(opts): + try: + opts['db'] = dbapi.connect(opts['dbhost'], opts['username'], + opts['password'], opts['database']) -def process_messages(opts): - opts['db'] = dbapi.connect(opts['dbhost'], opts['username'], - opts['password'], opts['database']) + cursor = opts['db'].cursor() - opts['sphx'] = dbapi.connect(host=opts['sphx_host'], - port=opts['sphx_port']) + while True: + cursor.execute(SQL_SELECT_QUERY) + rows = cursor.fetchmany(opts['batch_size']) + if rows == (): + time.sleep(SLEEP_DELAY) + break - cursor = opts['db'].cursor() - s_cursor = opts['sphx'].cursor() + ids = [x[0] for x in rows] - cursor.execute(SQL_SELECT_QUERY) - rows = cursor.fetchmany(opts['batch_size']) + opts['sphx'] = dbapi.connect(host=opts['sphinx_host'], + port=opts['sphinx_port']) + sphx_cursor = opts['sphx'].cursor() - if rows: - ids = [x[0] for x in rows] - syslog.syslog("Processed %d items" % (len(ids))) + sphx_cursor.executemany(SQL_INSERT_QUERY, rows) + opts['sphx'].commit() + opts['sphx'].close() - s_cursor.executemany(""" - INSERT INTO piler1 (id, sender, rcpt, senderdomain, rcptdomain, - subject, arrived, sent, body, size, direction, folder, attachments, - attachment_types) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, - %s, %s, %s, %s)""", rows) + syslog.syslog("%d records inserted" % (sphx_cursor.rowcount)) - format = ", ".join(['%s'] * len(ids)) - cursor.execute("DELETE FROM sph_index WHERE id IN (%s)" % (format), ids) - opts['db'].commit() + format = ", ".join(['%s'] * len(ids)) + cursor.execute(SQL_DELETE_QUERY % (format), ids) + opts['db'].commit() - else: - time.sleep(opts['sleep']) + except dbapi.DatabaseError as e: + syslog.syslog("Error %s" % e) + time.sleep(SLEEP_DELAY) if opts['db']: opts['db'].close() - if opts['sphx']: - opts['sphx'].close() - def main(): parser = argparse.ArgumentParser() @@ -78,8 +81,10 @@ def main(): default="/etc/piler/piler.conf") parser.add_argument("-b", "--batch-size", type=int, help="batch size " + "to process", default=1000) - parser.add_argument("-s", "--sleep", type=int, help="sleep after no data " + - "to index", default=5) + parser.add_argument("-s", "--sphinx", type=str, help="sphinx server", + default="127.0.0.1") + parser.add_argument("-p", "--port", type=int, help="sphinx sql port", + default=9306) parser.add_argument("-d", "--dry-run", help="dry run", action='store_true') parser.add_argument("-v", "--verbose", help="verbose mode", action='store_true') @@ -92,8 +97,9 @@ def main(): opts['dry_run'] = args.dry_run opts['verbose'] = args.verbose + opts['sphinx_host'] = args.sphinx + opts['sphinx_port'] = args.port opts['batch_size'] = args.batch_size - opts['sleep'] = args.sleep opts['db'] = None opts['sphx'] = None @@ -102,7 +108,7 @@ def main(): read_options(args.config, opts) while True: - process_messages(opts) + process_batch(opts) if __name__ == "__main__":