GearmanExecute.php 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. <?php
  2. /**
  3. * Gearman Bundle for Symfony2
  4. *
  5. * For the full copyright and license information, please view the LICENSE
  6. * file that was distributed with this source code.
  7. *
  8. * Feel free to edit as you please, and have fun.
  9. *
  10. * @author Marc Morera <yuhu@mmoreram.com>
  11. */
  12. namespace Mmoreram\GearmanBundle\Service;
  13. use Symfony\Component\DependencyInjection\ContainerInterface;
  14. use Symfony\Component\DependencyInjection\ContainerAwareInterface;
  15. use Mmoreram\GearmanBundle\Service\Abstracts\AbstractGearmanService;
  /**
   * Gearman execute methods.
   *
   * Builds a \GearmanWorker from a worker/job definition array, registers the
   * job callbacks on it and runs the work loop.
   *
   * @since 2.3.1
   */
  class GearmanExecute extends AbstractGearmanService
  {
      /**
       * @var ContainerInterface
       *
       * Container instance
       */
      private $container;

      /**
       * Set container
       *
       * @param ContainerInterface $container Container
       *
       * @return GearmanExecute self Object
       */
      public function setContainer(ContainerInterface $container)
      {
          $this->container = $container;

          return $this;
      }

      /**
       * Executes a job given a jobName and given settings and annotations of job
       *
       * The definition is looked up with getJob(), which is not defined in this
       * class (presumably inherited from AbstractGearmanService). Nothing is
       * executed when that lookup returns false.
       *
       * @param string $jobName Name of job to be executed
       */
      public function executeJob($jobName)
      {
          $worker = $this->getJob($jobName);

          if (false !== $worker) {
              $this->callJob($worker);
          }
      }

      /**
       * Given a worker, execute GearmanWorker function defined by job.
       *
       * Two definition shapes are handled:
       * - a single-job definition, carrying a 'job' key whose value holds its
       *   own 'iterations' and 'servers' entries;
       * - a full worker definition, carrying top-level 'jobs', 'iterations' and
       *   'servers' entries.
       *
       * @param array $worker Worker definition
       *
       * @return GearmanExecute self Object
       */
      private function callJob(array $worker)
      {
          $gearmanWorker = new \GearmanWorker;

          if (isset($worker['job'])) {
              // Single job: wrap it so runJob() can always iterate over a list.
              $jobs = array($worker['job']);
              $iterations = $worker['job']['iterations'];
              $this->addServers($gearmanWorker, $worker['job']['servers']);
          } else {
              $jobs = $worker['jobs'];
              $iterations = $worker['iterations'];
              $this->addServers($gearmanWorker, $worker['servers']);
          }

          $objInstance = $this->createJob($worker);
          $this->runJob($gearmanWorker, $objInstance, $jobs, $iterations);

          return $this;
      }

      /**
       * Given a worker settings, return Job instance
       *
       * If a service is defined, the instance is retrieved from the container.
       * Otherwise the class named by 'className' is created with a simple new.
       *
       * @param array $worker Worker settings
       *
       * @return Object Job instance
       */
      private function createJob(array $worker)
      {
          // empty() rather than a direct read: a definition without a 'service'
          // key must fall through to plain instantiation without raising an
          // undefined-index notice.
          if (!empty($worker['service'])) {
              return $this->container->get($worker['service']);
          }

          $className = $worker['className'];
          $objInstance = new $className();

          /**
           * If instance of given object is instanceof
           * ContainerAwareInterface, we inject full container by calling
           * container setter.
           *
           * @see https://github.com/mmoreram/gearman-bundle/pull/12
           */
          if ($objInstance instanceof ContainerAwareInterface) {
              $objInstance->setContainer($this->container);
          }

          return $objInstance;
      }

      /**
       * Given a GearmanWorker and an instance of Job, run it
       *
       * @param \GearmanWorker $gearmanWorker Gearman Worker
       * @param Object         $objInstance   Job instance
       * @param array          $jobs          Array of jobs to subscribe
       * @param integer        $iterations    Number of iterations, 0 meaning no limit
       *
       * @return GearmanExecute self Object
       */
      private function runJob(\GearmanWorker $gearmanWorker, $objInstance, array $jobs, $iterations)
      {
          /**
           * Every job defined in worker is added into GearmanWorker
           */
          foreach ($jobs as $job) {
              $gearmanWorker->addFunction(
                  $job['realCallableName'],
                  array($objInstance, $job['methodName'])
              );
          }

          /**
           * Normalise once so the "0 means never die" check does not depend on
           * loose string/int comparison rules, which differ across PHP versions.
           */
          $iterations = (int) $iterations;
          $alive = (0 === $iterations);

          /**
           * Executes GearmanWorker with all jobs defined
           */
          while ($gearmanWorker->work()) {
              if ($gearmanWorker->returnCode() != GEARMAN_SUCCESS) {
                  break;
              }

              /**
               * Only finishes its execution if alive is false and iterations
               * arrives to 0
               */
              if (!$alive && --$iterations <= 0) {
                  break;
              }
          }

          return $this;
      }

      /**
       * Adds into worker all defined Servers.
       * If none is defined, addServer() is called without arguments so the
       * GearmanWorker defaults apply.
       *
       * @param \GearmanWorker $gmworker Worker to perform configuration
       * @param array          $servers  Servers array, each entry holding 'host' and 'port'
       */
      private function addServers(\GearmanWorker $gmworker, array $servers)
      {
          if (empty($servers)) {
              $gmworker->addServer();

              return;
          }

          foreach ($servers as $server) {
              $gmworker->addServer($server['host'], $server['port']);
          }
      }

      /**
       * Executes a worker given a workerName subscribing all his jobs inside and
       * given settings and annotations of worker and jobs
       *
       * The definition is looked up with getWorker(), which is not defined in
       * this class (presumably inherited from AbstractGearmanService). Nothing
       * is executed when that lookup returns false.
       *
       * @param string $workerName Name of worker to be executed
       */
      public function executeWorker($workerName)
      {
          $worker = $this->getWorker($workerName);

          if (false !== $worker) {
              $this->callJob($worker);
          }
      }
  }