piler/util/rtindex.py

116 lines
3.6 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/python3
import configparser
import MySQLdb as dbapi
import argparse
import getpass
import os
import sys
import syslog
import time
# Reads the pending rows from the MySQL staging table.
SQL_SELECT_QUERY = (
    "SELECT id, `from`, `to`, fromdomain, todomain, subject, "
    "arrived, sent, body, size, direction, folder, attachments, "
    "attachment_types FROM sph_index"
)

# Writes one row into the index; the 14 placeholders line up with the 14
# columns returned by SQL_SELECT_QUERY, in the same order.
SQL_INSERT_QUERY = (
    "INSERT INTO piler1 "
    "(id, sender, rcpt, senderdomain, rcptdomain, subject, arrived, sent, "
    "body, size, direction, folder, attachments, attachment_types) "
    "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
)

# The single %s is replaced with a comma separated list of placeholders,
# one per id, before the statement is executed.
SQL_DELETE_QUERY = "DELETE FROM sph_index WHERE id IN (%s)"

# Seconds to wait when there is nothing to index or after a database error.
SLEEP_DELAY = 5

# Process-wide settings, filled in by main() and read_options().
opts = {}
def read_options(filename="", opts=None):
    """Load the MySQL connection settings from a piler configuration file.

    A ``[piler]`` section header is prepended to the file contents before
    they are parsed, so the options are expected at the top of the file
    without a section header of their own.

    Args:
        filename: Path of the configuration file to read.
        opts: Dictionary that is updated in place. A new dictionary is
            created when it is omitted.

    Returns:
        The updated dictionary. It carries the keys ``dbhost`` (from
        ``mysqlhost``, ``'localhost'`` when absent), ``username`` (from
        ``mysqluser``), ``password`` (from ``mysqlpwd``) and ``database``
        (from ``mysqldb``).

    Raises:
        OSError: If the file cannot be opened.
        configparser.NoOptionError: If ``mysqluser``, ``mysqlpwd`` or
            ``mysqldb`` is missing from the file.
    """
    if opts is None:
        opts = {}

    with open(filename, 'r') as conf:
        text = "[piler]\n" + conf.read()

    # Interpolation is switched off so that values containing '%' (a
    # password, for instance) are returned verbatim instead of raising
    # InterpolationSyntaxError.
    config = configparser.ConfigParser(interpolation=None)
    config.read_string(text)

    opts['dbhost'] = config.get('piler', 'mysqlhost', fallback='localhost')
    opts['username'] = config.get('piler', 'mysqluser')
    opts['password'] = config.get('piler', 'mysqlpwd')
    opts['database'] = config.get('piler', 'mysqldb')

    return opts
def process_batch(opts):
    """Move pending rows from the MySQL ``sph_index`` table into the index.

    Connects to MySQL, then repeatedly fetches up to ``opts['batch_size']``
    rows, inserts them through a separate connection to
    ``opts['sphinx_host']:opts['sphinx_port']``, and deletes the copied ids
    from ``sph_index``. When no rows are left it sleeps ``SLEEP_DELAY``
    seconds and returns. A ``DatabaseError`` is logged to syslog, followed
    by the same sleep, and the function returns normally.

    Args:
        opts: Settings dictionary. Reads ``dbhost``, ``username``,
            ``password``, ``database``, ``sphinx_host``, ``sphinx_port`` and
            ``batch_size``. The ``db`` and ``sphx`` entries hold the open
            connections while the function runs and are ``None`` again when
            it returns.
    """
    # Start from a clean slate so that a failed connect never leaves a
    # previously closed handle behind to be closed a second time.
    opts['db'] = None
    opts['sphx'] = None

    try:
        opts['db'] = dbapi.connect(opts['dbhost'], opts['username'],
                                   opts['password'], opts['database'])
        cursor = opts['db'].cursor()

        while True:
            # NOTE(review): the SELECT has no LIMIT and is re-executed on
            # every pass while only batch_size rows are fetched from it;
            # confirm this is acceptable for large backlogs.
            cursor.execute(SQL_SELECT_QUERY)
            rows = cursor.fetchmany(opts['batch_size'])
            if not rows:
                time.sleep(SLEEP_DELAY)
                break

            ids = [row[0] for row in rows]

            opts['sphx'] = dbapi.connect(host=opts['sphinx_host'],
                                         port=opts['sphinx_port'])
            try:
                sphx_cursor = opts['sphx'].cursor()
                sphx_cursor.executemany(SQL_INSERT_QUERY, rows)
                opts['sphx'].commit()
                # Read the count while the connection is still open.
                inserted = sphx_cursor.rowcount
            finally:
                # Release the index connection even when the insert fails.
                opts['sphx'].close()
                opts['sphx'] = None

            syslog.syslog("%d records inserted" % inserted)

            # Only the placeholder list is formatted into the statement;
            # the ids themselves are passed as query parameters.
            placeholders = ", ".join(['%s'] * len(ids))
            cursor.execute(SQL_DELETE_QUERY % placeholders, ids)
            opts['db'].commit()

    except dbapi.DatabaseError as e:
        syslog.syslog("Error %s" % e)
        time.sleep(SLEEP_DELAY)
    finally:
        if opts.get('db'):
            opts['db'].close()
            opts['db'] = None
def main():
    """Run the indexer: parse arguments, load the config, loop forever.

    The command line options are copied into the module-level ``opts``
    dictionary, the MySQL settings are added by ``read_options`` and
    ``process_batch`` is then called in an endless loop. The process exits
    with status 1 when the current user is neither ``root`` nor ``piler``.
    """
    # (flags, keyword arguments) for every supported command line option.
    arguments = (
        (("-c", "--config"),
         dict(type=str, help="piler.conf path",
              default="/etc/piler/piler.conf")),
        (("-b", "--batch-size"),
         dict(type=int, help="batch size to process", default=1000)),
        (("-s", "--sphinx"),
         dict(type=str, help="sphinx server", default="127.0.0.1")),
        (("-p", "--port"),
         dict(type=int, help="sphinx sql port", default=9306)),
        (("-d", "--dry-run"),
         dict(help="dry run", action='store_true')),
        (("-v", "--verbose"),
         dict(help="verbose mode", action='store_true')),
    )

    cli = argparse.ArgumentParser()
    for flags, settings in arguments:
        cli.add_argument(*flags, **settings)
    args = cli.parse_args()

    allowed_users = ['root', 'piler']
    if getpass.getuser() not in allowed_users:
        print("Please run me as user 'piler'")
        sys.exit(1)

    opts.update({
        'dry_run': args.dry_run,
        'verbose': args.verbose,
        'sphinx_host': args.sphinx,
        'sphinx_port': args.port,
        'batch_size': args.batch_size,
        # Connection slots, populated by process_batch().
        'db': None,
        'sphx': None,
    })

    syslog.openlog(logoption=syslog.LOG_PID, facility=syslog.LOG_MAIL)

    read_options(args.config, opts)

    # process_batch() sleeps on its own when idle or after an error, so the
    # loop needs no delay here.
    while True:
        process_batch(opts)


if __name__ == "__main__":
    main()