From 5b88a4f2f1e1ce0c43539fe282eda2703e85b4d3 Mon Sep 17 00:00:00 2001 From: SJ Date: Fri, 1 Jun 2012 14:25:49 +0200 Subject: [PATCH] added a reindexing utility --- src/Makefile.in | 6 +- src/import.c | 2 +- src/parser.c | 80 +++++++++++--------- src/parser.h | 4 +- src/piler.h | 2 + src/reindex.c | 189 ++++++++++++++++++++++++++++++++++++++++++++++++ src/session.c | 2 +- src/test.c | 2 +- 8 files changed, 246 insertions(+), 41 deletions(-) create mode 100644 src/reindex.c diff --git a/src/Makefile.in b/src/Makefile.in index 16d7df95..59137094 100644 --- a/src/Makefile.in +++ b/src/Makefile.in @@ -33,7 +33,7 @@ MAKE = `which make` INSTALL = @INSTALL@ -all: libpiler.a piler pilerconf pilerget pilerimport pilerexport pilerpurge test +all: libpiler.a piler pilerconf pilerget pilerimport pilerexport pilerpurge reindex test install: install-piler @@ -63,6 +63,9 @@ pilerpurge: pilerpurge.c libpiler.a pilerconf: pilerconf.c cfg.o misc.o tai.o $(CC) $(CFLAGS) $(INCDIR) $(DEFS) -o $@ $^ $(LIBDIR) +reindex: reindex.c libpiler.a + $(CC) $(CFLAGS) $(INCDIR) $(DEFS) -o $@ $^ -lpiler $(LIBS) $(LIBDIR) + test: test.c libpiler.a $(CC) $(CFLAGS) $(INCDIR) $(DEFS) -o pilertest $^ -lpiler $(LIBS) $(LIBDIR) @LDFLAGS@ @@ -82,6 +85,7 @@ install-piler: $(INSTALL) -m 6755 -o $(RUNNING_USER) -g $(RUNNING_GROUP) pilerimport $(DESTDIR)$(bindir) $(INSTALL) -m 6755 -o $(RUNNING_USER) -g $(RUNNING_GROUP) pilerexport $(DESTDIR)$(bindir) $(INSTALL) -m 6755 -o $(RUNNING_USER) -g $(RUNNING_GROUP) pilerpurge $(DESTDIR)$(bindir) + $(INSTALL) -m 6755 -o $(RUNNING_USER) -g $(RUNNING_GROUP) reindex $(DESTDIR)$(bindir) clean: rm -f *.o *.a libpiler.so* piler pilerconf pilerget pilerimport pilerexport pilerpurge pilertest diff --git a/src/import.c b/src/import.c index db64d372..2ef56d42 100644 --- a/src/import.c +++ b/src/import.c @@ -66,7 +66,7 @@ int import_message(char *filename, struct session_data *sdata, struct __data *da sdata->sent = 0; - state = parse_message(sdata, cfg); + state = parse_message(sdata, 1, cfg); post_parse(sdata, &state, cfg); if(sdata->sent > sdata->now) sdata->sent = sdata->now; diff --git a/src/parser.c b/src/parser.c index b5017d87..d7b5add1 100644 --- a/src/parser.c +++ b/src/parser.c @@ -16,7 +16,7 @@ #include -struct _state parse_message(struct session_data *sdata, struct __config *cfg){ +struct _state parse_message(struct session_data *sdata, int take_into_pieces, struct __config *cfg){ FILE *f; char buf[MAXBUFSIZE]; struct _state state; @@ -30,19 +30,23 @@ struct _state parse_message(struct session_data *sdata, struct __config *cfg){ } - state.mfd = open(sdata->tmpframe, O_CREAT|O_RDWR, S_IRUSR|S_IWUSR); - if(state.mfd == -1){ - syslog(LOG_PRIORITY, "%s: cannot open frame file: %s", sdata->ttmpfile, sdata->tmpframe); - fclose(f); - return state; + if(take_into_pieces == 1){ + state.mfd = open(sdata->tmpframe, O_CREAT|O_RDWR, S_IRUSR|S_IWUSR); + if(state.mfd == -1){ + syslog(LOG_PRIORITY, "%s: cannot open frame file: %s", sdata->ttmpfile, sdata->tmpframe); + fclose(f); + return state; + } } - while(fgets(buf, sizeof(buf)-1, f)){ - parse_line(buf, &state, sdata, cfg); + parse_line(buf, &state, sdata, take_into_pieces, cfg); + } + + if(take_into_pieces == 1){ + close(state.mfd); state.mfd = 0; } - close(state.mfd); state.mfd = 0; fclose(f); return state; @@ -94,7 +98,7 @@ void post_parse(struct session_data *sdata, struct _state *state, struct __confi } -int parse_line(char *buf, struct _state *state, struct session_data *sdata, struct __config *cfg){ +int parse_line(char *buf, struct _state *state, struct session_data *sdata, int take_into_pieces, struct __config *cfg){ char *p, *q, puf[SMALLBUFSIZE]; int x, n, len, b64_len, boundary_line=0; @@ -119,15 +123,17 @@ int parse_line(char *buf, struct _state *state, struct session_data *sdata, stru } - if(state->message_state == MSG_BODY && state->fd != -1 && is_item_on_string(state->boundaries, buf) == 0){ - //printf("dumping: %s", buf); - n = write(state->fd, buf, len); - state->attachments[state->n_attachments].size += len; - } - else { - state->saved_size += len; - //printf("%s", buf); - n = write(state->mfd, buf, len); + if(take_into_pieces == 1){ + if(state->message_state == MSG_BODY && state->fd != -1 && is_item_on_string(state->boundaries, buf) == 0){ + //printf("dumping: %s", buf); + n = write(state->fd, buf, len); + state->attachments[state->n_attachments].size += len; + } + else { + state->saved_size += len; + //printf("%s", buf); + n = write(state->mfd, buf, len); + } } @@ -146,26 +152,30 @@ int parse_line(char *buf, struct _state *state, struct session_data *sdata, stru snprintf(state->attachments[state->n_attachments].internalname, TINYBUFSIZE-1, "%s.a%d", sdata->ttmpfile, state->n_attachments); //printf("DUMP FILE: %s\n", state->attachments[state->n_attachments].internalname); - state->fd = open(state->attachments[state->n_attachments].internalname, O_CREAT|O_RDWR, S_IRUSR|S_IWUSR); - if(state->fd == -1){ - state->attachments[state->n_attachments].size = 0; - memset(state->attachments[state->n_attachments].type, 0, TINYBUFSIZE); - memset(state->attachments[state->n_attachments].filename, 0, TINYBUFSIZE); - memset(state->attachments[state->n_attachments].internalname, 0, TINYBUFSIZE); - memset(state->attachments[state->n_attachments].digest, 0, 2*DIGEST_LENGTH+1); + if(take_into_pieces == 1){ + state->fd = open(state->attachments[state->n_attachments].internalname, O_CREAT|O_RDWR, S_IRUSR|S_IWUSR); + if(state->fd == -1){ - syslog(LOG_PRIORITY, "%s: error opening %s", sdata->ttmpfile, state->attachments[state->n_attachments].internalname); + state->attachments[state->n_attachments].size = 0; + memset(state->attachments[state->n_attachments].type, 0, TINYBUFSIZE); + memset(state->attachments[state->n_attachments].filename, 0, TINYBUFSIZE); + memset(state->attachments[state->n_attachments].internalname, 0, TINYBUFSIZE); + memset(state->attachments[state->n_attachments].digest, 0, 2*DIGEST_LENGTH+1); - state->n_attachments--; - state->has_to_dump = 0; + syslog(LOG_PRIORITY, "%s: error opening %s", sdata->ttmpfile, state->attachments[state->n_attachments].internalname); + state->n_attachments--; + state->has_to_dump = 0; + + } + else { + snprintf(puf, sizeof(puf)-1, "ATTACHMENT_POINTER_%s.a%d_XXX_PILER", sdata->ttmpfile, state->n_attachments); + n = write(state->mfd, puf, strlen(puf)); + //printf("%s", puf); + } } - else { - snprintf(puf, sizeof(puf)-1, "ATTACHMENT_POINTER_%s.a%d_XXX_PILER", sdata->ttmpfile, state->n_attachments); - n = write(state->mfd, puf, strlen(puf)); - //printf("%s", puf); - } + } else { state->has_to_dump = 0; @@ -325,7 +335,7 @@ int parse_line(char *buf, struct _state *state, struct session_data *sdata, stru state->content_type_is_set = 0; if(state->has_to_dump == 1){ - if(state->fd != -1) close(state->fd); + if(take_into_pieces == 1 && state->fd != -1) close(state->fd); state->fd = -1; } diff --git a/src/parser.h b/src/parser.h index f2371254..e0af01b7 100644 --- a/src/parser.h +++ b/src/parser.h @@ -9,9 +9,9 @@ #include "config.h" #include "defs.h" -struct _state parse_message(struct session_data *sdata, struct __config *cfg); +struct _state parse_message(struct session_data *sdata, int take_into_pieces, struct __config *cfg); void post_parse(struct session_data *sdata, struct _state *state, struct __config *cfg); -int parse_line(char *buf, struct _state *state, struct session_data *sdata, struct __config *cfg); +int parse_line(char *buf, struct _state *state, struct session_data *sdata, int take_into_pieces, struct __config *cfg); void init_state(struct _state *state); unsigned long parse_date_header(char *s); diff --git a/src/piler.h b/src/piler.h index aa5989bc..21d78cb0 100644 --- a/src/piler.h +++ b/src/piler.h @@ -53,5 +53,7 @@ int prepare_a_mysql_statement(struct session_data *sdata, MYSQL_STMT **stmt, cha int import_message(char *filename, struct session_data *sdata, struct __data *data, struct __config *cfg); +int store_index_data(struct session_data *sdata, struct _state *state, uint64 id, struct __config *cfg); + #endif /* _PILER_H */ diff --git a/src/reindex.c b/src/reindex.c new file mode 100644 index 00000000..985fc99f --- /dev/null +++ b/src/reindex.c @@ -0,0 +1,189 @@ +/* + * reindex.c, SJ + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +extern char *optarg; +extern int optind; + +int progressbar = 0; + + +void usage(){ + printf("\nusage: reindex \n\n"); + printf(" [-c|--config ] \n"); + printf(" -f \n"); + printf(" -t \n"); + printf(" [-p]\n"); + + exit(0); +} + + +void clean_exit(char *msg, int rc){ + if(msg) printf("error: %s\n", msg); + + exit(rc); +} + + +uint64 retrieve_email_by_metadata_id(struct session_data *sdata, uint64 from_id, uint64 to_id, struct __config *cfg){ + MYSQL_RES *res; + MYSQL_ROW row; + FILE *f; + char filename[SMALLBUFSIZE]; + char s[SMALLBUFSIZE]; + int rc=0; + uint64 stored_id=0, reindexed=0; + struct _state state; + + + snprintf(s, sizeof(s)-1, "SELECT `id`, `piler_id`, `arrived`, `sent` FROM %s WHERE id BETWEEN %llu AND %llu", SQL_METADATA_TABLE, from_id, to_id); + + rc = mysql_real_query(&(sdata->mysql), s, strlen(s)); + + if(rc == 0){ + res = mysql_store_result(&(sdata->mysql)); + if(res){ + while((row = mysql_fetch_row(res))){ + + stored_id = strtoull(row[0], NULL, 10); + if(stored_id > 0){ + snprintf(sdata->ttmpfile, SMALLBUFSIZE-1, "%s", (char*)row[1]); + + snprintf(filename, sizeof(filename)-1, "%llu.eml", stored_id); + + f = fopen(filename, "w"); + if(f){ + rc = retrieve_email_from_archive(sdata, f, cfg); + fclose(f); + + if(rc){ + printf("cannot retrieve: %s\n", filename); + unlink(filename); + continue; + } + + snprintf(sdata->filename, SMALLBUFSIZE-1, "%s", filename); + + state = parse_message(sdata, 0, cfg); + post_parse(sdata, &state, cfg); + + sdata->now = strtoul(row[2], NULL, 10); + sdata->sent = strtoul(row[3], NULL, 10); + + rc = store_index_data(sdata, &state, stored_id, cfg); + + if(rc == OK) reindexed++; + else printf("failed to add to %s table: %s\n", SQL_SPHINX_TABLE, filename); + + unlink(filename); + + if(progressbar && reindexed % 10 == 0) printf("."); + + } + else printf("cannot open: %s\n", filename); + + } + } + mysql_free_result(res); + } + else rc = 1; + } + + if(progressbar) printf("\n"); + + return reindexed; +} + + +int main(int argc, char **argv){ + int c; + uint64 from_id=0, to_id=0, n=0; + char *configfile=CONFIG_FILE; + struct session_data sdata; + struct __config cfg; + + + while(1){ + c = getopt(argc, argv, "c:f:t:phv?"); + + if(c == -1) break; + + switch(c){ + + case 'c' : + configfile = optarg; + break; + + case 'f' : + from_id = strtoull(optarg, NULL, 10); + break; + + case 't' : + to_id = strtoull(optarg, NULL, 10); + break; + + case 'p' : + progressbar = 1; + break; + + + default : + usage(); + break; + } + + } + + + if(from_id <= 0 || to_id <= 0) usage(); + + + (void) openlog("reindex", LOG_PID, LOG_MAIL); + + + cfg = read_config(configfile); + + if(read_key(&cfg)){ + printf("%s\n", ERR_READING_KEY); + return 1; + } + + init_session_data(&sdata); + + + mysql_init(&(sdata.mysql)); + mysql_options(&(sdata.mysql), MYSQL_OPT_CONNECT_TIMEOUT, (const char*)&cfg.mysql_connect_timeout); + if(mysql_real_connect(&(sdata.mysql), cfg.mysqlhost, cfg.mysqluser, cfg.mysqlpwd, cfg.mysqldb, cfg.mysqlport, cfg.mysqlsocket, 0) == 0){ + clean_exit("cannot connect to mysql server", 1); + } + + mysql_real_query(&(sdata.mysql), "SET NAMES utf8", strlen("SET NAMES utf8")); + mysql_real_query(&(sdata.mysql), "SET CHARACTER SET utf8", strlen("SET CHARACTER SET utf8")); + + + n = retrieve_email_by_metadata_id(&sdata, from_id, to_id, &cfg); + + printf("put %llu messages to %s table for reindexing\n", n, SQL_SPHINX_TABLE); + + mysql_close(&(sdata.mysql)); + + return 0; +} + + diff --git a/src/session.c b/src/session.c index 10502606..af90cd5d 100644 --- a/src/session.c +++ b/src/session.c @@ -137,7 +137,7 @@ int handle_smtp_session(int new_sd, struct __data *data, struct __config *cfg){ gettimeofday(&tv1, &tz); - sstate = parse_message(&sdata, cfg); + sstate = parse_message(&sdata, 1, cfg); post_parse(&sdata, &sstate, cfg); gettimeofday(&tv2, &tz); diff --git a/src/test.c b/src/test.c index 494c898e..53167a6b 100644 --- a/src/test.c +++ b/src/test.c @@ -68,7 +68,7 @@ int main(int argc, char **argv){ snprintf(sdata.filename, SMALLBUFSIZE-1, "%s", argv[1]); snprintf(sdata.tmpframe, SMALLBUFSIZE-1, "%s.m", argv[1]); - state = parse_message(&sdata, &cfg); + state = parse_message(&sdata, 0, &cfg); post_parse(&sdata, &state, &cfg); printf("message-id: %s\n", state.message_id);