mysql-backend.js 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548
  1. ///////////////////////////////////////////////////////////////////////////////////
  2. // NodeJS Statsd MySQL Backend 0.1.0-alpha1
  3. // ------------------------------------------------------------------------------
  4. //
  5. // Authors: Nicolas FRADIN, Damien PACAUD
  6. // Date: 31/10/2012
  7. //
  8. ///////////////////////////////////////////////////////////////////////////////////
  9. var _mysql = require('mysql'),
  10. util = require('util'),
  11. fs = require('fs'),
  12. sequence = require('sequence').create();
  // Key of the counter read by handleCounters() to decide whether a flush
  // carries anything worth storing (it must parse to a value > 0).
  var STATSD_PACKETS_RECEIVED = "statsd.packets_received";
  // Key of the counter holding the number of bad lines seen.
  // NOTE(review): presumably both keys follow statsd's default "statsd" stats prefix - confirm.
  var STATSD_BAD_LINES = "statsd.bad_lines_seen";
  /**
   * Backend constructor.
   *
   * Reads the `mysql` section of the statsd configuration, fills in the
   * defaults (port, tables, engines, backend path) and then runs three steps
   * through the module-level `sequence`:
   *   1. check the database (missing tables are created),
   *   2. load the query engines,
   *   3. subscribe to the 'flush' and 'status' events of the emitter.
   * A failure in step 1 or 2 terminates the process with exit code -1.
   *
   * Example config :
   *
   *   mysql: {
   *     host: "localhost",
   *     port: 3306,
   *     user: "root",
   *     password: "root",
   *     database: "statsd_db",
   *     tables: { counters: ["counters_statistics"], gauges: ["gauges_statistics"] },
   *     engines: {
   *       counters: ["engines/countersEngine.js"],
   *       gauges: ["engines/gaugesEngine.js"],
   *       timers: [],
   *       sets: []
   *     }
   *   }
   *
   * @param startupTime startup time handed over by the caller (not used here)
   * @param config      whole configuration object; `config.mysql` and `config.backends` are read
   * @param emitter     event emitter on which 'flush' and 'status' listeners are registered
   */
  function StatdMySQLBackend(startupTime, config, emitter) {
    var self = this;

    // The defaults below are written into this object, i.e. into config.mysql itself.
    self.config = (config && config.mysql) || {};
    self.engines = {
      counters: [],
      gauges: [],
      timers: [],
      sets: []
    };

    // Verifying that the config file contains enough information for this backend to work
    if (!self.config.host || !self.config.database || !self.config.user) {
      console.log("You need to specify at least host, database and user for this mysql backend");
      process.exit(-1);
    }

    // Default port for mysql is 3306, if unset in conf file, we set it here to default
    if (!self.config.port) {
      self.config.port = 3306;
    }

    // Set backend path: directory part of the backends entry that names this file
    var backends = (config && config.backends) || [];
    for (var backendIndex = 0; backendIndex < backends.length; backendIndex++) {
      var currentBackend = backends[backendIndex];
      if (typeof currentBackend === 'string' && currentBackend.indexOf('mysql-backend.js') > -1) {
        self.config.backendPath = currentBackend.substring(0, currentBackend.lastIndexOf('/') + 1);
      }
    }
    // Without a usable entry, fall back to the directory of this file so that
    // engine and table paths never start with the string "undefined".
    if (!self.config.backendPath && typeof __dirname !== 'undefined') {
      self.config.backendPath = __dirname + '/';
    }

    // Default tables
    if (!self.config.tables) {
      self.config.tables = {counters: ["counters_statistics"], gauges: ["gauges_statistics"]};
    }

    // Default engines
    if (!self.config.engines) {
      self.config.engines = {
        counters: ["engines/countersEngine.js"],
        gauges: ["engines/gaugesEngine.js"],
        timers: [],
        sets: []
      };
    }

    // Synchronous sequence
    sequence.then(function(next) {
      // Check if tables exist
      self.checkDatabase(function(err) {
        if (err) {
          console.log('Database check failed ! Exit...');
          process.exit(-1);
          return;
        }
        console.log('Database is valid.');
        next();
      });
    }).then(function(next) {
      process.stdout.write('Loading MySQL backend engines...');

      // Only the first answer counts, so a loader that reports twice cannot
      // print "[OK]" after "[FAILED]" or advance the sequence twice.
      var answered = false;
      self.loadEngines(function(err) {
        if (answered) {
          return;
        }
        answered = true;

        if (err) {
          process.stdout.write("[FAILED]\n");
          console.log(err);
          // A backend with missing engines cannot store metrics: stop here,
          // like a failed database check does.
          process.exit(-1);
          return;
        }
        process.stdout.write("[OK]\n");
        next();
      });
    }).then(function(next) {
      // Attach events; wrappers keep `this` bound to the backend instance
      emitter.on('flush', function(time_stamp, metrics) { self.onFlush(time_stamp, metrics); });
      emitter.on('status', function() { return self.onStatus.apply(self, arguments); });
      console.log("Statsd MySQL backend is loaded.");
    });
  }
  101. /**
  102. * Load MySQL Backend Query Engines
  103. *
  104. */
  105. StatdMySQLBackend.prototype.loadEngines = function(callback) {
  106. var self = this;
  107. // Iterate on each engine type defined in configuration
  108. for(var engineType in self.config.engines) {
  109. var typeEngines = self.config.engines[engineType];
  110. // Load engines for current type
  111. for(var engineIndex in typeEngines) {
  112. // Get current engine path
  113. var enginePath = typeEngines[engineIndex];
  114. // Load current engine
  115. var currentEngine = require(self.config.backendPath + enginePath).init();
  116. if(currentEngine === undefined) {
  117. callback("Unable to load engine '" + enginePath + "' ! Please check...");
  118. }
  119. // Add engine to MySQL Backend engines
  120. self.engines[engineType].push(currentEngine);
  121. }
  122. }
  123. callback();
  124. }
  /**
   * Open MySQL connection
   *
   * Creates a new connection from the backend configuration, stores it in
   * `self.sqlConnection` and starts connecting.
   *
   * The returned flag only reflects a failure reported before this method
   * returns. NOTE(review): the mysql driver presumably reports connection
   * errors asynchronously, in which case the flag is always true and the
   * failure only shows up in the log and in later query callbacks - confirm.
   *
   * @return boolean false when the connection attempt already failed, true otherwise
   */
  StatdMySQLBackend.prototype.openMySqlConnection = function() {
    var self = this;
    var canExecuteQuerries = true;

    // Create MySQL connection
    self.sqlConnection = _mysql.createConnection(self.config);
    self.sqlConnection.connect(function(error) {
      // The callback also fires on success: only a real error disables querying
      if (error) {
        canExecuteQuerries = false;
        console.log("Unable to connect to MySQL : " + util.inspect(error));
      }
    });

    return canExecuteQuerries;
  };
  /**
   * Close MySQL connection
   *
   * Ends the connection currently stored in `self.sqlConnection`. If ending
   * fails, that same connection is destroyed so its socket does not linger.
   * Does nothing when no connection was opened yet.
   */
  StatdMySQLBackend.prototype.closeMySqlConnection = function() {
    // Keep a reference to the connection being closed: `this.sqlConnection`
    // may already point to a newer connection when the end callback fires.
    var connection = this.sqlConnection;
    if (!connection) {
      return;
    }

    connection.end(function(error) {
      if (error) {
        console.log("There was an error while trying to close DB connection : " + util.inspect(error));
        // Let's make sure that socket is destroyed
        connection.destroy();
      }
    });
  };
  /**
   * Check if required tables are created. If not create them.
   *
   * Opens a connection, then runs checkIfTablesExists() for every stat type of
   * `config.tables` that lists at least one table. The callback is invoked
   * exactly once: with the first error, or without argument once every stat
   * type has been checked. The connection opened for the check is closed
   * before the callback is invoked.
   *
   * The process exits with code -1 when the connection cannot be opened.
   *
   * @param callback function(err)
   */
  StatdMySQLBackend.prototype.checkDatabase = function(callback) {
    var self = this;

    console.log("Checking database...");

    var isConnected = self.openMySqlConnection();
    if (!isConnected) {
      console.log("Unable to connect to MySQL database ! Please check...");
      process.exit(-1);
    }

    // Keep only the stat types (counters, gauges, ...) that actually list tables
    var tables = self.config.tables || {};
    var tablesByType = [];
    Object.keys(tables).forEach(function(statType) {
      var typeTables = tables[statType];
      if (typeTables && typeTables.length > 0) {
        tablesByType.push(typeTables);
      }
    });

    var remainingTypes = tablesByType.length;
    var finished = false;

    // Report the outcome once, whatever the number of answers received
    function finish(err) {
      if (finished) {
        return;
      }
      finished = true;
      // The check connection is not used afterwards
      self.closeMySqlConnection();
      if (err) {
        callback(err);
      } else {
        callback();
      }
    }

    if (remainingTypes === 0) {
      finish();
      return;
    }

    // Iterate on each stat type
    tablesByType.forEach(function(typeTables, statTypeIndex) {
      self.checkIfTablesExists(statTypeIndex, typeTables, typeTables.length, 0, function(type_index, err) {
        // Tolerate an error reported as the only argument
        if (!err && type_index instanceof Error) {
          err = type_index;
        }
        if (err) {
          finish(err);
          return;
        }
        // Types can complete in any order: count them instead of looking at the index
        remainingTypes--;
        if (remainingTypes === 0) {
          finish();
        }
      });
    });
  };
  /**
   * Check if a table exists in database. If not, create it.
   *
   * Handles `tables_names[startIndex]`, then moves on to the next index until
   * `size` tables were handled. The callback is invoked exactly once:
   * `callback(type_index)` when all tables exist (or were created), or
   * `callback(type_index, err)` on the first query / creation error.
   *
   * @param type_index   value handed back as first callback argument
   * @param tables_names list of table names to check
   * @param size         number of entries of tables_names to handle
   * @param startIndex   index of the table handled by this call
   * @param callback     function(type_index, err)
   */
  StatdMySQLBackend.prototype.checkIfTablesExists = function(type_index, tables_names, size, startIndex, callback) {
    var self = this;

    // Every table was handled (also covers an empty list)
    if (startIndex >= size) {
      callback(type_index);
      return;
    }

    var tableName = tables_names[startIndex];

    function checkNextTable() {
      self.checkIfTablesExists(type_index, tables_names, size, startIndex + 1, callback);
    }

    // The table name is passed as a value so that the driver escapes it
    self.sqlConnection.query('SHOW TABLES LIKE ?', [tableName], function(err, results) {
      if (err) {
        callback(type_index, err);
        return;
      }

      // If table was found in database
      if (results && results.length > 0) {
        console.log("Table '" + tableName + "' was found.");
        checkNextTable();
        return;
      }

      // If table wasn't found, create it
      console.log("Table '" + tableName + "' was not found !");
      self.createTable(tableName, function(err) {
        if (err) {
          callback(type_index, err);
          return;
        }
        checkNextTable();
      });
    });
  };
  /**
   * Create a table from corresponding sql script file
   *
   * Reads `<backendPath>tables/<table_name>.sql`, splits its content on "$$",
   * drops empty parts, terminates every statement with ";" and sends all
   * statements to the database as a single query.
   *
   * NOTE(review): sending several statements in one query presumably requires
   * the connection to allow multiple statements - confirm against the driver
   * configuration in use.
   *
   * @param table_name name of the table (and of its .sql file)
   * @param callback   function(err) - called once, without argument on success
   */
  StatdMySQLBackend.prototype.createTable = function(table_name, callback) {
    var self = this;

    // Try to read SQL file for this table
    var sqlFilePath = self.config.backendPath + 'tables/' + table_name + '.sql';
    fs.readFile(sqlFilePath, 'utf8', function(err, data) {
      if (err) {
        console.log("Unable to read file: '" + sqlFilePath + "' !");
        callback(err);
        return;
      }

      // Split and prepare querries
      var statements = [];
      data.split("$$").forEach(function(rawQuery) {
        // Trimming first keeps trailing whitespace from hiding an existing ";"
        var query = rawQuery.trim();
        if (query === "") {
          return;
        }
        if (query.charAt(query.length - 1) !== ";") {
          query += ";";
        }
        statements.push(query);
      });

      if (statements.length === 0) {
        console.log("No SQL statement found in file: '" + sqlFilePath + "' !");
        callback(new Error("No SQL statement found in '" + sqlFilePath + "'"));
        return;
      }

      var queuedQuerries = statements.join("\n");

      // Execute querries
      self.sqlConnection.query(queuedQuerries, function(err, results, fields) {
        if (err) {
          console.log("Unable to execute query: '" + queuedQuerries + "' for table '" + table_name + "' !");
          callback(err);
          return;
        }
        console.log("Table '" + table_name + "' was created with success.");
        callback();
      });
    });
  };
  /**
   * Method executed when statsd flush received datas
   *
   * Forwards the counters and the gauges of the flush to their handlers.
   * Other entries of `metrics` are not stored by this backend. A missing
   * `metrics` object or a missing section is treated as empty.
   *
   * @param time_stamp flush time stamp, passed on to the handlers
   * @param metrics    flushed metrics; `metrics.counters` and `metrics.gauges` are read
   */
  StatdMySQLBackend.prototype.onFlush = function(time_stamp, metrics) {
    var self = this;
    var received = metrics || {};

    // Handle statsd counters
    self.handleCounters(received['counters'] || {}, time_stamp);

    // Handle statsd gauges
    self.handleGauges(received['gauges'] || {}, time_stamp);
  };
  /**
   * Handle and process received counters
   *
   * Nothing is stored unless the STATSD_PACKETS_RECEIVED counter is greater
   * than 0 and at least one user counter (see getUserCounters) was received.
   * Otherwise a connection is opened, every counters engine builds its
   * queries, the queries are executed (in batches once 100 or more are
   * pending) and the connection is closed again - even when an engine throws.
   *
   * @param _counters received counters
   * @param time_stamp flush time_stamp
   */
  StatdMySQLBackend.prototype.handleCounters = function(_counters, time_stamp) {
    var self = this;

    if (!_counters) {
      return;
    }

    var packets_received = parseInt(_counters[STATSD_PACKETS_RECEIVED], 10);
    // Also false for NaN, i.e. when the counter is missing
    if (!(packets_received > 0)) {
      return;
    }

    // Get userCounters for this flush
    var userCounters = self.getUserCounters(_counters);
    if (Object.keys(userCounters).length === 0) {
      return;
    }

    console.log("Counters received !");

    try {
      // Open MySQL connection
      if (self.openMySqlConnection()) {
        var querries = [];

        // Call buildQuerries method on each counterEngine
        self.engines.counters.forEach(function(countersEngine, countersEngineIndex) {
          console.log("countersEngineIndex = " + countersEngineIndex);

          // Add current engine querries to querries list
          querries = querries.concat(countersEngine.buildQuerries(userCounters, time_stamp));

          // Insert data into database every 100 query
          if (querries.length >= 100) {
            self.executeQuerries(querries);
            querries = [];
          }
        });

        // Execute what is left
        if (querries.length > 0) {
          self.executeQuerries(querries);
        }
      } else {
        console.log("Unable to open db connection !");
      }
    } finally {
      // Close MySQL Connection
      self.closeMySqlConnection();
    }
  };
  /**
   * Handle and process received gauges
   *
   * When at least one gauge was received, a connection is opened, every
   * gauges engine builds its queries, the queries are executed (in batches
   * once 100 or more are pending) and the connection is closed again - even
   * when an engine throws.
   *
   * @param _gauges received gauges
   * @param time_stamp flush time_stamp
   */
  StatdMySQLBackend.prototype.handleGauges = function(_gauges, time_stamp) {
    var self = this;

    // If no gauges received, there is nothing to store
    if (!_gauges || Object.keys(_gauges).length === 0) {
      return;
    }

    console.log("Gauges received !");
    console.log("Gauges = " + util.inspect(_gauges));

    try {
      // Open MySQL connection
      if (self.openMySqlConnection()) {
        console.log("ok");
        var querries = [];

        // Call buildQuerries method on each gaugesEngine
        self.engines.gauges.forEach(function(gaugesEngine, gaugesEngineIndex) {
          console.log("gaugesEngineIndex = " + gaugesEngineIndex);

          // Add current engine querries to querries list
          querries = querries.concat(gaugesEngine.buildQuerries(_gauges, time_stamp));

          // Insert data into database every 100 query
          if (querries.length >= 100) {
            self.executeQuerries(querries);
            querries = [];
          }
        });

        // Execute what is left
        if (querries.length > 0) {
          self.executeQuerries(querries);
        }
      } else {
        console.log("Unable to open db connection !");
      }
    } finally {
      // Close MySQL Connection
      self.closeMySqlConnection();
    }
  };
  /**
   * Execute a list of SQL queries on the current connection
   *
   * Queries are sent one by one, in list order, on `self.sqlConnection`.
   * The outcome of each query is only logged; a failing query does not
   * prevent the following ones from being sent.
   *
   * @param sqlQuerries array of SQL query strings (anything else is ignored)
   */
  StatdMySQLBackend.prototype.executeQuerries = function(sqlQuerries) {
    var self = this;

    if (!Array.isArray(sqlQuerries)) {
      return;
    }

    sqlQuerries.forEach(function(sqlQuery, queryIndex) {
      console.log("Query " + queryIndex + " : " + sqlQuery);
      self.sqlConnection.query(sqlQuery, function(err, rows) {
        if (!err) {
          console.log(" -> Query [SUCCESS]");
          return;
        }
        // Say which query failed and why, so the failure can be diagnosed from the log
        console.log(" -> Query [ERROR] : " + sqlQuery + " : " + util.inspect(err));
      });
    });
  };
  /**
   * Extract the user counters of a flush
   *
   * A counter is a user counter when the first dot-separated part of its
   * name is not "statsd". Only the own properties of `_counters` are
   * considered, so enumerable properties inherited through the prototype
   * chain are never mistaken for counters.
   *
   * @param _counters received counters (name -> value); may be null/undefined
   * @return object   new object holding only the user counters
   */
  StatdMySQLBackend.prototype.getUserCounters = function(_counters) {
    var userCounters = {};
    if (!_counters) {
      return userCounters;
    }

    Object.keys(_counters).forEach(function(counterName) {
      if (counterName.split('.')[0] !== "statsd") {
        userCounters[counterName] = _counters[counterName];
      }
    });

    return userCounters;
  };
  /**
   * Extract the statsd internal counters of a flush
   *
   * A counter is an internal counter when the first dot-separated part of
   * its name is "statsd". Only the own properties of `_counters` are
   * considered, so enumerable properties inherited through the prototype
   * chain are never mistaken for counters.
   *
   * @param _counters received counters (name -> value); may be null/undefined
   * @return object   new object holding only the statsd counters
   */
  StatdMySQLBackend.prototype.getStatsdCounters = function(_counters) {
    var statsdCounters = {};
    if (!_counters) {
      return statsdCounters;
    }

    Object.keys(_counters).forEach(function(counterName) {
      if (counterName.split('.')[0] === "statsd") {
        statsdCounters[counterName] = _counters[counterName];
      }
    });

    return statsdCounters;
  };
  /**
   * Handler registered by the constructor for the 'status' event.
   *
   * The body is empty: this backend currently reports nothing on 'status'.
   *
   * NOTE(review): statsd presumably emits 'status' with a single callback
   * that expects (error, backend_name, stat_name, stat_value); the parameters
   * below look like that callback's arguments rather than the listener's -
   * confirm against the statsd backend interface before implementing.
   *
   * @param error
   * @param backend_name
   * @param stat_name
   * @param stat_value
   */
  StatdMySQLBackend.prototype.onStatus = function(error, backend_name, stat_name, stat_value) {
  }
  431. exports.init = function(startupTime, config, events) {
  432. var instance = new StatdMySQLBackend(startupTime, config, events);
  433. return true;
  434. };