/////////////////////////////////////////////////////////////////////////////////// // NodeJS Statsd MySQL Backend 1.0 // ------------------------------------------------------------------------------ // // Authors: Nicolas FRADIN, Damien PACAUD // Date: 31/10/2012 // /////////////////////////////////////////////////////////////////////////////////// var _mysql = require('mysql'), util = require('util'), fs = require('fs'); var STATSD_PACKETS_RECEIVED = "statsd.packets_received"; var STATSD_BAD_LINES = "statsd.bad_lines_seen"; /** * Backend Constructor * * Example config : * mysql: { host: "localhost", port: 3306, user: "root", password: "root", database: "statsd_db", tables: ["statsd_users", "statsd_statistics"] } * * @param startupTime * @param config * @param emmiter */ function StatdMySQLBackend(startupTime, config, emitter) { var self = this; self.config = config.mysql || {}; self.engines = {}; // Verifying that the config file contains enough information for this backend to work if(!this.config.host || !this.config.database || !this.config.user) { console.log("You need to specify at least host, port, database and user for this mysql backend"); process.exit(-1); } // Default port for mysql is 3306, if unset in conf file, we set it here to default if(!this.config.port) { this.config.port = 3306; } // Set backend path for(var backend_index in config.backends) { var currentBackend = config.backends[backend_index]; if(currentBackend.indexOf('mysql-backend.js') > -1) { self.config.backendPath = currentBackend.substring(0, currentBackend.lastIndexOf('/')+1); } } //Default tables if(!this.config.tables) { this.config.tables = ["statistics"]; } // Default engines if(!self.config.engines) { self.config.engines = {}; self.config.engines.countersEngine = "./countersEngine.js"; } // Check if tables exists self.checkDatabase(); // Load backend engines self.loadEngines(); // Attach events emitter.on('flush', function(time_stamp, metrics) { self.onFlush(time_stamp, metrics); } ); emitter.on('status', self.onStatus ); } /** * * */ StatdMySQLBackend.prototype.loadEngines = function() { var self = this; // Load counters engine self.engines.countersEngine = require(self.config.engines.countersEngine).init(); if(self.engines.countersEngine === undefined) { console.log("Unable to load counter engine ! Please check..."); process.exit(-1); } } /** * Open MySQL connection * * @return boolean Indicates if connection succeed */ StatdMySQLBackend.prototype.openMySqlConnection = function() { var self = this; var canExecuteQuerries = true; // Create MySQL connection self.sqlConnection = _mysql.createConnection(this.config); self.sqlConnection.connect(function(error){ canExecuteQuerries = false; }); return canExecuteQuerries; } /** * Close MySQL connection * */ StatdMySQLBackend.prototype.closeMySqlConnection = function() { var self = this; self.sqlConnection.end(function(error) { if(error){ console.log("There was an error while trying to close DB connection : " + util.inspect(error)); //Let's make sure that socket is destroyed self.sqlConnection.destroy(); } }); return; } /** * * */ StatdMySQLBackend.prototype.checkDatabase = function(callback) { var self = this; var isConnected = self.openMySqlConnection(); if(!isConnected) { console.log("Unable to connect to MySQL database ! Please check..."); process.exit(-1); } // Check if tables exists for(var table_index in self.config.tables) { var table_name = self.config.tables[table_index]; console.log("Check if table exists : '" + table_name + "'"); self.sqlConnection.query('show tables like "'+table_name+'";', function(err, results, fields) { if(err) { console.log("Unbale to execute query !"); process.exit(-1); } // If table doesn't exists if(results.length > 0) { console.log("Table '" + table_name + "' was found !"); if(table_index == self.config.tables.length-1) { console.log("-- MySQL Backend is ready !"); } } else { console.log("Table '" + table_name + "' was not found !"); // Try to read SQL file for this table var sqlFilePath = self.config.backendPath + 'tables/' + table_name + '.sql'; fs.readFile(sqlFilePath, 'utf8', function (err,data) { if (err) { console.log("Unable to read file: '" + sqlFilePath + "' ! Exit..."); process.exit(-1); } self.sqlConnection.query(data, function(err, results, fields) { if(err) { console.log("Unable to create table: '" + table_name +"' ! Exit..."); process.exit(-1); } console.log("Table '" + table_name +"' was created with success."); if(table_index == self.config.tables.length-1) { console.log("-- MySQL Backend is ready !"); } }); }); } }); } } /** * Method executed when statsd flush received datas * * @param time_stamp * @param metrics */ StatdMySQLBackend.prototype.onFlush = function(time_stamp, metrics) { var self = this; var counters = metrics['counters']; var timers = metrics['timers']; var gauges = metrics['gauges']; var sets = metrics['sets']; var pctThreshold = metrics['pctThreshold']; self.handleCounters(counters,time_stamp); } /** * Handle and process received counters * * @param _counters received counters * @param time_stamp flush time_stamp */ StatdMySQLBackend.prototype.handleCounters = function(_counters, time_stamp) { var self = this; var packets_received = parseInt(_counters[STATSD_PACKETS_RECEIVED]); var bad_lines_seen = parseInt(_counters[STATSD_BAD_LINES]); if(packets_received > 0) { // Get userCounters for this flush var userCounters = self.getUserCounters(_counters); var querries = []; console.log("Preaparing querries..."); // Call countersEngine's buildQuerries method querries = self.engines.countersEngine.buildQuerries(userCounters, time_stamp); var querriesCount = querries.length; console.log("Querries count : " + querriesCount ); // If at least one querry can be executed if(querriesCount > 0) { // Open MySQL connection var canExecuteQuerries = self.openMySqlConnection(); // If connection succeed if(canExecuteQuerries) { console.log("Executing " + querriesCount + " querries..."); // Execute querries self.executeQuerries(querries); // Close MySQL connection self.closeMySqlConnection(); } } } else { console.log("No user packets received."); } return; } StatdMySQLBackend.prototype.executeQuerries = function(sqlQuerries) { var self = this; for(var i = 0 ; i < sqlQuerries.length ; i++){ console.log("Query " + i + " : " + sqlQuerries[i]); self.sqlConnection.query(sqlQuerries[i], function(err, rows) { if(!err) { console.log(" -> Query [SUCCESS]"); } else { //TODO : add better error handling code console.log(" -> Query [ERROR]"); } }); } } /** * * */ StatdMySQLBackend.prototype.getUserCounters = function(_counters) { var userCounters = {}; for(var counterName in _counters) { var counterNameParts = counterName.split('.'); if(counterNameParts[0] !== "statsd") { userCounters[counterName] = _counters[counterName]; } } return userCounters; } /** * * */ StatdMySQLBackend.prototype.getStatsdCounters = function(_counters) { var statsdCounters = {}; for(var counterName in _counters) { var counterNameParts = counterName.split('.'); if(counterNameParts[0] === "statsd") { statsdCounters[counterName] = _counters[counterName]; } } return statsdCounters; } /** * * @param error * @param backend_name * @param stat_name * @param stat_value */ StatdMySQLBackend.prototype.onStatus = function(error, backend_name, stat_name, stat_value) { } exports.init = function(startupTime, config, events) { var instance = new StatdMySQLBackend(startupTime, config, events); return true; }; /* * Backend example : repeater.js * var util = require('util'), dgram = require('dgram'); function RepeaterBackend(startupTime, config, emitter){ var self = this; this.config = config.repeater || []; this.sock = dgram.createSocket('udp6'); // attach emitter.on('packet', function(packet, rinfo) { self.process(packet, rinfo); }); }; RepeaterBackend.prototype.process = function(packet, rinfo) { var self = this; hosts = self.config; for(var i=0; i