mirror of
https://bitbucket.org/jsuto/piler.git
synced 2025-06-12 23:17:02 +02:00
Merge branch 'mmap_dedup'
This commit is contained in:
@ -83,6 +83,7 @@ struct _parse_rule config_parse_rules[] =
|
||||
{ "memcached_to_db_interval", "integer", (void*) int_parser, offsetof(struct __config, memcached_to_db_interval), "900", sizeof(int)},
|
||||
{ "memcached_ttl", "integer", (void*) int_parser, offsetof(struct __config, memcached_ttl), "86400", sizeof(int)},
|
||||
{ "min_word_len", "integer", (void*) int_parser, offsetof(struct __config, min_word_len), "1", sizeof(int)},
|
||||
{ "mmap_dedup_test", "integer", (void*) int_parser, offsetof(struct __config, mmap_dedup_test), "0", sizeof(int)},
|
||||
{ "mysqlhost", "string", (void*) string_parser, offsetof(struct __config, mysqlhost), "", MAXVAL-1},
|
||||
{ "mysqlport", "integer", (void*) int_parser, offsetof(struct __config, mysqlport), "", sizeof(int)},
|
||||
{ "mysqlsocket", "string", (void*) string_parser, offsetof(struct __config, mysqlsocket), "/tmp/mysql.sock", MAXVAL-1},
|
||||
|
@ -89,6 +89,8 @@ struct __config {
|
||||
|
||||
int syslog_recipients;
|
||||
|
||||
int mmap_dedup_test;
|
||||
|
||||
int debug;
|
||||
};
|
||||
|
||||
|
@ -69,6 +69,7 @@ typedef void signal_func (int);
|
||||
|
||||
struct child {
|
||||
pid_t pid;
|
||||
int serial;
|
||||
int messages;
|
||||
int status;
|
||||
};
|
||||
@ -288,6 +289,8 @@ struct __data {
|
||||
struct node *mydomains[MAXHASH];
|
||||
struct node *imapfolders[MAXHASH];
|
||||
struct import *import;
|
||||
char *dedup;
|
||||
int child_serial;
|
||||
|
||||
#ifdef NEED_MYSQL
|
||||
MYSQL_STMT *stmt_generic;
|
||||
|
@ -18,6 +18,7 @@
|
||||
#define ERR_OPEN_TMP_FILE "ERR: opening a tempfile"
|
||||
#define ERR_TIMED_OUT "ERR: timed out"
|
||||
#define ERR_FORK_FAILED "ERR: cannot fork()"
|
||||
#define ERR_OPEN_DEDUP_FILE "ERR: cannot open dedup file"
|
||||
|
||||
#define ERR_MYSQL_CONNECT "Cannot connect to mysql server"
|
||||
#define ERR_PSQL_CONNECT "Cannot connect to PSql server"
|
||||
|
@ -263,6 +263,24 @@ int process_message(struct session_data *sdata, struct _state *state, struct __d
|
||||
|
||||
if(cfg->verbosity >= _LOG_DEBUG) syslog(LOG_PRIORITY, "%s: touch %s OK (%s)", sdata->ttmpfile, state->message_id_hash, state->message_id);
|
||||
|
||||
|
||||
|
||||
if(cfg->mmap_dedup_test == 1 && data->dedup != MAP_FAILED && data->child_serial >= 0 && data->child_serial < MAXCHILDREN){
|
||||
|
||||
if(strstr(data->dedup, state->message_id_hash)){
|
||||
if(cfg->verbosity >= _LOG_DEBUG) syslog(LOG_INFO, "%s: dedup string: %s", sdata->ttmpfile, data->dedup);
|
||||
if(cfg->verbosity >= _LOG_DEBUG) syslog(LOG_INFO, "%s: message-id-hash=%s, serial=%d", sdata->ttmpfile, state->message_id_hash, data->child_serial);
|
||||
|
||||
remove_stripped_attachments(state);
|
||||
return ERR_EXISTS;
|
||||
}
|
||||
|
||||
memcpy(data->dedup + data->child_serial*DIGEST_LENGTH*2, state->message_id_hash, DIGEST_LENGTH*2);
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
/* store base64 encoded file attachments */
|
||||
|
||||
if(state->n_attachments > 0){
|
||||
|
21
src/piler.c
21
src/piler.c
@ -8,6 +8,7 @@
|
||||
#include <strings.h>
|
||||
#include <sys/types.h>
|
||||
#include <sys/socket.h>
|
||||
#include <sys/mman.h>
|
||||
#include <netdb.h>
|
||||
#include <netinet/in.h>
|
||||
#include <arpa/inet.h>
|
||||
@ -79,6 +80,7 @@ static void takesig(int sig){
|
||||
if(quit == 0){
|
||||
i = search_slot_by_pid(pid);
|
||||
if(i >= 0){
|
||||
children[i].serial = i;
|
||||
children[i].status = READY;
|
||||
children[i].pid = child_make(&children[i]);
|
||||
}
|
||||
@ -108,7 +110,7 @@ static void child_main(struct child *ptr){
|
||||
|
||||
ptr->messages = 0;
|
||||
|
||||
if(cfg.verbosity >= _LOG_DEBUG) syslog(LOG_PRIORITY, "child (pid: %d) started main()", getpid());
|
||||
if(cfg.verbosity >= _LOG_DEBUG) syslog(LOG_PRIORITY, "child (pid: %d, serial: %d) started main()", getpid(), ptr->serial);
|
||||
|
||||
while(1){
|
||||
if(received_sighup == 1){
|
||||
@ -129,6 +131,7 @@ static void child_main(struct child *ptr){
|
||||
|
||||
syslog(LOG_PRIORITY, "connection from %s", s);
|
||||
|
||||
data.child_serial = ptr->serial;
|
||||
|
||||
sig_block(SIGHUP);
|
||||
ptr->messages += handle_smtp_session(new_sd, &data, &cfg);
|
||||
@ -137,7 +140,7 @@ static void child_main(struct child *ptr){
|
||||
close(new_sd);
|
||||
|
||||
if(cfg.max_requests_per_child > 0 && ptr->messages >= cfg.max_requests_per_child){
|
||||
if(cfg.verbosity >= _LOG_DEBUG) syslog(LOG_PRIORITY, "child (pid: %d) served enough: %d", getpid(), ptr->messages);
|
||||
if(cfg.verbosity >= _LOG_DEBUG) syslog(LOG_PRIORITY, "child (pid: %d, serial: %d) served enough: %d", getpid(), ptr->messages, ptr->serial);
|
||||
break;
|
||||
}
|
||||
|
||||
@ -184,10 +187,12 @@ int child_pool_create(){
|
||||
children[i].pid = 0;
|
||||
children[i].messages = 0;
|
||||
children[i].status = UNDEF;
|
||||
children[i].serial = -1;
|
||||
}
|
||||
|
||||
for(i=0; i<cfg.number_of_worker_processes; i++){
|
||||
children[i].status = READY;
|
||||
children[i].serial = i;
|
||||
children[i].pid = child_make(&children[i]);
|
||||
|
||||
if(children[i].pid == -1){
|
||||
@ -237,6 +242,8 @@ void p_clean_exit(){
|
||||
|
||||
unlink(cfg.pidfile);
|
||||
|
||||
if(data.dedup != MAP_FAILED) munmap(data.dedup, MAXCHILDREN*DIGEST_LENGTH*2);
|
||||
|
||||
#ifdef HAVE_STARTTLS
|
||||
if(data.ctx){
|
||||
SSL_CTX_free(data.ctx);
|
||||
@ -345,7 +352,7 @@ void initialise_configuration(){
|
||||
|
||||
|
||||
int main(int argc, char **argv){
|
||||
int i, rc, yes=1, daemonise=0;
|
||||
int i, rc, yes=1, daemonise=0, dedupfd;
|
||||
char port_string[8];
|
||||
struct addrinfo hints, *res;
|
||||
|
||||
@ -384,6 +391,7 @@ int main(int argc, char **argv){
|
||||
initrules(data.retention_rules);
|
||||
data.ctx = NULL;
|
||||
data.ssl = NULL;
|
||||
data.dedup = MAP_FAILED;
|
||||
memset(data.starttls, 0, sizeof(data.starttls));
|
||||
|
||||
|
||||
@ -425,6 +433,13 @@ int main(int argc, char **argv){
|
||||
|
||||
if(drop_privileges(pwd)) fatal(ERR_SETUID);
|
||||
|
||||
dedupfd = open(MESSAGE_ID_DEDUP_FILE, O_RDWR);
|
||||
if(dedupfd == -1) fatal(ERR_OPEN_DEDUP_FILE);
|
||||
|
||||
data.dedup = mmap(NULL, MAXCHILDREN*DIGEST_LENGTH*2, PROT_READ|PROT_WRITE, MAP_SHARED, dedupfd, 0);
|
||||
close(dedupfd);
|
||||
|
||||
if(data.dedup == MAP_FAILED) syslog(LOG_INFO, "cannot mmap() %s, errno=%d", MESSAGE_ID_DEDUP_FILE, errno);
|
||||
|
||||
syslog(LOG_PRIORITY, "%s %s, build %d starting", PROGNAME, VERSION, get_build());
|
||||
|
||||
|
Reference in New Issue
Block a user