Procházet zdrojové kódy

Merge pull request #138 from lafourchette/feature/worker-management

New GEARMAN_WORK_STARTING event on new job
Marc Morera před 10 roky
rodič
revize
5eb1220fa8

+ 51 - 0
Event/GearmanWorkStartingEvent.php

@@ -0,0 +1,51 @@
+<?php
+
+/**
+ * Gearman Bundle for Symfony2
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ *
+ * Feel free to edit as you please, and have fun.
+ *
+ * @author Marc Morera <yuhu@mmoreram.com>
+ */
+
+namespace Mmoreram\GearmanBundle\Event;
+
+use Symfony\Component\EventDispatcher\Event;
+
+/**
+ * GearmanWorkStartingEvent
+ *
+ * @author David Moreau <dmoreau@lafourchette.com>
+ */
+class GearmanWorkStartingEvent extends Event
+{
+    /**
+     * @var array
+     *
+     * Gearman jobs running
+     */
+    protected $jobs;
+
+    /**
+     * Construct method
+     *
+     * @param array $jobs Jobs
+     */
+    public function __construct(array $jobs)
+    {
+        $this->jobs = $jobs;
+    }
+
+    /**
+     * Get Gearman Work subscribed jobs
+     *
+     * @return array Subscribed jobs
+     */
+    public function getJobs()
+    {
+        return $this->jobs;
+    }
+}

+ 9 - 0
GearmanEvents.php

@@ -115,4 +115,13 @@ class GearmanEvents
      * @var string
      */
     const GEARMAN_WORK_EXECUTED = 'gearman.work.executed';
+
+    /**
+     * Sets a function to be called when a worker is starting a job.
+     *
+     * This will be fired when the worker start another work cycle.
+     *
+     * @var string
+     */
+    const GEARMAN_WORK_STARTING = 'gearman.work.starting';
 }

+ 17 - 1
Resources/docs/kernel_events.rst

@@ -129,6 +129,22 @@ The second method will return `$context` that you could add in the `addTask()` m
             tags:
               - { name: kernel.event_listener, event: gearman.client.callback.workload, method: onWorkload }
 
+Starting Work Event
+~~~~~~~~~~~~~~~~~~
+
+This event receives as parameter an instanceof `Mmoreram\GearmanBundle\Event\GearmanWorkStartingEvent` with one method:
+`$event->getJobs()` returns the configuration of the jobs.
+
+This event is dispatched before a job starts.
+
+.. code-block:: yml
+
+    services:
+        my_event_listener:
+            class: AcmeBundle\EventListener\MyEventListener
+            tags:
+              - { name: kernel.event_listener, event: gearman.work.starting, method: onWorkStarting }
+
 Execute Work Event
 ~~~~~~~~~~~~~~~~~~
 
@@ -145,4 +161,4 @@ This event is dispatched after a job has been completed.  After this event is co
         my_event_listener:
             class: AcmeBundle\EventListener\MyEventListener
             tags:
-              - { name: kernel.event_listener, event: gearman.work.executed, method: onWorkExecuted }
+              - { name: kernel.event_listener, event: gearman.work.executed, method: onWorkExecuted }

+ 59 - 7
Service/GearmanExecute.php

@@ -21,6 +21,7 @@ use Symfony\Component\EventDispatcher\EventDispatcherInterface;
 
 use Mmoreram\GearmanBundle\Command\Util\GearmanOutputAwareInterface;
 use Mmoreram\GearmanBundle\Event\GearmanWorkExecutedEvent;
+use Mmoreram\GearmanBundle\Event\GearmanWorkStartingEvent;
 use Mmoreram\GearmanBundle\GearmanEvents;
 use Mmoreram\GearmanBundle\Service\Abstracts\AbstractGearmanService;
 use Mmoreram\GearmanBundle\Exceptions\ServerConnectionException;
@@ -130,14 +131,15 @@ class GearmanExecute extends AbstractGearmanService
      * Executes a job given a jobName and given settings and annotations of job
      *
      * @param string $jobName Name of job to be executed
+     * @param array $options Array of options passed to the callback
+     * @param GearmanWorker $gearmanWorker Worker instance to use
      */
-    public function executeJob($jobName, array $options = array())
+    public function executeJob($jobName, array $options = array(), \GearmanWorker $gearmanWorker = null)
     {
         $worker = $this->getJob($jobName);
 
         if (false !== $worker) {
-
-            $this->callJob($worker, $options);
+            $this->callJob($worker, $options, $gearmanWorker);
         }
     }
 
@@ -145,12 +147,15 @@ class GearmanExecute extends AbstractGearmanService
      * Given a worker, execute GearmanWorker function defined by job.
      *
      * @param array $worker Worker definition
-     *
+     * @param array $options Array of options passed to the callback
+     * @param GearmanWorker $gearmanWorker Worker instance to use
      * @return GearmanExecute self Object
      */
-    private function callJob(array $worker, array $options = array())
+    private function callJob(Array $worker, array $options = array(), \GearmanWorker $gearmanWorker = null)
     {
-        $gearmanWorker = new \GearmanWorker;
+        if(is_null($gearmanWorker)){
+            $gearmanWorker = new \GearmanWorker;
+        }
         $minimumExecutionTime = null;
 
         if (isset($worker['job'])) {
@@ -266,7 +271,15 @@ class GearmanExecute extends AbstractGearmanService
          */
         foreach ($jobs as $job) {
 
-            $gearmanWorker->addFunction($job['realCallableName'], array($objInstance, $job['methodName']));
+            $gearmanWorker->addFunction(
+                $job['realCallableName'],
+                array($this, 'handleJob'),
+                array(
+                    'job_object_instance' => $objInstance,
+                    'job_method' => $job['methodName'],
+                    'jobs' => $jobs
+                )
+            );
         }
 
         /**
@@ -348,4 +361,43 @@ class GearmanExecute extends AbstractGearmanService
             $this->callJob($worker, $options);
         }
     }
+
+    /**
+     * Wrapper function handler for all registered functions
+     * This allows us to do some nice logging when jobs are started/finished
+     *
+     * @see https://github.com/brianlmoon/GearmanManager/blob/ffc828dac2547aff76cb4962bb3fcc4f454ec8a2/GearmanPeclManager.php#L95-206
+     *
+     * @param \GearmanJob $job
+     * @param mixed $context
+     *
+     * @return mixed
+     */
+    public function handleJob(\GearmanJob $job, $context)
+    {
+        if (
+            !is_array($context)
+            || !array_key_exists('job_object_instance', $context)
+            || !array_key_exists('job_method', $context)
+        ) {
+            throw new \InvalidArgumentException('$context shall be an array with job_object_instance and job_method key.');
+        }
+
+        $event = new GearmanWorkStartingEvent($context['jobs']);
+        $this->eventDispatcher->dispatch(GearmanEvents::GEARMAN_WORK_STARTING, $event);
+
+        $result = call_user_func_array(
+            array($context['job_object_instance'], $context['job_method']),
+            array($job, $context)
+        );
+
+        /**
+         * Workaround for PECL bug #17114
+         * http://pecl.php.net/bugs/bug.php?id=17114
+         */
+        $type = gettype($result);
+        settype($result, $type);
+
+        return $result;
+    }
 }

+ 83 - 0
Tests/Service/GearmanExecuteTest.php

@@ -13,7 +13,9 @@
 
 namespace Mmoreram\GearmanBundle\Tests\Service;
 
+use Mmoreram\GearmanBundle\Service\GearmanExecute;
 use Symfony\Bundle\FrameworkBundle\Test\WebTestCase;
+use Mmoreram\GearmanBundle\GearmanEvents;
 
 /**
  * Tests GearmanExecute class
@@ -36,4 +38,85 @@ class GearmanExecuteTest extends WebTestCase
                 ->get('gearman.execute')
         );
     }
+
+    public function testDispatchingEventsOnJob()
+    {
+        // Worker mock
+        $worker = $this->getMockBuilder('\GearmanWorker')
+            ->disableOriginalConstructor()
+            ->getMock();
+        $worker->method('addServer')->willReturn(true);
+
+        // Wrapper mock
+        $workers = array(
+            0 => array(
+                'className'    => "Mmoreram\\GearmanBundle\\Tests\\Service\\Mocks\\SingleCleanFile",
+                'fileName'     => dirname(__FILE__) . '/Mocks/SingleCleanFile.php',
+                'callableName' => null,
+                'description'  => "test",
+                'service'      => false,
+                'servers'      => array(),
+                'iterations'   => 1,
+                'timeout'      => null,
+                'minimumExecutionTime' => null,
+                'jobs' => array(
+                    0 => array(
+                        'callableName'             => "test",
+                        'methodName'               => "test",
+                        'realCallableName'         => "test",
+                        'jobPrefix'                => NULL,
+                        'realCallableNameNoPrefix' => "test",
+                        'description'              => "test",
+                        'iterations'               => 1,
+                        'servers'                  => array(),
+                        'defaultMethod'            => "doBackground",
+                        'minimumExecutionTime'     => null,
+                        'timeout'                  => null,
+                    )
+                )
+            )
+        );
+        $wrapper = $this->getMockBuilder('Mmoreram\GearmanBundle\Service\GearmanCacheWrapper')
+            ->disableOriginalConstructor()
+            ->getMock();
+        $wrapper->method('getWorkers')
+            ->willReturn($workers);
+
+        // Prepare a dispatcher to listen to tested events
+        $startingFlag = false;
+        $executedFlag = false;
+
+        $dispatcher = new \Symfony\Component\EventDispatcher\EventDispatcher();
+        $dispatcher->addListener(GearmanEvents::GEARMAN_WORK_STARTING, function() use (&$startingFlag){
+            $startingFlag = true;
+        });
+        $dispatcher->addListener(GearmanEvents::GEARMAN_WORK_EXECUTED, function() use (&$executedFlag){
+            $executedFlag = true;
+        });
+
+        // Create the service under test
+        $service = new GearmanExecute($wrapper, array());
+        $service->setEventDispatcher($dispatcher);
+
+        // We need a job object, this part could be improved
+        $object = new \Mmoreram\GearmanBundle\Tests\Service\Mocks\SingleCleanFile();
+
+        // Finalize worker mock by making it call our job object
+        // This is normally handled by Gearman, but for test purpose we must simulate it
+        $worker->method('work')->will($this->returnCallback(function() use ($service, $object){
+            $service->handleJob(new \GearmanJob(), array(
+                'job_object_instance' => $object,
+                'job_method' => 'myMethod',
+                'jobs' => array()
+            ));
+            return true;
+        }));
+
+        // Execute a job :)
+        $service->executeJob('test', array(), $worker);
+
+        // Do we have the events ?
+        $this->assertTrue($startingFlag);
+        $this->assertTrue($executedFlag);
+    }
 }