mirror of
https://bitbucket.org/jsuto/piler.git
synced 2024-12-27 09:20:12 +01:00
e2f6a71827
Signed-off-by: Janos SUTO <sj@acts.hu>
291 lines
9.4 KiB
Python
Executable File
291 lines
9.4 KiB
Python
Executable File
#!/usr/bin/python3
|
|
|
|
import configparser
|
|
import MySQLdb as dbapi
|
|
import argparse
|
|
import getpass
|
|
import os
|
|
import sys
|
|
import syslog
|
|
import time
|
|
|
|
SQL_PURGE_SELECT_QUERY = "SELECT id, piler_id, size FROM " +\
|
|
"metadata WHERE deleted=0 AND retained < UNIX_TIMESTAMP(NOW()) " +\
|
|
"AND id NOT IN (SELECT id FROM rcpt WHERE `to` IN " +\
|
|
"(SELECT email FROM legal_hold)) AND id NOT IN (SELECT " +\
|
|
"id FROM metadata WHERE `from` IN (SELECT email FROM " +\
|
|
"legal_hold))"
|
|
|
|
opts = {}
|
|
|
|
|
|
def read_options(filename="", opts={}):
|
|
s = "[piler]\n" + open(filename, 'r').read()
|
|
config = configparser.ConfigParser()
|
|
config.read_string(s)
|
|
|
|
if config.has_option('piler', 'mysqlhost'):
|
|
opts['dbhost'] = config.get('piler', 'mysqlhost')
|
|
else:
|
|
opts['dbhost'] = 'localhost'
|
|
|
|
opts['username'] = config.get('piler', 'mysqluser')
|
|
opts['password'] = config.get('piler', 'mysqlpwd')
|
|
opts['database'] = config.get('piler', 'mysqldb')
|
|
opts['storedir'] = config.get('piler', 'queuedir')
|
|
opts['rtindex'] = config.getint('piler', 'rtindex', fallback=0)
|
|
opts['sphxhost'] = config.get('piler', 'sphxhost', fallback='127.0.0.1')
|
|
opts['sphxport'] = config.getint('piler', 'sphxport', fallback=9306)
|
|
opts['server_id'] = "%02x" % config.getint('piler', 'server_id')
|
|
|
|
|
|
def is_purge_enabled(opts={}):
|
|
cursor = opts['db'].cursor()
|
|
|
|
cursor.execute("SELECT `value` FROM `option` WHERE `key`='enable_purge'")
|
|
|
|
row = cursor.fetchone()
|
|
if row and row[0] == '1':
|
|
return True
|
|
|
|
return False
|
|
|
|
|
|
def purge_m_files(ids=[], opts={}):
|
|
if len(ids) > 0:
|
|
remove_m_files(ids, opts)
|
|
|
|
# Set deleted=1 for aged metadata entries
|
|
# as well as clean other tables
|
|
|
|
if opts['dry_run'] is False:
|
|
cursor = opts['db'].cursor()
|
|
format = ", ".join(['%s'] * len(ids))
|
|
cursor.execute("UPDATE metadata SET deleted=1, subject=NULL, `from`=''," +
|
|
"fromdomain='', message_id='' WHERE piler_id IN " +
|
|
"(%s)" % (format), ids)
|
|
|
|
cursor.execute("DELETE FROM rcpt WHERE id IN (SELECT id FROM metadata " +
|
|
"WHERE piler_id IN (%s))" % (format), ids)
|
|
cursor.execute("DELETE FROM note WHERE id IN (SELECT id FROM metadata " +
|
|
"WHERE piler_id IN (%s))" % (format), ids)
|
|
cursor.execute("DELETE FROM tag WHERE id IN (SELECT id FROM metadata " +
|
|
"WHERE piler_id IN (%s))" % (format), ids)
|
|
cursor.execute("DELETE FROM private WHERE id IN (SELECT id FROM metadata " +
|
|
"WHERE piler_id IN (%s))" % (format), ids)
|
|
cursor.execute("DELETE FROM folder_message WHERE id IN (SELECT id FROM " +
|
|
"metadata WHERE piler_id IN (%s))" % (format), ids)
|
|
|
|
opts['db'].commit()
|
|
|
|
|
|
def purge_attachments_by_piler_id(ids=[], opts={}):
|
|
format = ", ".join(['%s'] * len(ids))
|
|
|
|
cursor = opts['db'].cursor()
|
|
|
|
cursor.execute("SELECT i, piler_id, attachment_id, refcount FROM " +
|
|
"v_attachment WHERE piler_id IN (%s)" % (format), ids)
|
|
|
|
while True:
|
|
rows = cursor.fetchall()
|
|
if rows == ():
|
|
break
|
|
else:
|
|
remove_attachment_files(rows, opts)
|
|
|
|
|
|
def purge_attachments_by_attachment_id(opts={}):
|
|
format = ", ".join(['%s'] * len(opts['referenced_attachments']))
|
|
|
|
cursor = opts['db'].cursor()
|
|
|
|
cursor.execute("SELECT i, piler_id, attachment_id, refcount FROM " +
|
|
"v_attachment WHERE refcount=0 AND i IN (%s)" %
|
|
(format), opts['referenced_attachments'])
|
|
|
|
while True:
|
|
rows = cursor.fetchall()
|
|
if rows == ():
|
|
break
|
|
else:
|
|
for (id, piler_id, attachment_id, refcount) in rows:
|
|
if opts['dry_run'] is False:
|
|
unlink(get_attachment_file_path(piler_id, attachment_id,
|
|
opts), opts)
|
|
else:
|
|
print(get_attachment_file_path(piler_id, attachment_id, opts))
|
|
|
|
|
|
def remove_attachment_files(rows=(), opts={}):
|
|
remove_ids = []
|
|
referenced_ids = []
|
|
|
|
if rows == ():
|
|
return
|
|
|
|
# If refcount > 0, then save attachment.id, and handle later,
|
|
# otherwise delete the attachment from the filesystem, and
|
|
# attachment table
|
|
|
|
for (id, piler_id, attachment_id, refcount) in rows:
|
|
if refcount == 0:
|
|
remove_ids.append(id)
|
|
|
|
if opts['dry_run'] is False:
|
|
unlink(get_attachment_file_path(piler_id, attachment_id,
|
|
opts), opts)
|
|
else:
|
|
print(get_attachment_file_path(piler_id, attachment_id, opts))
|
|
else:
|
|
referenced_ids.append(id)
|
|
|
|
if remove_ids:
|
|
if opts['dry_run'] is False:
|
|
format = ", ".join(['%s'] * len(remove_ids))
|
|
cursor = opts['db'].cursor()
|
|
cursor.execute("DELETE FROM attachment WHERE id IN (%s)" %
|
|
(format), remove_ids)
|
|
opts['db'].commit()
|
|
else:
|
|
print(remove_ids)
|
|
|
|
opts['referenced_attachments'] = referenced_ids
|
|
|
|
|
|
def remove_m_files(ids=[], opts={}):
|
|
for i in range(0, len(ids)):
|
|
if opts['dry_run'] is False:
|
|
unlink(get_m_file_path(ids[i], opts), opts)
|
|
opts['messages'] = opts['messages'] + 1
|
|
else:
|
|
print(get_m_file_path(ids[i], opts))
|
|
|
|
|
|
def unlink(filename="", opts={}):
|
|
if opts['verbose']:
|
|
print("removing", filename)
|
|
|
|
try:
|
|
st = os.stat(filename)
|
|
opts['purged_stored_size'] = opts['purged_stored_size'] + st.st_size
|
|
opts['files'] = opts['files'] + 1
|
|
os.unlink(filename)
|
|
except:
|
|
pass
|
|
|
|
|
|
def get_m_file_path(id='', opts={}):
|
|
return "/".join([opts['storedir'], id[24:26], id[8:11], id[32:34],
|
|
id[34:36], id + ".m"])
|
|
|
|
|
|
def get_attachment_file_path(piler_id='', attachment_id=0, opts={}):
|
|
return "/".join([opts['storedir'], piler_id[24:26], piler_id[8:11],
|
|
piler_id[32:34], piler_id[34:36], piler_id + ".a" +
|
|
str(attachment_id)])
|
|
|
|
|
|
def purge_index_data(ids=[], opts={}):
|
|
'''
|
|
Delete from index data in case of RT index
|
|
'''
|
|
|
|
if opts['rtindex'] == 1 and opts['dry_run'] is False:
|
|
cursor = opts['sphx'].cursor()
|
|
a = "," . join([str(x) for x in ids])
|
|
cursor.execute("DELETE FROM piler WHERE id IN (%s)" % (a))
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument("-c", "--config", type=str, help="piler.conf path",
|
|
default="/etc/piler/piler.conf")
|
|
parser.add_argument("-b", "--batch-size", type=int, help="batch size " +
|
|
"to delete", default=1000)
|
|
parser.add_argument("-d", "--dry-run", help="dry run", action='store_true')
|
|
parser.add_argument("-v", "--verbose", help="verbose mode",
|
|
action='store_true')
|
|
|
|
args = parser.parse_args()
|
|
|
|
if getpass.getuser() not in ['root', 'piler']:
|
|
print("Please run me as user 'piler'")
|
|
sys.exit(1)
|
|
|
|
opts['dry_run'] = args.dry_run
|
|
opts['verbose'] = args.verbose
|
|
opts['db'] = None
|
|
opts['sphx'] = None
|
|
opts['messages'] = 0
|
|
opts['files'] = 0
|
|
opts['size'] = 0
|
|
opts['purged_size'] = 0
|
|
opts['purged_stored_size'] = 0
|
|
opts['referenced_attachments'] = []
|
|
|
|
syslog.openlog(logoption=syslog.LOG_PID, facility=syslog.LOG_MAIL)
|
|
|
|
read_options(args.config, opts)
|
|
try:
|
|
opts['db'] = dbapi.connect(opts['dbhost'], opts['username'],
|
|
opts['password'], opts['database'])
|
|
|
|
opts['sphx'] = dbapi.connect(host=opts['sphxhost'], port=opts['sphxport'])
|
|
|
|
if is_purge_enabled(opts) is False:
|
|
syslog.syslog("Purging emails is disabled")
|
|
sys.exit(1)
|
|
|
|
cursor = opts['db'].cursor()
|
|
cursor.execute(SQL_PURGE_SELECT_QUERY)
|
|
|
|
while True:
|
|
rows = cursor.fetchmany(args.batch_size)
|
|
if rows == ():
|
|
break
|
|
|
|
id = [x[0] for x in rows]
|
|
piler_id = [x[1] for x in rows]
|
|
size = [x[2] for x in rows]
|
|
|
|
opts['purged_size'] = opts['purged_size'] + sum(size)
|
|
|
|
purge_m_files(piler_id, opts)
|
|
purge_attachments_by_piler_id(piler_id, opts)
|
|
purge_index_data(id, opts)
|
|
|
|
# It's possible that there's attachment duplication, thus
|
|
# refcount > 0, even though after deleting the duplicates
|
|
# (references) refcount becomes zero.
|
|
if len(opts['referenced_attachments']) > 0:
|
|
purge_attachments_by_attachment_id(opts)
|
|
|
|
# Update the counter table
|
|
if opts['dry_run'] is False:
|
|
cursor.execute("UPDATE counter SET rcvd=rcvd-%s, size=size-%s, " +
|
|
"stored_size=stored_size-%s",
|
|
(str(opts['messages']), str(opts['purged_size']),
|
|
str(opts['purged_stored_size'])))
|
|
opts['db'].commit()
|
|
|
|
except dbapi.DatabaseError as e:
|
|
print("Error %s" % e)
|
|
|
|
if opts['db']:
|
|
opts['db'].close()
|
|
|
|
summary = "Purged " + str(opts['messages']) + " messages, " +\
|
|
str(opts['files']) + " files, " +\
|
|
str(opts['purged_size']) + "/" +\
|
|
str(opts['purged_stored_size']) + " bytes"
|
|
|
|
if opts['verbose']:
|
|
print(summary)
|
|
|
|
syslog.syslog(summary)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|