GearmanExecute.php 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439
  1. <?php
  2. /**
  3. * Gearman Bundle for Symfony2
  4. *
  5. * For the full copyright and license information, please view the LICENSE
  6. * file that was distributed with this source code.
  7. *
  8. * Feel free to edit as you please, and have fun.
  9. *
  10. * @author Marc Morera <yuhu@mmoreram.com>
  11. */
  12. namespace Mmoreram\GearmanBundle\Service;
  13. use Symfony\Component\Console\Output\NullOutput;
  14. use Symfony\Component\Console\Output\OutputInterface;
  15. use Symfony\Component\DependencyInjection\ContainerAwareInterface;
  16. use Symfony\Component\DependencyInjection\ContainerInterface;
  17. use Symfony\Component\EventDispatcher\EventDispatcherInterface;
  18. use Mmoreram\GearmanBundle\Command\Util\GearmanOutputAwareInterface;
  19. use Mmoreram\GearmanBundle\Event\GearmanWorkExecutedEvent;
  20. use Mmoreram\GearmanBundle\Event\GearmanWorkStartingEvent;
  21. use Mmoreram\GearmanBundle\GearmanEvents;
  22. use Mmoreram\GearmanBundle\Service\Abstracts\AbstractGearmanService;
  23. use Mmoreram\GearmanBundle\Exceptions\ServerConnectionException;
  24. use Symfony\Component\OptionsResolver\OptionsResolver;
  /**
   * Gearman execute methods. All Worker methods
   *
   * Registers the callbacks of a worker (or of a single job) on a
   * GearmanWorker, runs the work loop and dispatches an event before and
   * after every executed job.
   *
   * @since 2.3.1
   */
  class GearmanExecute extends AbstractGearmanService
  {
      /**
       * @var ContainerInterface
       *
       * Container instance
       */
      private $container;

      /**
       * @var EventDispatcherInterface
       *
       * EventDispatcher instance
       */
      protected $eventDispatcher;

      /**
       * @var OutputInterface
       *
       * Output instance
       */
      protected $output;

      /**
       * @var OptionsResolver
       *
       * Resolver for the per-call options accepted by executeJob() and
       * executeWorker(): iterations, minimum_execution_time and timeout
       */
      protected $executeOptionsResolver;

      /**
       * @var boolean
       *
       * Boolean to track if a system signal has been received. The work loop
       * checks this flag before asking the worker for another job.
       */
      protected $stopWorkSignalReceived;

      /**
       * Construct method
       *
       * Configures the options resolver and, when the pcntl extension is
       * available, subscribes to SIGTERM and SIGHUP.
       *
       * @param GearmanCacheWrapper $gearmanCacheWrapper GearmanCacheWrapper
       * @param array               $defaultSettings     The default settings for the bundle
       */
      public function __construct(GearmanCacheWrapper $gearmanCacheWrapper, array $defaultSettings)
      {
          parent::__construct($gearmanCacheWrapper, $defaultSettings);

          $this->executeOptionsResolver = new OptionsResolver();
          $this->executeOptionsResolver
              ->setDefaults(array(
                  'iterations'             => null,
                  'minimum_execution_time' => null,
                  'timeout'                => null,
              ))
              ->setAllowedTypes(array(
                  'iterations'             => array('null', 'scalar'),
                  'minimum_execution_time' => array('null', 'scalar'),
                  'timeout'                => array('null', 'scalar'),
              ))
          ;

          $this->stopWorkSignalReceived = false;

          /**
           * If the pcntl_signal exists, subscribe to the terminate and restart
           * events for graceful worker stops.
           */
          if (function_exists('pcntl_signal')) {
              declare(ticks = 1);
              pcntl_signal(SIGTERM, array($this, 'handleSystemSignal'));
              pcntl_signal(SIGHUP, array($this, 'handleSystemSignal'));
          }
      }

      /**
       * Toggles that work should be stopped, we only subscribe to SIGTERM and SIGHUP
       *
       * @param int $signo Signal number (not inspected, any subscribed signal stops the loop)
       */
      public function handleSystemSignal($signo)
      {
          $this->stopWorkSignalReceived = true;
      }

      /**
       * Set container
       *
       * @param ContainerInterface $container Container
       *
       * @return GearmanExecute self Object
       */
      public function setContainer(ContainerInterface $container)
      {
          $this->container = $container;

          return $this;
      }

      /**
       * Set event dispatcher
       *
       * @param EventDispatcherInterface $eventDispatcher
       *
       * @return GearmanExecute self Object
       */
      public function setEventDispatcher(EventDispatcherInterface $eventDispatcher)
      {
          $this->eventDispatcher = $eventDispatcher;

          return $this;
      }

      /**
       * Set output
       *
       * @param OutputInterface $output
       *
       * @return GearmanExecute self Object
       */
      public function setOutput(OutputInterface $output)
      {
          $this->output = $output;

          return $this;
      }

      /**
       * Executes a job given a jobName and given settings and annotations of job
       *
       * Nothing happens when getJob() returns false for the given name.
       *
       * @param string         $jobName       Name of job to be executed
       * @param array          $options       Array of options passed to the callback
       * @param \GearmanWorker $gearmanWorker Worker instance to use
       */
      public function executeJob($jobName, array $options = array(), \GearmanWorker $gearmanWorker = null)
      {
          $worker = $this->getJob($jobName);

          if (false !== $worker) {
              $this->callJob($worker, $options, $gearmanWorker);
          }
      }

      /**
       * Given a worker, execute GearmanWorker function defined by job.
       *
       * The definition either wraps a single job under the 'job' key or is a
       * full worker definition carrying a 'jobs' list. Settings are read from
       * whichever of the two is present.
       *
       * @param array          $worker        Worker definition
       * @param array          $options       Array of options passed to the callback
       * @param \GearmanWorker $gearmanWorker Worker instance to use
       *
       * @throws ServerConnectionException if a connection to a server was not possible.
       *
       * @return GearmanExecute self Object
       */
      private function callJob(array $worker, array $options = array(), \GearmanWorker $gearmanWorker = null)
      {
          if (null === $gearmanWorker) {
              $gearmanWorker = new \GearmanWorker();
          }

          if (isset($worker['job'])) {
              $settings = $worker['job'];
              $jobs = array($worker['job']);
          } else {
              $settings = $worker;
              $jobs = $worker['jobs'];
          }

          $iterations = $settings['iterations'];
          $minimumExecutionTime = $settings['minimumExecutionTime'];
          $timeout = $settings['timeout'];
          $successes = $this->addServers($gearmanWorker, $settings['servers']);

          /**
           * Per-call options win over the configured values. The short ternary
           * tests truthiness, so a falsy option (null, 0, '0', '') falls back
           * to the configured value.
           */
          $options = $this->executeOptionsResolver->resolve($options);
          $iterations = $options['iterations'] ?: $iterations;
          $minimumExecutionTime = $options['minimum_execution_time'] ?: $minimumExecutionTime;
          $timeout = $options['timeout'] ?: $timeout;

          if (count($successes) < 1) {
              /**
               * Honour the minimum execution time even on failure, before
               * reporting that no server could be added.
               */
              if ($minimumExecutionTime > 0) {
                  sleep($minimumExecutionTime);
              }

              throw new ServerConnectionException('Worker was unable to connect to any server.');
          }

          $objInstance = $this->createJob($worker);

          /**
           * Start the timer before running the worker.
           */
          $startedAt = time();
          $this->runJob($gearmanWorker, $objInstance, $jobs, $iterations, $timeout);

          /**
           * If there is a minimum expected duration, wait out the remaining
           * period if there is any.
           */
          if ($minimumExecutionTime > 0) {
              $remaining = $minimumExecutionTime - (time() - $startedAt);

              if ($remaining > 0) {
                  sleep($remaining);
              }
          }

          return $this;
      }

      /**
       * Given a worker settings, return Job instance
       *
       * @param array $worker Worker settings
       *
       * @return Object Job instance
       */
      private function createJob(array $worker)
      {
          /**
           * If service is defined, we must retrieve this class with dependency
           * injection. A missing 'service' key is treated like an empty one.
           */
          if (!empty($worker['service'])) {
              return $this->container->get($worker['service']);
          }

          /**
           * Otherwise we just create it with a simple new()
           */
          $className = $worker['className'];
          $objInstance = new $className();

          /**
           * If instance of given object is instanceof
           * ContainerAwareInterface, we inject full container by calling
           * container setter.
           *
           * @see https://github.com/mmoreram/gearman-bundle/pull/12
           */
          if ($objInstance instanceof ContainerAwareInterface) {
              $objInstance->setContainer($this->container);
          }

          return $objInstance;
      }

      /**
       * Given a GearmanWorker and an instance of Job, run it
       *
       * @param \GearmanWorker $gearmanWorker Gearman Worker
       * @param Object         $objInstance   Job instance
       * @param array          $jobs          Array of jobs to subscribe
       * @param integer        $iterations    Number of iterations
       * @param integer        $timeout       Timeout in seconds
       *
       * @return GearmanExecute self Object
       */
      private function runJob(\GearmanWorker $gearmanWorker, $objInstance, array $jobs, $iterations, $timeout = null)
      {
          /**
           * Set the output of this instance, this should allow workers to use
           * the console output.
           */
          if ($objInstance instanceof GearmanOutputAwareInterface) {
              $objInstance->setOutput($this->output ?: new NullOutput());
          }

          /**
           * Every job defined in worker is added into GearmanWorker. All of
           * them are routed through handleJob(), which receives the real
           * target in its context argument.
           */
          foreach ($jobs as $job) {
              $gearmanWorker->addFunction(
                  $job['realCallableName'],
                  array($this, 'handleJob'),
                  array(
                      'job_object_instance' => $objInstance,
                      'job_method'          => $job['methodName'],
                      'jobs'                => $jobs,
                  )
              );
          }

          /**
           * If iterations value is 0, is like worker will never die.
           * The comparison is strict, so only the integer 0 qualifies.
           */
          $alive = (0 === $iterations);

          if ($timeout > 0) {
              // Timeout is multiplied by 1000 before being handed to the worker.
              $gearmanWorker->setTimeout($timeout * 1000);
          }

          /**
           * Executes GearmanWorker with all jobs defined, until a stop signal
           * was received or work() reports a failure.
           */
          while (false === $this->stopWorkSignalReceived && $gearmanWorker->work()) {
              $iterations--;

              $returnCode = $gearmanWorker->returnCode();
              $event = new GearmanWorkExecutedEvent($jobs, $iterations, $returnCode);
              $this->eventDispatcher->dispatch(GearmanEvents::GEARMAN_WORK_EXECUTED, $event);

              if ($returnCode != GEARMAN_SUCCESS) {
                  break;
              }

              /**
               * Only finishes its execution if alive is false and iterations
               * arrives to 0
               */
              if (!$alive && $iterations <= 0) {
                  break;
              }
          }

          return $this;
      }

      /**
       * Adds into worker all defined Servers.
       * If none is defined, performs default method
       *
       * Warnings raised by addServer() are suppressed; a server only counts
       * as added when addServer() returns a truthy value.
       *
       * @param \GearmanWorker $gmworker Worker to perform configuration
       * @param array          $servers  Servers array, each entry with 'host' and 'port' keys
       *
       * @return array Successfully added servers
       */
      private function addServers(\GearmanWorker $gmworker, array $servers)
      {
          $successes = array();

          if (empty($servers)) {
              if (@$gmworker->addServer()) {
                  // Same shape as the configured entries.
                  $successes[] = array('host' => '127.0.0.1', 'port' => 4730);
              }

              return $successes;
          }

          foreach ($servers as $server) {
              if (@$gmworker->addServer($server['host'], $server['port'])) {
                  $successes[] = $server;
              }
          }

          return $successes;
      }

      /**
       * Executes a worker given a workerName subscribing all his jobs inside and
       * given settings and annotations of worker and jobs
       *
       * Nothing happens when getWorker() returns false for the given name.
       *
       * @param string $workerName Name of worker to be executed
       * @param array  $options    Array of options passed to the callback
       */
      public function executeWorker($workerName, array $options = array())
      {
          $worker = $this->getWorker($workerName);

          if (false !== $worker) {
              $this->callJob($worker, $options);
          }
      }

      /**
       * Wrapper function handler for all registered functions
       * This allows us to do some nice logging when jobs are started/finished
       *
       * @see https://github.com/brianlmoon/GearmanManager/blob/ffc828dac2547aff76cb4962bb3fcc4f454ec8a2/GearmanPeclManager.php#L95-206
       *
       * @param \GearmanJob $job
       * @param mixed       $context Array with the job_object_instance and
       *                             job_method keys, optionally a jobs key
       *
       * @throws \InvalidArgumentException if $context lacks the required keys
       *
       * @return mixed Result of the job method
       */
      public function handleJob(\GearmanJob $job, $context)
      {
          if (
              !is_array($context)
              || !array_key_exists('job_object_instance', $context)
              || !array_key_exists('job_method', $context)
          ) {
              throw new \InvalidArgumentException('$context shall be an array with job_object_instance and job_method key.');
          }

          /**
           * The jobs key is not part of the validated contract above, so fall
           * back to an empty list instead of reading an undefined index.
           */
          $jobs = isset($context['jobs']) ? $context['jobs'] : array();

          $event = new GearmanWorkStartingEvent($jobs);
          $this->eventDispatcher->dispatch(GearmanEvents::GEARMAN_WORK_STARTING, $event);

          $result = call_user_func_array(
              array($context['job_object_instance'], $context['job_method']),
              array($job, $context)
          );

          /**
           * Workaround for PECL bug #17114
           * http://pecl.php.net/bugs/bug.php?id=17114
           */
          $type = gettype($result);
          settype($result, $type);

          return $result;
      }
  }