// mysql-backend.js
  ///////////////////////////////////////////////////////////////////////////////////
  // NodeJS Statsd MySQL Backend 1.0
  // ------------------------------------------------------------------------------
  //
  // Authors: Nicolas FRADIN, Damien PACAUD
  // Date: 31/10/2012
  //
  ///////////////////////////////////////////////////////////////////////////////////
  9. var _mysql = require('mysql'),
  10. util = require('util');
  11. var STATSD_PACKETS_RECEIVED = "statsd.packets_received";
  12. var STATSD_BAD_LINES = "statsd.bad_lines_seen";
  /**
   * Backend constructor.
   *
   * Reads the `mysql` section of the configuration, checks that the mandatory
   * settings are present, fills in the default port and subscribes the new
   * instance to the emitter's 'flush' and 'status' events.
   *
   * @param startupTime received from the caller; not used by this backend
   * @param config      configuration object; the settings are read from config.mysql
   * @param emitter     event emitter on which the 'flush' and 'status' handlers are registered
   */
  function StatdMySQLBackend(startupTime, config, emitter) {
    var self = this;

    this.config = (config && config.mysql) || {};

    // host, database and user have no default: without them the process is stopped.
    if (!this.config.host || !this.config.database || !this.config.user) {
      console.log("You need to specify at least host, database and user for this mysql backend");
      process.exit(-1);
    }

    // Default port for mysql is 3306, if unset in conf file, we set it here to default
    if (!this.config.port) {
      this.config.port = 3306;
    }

    // Attach events. Both handlers go through a closure so that `this` inside
    // onFlush / onStatus is the backend instance and not the emitter.
    emitter.on('flush', function(time_stamp, metrics) { self.onFlush(time_stamp, metrics); });
    emitter.on('status', function() { return self.onStatus.apply(self, arguments); });
  }
  /**
   * Open MySQL connection.
   *
   * Creates a connection from this.config, stores it in this.sqlConnection
   * and starts connecting. A connection error is logged.
   *
   * NOTE(review): with the mysql driver the connect callback normally runs
   * after this method has returned, so the returned flag can only reflect a
   * failure reported synchronously - confirm against the driver version in use.
   *
   * @return boolean false when connect() already reported an error before
   *         this method returned, true otherwise
   */
  StatdMySQLBackend.prototype.openMySqlConnection = function() {
    var self = this;
    var canExecuteQuerries = true;

    // Create MySQL connection
    self.sqlConnection = _mysql.createConnection(self.config);

    self.sqlConnection.connect(function(error) {
      // Only a real error invalidates the connection; a successful connect
      // calls back as well and must leave the flag untouched.
      if (error) {
        canExecuteQuerries = false;
        console.log("There was an error while trying to open DB connection : " + util.inspect(error));
      }
    });

    return canExecuteQuerries;
  };
  /**
   * Close MySQL connection.
   *
   * Ends the connection currently stored in this.sqlConnection. When ending
   * reports an error, the error is logged and that same connection is destroyed.
   */
  StatdMySQLBackend.prototype.closeMySqlConnection = function() {
    // Remember which connection is being closed: by the time the callback
    // runs, this.sqlConnection may already point at a newer connection.
    var closingConnection = this.sqlConnection;

    closingConnection.end(function(error) {
      if (error) {
        console.log("There was an error while trying to close DB connection : " + util.inspect(error));
        // Let's make sure that socket is destroyed
        closingConnection.destroy();
      }
    });
  };
  /**
   * Method executed when statsd flushes the data it received.
   *
   * Only the 'counters' entry of the metrics is processed; no other entry
   * is read here. A flush that carries no counters is forwarded as an
   * empty counter set instead of throwing inside the event handler.
   *
   * @param time_stamp flush time stamp, forwarded unchanged to handleCounters
   * @param metrics    flushed metrics object
   */
  StatdMySQLBackend.prototype.onFlush = function(time_stamp, metrics) {
    var counters = (metrics && metrics['counters']) || {};

    this.handleCounters(counters, time_stamp);
  };
  /**
   * Handle and process received counters.
   *
   * Builds one "insert ... on duplicate key update" statement per non-zero
   * user counter, then opens a connection, hands the statements to
   * executeQuerries and closes the connection again. Nothing is sent when
   * the statsd.packets_received counter is not positive, when the time stamp
   * is not a number, or when no statement was built.
   *
   * @param _counters received counters
   * @param time_stamp flush time_stamp
   */
  StatdMySQLBackend.prototype.handleCounters = function(_counters, time_stamp) {
    var self = this;

    // Accepts a number, or a non-blank string holding one; anything else becomes NaN.
    function toSqlNumber(value) {
      if (typeof value === 'number') {
        return value;
      }
      if (typeof value === 'string' && value.trim() !== '') {
        return Number(value);
      }
      return NaN;
    }

    var counters = _counters || {};
    var packets_received = parseInt(counters[STATSD_PACKETS_RECEIVED], 10);

    // A missing counter parses to NaN, which fails this comparison just like zero does.
    if (!(packets_received > 0)) {
      console.log("No user packets received.");
      return;
    }

    // The time stamp and the counter values are spliced into the SQL text
    // unquoted, so only finite numbers may get that far.
    var flushTime = toSqlNumber(time_stamp);
    if (!isFinite(flushTime)) {
      console.log("Ignoring flush with an invalid time stamp : " + util.inspect(time_stamp));
      return;
    }

    // Get userCounters for this flush
    var userCounters = self.getUserCounters(counters);
    var querries = [];

    console.log("Preaparing querries...");

    // Iterate on each userCounter
    for (var userCounterName in userCounters) {
      var counterValue = toSqlNumber(userCounters[userCounterName]);

      // Zero adds nothing to the stored total; a non-numeric value cannot be stored.
      if (counterValue === 0 || !isFinite(counterValue)) {
        continue;
      }

      // NOTE(review): escape() is URL-style percent-encoding, not SQL escaping.
      // It does encode quotes and backslashes, so a name cannot end the string
      // literal, but such characters are stored percent-encoded. Kept as is so
      // that names already stored keep matching; consider parameterized queries.
      querries.push("insert into `statistics` (`timestamp`,`name`,`value`) values(" + flushTime + ",'" + escape(userCounterName) + "'," + counterValue + ") on duplicate key update value = value + " + counterValue + ", timestamp = " + flushTime);
    }

    var querriesCount = querries.length;
    console.log("Querries count : " + querriesCount );

    // Nothing to store for this flush
    if (querriesCount === 0) {
      return;
    }

    // Open MySQL connection
    var canExecuteQuerries = self.openMySqlConnection();

    if (canExecuteQuerries) {
      console.log("Executing " + querriesCount + " querries...");
      // Execute querries
      self.executeQuerries(querries);
      // Close MySQL connection
      self.closeMySqlConnection();
    } else {
      console.log("Could not open DB connection : " + querriesCount + " querries dropped.");
    }
  };
  /**
   * Execute a list of SQL statements on this.sqlConnection.
   *
   * Every statement is passed to this.sqlConnection.query(); the statement
   * and, later, its outcome are logged together with its position in the list.
   *
   * @param sqlQuerries list of SQL statement strings
   */
  StatdMySQLBackend.prototype.executeQuerries = function(sqlQuerries) {
    var self = this;

    // Each statement gets its own `index` through this helper. A callback
    // closing over the loop counter itself would report the counter's final
    // value whenever it runs after the loop has finished.
    function runQuerry(index) {
      console.log("Query " + index + " : " + sqlQuerries[index]);
      self.sqlConnection.query(sqlQuerries[index], function(err, rows) {
        if (!err) {
          console.log("Query " + index + " [SUCCESS]");
        }
        else {
          //TODO : add better error handling code
          console.log("Query " + index + " [ERROR] : " + util.inspect(err));
        }
      });
    }

    for (var i = 0; i < sqlQuerries.length; i++) {
      runQuerry(i);
    }
  };
  /**
   * Extract the user counters from a counter set.
   *
   * A counter is kept when the part of its name in front of the first '.'
   * is anything other than "statsd".
   *
   * @param _counters object mapping counter names to values
   * @return a new object holding only the counters that were kept
   */
  StatdMySQLBackend.prototype.getUserCounters = function(_counters) {
    var kept = {};
    var name, firstDot, prefix;

    for (name in _counters) {
      firstDot = name.indexOf('.');
      prefix = firstDot === -1 ? name : name.substring(0, firstDot);

      if (prefix === "statsd") {
        continue;
      }
      kept[name] = _counters[name];
    }

    return kept;
  };
  /**
   * Extract the statsd counters from a counter set.
   *
   * A counter is kept when the part of its name in front of the first '.'
   * is exactly "statsd", i.e. the name is "statsd" itself or starts with "statsd.".
   *
   * @param _counters object mapping counter names to values
   * @return a new object holding only the counters that were kept
   */
  StatdMySQLBackend.prototype.getStatsdCounters = function(_counters) {
    var kept = {};
    var name, isStatsdCounter;

    for (name in _counters) {
      isStatsdCounter = (name === "statsd") || (name.indexOf("statsd.") === 0);

      if (isStatsdCounter) {
        kept[name] = _counters[name];
      }
    }

    return kept;
  };
  /**
   * Handler for the emitter's 'status' event. Currently does nothing.
   *
   * @param error        not used
   * @param backend_name not used
   * @param stat_name    not used
   * @param stat_value   not used
   */
  StatdMySQLBackend.prototype.onStatus = function(error, backend_name, stat_name, stat_value) {
    // No status information is reported by this backend.
  };
  177. exports.init = function(startupTime, config, events) {
  178. var instance = new StatdMySQLBackend(startupTime, config, events);
  179. return true;
  180. };
  /*
   * Backend example : repeater.js
   *
  var util = require('util'),
      dgram = require('dgram');

  function RepeaterBackend(startupTime, config, emitter){
    var self = this;
    this.config = config.repeater || [];
    this.sock = dgram.createSocket('udp6');
    // attach
    emitter.on('packet', function(packet, rinfo) { self.process(packet, rinfo); });
  };

  RepeaterBackend.prototype.process = function(packet, rinfo) {
    var self = this;
    hosts = self.config;
    for(var i=0; i<hosts.length; i++) {
      self.sock.send(packet,0,packet.length,hosts[i].port,hosts[i].host,
        function(err,bytes) {
          if (err) {
            console.log(err);
          }
        });
    }
  };

  exports.init = function(startupTime, config, events) {
    var instance = new RepeaterBackend(startupTime, config, events);
    return true;
  };
  */