mysql-backend.js 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669
  1. ///////////////////////////////////////////////////////////////////////////////////
  2. // NodeJS Statsd MySQL Backend 0.1.0-alpha1
  3. // ------------------------------------------------------------------------------
  4. //
  5. // Authors: Nicolas FRADIN, Damien PACAUD
  6. // Date: 31/10/2012
  7. //
  8. ///////////////////////////////////////////////////////////////////////////////////
  9. var _mysql = require('mysql'),
  10. util = require('util'),
  11. fs = require('fs'),
  12. sequence = require('sequence').Sequence.create();
  13. var STATSD_PACKETS_RECEIVED = "statsd.packets_received";
  14. var STATSD_BAD_LINES = "statsd.bad_lines_seen";
  15. /**
  16. * Backend Constructor
  17. *
  18. * Example config :
  19. *
  20. mysql: {
  21. host: "localhost",
  22. port: 3306,
  23. user: "root",
  24. password: "root",
  25. database: "statsd_db",
  26. tables: ["statsd_users", "statsd_statistics"]
  27. }
  28. *
  29. * @param startupTime
  30. * @param config
  31. * @param emmiter
  32. */
  33. function StatdMySQLBackend(startupTime, config, emitter) {
  34. var self = this;
  35. self.config = config.mysql || {};
  36. self.engines = {
  37. counters: [],
  38. gauges: [],
  39. timers: [],
  40. sets: []
  41. };
  42. // Verifying that the config file contains enough information for this backend to work
  43. if(!this.config.host || !this.config.database || !this.config.user || !this.config.password) {
  44. console.log("You need to specify at least host, port, database, user and password for this mysql backend");
  45. process.exit(-1);
  46. }
  47. // Default port for mysql is 3306, if unset in conf file, we set it here to default
  48. if(!this.config.port) {
  49. this.config.port = 3306;
  50. }
  51. // Set backend path
  52. for(var backend_index in config.backends) {
  53. var currentBackend = config.backends[backend_index];
  54. if(currentBackend.indexOf('mysql-backend.js') > -1) {
  55. self.config.backendPath = currentBackend.substring(0, currentBackend.lastIndexOf('/')+1);
  56. }
  57. }
  58. //Default tables
  59. if(!this.config.tables) {
  60. this.config.tables = {counters: ["counters_statistics"], gauges: ["gauges_statistics"], timers:["timers_statistics"],sets:["sets_statistics"]};
  61. }
  62. // Default engines
  63. if(!self.config.engines) {
  64. self.config.engines = {
  65. counters: ["engines/countersEngine.js"],
  66. gauges: ["engines/gaugesEngine.js"],
  67. timers: ["engines/timersEngine.js"],
  68. sets: ["engines/setsEngine.js"]
  69. };
  70. }
  71. // Synchronous sequence
  72. sequence.then(function( next ) {
  73. // Check if tables exists
  74. self.checkDatabase(function(err) {
  75. if(err) {
  76. console.log('Database check failed ! Exit...');
  77. process.exit(-1);
  78. } else {
  79. console.log('Database is valid.');
  80. next();
  81. }
  82. });
  83. }).then(function( next ) {
  84. process.stdout.write('Loading MySQL backend engines...');
  85. // Load backend engines
  86. self.loadEngines(function(err) {
  87. if(err) {
  88. process.stdout.write("[FAILED]\n");
  89. console.log(err);
  90. }
  91. process.stdout.write("[OK]\n");
  92. next();
  93. });
  94. }).then(function( next ) {
  95. // Attach events
  96. emitter.on('flush', function(time_stamp, metrics) { self.onFlush(time_stamp, metrics); } );
  97. emitter.on('status', self.onStatus );
  98. console.log("Statsd MySQL backend is loaded.");
  99. });
  100. }
  101. /**
  102. * Load MySQL Backend Query Engines
  103. *
  104. */
  105. StatdMySQLBackend.prototype.loadEngines = function(callback) {
  106. var self = this;
  107. // Iterate on each engine type defined in configuration
  108. for(var engineType in self.config.engines) {
  109. var typeEngines = self.config.engines[engineType];
  110. // Load engines for current type
  111. for(var engineIndex in typeEngines) {
  112. // Get current engine path
  113. var enginePath = typeEngines[engineIndex];
  114. // Load current engine
  115. var currentEngine = require(self.config.backendPath + enginePath).init();
  116. if(currentEngine === undefined) {
  117. callback("Unable to load engine '" + enginePath + "' ! Please check...");
  118. }
  119. // Add engine to MySQL Backend engines
  120. self.engines[engineType].push(currentEngine);
  121. }
  122. }
  123. callback();
  124. }
  125. /**
  126. * Open MySQL connection
  127. *
  128. * @return boolean Indicates if connection succeed
  129. */
  130. StatdMySQLBackend.prototype.openMySqlConnection = function() {
  131. var self = this;
  132. var canExecuteQuerries = true;
  133. // Create MySQL connection
  134. self.sqlConnection = _mysql.createConnection(this.config);
  135. self.sqlConnection.connect(function(error){
  136. canExecuteQuerries = false;
  137. });
  138. return canExecuteQuerries;
  139. }
  140. /**
  141. * Close MySQL connection
  142. *
  143. */
  144. StatdMySQLBackend.prototype.closeMySqlConnection = function() {
  145. var self = this;
  146. self.sqlConnection.end(function(error) {
  147. if(error){
  148. console.log("There was an error while trying to close DB connection : " + util.inspect(error));
  149. //Let's make sure that socket is destroyed
  150. self.sqlConnection.destroy();
  151. }
  152. });
  153. return;
  154. }
  155. /**
  156. * Check if required tables are created. If not create them.
  157. *
  158. */
  159. StatdMySQLBackend.prototype.checkDatabase = function(callback) {
  160. var self = this;
  161. console.log("Checking database...");
  162. var isConnected = self.openMySqlConnection();
  163. if(!isConnected) {
  164. console.log("Unable to connect to MySQL database ! Please check...");
  165. process.exit(-1);
  166. }
  167. var tables = self.config.tables
  168. // Count stats types
  169. var typesCount = 0;
  170. for(var statType in tables) { typesCount++; }
  171. // Iterate on each stat type (counters, gauges, ...)
  172. var statTypeIndex = 0;
  173. for(var statType in tables) {
  174. // Get tables for current stat type
  175. var typeTables = tables[statType];
  176. // Count tables for current type
  177. var tablesCount = 0;
  178. for(var table_index in typeTables) { tablesCount++; }
  179. // Check if tables exists for current type
  180. self.checkIfTablesExists(statTypeIndex, typeTables, tablesCount, 0, function(type_index, err) {
  181. if(err) {
  182. callback(err);
  183. }
  184. // If all types were parsed, call the callback method
  185. if(type_index == typesCount-1) {
  186. callback();
  187. }
  188. });
  189. statTypeIndex++;
  190. }
  191. }
  192. /**
  193. * Check if a table exists in database. If not, create it.
  194. */
  195. StatdMySQLBackend.prototype.checkIfTablesExists = function(type_index, tables_names, size, startIndex, callback) {
  196. var self = this;
  197. self.sqlConnection.query('show tables like "'+tables_names[startIndex]+'";', function(err, results, fields) {
  198. if(err) {
  199. callback(err);
  200. }
  201. // If table wasn't found
  202. if(results.length == 0) {
  203. console.log("Table '" + tables_names[startIndex] + "' was not found !");
  204. // Create table
  205. self.createTable(tables_names[startIndex], function(err) {
  206. if(err) {
  207. callback(type_index, err);
  208. }
  209. if(startIndex == size - 1) {
  210. // If all tables were created for this type, call the callback method
  211. callback(type_index);
  212. }
  213. else {
  214. // Else iterate on the next table to create
  215. self.checkIfTablesExists(type_index, tables_names, size, startIndex+1, callback);
  216. }
  217. });
  218. }
  219. // If table was found in database
  220. else {
  221. console.log("Table '" + tables_names[startIndex] + "' was found.");
  222. if(startIndex == size-1){
  223. // If all tables were created for this type, call the callback method
  224. callback(type_index);
  225. }
  226. else {
  227. // Else iterate on the next table to create
  228. self.checkIfTablesExists(type_index, tables_names, size, startIndex+1, callback)
  229. }
  230. }
  231. });
  232. }
  233. /**
  234. * Create a table from corresponding sql script file
  235. */
  236. StatdMySQLBackend.prototype.createTable = function(table_name, callback) {
  237. var self = this;
  238. // Try to read SQL file for this table
  239. var sqlFilePath = self.config.backendPath + 'tables/' + table_name + '.sql';
  240. fs.readFile(sqlFilePath, 'utf8', function (err,data) {
  241. if (err) {
  242. console.log("Unable to read file: '" + sqlFilePath + "' !");
  243. callback(err);
  244. }
  245. // Split querries
  246. var querries = data.split("$$");
  247. // Prepare querries
  248. var queuedQuerries = "";
  249. for(var queryIndex in querries) {
  250. var query = querries[queryIndex];
  251. if(query.trim() == "") continue;
  252. queuedQuerries += query;
  253. if(queuedQuerries[queuedQuerries.length-1] !== ";") {
  254. queuedQuerries += ";";
  255. }
  256. }
  257. // Execute querries
  258. self.sqlConnection.query(queuedQuerries, function(err, results, fields) {
  259. if(err) {
  260. console.log("Unable to execute query: '" + query +"' for table '"+table_name+"' !");
  261. callback(err);
  262. }
  263. console.log("Table '" + table_name +"' was created with success.");
  264. callback();
  265. });
  266. });
  267. }
  268. /**
  269. * Method executed when statsd flush received datas
  270. *
  271. * @param time_stamp
  272. * @param metrics
  273. */
  274. StatdMySQLBackend.prototype.onFlush = function(time_stamp, metrics) {
  275. var self = this;
  276. var counters = metrics['counters'];
  277. var timers = metrics['timers'];
  278. var gauges = metrics['gauges'];
  279. var sets = metrics['sets'];
  280. var pctThreshold = metrics['pctThreshold'];
  281. //console.log("METRICS : \n " + util.inspect(metrics) + "\n ===========================");
  282. // Handle statsd counters
  283. self.handleCounters(counters,time_stamp);
  284. // Handle statsd gauges
  285. self.handleGauges(gauges,time_stamp);
  286. // Handle statsd timers
  287. self.handleTimers(timers,time_stamp);
  288. // Handle stastd sets
  289. self.handleSets(sets,time_stamp);
  290. }
  291. /**
  292. * Handle and process received counters
  293. *
  294. * @param _counters received counters
  295. * @param time_stamp flush time_stamp
  296. */
  297. StatdMySQLBackend.prototype.handleCounters = function(_counters, time_stamp) {
  298. var self = this;
  299. var packets_received = parseInt(_counters[STATSD_PACKETS_RECEIVED]);
  300. var bad_lines_seen = parseInt(_counters[STATSD_BAD_LINES]);
  301. if(packets_received > 0) {
  302. // Get userCounters for this flush
  303. var userCounters = self.getUserCounters(_counters);
  304. var userCountersSize = 0;
  305. for(var userCounterName in userCounters) { userCountersSize++; }
  306. if(userCountersSize > 0) {
  307. console.log("Counters received !");
  308. var querries = [];
  309. // Open MySQL connection
  310. var canExecuteQuerries = self.openMySqlConnection();
  311. if(canExecuteQuerries) {
  312. //////////////////////////////////////////////////////////////////////
  313. // Call buildQuerries method on each counterEngine
  314. for(var countersEngineIndex in self.engines.counters) {
  315. console.log("countersEngineIndex = " + countersEngineIndex);
  316. var countersEngine = self.engines.counters[countersEngineIndex];
  317. // Add current engine querries to querries list
  318. var engineQuerries = countersEngine.buildQuerries(userCounters, time_stamp);
  319. querries = querries.concat(engineQuerries);
  320. // Insert data into database every 100 query
  321. if(querries.length >= 100) {
  322. // Execute querries
  323. self.executeQuerries(querries);
  324. querries = [];
  325. }
  326. }
  327. if(querries.length > 0) {
  328. // Execute querries
  329. self.executeQuerries(querries);
  330. querries = [];
  331. }
  332. }
  333. // Close MySQL Connection
  334. self.closeMySqlConnection();
  335. }
  336. }
  337. }
  338. /**
  339. * Handle and process received gauges
  340. *
  341. * @param _gauges received _gauges
  342. * @param time_stamp flush time_stamp
  343. */
  344. StatdMySQLBackend.prototype.handleGauges = function(_gauges, time_stamp) {
  345. var self = this;
  346. var gaugesSize = 0
  347. for(var g in _gauges) { gaugesSize++; }
  348. // If gauges received
  349. if(gaugesSize > 0) {
  350. console.log("Gauges received !");
  351. console.log("Gauges = " + util.inspect(_gauges));
  352. var querries = [];
  353. // Open MySQL connection
  354. var canExecuteQuerries = self.openMySqlConnection();
  355. if(canExecuteQuerries) {
  356. //////////////////////////////////////////////////////////////////////
  357. // Call buildQuerries method on each counterEngine
  358. for(var gaugesEngineIndex in self.engines.gauges) {
  359. console.log("gaugesEngineIndex = " + gaugesEngineIndex);
  360. var gaugesEngine = self.engines.gauges[gaugesEngineIndex];
  361. // Add current engine querries to querries list
  362. var engineQuerries = gaugesEngine.buildQuerries(_gauges, time_stamp);
  363. querries = querries.concat(engineQuerries);
  364. // Insert data into database every 100 query
  365. if(querries.length >= 100) {
  366. // Execute querries
  367. self.executeQuerries(querries);
  368. querries = [];
  369. }
  370. }
  371. if(querries.length > 0) {
  372. // Execute querries
  373. self.executeQuerries(querries);
  374. querries = [];
  375. }
  376. } else {
  377. console.log("Unable to open db connection !");
  378. }
  379. // Close MySQL Connection
  380. self.closeMySqlConnection();
  381. }
  382. }
  383. /**
  384. * Handle and process received timers
  385. *
  386. * @param _timers received timers
  387. * @param time_stamp flush time_stamp
  388. */
  389. StatdMySQLBackend.prototype.handleTimers = function(_timers, time_stamp) {
  390. var self = this;
  391. var timersSize = 0
  392. for(var t in _timers) { timersSize++; }
  393. // If timers received
  394. if(timersSize > 0) {
  395. console.log("Timers received !");
  396. console.log("Timers = " + util.inspect(_timers));
  397. var querries = [];
  398. // Open MySQL connection
  399. var canExecuteQuerries = self.openMySqlConnection();
  400. if(canExecuteQuerries) {
  401. //////////////////////////////////////////////////////////////////////
  402. // Call buildQuerries method on each counterEngine
  403. for(var timersEngineIndex in self.engines.timers) {
  404. console.log("timersEngineIndex = " + timersEngineIndex);
  405. var timersEngine = self.engines.timers[timersEngineIndex];
  406. // Add current engine querries to querries list
  407. var engineQuerries = timersEngine.buildQuerries(_timers, time_stamp);
  408. querries = querries.concat(engineQuerries);
  409. // Insert data into database every 100 query
  410. if(querries.length >= 100) {
  411. // Execute querries
  412. self.executeQuerries(querries);
  413. querries = [];
  414. }
  415. }
  416. if(querries.length > 0) {
  417. // Execute querries
  418. self.executeQuerries(querries);
  419. querries = [];
  420. }
  421. } else {
  422. console.log("Unable to open db connection !");
  423. }
  424. // Close MySQL Connection
  425. self.closeMySqlConnection();
  426. }
  427. }
  428. /**
  429. * Handle and process received sets
  430. *
  431. * @param _sets received sets
  432. * @param time_stamp flush time_stamp
  433. */
  434. StatdMySQLBackend.prototype.handleSets = function(_sets, time_stamp) {
  435. var self = this;
  436. var setsSize = 0
  437. for(var s in _sets) { setsSize++; }
  438. // If timers received
  439. if(setsSize > 0) {
  440. console.log("sets received !");
  441. console.log("Sets = " + util.inspect(_sets));
  442. var querries = [];
  443. // Open MySQL connection
  444. var canExecuteQuerries = self.openMySqlConnection();
  445. if(canExecuteQuerries) {
  446. //////////////////////////////////////////////////////////////////////
  447. // Call buildQuerries method on each counterEngine
  448. for(var setsEngineIndex in self.engines.sets) {
  449. console.log("setsEngineIndex = " + setsEngineIndex);
  450. var setsEngine = self.engines.sets[setsEngineIndex];
  451. // Add current engine querries to querries list
  452. var engineQuerries = setsEngine.buildQuerries(_sets, time_stamp);
  453. querries = querries.concat(engineQuerries);
  454. // Insert data into database every 100 query
  455. if(querries.length >= 100) {
  456. // Execute querries
  457. self.executeQuerries(querries);
  458. querries = [];
  459. }
  460. }
  461. if(querries.length > 0) {
  462. // Execute querries
  463. self.executeQuerries(querries);
  464. querries = [];
  465. }
  466. } else {
  467. console.log("Unable to open db connection !");
  468. }
  469. // Close MySQL Connection
  470. self.closeMySqlConnection();
  471. }
  472. }
  473. /**
  474. * MISSING DOCUMENTATION
  475. *
  476. * @param sqlQuerries
  477. */
  478. StatdMySQLBackend.prototype.executeQuerries = function(sqlQuerries) {
  479. var self = this;
  480. for(var i = 0 ; i < sqlQuerries.length ; i++){
  481. console.log("Query " + i + " : " + sqlQuerries[i]);
  482. self.sqlConnection.query(sqlQuerries[i], function(err, rows) {
  483. if(!err) {
  484. console.log(" -> Query [SUCCESS]");
  485. }
  486. else {
  487. //TODO : add better error handling code
  488. console.log(" -> Query [ERROR]" + err.code);
  489. }
  490. });
  491. }
  492. }
  493. /**
  494. *
  495. *
  496. */
  497. StatdMySQLBackend.prototype.getUserCounters = function(_counters) {
  498. var userCounters = {};
  499. for(var counterName in _counters) {
  500. var counterNameParts = counterName.split('.');
  501. if(counterNameParts[0] !== "statsd") {
  502. userCounters[counterName] = _counters[counterName];
  503. }
  504. }
  505. return userCounters;
  506. }
  507. /**
  508. *
  509. *
  510. */
  511. StatdMySQLBackend.prototype.getStatsdCounters = function(_counters) {
  512. var statsdCounters = {};
  513. for(var counterName in _counters) {
  514. var counterNameParts = counterName.split('.');
  515. if(counterNameParts[0] === "statsd") {
  516. statsdCounters[counterName] = _counters[counterName];
  517. }
  518. }
  519. return statsdCounters;
  520. }
  521. /**
  522. *
  523. * @param error
  524. * @param backend_name
  525. * @param stat_name
  526. * @param stat_value
  527. */
  528. StatdMySQLBackend.prototype.onStatus = function(error, backend_name, stat_name, stat_value) {
  529. }
  530. exports.init = function(startupTime, config, events) {
  531. var instance = new StatdMySQLBackend(startupTime, config, events);
  532. return true;
  533. };