Write directly to manticore using rt index

Signed-off-by: Janos SUTO <sj@acts.hu>
This commit is contained in:
Janos SUTO 2022-12-29 06:19:33 +01:00
parent 05c7158ed0
commit 29dc4685da
10 changed files with 86 additions and 5 deletions

View File

@ -84,8 +84,12 @@ struct _parse_rule config_parse_rules[] =
{ "piler_header_field", "string", (void*) string_parser, offsetof(struct config, piler_header_field), "X-piler-id:", MAXVAL-1},
{ "process_rcpt_to_addresses", "integer", (void*) int_parser, offsetof(struct config, process_rcpt_to_addresses), "0", sizeof(int)},
{ "queuedir", "string", (void*) string_parser, offsetof(struct config, queuedir), QUEUE_DIR, MAXVAL-1},
{ "rtindex", "integer", (void*) int_parser, offsetof(struct config, rtindex), "0", sizeof(int)},
{ "security_header", "string", (void*) string_parser, offsetof(struct config, security_header), "", MAXVAL-1},
{ "server_id", "integer", (void*) int_parser, offsetof(struct config, server_id), "0", sizeof(int)},
{ "sphxdb", "string", (void*) string_parser, offsetof(struct config, sphxdb), "piler1", MAXVAL-1},
{ "sphxhost", "string", (void*) string_parser, offsetof(struct config, sphxhost), "127.0.0.1", MAXVAL-1},
{ "sphxport", "integer", (void*) int_parser, offsetof(struct config, sphxport), "9306", sizeof(int)},
{ "smtp_access_list", "integer", (void*) int_parser, offsetof(struct config, smtp_access_list), "0", sizeof(int)},
{ "smtp_timeout", "integer", (void*) int_parser, offsetof(struct config, smtp_timeout), "60", sizeof(int)},
{ "spam_header_line", "string", (void*) string_parser, offsetof(struct config, spam_header_line), "", MAXVAL-1},

View File

@ -80,6 +80,12 @@ struct config {
char mysqldb[MAXVAL];
int mysql_connect_timeout;
// manticore stuff
char sphxhost[MAXVAL];
int sphxport;
char sphxdb[MAXVAL];
int rtindex;
int update_counters_to_memcached;
int memcached_to_db_interval;

View File

@ -114,6 +114,7 @@
#define SQL_PREPARED_STMT_GET_META_ID_BY_MESSAGE_ID "SELECT id, piler_id FROM " SQL_METADATA_TABLE " WHERE message_id=?"
#define SQL_PREPARED_STMT_INSERT_INTO_RCPT_TABLE "INSERT INTO " SQL_RECIPIENT_TABLE " (`id`,`to`,`todomain`) VALUES(?,?,?)"
#define SQL_PREPARED_STMT_INSERT_INTO_SPHINX_TABLE "INSERT INTO " SQL_SPHINX_TABLE " (`id`, `from`, `to`, `fromdomain`, `todomain`, `subject`, `body`, `arrived`, `sent`, `size`, `direction`, `folder`, `attachments`, `attachment_types`) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
#define SQL_PREPARED_STMT_INSERT_INTO_RT_TABLE "INSERT INTO piler1 (id, sender, rcpt, senderdomain, rcptdomain, subject, body, arrived, sent, size, direction, folder, attachments, attachment_types) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
#define SQL_PREPARED_STMT_INSERT_INTO_META_TABLE "INSERT INTO " SQL_METADATA_TABLE " (`from`,`fromdomain`,`subject`,`spam`,`arrived`,`sent`,`retained`,`size`,`hlen`,`direction`,`attachments`,`piler_id`,`message_id`,`reference`,`digest`,`bodydigest`,`vcode`) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
#define SQL_PREPARED_STMT_INSERT_INTO_ATTACHMENT_TABLE "INSERT INTO " SQL_ATTACHMENT_TABLE " (`piler_id`,`attachment_id`,`sig`,`name`,`type`,`size`,`ptr`) VALUES(?,?,?,?,?,?,?)"
#define SQL_PREPARED_STMT_GET_ATTACHMENT_ID_BY_SIGNATURE "SELECT `id` FROM `" SQL_ATTACHMENT_TABLE "` WHERE `sig`=? AND `ptr`=0 AND `size`=?"

View File

@ -253,7 +253,7 @@ struct session_data {
int journal_envelope_length, journal_bottom_length;
unsigned int sql_errno;
#ifdef NEED_MYSQL
MYSQL mysql;
MYSQL mysql, sphx;
#endif
};

View File

@ -31,8 +31,11 @@ int store_index_data(struct session_data *sdata, struct parser_state *state, str
if(*subj == ' ') subj++;
if(cfg->rtindex){
if(prepare_sql_statement(sdata, &sql, SQL_PREPARED_STMT_INSERT_INTO_RT_TABLE) == ERR) return rc;
} else {
if(prepare_sql_statement(sdata, &sql, SQL_PREPARED_STMT_INSERT_INTO_SPHINX_TABLE) == ERR) return rc;
}
fix_email_address_for_sphinx(state->b_from);
fix_email_address_for_sphinx(state->b_sender);

View File

@ -33,11 +33,40 @@ int open_database(struct session_data *sdata, struct config *cfg){
}
void close_database(struct session_data *sdata){
int open_sphx(struct session_data *sdata, struct config *cfg){
int rc=1;
char buf[BUFLEN];
mysql_init(&(sdata->sphx));
mysql_options(&(sdata->sphx), MYSQL_OPT_CONNECT_TIMEOUT, (const char*)&cfg->mysql_connect_timeout);
mysql_options(&(sdata->sphx), MYSQL_OPT_RECONNECT, (const char*)&rc);
if(mysql_real_connect(&(sdata->sphx), cfg->sphxhost, "", "", cfg->sphxdb, cfg->sphxport, "", 0) == 0){
syslog(LOG_PRIORITY, "cant connect to %s:%d", cfg->sphxhost, cfg->sphxport);
return ERR;
}
snprintf(buf, sizeof(buf)-2, "SET NAMES %s", cfg->mysqlcharset);
mysql_real_query(&(sdata->sphx), buf, strlen(buf));
snprintf(buf, sizeof(buf)-2, "SET CHARACTER SET %s", cfg->mysqlcharset);
mysql_real_query(&(sdata->sphx), buf, strlen(buf));
return OK;
}
void close_sphx(struct session_data *sdata){
mysql_close(&(sdata->mysql));
}
void close_database(struct session_data *sdata){
mysql_close(&(sdata->sphx));
}
void p_bind_init(struct sql *sql){
int i;
@ -225,6 +254,23 @@ int prepare_sql_statement(struct session_data *sdata, struct sql *sql, char *s){
}
int prepare_sphx_statement(struct session_data *sdata, struct sql *sql, char *s){
sql->stmt = mysql_stmt_init(&(sdata->sphx));
if(!(sql->stmt)){
syslog(LOG_PRIORITY, "%s: error: mysql_stmt_init()", sdata->ttmpfile);
return ERR;
}
if(mysql_stmt_prepare(sql->stmt, s, strlen(s))){
syslog(LOG_PRIORITY, "%s: error: mysql_stmt_prepare() %s => sql: %s", sdata->ttmpfile, mysql_stmt_error(sql->stmt), s);
return ERR;
}
return OK;
}
void close_prepared_statement(struct sql *sql){
if(sql->stmt) mysql_stmt_close(sql->stmt);
}

View File

@ -287,10 +287,19 @@ void child_main(struct child *ptr){
sig_block(SIGHUP);
if(open_database(&sdata, &cfg) == OK){
int sphxopen = 0;
if(cfg.rtindex && open_sphx(&sdata, &cfg) == OK){
sphxopen = 1;
}
if((cfg.rtindex == 0 || sphxopen == 1) && open_database(&sdata, &cfg) == OK){
ptr->messages += process_dir(dir, &sdata, &data, &cfg);
close_database(&sdata);
if(cfg.rtindex){
close_sphx(&sdata);
}
sleep(1);
}
else {

View File

@ -341,6 +341,7 @@ int main(int argc, char **argv){
if(open_database(&sdata, &cfg) == ERR) return 0;
if(cfg.rtindex && open_sphx(&sdata, &cfg) == ERR) return 0;
setlocale(LC_CTYPE, cfg.locale);
@ -392,6 +393,8 @@ int main(int argc, char **argv){
close_database(&sdata);
if(cfg.rtindex) close_sphx(&sdata);
if(data.quiet == 0) printf("\n");
return 0;

View File

@ -245,6 +245,11 @@ int main(int argc, char **argv){
p_clean_exit("cannot connect to mysql server", 1);
}
if(cfg.rtindex && open_sphx(&sdata, &cfg) == ERR){
p_clean_exit("cannot connect to 127.0.0.1:9306", 1);
}
load_rules(&sdata, data.folder_rules, SQL_FOLDER_RULE_TABLE);
if(folder){
@ -272,6 +277,7 @@ int main(int argc, char **argv){
clearhash(data.mydomains);
close_database(&sdata);
if(cfg.rtindex) close_sphx(&sdata);
return 0;
}

View File

@ -7,8 +7,11 @@
int open_database(struct session_data *sdata, struct config *cfg);
int open_sphx(struct session_data *sdata, struct config *cfg);
void close_database(struct session_data *sdata);
void close_sphx(struct session_data *sdata);
int prepare_sql_statement(struct session_data *sdata, struct sql *sql, char *s);
int prepare_sphx_statement(struct session_data *sdata, struct sql *sql, char *s);
void p_query(struct session_data *sdata, char *s);
int p_exec_stmt(struct session_data *sdata, struct sql *sql);
int p_store_results(struct sql *sql);