mysql-backend.js 24 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957
  1. ///////////////////////////////////////////////////////////////////////////////////
  2. // NodeJS Statsd MySQL Backend 0.1.0-alpha1
  3. // ------------------------------------------------------------------------------
  4. //
  5. // Authors: Nicolas FRADIN, Damien PACAUD
  6. // Date: 31/10/2012
  7. //
  8. ///////////////////////////////////////////////////////////////////////////////////
  9. var _mysql = require('mysql'),
  10. util = require('util'),
  11. fs = require('fs'),
  12. sequence = require('sequence').create();
  13. var STATSD_PACKETS_RECEIVED = "statsd.packets_received";
  14. var STATSD_BAD_LINES = "statsd.bad_lines_seen";
  /**
   * Backend constructor.
   *
   * Reads the `mysql` section of the statsd configuration, fills in defaults,
   * then runs three sequential steps: check the database tables, load the
   * query engines, and subscribe to the emitter's 'flush' and 'status' events.
   *
   * Example config (tables and engines are grouped by stat type):
   *
   *   mysql: {
   *     host: "localhost",
   *     port: 3306,
   *     user: "root",
   *     password: "root",
   *     database: "statsd_db",
   *     tables: { counters: ["counters_statistics"], gauges: ["gauges_statistics"] },
   *     engines: { counters: ["engines/countersEngine.js"], gauges: ["engines/gaugesEngine.js"] }
   *   }
   *
   * The process exits with -1 when host, database or user is missing, or when
   * the database check reports an error.
   *
   * @param startupTime not used by this backend
   * @param config      whole statsd configuration (reads `mysql` and `backends`)
   * @param emitter     event emitter providing the 'flush' and 'status' events
   */
  function StatdMySQLBackend(startupTime, config, emitter) {
    var self = this;
    self.config = config.mysql || {};
    self.engines = {
      counters: [],
      gauges: [],
      timers: [],
      sets: []
    };

    // The port is optional (defaulted below), so it is not part of this check.
    if (!self.config.host || !self.config.database || !self.config.user) {
      console.log("You need to specify at least host, database and user for this mysql backend");
      process.exit(-1);
    }

    // Default port for mysql is 3306
    if (!self.config.port) {
      self.config.port = 3306;
    }

    // Derive the backend directory from the entry of `config.backends` that names this file
    [].concat(config.backends || []).forEach(function(currentBackend) {
      if (typeof currentBackend === 'string' && currentBackend.indexOf('mysql-backend.js') > -1) {
        self.config.backendPath = currentBackend.substring(0, currentBackend.lastIndexOf('/') + 1);
      }
    });

    // Without a matching entry the path would stay undefined and every engine
    // or SQL script path would start with "undefined"; fall back to this
    // module's own directory.
    if (!self.config.backendPath) {
      self.config.backendPath = __dirname + '/';
    }

    // Default tables
    if (!self.config.tables) {
      self.config.tables = {counters: ["counters_statistics"], gauges: ["gauges_statistics"]};
    }

    // Default engines
    if (!self.config.engines) {
      self.config.engines = {
        counters: ["engines/countersEngine.js"],
        gauges: ["engines/gaugesEngine.js"],
        timers: [],
        sets: []
      };
    }

    // Sequential start-up: each step runs once the previous one called next()
    sequence.then(function(next) {
      // Check if tables exist
      self.checkDatabase(function(err) {
        if (err) {
          console.log('Database check failed ! Exit...');
          process.exit(-1);
        } else {
          next();
        }
      });
    }).then(function(next) {
      // Load backend engines
      self.loadEngines();
      next();
    }).then(function(next) {
      // Attach events; both handlers are wrapped so they run with `this` set to the backend
      emitter.on('flush', function(time_stamp, metrics) { self.onFlush(time_stamp, metrics); });
      emitter.on('status', function() { return self.onStatus.apply(self, arguments); });
    });
  }
  92. /**
  93. * Load MySQL Backend Query Engines
  94. *
  95. */
  96. StatdMySQLBackend.prototype.loadEngines = function() {
  97. var self = this;
  98. // Iterate on each engine type defined in configuration
  99. for(var engineType in self.config.engines) {
  100. var typeEngines = self.config.engines[engineType];
  101. // Load engines for current type
  102. for(var engineIndex in typeEngines) {
  103. // Get current engine path
  104. var enginePath = typeEngines[engineIndex];
  105. // Load current engine
  106. var currentEngine = require(self.config.backendPath + enginePath).init();
  107. if(currentEngine === undefined) {
  108. console.log("Unable to load engine '" + enginePath + "' ! Please check...");
  109. process.exit(-1);
  110. }
  111. // Add engine to MySQL Backend engines
  112. self.engines[engineType].push(currentEngine);
  113. }
  114. }
  115. }
  /**
   * Open MySQL connection.
   *
   * Creates a new connection from `self.config`, stores it in
   * `self.sqlConnection` and starts connecting. The connect callback runs
   * asynchronously, so a failed handshake cannot be reflected in the return
   * value; it is logged when the callback fires.
   *
   * @return boolean true when the connection object was created, false when
   *                 creating it threw
   */
  StatdMySQLBackend.prototype.openMySqlConnection = function() {
    var self = this;
    var connection;

    try {
      connection = _mysql.createConnection(self.config);
    } catch (creationError) {
      console.log("Unable to create MySQL connection : " + util.inspect(creationError));
      return false;
    }

    self.sqlConnection = connection;
    connection.connect(function(error) {
      // Only an actual error is a failure; a successful connect must not be flagged
      if (error) {
        console.log("Unable to connect to MySQL database : " + util.inspect(error));
      }
    });

    return true;
  };
  /**
   * Close MySQL connection.
   *
   * Ends the connection currently stored in `self.sqlConnection`. When ending
   * reports an error, that same connection is destroyed. Does nothing when no
   * connection has been opened.
   */
  StatdMySQLBackend.prototype.closeMySqlConnection = function() {
    // Keep a local reference: `self.sqlConnection` may already point to a newer
    // connection by the time the end callback runs.
    var connection = this.sqlConnection;
    if (!connection) {
      return;
    }

    connection.end(function(error) {
      if (error) {
        console.log("There was an error while trying to close DB connection : " + util.inspect(error));
        // Let's make sure that socket is destroyed
        connection.destroy();
      }
    });
  };
  /**
   * Check if required tables are created. If not create them.
   *
   * Opens a connection, gathers the table names of every stat type from
   * `self.config.tables` into one list and checks them one after another
   * through `checkIfTablesExists`. The connection is closed when the check
   * ends and `callback` is invoked exactly once.
   *
   * The process exits with -1 when the connection cannot be opened.
   *
   * @param callback function(err) called once; `err` is set when a check or a
   *                 table creation failed
   */
  StatdMySQLBackend.prototype.checkDatabase = function(callback) {
    var self = this;
    var finished = false;

    // Guards against a second completion so the caller never advances twice
    function finish(err) {
      if (finished) {
        return;
      }
      finished = true;
      self.closeMySqlConnection();
      if (typeof callback === 'function') {
        callback(err);
      }
    }

    var isConnected = self.openMySqlConnection();
    if (!isConnected) {
      console.log("Unable to connect to MySQL database ! Please check...");
      process.exit(-1);
    }

    // Flatten the tables of each stat type (counters, gauges, ...) into one list
    var tables = self.config.tables || {};
    var allTables = [];
    Object.keys(tables).forEach(function(statType) {
      allTables = allTables.concat(tables[statType] || []);
    });

    // Nothing to check: do not issue a query for an undefined table name
    if (allTables.length === 0) {
      finish();
      return;
    }

    self.checkIfTablesExists(allTables, allTables.length, 0, finish);
  };
  /**
   * Check if tables exist in database. If not, create them.
   *
   * Processes `tables_names` sequentially starting at `startIndex`: each name
   * is looked up with `SHOW TABLES LIKE`, a missing table is created through
   * `createTable`, then the next name is processed. Stops at the first error.
   *
   * @param tables_names list of table names
   * @param size         number of entries of `tables_names` to process
   * @param startIndex   index of the table to check first
   * @param callback     function(err) called once, with the first error or
   *                     without argument when every table was handled
   */
  StatdMySQLBackend.prototype.checkIfTablesExists = function(tables_names, size, startIndex, callback) {
    var self = this;

    // Past the last table (also covers an empty list): done
    if (startIndex >= size) {
      callback();
      return;
    }

    var tableName = tables_names[startIndex];

    // Continue with the following table unless the current step failed
    function proceed(err) {
      if (err) {
        callback(err);
        return;
      }
      self.checkIfTablesExists(tables_names, size, startIndex + 1, callback);
    }

    // The table name is passed as a value so the driver escapes it
    self.sqlConnection.query('show tables like ?;', [tableName], function(err, results) {
      if (err) {
        // Return here: `results` is not usable after a failed query
        callback(err);
        return;
      }

      if (results.length === 0) {
        console.log("Table '" + tableName + "' was not found !");
        self.createTable(tableName, proceed);
      } else {
        console.log("Table '" + tableName + "' was found.");
        proceed();
      }
    });
  };
  /**
   * Create a table from corresponding sql script file.
   *
   * Reads `<backendPath>tables/<table_name>.sql`, splits it on "$$" and runs
   * the non-blank statements one after another on `self.sqlConnection`.
   * Statements are sent individually because the mysql driver only accepts
   * several statements in one query when `multipleStatements` is enabled.
   *
   * @param table_name name of the table (and of its .sql script)
   * @param callback   function(err) called once, with the read or query error,
   *                   or without argument when every statement succeeded
   */
  StatdMySQLBackend.prototype.createTable = function(table_name, callback) {
    var self = this;

    // Try to read SQL file for this table
    var sqlFilePath = self.config.backendPath + 'tables/' + table_name + '.sql';
    fs.readFile(sqlFilePath, 'utf8', function(err, data) {
      if (err) {
        console.log("Unable to read file: '" + sqlFilePath + "' ! Exit...");
        // Return here: `data` is not available after a failed read
        callback(err);
        return;
      }

      // Split querries and drop blank fragments
      var statements = data.split("$$").filter(function(statement) {
        return statement.trim() !== "";
      });

      // Execute the statements in order, stopping at the first failure
      function runStatement(index) {
        if (index >= statements.length) {
          console.log("Table '" + table_name + "' was created with success.");
          callback();
          return;
        }

        var statement = statements[index];
        self.sqlConnection.query(statement, function(queryErr) {
          if (queryErr) {
            console.log("Unable to execute query: '" + statement + "' for table '" + table_name + "' !");
            callback(queryErr);
            return;
          }
          runStatement(index + 1);
        });
      }

      runStatement(0);
    });
  };
  /**
   * Method executed when statsd flushes received data.
   *
   * Forwards the counters and the gauges of the flush to their handlers.
   * Timers, sets and percentile thresholds are not processed by this backend.
   * A missing `metrics` object, or a missing `counters` / `gauges` member, is
   * treated as empty.
   *
   * @param time_stamp flush time stamp, passed through to the handlers
   * @param metrics    flushed metrics (reads `counters` and `gauges`)
   */
  StatdMySQLBackend.prototype.onFlush = function(time_stamp, metrics) {
    var self = this;
    var flushed = metrics || {};

    // Handle statsd counters
    self.handleCounters(flushed['counters'] || {}, time_stamp);

    // Handle statsd gauges
    self.handleGauges(flushed['gauges'] || {}, time_stamp);
  };
  /**
   * Handle and process received counters.
   *
   * Nothing is done unless the "statsd.packets_received" counter is greater
   * than zero and at least one user counter (see `getUserCounters`) is
   * present. Otherwise a connection is opened, every counters engine builds
   * its queries from the user counters, the queries are executed in batches
   * (a batch is sent as soon as 100 or more queries are pending) and the
   * connection is closed.
   *
   * @param _counters received counters
   * @param time_stamp flush time_stamp
   */
  StatdMySQLBackend.prototype.handleCounters = function(_counters, time_stamp) {
    var self = this;
    var counters = _counters || {};

    // Explicit radix; a missing counter yields NaN, which fails the test below
    var packets_received = parseInt(counters[STATSD_PACKETS_RECEIVED], 10);
    if (!(packets_received > 0)) {
      return;
    }

    // Get userCounters for this flush
    var userCounters = self.getUserCounters(counters);
    if (Object.keys(userCounters).length === 0) {
      return;
    }

    console.log("Counters received !");

    // Open MySQL connection
    if (self.openMySqlConnection()) {
      var querries = [];

      // Call buildQuerries method on each counterEngine
      self.engines.counters.forEach(function(countersEngine) {
        querries = querries.concat(countersEngine.buildQuerries(userCounters, time_stamp));

        // Insert data into database every 100 query
        if (querries.length >= 100) {
          self.executeQuerries(querries);
          querries = [];
        }
      });

      // Execute the remaining querries
      if (querries.length > 0) {
        self.executeQuerries(querries);
      }
    } else {
      console.log("Unable to open db connection !");
    }

    // Close MySQL Connection
    self.closeMySqlConnection();
  };
  /**
   * Handle and process received gauges.
   *
   * Nothing is done when no gauge was received. Otherwise a connection is
   * opened, every gauges engine builds its queries from the gauges, the
   * queries are executed in batches (a batch is sent as soon as 100 or more
   * queries are pending) and the connection is closed.
   *
   * @param _gauges received gauges
   * @param time_stamp flush time_stamp
   */
  StatdMySQLBackend.prototype.handleGauges = function(_gauges, time_stamp) {
    var self = this;
    var gauges = _gauges || {};

    // If no gauges received, there is nothing to store
    if (Object.keys(gauges).length === 0) {
      return;
    }

    console.log("Gauges received !");
    console.log("Gauges = " + util.inspect(gauges));

    // Open MySQL connection
    if (self.openMySqlConnection()) {
      var querries = [];

      // Call buildQuerries method on each gaugesEngine
      self.engines.gauges.forEach(function(gaugesEngine) {
        querries = querries.concat(gaugesEngine.buildQuerries(gauges, time_stamp));

        // Insert data into database every 100 query
        if (querries.length >= 100) {
          self.executeQuerries(querries);
          querries = [];
        }
      });

      // Execute the remaining querries
      if (querries.length > 0) {
        self.executeQuerries(querries);
      }
    } else {
      console.log("Unable to open db connection !");
    }

    // Close MySQL Connection
    self.closeMySqlConnection();
  };
  /**
   * Execute a list of SQL queries on the current connection.
   *
   * Each query is logged and sent to `self.sqlConnection`; its outcome is
   * logged when the driver calls back. A failed query is reported with its
   * error and text but does not stop the following ones. A missing list is
   * treated as empty.
   *
   * @param sqlQuerries array of SQL query strings
   */
  StatdMySQLBackend.prototype.executeQuerries = function(sqlQuerries) {
    var self = this;

    (sqlQuerries || []).forEach(function(sqlQuery, index) {
      console.log("Query " + index + " : " + sqlQuery);
      self.sqlConnection.query(sqlQuery, function(err) {
        if (!err) {
          console.log(" -> Query [SUCCESS]");
        } else {
          // Report what failed and why instead of a bare marker
          console.log(" -> Query [ERROR] : " + util.inspect(err) + " (query: " + sqlQuery + ")");
        }
      });
    });
  };
  /**
   * Extract the user counters from a flush.
   *
   * A counter is a user counter when the first dot-separated part of its name
   * is not "statsd". Only the object's own keys are considered.
   *
   * @param _counters received counters (name -> value); may be missing
   * @return object holding the user counters only
   */
  StatdMySQLBackend.prototype.getUserCounters = function(_counters) {
    var source = _counters || {};
    var userCounters = {};

    // Object.keys skips inherited enumerable properties
    Object.keys(source).forEach(function(counterName) {
      if (counterName.split('.')[0] !== "statsd") {
        userCounters[counterName] = source[counterName];
      }
    });

    return userCounters;
  };
  /**
   * Extract the statsd internal counters from a flush.
   *
   * A counter is an internal counter when the first dot-separated part of its
   * name is "statsd". Only the object's own keys are considered.
   *
   * @param _counters received counters (name -> value); may be missing
   * @return object holding the statsd counters only
   */
  StatdMySQLBackend.prototype.getStatsdCounters = function(_counters) {
    var source = _counters || {};
    var statsdCounters = {};

    // Object.keys skips inherited enumerable properties
    Object.keys(source).forEach(function(counterName) {
      if (counterName.split('.')[0] === "statsd") {
        statsdCounters[counterName] = source[counterName];
      }
    });

    return statsdCounters;
  };
  /**
   * Handler registered by the constructor for the emitter's 'status' event.
   * Intentionally empty: this backend reports no status.
   *
   * @param error
   * @param backend_name
   * @param stat_name
   * @param stat_value
   */
  StatdMySQLBackend.prototype.onStatus = function(error, backend_name, stat_name, stat_value) {
    // no-op
  };
  403. exports.init = function(startupTime, config, events) {
  404. var instance = new StatdMySQLBackend(startupTime, config, events);
  405. return true;
  406. };
  407. ///////////////////////////////////////////////////////////////////////////////////
  408. // NodeJS Statsd MySQL Backend 0.1.0-alpha1
  409. // ------------------------------------------------------------------------------
  410. //
  411. // Authors: Nicolas FRADIN, Damien PACAUD
  412. // Date: 31/10/2012
  413. //
  414. ///////////////////////////////////////////////////////////////////////////////////
  415. var _mysql = require('mysql'),
  416. util = require('util'),
  417. fs = require('fs');
  418. var STATSD_PACKETS_RECEIVED = "statsd.packets_received";
  419. var STATSD_BAD_LINES = "statsd.bad_lines_seen";
  /**
   * Backend constructor.
   *
   * Reads the `mysql` section of the statsd configuration, fills in defaults,
   * checks the database tables, loads the query engines and subscribes to the
   * emitter's 'flush' and 'status' events.
   *
   * Example config (tables and engines are grouped by stat type):
   *
   *   mysql: {
   *     host: "localhost",
   *     port: 3306,
   *     user: "root",
   *     password: "root",
   *     database: "statsd_db",
   *     tables: { counters: ["counters_statistics"], gauges: ["gauges_statistics"] },
   *     engines: { counters: ["engines/countersEngine.js"], gauges: ["engines/gaugesEngine.js"] }
   *   }
   *
   * The process exits with -1 when host, database or user is missing.
   *
   * @param startupTime not used by this backend
   * @param config      whole statsd configuration (reads `mysql` and `backends`)
   * @param emitter     event emitter providing the 'flush' and 'status' events
   */
  function StatdMySQLBackend(startupTime, config, emitter) {
    var self = this;
    self.config = config.mysql || {};
    self.engines = {
      counters: [],
      gauges: [],
      timers: [],
      sets: []
    };

    // The port is optional (defaulted below), so it is not part of this check.
    if (!self.config.host || !self.config.database || !self.config.user) {
      console.log("You need to specify at least host, database and user for this mysql backend");
      process.exit(-1);
    }

    // Default port for mysql is 3306
    if (!self.config.port) {
      self.config.port = 3306;
    }

    // Derive the backend directory from the entry of `config.backends` that names this file
    [].concat(config.backends || []).forEach(function(currentBackend) {
      if (typeof currentBackend === 'string' && currentBackend.indexOf('mysql-backend.js') > -1) {
        self.config.backendPath = currentBackend.substring(0, currentBackend.lastIndexOf('/') + 1);
      }
    });

    // Without a matching entry the path would stay undefined and every engine
    // or SQL script path would start with "undefined"; fall back to this
    // module's own directory.
    if (!self.config.backendPath) {
      self.config.backendPath = __dirname + '/';
    }

    // Default tables
    if (!self.config.tables) {
      self.config.tables = {counters: ["counters_statistics"], gauges: ["gauges_statistics"]};
    }

    // Default engines
    if (!self.config.engines) {
      self.config.engines = {
        counters: ["engines/countersEngine.js"],
        gauges: ["engines/gaugesEngine.js"],
        timers: [],
        sets: []
      };
    }

    // Check if tables exist
    self.checkDatabase();

    // Load backend engines
    self.loadEngines();

    // Attach events; both handlers are wrapped so they run with `this` set to the backend
    emitter.on('flush', function(time_stamp, metrics) { self.onFlush(time_stamp, metrics); });
    emitter.on('status', function() { return self.onStatus.apply(self, arguments); });
  }
  484. /**
  485. * Load MySQL Backend Query Engines
  486. *
  487. */
  488. StatdMySQLBackend.prototype.loadEngines = function() {
  489. var self = this;
  490. // Iterate on each engine type defined in configuration
  491. for(var engineType in self.config.engines) {
  492. var typeEngines = self.config.engines[engineType];
  493. // Load engines for current type
  494. for(var engineIndex in typeEngines) {
  495. // Get current engine path
  496. var enginePath = typeEngines[engineIndex];
  497. // Load current engine
  498. var currentEngine = require(self.config.backendPath + enginePath).init();
  499. if(currentEngine === undefined) {
  500. console.log("Unable to load engine '" + enginePath + "' ! Please check...");
  501. process.exit(-1);
  502. }
  503. // Add engine to MySQL Backend engines
  504. self.engines[engineType].push(currentEngine);
  505. }
  506. }
  507. }
  /**
   * Open MySQL connection.
   *
   * Creates a new connection from `self.config`, stores it in
   * `self.sqlConnection` and starts connecting. The connect callback runs
   * asynchronously, so a failed handshake cannot be reflected in the return
   * value; it is logged when the callback fires.
   *
   * @return boolean true when the connection object was created, false when
   *                 creating it threw
   */
  StatdMySQLBackend.prototype.openMySqlConnection = function() {
    var self = this;
    var connection;

    try {
      connection = _mysql.createConnection(self.config);
    } catch (creationError) {
      console.log("Unable to create MySQL connection : " + util.inspect(creationError));
      return false;
    }

    self.sqlConnection = connection;
    connection.connect(function(error) {
      // Only an actual error is a failure; a successful connect must not be flagged
      if (error) {
        console.log("Unable to connect to MySQL database : " + util.inspect(error));
      }
    });

    return true;
  };
  /**
   * Close MySQL connection.
   *
   * Ends the connection currently stored in `self.sqlConnection`. When ending
   * reports an error, that same connection is destroyed. Does nothing when no
   * connection has been opened.
   */
  StatdMySQLBackend.prototype.closeMySqlConnection = function() {
    // Keep a local reference: `self.sqlConnection` may already point to a newer
    // connection by the time the end callback runs.
    var connection = this.sqlConnection;
    if (!connection) {
      return;
    }

    connection.end(function(error) {
      if (error) {
        console.log("There was an error while trying to close DB connection : " + util.inspect(error));
        // Let's make sure that socket is destroyed
        connection.destroy();
      }
    });
  };
  /**
   * Check if required tables are created. If not create them.
   *
   * Opens a connection and, for every table of every stat type in
   * `self.config.tables`, asks the server whether the table exists. A missing
   * table is created by running the "$$"-separated statements of
   * `<backendPath>tables/<table_name>.sql`.
   *
   * The process exits with -1 when the connection cannot be opened, a query
   * fails or a script cannot be read.
   */
  StatdMySQLBackend.prototype.checkDatabase = function() {
    var self = this;

    var isConnected = self.openMySqlConnection();
    if (!isConnected) {
      console.log("Unable to connect to MySQL database ! Please check...");
      process.exit(-1);
    }

    // Run the statements of a table script; success is logged once all of them completed
    function createTable(table_name) {
      var sqlFilePath = self.config.backendPath + 'tables/' + table_name + '.sql';
      fs.readFile(sqlFilePath, 'utf8', function(readErr, data) {
        if (readErr) {
          console.log("Unable to read file: '" + sqlFilePath + "' ! Exit...");
          process.exit(-1);
          return;
        }

        // Split querries and drop blank fragments
        var statements = data.split("$$").filter(function(statement) {
          return statement.trim() !== "";
        });
        var pending = statements.length;

        statements.forEach(function(statement) {
          self.sqlConnection.query(statement, function(queryErr) {
            if (queryErr) {
              console.log("Unable to execute query: '" + statement + "' for table '" + table_name + "' ! Exit...");
              process.exit(-1);
              return;
            }
            pending -= 1;
            if (pending === 0) {
              console.log("Table '" + table_name + "' was created with success.");
            }
          });
        });
      });
    }

    // Each table is handled in its own function call so that the asynchronous
    // callbacks keep the table name they were started for.
    function checkTable(table_name) {
      console.log("Check if table exists : '" + table_name + "'");
      // The table name is passed as a value so the driver escapes it
      self.sqlConnection.query('show tables like ?;', [table_name], function(err, results) {
        if (err) {
          console.log("Unable to execute query !");
          process.exit(-1);
          return;
        }

        // If table doesn't exist
        if (results.length === 0) {
          console.log("Table '" + table_name + "' was not found !");
          createTable(table_name);
        }
      });
    }

    // Iterate on each stat type (counters, gauges, ...) and each of its tables
    var tables = self.config.tables || {};
    Object.keys(tables).forEach(function(statType) {
      [].concat(tables[statType] || []).forEach(checkTable);
    });
  };
  /**
   * Method executed when statsd flushes received data.
   *
   * Forwards the counters and the gauges of the flush to their handlers.
   * Timers, sets and percentile thresholds are not processed by this backend.
   * A missing `metrics` object, or a missing `counters` / `gauges` member, is
   * treated as empty.
   *
   * @param time_stamp flush time stamp, passed through to the handlers
   * @param metrics    flushed metrics (reads `counters` and `gauges`)
   */
  StatdMySQLBackend.prototype.onFlush = function(time_stamp, metrics) {
    var self = this;
    var flushed = metrics || {};

    // Handle statsd counters
    self.handleCounters(flushed['counters'] || {}, time_stamp);

    // Handle statsd gauges
    self.handleGauges(flushed['gauges'] || {}, time_stamp);
  };
  /**
   * Handle and process received counters.
   *
   * Nothing is done unless the "statsd.packets_received" counter is greater
   * than zero and at least one user counter (see `getUserCounters`) is
   * present. Otherwise a connection is opened, every counters engine builds
   * its queries from the user counters, the queries are executed in batches
   * (a batch is sent as soon as 100 or more queries are pending) and the
   * connection is closed.
   *
   * @param _counters received counters
   * @param time_stamp flush time_stamp
   */
  StatdMySQLBackend.prototype.handleCounters = function(_counters, time_stamp) {
    var self = this;
    var counters = _counters || {};

    // Explicit radix; a missing counter yields NaN, which fails the test below
    var packets_received = parseInt(counters[STATSD_PACKETS_RECEIVED], 10);
    if (!(packets_received > 0)) {
      return;
    }

    // Get userCounters for this flush
    var userCounters = self.getUserCounters(counters);
    if (Object.keys(userCounters).length === 0) {
      return;
    }

    console.log("Counters received !");

    // Open MySQL connection
    if (self.openMySqlConnection()) {
      var querries = [];

      // Call buildQuerries method on each counterEngine
      self.engines.counters.forEach(function(countersEngine) {
        querries = querries.concat(countersEngine.buildQuerries(userCounters, time_stamp));

        // Insert data into database every 100 query
        if (querries.length >= 100) {
          self.executeQuerries(querries);
          querries = [];
        }
      });

      // Execute the remaining querries
      if (querries.length > 0) {
        self.executeQuerries(querries);
      }
    } else {
      console.log("Unable to open db connection !");
    }

    // Close MySQL Connection
    self.closeMySqlConnection();
  };
  /**
   * Handle and process received gauges.
   *
   * Nothing is done when no gauge was received. Otherwise a connection is
   * opened, every gauges engine builds its queries from the gauges, the
   * queries are executed in batches (a batch is sent as soon as 100 or more
   * queries are pending) and the connection is closed.
   *
   * @param _gauges received gauges
   * @param time_stamp flush time_stamp
   */
  StatdMySQLBackend.prototype.handleGauges = function(_gauges, time_stamp) {
    var self = this;
    var gauges = _gauges || {};

    // If no gauges received, there is nothing to store
    if (Object.keys(gauges).length === 0) {
      return;
    }

    console.log("Gauges received !");
    console.log("Gauges = " + util.inspect(gauges));

    // Open MySQL connection
    if (self.openMySqlConnection()) {
      var querries = [];

      // Call buildQuerries method on each gaugesEngine
      self.engines.gauges.forEach(function(gaugesEngine) {
        querries = querries.concat(gaugesEngine.buildQuerries(gauges, time_stamp));

        // Insert data into database every 100 query
        if (querries.length >= 100) {
          self.executeQuerries(querries);
          querries = [];
        }
      });

      // Execute the remaining querries
      if (querries.length > 0) {
        self.executeQuerries(querries);
      }
    } else {
      console.log("Unable to open db connection !");
    }

    // Close MySQL Connection
    self.closeMySqlConnection();
  };
  /**
   * Execute a list of SQL queries on the current connection.
   *
   * Each query is logged and sent to `self.sqlConnection`; its outcome is
   * logged when the driver calls back. A failed query is reported with its
   * error and text but does not stop the following ones. A missing list is
   * treated as empty.
   *
   * @param sqlQuerries array of SQL query strings
   */
  StatdMySQLBackend.prototype.executeQuerries = function(sqlQuerries) {
    var self = this;

    (sqlQuerries || []).forEach(function(sqlQuery, index) {
      console.log("Query " + index + " : " + sqlQuery);
      self.sqlConnection.query(sqlQuery, function(err) {
        if (!err) {
          console.log(" -> Query [SUCCESS]");
        } else {
          // Report what failed and why instead of a bare marker
          console.log(" -> Query [ERROR] : " + util.inspect(err) + " (query: " + sqlQuery + ")");
        }
      });
    });
  };
  /**
   * Extract the user counters from a flush.
   *
   * A counter is a user counter when the first dot-separated part of its name
   * is not "statsd". Only the object's own keys are considered.
   *
   * @param _counters received counters (name -> value); may be missing
   * @return object holding the user counters only
   */
  StatdMySQLBackend.prototype.getUserCounters = function(_counters) {
    var source = _counters || {};
    var userCounters = {};

    // Object.keys skips inherited enumerable properties
    Object.keys(source).forEach(function(counterName) {
      if (counterName.split('.')[0] !== "statsd") {
        userCounters[counterName] = source[counterName];
      }
    });

    return userCounters;
  };
  /**
   * Extract the statsd internal counters from a flush.
   *
   * A counter is an internal counter when the first dot-separated part of its
   * name is "statsd". Only the object's own keys are considered.
   *
   * @param _counters received counters (name -> value); may be missing
   * @return object holding the statsd counters only
   */
  StatdMySQLBackend.prototype.getStatsdCounters = function(_counters) {
    var source = _counters || {};
    var statsdCounters = {};

    // Object.keys skips inherited enumerable properties
    Object.keys(source).forEach(function(counterName) {
      if (counterName.split('.')[0] === "statsd") {
        statsdCounters[counterName] = source[counterName];
      }
    });

    return statsdCounters;
  };
  /**
   * Handler registered by the constructor for the emitter's 'status' event.
   * Intentionally empty: this backend reports no status.
   *
   * @param error
   * @param backend_name
   * @param stat_name
   * @param stat_value
   */
  StatdMySQLBackend.prototype.onStatus = function(error, backend_name, stat_name, stat_value) {
    // no-op
  };
  756. exports.init = function(startupTime, config, events) {
  757. var instance = new StatdMySQLBackend(startupTime, config, events);
  758. return true;
  759. };