From ab344e0385f76fb618e9525ab8c1b739bf64fe6c Mon Sep 17 00:00:00 2001 From: SJ Date: Sat, 9 May 2015 14:31:20 +0200 Subject: [PATCH] added a mmap based ipc feature to prevent duplicates --- configure | 25 +++++++++++++++++++++++++ configure.in | 21 +++++++++++++++++++++ piler-config.h.in | 3 +++ src/cfg.c | 1 + src/cfg.h | 2 ++ src/defs.h | 3 +++ src/errmsg.h | 1 + src/message.c | 18 ++++++++++++++++++ src/piler.c | 21 ++++++++++++++++++--- util/Makefile.in | 1 + util/deduphelper | 1 + 11 files changed, 94 insertions(+), 3 deletions(-) create mode 100644 util/deduphelper diff --git a/configure b/configure index 53651794..07d0aa12 100755 --- a/configure +++ b/configure @@ -621,6 +621,7 @@ ac_includes_default="\ ac_subst_vars='LTLIBOBJS LIBOBJS +DATAROOTDIR LIBEXECDIR DATADIR CFGDIR @@ -4610,6 +4611,30 @@ _ACEOF +dataroot_dir=`echo $datarootdir | grep prefix` + +if test -n "$dataroot_dir"; then + if test "$prefix" = "NONE" + then + dataroot_dir="$ac_default_prefix/share" + else + dataroot_dir="$prefix/share" + fi +else + dataroot_dir="$datarootdir" +fi + + +DATAROOTDIR=$dataroot_dir + + +cat >>confdefs.h <<_ACEOF +#define DATAROOTDIR "$dataroot_dir" +_ACEOF + + + + cat >>confdefs.h <<_ACEOF #define VIRUS_TEMPLATE "$my_prefix/share/clapf/template.virus" _ACEOF diff --git a/configure.in b/configure.in index 2f0f1956..1a4da2f2 100644 --- a/configure.in +++ b/configure.in @@ -340,6 +340,27 @@ AC_SUBST(LIBEXECDIR) AC_DEFINE_UNQUOTED(LIBEXECDIR,"$libexec_dir",[where to look for the piler helpers]) +dnl configure dataroot directory + +dataroot_dir=`echo $datarootdir | grep prefix` + +if test -n "$dataroot_dir"; then + if test "$prefix" = "NONE" + then + dataroot_dir="$ac_default_prefix/share" + else + dataroot_dir="$prefix/share" + fi +else + dataroot_dir="$datarootdir" +fi + + +DATAROOTDIR=$dataroot_dir +AC_SUBST(DATAROOTDIR) +AC_DEFINE_UNQUOTED(DATAROOTDIR,"$dataroot_dir",[where to look for the share data files]) + + AC_DEFINE_UNQUOTED(VIRUS_TEMPLATE, 
"$my_prefix/share/clapf/template.virus", [where the virus template is]) AC_DEFINE_UNQUOTED(ZOMBIE_NET_REGEX, "$my_prefix/share/clapf/zombienets.regex", [where the virus template is]) diff --git a/piler-config.h.in b/piler-config.h.in index 8dbbe98f..686eb031 100644 --- a/piler-config.h.in +++ b/piler-config.h.in @@ -4,9 +4,12 @@ #define CONFDIR "/usr/local/etc" #define DATADIR "/usr/local/var" +#define DATAROOTDIR "/usr/local/share" #define KEYFILE CONFDIR "/piler.key" +#define MESSAGE_ID_DEDUP_FILE DATAROOTDIR "/piler/deduphelper" + #define HAVE_DAEMON 1 #undef TIMEOUT_BINARY diff --git a/src/cfg.c b/src/cfg.c index 1dc070c6..a5bd2f50 100644 --- a/src/cfg.c +++ b/src/cfg.c @@ -83,6 +83,7 @@ struct _parse_rule config_parse_rules[] = { "memcached_to_db_interval", "integer", (void*) int_parser, offsetof(struct __config, memcached_to_db_interval), "900", sizeof(int)}, { "memcached_ttl", "integer", (void*) int_parser, offsetof(struct __config, memcached_ttl), "86400", sizeof(int)}, { "min_word_len", "integer", (void*) int_parser, offsetof(struct __config, min_word_len), "1", sizeof(int)}, + { "mmap_dedup_test", "integer", (void*) int_parser, offsetof(struct __config, mmap_dedup_test), "0", sizeof(int)}, { "mysqlhost", "string", (void*) string_parser, offsetof(struct __config, mysqlhost), "", MAXVAL-1}, { "mysqlport", "integer", (void*) int_parser, offsetof(struct __config, mysqlport), "", sizeof(int)}, { "mysqlsocket", "string", (void*) string_parser, offsetof(struct __config, mysqlsocket), "/tmp/mysql.sock", MAXVAL-1}, diff --git a/src/cfg.h b/src/cfg.h index 21fb63d1..66f62b4e 100644 --- a/src/cfg.h +++ b/src/cfg.h @@ -89,6 +89,8 @@ struct __config { int syslog_recipients; + int mmap_dedup_test; + int debug; }; diff --git a/src/defs.h b/src/defs.h index 3ac92d1c..9e31bbf0 100644 --- a/src/defs.h +++ b/src/defs.h @@ -69,6 +69,7 @@ typedef void signal_func (int); struct child { pid_t pid; + int serial; int messages; int status; }; @@ -288,6 +289,8 @@ struct __data { 
struct node *mydomains[MAXHASH]; struct node *imapfolders[MAXHASH]; struct import *import; + char *dedup; + int child_serial; #ifdef NEED_MYSQL MYSQL_STMT *stmt_generic; diff --git a/src/errmsg.h b/src/errmsg.h index 7b9aacdc..933648ce 100644 --- a/src/errmsg.h +++ b/src/errmsg.h @@ -18,6 +18,7 @@ #define ERR_OPEN_TMP_FILE "ERR: opening a tempfile" #define ERR_TIMED_OUT "ERR: timed out" #define ERR_FORK_FAILED "ERR: cannot fork()" +#define ERR_OPEN_DEDUP_FILE "ERR: cannot open dedup file" #define ERR_MYSQL_CONNECT "Cannot connect to mysql server" #define ERR_PSQL_CONNECT "Cannot connect to PSql server" diff --git a/src/message.c b/src/message.c index 6aaf1246..97986b98 100644 --- a/src/message.c +++ b/src/message.c @@ -263,6 +263,24 @@ int process_message(struct session_data *sdata, struct _state *state, struct __d if(cfg->verbosity >= _LOG_DEBUG) syslog(LOG_PRIORITY, "%s: touch %s OK (%s)", sdata->ttmpfile, state->message_id_hash, state->message_id); + + + if(cfg->mmap_dedup_test == 1 && data->dedup != MAP_FAILED && data->child_serial >= 0 && data->child_serial < MAXCHILDREN){ + + if(strstr(data->dedup, state->message_id_hash)){ + if(cfg->verbosity >= _LOG_DEBUG) syslog(LOG_INFO, "%s: dedup string: %s", sdata->ttmpfile, data->dedup); + if(cfg->verbosity >= _LOG_DEBUG) syslog(LOG_INFO, "%s: message-id-hash=%s, serial=%d", sdata->ttmpfile, state->message_id_hash, data->child_serial); + + remove_stripped_attachments(state); + return ERR_EXISTS; + } + + memcpy(data->dedup + data->child_serial*DIGEST_LENGTH*2, state->message_id_hash, DIGEST_LENGTH*2); + } + + + + /* store base64 encoded file attachments */ if(state->n_attachments > 0){ diff --git a/src/piler.c b/src/piler.c index 4586b251..ec1d0965 100644 --- a/src/piler.c +++ b/src/piler.c @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include @@ -79,6 +80,7 @@ static void takesig(int sig){ if(quit == 0){ i = search_slot_by_pid(pid); if(i >= 0){ + children[i].serial = i; children[i].status = 
READY; children[i].pid = child_make(&children[i]); } @@ -108,7 +110,7 @@ static void child_main(struct child *ptr){ ptr->messages = 0; - if(cfg.verbosity >= _LOG_DEBUG) syslog(LOG_PRIORITY, "child (pid: %d) started main()", getpid()); + if(cfg.verbosity >= _LOG_DEBUG) syslog(LOG_PRIORITY, "child (pid: %d, serial: %d) started main()", getpid(), ptr->serial); while(1){ if(received_sighup == 1){ @@ -129,6 +131,7 @@ static void child_main(struct child *ptr){ syslog(LOG_PRIORITY, "connection from %s", s); + data.child_serial = ptr->serial; sig_block(SIGHUP); ptr->messages += handle_smtp_session(new_sd, &data, &cfg); @@ -137,7 +140,7 @@ static void child_main(struct child *ptr){ close(new_sd); if(cfg.max_requests_per_child > 0 && ptr->messages >= cfg.max_requests_per_child){ - if(cfg.verbosity >= _LOG_DEBUG) syslog(LOG_PRIORITY, "child (pid: %d) served enough: %d", getpid(), ptr->messages); + if(cfg.verbosity >= _LOG_DEBUG) syslog(LOG_PRIORITY, "child (pid: %d, serial: %d) served enough: %d", getpid(), ptr->serial, ptr->messages); /* args in format order: pid, serial, messages */ break; } @@ -184,10 +187,12 @@ int child_pool_create(){ children[i].pid = 0; children[i].messages = 0; children[i].status = UNDEF; + children[i].serial = -1; } for(i=0; i