GearmanExecute.php 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210
  1. <?php
  2. /**
  3. * Gearman Bundle for Symfony2
  4. *
  5. * @author Marc Morera <yuhu@mmoreram.com>
  6. * @since 2013
  7. */
  8. namespace Mmoreram\GearmanBundle\Service;
  9. use Symfony\Component\DependencyInjection\ContainerInterface;
  10. use Symfony\Component\DependencyInjection\ContainerAwareInterface;
  11. use Mmoreram\GearmanBundle\Service\Abstracts\AbstractGearmanService;
  12. /**
  13. * Gearman execute methods. All Worker methods
  14. *
  15. * @author Marc Morera <yuhu@mmoreram.com>
  16. */
  17. class GearmanExecute extends AbstractGearmanService
  18. {
  19. /**
  20. * @var ContainerInterface
  21. *
  22. * Container instance
  23. */
  24. private $container;
  25. /**
  26. * Set container
  27. *
  28. * @param ContainerInterface $container Container
  29. *
  30. * @return GearmanExecute self Object
  31. */
  32. public function setContainer(ContainerInterface $container)
  33. {
  34. $this->container = $container;
  35. }
  36. /**
  37. * Executes a job given a jobName and given settings and annotations of job
  38. *
  39. * @param string $jobName Name of job to be executed
  40. */
  41. public function executeJob($jobName)
  42. {
  43. $worker = $this->getJob($jobName);
  44. if (false !== $worker) {
  45. $this->callJob($worker);
  46. }
  47. }
  48. /**
  49. * Given a worker, execute GearmanWorker function defined by job.
  50. *
  51. * @param array $worker Worker definition
  52. *
  53. * @return GearmanExecute self Object
  54. */
  55. private function callJob(Array $worker)
  56. {
  57. $gearmanWorker = new \GearmanWorker;
  58. if (isset($worker['job'])) {
  59. $jobs = array($worker['job']);
  60. $iterations = $worker['job']['iterations'];
  61. $this->addServers($gearmanWorker, $worker['job']['servers']);
  62. } else {
  63. $jobs = $worker['jobs'];
  64. $iterations = $worker['iterations'];
  65. $this->addServers($gearmanWorker, $worker['servers']);
  66. }
  67. $objInstance = $this->createJob($worker);
  68. $this->runJob($gearmanWorker, $objInstance, $jobs, $iterations);
  69. return $this;
  70. }
  71. /**
  72. * Given a worker settings, return Job instance
  73. *
  74. * @param array $worker Worker settings
  75. *
  76. * @return Object Job instance
  77. */
  78. private function createJob(array $worker)
  79. {
  80. /**
  81. * If service is defined, we must retrieve this class with dependency injection
  82. *
  83. * Otherwise we just create it with a simple new()
  84. */
  85. if ($worker['service']) {
  86. $objInstance = $this->container->get($worker['service']);
  87. } else {
  88. $objInstance = new $worker['className'];
  89. /**
  90. * If instance of given object is instanceof
  91. * ContainerAwareInterface, we inject full container by calling
  92. * container setter.
  93. *
  94. * @see https://github.com/mmoreram/gearman-bundle/pull/12
  95. */
  96. if ($objInstance instanceof ContainerAwareInterface) {
  97. $objInstance->setContainer($this->container);
  98. }
  99. }
  100. return $objInstance;
  101. }
  102. /**
  103. * Given a GearmanWorker and an instance of Job, run it
  104. *
  105. * @param \GearmanWorker $gearmanWorker Gearman Worker
  106. * @param Object $objInstance Job instance
  107. * @param array $jobs Array of jobs to subscribe
  108. * @param integer $iterations Number of iterations
  109. *
  110. * @return GearmanExecute self Object
  111. */
  112. private function runJob(\GearmanWorker $gearmanWorker, $objInstance, array $jobs, $iterations)
  113. {
  114. /**
  115. * Every job defined in worker is added into GearmanWorker
  116. */
  117. foreach ($jobs as $job) {
  118. $gearmanWorker->addFunction($job['realCallableName'], array($objInstance, $job['methodName']));
  119. }
  120. /**
  121. * If iterations value is 0, is like worker will never die
  122. */
  123. $alive = (0 == $iterations);
  124. /**
  125. * Executes GearmanWorker with all jobs defined
  126. */
  127. while ($gearmanWorker->work()) {
  128. if ($gearmanWorker->returnCode() != GEARMAN_SUCCESS) {
  129. break;
  130. }
  131. /**
  132. * Only finishes its execution if alive is false and iterations
  133. * arrives to 0
  134. */
  135. if (!$alive && --$iterations <= 0) {
  136. break;
  137. }
  138. }
  139. }
  140. /**
  141. * Adds into worker all defined Servers.
  142. * If any is defined, performs default method
  143. *
  144. * @param \GearmanWorker $gmworker Worker to perform configuration
  145. * @param array $servers Servers array
  146. */
  147. private function addServers(\GearmanWorker $gmworker, Array $servers)
  148. {
  149. if (!empty($servers)) {
  150. foreach ($servers as $server) {
  151. $gmworker->addServer($server['host'], $server['port']);
  152. }
  153. } else {
  154. $gmworker->addServer();
  155. }
  156. }
  157. /**
  158. * Executes a worker given a workerName subscribing all his jobs inside and
  159. * given settings and annotations of worker and jobs
  160. *
  161. * @param string $workerName Name of worker to be executed
  162. */
  163. public function executeWorker($workerName)
  164. {
  165. $worker = $this->getWorker($workerName);
  166. if (false !== $worker) {
  167. $this->callJob($worker);
  168. }
  169. }
  170. }