mysql-backend.js 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382
  1. ///////////////////////////////////////////////////////////////////////////////////
  2. // NodeJS Statsd MySQL Backend 1.0
  3. // ------------------------------------------------------------------------------
  4. //
  5. // Authors: Nicolas FRADIN, Damien PACAUD
  6. // Date: 31/10/2012
  7. //
  8. ///////////////////////////////////////////////////////////////////////////////////
  9. var _mysql = require('mysql'),
  10. util = require('util'),
  11. fs = require('fs');
  12. var STATSD_PACKETS_RECEIVED = "statsd.packets_received";
  13. var STATSD_BAD_LINES = "statsd.bad_lines_seen";
  14. /**
  15. * Backend Constructor
  16. *
  17. * Example config :
  18. *
  19. mysql: {
  20. host: "localhost",
  21. port: 3306,
  22. user: "root",
  23. password: "root",
  24. database: "statsd_db",
  25. tables: ["statsd_users", "statsd_statistics"]
  26. }
  27. *
  28. * @param startupTime
  29. * @param config
  30. * @param emmiter
  31. */
  32. function StatdMySQLBackend(startupTime, config, emitter) {
  33. var self = this;
  34. self.config = config.mysql || {};
  35. self.engines = {};
  36. // Verifying that the config file contains enough information for this backend to work
  37. if(!this.config.host || !this.config.database || !this.config.user) {
  38. console.log("You need to specify at least host, port, database and user for this mysql backend");
  39. process.exit(-1);
  40. }
  41. // Default port for mysql is 3306, if unset in conf file, we set it here to default
  42. if(!this.config.port) {
  43. this.config.port = 3306;
  44. }
  45. // Set backend path
  46. for(var backend_index in config.backends) {
  47. var currentBackend = config.backends[backend_index];
  48. if(currentBackend.indexOf('mysql-backend.js') > -1) {
  49. self.config.backendPath = currentBackend.substring(0, currentBackend.lastIndexOf('/')+1);
  50. }
  51. }
  52. //Default tables
  53. if(!this.config.tables) {
  54. this.config.tables = ["counters_statistics"];
  55. }
  56. // Default engines
  57. if(!self.config.engines) {
  58. self.config.engines = {};
  59. self.config.engines.countersEngine = "./countersEngine.js";
  60. }
  61. // Check if tables exists
  62. self.checkDatabase();
  63. // Load backend engines
  64. self.loadEngines();
  65. // Attach events
  66. emitter.on('flush', function(time_stamp, metrics) { self.onFlush(time_stamp, metrics); } );
  67. emitter.on('status', self.onStatus );
  68. }
  69. /**
  70. *
  71. *
  72. */
  73. StatdMySQLBackend.prototype.loadEngines = function() {
  74. var self = this;
  75. // Load counters engine
  76. self.engines.countersEngine = require(self.config.engines.countersEngine).init();
  77. if(self.engines.countersEngine === undefined) {
  78. console.log("Unable to load counter engine ! Please check...");
  79. process.exit(-1);
  80. }
  81. }
  82. /**
  83. * Open MySQL connection
  84. *
  85. * @return boolean Indicates if connection succeed
  86. */
  87. StatdMySQLBackend.prototype.openMySqlConnection = function() {
  88. var self = this;
  89. var canExecuteQuerries = true;
  90. // Create MySQL connection
  91. self.sqlConnection = _mysql.createConnection(this.config);
  92. self.sqlConnection.connect(function(error){
  93. canExecuteQuerries = false;
  94. });
  95. return canExecuteQuerries;
  96. }
  97. /**
  98. * Close MySQL connection
  99. *
  100. */
  101. StatdMySQLBackend.prototype.closeMySqlConnection = function() {
  102. var self = this;
  103. self.sqlConnection.end(function(error) {
  104. if(error){
  105. console.log("There was an error while trying to close DB connection : " + util.inspect(error));
  106. //Let's make sure that socket is destroyed
  107. self.sqlConnection.destroy();
  108. }
  109. });
  110. return;
  111. }
  112. /**
  113. *
  114. *
  115. */
  116. StatdMySQLBackend.prototype.checkDatabase = function(callback) {
  117. var self = this;
  118. var isConnected = self.openMySqlConnection();
  119. if(!isConnected) {
  120. console.log("Unable to connect to MySQL database ! Please check...");
  121. process.exit(-1);
  122. }
  123. // Check if tables exists
  124. for(var table_index in self.config.tables) {
  125. var table_name = self.config.tables[table_index];
  126. console.log("Check if table exists : '" + table_name + "'");
  127. self.sqlConnection.query('show tables like "'+table_name+'";', function(err, results, fields) {
  128. if(err) {
  129. console.log("Unbale to execute query !");
  130. process.exit(-1);
  131. }
  132. // If table doesn't exists
  133. if(results.length > 0) {
  134. console.log("Table '" + table_name + "' was found !");
  135. if(table_index == self.config.tables.length-1) {
  136. console.log("-- MySQL Backend is ready !");
  137. }
  138. } else {
  139. console.log("Table '" + table_name + "' was not found !");
  140. // Try to read SQL file for this table
  141. var sqlFilePath = self.config.backendPath + 'tables/' + table_name + '.sql';
  142. fs.readFile(sqlFilePath, 'utf8', function (err,data) {
  143. if (err) {
  144. console.log("Unable to read file: '" + sqlFilePath + "' ! Exit...");
  145. process.exit(-1);
  146. }
  147. self.sqlConnection.query(data, function(err, results, fields) {
  148. if(err) {
  149. console.log("Unable to create table: '" + table_name +"' ! Exit...");
  150. process.exit(-1);
  151. }
  152. console.log("Table '" + table_name +"' was created with success.");
  153. if(table_index == self.config.tables.length-1) {
  154. console.log("-- MySQL Backend is ready !");
  155. }
  156. });
  157. });
  158. }
  159. });
  160. }
  161. }
  162. /**
  163. * Method executed when statsd flush received datas
  164. *
  165. * @param time_stamp
  166. * @param metrics
  167. */
  168. StatdMySQLBackend.prototype.onFlush = function(time_stamp, metrics) {
  169. var self = this;
  170. var counters = metrics['counters'];
  171. var timers = metrics['timers'];
  172. var gauges = metrics['gauges'];
  173. var sets = metrics['sets'];
  174. var pctThreshold = metrics['pctThreshold'];
  175. self.handleCounters(counters,time_stamp);
  176. }
  177. /**
  178. * Handle and process received counters
  179. *
  180. * @param _counters received counters
  181. * @param time_stamp flush time_stamp
  182. */
  183. StatdMySQLBackend.prototype.handleCounters = function(_counters, time_stamp) {
  184. var self = this;
  185. var packets_received = parseInt(_counters[STATSD_PACKETS_RECEIVED]);
  186. var bad_lines_seen = parseInt(_counters[STATSD_BAD_LINES]);
  187. if(packets_received > 0) {
  188. // Get userCounters for this flush
  189. var userCounters = self.getUserCounters(_counters);
  190. var querries = [];
  191. console.log("Preaparing querries...");
  192. // Call countersEngine's buildQuerries method
  193. querries = self.engines.countersEngine.buildQuerries(userCounters, time_stamp);
  194. var querriesCount = querries.length;
  195. console.log("Querries count : " + querriesCount );
  196. // If at least one querry can be executed
  197. if(querriesCount > 0) {
  198. // Open MySQL connection
  199. var canExecuteQuerries = self.openMySqlConnection();
  200. // If connection succeed
  201. if(canExecuteQuerries) {
  202. console.log("Executing " + querriesCount + " querries...");
  203. // Execute querries
  204. self.executeQuerries(querries);
  205. // Close MySQL connection
  206. self.closeMySqlConnection();
  207. }
  208. }
  209. } else {
  210. console.log("No user packets received.");
  211. }
  212. return;
  213. }
  214. StatdMySQLBackend.prototype.executeQuerries = function(sqlQuerries) {
  215. var self = this;
  216. for(var i = 0 ; i < sqlQuerries.length ; i++){
  217. console.log("Query " + i + " : " + sqlQuerries[i]);
  218. self.sqlConnection.query(sqlQuerries[i], function(err, rows) {
  219. if(!err) {
  220. console.log(" -> Query [SUCCESS]");
  221. }
  222. else {
  223. //TODO : add better error handling code
  224. console.log(" -> Query [ERROR]");
  225. }
  226. });
  227. }
  228. }
  229. /**
  230. *
  231. *
  232. */
  233. StatdMySQLBackend.prototype.getUserCounters = function(_counters) {
  234. var userCounters = {};
  235. for(var counterName in _counters) {
  236. var counterNameParts = counterName.split('.');
  237. if(counterNameParts[0] !== "statsd") {
  238. userCounters[counterName] = _counters[counterName];
  239. }
  240. }
  241. return userCounters;
  242. }
  243. /**
  244. *
  245. *
  246. */
  247. StatdMySQLBackend.prototype.getStatsdCounters = function(_counters) {
  248. var statsdCounters = {};
  249. for(var counterName in _counters) {
  250. var counterNameParts = counterName.split('.');
  251. if(counterNameParts[0] === "statsd") {
  252. statsdCounters[counterName] = _counters[counterName];
  253. }
  254. }
  255. return statsdCounters;
  256. }
  257. /**
  258. *
  259. * @param error
  260. * @param backend_name
  261. * @param stat_name
  262. * @param stat_value
  263. */
  264. StatdMySQLBackend.prototype.onStatus = function(error, backend_name, stat_name, stat_value) {
  265. }
  266. exports.init = function(startupTime, config, events) {
  267. var instance = new StatdMySQLBackend(startupTime, config, events);
  268. return true;
  269. };
  270. /*
  271. * Backend example : repeater.js
  272. *
  273. var util = require('util'),
  274. dgram = require('dgram');
  275. function RepeaterBackend(startupTime, config, emitter){
  276. var self = this;
  277. this.config = config.repeater || [];
  278. this.sock = dgram.createSocket('udp6');
  279. // attach
  280. emitter.on('packet', function(packet, rinfo) { self.process(packet, rinfo); });
  281. };
  282. RepeaterBackend.prototype.process = function(packet, rinfo) {
  283. var self = this;
  284. hosts = self.config;
  285. for(var i=0; i<hosts.length; i++) {
  286. self.sock.send(packet,0,packet.length,hosts[i].port,hosts[i].host,
  287. function(err,bytes) {
  288. if (err) {
  289. console.log(err);
  290. }
  291. });
  292. }
  293. };
  294. exports.init = function(startupTime, config, events) {
  295. var instance = new RepeaterBackend(startupTime, config, events);
  296. return true;
  297. };
  298. */