123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321 |
- <?php
- /**
- * Gearman Bundle for Symfony2
- *
- * For the full copyright and license information, please view the LICENSE
- * file that was distributed with this source code.
- *
- * Feel free to edit as you please, and have fun.
- *
- * @author Marc Morera <yuhu@mmoreram.com>
- */
- namespace Mmoreram\GearmanBundle\Service;
- use Symfony\Component\Console\Output\NullOutput;
- use Symfony\Component\Console\Output\OutputInterface;
- use Symfony\Component\DependencyInjection\ContainerAwareInterface;
- use Symfony\Component\DependencyInjection\ContainerInterface;
- use Symfony\Component\EventDispatcher\EventDispatcherInterface;
- use Mmoreram\GearmanBundle\Command\Util\GearmanOutputAwareInterface;
- use Mmoreram\GearmanBundle\Event\GearmanWorkExecutedEvent;
- use Mmoreram\GearmanBundle\GearmanEvents;
- use Mmoreram\GearmanBundle\Service\Abstracts\AbstractGearmanService;
- /**
- * Gearman execute methods. All Worker methods
- *
- * @since 2.3.1
- */
- class GearmanExecute extends AbstractGearmanService
- {
- /**
- * @var ContainerInterface
- *
- * Container instance
- */
- private $container;
- /**
- * @var EventDispatcherInterface
- *
- * EventDispatcher instance
- */
- protected $eventDispatcher;
- /**
- * @var OutputInterface
- *
- * Output instance
- */
- protected $output;
- /**
- * Set container
- *
- * @param ContainerInterface $container Container
- *
- * @return GearmanExecute self Object
- */
- public function setContainer(ContainerInterface $container)
- {
- $this->container = $container;
- return $this;
- }
- /**
- * Set event dispatcher
- *
- * @param EventDispatcherInterface $eventDispatcher
- *
- * @return GearmanExecute self Object
- */
- public function setEventDispatcher(EventDispatcherInterface $eventDispatcher)
- {
- $this->eventDispatcher = $eventDispatcher;
- return $this;
- }
- /**
- * Set output
- *
- * @param OutputInterface $output
- *
- * @return GearmanExecute self Object
- */
- public function setOutput(OutputInterface $output)
- {
- $this->output = $output;
- return $this;
- }
- /**
- * Executes a job given a jobName and given settings and annotations of job
- *
- * @param string $jobName Name of job to be executed
- * @param GearmanWorker $gearmanWorker Worker instance to use
- */
- public function executeJob($jobName, \GearmanWorker $gearmanWorker = null)
- {
- $worker = $this->getJob($jobName);
- if (false !== $worker) {
- $this->callJob($worker, $gearmanWorker);
- }
- }
- /**
- * Given a worker, execute GearmanWorker function defined by job.
- *
- * @param array $worker Worker definition
- * @param GearmanWorker $gearmanWorker Worker instance to use
- * @return GearmanExecute self Object
- */
- private function callJob(Array $worker, \GearmanWorker $gearmanWorker = null)
- {
- if(is_null($gearmanWorker)){
- $gearmanWorker = new \GearmanWorker;
- }
- if (isset($worker['job'])) {
- $jobs = array($worker['job']);
- $iterations = $worker['job']['iterations'];
- $this->addServers($gearmanWorker, $worker['job']['servers']);
- } else {
- $jobs = $worker['jobs'];
- $iterations = $worker['iterations'];
- $this->addServers($gearmanWorker, $worker['servers']);
- }
- $objInstance = $this->createJob($worker);
- $this->runJob($gearmanWorker, $objInstance, $jobs, $iterations);
- return $this;
- }
- /**
- * Given a worker settings, return Job instance
- *
- * @param array $worker Worker settings
- *
- * @return Object Job instance
- */
- private function createJob(array $worker)
- {
- /**
- * If service is defined, we must retrieve this class with dependency injection
- *
- * Otherwise we just create it with a simple new()
- */
- if ($worker['service']) {
- $objInstance = $this->container->get($worker['service']);
- } else {
- $objInstance = new $worker['className'];
- /**
- * If instance of given object is instanceof
- * ContainerAwareInterface, we inject full container by calling
- * container setter.
- *
- * @see https://github.com/mmoreram/gearman-bundle/pull/12
- */
- if ($objInstance instanceof ContainerAwareInterface) {
- $objInstance->setContainer($this->container);
- }
- }
- return $objInstance;
- }
- /**
- * Given a GearmanWorker and an instance of Job, run it
- *
- * @param \GearmanWorker $gearmanWorker Gearman Worker
- * @param Object $objInstance Job instance
- * @param array $jobs Array of jobs to subscribe
- * @param integer $iterations Number of iterations
- *
- * @return GearmanExecute self Object
- */
- private function runJob(\GearmanWorker $gearmanWorker, $objInstance, array $jobs, $iterations)
- {
- /**
- * Set the output of this instance, this should allow workers to use the console output.
- */
- if ($objInstance instanceof GearmanOutputAwareInterface) {
- $objInstance->setOutput($this->output ? : new NullOutput());
- }
- /**
- * Every job defined in worker is added into GearmanWorker
- */
- foreach ($jobs as $job) {
- $gearmanWorker->addFunction(
- $job['realCallableName'],
- array($this, 'handleJob'),
- array(
- 'job_object_instance' => $objInstance,
- 'job_method' => $job['methodName'],
- )
- );
- }
- /**
- * If iterations value is 0, is like worker will never die
- */
- $alive = (0 == $iterations);
- /**
- * Executes GearmanWorker with all jobs defined
- */
- while ($gearmanWorker->work()) {
- $iterations--;
- $event = new GearmanWorkExecutedEvent($jobs, $iterations, $gearmanWorker->returnCode());
- $this->eventDispatcher->dispatch(GearmanEvents::GEARMAN_WORK_EXECUTED, $event);
- if ($gearmanWorker->returnCode() != GEARMAN_SUCCESS) {
- break;
- }
- /**
- * Only finishes its execution if alive is false and iterations
- * arrives to 0
- */
- if (!$alive && $iterations <= 0) {
- break;
- }
- }
- }
- /**
- * Adds into worker all defined Servers.
- * If any is defined, performs default method
- *
- * @param \GearmanWorker $gmworker Worker to perform configuration
- * @param array $servers Servers array
- */
- private function addServers(\GearmanWorker $gmworker, Array $servers)
- {
- if (!empty($servers)) {
- foreach ($servers as $server) {
- $gmworker->addServer($server['host'], $server['port']);
- }
- } else {
- $gmworker->addServer();
- }
- }
- /**
- * Executes a worker given a workerName subscribing all his jobs inside and
- * given settings and annotations of worker and jobs
- *
- * @param string $workerName Name of worker to be executed
- */
- public function executeWorker($workerName)
- {
- $worker = $this->getWorker($workerName);
- if (false !== $worker) {
- $this->callJob($worker);
- }
- }
- /**
- * Wrapper function handler for all registered functions
- * This allows us to do some nice logging when jobs are started/finished
- *
- * @see https://github.com/brianlmoon/GearmanManager/blob/ffc828dac2547aff76cb4962bb3fcc4f454ec8a2/GearmanPeclManager.php#L95-206
- *
- * @param \GearmanJob $job
- * @param mixed $context
- *
- * @return mixed
- */
- public function handleJob(\GearmanJob $job, $context)
- {
- if (
- !is_array($context)
- || !array_key_exists('job_object_instance', $context)
- || !array_key_exists('job_method', $context)
- ) {
-
- }
- $result = call_user_func_array(
- array($context['job_object_instance'], $context['job_method']),
- array($job, $context)
- );
- /**
- * Workaround for PECL bug #17114
- * http://pecl.php.net/bugs/bug.php?id=17114
- */
- $type = gettype($result);
- settype($result, $type);
- return $result;
- }
- }
|