mysql-backend.js 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610
  1. ///////////////////////////////////////////////////////////////////////////////////
  2. // NodeJS Statsd MySQL Backend 0.1.0-alpha1
  3. // ------------------------------------------------------------------------------
  4. //
  5. // Authors: Nicolas FRADIN, Damien PACAUD
  6. // Date: 31/10/2012
  7. //
  8. ///////////////////////////////////////////////////////////////////////////////////
  9. var _mysql = require('mysql'),
  10. util = require('util'),
  11. fs = require('fs'),
  12. sequence = require('sequence').create();
  13. var STATSD_PACKETS_RECEIVED = "statsd.packets_received";
  14. var STATSD_BAD_LINES = "statsd.bad_lines_seen";
  15. /**
  16. * Backend Constructor
  17. *
  18. * Example config :
  19. *
  20. mysql: {
  21. host: "localhost",
  22. port: 3306,
  23. user: "root",
  24. password: "root",
  25. database: "statsd_db",
  26. tables: ["statsd_users", "statsd_statistics"]
  27. }
  28. *
  29. * @param startupTime
  30. * @param config
  31. * @param emmiter
  32. */
  33. function StatdMySQLBackend(startupTime, config, emitter) {
  34. var self = this;
  35. self.config = config.mysql || {};
  36. self.engines = {
  37. counters: [],
  38. gauges: [],
  39. timers: [],
  40. sets: []
  41. };
  42. // Verifying that the config file contains enough information for this backend to work
  43. if(!this.config.host || !this.config.database || !this.config.user) {
  44. console.log("You need to specify at least host, port, database and user for this mysql backend");
  45. process.exit(-1);
  46. }
  47. // Default port for mysql is 3306, if unset in conf file, we set it here to default
  48. if(!this.config.port) {
  49. this.config.port = 3306;
  50. }
  51. // Set backend path
  52. for(var backend_index in config.backends) {
  53. var currentBackend = config.backends[backend_index];
  54. if(currentBackend.indexOf('mysql-backend.js') > -1) {
  55. self.config.backendPath = currentBackend.substring(0, currentBackend.lastIndexOf('/')+1);
  56. }
  57. }
  58. //Default tables
  59. if(!this.config.tables) {
  60. this.config.tables = {counters: ["counters_statistics"], gauges: ["gauges_statistics"]};
  61. }
  62. // Default engines
  63. if(!self.config.engines) {
  64. self.config.engines = {
  65. counters: ["engines/countersEngine.js"],
  66. gauges: ["engines/gaugesEngine.js"],
  67. timers: [],
  68. sets: []
  69. };
  70. }
  71. // Synchronous sequence
  72. sequence.then(function( next ) {
  73. // Check if tables exists
  74. self.checkDatabase(function(err) {
  75. if(err) {
  76. console.log('Database check failed ! Exit...');
  77. process.exit(-1);
  78. } else {
  79. console.log('Database is valid.');
  80. next();
  81. }
  82. });
  83. }).then(function( next ) {
  84. process.stdout.write('Loading MySQL backend engines...');
  85. // Load backend engines
  86. self.loadEngines(function(err) {
  87. if(err) {
  88. process.stdout.write("[FAILED]\n");
  89. console.log(err);
  90. }
  91. process.stdout.write("[OK]\n");
  92. next();
  93. });
  94. }).then(function( next ) {
  95. // Attach events
  96. emitter.on('flush', function(time_stamp, metrics) { self.onFlush(time_stamp, metrics); } );
  97. emitter.on('status', self.onStatus );
  98. console.log("Statsd MySQL backend is loaded.");
  99. });
  100. }
  101. /**
  102. * Load MySQL Backend Query Engines
  103. *
  104. */
  105. StatdMySQLBackend.prototype.loadEngines = function(callback) {
  106. var self = this;
  107. // Iterate on each engine type defined in configuration
  108. for(var engineType in self.config.engines) {
  109. var typeEngines = self.config.engines[engineType];
  110. // Load engines for current type
  111. for(var engineIndex in typeEngines) {
  112. // Get current engine path
  113. var enginePath = typeEngines[engineIndex];
  114. // Load current engine
  115. var currentEngine = require(self.config.backendPath + enginePath).init();
  116. if(currentEngine === undefined) {
  117. callback("Unable to load engine '" + enginePath + "' ! Please check...");
  118. }
  119. // Add engine to MySQL Backend engines
  120. self.engines[engineType].push(currentEngine);
  121. }
  122. }
  123. callback();
  124. }
  125. /**
  126. * Open MySQL connection
  127. *
  128. * @return boolean Indicates if connection succeed
  129. */
  130. StatdMySQLBackend.prototype.openMySqlConnection = function() {
  131. var self = this;
  132. var canExecuteQuerries = true;
  133. // Create MySQL connection
  134. self.sqlConnection = _mysql.createConnection(this.config);
  135. self.sqlConnection.connect(function(error){
  136. canExecuteQuerries = false;
  137. });
  138. return canExecuteQuerries;
  139. }
  140. /**
  141. * Close MySQL connection
  142. *
  143. */
  144. StatdMySQLBackend.prototype.closeMySqlConnection = function() {
  145. var self = this;
  146. self.sqlConnection.end(function(error) {
  147. if(error){
  148. console.log("There was an error while trying to close DB connection : " + util.inspect(error));
  149. //Let's make sure that socket is destroyed
  150. self.sqlConnection.destroy();
  151. }
  152. });
  153. return;
  154. }
  155. /**
  156. * Check if required tables are created. If not create them.
  157. *
  158. */
  159. StatdMySQLBackend.prototype.checkDatabase = function(callback) {
  160. var self = this;
  161. console.log("Checking database...");
  162. var isConnected = self.openMySqlConnection();
  163. if(!isConnected) {
  164. console.log("Unable to connect to MySQL database ! Please check...");
  165. process.exit(-1);
  166. }
  167. var tables = self.config.tables
  168. // Count stats types
  169. var typesCount = 0;
  170. for(var statType in tables) { typesCount++; }
  171. // Iterate on each stat type (counters, gauges, ...)
  172. var statTypeIndex = 0;
  173. for(var statType in tables) {
  174. // Get tables for current stat type
  175. var typeTables = tables[statType];
  176. // Count tables for current type
  177. var tablesCount = 0;
  178. for(var table_index in typeTables) { tablesCount++; }
  179. // Check if tables exists for current type
  180. self.checkIfTablesExists(statTypeIndex, typeTables, tablesCount, 0, function(type_index, err) {
  181. if(err) {
  182. callback(err);
  183. }
  184. // If all types were parsed, call the callback method
  185. if(type_index == typesCount-1) {
  186. callback();
  187. }
  188. });
  189. statTypeIndex++;
  190. }
  191. }
  192. /**
  193. * Check if a table exists in database. If not, create it.
  194. */
  195. StatdMySQLBackend.prototype.checkIfTablesExists = function(type_index, tables_names, size, startIndex, callback) {
  196. var self = this;
  197. self.sqlConnection.query('show tables like "'+tables_names[startIndex]+'";', function(err, results, fields) {
  198. if(err) {
  199. callback(err);
  200. }
  201. // If table wasn't found
  202. if(results.length == 0) {
  203. console.log("Table '" + tables_names[startIndex] + "' was not found !");
  204. // Create table
  205. self.createTable(tables_names[startIndex], function(err) {
  206. if(err) {
  207. callback(type_index, err);
  208. }
  209. if(startIndex == size - 1) {
  210. // If all tables were created for this type, call the callback method
  211. callback(type_index);
  212. }
  213. else {
  214. // Else iterate on the next table to create
  215. self.checkIfTablesExists(type_index, tables_names, size, startIndex+1, callback);
  216. }
  217. });
  218. }
  219. // If table was found in database
  220. else {
  221. console.log("Table '" + tables_names[startIndex] + "' was found.");
  222. if(startIndex == size-1){
  223. // If all tables were created for this type, call the callback method
  224. callback(type_index);
  225. }
  226. else {
  227. // Else iterate on the next table to create
  228. self.checkIfTablesExists(type_index, tables_names, size, startIndex+1, callback)
  229. }
  230. }
  231. });
  232. }
  233. /**
  234. * Create a table from corresponding sql script file
  235. */
  236. StatdMySQLBackend.prototype.createTable = function(table_name, callback) {
  237. var self = this;
  238. // Try to read SQL file for this table
  239. var sqlFilePath = self.config.backendPath + 'tables/' + table_name + '.sql';
  240. fs.readFile(sqlFilePath, 'utf8', function (err,data) {
  241. if (err) {
  242. console.log("Unable to read file: '" + sqlFilePath + "' !");
  243. callback(err);
  244. }
  245. // Split querries
  246. var querries = data.split("$$");
  247. // Prepare querries
  248. var queuedQuerries = "";
  249. for(var queryIndex in querries) {
  250. var query = querries[queryIndex];
  251. if(query.trim() == "") continue;
  252. queuedQuerries += query;
  253. if(queuedQuerries[queuedQuerries.length-1] !== ";") {
  254. queuedQuerries += ";";
  255. }
  256. }
  257. // Execute querries
  258. self.sqlConnection.query(queuedQuerries, function(err, results, fields) {
  259. if(err) {
  260. console.log("Unable to execute query: '" + query +"' for table '"+table_name+"' !");
  261. callback(err);
  262. }
  263. console.log("Table '" + table_name +"' was created with success.");
  264. callback();
  265. });
  266. });
  267. }
  268. /**
  269. * Method executed when statsd flush received datas
  270. *
  271. * @param time_stamp
  272. * @param metrics
  273. */
  274. StatdMySQLBackend.prototype.onFlush = function(time_stamp, metrics) {
  275. var self = this;
  276. var counters = metrics['counters'];
  277. var timers = metrics['timers'];
  278. var gauges = metrics['gauges'];
  279. var sets = metrics['sets'];
  280. var pctThreshold = metrics['pctThreshold'];
  281. console.log("METRICS : \n " + util.inspect(metrics) + "\n ===========================");
  282. // Handle statsd counters
  283. self.handleCounters(counters,time_stamp);
  284. // Handle statsd gauges
  285. self.handleGauges(gauges,time_stamp);
  286. }
  287. /**
  288. * Handle and process received counters
  289. *
  290. * @param _counters received counters
  291. * @param time_stamp flush time_stamp
  292. */
  293. StatdMySQLBackend.prototype.handleCounters = function(_counters, time_stamp) {
  294. var self = this;
  295. var packets_received = parseInt(_counters[STATSD_PACKETS_RECEIVED]);
  296. var bad_lines_seen = parseInt(_counters[STATSD_BAD_LINES]);
  297. if(packets_received > 0) {
  298. // Get userCounters for this flush
  299. var userCounters = self.getUserCounters(_counters);
  300. var userCountersSize = 0;
  301. for(var userCounterName in userCounters) { userCountersSize++; }
  302. if(userCountersSize > 0) {
  303. console.log("Counters received !");
  304. var querries = [];
  305. // Open MySQL connection
  306. var canExecuteQuerries = self.openMySqlConnection();
  307. if(canExecuteQuerries) {
  308. //////////////////////////////////////////////////////////////////////
  309. // Call buildQuerries method on each counterEngine
  310. for(var countersEngineIndex in self.engines.counters) {
  311. console.log("countersEngineIndex = " + countersEngineIndex);
  312. var countersEngine = self.engines.counters[countersEngineIndex];
  313. // Add current engine querries to querries list
  314. var engineQuerries = countersEngine.buildQuerries(userCounters, time_stamp);
  315. querries = querries.concat(engineQuerries);
  316. // Insert data into database every 100 query
  317. if(querries.length >= 100) {
  318. // Execute querries
  319. self.executeQuerries(querries);
  320. querries = [];
  321. }
  322. }
  323. if(querries.length > 0) {
  324. // Execute querries
  325. self.executeQuerries(querries);
  326. querries = [];
  327. }
  328. }
  329. // Close MySQL Connection
  330. self.closeMySqlConnection();
  331. }
  332. }
  333. }
  334. /**
  335. * Handle and process received gauges
  336. *
  337. * @param _gauges received _gauges
  338. * @param time_stamp flush time_stamp
  339. */
  340. StatdMySQLBackend.prototype.handleGauges = function(_gauges, time_stamp) {
  341. var self = this;
  342. var gaugesSize = 0
  343. for(var g in _gauges) { gaugesSize++; }
  344. // If gauges received
  345. if(gaugesSize > 0) {
  346. console.log("Gauges received !");
  347. console.log("Gauges = " + util.inspect(_gauges));
  348. var querries = [];
  349. // Open MySQL connection
  350. var canExecuteQuerries = self.openMySqlConnection();
  351. if(canExecuteQuerries) {
  352. console.log("ok");
  353. //////////////////////////////////////////////////////////////////////
  354. // Call buildQuerries method on each counterEngine
  355. for(var gaugesEngineIndex in self.engines.gauges) {
  356. console.log("gaugesEngineIndex = " + gaugesEngineIndex);
  357. var gaugesEngine = self.engines.gauges[gaugesEngineIndex];
  358. // Add current engine querries to querries list
  359. var engineQuerries = gaugesEngine.buildQuerries(_gauges, time_stamp);
  360. querries = querries.concat(engineQuerries);
  361. // Insert data into database every 100 query
  362. if(querries.length >= 100) {
  363. // Execute querries
  364. self.executeQuerries(querries);
  365. querries = [];
  366. }
  367. }
  368. if(querries.length > 0) {
  369. // Execute querries
  370. self.executeQuerries(querries);
  371. querries = [];
  372. }
  373. } else {
  374. console.log("Unable to open db connection !");
  375. }
  376. // Close MySQL Connection
  377. self.closeMySqlConnection();
  378. }
  379. }
  380. /**
  381. * Handle and process received timers
  382. *
  383. * @param _timers received timers
  384. * @param time_stamp flush time_stamp
  385. */
  386. StatdMySQLBackend.prototype.handleTimers = function(_timers, time_stamp) {
  387. var self = this;
  388. var timersSize = 0
  389. for(var t in _timers) { timersSize++; }
  390. // If gauges received
  391. if(timersSize > 0) {
  392. console.log("Timers received !");
  393. console.log("Timers = " + util.inspect(_gauges));
  394. var querries = [];
  395. // Open MySQL connection
  396. var canExecuteQuerries = self.openMySqlConnection();
  397. if(canExecuteQuerries) {
  398. console.log("ok");
  399. //////////////////////////////////////////////////////////////////////
  400. // Call buildQuerries method on each counterEngine
  401. for(var gaugesEngineIndex in self.engines.gauges) {
  402. console.log("gaugesEngineIndex = " + gaugesEngineIndex);
  403. var gaugesEngine = self.engines.gauges[gaugesEngineIndex];
  404. // Add current engine querries to querries list
  405. var engineQuerries = gaugesEngine.buildQuerries(_gauges, time_stamp);
  406. querries = querries.concat(engineQuerries);
  407. // Insert data into database every 100 query
  408. if(querries.length >= 100) {
  409. // Execute querries
  410. self.executeQuerries(querries);
  411. querries = [];
  412. }
  413. }
  414. if(querries.length > 0) {
  415. // Execute querries
  416. self.executeQuerries(querries);
  417. querries = [];
  418. }
  419. } else {
  420. console.log("Unable to open db connection !");
  421. }
  422. // Close MySQL Connection
  423. self.closeMySqlConnection();
  424. }
  425. }
  426. /**
  427. * MISSING DOCUMENTATION
  428. *
  429. * @param sqlQuerries
  430. */
  431. StatdMySQLBackend.prototype.executeQuerries = function(sqlQuerries) {
  432. var self = this;
  433. for(var i = 0 ; i < sqlQuerries.length ; i++){
  434. console.log("Query " + i + " : " + sqlQuerries[i]);
  435. self.sqlConnection.query(sqlQuerries[i], function(err, rows) {
  436. if(!err) {
  437. console.log(" -> Query [SUCCESS]");
  438. }
  439. else {
  440. //TODO : add better error handling code
  441. console.log(" -> Query [ERROR]");
  442. }
  443. });
  444. }
  445. }
  446. /**
  447. *
  448. *
  449. */
  450. StatdMySQLBackend.prototype.getUserCounters = function(_counters) {
  451. var userCounters = {};
  452. for(var counterName in _counters) {
  453. var counterNameParts = counterName.split('.');
  454. if(counterNameParts[0] !== "statsd") {
  455. userCounters[counterName] = _counters[counterName];
  456. }
  457. }
  458. return userCounters;
  459. }
  460. /**
  461. *
  462. *
  463. */
  464. StatdMySQLBackend.prototype.getStatsdCounters = function(_counters) {
  465. var statsdCounters = {};
  466. for(var counterName in _counters) {
  467. var counterNameParts = counterName.split('.');
  468. if(counterNameParts[0] === "statsd") {
  469. statsdCounters[counterName] = _counters[counterName];
  470. }
  471. }
  472. return statsdCounters;
  473. }
  474. /**
  475. *
  476. * @param error
  477. * @param backend_name
  478. * @param stat_name
  479. * @param stat_value
  480. */
  481. StatdMySQLBackend.prototype.onStatus = function(error, backend_name, stat_name, stat_value) {
  482. }
  483. exports.init = function(startupTime, config, events) {
  484. var instance = new StatdMySQLBackend(startupTime, config, events);
  485. return true;
  486. };