mongodb.js 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. 'use strict';
  2. var mongo = require('mongodb'),
  3. async = require('async'),
  4. util = require('util'),
  5. dbs = {},
  6. options = {
  7. debug: false,
  8. prefix: true,
  9. size: 100,
  10. max: 2610,
  11. name: 'statsd',
  12. host: '127.0.0.1',
  13. port: 27017
  14. };
  15. /**
  16. * Prefix the db correctly
  17. */
  18. var dbPrefix = function(metric) {
  19. var rtr = options.prefix ? metric.split('.')[0] : options.name;
  20. return rtr;
  21. };
  22. /**
  23. * Prefix a collection name
  24. */
  25. var colPrefix = function(metric_type, metric) {
  26. var ary = metric.split('.');
  27. if (options.prefix) ary.shift();
  28. ary.unshift(metric_type);
  29. return ary.join('_')+'_'+options.rate;
  30. };
  31. /**
  32. * Aggregate the metrics
  33. */
  34. var aggregate = {
  35. /**
  36. * Aggregate some metrics bro
  37. * @param {Number} time
  38. * @param {Stirng} key
  39. * @param {String} val
  40. */
  41. gauges: function(time, key, val) {
  42. return {
  43. db: dbPrefix(key),
  44. col: key,
  45. data: {
  46. time: time,
  47. gauge: val,
  48. type: 'gauges'
  49. },
  50. };
  51. },
  52. /**
  53. * Aggregate some timer_data bro
  54. * @param {Number} time
  55. * @param {Stirng} key
  56. * @param {String} vals
  57. */
  58. timer_data: function(time, key, val) {
  59. val.time = time;
  60. return {
  61. db: dbPrefix(key),
  62. col: colPrefix('timers', key),
  63. data: {
  64. time: time,
  65. durations: val,
  66. type: 'timers'
  67. }
  68. };
  69. },
  70. /**
  71. * Aggregate some timers bro
  72. * @param {Number} time
  73. * @param {Stirng} key
  74. * @param {String} vals
  75. */
  76. timers: function(time, key, val) {
  77. return {
  78. db: dbPrefix(key),
  79. col: key,
  80. data: {
  81. time: time,
  82. durations: val,
  83. type: 'timers'
  84. },
  85. };
  86. },
  87. /**
  88. * Aggregate some counters bro
  89. * @param {Number} time
  90. * @param {Stirng} key
  91. * @param {String} val
  92. */
  93. counters: function(time, key, val) {
  94. return {
  95. db: dbPrefix(key),
  96. col: key,
  97. data: {
  98. time: time,
  99. count: val,
  100. type: 'counters'
  101. },
  102. };
  103. },
  104. /**
  105. * Aggregate some sets bro
  106. * @param {Number} time
  107. * @param {Stirng} key
  108. * @param {String} val
  109. */
  110. sets: function(time, key, val) {
  111. return {
  112. db: dbPrefix(key),
  113. col: key,
  114. data: {
  115. time: time,
  116. set: val,
  117. type: 'sets'
  118. },
  119. };
  120. }
  121. };
  122. /**
  123. * Insert the data to the database
  124. * @method insert
  125. * @param {String} database
  126. * @param {String} collection
  127. * @param {Object} metric
  128. * @param {Function} callback
  129. */
  130. function insert(dbName, collection, metric){
  131. mongo.connect("mongodb://" + options.host + "/" + options.name, function(err, db) {
  132. if(options.debug) console.log("Connected successfully to server");
  133. var colInfo = {};
  134. db.createCollection(collection, colInfo,
  135. function(err, coll) {
  136. if (options.debug) console.log("Collection created " + collection + ".");
  137. var col = db.collection(collection);
  138. col.insert(metric, function(err, result) {
  139. if (err) {
  140. console.log("Error occurred in inserting a document", err);
  141. } else {
  142. if(options.debug) console.log("Inserted a document in the collection. ");
  143. }
  144. db.close();
  145. });
  146. }
  147. );
  148. });
  149. }
  150. /**
  151. * our `flush` event handler
  152. */
  153. var onFlush = function(time, metrics) {
  154. var metricTypes = ['gauges', 'timer_data', 'timers', 'counters', 'sets'];
  155. metricTypes.forEach(function(type, i){
  156. var obj;
  157. for (var key in metrics[type]) {
  158. obj = aggregate[type](time, key, metrics[type][key]);
  159. insert(obj.db, obj.col, obj.data);
  160. };
  161. });
  162. return ;
  163. };
  164. /**
  165. * Expose our init function to StatsD
  166. * @param {Number} startup_time
  167. * @param {Object} config
  168. * @param {Object} events
  169. */
  170. exports.init = function(startup_time, config, events) {
  171. if (!startup_time || !config || !events) return false;
  172. options.debug = config.debug;
  173. if (typeof config.mongoPrefix == 'boolean' && typeof config.mongoName !== 'string') {
  174. console.log('config.mongoPrefix is false, config.mongoName must be set.');
  175. return false;
  176. };
  177. options.rate = parseInt(config.flushInterval/1000, 10);
  178. options.max = config.mongoMax ? parseInt(config.mongoMax, 10) : 2160;
  179. options.host = config.mongoHost || '127.0.0.1';
  180. options.prefix = typeof config.mongoPrefix == 'boolean' ? config.mongoPrefix : true;
  181. options.name = config.mongoName;
  182. options.port = config.mongoPort || options.port;
  183. events.on('flush', onFlush);
  184. return true;
  185. };