Forráskód Böngészése

Added Tasks functionalities. Waiting for callbacks...

Marc 13 éve
szülő
commit
0306fa7b9a
4 módosított fájl, 303 hozzáadás és 53 törlés
  1. 1 0
      .gitignore
  2. 0 2
      Project/@todo
  3. 7 3
      Project/Reportings/codesniffer.log
  4. 295 48
      Service/GearmanClient.php

+ 1 - 0
.gitignore

@@ -1,3 +1,4 @@
 /.settings/
 .buildpath
 .project
+Resources/Sandbox/

+ 0 - 2
Project/@todo

@@ -1,8 +1,6 @@
 On this realase
 
 * Testings.
-* Worker execution with all jobs inside.
-* Description of job and worker with supervisord information and more.
 * Check documentation.
 
 

+ 7 - 3
Project/Reportings/codesniffer.log

@@ -1,4 +1,4 @@
-Time: 0 seconds, Memory: 4.75Mb
+Time: 1 second, Memory: 4.75Mb
 
 Time: 0 seconds, Memory: 4.00Mb
 
@@ -14,7 +14,7 @@ Time: 0 seconds, Memory: 5.00Mb
 
 Time: 0 seconds, Memory: 5.25Mb
 
-Time: 1 second, Memory: 6.50Mb
+Time: 1 second, Memory: 9.25Mb
 
 Time: 0 seconds, Memory: 5.75Mb
 
@@ -32,7 +32,7 @@ Time: 0 seconds, Memory: 6.25Mb
 
 Time: 0 seconds, Memory: 4.50Mb
 
-Time: 0 seconds, Memory: 4.50Mb
+Time: 1 second, Memory: 4.50Mb
 
 Time: 0 seconds, Memory: 4.75Mb
 
@@ -50,6 +50,8 @@ Time: 0 seconds, Memory: 4.50Mb
 
 Time: 0 seconds, Memory: 4.50Mb
 
+Time: 1 second, Memory: 12.00Mb
+
 Time: 0 seconds, Memory: 5.00Mb
 
 Time: 0 seconds, Memory: 5.00Mb
@@ -62,6 +64,8 @@ Time: 0 seconds, Memory: 5.00Mb
 
 Time: 0 seconds, Memory: 5.00Mb
 
+Time: 0 seconds, Memory: 4.75Mb
+
 Time: 0 seconds, Memory: 5.00Mb
 
 Time: 0 seconds, Memory: 4.75Mb

+ 295 - 48
Service/GearmanClient.php

@@ -14,6 +14,15 @@ use Mmoreramerino\GearmanBundle\Exceptions\NoCallableGearmanMethodException;
 class GearmanClient extends GearmanService
 {
 
+    /**
+     * Construct method.
+     * Performs all init actions, like initialize tasks structure
+     */
+    public function __construct()
+    {
+        $this->resetTaskStructure();
+    }
+
     /**
      * Server variable to define in what server must connect to
      *
@@ -58,6 +67,99 @@ class GearmanClient extends GearmanService
      }
 
 
+    /**
+     * Get real worker from job name and enqueues the action given one
+     *     method.
+     *
+     * @param string $jobName A GearmanBundle registered function the worker is to execute
+     * @param mixed  $params  Parameters to send to job
+     * @param string $method  Method to execute
+     * @param string $unique  A unique ID used to identify a particular task
+     *
+     * @return mixed Return result of the call
+     */
+    private function enqueue($jobName, $params, $method, $unique)
+    {
+        $worker = $this->getJob($jobName);
+        if (false !== $worker) {
+            return $this->doEnqueue($worker, $params, $method, $unique);
+        }
+
+        return false;
+    }
+
+    /**
+     * Execute a GearmanClient call given a worker, params and a method.
+     * If any method is given, it performs a "do" call
+     *
+     * If he GarmanClient call is asyncronous, result value will be a handler.
+     * Otherwise, will return job result.
+     *
+     * @param array  $worker Worker definition
+     * @param mixed  $params Parameters to send to job
+     * @param string $method Method to execute
+     * @param string $unique A unique ID used to identify a particular task
+     *
+     * @return mixed  Return result of the GearmanClient call
+     */
+    private function doEnqueue(Array $worker, $params = '', $method = 'do', $unique = null)
+    {
+        $gmclient = new \GearmanClient();
+        $this->assignServers($gmclient);
+
+        return $gmclient->$method($worker['job']['realCallableName'], serialize($params), $unique);
+    }
+
+    /**
+     * Set server of gearman
+     *
+     * @param type $servername Server name (must be ip)
+     * @param type $port       Port of server. By default 4730
+     *
+     * @return GearmanClient Returns self object
+     */
+    public function setServer($servername, $port = 4730)
+    {
+        $this->server = array($servername, $port);
+
+        return $this;
+    }
+
+    /**
+     * Given a GearmanClient, set all included servers
+     *
+     * @param GearmanClient $gearmanClient Object to include servers
+     *
+     * return true
+     */
+    private function assignServers(\GearmanClient $gearmanClient)
+    {
+        if (null === $this->server || !is_array($this->server)) {
+
+            $gearmanClient->addServer();
+        } else {
+
+            $gearmanClient->addServer($this->server[0], $this->server[1]);
+        }
+    }
+
+    /**
+     * Clear server slot
+     *
+     * @return GearmanClient Returns self object
+     */
+    public function clearServers()
+    {
+        $this->server = null;
+
+        return $this;
+    }
+
+
+    /**
+     * Job methods
+     */
+
     /**
      * Runs a single task and returns a string representation of the result.
      * It is up to the GearmanClient and GearmanWorker to agree on the format of the result.
@@ -65,13 +167,14 @@ class GearmanClient extends GearmanService
      *
      * @param string $name   A GearmanBundle registered function the worker is to execute
      * @param Mixed  $params Parameters to send to job
+     * @param string $unique A unique ID used to identify a particular task
      *
      * @return string A string representing the results of running a task.
      * @depracated
      */
-    public function doJob($name, $params = array())
+    public function doJob($name, $params = array(), $unique = null)
     {
-        return $this->enqueue($name, $params, 'do');
+        return $this->enqueue($name, $params, 'do', $unique);
     }
 
     /**
@@ -80,12 +183,13 @@ class GearmanClient extends GearmanService
      *
      * @param string $name   A GearmanBundle registered function the worker is to execute
      * @param Mixed  $params Parameters to send to job
+     * @param string $unique A unique ID used to identify a particular task
      *
      * @return string A string representing the results of running a task.
      */
-    public function doNormalJob($name, $params = array())
+    public function doNormalJob($name, $params = array(), $unique = null)
     {
-        return $this->enqueue($name, $params, 'doNormal');
+        return $this->enqueue($name, $params, 'doNormal', $unique);
     }
 
 
@@ -95,12 +199,13 @@ class GearmanClient extends GearmanService
      *
      * @param string $name   A GearmanBundle registered function the worker is to execute
      * @param Mixed  $params Parameters to send to job
+     * @param string $unique A unique ID used to identify a particular task
      *
      * @return string Job handle for the submitted task.
      */
-    public function doBackgroundJob($name, $params = array())
+    public function doBackgroundJob($name, $params = array(), $unique = null)
     {
-        return $this->enqueue($name, $params, 'doBackground');
+        return $this->enqueue($name, $params, 'doBackground', $unique);
     }
 
 
@@ -111,12 +216,13 @@ class GearmanClient extends GearmanService
      *
      * @param string $name   A GearmanBundle registered function the worker is to execute
      * @param Mixed  $params Parameters to send to job
+     * @param string $unique A unique ID used to identify a particular task
      *
      * @return string A string representing the results of running a task.
      */
-    public function doHighJob($name, $params = array())
+    public function doHighJob($name, $params = array(), $unique = null)
     {
-        return $this->enqueue($name, $params, 'doHigh');
+        return $this->enqueue($name, $params, 'doHigh', $unique);
     }
 
     /**
@@ -125,12 +231,13 @@ class GearmanClient extends GearmanService
      *
      * @param string $name   A GearmanBundle registered function the worker is to execute
      * @param Mixed  $params Parameters to send to job
+     * @param string $unique A unique ID used to identify a particular task
      *
      * @return string The job handle for the submitted task.
      */
-    public function doHighBackgroundJob($name, $params = array())
+    public function doHighBackgroundJob($name, $params = array(), $unique = null)
     {
-        return $this->enqueue($name, $params, 'doHighBackground');
+        return $this->enqueue($name, $params, 'doHighBackground', $unique);
     }
 
     /**
@@ -140,12 +247,13 @@ class GearmanClient extends GearmanService
      *
      * @param string $name   A GearmanBundle registered function the worker is to execute
      * @param Mixed  $params Parameters to send to job
+     * @param string $unique A unique ID used to identify a particular task
      *
      * @return string A string representing the results of running a task.
      */
-    public function doLowJob($name, $params = array())
+    public function doLowJob($name, $params = array(), $unique = null)
     {
-        return $this->enqueue($name, $params, 'doLow');
+        return $this->enqueue($name, $params, 'doLow', $unique);
     }
 
     /**
@@ -154,72 +262,211 @@ class GearmanClient extends GearmanService
      *
      * @param string $name   A GearmanBundle registered function the worker is to execute
      * @param Mixed  $params Parameters to send to job
+     * @param string $unique A unique ID used to identify a particular task
      *
      * @return string The job handle for the submitted task.
      */
-    public function doLowBackgroundJob($name, $params = array())
+    public function doLowBackgroundJob($name, $params = array(), $unique = null)
     {
-        return $this->enqueue($name, $params, 'doLowBackground');
+        return $this->enqueue($name, $params, 'doLowBackground', $unique);
     }
 
 
     /**
-     * Get real worker from job name and enqueues the action given one
-     *     method.
+     * Task methods
+     */
+
+    /**
+     * task structure to store all about called tasks
      *
-     * @param string $jobName A GearmanBundle registered function the worker is to execute
-     * @param mixed  $params  Parameters to send to job
-     * @param string $method  Method to execute
+     * @var $taskStructure
+     */
+    public $taskStructure = null;
+
+
+    /**
+     * Reset all tasks structure. Remove all set values
      *
-     * @return mixed Return result of the call
+     * @return true;
      */
-    private function enqueue($jobName, $params, $method)
+    public function resetTaskStructure()
     {
-        $worker = $this->getJob($jobName);
-        if (false !== $worker) {
-            return $this->doEnqueue($worker, $params, $method);
-        }
+        $this->taskStructure = array(
+            'tasks'             =>  array(),
+        );
 
-        return false;
+        return true;
     }
 
+
     /**
-     * Execute a GearmanClient call given a worker, params and a method.
-     * If any method is given, it performs a "do" call
+     * Adds a task to be run in parallel with other tasks.
+     * Call this method for all the tasks to be run in parallel, then call GearmanClient::runTasks() to perform the work.
+     * Note that enough workers need to be available for the tasks to all run in parallel.
      *
-     * If he GarmanClient call is asyncronous, result value will be a handler.
-     * Otherwise, will return job result.
+     * @param string $name     A GermanBundle registered function to be executed
+     * @param Mixed  $params   Parameters to send to task
+     * @param Mixed  &$context Application context to associate with a task
+     * @param string $unique   A unique ID used to identify a particular task
      *
-     * @param array  $worker Worker definition
-     * @param mixed  $params Parameters to send to job
-     * @param string $method Method to execute
+     * @return GearmanClient Return this object
+     */
+    public function addTask($name, $params =array(), &$context = null, $unique = null)
+    {
+        $this->enqueueTask($name, $params, $context, $unique, 'addTask');
+
+        return $this;
+    }
+
+    /**
+     * Adds a high priority task to be run in parallel with other tasks.
+     * Call this method for all the high priority tasks to be run in parallel, then call GearmanClient::runTasks() to perform the work.
+     * Tasks with a high priority will be selected from the queue before those of normal or low priority.
      *
-     * @return mixed  Return result of the GearmanClient call
+     * @param string $name     A GermanBundle registered function to be executed
+     * @param Mixed  $params   Parameters to send to task
+     * @param Mixed  &$context Application context to associate with a task
+     * @param string $unique   A unique ID used to identify a particular task
+     *
+     * @return GearmanClient Return this object
      */
-    private function doEnqueue(Array $worker, $params='', $method='do')
+    public function addTaskHigh($name, $params =array(), &$context = null, $unique = null)
     {
-        $gmclient= new \GearmanClient();
+        $this->enqueueTask($name, $params, $context, $unique, 'addTaskHigh');
 
-        if (null === $this->server || !is_array($this->server)) {
-            $gmclient->addServer();
-        } else {
-            $gmclient->addServer($this->server[0], $this->server[1]);
-        }
+        return $this;
+    }
 
-        return $gmclient->$method($worker['job']['realCallableName'], serialize($params));
+    /**
+     * Adds a low priority background task to be run in parallel with other tasks.
+     * Call this method for all the tasks to be run in parallel, then call GearmanClient::runTasks() to perform the work.
+     * Tasks with a low priority will be selected from the queue after those of normal or low priority.
+     *
+     * @param string $name     A GermanBundle registered function to be executed
+     * @param Mixed  $params   Parameters to send to task
+     * @param Mixed  &$context Application context to associate with a task
+     * @param string $unique   A unique ID used to identify a particular task
+     *
+     * @return GearmanClient Return this object
+     */
+    public function addTaskLow($name, $params =array(), &$context = null, $unique = null)
+    {
+        $this->enqueueTask($name, $params, $context, $unique, 'addTaskLow');
+
+        return $this;
     }
 
     /**
-     * Set server of gearman
+     * Adds a background task to be run in parallel with other tasks
+     * Call this method for all the tasks to be run in parallel, then call GearmanClient::runTasks() to perform the work.
      *
-     * @param type $servername Server name (must be ip)
-     * @param type $port       Port of server. By default 4730
+     * @param string $name     A GermanBundle registered function to be executed
+     * @param Mixed  $params   Parameters to send to task
+     * @param Mixed  &$context Application context to associate with a task
+     * @param string $unique   A unique ID used to identify a particular task
      *
-     * @return Gearman Returns self object
+     * @return GearmanClient Return this object
      */
-    public function setServer($servername, $port = 4730)
+    public function addTaskBackground($name, $params =array(), &$context = null, $unique = null)
     {
-        $this->server = array($servername, $port);
+        $this->enqueueTask($name, $params, $context, $unique, 'addTaskBackground');
+
+        return $this;
+    }
+
+    /**
+     * Adds a high priority background task to be run in parallel with other tasks.
+     * Call this method for all the tasks to be run in parallel, then call GearmanClient::runTasks() to perform the work.
+     * Tasks with a high priority will be selected from the queue before those of normal or low priority.
+     *
+     * @param string $name     A GermanBundle registered function to be executed
+     * @param Mixed  $params   Parameters to send to task
+     * @param Mixed  &$context Application context to associate with a task
+     * @param string $unique   A unique ID used to identify a particular task
+     *
+     * @return GearmanClient Return this object
+     */
+    public function addTaskHighBackground($name, $params =array(), &$context = null, $unique = null)
+    {
+        $this->enqueueTask($name, $params, $context, $unique, 'addTaskHighBackground');
+
         return $this;
     }
+
+    /**
+     * Adds a low priority background task to be run in parallel with other tasks.
+     * Call this method for all the tasks to be run in parallel, then call GearmanClient::runTasks() to perform the work.
+     * Tasks with a low priority will be selected from the queue after those of normal or high priority.
+     *
+     * @param string $name     A GermanBundle registered function to be executed
+     * @param Mixed  $params   Parameters to send to task
+     * @param Mixed  &$context Application context to associate with a task
+     * @param string $unique   A unique ID used to identify a particular task
+     *
+     * @return GearmanClient Return this object
+     */
+    public function addTaskLowBackground($name, $params =array(), &$context = null, $unique = null)
+    {
+        $this->enqueueTask($name, $params, $context, $unique, 'addTaskLowBackground');
+
+        return $this;
+    }
+
+
+    /**
+     * Adds a task into the structure of tasks with included type of call
+     *
+     * @param string $name    A GermanBundle registered function to be executed
+     * @param Mixed  $params  Parameters to send to task
+     * @param Mixed  $context Application context to associate with a task
+     * @param string $unique  A unique ID used to identify a particular task
+     * @param string $method  Method to perform
+     */
+    private function enqueueTask($name, $params, $context, $unique, $method)
+    {
+        $task = array(
+            'name'      =>  $name,
+            'params'    =>  $params,
+            'context'   =>  $context,
+            'unique'    =>  $unique,
+            'method'    =>  $method,
+            );
+        $this->addTaskToStructure($task);
+    }
+
+    /**
+     * Appends a task structure into taskStructure array
+     *
+     * @param array $task Task structure
+     */
+    private function addTaskToStructure(array $task)
+    {
+        $this->taskStructure['tasks'][] = $task;
+    }
+
+
+    /**
+     * For a set of tasks previously added with GearmanClient::addTask(), GearmanClient::addTaskHigh(),
+     * GearmanClient::addTaskLow(), GearmanClient::addTaskBackground(), GearmanClient::addTaskHighBackground(),
+     * or GearmanClient::addTaskLowBackground(), this call starts running the tasks in parallel.
+     * Note that enough workers need to be available for the tasks to all run in parallel
+     *
+     * @return true
+     */
+    public function runTasks()
+    {
+        $taskStructure = $this->taskStructure;
+        $gearmanClient = new \GearmanClient();
+        $this->assignServers($gearmanClient);
+
+        foreach ($taskStructure['tasks'] as $task) {
+            $type = $task['method'];
+            $jobName = $task['name'];
+            $worker = $this->getJob($jobName);
+            if (false !== $worker) {
+                $gearmanClient->$type($worker['job']['realCallableName'], serialize($type['params']), $type['context'], $type['unique']);
+            }
+        }
+        return $gearmanClient->runTasks();
+    }
 }