improved pilerimport to support sphinx queries

This commit is contained in:
SJ 2015-08-12 15:38:02 +02:00
parent 5cb3a5dcae
commit 75b552e5e0
2 changed files with 230 additions and 113 deletions

View File

@ -14,7 +14,7 @@
#define VERSION "1.2.0-master" #define VERSION "1.2.0-master"
#define BUILD 917 #define BUILD 918
#define HOSTID "mailarchiver" #define HOSTID "mailarchiver"

View File

@ -21,10 +21,17 @@ extern char *optarg;
extern int optind; extern int optind;
int dryrun = 0; int dryrun = 0;
int exportall = 0;
int rc = 0;
char *query=NULL; char *query=NULL;
int verbosity = 0;
int max_matches = 1000;
char *index_list = "main1,dailydelta1,delta1";
regex_t regexp; regex_t regexp;
int export_emails_matching_to_query(struct session_data *sdata, struct __data *data, char *s, struct __config *cfg);
void usage(){ void usage(){
printf("\nusage: pilerexport \n\n"); printf("\nusage: pilerexport \n\n");
@ -33,6 +40,9 @@ void usage(){
printf(" -f|--from <email@address> -r|--to <email@address>\n"); printf(" -f|--from <email@address> -r|--to <email@address>\n");
printf(" -F|--from-domain <domain.com> -R|--to-domain <domain.com>\n"); printf(" -F|--from-domain <domain.com> -R|--to-domain <domain.com>\n");
printf(" -s|--minsize <number> -S|--maxsize <number>\n"); printf(" -s|--minsize <number> -S|--maxsize <number>\n");
printf(" -w|--where-condition (eg. \"match('@subject: piler')\"\n");
printf(" -m|--max-matches (default: %d)\n", max_matches);
printf(" -i|--index-list (default: %s)\n", index_list);
printf(" -A|--all -d|--dryrun \n"); printf(" -A|--all -d|--dryrun \n");
printf("\n use -A if you don't want to specify the start/stop time nor any from/to address\n\n"); printf("\n use -A if you don't want to specify the start/stop time nor any from/to address\n\n");
@ -136,6 +146,187 @@ int append_string_to_buffer(char **buffer, char *str){
} }
uint64 run_query(struct session_data *sdata, struct session_data *sdata2, struct __data *data, char *where_condition, uint64 last_id, int *num, struct __config *cfg){
MYSQL_RES *res;
MYSQL_ROW row;
int rc=0;
uint64 id=0;
char s[SMALLBUFSIZE];
*num = 0;
if(!where_condition) return id;
snprintf(s, sizeof(s)-1, "SELECT `id`, `piler_id`, `digest`, `bodydigest` FROM %s WHERE id IN (", SQL_METADATA_TABLE);
rc += append_string_to_buffer(&query, s);
snprintf(s, sizeof(s)-1, "SELECT id FROM %s WHERE %s AND id > %llu ORDER BY id ASC LIMIT 0,%d", index_list, where_condition, last_id, max_matches);
if(mysql_real_query(&(sdata2->mysql), s, strlen(s)) == 0){
res = mysql_store_result(&(sdata2->mysql));
if(res != NULL){
while((row = mysql_fetch_row(res))){
id = strtoull(row[0], NULL, 10);
(*num)++;
rc += append_string_to_buffer(&query, row[0]);
rc += append_string_to_buffer(&query, ",");
}
mysql_free_result(res);
rc += append_string_to_buffer(&query, "-1)");
}
}
if(!rc) export_emails_matching_to_query(sdata, data, query, cfg);
free(query);
query = NULL;
return id;
}
uint64 get_total_found(struct session_data *sdata){
MYSQL_RES *res;
MYSQL_ROW row;
uint64 total_found=0;
if(mysql_real_query(&(sdata->mysql), "SHOW META LIKE 'total_found'", 28) == 0){
res = mysql_store_result(&(sdata->mysql));
if(res != NULL){
while((row = mysql_fetch_row(res))){
total_found = strtoull(row[1], NULL, 10);
}
mysql_free_result(res);
}
}
return total_found;
}
void export_emails_matching_id_list(struct session_data *sdata, struct session_data *sdata2, struct __data *data, char *where_condition, struct __config *cfg){
int n;
uint64 count=0, last_id=0, total_found=0;
last_id = run_query(sdata, sdata2, data, where_condition, last_id, &n, cfg);
count += n;
total_found = get_total_found(sdata2);
while(count < total_found){
last_id = run_query(sdata, sdata2, data, where_condition, last_id, &n, cfg);
count += n;
}
}
int build_query_from_args(char *from, char *to, char *fromdomain, char *todomain, int minsize, int maxsize, unsigned long startdate, unsigned long stopdate){
int where_condition=0;
char s[SMALLBUFSIZE];
if(exportall == 1){
rc = append_string_to_buffer(&query, "SELECT `id`, `piler_id`, `digest`, `bodydigest` FROM ");
rc += append_string_to_buffer(&query, SQL_METADATA_TABLE);
return rc;
}
snprintf(s, sizeof(s)-1, "SELECT DISTINCT `id`, `piler_id`, `digest`, `bodydigest` FROM %s WHERE ", SQL_MESSAGES_VIEW);
rc = append_string_to_buffer(&query, s);
if(from){
rc += append_string_to_buffer(&query, "`from` IN (");
rc += append_string_to_buffer(&query, from);
rc += append_string_to_buffer(&query, ")");
free(from);
where_condition++;
}
if(to){
if(where_condition) rc = append_string_to_buffer(&query, " AND ");
rc += append_string_to_buffer(&query, "`to` IN (");
rc += append_string_to_buffer(&query, to);
rc += append_string_to_buffer(&query, ")");
free(to);
where_condition++;
}
if(fromdomain){
if(where_condition) rc = append_string_to_buffer(&query, " AND ");
rc += append_string_to_buffer(&query, "`fromdomain` IN (");
rc += append_string_to_buffer(&query, fromdomain);
rc += append_string_to_buffer(&query, ")");
free(fromdomain);
where_condition++;
}
if(todomain){
if(where_condition) rc = append_string_to_buffer(&query, " AND ");
rc += append_string_to_buffer(&query, "`todomain` IN (");
rc += append_string_to_buffer(&query, todomain);
rc += append_string_to_buffer(&query, ")");
free(todomain);
where_condition++;
}
if(minsize > 0){
if(where_condition) rc = append_string_to_buffer(&query, " AND ");
snprintf(s, sizeof(s)-1, " `size` >= %d", minsize);
rc += append_string_to_buffer(&query, s);
where_condition++;
}
if(maxsize > 0){
if(where_condition) rc = append_string_to_buffer(&query, " AND ");
snprintf(s, sizeof(s)-1, " `size` <= %d", maxsize);
rc += append_string_to_buffer(&query, s);
where_condition++;
}
if(startdate > 0){
if(where_condition) rc = append_string_to_buffer(&query, " AND ");
snprintf(s, sizeof(s)-1, " `sent` >= %ld", startdate);
rc += append_string_to_buffer(&query, s);
where_condition++;
}
if(stopdate > 0){
if(where_condition) rc = append_string_to_buffer(&query, " AND ");
snprintf(s, sizeof(s)-1, " `sent` <= %ld", stopdate);
rc += append_string_to_buffer(&query, s);
where_condition++;
}
rc += append_string_to_buffer(&query, " ORDER BY id ASC");
return rc;
}
int export_emails_matching_to_query(struct session_data *sdata, struct __data *data, char *s, struct __config *cfg){ int export_emails_matching_to_query(struct session_data *sdata, struct __data *data, char *s, struct __config *cfg){
FILE *f; FILE *f;
uint64 id, n=0; uint64 id, n=0;
@ -211,14 +402,12 @@ ENDE:
int main(int argc, char **argv){ int main(int argc, char **argv){
int c, rc, exportall=0, minsize=0, maxsize=0; int c, minsize=0, maxsize=0;
int where_condition=0;
size_t nmatch=0; size_t nmatch=0;
unsigned long startdate=0, stopdate=0; unsigned long startdate=0, stopdate=0;
char *configfile=CONFIG_FILE; char *configfile=CONFIG_FILE;
char *to=NULL, *from=NULL, *todomain=NULL, *fromdomain=NULL; char *to=NULL, *from=NULL, *todomain=NULL, *fromdomain=NULL, *where_condition=NULL;
char s[SMALLBUFSIZE]; struct session_data sdata, sdata2;
struct session_data sdata;
struct __data data; struct __data data;
struct __config cfg; struct __config cfg;
@ -247,15 +436,17 @@ int main(int argc, char **argv){
{"to-domain", required_argument, 0, 'R' }, {"to-domain", required_argument, 0, 'R' },
{"start-date", required_argument, 0, 'a' }, {"start-date", required_argument, 0, 'a' },
{"stop-date", required_argument, 0, 'b' }, {"stop-date", required_argument, 0, 'b' },
{"id", required_argument, 0, 'i' }, {"where-condition", required_argument, 0, 'w' },
{"max-matches", required_argument, 0, 'm' },
{"index-list", required_argument, 0, 'i' },
{0,0,0,0} {0,0,0,0}
}; };
int option_index = 0; int option_index = 0;
c = getopt_long(argc, argv, "c:s:S:f:r:F:R:a:b:i:Adhv?", long_options, &option_index); c = getopt_long(argc, argv, "c:s:S:f:r:F:R:a:b:w:m:i:Adhv?", long_options, &option_index);
#else #else
c = getopt(argc, argv, "c:s:S:f:r:F:R:a:b:i:Adhv?"); c = getopt(argc, argv, "c:s:S:f:r:F:R:a:b:w:m:i:Adhv?");
#endif #endif
if(c == -1) break; if(c == -1) break;
@ -332,12 +523,22 @@ int main(int argc, char **argv){
stopdate = convert_time(optarg, 23, 59, 59); stopdate = convert_time(optarg, 23, 59, 59);
break; break;
case 'w' :
where_condition = optarg;
break;
case 'm' :
max_matches = atoi(optarg);
break;
case 'i' :
index_list = optarg;
break;
case 'd' : case 'd' :
dryrun = 1; dryrun = 1;
break; break;
default : default :
usage(); usage();
break; break;
@ -346,7 +547,7 @@ int main(int argc, char **argv){
} }
if(from == NULL && to == NULL && fromdomain == NULL && todomain == NULL && startdate == 0 && stopdate == 0 && exportall == 0) usage(); if(from == NULL && to == NULL && fromdomain == NULL && todomain == NULL && where_condition == NULL && startdate == 0 && stopdate == 0 && exportall == 0) usage();
regfree(&regexp); regfree(&regexp);
@ -355,106 +556,6 @@ int main(int argc, char **argv){
(void) openlog("pilerexport", LOG_PID, LOG_MAIL); (void) openlog("pilerexport", LOG_PID, LOG_MAIL);
if(exportall == 1){
rc = append_string_to_buffer(&query, "SELECT `id`, `piler_id`, `digest`, `bodydigest` FROM ");
rc += append_string_to_buffer(&query, SQL_METADATA_TABLE);
goto GO;
}
snprintf(s, sizeof(s)-1, "SELECT DISTINCT `id`, `piler_id`, `digest`, `bodydigest` FROM %s WHERE ", SQL_MESSAGES_VIEW);
rc = append_string_to_buffer(&query, s);
if(from){
rc += append_string_to_buffer(&query, "`from` IN (");
rc += append_string_to_buffer(&query, from);
rc += append_string_to_buffer(&query, ")");
free(from);
where_condition++;
}
if(to){
if(where_condition) rc = append_string_to_buffer(&query, " AND ");
rc += append_string_to_buffer(&query, "`to` IN (");
rc += append_string_to_buffer(&query, to);
rc += append_string_to_buffer(&query, ")");
free(to);
where_condition++;
}
if(fromdomain){
if(where_condition) rc = append_string_to_buffer(&query, " AND ");
rc += append_string_to_buffer(&query, "`fromdomain` IN (");
rc += append_string_to_buffer(&query, fromdomain);
rc += append_string_to_buffer(&query, ")");
free(fromdomain);
where_condition++;
}
if(todomain){
if(where_condition) rc = append_string_to_buffer(&query, " AND ");
rc += append_string_to_buffer(&query, "`todomain` IN (");
rc += append_string_to_buffer(&query, todomain);
rc += append_string_to_buffer(&query, ")");
free(todomain);
where_condition++;
}
if(minsize > 0){
if(where_condition) rc = append_string_to_buffer(&query, " AND ");
snprintf(s, sizeof(s)-1, " `size` >= %d", minsize);
rc += append_string_to_buffer(&query, s);
where_condition++;
}
if(maxsize > 0){
if(where_condition) rc = append_string_to_buffer(&query, " AND ");
snprintf(s, sizeof(s)-1, " `size` <= %d", maxsize);
rc += append_string_to_buffer(&query, s);
where_condition++;
}
if(startdate > 0){
if(where_condition) rc = append_string_to_buffer(&query, " AND ");
snprintf(s, sizeof(s)-1, " `sent` >= %ld", startdate);
rc += append_string_to_buffer(&query, s);
where_condition++;
}
if(stopdate > 0){
if(where_condition) rc = append_string_to_buffer(&query, " AND ");
snprintf(s, sizeof(s)-1, " `sent` <= %ld", stopdate);
rc += append_string_to_buffer(&query, s);
where_condition++;
}
rc += append_string_to_buffer(&query, " ORDER BY id ASC");
GO:
if(rc) p_clean_exit("malloc problem building query", 1);
cfg = read_config(configfile); cfg = read_config(configfile);
@ -470,11 +571,27 @@ GO:
} }
if(where_condition){
rc = export_emails_matching_to_query(&sdata, &data, query, &cfg); init_session_data(&sdata2, &cfg);
free(query); strcpy(cfg.mysqlhost, "127.0.0.1");
cfg.mysqlport = 9306;
cfg.mysqlsocket[0] = '\0';
if(open_database(&sdata2, &cfg) == ERR){
p_clean_exit("cannot connect to 127.0.0.1:9306", 1);
}
export_emails_matching_id_list(&sdata, &sdata2, &data, where_condition, &cfg);
close_database(&sdata2);
}
else {
if(build_query_from_args(from, to, fromdomain, todomain, minsize, maxsize, startdate, stopdate) > 0) p_clean_exit("malloc problem building query", 1);
export_emails_matching_to_query(&sdata, &data, query, &cfg);
free(query);
}
close_database(&sdata); close_database(&sdata);