mysql-backend.js 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296
  1. ///////////////////////////////////////////////////////////////////////////////////
  2. // NodeJS Statsd MySQL Backend 1.0
  3. // ------------------------------------------------------------------------------
  4. //
  5. // Authors: Nicolas FRADIN, Damien PACAUD
  6. // Date: 31/10/2012
  7. //
  8. ///////////////////////////////////////////////////////////////////////////////////
  9. var _mysql = require('mysql'),
  10. util = require('util'),
  11. events = require('events');
  // Keys looked up in the flushed counters map by handleCounters.
  var STATSD_BAD_LINES = "statsd.bad_lines_seen",
      STATSD_PACKETS_RECEIVED = "statsd.packets_received";

  // Module-level event emitter; nothing in the visible code subscribes to it
  // or emits on it.
  var engineEvents = new events.EventEmitter();
  /**
   * Backend constructor.
   *
   * Reads the `mysql` section of the configuration, fills in the default port
   * and the default engine list, loads the engines and subscribes to the
   * 'flush' and 'status' events of the given emitter.
   *
   * The process is terminated with exit code -1 when host, database or user
   * is missing from the configuration.
   *
   * @param startupTime Passed through by init(); not used by this constructor
   * @param config Configuration object; only its `mysql` property is read
   * @param emitter Event emitter on which the 'flush' and 'status' listeners are registered
   */
  function StatdMySQLBackend(startupTime, config, emitter) {
    var self = this;

    self.config = config.mysql || {};
    self.engines = {};

    // host, database and user have no usable default, so refuse to start
    // without them (the port is optional, see below)
    if(!self.config.host || !self.config.database || !self.config.user) {
      console.log("You need to specify at least host, database and user for this mysql backend");
      process.exit(-1);
    }

    // Default port for mysql is 3306, if unset in conf file, we set it here to default
    if(!self.config.port) {
      self.config.port = 3306;
    }

    // Default engines
    if(!self.config.engines) {
      self.config.engines = {};
      self.config.engines.countersEngine = "./countersEngine.js";
    }

    // Load backend engines
    self.loadEngines();

    // Attach events. Both listeners go through a wrapper so that the handler
    // runs with the backend as `this` instead of the emitter.
    emitter.on('flush', function(time_stamp, metrics) { self.onFlush(time_stamp, metrics); } );
    emitter.on('status', function() { return self.onStatus.apply(self, arguments); } );
  }
  /**
   * Load the engines used by this backend.
   *
   * Requires the module named by `config.engines.countersEngine`, calls its
   * init() and stores the result in `engines.countersEngine`. The process is
   * terminated with exit code -1 when init() returned undefined.
   */
  StatdMySQLBackend.prototype.loadEngines = function() {
    var backend = this;

    // Load counters engine
    backend.engines.countersEngine = require(backend.config.engines.countersEngine).init();

    if(backend.engines.countersEngine !== undefined) {
      return;
    }

    console.log("Unable to load counter engine ! Please check...");
    process.exit(-1);
  };
  /**
   * Open MySQL connection
   *
   * Creates a new connection from this.config, stores it in this.sqlConnection
   * and starts the handshake. The connect callback runs after this method has
   * returned, so the handshake outcome cannot be part of the return value;
   * a handshake error is logged from the callback instead.
   * NOTE(review): per the mysql module documentation, queries issued before the
   * handshake completes are queued and receive the connection error in their
   * own callbacks - confirm against the installed version.
   *
   * @return boolean true when the connection was created and connect() was
   *                 issued, false when doing so threw synchronously
   */
  StatdMySQLBackend.prototype.openMySqlConnection = function() {
    var self = this;

    try {
      // Create MySQL connection
      self.sqlConnection = _mysql.createConnection(self.config);
      self.sqlConnection.connect(function(error) {
        // Only a real error is worth reporting; a successful handshake passes
        // a falsy value here
        if(error) {
          console.log("There was an error while trying to open DB connection : " + util.inspect(error));
        }
      });
    } catch(setupError) {
      console.log("There was an error while trying to create DB connection : " + util.inspect(setupError));
      return false;
    }

    return true;
  };
  /**
   * Close MySQL connection
   *
   * Asks the current connection (this.sqlConnection) to end. When ending
   * reports an error, the error is logged and that same connection is
   * destroyed. Does nothing when no connection has been opened.
   */
  StatdMySQLBackend.prototype.closeMySqlConnection = function() {
    // Keep a reference to the connection being closed: this.sqlConnection may
    // already point to a newer connection by the time the callback runs.
    var connection = this.sqlConnection;

    if(!connection) {
      return;
    }

    connection.end(function(error) {
      if(error){
        console.log("There was an error while trying to close DB connection : " + util.inspect(error));
        //Let's make sure that socket is destroyed
        connection.destroy();
      }
    });

    return;
  };
  /**
   * Method executed when statsd flushes the received data.
   *
   * Only the counters are handled by this backend; the other entries of the
   * metrics object are not read. A missing metrics object or a missing
   * `counters` entry is treated as an empty counters map.
   *
   * @param time_stamp Flush time stamp, forwarded unchanged to handleCounters
   * @param metrics Flushed metrics; only its `counters` property is used
   */
  StatdMySQLBackend.prototype.onFlush = function(time_stamp, metrics) {
    var counters = (metrics && metrics['counters']) || {};

    this.handleCounters(counters, time_stamp);
  };
  /**
   * Handle and process received counters
   *
   * When the "statsd.packets_received" counter is greater than zero, the user
   * counters are extracted, the counters engine builds the queries for them
   * and, if there is at least one query and the connection could be opened,
   * the queries are executed and the connection is closed again.
   *
   * @param _counters received counters (a missing value is treated as empty)
   * @param time_stamp flush time_stamp
   */
  StatdMySQLBackend.prototype.handleCounters = function(_counters, time_stamp) {
    var self = this;
    var counters = _counters || {};

    // Explicit radix so the value is always read as a decimal number
    var packets_received = parseInt(counters[STATSD_PACKETS_RECEIVED], 10);

    // Written as a negated "> 0" so that NaN (counter absent) also ends here
    if(!(packets_received > 0)) {
      console.log("No user packets received.");
      return;
    }

    // Get userCounters for this flush
    var userCounters = self.getUserCounters(counters);

    console.log("Preaparing querries...");

    // Call countersEngine's buildQuerries method
    var querries = self.engines.countersEngine.buildQuerries(userCounters, time_stamp);
    var querriesCount = querries.length;

    console.log("Querries count : " + querriesCount );

    // Nothing to execute: do not open a connection at all
    if(querriesCount <= 0) {
      return;
    }

    // Open MySQL connection and stop here if that failed
    if(!self.openMySqlConnection()) {
      return;
    }

    console.log("Executing " + querriesCount + " querries...");

    // Execute querries
    self.executeQuerries(querries);

    // Close MySQL connection
    self.closeMySqlConnection();

    return;
  };
  /**
   * Send every query to the current connection (this.sqlConnection).
   *
   * Each query is logged before it is issued and its outcome is logged from
   * the query callback. A failure is logged together with the error reported
   * for it; it does not stop the remaining queries from being issued.
   *
   * @param sqlQuerries Array of SQL statements to execute
   */
  StatdMySQLBackend.prototype.executeQuerries = function(sqlQuerries) {
    var self = this;

    for(var i = 0 ; i < sqlQuerries.length ; i++){
      console.log("Query " + i + " : " + sqlQuerries[i]);
      self.sqlConnection.query(sqlQuerries[i], function(err, rows) {
        if(!err) {
          console.log(" -> Query [SUCCESS]");
        }
        else {
          // Include the error itself, otherwise a failure cannot be diagnosed
          console.log(" -> Query [ERROR] : " + util.inspect(err));
        }
      });
    }
  };
  /**
   * Build a new object holding every counter whose name does not have
   * "statsd" as its first dot-separated segment.
   *
   * @param _counters Counters keyed by name
   * @return Object The selected counters, keyed by their original names
   */
  StatdMySQLBackend.prototype.getUserCounters = function(_counters) {
    var selected = {};
    var name, firstDot, prefix;

    for(name in _counters) {
      // First segment = everything before the first dot (or the whole name)
      firstDot = name.indexOf('.');
      prefix = firstDot === -1 ? name : name.substring(0, firstDot);

      if(prefix === "statsd") {
        continue;
      }
      selected[name] = _counters[name];
    }

    return selected;
  };
  /**
   * Build a new object holding every counter whose name has "statsd" as its
   * first dot-separated segment.
   *
   * @param _counters Counters keyed by name
   * @return Object The selected counters, keyed by their original names
   */
  StatdMySQLBackend.prototype.getStatsdCounters = function(_counters) {
    var picked = {};

    for(var key in _counters) {
      // The first segment is "statsd" exactly when the name is "statsd"
      // itself or starts with "statsd."
      var isStatsdCounter = key === "statsd" || key.indexOf("statsd.") === 0;

      if(isStatsdCounter) {
        picked[key] = _counters[key];
      }
    }

    return picked;
  };
  /**
   * Handler for the 'status' event. It currently does nothing with its
   * arguments and returns undefined.
   *
   * @param error
   * @param backend_name
   * @param stat_name
   * @param stat_value
   */
  StatdMySQLBackend.prototype.onStatus = function(error, backend_name, stat_name, stat_value) {
    // No status information is reported by this backend yet.
    return;
  };
  193. exports.init = function(startupTime, config, events) {
  194. var instance = new StatdMySQLBackend(startupTime, config, events);
  195. return true;
  196. };
  197. /*
  198. * Backend example : repeater.js
  199. *
  200. var util = require('util'),
  201. dgram = require('dgram');
  202. function RepeaterBackend(startupTime, config, emitter){
  203. var self = this;
  204. this.config = config.repeater || [];
  205. this.sock = dgram.createSocket('udp6');
  206. // attach
  207. emitter.on('packet', function(packet, rinfo) { self.process(packet, rinfo); });
  208. };
  209. RepeaterBackend.prototype.process = function(packet, rinfo) {
  210. var self = this;
  211. hosts = self.config;
  212. for(var i=0; i<hosts.length; i++) {
  213. self.sock.send(packet,0,packet.length,hosts[i].port,hosts[i].host,
  214. function(err,bytes) {
  215. if (err) {
  216. console.log(err);
  217. }
  218. });
  219. }
  220. };
  221. exports.init = function(startupTime, config, events) {
  222. var instance = new RepeaterBackend(startupTime, config, events);
  223. return true;
  224. };
  225. */