// mysql-backend.js (9.5 KB)
  ///////////////////////////////////////////////////////////////////////////////////
  // NodeJS Statsd MySQL Backend 0.1.0-alpha1
  // ------------------------------------------------------------------------------
  //
  // Authors: Nicolas FRADIN, Damien PACAUD
  // Date: 31/10/2012
  //
  ///////////////////////////////////////////////////////////////////////////////////
  9. var _mysql = require('mysql'),
  10. util = require('util'),
  11. fs = require('fs');
  12. var STATSD_PACKETS_RECEIVED = "statsd.packets_received";
  13. var STATSD_BAD_LINES = "statsd.bad_lines_seen";
  /**
   * Backend constructor.
   *
   * Reads the `mysql` section of the statsd configuration, fills in default
   * values, checks the database, loads the query engines and finally
   * registers the handlers for the 'flush' and 'status' events.
   *
   * Example config :
   *
   *   mysql: {
   *     host: "localhost",
   *     port: 3306,
   *     user: "root",
   *     password: "root",
   *     database: "statsd_db",
   *     tables: { counters: ["counters_statistics"] },
   *     engines: { counters: ["engines/countersEngine.js"], gauges: [], timers: [], sets: [] }
   *   }
   *
   * The process exits with code -1 when host, database or user is missing.
   *
   * @param startupTime not used by this backend
   * @param config      whole configuration object; `config.mysql` and `config.backends` are read
   * @param emitter     object exposing `on(eventName, handler)`
   */
  function StatdMySQLBackend(startupTime, config, emitter) {
    var self = this;

    self.config = config.mysql || {};
    self.engines = {
      counters: [],
      gauges: [],
      timers: [],
      sets: []
    };

    // Verifying that the config file contains enough information for this backend to work.
    // The port is optional (defaulted below), so it is not part of the check.
    if(!self.config.host || !self.config.database || !self.config.user) {
      console.log("You need to specify at least host, database and user for this mysql backend");
      process.exit(-1);
    }

    // Default port for mysql is 3306, if unset in conf file, we set it here to default
    if(!self.config.port) {
      self.config.port = 3306;
    }

    // Set backend path: the directory part of the backend entry naming this file
    var backends = config.backends || [];
    for(var backendIndex = 0; backendIndex < backends.length; backendIndex++) {
      var currentBackend = backends[backendIndex];
      if(typeof currentBackend === 'string' && currentBackend.indexOf('mysql-backend.js') > -1) {
        self.config.backendPath = currentBackend.substring(0, currentBackend.lastIndexOf('/') + 1);
      }
    }

    // Without this fallback, engine and table paths would be built from the
    // string "undefined" when no backend entry matches.
    if(!self.config.backendPath) {
      self.config.backendPath = './';
    }

    // Default tables
    if(!self.config.tables) {
      self.config.tables = {counters: ["counters_statistics"]};
    }

    // Default engines
    if(!self.config.engines) {
      self.config.engines = {
        counters: ["engines/countersEngine.js"],
        gauges: [],
        timers: [],
        sets: []
      };
    }

    // Check if tables exists
    self.checkDatabase();

    // Load backend engines
    self.loadEngines();

    // Attach events. Both handlers go through a wrapper so that `this` inside
    // the method is the backend instance and not whatever the emitter uses.
    emitter.on('flush', function(time_stamp, metrics) { self.onFlush(time_stamp, metrics); } );
    emitter.on('status', function() { return self.onStatus.apply(self, arguments); } );
  }
  78. /**
  79. * Load MySQL Backend Query Engines
  80. *
  81. */
  82. StatdMySQLBackend.prototype.loadEngines = function() {
  83. var self = this;
  84. // Iterate on each engine type defined in configuration
  85. for(var engineType in self.config.engines) {
  86. var typeEngines = self.config.engines[engineType];
  87. // Load engines for current type
  88. for(var engineIndex in typeEngines) {
  89. // Get current engine path
  90. var enginePath = typeEngines[engineIndex];
  91. // Load current engine
  92. var currentEngine = require(self.config.backendPath + enginePath).init();
  93. if(currentEngine === undefined) {
  94. console.log("Unable to load engine '" + enginePath + "' ! Please check...");
  95. process.exit(-1);
  96. }
  97. // Add engine to MySQL Backend engines
  98. self.engines.counters.push(currentEngine);
  99. }
  100. }
  101. }
  /**
   * Open MySQL connection
   *
   * Creates a connection from `this.config`, stores it in `this.sqlConnection`
   * and starts connecting.
   *
   * @return boolean false only if a connection error has already been reported
   *                 by the time this method returns, true otherwise. When the
   *                 connect callback runs later, a failure cannot be reflected
   *                 in the return value and is only logged.
   */
  StatdMySQLBackend.prototype.openMySqlConnection = function() {
    var self = this;
    var canExecuteQuerries = true;

    // Create MySQL connection
    self.sqlConnection = _mysql.createConnection(self.config);

    self.sqlConnection.connect(function(error) {
      // The callback also runs on success, so only an actual error may flip the flag
      if(error) {
        canExecuteQuerries = false;
        console.log("Unable to connect to MySQL database : " + util.inspect(error));
      }
    });

    return canExecuteQuerries;
  };
  /**
   * Close MySQL connection
   *
   * Calls `end()` on `this.sqlConnection`. If `end()` reports an error, the
   * error is logged and the connection is destroyed.
   */
  StatdMySQLBackend.prototype.closeMySqlConnection = function() {
    var backend = this;

    function onConnectionEnded(closeError) {
      if(!closeError) {
        return;
      }
      console.log("There was an error while trying to close DB connection : " + util.inspect(closeError));
      // Let's make sure that socket is destroyed
      backend.sqlConnection.destroy();
    }

    backend.sqlConnection.end(onConnectionEnded);
  };
  /**
   * Check if required tables are created. If not create them.
   *
   * Opens a connection, then for every table name found in `config.tables`
   * (an object mapping a stat type to a table name or an array of table names)
   * asks MySQL whether the table exists. A missing table is created by running
   * the statements of `<backendPath>tables/<table>.sql`, which are separated
   * by "$$". The connection opened here is closed once every table has been
   * handled.
   *
   * The process exits with code -1 when the connection cannot be opened, a
   * query fails or a table's SQL file cannot be read.
   */
  StatdMySQLBackend.prototype.checkDatabase = function() {
    var self = this;

    var isConnected = self.openMySqlConnection();
    if(!isConnected) {
      console.log("Unable to connect to MySQL database ! Please check...");
      process.exit(-1);
    }

    // Every callback below works on this connection, even if
    // `self.sqlConnection` is replaced before the callbacks run.
    var connection = self.sqlConnection;
    var tables = self.config.tables || {};

    // Collect the table names of every stat type (counters, gauges, ...)
    var tableNames = [];
    Object.keys(tables).forEach(function(statType) {
      tableNames = tableNames.concat(tables[statType] || []);
    });

    var remainingTables = tableNames.length;

    // Close the connection opened for the check
    function closeCheckConnection() {
      connection.end(function(error) {
        if(error) {
          console.log("There was an error while trying to close DB connection : " + util.inspect(error));
          connection.destroy();
        }
      });
    }

    // Record that one table is fully handled; close once none is left
    function tableDone() {
      remainingTables--;
      if(remainingTables === 0) {
        closeCheckConnection();
      }
    }

    // Create one table from its SQL file
    function createTable(tableName) {
      var sqlFilePath = self.config.backendPath + 'tables/' + tableName + '.sql';

      fs.readFile(sqlFilePath, 'utf8', function(err, data) {
        if(err) {
          console.log("Unable to read file: '" + sqlFilePath + "' ! Exit...");
          process.exit(-1);
          return;
        }

        // Split querries and drop the blank ones
        var statements = data.split("$$").filter(function(statement) {
          return statement.trim() !== "";
        });
        var pendingStatements = statements.length;

        if(pendingStatements === 0) {
          console.log("Table '" + tableName +"' was created with success.");
          tableDone();
          return;
        }

        statements.forEach(function(statement) {
          connection.query(statement, function(err) {
            if(err) {
              console.log("Unable to execute query: '" + statement +"' for table '"+tableName+"' ! Exit...");
              process.exit(-1);
              return;
            }
            // Report success only after the last statement has actually run
            pendingStatements--;
            if(pendingStatements === 0) {
              console.log("Table '" + tableName +"' was created with success.");
              tableDone();
            }
          });
        });
      });
    }

    // Check one table; each call has its own `tableName`, so the asynchronous
    // callbacks always refer to the table they were started for.
    function checkTable(tableName) {
      console.log("Check if table exists : '" + tableName + "'");

      // The name is passed as a query value instead of being concatenated
      connection.query('SHOW TABLES LIKE ?', [tableName], function(err, results) {
        if(err) {
          console.log("Unbale to execute query !");
          process.exit(-1);
          return;
        }

        if(results.length > 0) {
          // Table already exists
          console.log("Table '" + tableName + "' was found !");
          tableDone();
        } else {
          // Table doesn't exist yet
          console.log("Table '" + tableName + "' was not found !");
          createTable(tableName);
        }
      });
    }

    if(remainingTables === 0) {
      closeCheckConnection();
      return;
    }

    tableNames.forEach(function(tableName) {
      checkTable(tableName);
    });
  };
  /**
   * Method executed when statsd flush received datas
   *
   * Only the counters are processed: `metrics['counters']` is handed to
   * handleCounters together with the flush time stamp. Any other entry of
   * `metrics` is ignored.
   *
   * @param time_stamp flush time stamp, forwarded unchanged
   * @param metrics    flushed metrics; only its 'counters' entry is read
   */
  StatdMySQLBackend.prototype.onFlush = function(time_stamp, metrics) {
    var self = this;

    // Handle statsd counters
    self.handleCounters(metrics['counters'], time_stamp);
  };
  /**
   * Handle and process received counters
   *
   * Nothing happens unless the `statsd.packets_received` counter is greater
   * than zero. Otherwise a connection is opened, every counters engine is
   * asked to build its queries from the user counters, the queries are
   * executed (flushed as soon as 100 or more have accumulated, then once more
   * for the remainder) and the connection is closed.
   *
   * @param _counters received counters
   * @param time_stamp flush time_stamp
   */
  StatdMySQLBackend.prototype.handleCounters = function(_counters, time_stamp) {
    var self = this;
    var counters = _counters || {};
    var packets_received = parseInt(counters[STATSD_PACKETS_RECEIVED], 10);

    // Also covers a missing or non-numeric value (NaN > 0 is false)
    if(!(packets_received > 0)) {
      return;
    }

    // Get userCounters for this flush
    var userCounters = self.getUserCounters(counters);
    var querries = [];

    console.log("Preaparing querries...");

    // Open MySQL connection
    var canExecuteQuerries = self.openMySqlConnection();
    if(canExecuteQuerries) {
      var countersEngines = self.engines.counters;

      // Call buildQuerries method on each counterEngine
      for(var countersEngineIndex = 0; countersEngineIndex < countersEngines.length; countersEngineIndex++) {
        console.log("countersEngineIndex = " + countersEngineIndex);
        var countersEngine = countersEngines[countersEngineIndex];

        // Add current engine querries to querries list
        var engineQuerries = countersEngine.buildQuerries(userCounters, time_stamp);
        querries = querries.concat(engineQuerries);

        // Insert data into database every 100 query
        if(querries.length >= 100) {
          self.executeQuerries(querries);
          querries = [];
        }
      }

      // Execute whatever is left
      if(querries.length > 0) {
        self.executeQuerries(querries);
        querries = [];
      }
    }

    // Close MySQL Connection (only reached when one was opened above)
    self.closeMySqlConnection();
  };
  /**
   * Send every query of the list to `this.sqlConnection`, in order.
   *
   * Each query is logged before being sent; its outcome is logged when the
   * query callback runs.
   *
   * @param sqlQuerries array of queries to execute
   */
  StatdMySQLBackend.prototype.executeQuerries = function(sqlQuerries) {
    var backend = this;

    // Shared completion callback: it only reports the outcome
    function reportOutcome(queryError, rows) {
      if(queryError) {
        //TODO : add better error handling code
        console.log(" -> Query [ERROR]");
        return;
      }
      console.log(" -> Query [SUCCESS]");
    }

    var position = 0;
    while(position < sqlQuerries.length) {
      var statement = sqlQuerries[position];
      console.log("Query " + position + " : " + statement);
      backend.sqlConnection.query(statement, reportOutcome);
      position += 1;
    }
  };
  /**
   * Build a new object holding every counter whose name does not have
   * "statsd" as its first dot-separated segment.
   *
   * @param _counters counters indexed by name
   * @return object with the remaining counters, values unchanged
   */
  StatdMySQLBackend.prototype.getUserCounters = function(_counters) {
    var selected = {};
    var name;

    for(name in _counters) {
      // Skip the counters of the "statsd" namespace
      if(name.split('.')[0] === "statsd") {
        continue;
      }
      selected[name] = _counters[name];
    }

    return selected;
  };
  /**
   * Build a new object holding only the counters whose name has "statsd" as
   * its first dot-separated segment.
   *
   * @param _counters counters indexed by name
   * @return object with the matching counters, values unchanged
   */
  StatdMySQLBackend.prototype.getStatsdCounters = function(_counters) {
    var internalCounters = {};
    var name;
    var namespace;

    for(name in _counters) {
      namespace = name.split('.')[0];
      if(namespace !== "statsd") {
        continue;
      }
      internalCounters[name] = _counters[name];
    }

    return internalCounters;
  };
  /**
   * Handler registered by the constructor for the 'status' event.
   * It currently does nothing and returns undefined.
   *
   * NOTE(review): the arguments actually emitted with 'status' are not visible
   * in this file - confirm that they match this parameter list.
   *
   * @param error
   * @param backend_name
   * @param stat_name
   * @param stat_value
   */
  StatdMySQLBackend.prototype.onStatus = function(error, backend_name, stat_name, stat_value) {
    // No status information is reported yet.
  };
  300. exports.init = function(startupTime, config, events) {
  301. var instance = new StatdMySQLBackend(startupTime, config, events);
  302. return true;
  303. };