mongodb.js 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. 'use strict';
  2. var mongo = require('mongodb'),
  3. async = require('async'),
  4. util = require('util'),
  5. dbs = {},
  6. options = {
  7. debug: false,
  8. prefix: true,
  9. size: 100,
  10. max: 2610,
  11. name: 'statsd',
  12. host: '127.0.0.1',
  13. port: 27017
  14. };
  15. /**
  16. * Prefix the db correctly
  17. */
  18. var dbPrefix = function(metric) {
  19. var rtr = options.prefix ? metric.split('.')[0] : options.name;
  20. return rtr;
  21. };
  22. /**
  23. * Prefix a collection name
  24. */
  25. var colPrefix = function(metric_type, metric) {
  26. var ary = metric.split('.');
  27. if (options.prefix) ary.shift();
  28. ary.unshift(metric_type);
  29. return ary.join('_')+'_'+options.rate;
  30. };
  31. /**
  32. * Aggregate the metrics
  33. */
  34. var aggregate = {
  35. /**
  36. * Aggregate some metrics bro
  37. * @param {Number} time
  38. * @param {Stirng} key
  39. * @param {String} val
  40. */
  41. gauges: function(time, key, val) {
  42. return {
  43. db: dbPrefix(key),
  44. col: colPrefix('gauges', key),
  45. data: {
  46. time: time,
  47. gauge: val
  48. },
  49. };
  50. },
  51. /**
  52. * Aggregate some timer_data bro
  53. * @param {Number} time
  54. * @param {Stirng} key
  55. * @param {String} vals
  56. */
  57. timer_data: function(time, key, val) {
  58. val.time = time;
  59. return {
  60. db: dbPrefix(key),
  61. col: colPrefix('timers', key),
  62. data: val
  63. };
  64. },
  65. /**
  66. * Aggregate some timers bro
  67. * @param {Number} time
  68. * @param {Stirng} key
  69. * @param {String} vals
  70. */
  71. timers: function(time, key, val) {
  72. return {
  73. db: dbPrefix(key),
  74. col: colPrefix('timers', key),
  75. data: {
  76. time: time,
  77. durations: val
  78. },
  79. };
  80. },
  81. /**
  82. * Aggregate some counters bro
  83. * @param {Number} time
  84. * @param {Stirng} key
  85. * @param {String} val
  86. */
  87. counters: function(time, key, val) {
  88. return {
  89. db: dbPrefix(key),
  90. col: colPrefix('counters', key),
  91. data: {
  92. time: time,
  93. count: val
  94. },
  95. };
  96. },
  97. /**
  98. * Aggregate some sets bro
  99. * @param {Number} time
  100. * @param {Stirng} key
  101. * @param {String} val
  102. */
  103. sets: function(time, key, val) {
  104. return {
  105. db: dbPrefix(key),
  106. col: colPrefix('sets', key),
  107. data: {
  108. time: time,
  109. set: val
  110. },
  111. };
  112. }
  113. };
  114. /**
  115. * Insert the data to the database
  116. * @method insert
  117. * @param {String} database
  118. * @param {String} collection
  119. * @param {Object} metric
  120. * @param {Function} callback
  121. */
  122. function insert(dbName, collection, metric){
  123. mongo.connect("mongodb://" + options.host + "/" + options.name, function(err, db) {
  124. if(options.debug) console.log("Connected successfully to server");
  125. var colInfo = {capped:true, size:options.size*options.max, max:options.max};
  126. db.createCollection(collection, colInfo,
  127. function(err, coll) {
  128. if (options.debug) console.log("Collection created " + collection + ".");
  129. var col = db.collection(collection);
  130. col.insert(metric, function(err, result) {
  131. if (err) {
  132. console.log("Error occurred in inserting a document", err);
  133. } else {
  134. if(options.debug) console.log("Inserted a document in the collection. ");
  135. }
  136. db.close();
  137. });
  138. }
  139. );
  140. });
  141. }
  142. /**
  143. * our `flush` event handler
  144. */
  145. var onFlush = function(time, metrics) {
  146. var metricTypes = ['gauges', 'timer_data', 'timers', 'counters', 'sets'];
  147. metricTypes.forEach(function(type, i){
  148. var obj;
  149. for (var key in metrics[type]) {
  150. obj = aggregate[type](time, key, metrics[type][key]);
  151. insert(obj.db, obj.col, obj.data);
  152. };
  153. });
  154. return ;
  155. };
  156. /**
  157. * Expose our init function to StatsD
  158. * @param {Number} startup_time
  159. * @param {Object} config
  160. * @param {Object} events
  161. */
  162. exports.init = function(startup_time, config, events) {
  163. if (!startup_time || !config || !events) return false;
  164. options.debug = config.debug;
  165. if (typeof config.mongoPrefix == 'boolean' && typeof config.mongoName !== 'string') {
  166. console.log('config.mongoPrefix is false, config.mongoName must be set.');
  167. return false;
  168. };
  169. options.rate = parseInt(config.flushInterval/1000, 10);
  170. options.max = config.mongoMax ? parseInt(config.mongoMax, 10) : 2160;
  171. options.host = config.mongoHost || '127.0.0.1';
  172. options.prefix = typeof config.mongoPrefix == 'boolean' ? config.mongoPrefix : true;
  173. options.name = config.mongoName;
  174. options.port = config.mongoPort || options.port;
  175. events.on('flush', onFlush);
  176. return true;
  177. };