GearmanExecute.php 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. <?php
  2. /**
  3. * Gearman Bundle for Symfony2
  4. *
  5. * @author Marc Morera <yuhu@mmoreram.com>
  6. * @since 2013
  7. */
  8. namespace Mmoreram\GearmanBundle\Service;
  9. use Mmoreram\GearmanBundle\Service\Abstracts\AbstractGearmanService;
  10. use Symfony\Component\DependencyInjection\Container;
  11. use Symfony\Component\DependencyInjection\ContainerAwareInterface;
  12. use Object;
  /**
   * Gearman execute methods. All Worker methods
   *
   * Resolves a job or worker definition by name (through the parent service),
   * builds a \GearmanWorker connected to the configured servers, registers the
   * job callbacks on it and processes incoming jobs until the iteration limit
   * is reached or Gearman reports a non-success return code.
   *
   * @author Marc Morera <yuhu@mmoreram.com>
   */
  class GearmanExecute extends AbstractGearmanService
  {
      /**
       * @var Container
       *
       * Container instance, used to fetch job services and to inject into
       * container-aware job classes
       */
      private $container;

      /**
       * Set container
       *
       * @param Container $container Container
       *
       * @return GearmanExecute self Object
       */
      public function setContainer(Container $container)
      {
          $this->container = $container;

          return $this;
      }

      /**
       * Executes a job given a jobName and given settings and annotations of job
       *
       * Nothing is executed when the lookup returns false.
       *
       * @param string $jobName Name of job to be executed
       */
      public function executeJob($jobName)
      {
          $worker = $this->getJob($jobName);

          if (false !== $worker) {
              $this->callJob($worker);
          }
      }

      /**
       * Given a worker, execute GearmanWorker function defined by job.
       *
       * When the definition carries a 'job' key, only that job is subscribed
       * and its own 'iterations' and 'servers' settings are used. Otherwise
       * every entry of 'jobs' is subscribed using the worker-level settings.
       *
       * @param array $worker Worker definition
       *
       * @return GearmanExecute self Object
       */
      private function callJob(array $worker)
      {
          $gearmanWorker = new \GearmanWorker;

          if (isset($worker['job'])) {
              $jobs = array($worker['job']);
              $iterations = $worker['job']['iterations'];
              $servers = $worker['job']['servers'];
          } else {
              $jobs = $worker['jobs'];
              $iterations = $worker['iterations'];
              $servers = $worker['servers'];
          }

          $this->addServers($gearmanWorker, $servers);

          $objInstance = $this->createJob($worker);

          // The iteration limit must be handed over explicitly: runJob() has
          // no other way to see the value resolved above.
          $this->runJob($gearmanWorker, $objInstance, $jobs, $iterations);

          return $this;
      }

      /**
       * Given a worker settings, return Job instance
       *
       * @param array $worker Worker settings
       *
       * @return Object Job instance
       */
      private function createJob(array $worker)
      {
          /**
           * If service is defined, we must retrieve this class with dependency injection
           *
           * Otherwise we just create it with a simple new()
           */
          if (!empty($worker['service'])) {
              return $this->container->get($worker['service']);
          }

          $className = $worker['className'];
          $objInstance = new $className();

          /**
           * If instance of given object is instanceof ContainerAwareInterface, we inject full container
           * by calling container setter.
           *
           * @see https://github.com/mmoreram/gearman-bundle/pull/12
           */
          if ($objInstance instanceof ContainerAwareInterface) {
              $objInstance->setContainer($this->container);
          }

          return $objInstance;
      }

      /**
       * Given a GearmanWorker and an instance of Job, run it
       *
       * Processing stops as soon as work() returns a falsy value, the return
       * code differs from GEARMAN_SUCCESS, or $iterations jobs have completed
       * successfully. A value of 0 or less stops after the first job.
       *
       * @param \GearmanWorker $gearmanWorker Gearman Worker
       * @param Object         $objInstance   Job instance
       * @param array          $jobs          Array of jobs to subscribe
       * @param integer        $iterations    Maximum number of jobs to process
       *
       * @return GearmanExecute self Object
       */
      private function runJob(\GearmanWorker $gearmanWorker, $objInstance, array $jobs, $iterations)
      {
          /**
           * Every job defined in worker is added into GearmanWorker
           */
          foreach ($jobs as $job) {
              $gearmanWorker->addFunction(
                  $job['realCallableName'],
                  array($objInstance, $job['methodName'])
              );
          }

          $remaining = (int) $iterations;

          /**
           * Executes GearmanWorker with all jobs defined
           */
          while ($gearmanWorker->work()) {
              if ($gearmanWorker->returnCode() !== GEARMAN_SUCCESS) {
                  break;
              }

              // Pre-decrement so that exactly $iterations jobs are processed.
              if (--$remaining <= 0) {
                  break;
              }
          }

          return $this;
      }

      /**
       * Adds into worker all defined Servers.
       * If none is defined, performs default method
       *
       * @param \GearmanWorker $gmworker Worker to perform configuration
       * @param array          $servers  Servers array; each entry provides 'host' and 'port'
       */
      private function addServers(\GearmanWorker $gmworker, array $servers)
      {
          if (empty($servers)) {
              // No explicit configuration: let GearmanWorker use its defaults.
              $gmworker->addServer();

              return;
          }

          foreach ($servers as $server) {
              $gmworker->addServer($server['host'], $server['port']);
          }
      }

      /**
       * Executes a worker given a workerName subscribing all his jobs inside and given settings and annotations of worker and jobs
       *
       * Nothing is executed when the lookup returns false.
       *
       * @param string $workerName Name of worker to be executed
       */
      public function executeWorker($workerName)
      {
          $worker = $this->getWorker($workerName);

          if (false !== $worker) {
              $this->callJob($worker);
          }
      }
  }