Nicolas FRADIN 12 лет назад
Родитель
Сommit
516481296c
1 измененных файлов с 148 добавлено и 36 удалено
  1. 148 36
      mysql-backend.js

+ 148 - 36
mysql-backend.js

@@ -10,6 +10,9 @@
 var _mysql = require('mysql'),
     util = require('util');
 
+var STATSD_PACKETS_RECEIVED = "statsd.packets_received";
+var STATSD_BAD_LINES = "statsd.bad_lines_seen";
+
 
 /**
  * Backend Constructor
@@ -21,6 +24,7 @@ var _mysql = require('mysql'),
 function StatdMySQLBackend(startupTime, config, emitter) {
   var self = this;
   this.config = config.mysql || {};
+
   // Verifying that the config file contains enough information for this backend to work  
   if(!this.config.host || !this.config.database || !this.config.user) {
     console.log("You need to specify at least host, port, database and user for this mysql backend");
@@ -31,13 +35,55 @@ function StatdMySQLBackend(startupTime, config, emitter) {
   if(!this.config.port) {
     this.config.port = 3306;
   }
+
   // Attach events
   emitter.on('flush', function(time_stamp, metrics) { self.onFlush(time_stamp, metrics); } );
   emitter.on('status', self.onStatus );
 }
 
 
+
 /**
+ * Open MySQL connection
+ *
+ * @return boolean Indicates if connection succeed
+ */
+StatdMySQLBackend.prototype.openMySqlConnection = function() {
+  var self = this;
+  var canExecuteQuerries = true;
+
+  // Create MySQL connection
+  self.sqlConnection = _mysql.createConnection(this.config);
+  self.sqlConnection.connect(function(error){
+    canExecuteQuerries = false;
+  });
+
+  return canExecuteQuerries;
+}
+
+
+/**
+ * Close MySQL connection
+ *
+ */
+StatdMySQLBackend.prototype.closeMySqlConnection = function() {
+  var self = this;
+  self.sqlConnection.end(function(error) {
+    if(error){
+      console.log("There was an error while trying to close DB connection : " + util.inspect(error));
+      //Let's make sure that socket is destroyed
+      self.sqlConnection.destroy();
+    }
+  });
+
+  return;
+}
+
+
+
+
+/**
+ * Method executed when statsd flush received datas
  *
  * @param time_stamp
  * @param metrics
@@ -56,55 +102,121 @@ StatdMySQLBackend.prototype.onFlush = function(time_stamp, metrics) {
 }
 
 
+/**
+ * Handle and process received counters 
+ * 
+ * @param _counters received counters
+ * @param time_stamp flush time_stamp 
+ */
 StatdMySQLBackend.prototype.handleCounters = function(_counters, time_stamp) {
+  
   var self = this;
-  var querries = [];
-  var value = 0;
-  for(var counter in _counters) {
-    value = _counters[counter];
-    if(value === 0) {
-      continue;
+
+  var packets_received = parseInt(_counters[STATSD_PACKETS_RECEIVED]);
+  var bad_lines_seen = parseInt(_counters[STATSD_BAD_LINES]);
+
+  if(packets_received > 0) {
+    // Get userCounters for this flush
+    var userCounters = self.getUserCounters(_counters);
+    var querries = [];
+
+    console.log("Preaparing querries...");
+
+    // Iterate on each userCounter
+    for(var userCounterName in userCounters) {
+      var counterValue = userCounters[userCounterName];
+      if(counterValue === 0) {
+        continue;
+      } else {
+        querries.push("insert into `statistics` (`timestamp`,`name`,`value`) values(" + time_stamp + ",'" + escape(userCounterName) +"'," + counterValue + ") on duplicate key update value = value + " + counterValue + ", timestamp = " + time_stamp);
+      }
     }
-    else {
-      querries.push("insert into `statistics` (`timestamp`,`name`,`value`) values(" + time_stamp + ",'" + counter +"'," + value + ") on duplicate key update value = value + " + value + ", timestamp = " + time_stamp);
+
+    var querriesCount = querries.length;
+    console.log("Querries count : " + querriesCount );
+
+    // If at least one querry can be executed
+    if(querriesCount > 0) {
+
+      // Open MySQL connection
+      var canExecuteQuerries = self.openMySqlConnection();
+
+      // If connection succeed
+      if(canExecuteQuerries) {
+        console.log("Executing " + querriesCount + " querries...");
+        // Execute querries
+        self.executeQuerries(querries);
+
+        // Close MySQL connection
+        self.closeMySqlConnection();
+      }
+
     }
+
+  } else {
+    console.log("No user packets received.");
   }
-  self.executeQuerries(querries);
+
+  return;
+
 }
+  
 
 
 StatdMySQLBackend.prototype.executeQuerries = function(sqlQuerries) {
-
-  // Let's create a connection to the DB server
-  var connection = _mysql.createConnection(this.config);
   
-  connection.connect(function(err){
-    if(err){
-      console.log("There was an error while trying to connect to DB, please check");
-    }
-    else {
-      for(var i = 0 ; i < sqlQuerries.length ; i++){
-        console.log("trying to execute : " + sqlQuerries[i]);
-        connection.query(sqlQuerries[i], function(err, rows) {
-          if(!err) {
-            console.log("Query succesfully executed");
-          }
-          else {
-            //TODO : add better error handling code
-            console.log("Error while executing sql query"); 
-          }
-          connection.end(function(err) {
-            if(err){
-              console.log("There was an error while trying to close DB connection");
-              //Let's make sure that socket is destroyed
-              connection.destroy();
-            }
-          });
-        });  
+  var self = this;
+
+  for(var i = 0 ; i < sqlQuerries.length ; i++){
+    console.log("Query " + (i+1) + " : " + sqlQuerries[i]);
+    self.sqlConnection.query(sqlQuerries[i], function(err, rows) {
+      if(!err) {
+        console.log("Query " + (i+1) + " [SUCCESS]");
+      }
+      else {
+        //TODO : add better error handling code
+        console.log("Query " + (i+1) + " [ERROR]"); 
       }
+    });  
+  }
+
+}
+
+
+
+/**
+ *
+ *
+ */
+StatdMySQLBackend.prototype.getUserCounters = function(_counters) {
+  var userCounters = {};
+  for(var counterName in _counters) {
+    var counterNameParts = counterName.split('.');
+    if(counterNameParts[0] !== "statsd") {
+      userCounters[counterName] = _counters[counterName];
     }
-  });
+  }
+  return userCounters;
+}
+
+
+
+/**
+ *
+ *
+ */
+StatdMySQLBackend.prototype.getStatsdCounters = function(_counters) {
+  var statsdCounters = {};
+  for(var counterName in _counters) {
+    var counterNameParts = counterName.split('.');
+    if(counterNameParts[0] === "statsd") {
+      statsdCounters[counterName] = _counters[counterName];
+    }
+  }
+  return statsdCounters;
 }
+
+
 /**
  *
  * @param error