diff --git a/src/cfg.c b/src/cfg.c index 1b175096..e510bc4e 100644 --- a/src/cfg.c +++ b/src/cfg.c @@ -84,8 +84,12 @@ struct _parse_rule config_parse_rules[] = { "piler_header_field", "string", (void*) string_parser, offsetof(struct config, piler_header_field), "X-piler-id:", MAXVAL-1}, { "process_rcpt_to_addresses", "integer", (void*) int_parser, offsetof(struct config, process_rcpt_to_addresses), "0", sizeof(int)}, { "queuedir", "string", (void*) string_parser, offsetof(struct config, queuedir), QUEUE_DIR, MAXVAL-1}, + { "rtindex", "integer", (void*) int_parser, offsetof(struct config, rtindex), "0", sizeof(int)}, { "security_header", "string", (void*) string_parser, offsetof(struct config, security_header), "", MAXVAL-1}, { "server_id", "integer", (void*) int_parser, offsetof(struct config, server_id), "0", sizeof(int)}, + { "sphxdb", "string", (void*) string_parser, offsetof(struct config, sphxdb), "piler1", MAXVAL-1}, + { "sphxhost", "string", (void*) string_parser, offsetof(struct config, sphxhost), "127.0.0.1", MAXVAL-1}, + { "sphxport", "integer", (void*) int_parser, offsetof(struct config, sphxport), "9306", sizeof(int)}, { "smtp_access_list", "integer", (void*) int_parser, offsetof(struct config, smtp_access_list), "0", sizeof(int)}, { "smtp_timeout", "integer", (void*) int_parser, offsetof(struct config, smtp_timeout), "60", sizeof(int)}, { "spam_header_line", "string", (void*) string_parser, offsetof(struct config, spam_header_line), "", MAXVAL-1}, diff --git a/src/cfg.h b/src/cfg.h index 5b035825..5929e17c 100644 --- a/src/cfg.h +++ b/src/cfg.h @@ -80,6 +80,12 @@ struct config { char mysqldb[MAXVAL]; int mysql_connect_timeout; + // manticore stuff + char sphxhost[MAXVAL]; + int sphxport; + char sphxdb[MAXVAL]; + int rtindex; + int update_counters_to_memcached; int memcached_to_db_interval; diff --git a/src/config.h b/src/config.h index 7060faaf..b365d18c 100644 --- a/src/config.h +++ b/src/config.h @@ -114,6 +114,7 @@ #define SQL_PREPARED_STMT_GET_META_ID_BY_MESSAGE_ID "SELECT id, piler_id FROM " SQL_METADATA_TABLE " WHERE message_id=?" #define SQL_PREPARED_STMT_INSERT_INTO_RCPT_TABLE "INSERT INTO " SQL_RECIPIENT_TABLE " (`id`,`to`,`todomain`) VALUES(?,?,?)" #define SQL_PREPARED_STMT_INSERT_INTO_SPHINX_TABLE "INSERT INTO " SQL_SPHINX_TABLE " (`id`, `from`, `to`, `fromdomain`, `todomain`, `subject`, `body`, `arrived`, `sent`, `size`, `direction`, `folder`, `attachments`, `attachment_types`) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?)" +#define SQL_PREPARED_STMT_INSERT_INTO_RT_TABLE "INSERT INTO piler1 (id, sender, rcpt, senderdomain, rcptdomain, subject, body, arrived, sent, size, direction, folder, attachments, attachment_types) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?)" #define SQL_PREPARED_STMT_INSERT_INTO_META_TABLE "INSERT INTO " SQL_METADATA_TABLE " (`from`,`fromdomain`,`subject`,`spam`,`arrived`,`sent`,`retained`,`size`,`hlen`,`direction`,`attachments`,`piler_id`,`message_id`,`reference`,`digest`,`bodydigest`,`vcode`) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)" #define SQL_PREPARED_STMT_INSERT_INTO_ATTACHMENT_TABLE "INSERT INTO " SQL_ATTACHMENT_TABLE " (`piler_id`,`attachment_id`,`sig`,`name`,`type`,`size`,`ptr`) VALUES(?,?,?,?,?,?,?)" #define SQL_PREPARED_STMT_GET_ATTACHMENT_ID_BY_SIGNATURE "SELECT `id` FROM `" SQL_ATTACHMENT_TABLE "` WHERE `sig`=? AND `ptr`=0 AND `size`=?" diff --git a/src/defs.h b/src/defs.h index c8b56667..9ad64c81 100644 --- a/src/defs.h +++ b/src/defs.h @@ -253,7 +253,7 @@ struct session_data { int journal_envelope_length, journal_bottom_length; unsigned int sql_errno; #ifdef NEED_MYSQL - MYSQL mysql; + MYSQL mysql, sphx; #endif }; diff --git a/src/message.c b/src/message.c index 0b98ff6e..3dcd95d0 100644 --- a/src/message.c +++ b/src/message.c @@ -31,8 +31,11 @@ int store_index_data(struct session_data *sdata, struct parser_state *state, str if(*subj == ' ') subj++; - if(prepare_sql_statement(sdata, &sql, SQL_PREPARED_STMT_INSERT_INTO_SPHINX_TABLE) == ERR) return rc; - + if(cfg->rtindex){ + if(prepare_sql_statement(sdata, &sql, SQL_PREPARED_STMT_INSERT_INTO_RT_TABLE) == ERR) return rc; + } else { + if(prepare_sql_statement(sdata, &sql, SQL_PREPARED_STMT_INSERT_INTO_SPHINX_TABLE) == ERR) return rc; + } fix_email_address_for_sphinx(state->b_from); fix_email_address_for_sphinx(state->b_sender); diff --git a/src/mysql.c b/src/mysql.c index 83c93fd9..6acbd6e3 100644 --- a/src/mysql.c +++ b/src/mysql.c @@ -33,11 +33,40 @@ int open_database(struct session_data *sdata, struct config *cfg){ } -void close_database(struct session_data *sdata){ +int open_sphx(struct session_data *sdata, struct config *cfg){ + int rc=1; + char buf[BUFLEN]; + + mysql_init(&(sdata->sphx)); + + mysql_options(&(sdata->sphx), MYSQL_OPT_CONNECT_TIMEOUT, (const char*)&cfg->mysql_connect_timeout); + mysql_options(&(sdata->sphx), MYSQL_OPT_RECONNECT, (const char*)&rc); + + if(mysql_real_connect(&(sdata->sphx), cfg->sphxhost, "", "", cfg->sphxdb, cfg->sphxport, "", 0) == 0){ + syslog(LOG_PRIORITY, "cant connect to %s:%d", cfg->sphxhost, cfg->sphxport); + return ERR; + } + + snprintf(buf, sizeof(buf)-2, "SET NAMES %s", cfg->mysqlcharset); + mysql_real_query(&(sdata->sphx), buf, strlen(buf)); + + snprintf(buf, sizeof(buf)-2, "SET CHARACTER SET %s", cfg->mysqlcharset); + mysql_real_query(&(sdata->sphx), buf, strlen(buf)); + + return OK; +} + + +void close_sphx(struct session_data *sdata){ mysql_close(&(sdata->mysql)); } +void close_database(struct session_data *sdata){ + mysql_close(&(sdata->sphx)); +} + + void p_bind_init(struct sql *sql){ int i; @@ -225,6 +254,23 @@ int prepare_sql_statement(struct session_data *sdata, struct sql *sql, char *s){ } +int prepare_sphx_statement(struct session_data *sdata, struct sql *sql, char *s){ + + sql->stmt = mysql_stmt_init(&(sdata->sphx)); + if(!(sql->stmt)){ + syslog(LOG_PRIORITY, "%s: error: mysql_stmt_init()", sdata->ttmpfile); + return ERR; + } + + if(mysql_stmt_prepare(sql->stmt, s, strlen(s))){ + syslog(LOG_PRIORITY, "%s: error: mysql_stmt_prepare() %s => sql: %s", sdata->ttmpfile, mysql_stmt_error(sql->stmt), s); + return ERR; + } + + return OK; +} + + void close_prepared_statement(struct sql *sql){ if(sql->stmt) mysql_stmt_close(sql->stmt); } diff --git a/src/piler.c b/src/piler.c index cfab8825..2824a153 100644 --- a/src/piler.c +++ b/src/piler.c @@ -287,10 +287,19 @@ void child_main(struct child *ptr){ sig_block(SIGHUP); - if(open_database(&sdata, &cfg) == OK){ + int sphxopen = 0; + if(cfg.rtindex && open_sphx(&sdata, &cfg) == OK){ + sphxopen = 1; + } + + if((cfg.rtindex == 0 || sphxopen == 1) && open_database(&sdata, &cfg) == OK){ ptr->messages += process_dir(dir, &sdata, &data, &cfg); close_database(&sdata); + if(cfg.rtindex){ + close_sphx(&sdata); + } + sleep(1); } else { diff --git a/src/pilerimport.c b/src/pilerimport.c index 3af32505..40c2e451 100644 --- a/src/pilerimport.c +++ b/src/pilerimport.c @@ -341,6 +341,7 @@ int main(int argc, char **argv){ if(open_database(&sdata, &cfg) == ERR) return 0; + if(cfg.rtindex && open_sphx(&sdata, &cfg) == ERR) return 0; setlocale(LC_CTYPE, cfg.locale); @@ -392,6 +393,8 @@ int main(int argc, char **argv){ close_database(&sdata); + if(cfg.rtindex) close_sphx(&sdata); + if(data.quiet == 0) printf("\n"); return 0; diff --git a/src/reindex.c b/src/reindex.c index 009f205a..80cd96d9 100644 --- a/src/reindex.c +++ b/src/reindex.c @@ -245,6 +245,11 @@ int main(int argc, char **argv){ p_clean_exit("cannot connect to mysql server", 1); } + if(cfg.rtindex && open_sphx(&sdata, &cfg) == ERR){ + p_clean_exit("cannot connect to 127.0.0.1:9306", 1); + } + + load_rules(&sdata, data.folder_rules, SQL_FOLDER_RULE_TABLE); if(folder){ @@ -272,6 +277,7 @@ int main(int argc, char **argv){ clearhash(data.mydomains); close_database(&sdata); + if(cfg.rtindex) close_sphx(&sdata); return 0; } diff --git a/src/sql.h b/src/sql.h index 9118de31..57dd834e 100644 --- a/src/sql.h +++ b/src/sql.h @@ -7,8 +7,11 @@ int open_database(struct session_data *sdata, struct config *cfg); +int open_sphx(struct session_data *sdata, struct config *cfg); void close_database(struct session_data *sdata); +void close_sphx(struct session_data *sdata); int prepare_sql_statement(struct session_data *sdata, struct sql *sql, char *s); +int prepare_sphx_statement(struct session_data *sdata, struct sql *sql, char *s); void p_query(struct session_data *sdata, char *s); int p_exec_stmt(struct session_data *sdata, struct sql *sql); int p_store_results(struct sql *sql);