From b95919c574d8d6ba419917f8cb0a8bfb20029974 Mon Sep 17 00:00:00 2001 From: SJ Date: Wed, 28 Dec 2011 15:30:53 +0100 Subject: [PATCH] smarter pilerget suitable for batch export --- src/Makefile.in | 10 +-- src/attachment.c | 173 ++++++++++++++++++++++++++++++++++++++++++ src/defs.h | 22 +++++- src/piler.h | 1 + src/pilerget.c | 192 +++++++++++++++++++++++++++++++++++++++-------- src/tai.h | 4 + 6 files changed, 365 insertions(+), 37 deletions(-) diff --git a/src/Makefile.in b/src/Makefile.in index 68de64d4..7a84b099 100644 --- a/src/Makefile.in +++ b/src/Makefile.in @@ -38,24 +38,24 @@ install: install-piler piler: piler.c libpiler.a - $(CC) $(CFLAGS) $(INCDIR) $(DEFS) -o $@ piler.c -lpiler $(LIBS) $(LDAP_LIBS) $(LIBDIR) @LDFLAGS@ @libclamav_extra_libs@ + $(CC) $(CFLAGS) $(INCDIR) $(DEFS) -o $@ piler.c -lpiler $(LIBS) $(LIBDIR) @LDFLAGS@ @libclamav_extra_libs@ libpiler.a: $(OBJS) $(MYSQL_OBJS) ar cr libpiler.a $(OBJS) $(MYSQL_OBJS) ranlib libpiler.a - $(CC) -shared -Wl -o libpiler.so.$(LIBPILER_VERSION) $(OBJS) $(MYSQL_OBJS) $(LIBS) $(LDAP_LIBS) @LDFLAGS@ + $(CC) -shared -Wl -o libpiler.so.$(LIBPILER_VERSION) $(OBJS) $(MYSQL_OBJS) $(LIBS) @LDFLAGS@ ln -sf libpiler.so.$(LIBPILER_VERSION) libpiler.so ln -sf libpiler.so.$(LIBPILER_VERSION) libpiler.so.$(PILER_VERSION) -pilerget: pilerget.c cfg.o misc.o tai.o store.o - $(CC) $(CFLAGS) $(INCDIR) $(DEFS) -o $@ $^ -lcrypto -lz $(LIBDIR) +pilerget: pilerget.c cfg.o misc.o tai.o store.o attachment.o + $(CC) $(CFLAGS) $(INCDIR) $(DEFS) -o $@ $^ $(LIBS) $(LIBDIR) pilerconf: pilerconf.c cfg.o misc.o tai.o $(CC) $(CFLAGS) $(INCDIR) $(DEFS) -o $@ $^ $(LIBDIR) test: - $(CC) $(CFLAGS) $(INCDIR) $(DEFS) -o pilertest $(srcdir)/test.c -lpiler $(LIBS) $(LDAP_LIBS) $(LIBDIR) @LDFLAGS@ + $(CC) $(CFLAGS) $(INCDIR) $(DEFS) -o pilertest $(srcdir)/test.c -lpiler $(LIBS) $(LIBDIR) @LDFLAGS@ %.o: $(srcdir)/%.c $(CC) $(CFLAGS) -fPIC $(INCDIR) $(DEFS) -c $< -o $@ diff --git a/src/attachment.c b/src/attachment.c index 5dd3e6a6..be138205 100644 --- a/src/attachment.c +++ b/src/attachment.c @@ -138,3 +138,176 @@ int store_attachments(struct session_data *sdata, struct _state *state, struct _ return 0; } + +int query_attachment_pointers(struct session_data *sdata, uint64 ptr, char *piler_id, int *id, struct __config *cfg){ + int rc=0; + char s[SMALLBUFSIZE]; + MYSQL_STMT *stmt; + MYSQL_BIND bind[2]; + my_bool is_null[2]; + unsigned long len=0; + + + stmt = mysql_stmt_init(&(sdata->mysql)); + if(!stmt){ + goto ENDE; + } + + snprintf(s, SMALLBUFSIZE-1, "SELECT `piler_id`, `attachment_id` FROM %s WHERE id=?", SQL_ATTACHMENT_TABLE); + + + if(mysql_stmt_prepare(stmt, s, strlen(s))){ + goto ENDE; + } + + memset(bind, 0, sizeof(bind)); + + bind[0].buffer_type = MYSQL_TYPE_LONGLONG; + bind[0].buffer = (char *)&ptr; + bind[0].is_null = 0; + len = sizeof(uint64); bind[0].length = &len; + + + if(mysql_stmt_bind_param(stmt, bind)){ + goto ENDE; + } + + + if(mysql_stmt_execute(stmt)){ + goto ENDE; + } + + + memset(bind, 0, sizeof(bind)); + + bind[0].buffer_type = MYSQL_TYPE_STRING; + bind[0].buffer = piler_id; + bind[0].buffer_length = RND_STR_LEN; + bind[0].is_null = &is_null[0]; + bind[0].length = &len; + + bind[1].buffer_type = MYSQL_TYPE_LONG; + bind[1].buffer = (char *)id; + bind[1].is_null = 0; + bind[1].length = 0; + + + if(mysql_stmt_bind_result(stmt, bind)){ + goto ENDE; + } + + + if(mysql_stmt_store_result(stmt)){ + goto ENDE; + } + + if(!mysql_stmt_fetch(stmt)){ + if(is_null[0] == 0){ + //printf("piler id: *%s*, id: %d\n", piler_id, *id); + rc = 1; + } + } + + mysql_stmt_close(stmt); + +ENDE: + + return rc; +} + + +int query_attachments(struct session_data *sdata, struct ptr_array *ptr_arr, struct __config *cfg){ + int i, rc, id, attachments=0; + uint64 ptr; + char s[SMALLBUFSIZE]; + MYSQL_STMT *stmt; + MYSQL_BIND bind[2]; + my_bool is_null[2]; + unsigned long len=0; + + + for(i=0; imysql)); + if(!stmt){ + goto ENDE; + } + + snprintf(s, SMALLBUFSIZE-1, "SELECT `attachment_id`, `ptr` FROM %s WHERE piler_id=? ORDER BY attachment_id ASC", SQL_ATTACHMENT_TABLE); + + + if(mysql_stmt_prepare(stmt, s, strlen(s))){ + goto ENDE; + } + + memset(bind, 0, sizeof(bind)); + + bind[0].buffer_type = MYSQL_TYPE_STRING; + bind[0].buffer = sdata->ttmpfile; + bind[0].is_null = 0; + len = strlen(sdata->ttmpfile); bind[0].length = &len; + + if(mysql_stmt_bind_param(stmt, bind)){ + goto ENDE; + } + + + if(mysql_stmt_execute(stmt)){ + goto ENDE; + } + + + memset(bind, 0, sizeof(bind)); + + bind[0].buffer_type = MYSQL_TYPE_LONG; + bind[0].buffer = (char *)&id; + bind[0].is_null = &is_null[0]; + bind[0].length = 0; + + bind[1].buffer_type = MYSQL_TYPE_LONGLONG; + bind[1].buffer = (char *)&ptr; + bind[1].is_null = &is_null[1]; + bind[1].length = 0; + + + + if(mysql_stmt_bind_result(stmt, bind)){ + syslog(LOG_PRIORITY, "%s: %s.mysql_stmt_bind_result() error: %s", sdata->ttmpfile, SQL_METADATA_TABLE, mysql_stmt_error(stmt)); + goto ENDE; + } + + + if(mysql_stmt_store_result(stmt)){ + syslog(LOG_PRIORITY, "%s: %s.mysql_stmt_store_result() error: %s", sdata->ttmpfile, SQL_METADATA_TABLE, mysql_stmt_error(stmt)); + goto ENDE; + } + + while(!mysql_stmt_fetch(stmt)){ + + if(id > 0 && id < MAX_ATTACHMENTS){ + if(ptr > 0){ + ptr_arr[id].ptr = ptr; + rc = query_attachment_pointers(sdata, ptr, &(ptr_arr[id].piler_id[0]), &(ptr_arr[id].attachment_id), cfg); + if(!rc){ + attachments = -1; + goto ENDE; + } + } + else { + snprintf(ptr_arr[id].piler_id, sizeof(ptr_arr[id].piler_id)-1, "%s", sdata->ttmpfile); + ptr_arr[id].attachment_id = id; + } + + attachments++; + } + } + + mysql_stmt_close(stmt); + +ENDE: + + return attachments; +} + + diff --git a/src/defs.h b/src/defs.h index 1a461b70..f59231a4 100644 --- a/src/defs.h +++ b/src/defs.h @@ -24,6 +24,7 @@ #endif #include +#include "tai.h" #include "config.h" #define MSG_UNDEF -1 @@ -47,7 +48,19 @@ #define UNDEF 0 #define READY 1 #define BUSY 2 -#define PROCESSED 3 + + +#define MAXCHILDREN 64 + + +typedef void signal_func (int); + + +struct child { + pid_t pid; + int messages; + int status; +}; struct attachment { @@ -59,6 +72,13 @@ struct attachment { }; +struct ptr_array { + uint64 ptr; + char piler_id[RND_STR_LEN+2]; + int attachment_id; +}; + + struct list { char s[SMALLBUFSIZE]; struct list *r; diff --git a/src/piler.h b/src/piler.h index 2eab6c44..13acd4b3 100644 --- a/src/piler.h +++ b/src/piler.h @@ -36,6 +36,7 @@ void digest_file(char *filename, char *digest); int processMessage(struct session_data *sdata, struct _state *sstate, struct __config *cfg); int store_file(struct session_data *sdata, char *filename, int startpos, int len, struct __config *cfg); int store_attachments(struct session_data *sdata, struct _state *state, struct __config *cfg); +int query_attachments(struct session_data *sdata, struct ptr_array *ptr_arr, struct __config *cfg); struct __config read_config(char *configfile); diff --git a/src/pilerget.c b/src/pilerget.c index 114cceb5..8a16f591 100644 --- a/src/pilerget.c +++ b/src/pilerget.c @@ -5,13 +5,13 @@ #include #include #include -#include +#include #include -#include -#include #include #include #include +#include +#include #include #include #include @@ -20,13 +20,16 @@ #include -char *configfile = CONFIG_FILE; +#define WRITE_TO_STDOUT 0 +#define WRITE_TO_BUFFER 1 +#define REALLYBIGBUFSIZE 524288 int fd=-1; EVP_CIPHER_CTX ctx; unsigned char *s=NULL; + void clean_exit(){ if(s) free(s); @@ -38,36 +41,36 @@ void clean_exit(){ } -void zerr(int ret) -{ - fputs("zpipe: ", stderr); - switch (ret) { - case Z_ERRNO: +void zerr(int ret){ + fputs("zpipe: ", stderr); + switch (ret) { + case Z_ERRNO: if (ferror(stdin)) fputs("error reading stdin\n", stderr); if (ferror(stdout)) fputs("error writing stdout\n", stderr); break; - case Z_STREAM_ERROR: + case Z_STREAM_ERROR: fputs("invalid compression level\n", stderr); break; - case Z_DATA_ERROR: + case Z_DATA_ERROR: fputs("invalid or incomplete deflate data\n", stderr); break; - case Z_MEM_ERROR: + case Z_MEM_ERROR: fputs("out of memory\n", stderr); break; - case Z_VERSION_ERROR: + case Z_VERSION_ERROR: fputs("zlib version mismatch!\n", stderr); - } + } } -int inf(unsigned char *in, int len, FILE *dest){ - int ret; +int inf(unsigned char *in, int len, int mode, char **buffer, FILE *dest){ + int ret, pos=0; unsigned have; z_stream strm; - unsigned char out[BIGBUFSIZE]; + char *new_ptr; + unsigned char out[REALLYBIGBUFSIZE]; /* allocate inflate state */ strm.zalloc = Z_NULL; @@ -83,8 +86,15 @@ int inf(unsigned char *in, int len, FILE *dest){ strm.avail_in = len; strm.next_in = in; + if(mode == WRITE_TO_BUFFER){ + *buffer = malloc(REALLYBIGBUFSIZE); + if(!*buffer) return Z_MEM_ERROR; + memset(*buffer, 0, REALLYBIGBUFSIZE); + } + + do { - strm.avail_out = BIGBUFSIZE; + strm.avail_out = REALLYBIGBUFSIZE; strm.next_out = out; ret = inflate(&strm, Z_NO_FLUSH); @@ -98,11 +108,31 @@ int inf(unsigned char *in, int len, FILE *dest){ return ret; } - have = BIGBUFSIZE - strm.avail_out; - if(fwrite(out, 1, have, dest) != have){ - (void)inflateEnd(&strm); - return Z_ERRNO; + have = REALLYBIGBUFSIZE - strm.avail_out; + + /* + * write the uncompressed result either to stdout + * or to the buffer + */ + + if(mode == WRITE_TO_STDOUT){ + if(fwrite(out, 1, have, dest) != have){ + (void)inflateEnd(&strm); + return Z_ERRNO; + } } + else { + memcpy(*buffer+pos, out, have); + pos += have; + new_ptr = realloc(*buffer, pos+REALLYBIGBUFSIZE); + if(!new_ptr){ + (void)inflateEnd(&strm); + return Z_MEM_ERROR; + } + *buffer = new_ptr; + memset(*buffer+pos, 0, REALLYBIGBUFSIZE); + } + } while (strm.avail_out == 0); @@ -113,9 +143,9 @@ int inf(unsigned char *in, int len, FILE *dest){ } -int retrieve_file_from_archive(char *filename, struct __config *cfg){ - int rc, n, olen, tlen, len; - unsigned char inbuf[BIGBUFSIZE]; +int retrieve_file_from_archive(char *filename, int mode, char **buffer, FILE *dest, struct __config *cfg){ + int rc=0, n, olen, tlen, len; + unsigned char inbuf[REALLYBIGBUFSIZE]; struct stat st; @@ -173,7 +203,8 @@ int retrieve_file_from_archive(char *filename, struct __config *cfg){ tlen += olen; - rc = inf(s, tlen, stdout); if(rc != Z_OK) zerr(rc); + rc = inf(s, tlen, mode, buffer, dest); + if(rc != Z_OK) zerr(rc); if(s) free(s); @@ -182,11 +213,83 @@ int retrieve_file_from_archive(char *filename, struct __config *cfg){ } +int retrieve_email_from_archive(struct session_data *sdata, FILE *dest, struct __config *cfg){ + int i, rc, attachments; + char *buffer=NULL, *saved_buffer, *p, filename[SMALLBUFSIZE], pointer[SMALLBUFSIZE]; + struct ptr_array ptr_arr[MAX_ATTACHMENTS]; + + if(strlen(sdata->ttmpfile) != RND_STR_LEN){ + fprintf(stderr, "invalid piler-id: %s\n", sdata->ttmpfile); + return 1; + } + + attachments = query_attachments(sdata, &ptr_arr[0], cfg); + + if(attachments == -1){ + fprintf(stderr, "problem querying the attachment of %s\n", sdata->ttmpfile); + return 1; + } + + snprintf(filename, sizeof(filename)-1, "%s/%c%c/%c%c/%c%c/%s.m", cfg->queuedir, *(sdata->ttmpfile+RND_STR_LEN-6), *(sdata->ttmpfile+RND_STR_LEN-5), *(sdata->ttmpfile+RND_STR_LEN-4), *(sdata->ttmpfile+RND_STR_LEN-3), *(sdata->ttmpfile+RND_STR_LEN-2), *(sdata->ttmpfile+RND_STR_LEN-1), sdata->ttmpfile); + + if(attachments == 0){ + rc = retrieve_file_from_archive(filename, WRITE_TO_STDOUT, &buffer, dest, cfg); + } + else { + rc = retrieve_file_from_archive(filename, WRITE_TO_BUFFER, &buffer, dest, cfg); + + if(buffer){ + saved_buffer = buffer; + + for(i=1; i<=attachments; i++){ + snprintf(pointer, sizeof(pointer)-1, "ATTACHMENT_POINTER_%s.a%d_XXX_PILER", sdata->ttmpfile, i); + + p = strstr(buffer, pointer); + if(p){ + *p = '\0'; + //printf("%s", buffer); + fwrite(buffer, 1, p - buffer, dest); + buffer = p + strlen(pointer); + + if(strlen(ptr_arr[i].piler_id) == RND_STR_LEN){ + snprintf(filename, sizeof(filename)-1, "%s/%c%c/%c%c/%c%c/%s.a%d", cfg->queuedir, ptr_arr[i].piler_id[RND_STR_LEN-6], ptr_arr[i].piler_id[RND_STR_LEN-5], ptr_arr[i].piler_id[RND_STR_LEN-4], ptr_arr[i].piler_id[RND_STR_LEN-3], ptr_arr[i].piler_id[RND_STR_LEN-2], ptr_arr[i].piler_id[RND_STR_LEN-1], ptr_arr[i].piler_id, ptr_arr[i].attachment_id); + + rc = retrieve_file_from_archive(filename, WRITE_TO_STDOUT, NULL, dest, cfg); + } + } + + } + + if(buffer){ + //printf("%s", buffer); + fwrite(buffer, 1, strlen(buffer), dest); + } + + buffer = saved_buffer; + free(buffer); + } + } + + return 0; +} + + int main(int argc, char **argv){ + int rc; + char filename[SMALLBUFSIZE]; + FILE *f; + struct session_data sdata; struct __config cfg; - cfg = read_config(configfile); + if(argc < 2){ + fprintf(stderr, "usage: %s \n", argv[0]); + exit(1); + } + + + cfg = read_config(CONFIG_FILE); + if(read_key(&cfg)){ printf("%s\n", ERR_READING_KEY); @@ -194,13 +297,40 @@ int main(int argc, char **argv){ } - if(argc != 2){ - printf("usage: %s \n", argv[0]); - return 1; + mysql_init(&(sdata.mysql)); + mysql_options(&(sdata.mysql), MYSQL_OPT_CONNECT_TIMEOUT, (const char*)&cfg.mysql_connect_timeout); + if(mysql_real_connect(&(sdata.mysql), cfg.mysqlhost, cfg.mysqluser, cfg.mysqlpwd, cfg.mysqldb, cfg.mysqlport, cfg.mysqlsocket, 0) == 0){ + printf("cannot connect to mysql server\n"); + return 0; + } + + mysql_real_query(&(sdata.mysql), "SET NAMES utf8", strlen("SET NAMES utf8")); + mysql_real_query(&(sdata.mysql), "SET CHARACTER SET utf8", strlen("SET CHARACTER SET utf8")); + + + if(argv[1][0] == '-'){ + + while((rc = read(0, sdata.ttmpfile, RND_STR_LEN+1)) > 0){ + trimBuffer(sdata.ttmpfile); + snprintf(filename, sizeof(filename)-1, "%s.eml", sdata.ttmpfile); + f = fopen(filename, "w"); + if(f){ + rc = retrieve_email_from_archive(&sdata, f, &cfg); + fclose(f); + } + else printf("cannot open: %s\n", filename); + } + + } + else { + snprintf(sdata.ttmpfile, SMALLBUFSIZE-1, "%s", argv[1]); + rc = retrieve_email_from_archive(&sdata, stdout, &cfg); } - retrieve_file_from_archive(argv[1], &cfg); + mysql_close(&(sdata.mysql)); return 0; } + + diff --git a/src/tai.h b/src/tai.h index 632d7642..07f799d1 100644 --- a/src/tai.h +++ b/src/tai.h @@ -2,6 +2,9 @@ * tai.h, SJ */ +#ifndef _TAI_H + #define _TAI_H + #define TAI_PACK 8 #define TAIA_PACK 16 #define TIMESTAMP 25 @@ -22,4 +25,5 @@ struct taia { void taia_now(struct taia *t); void taia_pack(char *s, struct taia *t); +#endif /* _TAI_H */