mirror of
https://bitbucket.org/jsuto/piler.git
synced 2024-11-07 22:31:59 +01:00
rtindex.py fixes
Signed-off-by: Janos SUTO <sj@acts.hu>
This commit is contained in:
parent
1ad94cd6f4
commit
59578f75c4
@ -12,6 +12,12 @@ import time
|
|||||||
SQL_SELECT_QUERY = "SELECT id, `from`, `to`, fromdomain, todomain, subject, " + \
|
SQL_SELECT_QUERY = "SELECT id, `from`, `to`, fromdomain, todomain, subject, " + \
|
||||||
"arrived, sent, body, size, direction, folder, attachments, " + \
|
"arrived, sent, body, size, direction, folder, attachments, " + \
|
||||||
"attachment_types FROM sph_index"
|
"attachment_types FROM sph_index"
|
||||||
|
SQL_INSERT_QUERY = "INSERT INTO piler1 (id, sender, rcpt, senderdomain, " + \
|
||||||
|
"rcptdomain, subject, arrived, sent, body, size, direction, " + \
|
||||||
|
"folder, attachments, attachment_types) VALUES (%s, %s, %s, " + \
|
||||||
|
"%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
|
||||||
|
SQL_DELETE_QUERY = "DELETE FROM sph_index WHERE id IN (%s)"
|
||||||
|
SLEEP_DELAY = 5
|
||||||
|
|
||||||
opts = {}
|
opts = {}
|
||||||
|
|
||||||
@ -30,47 +36,44 @@ def read_options(filename="", opts={}):
|
|||||||
opts['password'] = config.get('piler', 'mysqlpwd')
|
opts['password'] = config.get('piler', 'mysqlpwd')
|
||||||
opts['database'] = config.get('piler', 'mysqldb')
|
opts['database'] = config.get('piler', 'mysqldb')
|
||||||
|
|
||||||
# FIXME
|
|
||||||
opts['sphx_host'] = "127.0.0.1"
|
|
||||||
opts['sphx_port'] = 9306
|
|
||||||
|
|
||||||
|
def process_batch(opts):
|
||||||
|
try:
|
||||||
|
opts['db'] = dbapi.connect(opts['dbhost'], opts['username'],
|
||||||
|
opts['password'], opts['database'])
|
||||||
|
|
||||||
def process_messages(opts):
|
cursor = opts['db'].cursor()
|
||||||
opts['db'] = dbapi.connect(opts['dbhost'], opts['username'],
|
|
||||||
opts['password'], opts['database'])
|
|
||||||
|
|
||||||
opts['sphx'] = dbapi.connect(host=opts['sphx_host'],
|
while True:
|
||||||
port=opts['sphx_port'])
|
cursor.execute(SQL_SELECT_QUERY)
|
||||||
|
rows = cursor.fetchmany(opts['batch_size'])
|
||||||
|
if rows == ():
|
||||||
|
time.sleep(SLEEP_DELAY)
|
||||||
|
break
|
||||||
|
|
||||||
cursor = opts['db'].cursor()
|
ids = [x[0] for x in rows]
|
||||||
s_cursor = opts['sphx'].cursor()
|
|
||||||
|
|
||||||
cursor.execute(SQL_SELECT_QUERY)
|
opts['sphx'] = dbapi.connect(host=opts['sphinx_host'],
|
||||||
rows = cursor.fetchmany(opts['batch_size'])
|
port=opts['sphinx_port'])
|
||||||
|
sphx_cursor = opts['sphx'].cursor()
|
||||||
|
|
||||||
if rows:
|
sphx_cursor.executemany(SQL_INSERT_QUERY, rows)
|
||||||
ids = [x[0] for x in rows]
|
opts['sphx'].commit()
|
||||||
syslog.syslog("Processed %d items" % (len(ids)))
|
opts['sphx'].close()
|
||||||
|
|
||||||
s_cursor.executemany("""
|
syslog.syslog("%d records inserted" % (sphx_cursor.rowcount))
|
||||||
INSERT INTO piler1 (id, sender, rcpt, senderdomain, rcptdomain,
|
|
||||||
subject, arrived, sent, body, size, direction, folder, attachments,
|
|
||||||
attachment_types) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
|
|
||||||
%s, %s, %s, %s)""", rows)
|
|
||||||
|
|
||||||
format = ", ".join(['%s'] * len(ids))
|
format = ", ".join(['%s'] * len(ids))
|
||||||
cursor.execute("DELETE FROM sph_index WHERE id IN (%s)" % (format), ids)
|
cursor.execute(SQL_DELETE_QUERY % (format), ids)
|
||||||
opts['db'].commit()
|
opts['db'].commit()
|
||||||
|
|
||||||
else:
|
except dbapi.DatabaseError as e:
|
||||||
time.sleep(opts['sleep'])
|
syslog.syslog("Error %s" % e)
|
||||||
|
time.sleep(SLEEP_DELAY)
|
||||||
|
|
||||||
if opts['db']:
|
if opts['db']:
|
||||||
opts['db'].close()
|
opts['db'].close()
|
||||||
|
|
||||||
if opts['sphx']:
|
|
||||||
opts['sphx'].close()
|
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
parser = argparse.ArgumentParser()
|
parser = argparse.ArgumentParser()
|
||||||
@ -78,8 +81,10 @@ def main():
|
|||||||
default="/etc/piler/piler.conf")
|
default="/etc/piler/piler.conf")
|
||||||
parser.add_argument("-b", "--batch-size", type=int, help="batch size " +
|
parser.add_argument("-b", "--batch-size", type=int, help="batch size " +
|
||||||
"to process", default=1000)
|
"to process", default=1000)
|
||||||
parser.add_argument("-s", "--sleep", type=int, help="sleep after no data " +
|
parser.add_argument("-s", "--sphinx", type=str, help="sphinx server",
|
||||||
"to index", default=5)
|
default="127.0.0.1")
|
||||||
|
parser.add_argument("-p", "--port", type=int, help="sphinx sql port",
|
||||||
|
default=9306)
|
||||||
parser.add_argument("-d", "--dry-run", help="dry run", action='store_true')
|
parser.add_argument("-d", "--dry-run", help="dry run", action='store_true')
|
||||||
parser.add_argument("-v", "--verbose", help="verbose mode",
|
parser.add_argument("-v", "--verbose", help="verbose mode",
|
||||||
action='store_true')
|
action='store_true')
|
||||||
@ -92,8 +97,9 @@ def main():
|
|||||||
|
|
||||||
opts['dry_run'] = args.dry_run
|
opts['dry_run'] = args.dry_run
|
||||||
opts['verbose'] = args.verbose
|
opts['verbose'] = args.verbose
|
||||||
|
opts['sphinx_host'] = args.sphinx
|
||||||
|
opts['sphinx_port'] = args.port
|
||||||
opts['batch_size'] = args.batch_size
|
opts['batch_size'] = args.batch_size
|
||||||
opts['sleep'] = args.sleep
|
|
||||||
opts['db'] = None
|
opts['db'] = None
|
||||||
opts['sphx'] = None
|
opts['sphx'] = None
|
||||||
|
|
||||||
@ -102,7 +108,7 @@ def main():
|
|||||||
read_options(args.config, opts)
|
read_options(args.config, opts)
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
process_messages(opts)
|
process_batch(opts)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
Loading…
Reference in New Issue
Block a user