GearmanExecute.php 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410
  1. <?php
  2. /**
  3. * Gearman Bundle for Symfony2
  4. *
  5. * For the full copyright and license information, please view the LICENSE
  6. * file that was distributed with this source code.
  7. *
  8. * Feel free to edit as you please, and have fun.
  9. *
  10. * @author Marc Morera <yuhu@mmoreram.com>
  11. */
  12. namespace Mmoreram\GearmanBundle\Service;
  13. use Symfony\Component\Console\Output\NullOutput;
  14. use Symfony\Component\Console\Output\OutputInterface;
  15. use Symfony\Component\DependencyInjection\ContainerAwareInterface;
  16. use Symfony\Component\DependencyInjection\ContainerInterface;
  17. use Symfony\Component\EventDispatcher\EventDispatcherInterface;
  18. use Mmoreram\GearmanBundle\Command\Util\GearmanOutputAwareInterface;
  19. use Mmoreram\GearmanBundle\Event\GearmanWorkExecutedEvent;
  20. use Mmoreram\GearmanBundle\Event\GearmanWorkStartingEvent;
  21. use Mmoreram\GearmanBundle\GearmanEvents;
  22. use Mmoreram\GearmanBundle\Service\Abstracts\AbstractGearmanService;
  23. use Mmoreram\GearmanBundle\Exceptions\ServerConnectionException;
  24. use Symfony\Component\OptionsResolver\OptionsResolver;
  /**
   * Gearman execute methods. All Worker methods.
   *
   * Registers the callables of a worker (or of a single job) on a
   * \GearmanWorker, runs the work loop and dispatches the bundle events
   * around every processed job.
   *
   * @since 2.3.1
   */
  class GearmanExecute extends AbstractGearmanService
  {
      /**
       * @var ContainerInterface
       *
       * Container instance, used to fetch job services and to inject itself
       * into ContainerAware job classes
       */
      private $container;

      /**
       * @var EventDispatcherInterface
       *
       * EventDispatcher instance
       */
      protected $eventDispatcher;

      /**
       * @var OutputInterface
       *
       * Output instance handed over to output-aware job objects
       */
      protected $output;

      /**
       * @var OptionsResolver
       *
       * Validates the runtime options accepted by executeJob/executeWorker
       */
      protected $executeOptionsResolver;

      /**
       * Construct method
       *
       * Besides delegating to the parent, prepares the resolver for the three
       * supported runtime overrides. Each of them defaults to null, which
       * means "keep the configured value".
       *
       * @param GearmanCacheWrapper $gearmanCacheWrapper GearmanCacheWrapper
       * @param array               $defaultSettings     The default settings for the bundle
       */
      public function __construct(GearmanCacheWrapper $gearmanCacheWrapper, array $defaultSettings)
      {
          parent::__construct($gearmanCacheWrapper, $defaultSettings);

          $this->executeOptionsResolver = new OptionsResolver();
          $this->executeOptionsResolver
              ->setDefaults(array(
                  'iterations'             => null,
                  'minimum_execution_time' => null,
                  'timeout'                => null,
              ))
              ->setAllowedTypes(array(
                  'iterations'             => array('null', 'integer'),
                  'minimum_execution_time' => array('null', 'integer'),
                  'timeout'                => array('null', 'integer'),
              ))
          ;
      }

      /**
       * Set container
       *
       * @param ContainerInterface $container Container
       *
       * @return GearmanExecute self Object
       */
      public function setContainer(ContainerInterface $container)
      {
          $this->container = $container;

          return $this;
      }

      /**
       * Set event dispatcher
       *
       * @param EventDispatcherInterface $eventDispatcher
       *
       * @return GearmanExecute self Object
       */
      public function setEventDispatcher(EventDispatcherInterface $eventDispatcher)
      {
          $this->eventDispatcher = $eventDispatcher;

          return $this;
      }

      /**
       * Set output
       *
       * @param OutputInterface $output
       *
       * @return GearmanExecute self Object
       */
      public function setOutput(OutputInterface $output)
      {
          $this->output = $output;

          return $this;
      }

      /**
       * Executes a job given a jobName and given settings and annotations of job
       *
       * Nothing happens when getJob() returns false for the given name.
       *
       * @param string         $jobName       Name of job to be executed
       * @param array          $options       Runtime overrides: iterations,
       *                                      minimum_execution_time, timeout
       * @param \GearmanWorker $gearmanWorker Worker instance to use
       */
      public function executeJob($jobName, array $options = array(), \GearmanWorker $gearmanWorker = null)
      {
          $worker = $this->getJob($jobName);

          if (false !== $worker) {
              $this->callJob($worker, $options, $gearmanWorker);
          }
      }

      /**
       * Given a worker, execute GearmanWorker function defined by job.
       *
       * @param array          $worker        Worker definition
       * @param array          $options       Runtime overrides: iterations,
       *                                      minimum_execution_time, timeout
       * @param \GearmanWorker $gearmanWorker Worker instance to use; a new one
       *                                      is created when omitted
       *
       * @throws ServerConnectionException if a connection to a server was not possible.
       *
       * @return GearmanExecute self Object
       */
      private function callJob(array $worker, array $options = array(), \GearmanWorker $gearmanWorker = null)
      {
          if (null === $gearmanWorker) {
              $gearmanWorker = new \GearmanWorker();
          }

          /**
           * A single-job definition carries its settings under the 'job' key,
           * a whole worker definition carries them at the top level.
           */
          if (isset($worker['job'])) {
              $settings = $worker['job'];
              $jobs = array($worker['job']);
          } else {
              $settings = $worker;
              $jobs = $worker['jobs'];
          }

          $iterations = $settings['iterations'];
          $minimumExecutionTime = $settings['minimumExecutionTime'];
          $timeout = $settings['timeout'];
          $successes = $this->addServers($gearmanWorker, $settings['servers']);

          /**
           * Runtime options win over the configured values. Only null means
           * "not given": an explicit 0 is a valid override (for iterations it
           * means the worker never dies) and must not be discarded.
           */
          $options = $this->executeOptionsResolver->resolve($options);
          $iterations = $this->pickSetting($options['iterations'], $iterations);
          $minimumExecutionTime = $this->pickSetting($options['minimum_execution_time'], $minimumExecutionTime);
          $timeout = $this->pickSetting($options['timeout'], $timeout);

          if (count($successes) < 1) {
              /**
               * Still honour the minimum execution time before failing.
               */
              if ($minimumExecutionTime > 0) {
                  sleep($minimumExecutionTime);
              }

              throw new ServerConnectionException('Worker was unable to connect to any server.');
          }

          $objInstance = $this->createJob($worker);

          /**
           * Start the timer before running the worker.
           */
          $time = time();
          $this->runJob($gearmanWorker, $objInstance, $jobs, $iterations, $timeout);

          /**
           * If there is a minimum expected duration, wait out the remaining period if there is any.
           */
          if ($minimumExecutionTime > 0) {
              $remaining = $minimumExecutionTime - (time() - $time);

              if ($remaining > 0) {
                  sleep($remaining);
              }
          }

          return $this;
      }

      /**
       * Returns the runtime override when one was given, the configured value
       * otherwise.
       *
       * @param mixed $override   Value coming from the resolved options, null when not given
       * @param mixed $configured Value coming from the worker/job definition
       *
       * @return mixed Effective value
       */
      private function pickSetting($override, $configured)
      {
          return null !== $override ? $override : $configured;
      }

      /**
       * Given a worker settings, return Job instance
       *
       * @param array $worker Worker settings
       *
       * @return Object Job instance
       */
      private function createJob(array $worker)
      {
          /**
           * If service is defined, we must retrieve this class with dependency injection
           *
           * Otherwise we just create it with a simple new()
           */
          if (!empty($worker['service'])) {
              return $this->container->get($worker['service']);
          }

          $objInstance = new $worker['className']();

          /**
           * If instance of given object is instanceof
           * ContainerAwareInterface, we inject full container by calling
           * container setter.
           *
           * @see https://github.com/mmoreram/gearman-bundle/pull/12
           */
          if ($objInstance instanceof ContainerAwareInterface) {
              $objInstance->setContainer($this->container);
          }

          return $objInstance;
      }

      /**
       * Given a GearmanWorker and an instance of Job, run it
       *
       * @param \GearmanWorker $gearmanWorker Gearman Worker
       * @param Object         $objInstance   Job instance
       * @param array          $jobs          Array of jobs to subscribe
       * @param integer        $iterations    Number of iterations, 0 to keep working
       * @param integer        $timeout       Timeout in seconds, applied only when greater than 0
       *
       * @return void
       */
      private function runJob(\GearmanWorker $gearmanWorker, $objInstance, array $jobs, $iterations, $timeout = null)
      {
          /**
           * Set the output of this instance, this should allow workers to use the console output.
           */
          if ($objInstance instanceof GearmanOutputAwareInterface) {
              $objInstance->setOutput($this->output ?: new NullOutput());
          }

          /**
           * Every job defined in worker is added into GearmanWorker.
           * All of them are routed through handleJob(), which receives the
           * real target as context.
           */
          foreach ($jobs as $job) {
              $gearmanWorker->addFunction(
                  $job['realCallableName'],
                  array($this, 'handleJob'),
                  array(
                      'job_object_instance' => $objInstance,
                      'job_method'          => $job['methodName'],
                      'jobs'                => $jobs,
                  )
              );
          }

          /**
           * If iterations value is 0, is like worker will never die
           */
          $alive = (0 === $iterations);

          /**
           * The timeout is multiplied by 1000 before being handed to the worker.
           */
          if ($timeout > 0) {
              $gearmanWorker->setTimeout($timeout * 1000);
          }

          /**
           * Executes GearmanWorker with all jobs defined
           */
          while ($gearmanWorker->work()) {
              $iterations--;

              $event = new GearmanWorkExecutedEvent($jobs, $iterations, $gearmanWorker->returnCode());
              $this->eventDispatcher->dispatch(GearmanEvents::GEARMAN_WORK_EXECUTED, $event);

              if ($gearmanWorker->returnCode() !== GEARMAN_SUCCESS) {
                  break;
              }

              /**
               * Only finishes its execution if alive is false and iterations
               * arrives to 0
               */
              if (!$alive && $iterations <= 0) {
                  break;
              }
          }
      }

      /**
       * Adds into worker all defined Servers.
       * If none is defined, performs default method
       *
       * Failures are not reported here: a server that could not be added is
       * simply left out of the returned list, and the caller decides what an
       * empty list means.
       *
       * @param \GearmanWorker $gmworker Worker to perform configuration
       * @param array          $servers  Servers array, each entry with 'host' and 'port'
       *
       * @return array Successfully added servers
       */
      private function addServers(\GearmanWorker $gmworker, array $servers)
      {
          $successes = array();

          if (empty($servers)) {
              /**
               * No server configured: addServer() is called without arguments
               * and the entry recorded on success uses the same host/port
               * shape as the configured entries.
               */
              if (@$gmworker->addServer()) {
                  $successes[] = array('host' => '127.0.0.1', 'port' => 4730);
              }

              return $successes;
          }

          foreach ($servers as $server) {
              if (@$gmworker->addServer($server['host'], $server['port'])) {
                  $successes[] = $server;
              }
          }

          return $successes;
      }

      /**
       * Executes a worker given a workerName subscribing all his jobs inside and
       * given settings and annotations of worker and jobs
       *
       * Nothing happens when getWorker() returns false for the given name.
       *
       * @param string $workerName Name of worker to be executed
       * @param array  $options    Runtime overrides: iterations,
       *                           minimum_execution_time, timeout
       */
      public function executeWorker($workerName, array $options = array())
      {
          $worker = $this->getWorker($workerName);

          if (false !== $worker) {
              $this->callJob($worker, $options);
          }
      }

      /**
       * Wrapper function handler for all registered functions
       * This allows us to do some nice logging when jobs are started/finished
       *
       * @see https://github.com/brianlmoon/GearmanManager/blob/ffc828dac2547aff76cb4962bb3fcc4f454ec8a2/GearmanPeclManager.php#L95-206
       *
       * @param \GearmanJob $job
       * @param mixed       $context Array holding 'job_object_instance' and
       *                             'job_method'; 'jobs' is optional and only
       *                             forwarded to the starting event
       *
       * @throws \InvalidArgumentException if the context lacks the target object or method.
       *
       * @return mixed Whatever the job method returned
       */
      public function handleJob(\GearmanJob $job, $context)
      {
          if (
              !is_array($context)
              || !array_key_exists('job_object_instance', $context)
              || !array_key_exists('job_method', $context)
          ) {
              throw new \InvalidArgumentException('$context shall be an array with job_object_instance and job_method key.');
          }

          /**
           * 'jobs' is not part of the validation above, so fall back to an
           * empty list instead of reading an undefined index.
           */
          $jobs = isset($context['jobs']) ? $context['jobs'] : array();

          $event = new GearmanWorkStartingEvent($jobs);
          $this->eventDispatcher->dispatch(GearmanEvents::GEARMAN_WORK_STARTING, $event);

          $result = call_user_func_array(
              array($context['job_object_instance'], $context['job_method']),
              array($job, $context)
          );

          /**
           * Workaround for PECL bug #17114
           * http://pecl.php.net/bugs/bug.php?id=17114
           */
          $type = gettype($result);
          settype($result, $type);

          return $result;
      }
  }