123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190 |
'use strict';

/*
 * Module dependencies and shared backend state.
 *
 * `async`, `util` and `dbs` are not referenced anywhere in this section of
 * the file; they are kept so that nothing relying on them elsewhere breaks.
 */
var mongo = require('mongodb'),
    async = require('async'),
    util = require('util'),
    dbs = {},
    // Runtime settings. These are only defaults: `exports.init` overwrites
    // most of them from the StatsD configuration object.
    options = {
      debug: false,        // when true, progress messages are written with console.log
      prefix: true,        // true: first segment of the metric key selects the database
      size: 100,           // multiplied by `max` to get the capped collection `size`
      max: 2160,           // capped collection `max`; same fallback value `exports.init` uses
      name: 'statsd',      // database name returned by `dbPrefix` when `prefix` is false
      host: '127.0.0.1',   // MongoDB server host
      port: 27017          // MongoDB server port
    };
/**
 * Pick the database name for a metric.
 *
 * When `options.prefix` is truthy the first dot-separated segment of the
 * metric key is used; otherwise the configured `options.name` is returned.
 *
 * @param {String} metric Dotted metric key, e.g. `app.cpu.load`.
 * @return {String} Database name for that metric.
 */
var dbPrefix = function(metric) {
  if (options.prefix) {
    return metric.split('.')[0];
  }
  return options.name;
};
/**
 * Build the collection name for a metric.
 *
 * The metric key is split on `.`; when `options.prefix` is truthy the first
 * segment is dropped (it is the segment `dbPrefix` uses as database name).
 * The metric type is put in front, the pieces are joined with `_`, and
 * `options.rate` is appended as a final `_<rate>` suffix.
 *
 * @param {String} metric_type Leading name component, e.g. `gauges`.
 * @param {String} metric Dotted metric key, e.g. `app.cpu.load`.
 * @return {String} Collection name, e.g. `gauges_cpu_load_10`.
 */
var colPrefix = function(metric_type, metric) {
  var parts = metric.split('.');
  if (options.prefix) {
    parts.shift();
  }
  parts.unshift(metric_type);
  return parts.join('_') + '_' + options.rate;
};
/**
 * Per-metric-type converters.
 *
 * Each function turns one `(time, key, value)` triple into a descriptor
 * `{ db, col, data }`: `db` comes from `dbPrefix(key)`, `col` from
 * `colPrefix(<type>, key)`, and `data` is the document to store.
 */
var aggregate = {
  /**
   * Describe a gauge sample.
   * @param {Number} time Timestamp handed to the flush handler.
   * @param {String} key Metric key.
   * @param {*} val Gauge value, stored under `gauge`.
   * @return {Object} `{ db, col, data: { time, gauge } }`
   */
  gauges: function(time, key, val) {
    return {
      db: dbPrefix(key),
      col: colPrefix('gauges', key),
      data: {
        time: time,
        gauge: val
      }
    };
  },

  /**
   * Describe a timer_data sample.
   *
   * The stored document is a shallow copy of `val` with `time` added, so the
   * metrics object received from the caller is never modified. Note that the
   * collection prefix is `timers`, the same one `timers` uses.
   *
   * @param {Number} time Timestamp handed to the flush handler.
   * @param {String} key Metric key.
   * @param {Object} val Object of timer statistics for this key.
   * @return {Object} `{ db, col, data }` where `data` is the copy of `val` plus `time`.
   */
  timer_data: function(time, key, val) {
    var data = {};
    for (var field in val) {
      if (Object.prototype.hasOwnProperty.call(val, field)) {
        data[field] = val[field];
      }
    }
    data.time = time;
    return {
      db: dbPrefix(key),
      col: colPrefix('timers', key),
      data: data
    };
  },

  /**
   * Describe a timers sample.
   * @param {Number} time Timestamp handed to the flush handler.
   * @param {String} key Metric key.
   * @param {*} val Timer value(s), stored under `durations`.
   * @return {Object} `{ db, col, data: { time, durations } }`
   */
  timers: function(time, key, val) {
    return {
      db: dbPrefix(key),
      col: colPrefix('timers', key),
      data: {
        time: time,
        durations: val
      }
    };
  },

  /**
   * Describe a counter sample.
   * @param {Number} time Timestamp handed to the flush handler.
   * @param {String} key Metric key.
   * @param {*} val Counter value, stored under `count`.
   * @return {Object} `{ db, col, data: { time, count } }`
   */
  counters: function(time, key, val) {
    return {
      db: dbPrefix(key),
      col: colPrefix('counters', key),
      data: {
        time: time,
        count: val
      }
    };
  },

  /**
   * Describe a set sample.
   * @param {Number} time Timestamp handed to the flush handler.
   * @param {String} key Metric key.
   * @param {*} val Set value, stored under `set`.
   * @return {Object} `{ db, col, data: { time, set } }`
   */
  sets: function(time, key, val) {
    return {
      db: dbPrefix(key),
      col: colPrefix('sets', key),
      data: {
        time: time,
        set: val
      }
    };
  }
};
/**
 * Store one metric document in MongoDB.
 *
 * Opens a connection, makes sure the target capped collection exists,
 * inserts the document and closes the connection again. Nothing is returned
 * and no completion callback is taken; failures are reported with
 * console.log only.
 *
 * @method insert
 * @param {String} dbName Database to write to; falls back to `options.name` when empty.
 * @param {String} collection Collection name.
 * @param {Object} metric Document to insert.
 */
function insert(dbName, collection, metric) {
  var database = dbName || options.name;
  var host = String(options.host);
  // Only append the configured port when the host value does not already
  // contain one, so a `host:port` style setting keeps working.
  var address = (host.indexOf(':') === -1 && options.port) ? host + ':' + options.port : host;
  var url = "mongodb://" + address + "/" + database;

  mongo.connect(url, function(err, db) {
    if (err || !db) {
      // Without this guard a failed connection would throw on `db.createCollection`.
      console.log("Error occurred in connecting to the database", err);
      return;
    }
    if (options.debug) console.log("Connected successfully to server");

    var colInfo = {capped: true, size: options.size * options.max, max: options.max};
    db.createCollection(collection, colInfo, function(err, coll) {
      if (options.debug) {
        if (err) {
          // Best effort: the insert below is still attempted.
          console.log("Error occurred in creating collection " + collection + ".", err);
        } else {
          console.log("Collection created " + collection + ".");
        }
      }
      var col = db.collection(collection);
      col.insert(metric, function(err, result) {
        if (err) {
          console.log("Error occurred in inserting a document", err);
        } else if (options.debug) {
          console.log("Inserted a document in the collection. ");
        }
        // Always release the connection, whether or not the insert worked.
        db.close();
      });
    });
  });
}
/**
 * `flush` event handler.
 *
 * Walks the known metric types in a fixed order and, for every key found
 * under `metrics[type]`, converts the sample with `aggregate[type]` and
 * hands the result to `insert`. Metric types missing from `metrics` are
 * skipped.
 *
 * @param {Number} time Timestamp passed along with the flush event.
 * @param {Object} metrics Object holding one map of key -> value per metric type.
 */
var onFlush = function(time, metrics) {
  var metricTypes = ['gauges', 'timer_data', 'timers', 'counters', 'sets'];
  metricTypes.forEach(function(type) {
    var bucket = metrics[type];
    if (!bucket) return;
    // Object.keys limits the walk to own properties, so keys inherited
    // through the prototype chain are never stored as metrics.
    Object.keys(bucket).forEach(function(key) {
      var obj = aggregate[type](time, key, bucket[key]);
      insert(obj.db, obj.col, obj.data);
    });
  });
};
- /**
- * Expose our init function to StatsD
- * @param {Number} startup_time
- * @param {Object} config
- * @param {Object} events
- */
- exports.init = function(startup_time, config, events) {
- if (!startup_time || !config || !events) return false;
- options.debug = config.debug;
- if (typeof config.mongoPrefix == 'boolean' && typeof config.mongoName !== 'string') {
- console.log('config.mongoPrefix is false, config.mongoName must be set.');
- return false;
- };
- options.rate = parseInt(config.flushInterval/1000, 10);
- options.max = config.mongoMax ? parseInt(config.mongoMax, 10) : 2160;
- options.host = config.mongoHost || '127.0.0.1';
- options.prefix = typeof config.mongoPrefix == 'boolean' ? config.mongoPrefix : true;
- options.name = config.mongoName;
- options.port = config.mongoPort || options.port;
- events.on('flush', onFlush);
- return true;
- };
|