processing fix

Change-Id: Idde6a32aa3cca70850dd34b13cf4986dc17c5e3b
Signed-off-by: SJ <sj@acts.hu>
This commit is contained in:
SJ 2016-10-26 22:18:18 +02:00
parent 076b918008
commit edd4ff0349
3 changed files with 90 additions and 86 deletions

View File

@ -111,7 +111,8 @@
#define OK 0 #define OK 0
#define ERR 1 #define ERR 1
#define ERR_EXISTS 2 #define ERR_EXISTS 2
#define ERR_MYDOMAINS 3 #define ERR_DISCARDED 3
#define ERR_MYDOMAINS 4
#define ERR_FOLDER -1 #define ERR_FOLDER -1
#define AVIR_OK 0 #define AVIR_OK 0

View File

@ -354,7 +354,6 @@ struct __data {
#ifdef HAVE_MEMCACHED #ifdef HAVE_MEMCACHED
struct memcached_server memc; struct memcached_server memc;
#endif #endif
SSL_CTX *ctx; SSL_CTX *ctx;
SSL *ssl; SSL *ssl;
}; };
@ -370,20 +369,6 @@ struct counters {
}; };
struct session_ctx {
char *status;
int new_sd;
int db_conn;
int inj;
int bdat_rounds;
int bdat_last_round;
struct __config *cfg;
struct __data *data;
struct session_data *sdata;
struct parser_state *parser_state;
struct counters *counters;
};
struct smtp_session { struct smtp_session {
char ttmpfile[SMALLBUFSIZE]; char ttmpfile[SMALLBUFSIZE];
char mailfrom[SMALLBUFSIZE]; char mailfrom[SMALLBUFSIZE];

View File

@ -24,8 +24,6 @@
#include <dirent.h> #include <dirent.h>
#include <locale.h> #include <locale.h>
#include <errno.h> #include <errno.h>
#include <openssl/ssl.h>
#include <openssl/err.h>
#include <piler.h> #include <piler.h>
#define PROGNAME "piler" #define PROGNAME "piler"
@ -103,15 +101,94 @@ static void child_sighup_handler(int sig){
} }
int process_email(char *filename, struct session_data *sdata, struct __data *data, int size, struct __config *cfg){
int rc;
char *status=S_STATUS_UNDEF;
char *arule;
struct timezone tz;
struct timeval tv1, tv2;
struct parser_state parser_state;
struct counters counters;
gettimeofday(&tv1, &tz);
bzero(&counters, sizeof(counters));
init_session_data(sdata, cfg);
sdata->tot_len = size;
/*if(data->import->extra_recipient){
snprintf(sdata->rcptto[0], SMALLBUFSIZE-1, "%s", data->import->extra_recipient);
sdata->num_of_rcpt_to = 1;
}*/
parser_state = parse_message(sdata, 1, data, cfg);
post_parse(sdata, &parser_state, cfg);
arule = check_againt_ruleset(data->archiving_rules, &parser_state, sdata->tot_len, sdata->spam_message);
if(arule){
syslog(LOG_PRIORITY, "%s: discarding: archiving policy: *%s*", filename, arule);
rc = ERR_DISCARDED;
remove_stripped_attachments(&parser_state);
}
else {
make_digests(sdata, cfg);
if(sdata->hdr_len < 10){
syslog(LOG_PRIORITY, "%s: invalid message, hdr_len: %d", filename, sdata->hdr_len);
rc = ERR;
}
rc = process_message(sdata, &parser_state, data, cfg);
unlink(parser_state.message_id_hash);
}
unlink(sdata->tmpframe);
if(rc == OK){
status = S_STATUS_STORED;
counters.c_rcvd = 1;
counters.c_size += sdata->tot_len;
counters.c_stored_size = sdata->stored_len;
}
else if(rc == ERR_EXISTS){
status = S_STATUS_DUPLICATE;
counters.c_duplicate = 1;
syslog(LOG_PRIORITY, "%s: discarding: duplicate message, id: %llu, message-id: %s", filename, sdata->duplicate_id, parser_state.message_id);
}
else if(rc == ERR_DISCARDED){
status = S_STATUS_DISCARDED;
counters.c_ignore = 1;
}
else {
status = S_STATUS_ERROR;
}
if(rc != ERR) unlink(filename);
update_counters(sdata, data, &counters, cfg);
gettimeofday(&tv2, &tz);
syslog(LOG_PRIORITY, "%s: from=%s, size=%d/%d, attachments=%d, reference=%s, message-id=%s, retention=%d, folder=%d, delay=%.4f, status=%s",
filename, sdata->fromemail, sdata->tot_len,
sdata->stored_len, parser_state.n_attachments,
parser_state.reference, parser_state.message_id,
parser_state.retention, data->folder, tvdiff(tv2,tv1)/1000000.0, status);
return rc;
}
int process_dir(char *directory, struct session_data *sdata, struct __data *data, struct __config *cfg){ int process_dir(char *directory, struct session_data *sdata, struct __data *data, struct __config *cfg){
DIR *dir; DIR *dir;
struct dirent *de; struct dirent *de;
int rc=ERR, tot_msgs=0; int rc=ERR, tot_msgs=0;
char fname[SMALLBUFSIZE]; char fname[SMALLBUFSIZE];
char *status;
struct stat st; struct stat st;
struct timezone tz;
struct timeval tv1, tv2;
dir = opendir(directory); dir = opendir(directory);
if(!dir){ if(!dir){
@ -126,38 +203,15 @@ int process_dir(char *directory, struct session_data *sdata, struct __data *data
if(stat(fname, &st) == 0){ if(stat(fname, &st) == 0){
if(S_ISREG(st.st_mode)){ if(S_ISREG(st.st_mode)){
rc = process_email(fname, sdata, data, st.st_size, cfg);
status = NULL; if(rc == OK || rc == ERR_EXISTS){
gettimeofday(&tv1, &tz);
// ide kene az import_message fv roviden, es akkor mindent tudna loggolni!
rc = import_message(fname, sdata, data, cfg);
gettimeofday(&tv2, &tz);
if(rc == OK){
tot_msgs++; tot_msgs++;
status = S_STATUS_STORED; unlink(fname);
}
else if(rc == ERR_EXISTS){
tot_msgs++;
status = S_STATUS_DUPLICATE;
}
else {
status = S_STATUS_ERROR;
} }
//Oct 25 20:37:55 f5e88a047257 piler[3236]: 1/40000000580fc29234488f440fdc735c1869: size=172527/128280, delay=36067, status=stored //Oct 25 20:37:55 f5e88a047257 piler[3236]: 1/40000000580fc29234488f440fdc735c1869: size=172527/128280, delay=36067, status=stored
//syslog(LOG_PRIORITY, "%s: size=%d/%d, delay=%ld, status=%s", fname, sdata->tot_len, sdata->stored_len, tvdiff(tv2, tv1), status);
syslog(LOG_PRIORITY, "%s: size=%d/%d, delay=%ld, status=%s", fname, sdata->tot_len, sdata->stored_len, tvdiff(tv2, tv1), status);
/*syslog(LOG_PRIORITY, "%s: from=%s, size=%d/%d, attachments=%d, reference=%s, message-id=%s, retention=%d, folder=%d, %s, status=%s",
fname, sdata->fromemail, sdata->tot_len,
sdata->stored_len, parser_state->n_attachments,
sctx->parser_state->reference, sctx->parser_state->message_id,
sctx->parser_state->retention, sctx->data->folder, delay, sctx->status);*/
if(rc != ERR) unlink(fname);
} }
} }
else { else {
@ -172,17 +226,10 @@ int process_dir(char *directory, struct session_data *sdata, struct __data *data
static void child_main(struct child *ptr){ static void child_main(struct child *ptr){
struct import import;
struct session_data sdata; struct session_data sdata;
char dir[TINYBUFSIZE]; char dir[TINYBUFSIZE];
/* open directory, then process its files, then sleep 2 sec, and repeat */ /* open directory, then process its files, then sleep 1 sec, and repeat */
import.total_messages = import.total_size = import.processed_messages = import.batch_processing_limit = 0;
import.remove_after_import = 1;
import.extra_recipient = import.move_folder = NULL;
data.import = &import;
ptr->messages = 0; ptr->messages = 0;
@ -202,7 +249,7 @@ static void child_main(struct child *ptr){
ptr->messages += process_dir(dir, &sdata, &data, &cfg); ptr->messages += process_dir(dir, &sdata, &data, &cfg);
close_database(&sdata); close_database(&sdata);
sleep(2); sleep(1);
} }
else { else {
syslog(LOG_PRIORITY, "ERROR: cannot open database"); syslog(LOG_PRIORITY, "ERROR: cannot open database");
@ -317,10 +364,6 @@ void p_clean_exit(){
if(data.dedup != MAP_FAILED) munmap(data.dedup, MAXCHILDREN*DIGEST_LENGTH*2); if(data.dedup != MAP_FAILED) munmap(data.dedup, MAXCHILDREN*DIGEST_LENGTH*2);
if(data.ctx) SSL_CTX_free(data.ctx);
ERR_free_strings();
exit(1); exit(1);
} }
@ -331,25 +374,6 @@ void fatal(char *s){
} }
int init_ssl(){
SSL_library_init();
SSL_load_error_strings();
data.ctx = SSL_CTX_new(TLSv1_server_method());
if(data.ctx == NULL){ syslog(LOG_PRIORITY, "SSL_CTX_new() failed"); return ERR; }
if(SSL_CTX_set_cipher_list(data.ctx, cfg.cipher_list) == 0){ syslog(LOG_PRIORITY, "failed to set cipher list: '%s'", cfg.cipher_list); return ERR; }
if(SSL_CTX_use_PrivateKey_file(data.ctx, cfg.pemfile, SSL_FILETYPE_PEM) != 1){ syslog(LOG_PRIORITY, "cannot load private key from %s", cfg.pemfile); return ERR; }
if(SSL_CTX_use_certificate_file(data.ctx, cfg.pemfile, SSL_FILETYPE_PEM) != 1){ syslog(LOG_PRIORITY, "cannot load certificate from %s", cfg.pemfile); return ERR; }
return OK;
}
void initialise_configuration(){ void initialise_configuration(){
struct session_data sdata; struct session_data sdata;
@ -392,10 +416,6 @@ void initialise_configuration(){
initrules(data.retention_rules); initrules(data.retention_rules);
initrules(data.folder_rules); initrules(data.folder_rules);
if(cfg.tls_enable > 0 && data.ctx == NULL && init_ssl() == OK){
snprintf(data.starttls, sizeof(data.starttls)-1, SMTP_EXTENSION_STARTTLS);
}
if(open_database(&sdata, &cfg) == ERR){ if(open_database(&sdata, &cfg) == ERR){
syslog(LOG_PRIORITY, "cannot connect to mysql server"); syslog(LOG_PRIORITY, "cannot connect to mysql server");
return; return;
@ -460,8 +480,6 @@ int main(int argc, char **argv){
initrules(data.archiving_rules); initrules(data.archiving_rules);
initrules(data.retention_rules); initrules(data.retention_rules);
initrules(data.folder_rules); initrules(data.folder_rules);
data.ctx = NULL;
data.ssl = NULL;
data.dedup = MAP_FAILED; data.dedup = MAP_FAILED;
memset(data.starttls, 0, sizeof(data.starttls)); memset(data.starttls, 0, sizeof(data.starttls));