123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669 |
- ///////////////////////////////////////////////////////////////////////////////////
- // NodeJS Statsd MySQL Backend 0.1.0-alpha1
- // ------------------------------------------------------------------------------
- //
- // Authors: Nicolas FRADIN, Damien PACAUD
- // Date: 31/10/2012
- //
- ///////////////////////////////////////////////////////////////////////////////////
- var _mysql = require('mysql'),
- util = require('util'),
- fs = require('fs'),
- sequence = require('sequence').Sequence.create();
// Names of the counters that statsd publishes about itself.
var STATSD_PACKETS_RECEIVED = "statsd.packets_received",
    STATSD_BAD_LINES = "statsd.bad_lines_seen";
/**
 * Backend constructor.
 *
 * Reads the `mysql` section of the statsd configuration, fills in defaults
 * (port, backend path, tables, engines) and then queues three startup steps
 * on the module-level `sequence`: database check, engine loading and event
 * registration.
 *
 * Example config :
 *
 *   mysql: {
 *     host: "localhost",
 *     port: 3306,
 *     user: "root",
 *     password: "root",
 *     database: "statsd_db",
 *     tables: {
 *       counters: ["counters_statistics"],
 *       gauges: ["gauges_statistics"],
 *       timers: ["timers_statistics"],
 *       sets: ["sets_statistics"]
 *     }
 *   }
 *
 * @param startupTime value forwarded by init(); not used by this backend
 * @param config      full statsd configuration (reads `mysql` and `backends`)
 * @param emitter     emitter on which the 'flush' and 'status' handlers are registered
 */
function StatdMySQLBackend(startupTime, config, emitter) {
  var self = this;

  self.config = config.mysql || {};
  self.engines = {
    counters: [],
    gauges: [],
    timers: [],
    sets: []
  };

  // Verifying that the config file contains enough information for this backend to work.
  // The port is not part of the check because it gets a default right below.
  if (!self.config.host || !self.config.database || !self.config.user || !self.config.password) {
    console.log("You need to specify at least host, database, user and password for this mysql backend");
    process.exit(-1);
  }

  // Default port for mysql is 3306, if unset in conf file, we set it here to default
  if (!self.config.port) {
    self.config.port = 3306;
  }

  // Set backend path: directory part of the configured 'mysql-backend.js' entry
  // (the last matching entry wins).
  var backends = config.backends || {};
  Object.keys(backends).forEach(function (backendKey) {
    var currentBackend = backends[backendKey];
    if (typeof currentBackend === 'string' && currentBackend.indexOf('mysql-backend.js') > -1) {
      self.config.backendPath = currentBackend.substring(0, currentBackend.lastIndexOf('/') + 1);
    }
  });
  // Without this fallback the engine and table paths would be built from the
  // string "undefined" (or resolved as bare module names).
  if (!self.config.backendPath) {
    self.config.backendPath = './';
  }

  // Default tables
  if (!self.config.tables) {
    self.config.tables = {
      counters: ["counters_statistics"],
      gauges: ["gauges_statistics"],
      timers: ["timers_statistics"],
      sets: ["sets_statistics"]
    };
  }

  // Default engines
  if (!self.config.engines) {
    self.config.engines = {
      counters: ["engines/countersEngine.js"],
      gauges: ["engines/gaugesEngine.js"],
      timers: ["engines/timersEngine.js"],
      sets: ["engines/setsEngine.js"]
    };
  }

  // Startup steps; each one hands over to the following one through next().
  sequence.then(function (next) {
    // Check if tables exist
    self.checkDatabase(function (err) {
      if (err) {
        console.log('Database check failed ! Exit...');
        process.exit(-1);
      } else {
        console.log('Database is valid.');
        next();
      }
    });
  }).then(function (next) {
    process.stdout.write('Loading MySQL backend engines...');
    // Load backend engines
    self.loadEngines(function (err) {
      if (err) {
        process.stdout.write("[FAILED]\n");
        console.log(err);
        // A missing engine would only crash later, at flush time, so stop here
        // exactly like a failed database check does.
        process.exit(-1);
        return;
      }
      process.stdout.write("[OK]\n");
      next();
    });
  }).then(function (next) {
    // Attach events; wrappers keep `this` bound to the backend instance.
    emitter.on('flush', function (time_stamp, metrics) {
      self.onFlush(time_stamp, metrics);
    });
    emitter.on('status', function (error, backend_name, stat_name, stat_value) {
      self.onStatus(error, backend_name, stat_name, stat_value);
    });
    console.log("Statsd MySQL backend is loaded.");
  });
}
- /**
- * Load MySQL Backend Query Engines
- *
- */
- StatdMySQLBackend.prototype.loadEngines = function(callback) {
- var self = this;
- // Iterate on each engine type defined in configuration
- for(var engineType in self.config.engines) {
- var typeEngines = self.config.engines[engineType];
- // Load engines for current type
- for(var engineIndex in typeEngines) {
- // Get current engine path
- var enginePath = typeEngines[engineIndex];
- // Load current engine
- var currentEngine = require(self.config.backendPath + enginePath).init();
- if(currentEngine === undefined) {
- callback("Unable to load engine '" + enginePath + "' ! Please check...");
- }
- // Add engine to MySQL Backend engines
- self.engines[engineType].push(currentEngine);
- }
- }
- callback();
- }
/**
 * Open MySQL connection
 *
 * Creates a new connection from `this.config`, stores it in
 * `this.sqlConnection` and starts connecting.
 *
 * NOTE(review): the returned flag only reflects a connection error reported
 * before this method returns. If the driver reports connection errors
 * asynchronously, the method still returns true and the error is only logged
 * - confirm against the mysql driver in use.
 *
 * @return boolean false when a connection error was already reported, true otherwise
 */
StatdMySQLBackend.prototype.openMySqlConnection = function() {
  var self = this;
  var canExecuteQuerries = true;

  // Create MySQL connection
  self.sqlConnection = _mysql.createConnection(self.config);
  self.sqlConnection.connect(function(error) {
    // Only an actual error marks the connection as unusable.
    if (error) {
      canExecuteQuerries = false;
      console.log("Unable to connect to MySQL database : " + util.inspect(error));
    }
  });

  return canExecuteQuerries;
};
/**
 * Close MySQL connection
 *
 * Asks the current `sqlConnection` to end. If ending reports an error, the
 * error is logged and the connection is destroyed instead.
 */
StatdMySQLBackend.prototype.closeMySqlConnection = function() {
  var backend = this;

  function onEnded(endError) {
    if (!endError) {
      return;
    }
    console.log("There was an error while trying to close DB connection : " + util.inspect(endError));
    // Make sure the socket does not stay around after a failed end().
    backend.sqlConnection.destroy();
  }

  backend.sqlConnection.end(onEnded);
};
/**
 * Check if required tables are created. If not create them.
 *
 * Opens a connection, then runs checkIfTablesExists() once per stat type of
 * `config.tables` (counters, gauges, ...). Stat types without any table are
 * skipped.
 *
 * @param callback called exactly once: with the first error reported for any
 *                 stat type, or with no argument once every stat type has
 *                 been checked (immediately when there is nothing to check)
 */
StatdMySQLBackend.prototype.checkDatabase = function(callback) {
  var self = this;

  console.log("Checking database...");

  var isConnected = self.openMySqlConnection();
  if (!isConnected) {
    console.log("Unable to connect to MySQL database ! Please check...");
    process.exit(-1);
  }

  var tables = self.config.tables || {};

  // Keep only the stat types that actually list tables.
  var tablesByType = [];
  Object.keys(tables).forEach(function(statType) {
    var typeTables = tables[statType];
    if (typeTables && typeTables.length > 0) {
      tablesByType.push(typeTables);
    }
  });

  if (tablesByType.length === 0) {
    callback();
    return;
  }

  // Completion is counted rather than inferred from the index of the type
  // that reports last: table creation reads files asynchronously, so types
  // are not guaranteed to finish in the order they were started.
  var remainingTypes = tablesByType.length;
  var finished = false;

  function onTypeChecked(type_index, err) {
    if (finished) {
      return;
    }
    if (err) {
      finished = true;
      callback(err);
      return;
    }
    remainingTypes--;
    if (remainingTypes === 0) {
      finished = true;
      callback();
    }
  }

  // Iterate on each stat type (counters, gauges, ...)
  for (var statTypeIndex = 0; statTypeIndex < tablesByType.length; statTypeIndex++) {
    var typeTables = tablesByType[statTypeIndex];
    self.checkIfTablesExists(statTypeIndex, typeTables, typeTables.length, 0, onTypeChecked);
  }
};
/**
 * Check if a table exists in database. If not, create it.
 *
 * Handles `tables_names[startIndex]`, then recurses on the next index until
 * the last table of the list has been handled.
 *
 * @param type_index   index of the stat type being checked; passed back to the callback
 * @param tables_names list of table names for this stat type
 * @param size         number of tables in `tables_names`
 * @param startIndex   index of the table to check in this call
 * @param callback     called once as callback(type_index) when every table
 *                     from `startIndex` on exists or was created, or as
 *                     callback(type_index, err) on the first error
 */
StatdMySQLBackend.prototype.checkIfTablesExists = function(type_index, tables_names, size, startIndex, callback) {
  var self = this;
  var tableName = tables_names[startIndex];

  // Either report completion for this stat type or move on to the next table.
  function proceed() {
    if (startIndex >= size - 1) {
      callback(type_index);
    } else {
      self.checkIfTablesExists(type_index, tables_names, size, startIndex + 1, callback);
    }
  }

  self.sqlConnection.query('show tables like "' + tableName + '";', function(err, results, fields) {
    if (err) {
      // Report in the (type_index, err) form used everywhere else and stop:
      // `results` is not usable after a failed query.
      callback(type_index, err);
      return;
    }

    // If table was found in database
    if (results && results.length > 0) {
      console.log("Table '" + tableName + "' was found.");
      proceed();
      return;
    }

    // If table wasn't found
    console.log("Table '" + tableName + "' was not found !");
    self.createTable(tableName, function(createError) {
      if (createError) {
        callback(type_index, createError);
        return;
      }
      proceed();
    });
  });
};
/**
 * Create a table from corresponding sql script file
 *
 * Reads `<backendPath>tables/<table_name>.sql`, splits it on "$$", drops
 * blank fragments, terminates every remaining fragment with ";" and sends
 * the result to the database as one query string.
 *
 * NOTE(review): sending several statements in a single query presumably
 * needs multi-statement support enabled on the connection - confirm against
 * the mysql driver configuration.
 *
 * @param table_name name of the table, also the base name of the sql file
 * @param callback   called exactly once: with the read or query error, or
 *                   with no argument when the script was executed
 */
StatdMySQLBackend.prototype.createTable = function(table_name, callback) {
  var self = this;

  // Try to read SQL file for this table
  var sqlFilePath = self.config.backendPath + 'tables/' + table_name + '.sql';

  fs.readFile(sqlFilePath, 'utf8', function(readError, data) {
    if (readError) {
      console.log("Unable to read file: '" + sqlFilePath + "' !");
      callback(readError);
      return;
    }

    // Split and prepare querries
    var fragments = data.split("$$");
    var queuedQuerries = "";
    for (var fragmentIndex = 0; fragmentIndex < fragments.length; fragmentIndex++) {
      // Trimming keeps a trailing newline from hiding an existing ";",
      // which would otherwise add an empty statement.
      var query = fragments[fragmentIndex].trim();
      if (query === "") {
        continue;
      }
      queuedQuerries += query;
      if (query[query.length - 1] !== ";") {
        queuedQuerries += ";";
      }
    }

    // Execute querries
    self.sqlConnection.query(queuedQuerries, function(queryError, results, fields) {
      if (queryError) {
        console.log("Unable to execute query: '" + queuedQuerries + "' for table '" + table_name + "' !");
        callback(queryError);
        return;
      }
      console.log("Table '" + table_name + "' was created with success.");
      callback();
    });
  });
};
/**
 * Flush handler: dispatches every metric family of a statsd flush to its
 * dedicated handler, in the order counters, gauges, timers, sets.
 *
 * @param time_stamp flush time stamp, forwarded unchanged to each handler
 * @param metrics    flushed metrics; reads `counters`, `gauges`, `timers` and `sets`
 */
StatdMySQLBackend.prototype.onFlush = function(time_stamp, metrics) {
  var backend = this;

  // Pick every family up front, before any handler runs.
  var flushed = {
    counters: metrics.counters,
    gauges: metrics.gauges,
    timers: metrics.timers,
    sets: metrics.sets
  };

  backend.handleCounters(flushed.counters, time_stamp);
  backend.handleGauges(flushed.gauges, time_stamp);
  backend.handleTimers(flushed.timers, time_stamp);
  backend.handleSets(flushed.sets, time_stamp);
};
/**
 * Handle and process received counters
 *
 * Nothing is done unless statsd reports received packets
 * (STATSD_PACKETS_RECEIVED > 0) and at least one user counter (a counter
 * outside the "statsd" namespace) is present. Otherwise every counters
 * engine builds its queries; they are executed in batches, flushed after an
 * engine whenever 100 or more queries are pending.
 *
 * @param _counters  received counters
 * @param time_stamp flush time_stamp
 */
StatdMySQLBackend.prototype.handleCounters = function(_counters, time_stamp) {
  var self = this;

  // A flush without a counters map has nothing to store.
  if (!_counters) {
    return;
  }

  var packets_received = parseInt(_counters[STATSD_PACKETS_RECEIVED], 10);
  // Also false when the counter is missing (NaN).
  if (!(packets_received > 0)) {
    return;
  }

  // Get userCounters for this flush
  var userCounters = self.getUserCounters(_counters);
  if (Object.keys(userCounters).length === 0) {
    return;
  }

  console.log("Counters received !");

  var querries = [];

  // Open MySQL connection
  var canExecuteQuerries = self.openMySqlConnection();
  if (canExecuteQuerries) {
    var countersEngines = self.engines.counters;

    // Call buildQuerries method on each counters engine
    for (var countersEngineIndex = 0; countersEngineIndex < countersEngines.length; countersEngineIndex++) {
      console.log("countersEngineIndex = " + countersEngineIndex);
      var countersEngine = countersEngines[countersEngineIndex];

      // Add current engine querries to querries list
      var engineQuerries = countersEngine.buildQuerries(userCounters, time_stamp);
      querries = querries.concat(engineQuerries);

      // Insert data into database every 100 query
      if (querries.length >= 100) {
        self.executeQuerries(querries);
        querries = [];
      }
    }

    if (querries.length > 0) {
      self.executeQuerries(querries);
      querries = [];
    }
  } else {
    // Same report as the gauges, timers and sets handlers.
    console.log("Unable to open db connection !");
  }

  // Close MySQL Connection
  self.closeMySqlConnection();
};
/**
 * Handle and process received gauges
 *
 * When at least one gauge was received, opens a connection, lets every
 * gauges engine build its queries, executes them in batches (a batch is
 * flushed after an engine whenever 100 or more queries are pending) and
 * closes the connection.
 *
 * @param _gauges    received gauges
 * @param time_stamp flush time_stamp
 */
StatdMySQLBackend.prototype.handleGauges = function(_gauges, time_stamp) {
  var backend = this;

  // Nothing to store when no gauge was received.
  var hasGauges = false;
  for (var gaugeName in _gauges) {
    hasGauges = true;
    break;
  }
  if (!hasGauges) {
    return;
  }

  console.log("Gauges received !");
  console.log("Gauges = " + util.inspect(_gauges));

  if (backend.openMySqlConnection()) {
    var pending = [];

    backend.engines.gauges.forEach(function(engine, position) {
      console.log("gaugesEngineIndex = " + position);
      pending = pending.concat(engine.buildQuerries(_gauges, time_stamp));

      // Insert data into database every 100 query
      if (pending.length >= 100) {
        backend.executeQuerries(pending);
        pending = [];
      }
    });

    // Whatever is left after the last engine
    if (pending.length > 0) {
      backend.executeQuerries(pending);
    }
  } else {
    console.log("Unable to open db connection !");
  }

  backend.closeMySqlConnection();
};
/**
 * Handle and process received timers
 *
 * When at least one timer was received, opens a connection, lets every
 * timers engine build its queries, executes them in batches (a batch is
 * flushed after an engine whenever 100 or more queries are pending) and
 * closes the connection.
 *
 * @param _timers    received timers
 * @param time_stamp flush time_stamp
 */
StatdMySQLBackend.prototype.handleTimers = function(_timers, time_stamp) {
  var backend = this;

  // Nothing to store when no timer was received.
  var received = false;
  for (var timerName in _timers) {
    received = true;
    break;
  }
  if (!received) {
    return;
  }

  console.log("Timers received !");
  console.log("Timers = " + util.inspect(_timers));

  var batch = [];

  // Sends the pending batch to the database and starts a new one.
  function flushBatch() {
    backend.executeQuerries(batch);
    batch = [];
  }

  var connected = backend.openMySqlConnection();
  if (!connected) {
    console.log("Unable to open db connection !");
  } else {
    var timersEngines = backend.engines.timers;
    for (var position = 0, count = timersEngines.length; position < count; position++) {
      console.log("timersEngineIndex = " + position);
      batch = batch.concat(timersEngines[position].buildQuerries(_timers, time_stamp));

      // Insert data into database every 100 query
      if (batch.length >= 100) {
        flushBatch();
      }
    }

    // Whatever is left after the last engine
    if (batch.length > 0) {
      flushBatch();
    }
  }

  backend.closeMySqlConnection();
};
/**
 * Handle and process received sets
 *
 * When at least one set was received, opens a connection, lets every sets
 * engine build its queries, executes them in batches (a batch is flushed
 * after an engine whenever 100 or more queries are pending) and closes the
 * connection.
 *
 * @param _sets      received sets
 * @param time_stamp flush time_stamp
 */
StatdMySQLBackend.prototype.handleSets = function(_sets, time_stamp) {
  var backend = this;

  // Nothing to store when no set was received.
  var anySet = false;
  for (var setName in _sets) {
    anySet = true;
    break;
  }
  if (!anySet) {
    return;
  }

  console.log("sets received !");
  console.log("Sets = " + util.inspect(_sets));

  if (backend.openMySqlConnection()) {
    // Fold the engines into the batch that is still pending after the last one.
    var leftover = backend.engines.sets.reduce(function(batch, engine, position) {
      console.log("setsEngineIndex = " + position);
      var grown = batch.concat(engine.buildQuerries(_sets, time_stamp));

      // Insert data into database every 100 query
      if (grown.length >= 100) {
        backend.executeQuerries(grown);
        return [];
      }
      return grown;
    }, []);

    if (leftover.length > 0) {
      backend.executeQuerries(leftover);
    }
  } else {
    console.log("Unable to open db connection !");
  }

  backend.closeMySqlConnection();
};
/**
 * Send a list of SQL queries to the current connection, one query() call per
 * entry and in list order. Each query and its outcome are logged; a failing
 * query is only logged (with its error code) and does not stop the others.
 *
 * @param sqlQuerries list of SQL query strings
 */
StatdMySQLBackend.prototype.executeQuerries = function(sqlQuerries) {
  var backend = this;

  // Shared result reporter for every query of the list.
  function reportOutcome(err, rows) {
    if (err) {
      //TODO : add better error handling code
      console.log(" -> Query [ERROR]" + err.code);
      return;
    }
    console.log(" -> Query [SUCCESS]");
  }

  var position = 0;
  while (position < sqlQuerries.length) {
    var sql = sqlQuerries[position];
    console.log("Query " + position + " : " + sql);
    backend.sqlConnection.query(sql, reportOutcome);
    position += 1;
  }
};
/**
 * Extract the user counters, i.e. every counter whose name is not in the
 * "statsd" namespace (first dot-separated part different from "statsd").
 *
 * @param _counters counters keyed by name
 * @return a new object holding only the user counters
 */
StatdMySQLBackend.prototype.getUserCounters = function(_counters) {
  var kept = {};

  for (var name in _counters) {
    // Same as testing the first dot-separated part against "statsd".
    var isStatsdCounter = (name === "statsd") || (name.indexOf("statsd.") === 0);
    if (isStatsdCounter) {
      continue;
    }
    kept[name] = _counters[name];
  }

  return kept;
};
/**
 * Extract the statsd internal counters, i.e. every counter whose name is in
 * the "statsd" namespace (first dot-separated part equal to "statsd").
 *
 * @param _counters counters keyed by name
 * @return a new object holding only the statsd counters
 */
StatdMySQLBackend.prototype.getStatsdCounters = function(_counters) {
  var kept = {};

  for (var name in _counters) {
    // Same as testing the first dot-separated part against "statsd".
    var isStatsdCounter = (name === "statsd") || (name.indexOf("statsd.") === 0);
    if (!isStatsdCounter) {
      continue;
    }
    kept[name] = _counters[name];
  }

  return kept;
};
/**
 * 'status' event handler. This backend reports no status: the handler
 * accepts the arguments and does nothing with them.
 *
 * @param error
 * @param backend_name
 * @param stat_name
 * @param stat_value
 */
StatdMySQLBackend.prototype.onStatus = function(error, backend_name, stat_name, stat_value) {
  // Intentionally empty.
  return;
};
- exports.init = function(startupTime, config, events) {
- var instance = new StatdMySQLBackend(startupTime, config, events);
- return true;
- };
|