'use strict'; var mongo = require('mongodb'), async = require('async'), util = require('util'), dbs = {}, options = { debug: false, prefix: true, size: 100, max: 2610, name: 'statsd', host: '127.0.0.1', port: 27017 }; /** * Prefix the db correctly */ var dbPrefix = function(metric) { var rtr = options.prefix ? metric.split('.')[0] : options.name; return rtr; }; /** * Prefix a collection name */ var colPrefix = function(metric_type, metric) { var ary = metric.split('.'); if (options.prefix) ary.shift(); ary.unshift(metric_type); return ary.join('_')+'_'+options.rate; }; /** * Aggregate the metrics */ var aggregate = { /** * Aggregate some metrics bro * @param {Number} time * @param {Stirng} key * @param {String} val */ gauges: function(time, key, val) { return { db: dbPrefix(key), col: key, data: { time: time, gauge: val, type: 'gauges' }, }; }, /** * Aggregate some timer_data bro * @param {Number} time * @param {Stirng} key * @param {String} vals */ timer_data: function(time, key, val) { val.time = time; return { db: dbPrefix(key), col: colPrefix('timers', key), data: { time: time, durations: val, type: 'timers' } }; }, /** * Aggregate some timers bro * @param {Number} time * @param {Stirng} key * @param {String} vals */ timers: function(time, key, val) { return { db: dbPrefix(key), col: key, data: { time: time, durations: val, type: 'timers' }, }; }, /** * Aggregate some counters bro * @param {Number} time * @param {Stirng} key * @param {String} val */ counters: function(time, key, val) { return { db: dbPrefix(key), col: key, data: { time: time, count: val, type: 'counters' }, }; }, /** * Aggregate some sets bro * @param {Number} time * @param {Stirng} key * @param {String} val */ sets: function(time, key, val) { return { db: dbPrefix(key), col: key, data: { time: time, set: val, type: 'sets' }, }; } }; /** * Insert the data to the database * @method insert * @param {String} database * @param {String} collection * @param {Object} metric * @param {Function} callback */ function insert(dbName, collection, metric){ mongo.connect("mongodb://" + options.host + "/" + options.name, function(err, db) { if(options.debug) console.log("Connected successfully to server"); var colInfo = {capped:true, size:options.size*options.max, max:options.max}; db.createCollection(collection, colInfo, function(err, coll) { if (options.debug) console.log("Collection created " + collection + "."); var col = db.collection(collection); col.insert(metric, function(err, result) { if (err) { console.log("Error occurred in inserting a document", err); } else { if(options.debug) console.log("Inserted a document in the collection. "); } db.close(); }); } ); }); } /** * our `flush` event handler */ var onFlush = function(time, metrics) { var metricTypes = ['gauges', 'timer_data', 'timers', 'counters', 'sets']; metricTypes.forEach(function(type, i){ var obj; for (var key in metrics[type]) { obj = aggregate[type](time, key, metrics[type][key]); insert(obj.db, obj.col, obj.data); }; }); return ; }; /** * Expose our init function to StatsD * @param {Number} startup_time * @param {Object} config * @param {Object} events */ exports.init = function(startup_time, config, events) { if (!startup_time || !config || !events) return false; options.debug = config.debug; if (typeof config.mongoPrefix == 'boolean' && typeof config.mongoName !== 'string') { console.log('config.mongoPrefix is false, config.mongoName must be set.'); return false; }; options.rate = parseInt(config.flushInterval/1000, 10); options.max = config.mongoMax ? parseInt(config.mongoMax, 10) : 2160; options.host = config.mongoHost || '127.0.0.1'; options.prefix = typeof config.mongoPrefix == 'boolean' ? config.mongoPrefix : true; options.name = config.mongoName; options.port = config.mongoPort || options.port; events.on('flush', onFlush); return true; };