GearmanClient.php 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674
  1. <?php
  2. /**
  3. * Gearman Bundle for Symfony2
  4. *
  5. * For the full copyright and license information, please view the LICENSE
  6. * file that was distributed with this source code.
  7. *
  8. * Feel free to edit as you please, and have fun.
  9. *
  10. * @author Marc Morera <yuhu@mmoreram.com>
  11. */
  12. namespace Mmoreram\GearmanBundle\Service;
  13. use Mmoreram\GearmanBundle\Service\Abstracts\AbstractGearmanService;
  14. use Mmoreram\GearmanBundle\GearmanMethods;
  15. use Mmoreram\GearmanBundle\Module\JobStatus;
  16. use Mmoreram\GearmanBundle\Generator\UniqueJobIdentifierGenerator;
  17. use Mmoreram\GearmanBundle\Dispatcher\GearmanCallbacksDispatcher;
  18. /**
  19. * GearmanClient. Implementation of AbstractGearmanService
  20. *
  21. * @since 2.3.1
  22. */
  23. class GearmanClient extends AbstractGearmanService
  24. {
  /**
   * Dispatcher that runTasks() asks to assign task callbacks to the native
   * client when the 'callbacks' setting is enabled.
   *
   * @var GearmanCallbacksDispatcher
   */
  protected $gearmanCallbacksDispatcher;

  /**
   * Servers explicitly configured through addServer() / setServer().
   * When not empty, they are used instead of the default servers.
   *
   * Each entry is an array with the keys 'host' and 'port'.
   *
   * @var array
   */
  protected $servers = array();

  /**
   * Tasks queued by the addTask*() methods and consumed by runTasks().
   *
   * @var array
   */
  protected $taskStructure = array();

  /**
   * Servers used when no explicit server has been configured.
   *
   * @var array
   */
  protected $defaultServers;

  /**
   * Default settings. runTasks() reads the 'callbacks' key from it.
   *
   * @var array
   */
  protected $settings;

  /**
   * Unique job identifier generator, used to build the unique key of every
   * enqueued job and task.
   *
   * @var UniqueJobIdentifierGenerator
   */
  protected $uniqueJobIdentifierGenerator;

  /**
   * Return code reported by the internal client after the last job call.
   *
   * @var int
   */
  protected $returnCode;
  /**
   * Resets the task structure, discarding every task queued so far.
   *
   * @return GearmanClient self Object
   */
  public function initTaskStructure()
  {
      $this->taskStructure = array();

      return $this;
  }
  /**
   * Stores the servers to fall back on when no explicit server is set.
   *
   * @param array $defaultServers Default servers; each entry is read later
   *                              through its 'host' and 'port' keys
   *
   * @return GearmanClient self Object
   */
  public function setDefaultServers(array $defaultServers)
  {
      $this->defaultServers = $defaultServers;

      return $this;
  }
  /**
   * Injects the generator used to build unique keys for jobs and tasks.
   *
   * @param UniqueJobIdentifierGenerator $uniqueJobIdentifierGenerator Unique Job Identifier Generator
   *
   * @return GearmanClient self Object
   */
  public function setUniqueJobIdentifierGenerator(UniqueJobIdentifierGenerator $uniqueJobIdentifierGenerator)
  {
      $this->uniqueJobIdentifierGenerator = $uniqueJobIdentifierGenerator;

      return $this;
  }
  /**
   * Injects the dispatcher that assigns task callbacks in runTasks().
   *
   * @param GearmanCallbacksDispatcher $gearmanCallbacksDispatcher Gearman callbacks dispatcher
   *
   * @return GearmanClient self Object
   */
  public function setGearmanCallbacksDispatcher(GearmanCallbacksDispatcher $gearmanCallbacksDispatcher)
  {
      $this->gearmanCallbacksDispatcher = $gearmanCallbacksDispatcher;

      return $this;
  }
  /**
   * Stores the default settings of the client.
   *
   * @param array $settings Settings; runTasks() reads its 'callbacks' key
   *
   * @return GearmanClient self Object
   */
  public function setDefaultSettings($settings)
  {
      $this->settings = $settings;

      return $this;
  }
  /**
   * Makes the given server the only explicit server of this client: the
   * current server list is cleared and the server is then added.
   *
   * @param string $servername Server name (must be ip)
   * @param int    $port       Port of server. By default 4730
   *
   * @return GearmanClient Returns self object
   */
  public function setServer($servername, $port = 4730)
  {
      // addServer() is invoked on whatever clearServers() returns, exactly
      // as a fluent chain would do.
      $cleared = $this->clearServers();
      $cleared->addServer($servername, $port);

      return $this;
  }
  /**
   * Appends a server to the list of explicit servers.
   *
   * @param string $servername Server name (must be ip)
   * @param int    $port       Port of server. By default 4730
   *
   * @return GearmanClient Returns self object
   */
  public function addServer($servername, $port = 4730)
  {
      $server = array(
          'host' => $servername,
          'port' => $port,
      );
      $this->servers[] = $server;

      return $this;
  }
  /**
   * Empties the list of explicit servers. The default servers are left
   * untouched.
   *
   * @return GearmanClient Returns self object
   */
  public function clearServers()
  {
      $this->servers = array();

      return $this;
  }
  /**
   * Resolves the worker definition of a job name and, when it is valid,
   * enqueues the job with the requested client method.
   *
   * The unique key is always computed by the unique job identifier
   * generator, even when the worker turns out not to be valid.
   *
   * @param string $jobName A GearmanBundle registered function the worker is to execute
   * @param string $params  Parameters to send to job as string
   * @param string $method  Method to execute
   * @param string $unique  A unique ID used to identify a particular task
   *
   * @return mixed Result of the call. If worker is not valid, return false
   */
  protected function enqueue($jobName, $params, $method, $unique)
  {
      $worker = $this->getJob($jobName);
      $unique = $this->uniqueJobIdentifierGenerator->generateUniqueKey($jobName, $params, $unique, $method);

      if (!$worker) {
          return false;
      }

      return $this->doEnqueue($worker, $params, $method, $unique);
  }
  /**
   * Executes a native GearmanClient call given a worker, params and a method.
   *
   * A new native client is created for every call and receives the servers
   * selected by assignServers(). Its return code is stored afterwards and is
   * available through getReturnCode().
   *
   * If the GearmanClient call is asynchronous, the result value will be a
   * handle. Otherwise, it will be the job result.
   *
   * @param array  $worker Worker definition; $worker['job']['realCallableName'] is the function sent to gearman
   * @param string $params Parameters to send to job as string
   * @param string $method Name of the native GearmanClient method to call
   * @param string $unique A unique ID used to identify a particular task
   *
   * @return mixed Return result of the GearmanClient call
   */
  protected function doEnqueue(array $worker, $params, $method, $unique)
  {
      $client = new \GearmanClient();
      $this->assignServers($client);

      $callableName = $worker['job']['realCallableName'];
      $result = $client->$method($callableName, $params, $unique);

      // Keep the return code of this call for getReturnCode().
      $this->returnCode = $client->returnCode();

      return $result;
  }
  /**
   * Given a native GearmanClient, adds every server it must connect to.
   *
   * Explicit servers (addServer() / setServer()) take precedence; the
   * default servers are used only when no explicit server is set.
   *
   * @param \GearmanClient $gearmanClient Object to include servers
   *
   * @return GearmanClient Returns self object
   */
  protected function assignServers(\GearmanClient $gearmanClient)
  {
      $servers = !empty($this->servers)
          ? $this->servers
          : $this->defaultServers;

      // $defaultServers has no initial value: when setDefaultServers() was
      // never called there is nothing to iterate, and looping over null
      // would raise a warning.
      if (empty($servers)) {
          return $this;
      }

      foreach ($servers as $server) {
          $gearmanClient->addServer($server['host'], $server['port']);
      }

      return $this;
  }
  229. /**
  230. * Job methods
  231. */
  /**
   * Runs a single task and returns some result, depending on the method
   * called. The method is the job's default method, read from
   * $worker['job']['defaultMethod'] of the resolved worker definition.
   *
   * @param string $name   A GearmanBundle registered function the worker is to execute
   * @param string $params Parameters to send to job as string
   * @param string $unique A unique ID used to identify a particular task
   *
   * @return mixed Result depending on the method called. If the worker is
   *               not valid, return false
   */
  public function callJob($name, $params = '', $unique = null)
  {
      $worker = $this->getJob($name);

      // enqueue() answers false for a job that cannot be resolved; do the
      // same here before reading the worker definition, instead of indexing
      // into a non-array value.
      if (!$worker) {
          return false;
      }

      $methodCallable = $worker['job']['defaultMethod'];

      return $this->enqueue($name, $params, $methodCallable, $unique);
  }
  /**
   * Runs a single task and returns a string representation of the result.
   * It is up to the GearmanClient and GearmanWorker to agree on the format of
   * the result.
   *
   * The GearmanClient::do() method is deprecated as of pecl/gearman 1.0.0,
   * so this method enqueues the job with the doNormal method, exactly like
   * doNormalJob().
   *
   * @param string $name   A GearmanBundle registered function the worker is to execute
   * @param string $params Parameters to send to job as string
   * @param string $unique A unique ID used to identify a particular task
   *
   * @return string A string representing the results of running a task.
   * @deprecated Use doNormalJob() instead
   */
  public function doJob($name, $params = '', $unique = null)
  {
      $method = GearmanMethods::GEARMAN_METHOD_DONORMAL;

      return $this->enqueue($name, $params, $method, $unique);
  }
  /**
   * Runs a single task with the doNormal method and returns a string
   * representation of the result.
   *
   * It is up to the GearmanClient and GearmanWorker to agree on the format of
   * the result.
   *
   * @param string $name   A GearmanBundle registered function the worker is to execute
   * @param string $params Parameters to send to job as string
   * @param string $unique A unique ID used to identify a particular task
   *
   * @return string A string representing the results of running a task.
   */
  public function doNormalJob($name, $params = '', $unique = null)
  {
      $method = GearmanMethods::GEARMAN_METHOD_DONORMAL;

      return $this->enqueue($name, $params, $method, $unique);
  }
  /**
   * Runs a task in the background, returning a job handle which can be used
   * to get the status of the running task (see getJobStatus()).
   *
   * @param string $name   A GearmanBundle registered function the worker is to execute
   * @param string $params Parameters to send to job as string
   * @param string $unique A unique ID used to identify a particular task
   *
   * @return string Job handle for the submitted task.
   */
  public function doBackgroundJob($name, $params = '', $unique = null)
  {
      $method = GearmanMethods::GEARMAN_METHOD_DOBACKGROUND;

      return $this->enqueue($name, $params, $method, $unique);
  }
  /**
   * Runs a single high priority task and returns a string representation of
   * the result.
   *
   * It is up to the GearmanClient and GearmanWorker to agree on the format of
   * the result.
   *
   * High priority tasks will get precedence over normal and low priority
   * tasks in the job queue.
   *
   * @param string $name   A GearmanBundle registered function the worker is to execute
   * @param string $params Parameters to send to job as string
   * @param string $unique A unique ID used to identify a particular task
   *
   * @return string A string representing the results of running a task.
   */
  public function doHighJob($name, $params = '', $unique = null)
  {
      $method = GearmanMethods::GEARMAN_METHOD_DOHIGH;

      return $this->enqueue($name, $params, $method, $unique);
  }
  /**
   * Runs a high priority task in the background, returning a job handle which
   * can be used to get the status of the running task.
   *
   * High priority tasks take precedence over normal and low priority tasks in
   * the job queue.
   *
   * @param string $name   A GearmanBundle registered function the worker is to execute
   * @param string $params Parameters to send to job as string
   * @param string $unique A unique ID used to identify a particular task
   *
   * @return string The job handle for the submitted task.
   */
  public function doHighBackgroundJob($name, $params = '', $unique = null)
  {
      $method = GearmanMethods::GEARMAN_METHOD_DOHIGHBACKGROUND;

      return $this->enqueue($name, $params, $method, $unique);
  }
  /**
   * Runs a single low priority task and returns a string representation of
   * the result.
   *
   * It is up to the GearmanClient and GearmanWorker to agree on the format of
   * the result.
   *
   * Normal and high priority tasks will get precedence over low priority
   * tasks in the job queue.
   *
   * @param string $name   A GearmanBundle registered function the worker is to execute
   * @param string $params Parameters to send to job as string
   * @param string $unique A unique ID used to identify a particular task
   *
   * @return string A string representing the results of running a task.
   */
  public function doLowJob($name, $params = '', $unique = null)
  {
      $method = GearmanMethods::GEARMAN_METHOD_DOLOW;

      return $this->enqueue($name, $params, $method, $unique);
  }
  /**
   * Runs a low priority task in the background, returning a job handle which
   * can be used to get the status of the running task.
   *
   * Normal and high priority tasks will get precedence over low priority
   * tasks in the job queue.
   *
   * @param string $name   A GearmanBundle registered function the worker is to execute
   * @param string $params Parameters to send to job as string
   * @param string $unique A unique ID used to identify a particular task
   *
   * @return string The job handle for the submitted task.
   */
  public function doLowBackgroundJob($name, $params = '', $unique = null)
  {
      $method = GearmanMethods::GEARMAN_METHOD_DOLOWBACKGROUND;

      return $this->enqueue($name, $params, $method, $unique);
  }
  /**
   * Fetches the status of a background job.
   *
   * A new native client is created, receives the servers selected by
   * assignServers() and is asked for the status of the given job handle.
   * The data it returns is wrapped into a JobStatus object.
   *
   * @param string $idJob The job handle string
   *
   * @return JobStatus Job status
   */
  public function getJobStatus($idJob)
  {
      $client = new \GearmanClient();
      $this->assignServers($client);

      return new JobStatus($client->jobStatus($idJob));
  }
  /**
   * Gets the return code stored by the last job executed through
   * doEnqueue(). No value is stored until a job has been run.
   *
   * @return int
   */
  public function getReturnCode()
  {
      return $this->returnCode;
  }
  395. /**
  396. * Task methods
  397. */
  /**
   * Adds a task to be run in parallel with other tasks.
   * Call this method for all the tasks to be run in parallel, then call
   * GearmanClient::runTasks() to perform the work.
   *
   * Note that enough workers need to be available for the tasks to all run in
   * parallel.
   *
   * @param string $name     A GearmanBundle registered function to be executed
   * @param string $params   Parameters to send to task as string
   * @param Mixed  &$context Application context to associate with a task
   * @param string $unique   A unique ID used to identify a particular task
   *
   * @return GearmanClient Return this object
   */
  public function addTask($name, $params = '', &$context = null, $unique = null)
  {
      $method = GearmanMethods::GEARMAN_METHOD_ADDTASK;
      $this->enqueueTask($name, $params, $context, $unique, $method);

      return $this;
  }
  /**
   * Adds a high priority task to be run in parallel with other tasks.
   * Call this method for all the high priority tasks to be run in parallel,
   * then call GearmanClient::runTasks() to perform the work.
   *
   * Tasks with a high priority will be selected from the queue before those
   * of normal or low priority.
   *
   * @param string $name     A GearmanBundle registered function to be executed
   * @param string $params   Parameters to send to task as string
   * @param Mixed  &$context Application context to associate with a task
   * @param string $unique   A unique ID used to identify a particular task
   *
   * @return GearmanClient Return this object
   */
  public function addTaskHigh($name, $params = '', &$context = null, $unique = null)
  {
      $method = GearmanMethods::GEARMAN_METHOD_ADDTASKHIGH;
      $this->enqueueTask($name, $params, $context, $unique, $method);

      return $this;
  }
  /**
   * Adds a low priority task to be run in parallel with other tasks.
   *
   * Call this method for all the tasks to be run in parallel, then call
   * GearmanClient::runTasks() to perform the work.
   *
   * Tasks with a low priority will be selected from the queue after those of
   * normal or high priority.
   *
   * @param string $name     A GearmanBundle registered function to be executed
   * @param string $params   Parameters to send to task as string
   * @param Mixed  &$context Application context to associate with a task
   * @param string $unique   A unique ID used to identify a particular task
   *
   * @return GearmanClient Return this object
   */
  public function addTaskLow($name, $params = '', &$context = null, $unique = null)
  {
      $method = GearmanMethods::GEARMAN_METHOD_ADDTASKLOW;
      $this->enqueueTask($name, $params, $context, $unique, $method);

      return $this;
  }
  /**
   * Adds a background task to be run in parallel with other tasks.
   * Call this method for all the tasks to be run in parallel, then call
   * GearmanClient::runTasks() to perform the work.
   *
   * @param string $name     A GearmanBundle registered function to be executed
   * @param string $params   Parameters to send to task as string
   * @param Mixed  &$context Application context to associate with a task
   * @param string $unique   A unique ID used to identify a particular task
   *
   * @return GearmanClient Return this object
   */
  public function addTaskBackground($name, $params = '', &$context = null, $unique = null)
  {
      $method = GearmanMethods::GEARMAN_METHOD_ADDTASKBACKGROUND;
      $this->enqueueTask($name, $params, $context, $unique, $method);

      return $this;
  }
  /**
   * Adds a high priority background task to be run in parallel with other
   * tasks.
   *
   * Call this method for all the tasks to be run in parallel, then call
   * GearmanClient::runTasks() to perform the work.
   *
   * Tasks with a high priority will be selected from the queue before those
   * of normal or low priority.
   *
   * @param string $name     A GearmanBundle registered function to be executed
   * @param string $params   Parameters to send to task as string
   * @param Mixed  &$context Application context to associate with a task
   * @param string $unique   A unique ID used to identify a particular task
   *
   * @return GearmanClient Return this object
   */
  public function addTaskHighBackground($name, $params = '', &$context = null, $unique = null)
  {
      $method = GearmanMethods::GEARMAN_METHOD_ADDTASKHIGHBACKGROUND;
      $this->enqueueTask($name, $params, $context, $unique, $method);

      return $this;
  }
  /**
   * Adds a low priority background task to be run in parallel with other
   * tasks.
   *
   * Call this method for all the tasks to be run in parallel, then call
   * GearmanClient::runTasks() to perform the work.
   *
   * Tasks with a low priority will be selected from the queue after those of
   * normal or high priority.
   *
   * @param string $name     A GearmanBundle registered function to be executed
   * @param string $params   Parameters to send to task as string
   * @param Mixed  &$context Application context to associate with a task
   * @param string $unique   A unique ID used to identify a particular task
   *
   * @return GearmanClient Return this object
   */
  public function addTaskLowBackground($name, $params = '', &$context = null, $unique = null)
  {
      $method = GearmanMethods::GEARMAN_METHOD_ADDTASKLOWBACKGROUND;
      $this->enqueueTask($name, $params, $context, $unique, $method);

      return $this;
  }
  /**
   * Builds a task definition, including the type of call to perform, and
   * stores it in the task structure until runTasks() is called.
   *
   * The unique key of the task is computed by the unique job identifier
   * generator from the name, params, given unique ID and method.
   *
   * NOTE(review): $context is received by value here, so the reference
   * accepted by the public addTask*() methods is not kept in the stored
   * task — confirm whether that is intended.
   *
   * @param string $name    A GearmanBundle registered function to be executed
   * @param string $params  Parameters to send to task as string
   * @param Mixed  $context Application context to associate with a task
   * @param string $unique  A unique ID used to identify a particular task
   * @param string $method  Method to perform
   *
   * @return GearmanClient Return this object
   */
  protected function enqueueTask($name, $params, $context, $unique, $method)
  {
      $uniqueKey = $this->uniqueJobIdentifierGenerator->generateUniqueKey($name, $params, $unique, $method);

      $this->addTaskToStructure(array(
          'name' => $name,
          'params' => $params,
          'context' => $context,
          'unique' => $uniqueKey,
          'method' => $method,
      ));

      return $this;
  }
  /**
   * Appends a task definition at the end of the taskStructure array.
   *
   * @param array $task Task structure, as built by enqueueTask()
   *
   * @return GearmanClient Return this object
   */
  protected function addTaskToStructure(array $task)
  {
      $this->taskStructure[] = $task;

      return $this;
  }
  /**
   * For a set of tasks previously added with
   *
   * GearmanClient::addTask(),
   * GearmanClient::addTaskHigh(),
   * GearmanClient::addTaskLow(),
   * GearmanClient::addTaskBackground(),
   * GearmanClient::addTaskHighBackground(),
   * GearmanClient::addTaskLowBackground(),
   *
   * this call starts running the tasks in parallel.
   * Note that enough workers need to be available for the tasks to all run in parallel
   *
   * A new native client is created for the batch. Task callbacks are
   * assigned to it only when the 'callbacks' setting is enabled. Tasks whose
   * job cannot be resolved are skipped, and the task structure is emptied
   * before the native client runs the batch.
   *
   * @return boolean run tasks result
   */
  public function runTasks()
  {
      $gearmanClient = new \GearmanClient();
      $this->assignServers($gearmanClient);

      // empty() tolerates settings that were never injected or that lack
      // the 'callbacks' key, instead of raising an undefined index notice.
      if (!empty($this->settings['callbacks'])) {
          $this->gearmanCallbacksDispatcher->assignTaskCallbacks($gearmanClient);
      }

      foreach ($this->taskStructure as $task) {
          $type = $task['method'];
          $worker = $this->getJob($task['name']);

          // Only tasks bound to a resolvable job are handed to the client.
          if (false !== $worker) {
              $gearmanClient->$type(
                  $worker['job']['realCallableName'],
                  $task['params'],
                  $task['context'],
                  $task['unique']
              );
          }
      }

      // The queue is reset first so the next batch starts from scratch.
      $this->initTaskStructure();

      return $gearmanClient->runTasks();
  }
  594. }