GearmanClient.php 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697
  1. <?php
  2. /**
  3. * Gearman Bundle for Symfony2
  4. *
  5. * For the full copyright and license information, please view the LICENSE
  6. * file that was distributed with this source code.
  7. *
  8. * Feel free to edit as you please, and have fun.
  9. *
  10. * @author Marc Morera <yuhu@mmoreram.com>
  11. */
  12. namespace Mmoreram\GearmanBundle\Service;
  13. use Mmoreram\GearmanBundle\Dispatcher\GearmanCallbacksDispatcher;
  14. use Mmoreram\GearmanBundle\GearmanMethods;
  15. use Mmoreram\GearmanBundle\Generator\UniqueJobIdentifierGenerator;
  16. use Mmoreram\GearmanBundle\Module\JobStatus;
  17. use Mmoreram\GearmanBundle\Service\Abstracts\AbstractGearmanService;
  /**
   * GearmanClient. Implementation of AbstractGearmanService
   *
   * Thin wrapper around the native \GearmanClient. It resolves bundle job names
   * to their real callable names (through getJob(), inherited from
   * AbstractGearmanService), decides which servers to connect to, and exposes
   * the native "do*" job methods and "addTask*" / runTasks() task methods.
   *
   * The native client is single-use: a fresh instance is configured for every
   * job call, status lookup and task run, and dropped afterwards.
   *
   * @since 2.3.1
   */
  class GearmanClient extends AbstractGearmanService
  {
      /**
       * @var \GearmanClient|null
       *
       * Gearman native client instance. Created lazily by getNativeClient()
       * and discarded once it has been used for a call.
       */
      protected $gearmanClient;

      /**
       * @var GearmanCallbacksDispatcher
       *
       * Gearman callbacks dispatcher
       */
      protected $gearmanCallbacksDispatcher;

      /**
       * @var array
       *
       * Servers explicitly set on this client. Each entry is an array with
       * 'host' and 'port' keys. When not empty, it takes precedence over the
       * default servers.
       */
      protected $servers = array();

      /**
       * @var array
       *
       * Task structure storing every task queued through the addTask* methods
       * until runTasks() is called
       */
      protected $taskStructure = array();

      /**
       * @var array
       *
       * Default servers, used when no server has been explicitly set.
       * Initialised to an empty array so that assignServers() can always
       * iterate it, even if setDefaultServers() was never called.
       */
      protected $defaultServers = array();

      /**
       * @var array
       *
       * Default settings. Only the 'callbacks' key is read by this class.
       */
      protected $settings = array();

      /**
       * @var UniqueJobIdentifierGenerator
       *
       * Unique Job Identifier Generator
       */
      protected $uniqueJobIdentifierGenerator;

      /**
       * @var int
       *
       * Return code from internal client, as captured after the last job call.
       */
      protected $returnCode;

      /**
       * Returns the native client that will be used by the next call, creating
       * it when none is pending.
       *
       * @return \GearmanClient
       */
      public function getNativeClient()
      {
          if (null === $this->gearmanClient) {
              $this->gearmanClient = new \GearmanClient();
          }

          return $this->gearmanClient;
      }

      /**
       * Init tasks structure
       *
       * @return GearmanClient self Object
       */
      public function initTaskStructure()
      {
          $this->taskStructure = array();

          return $this;
      }

      /**
       * Set default servers
       *
       * @param array $defaultServers Default servers
       *
       * @return GearmanClient self Object
       */
      public function setDefaultServers(array $defaultServers)
      {
          $this->defaultServers = $defaultServers;

          return $this;
      }

      /**
       * Set UniqueJobIdentifierGenerator object
       *
       * @param UniqueJobIdentifierGenerator $uniqueJobIdentifierGenerator Unique Job Identifier Generator
       *
       * @return GearmanClient self Object
       */
      public function setUniqueJobIdentifierGenerator(UniqueJobIdentifierGenerator $uniqueJobIdentifierGenerator)
      {
          $this->uniqueJobIdentifierGenerator = $uniqueJobIdentifierGenerator;

          return $this;
      }

      /**
       * Set gearman callbacks
       *
       * @param GearmanCallbacksDispatcher $gearmanCallbacksDispatcher Gearman callbacks dispatcher
       *
       * @return GearmanClient self Object
       */
      public function setGearmanCallbacksDispatcher(GearmanCallbacksDispatcher $gearmanCallbacksDispatcher)
      {
          $this->gearmanCallbacksDispatcher = $gearmanCallbacksDispatcher;

          return $this;
      }

      /**
       * Set default settings
       *
       * @param array $settings
       *
       * @return GearmanClient self Object
       */
      public function setDefaultSettings($settings)
      {
          $this->settings = $settings;

          return $this;
      }

      /**
       * Set server to client. Empty all servers and set this one
       *
       * @param string $servername Server name (must be ip)
       * @param int    $port       Port of server. By default 4730
       *
       * @return GearmanClient Returns self object
       */
      public function setServer($servername, $port = 4730)
      {
          $this
              ->clearServers()
              ->addServer($servername, $port);

          return $this;
      }

      /**
       * Add server to client
       *
       * @param string $servername Server name (must be ip)
       * @param int    $port       Port of server. By default 4730
       *
       * @return GearmanClient Returns self object
       */
      public function addServer($servername, $port = 4730)
      {
          $this->servers[] = array(
              'host' => $servername,
              'port' => $port,
          );

          return $this;
      }

      /**
       * Clear server list
       *
       * @return GearmanClient Returns self object
       */
      public function clearServers()
      {
          $this->servers = array();

          return $this;
      }

      /**
       * Resolves the worker from the job name and enqueues the job using the
       * given method.
       *
       * @param string $jobName A GearmanBundle registered function the worker is to execute
       * @param string $params  Parameters to send to job as string
       * @param string $method  Method to execute
       * @param string $unique  A unique ID used to identify a particular task
       *
       * @return mixed Return result of the call. If worker is not valid, return false
       */
      protected function enqueue($jobName, $params, $method, $unique)
      {
          $worker = $this->getJob($jobName);

          // Validate the job first: there is no point in generating a unique
          // key for a job that will never be sent.
          if (!$worker) {
              return false;
          }

          $unique = $this
              ->uniqueJobIdentifierGenerator
              ->generateUniqueKey($jobName, $params, $unique, $method);

          return $this->doEnqueue($worker, $params, $method, $unique);
      }

      /**
       * Execute a GearmanClient call given a worker, params and a method.
       *
       * If the GearmanClient call is asynchronous, result value will be a
       * handle. Otherwise, will return job result.
       *
       * The native return code is stored and can be read afterwards through
       * getReturnCode().
       *
       * @param array  $worker Worker definition
       * @param string $params Parameters to send to job as string
       * @param string $method Method to execute
       * @param string $unique A unique ID used to identify a particular task
       *
       * @return mixed Return result of the GearmanClient call
       */
      protected function doEnqueue(array $worker, $params, $method, $unique)
      {
          $gearmanClient = $this->acquireClient();

          $result = $gearmanClient->$method($worker['job']['realCallableName'], $params, $unique);
          $this->returnCode = $gearmanClient->returnCode();

          return $result;
      }

      /**
       * Given a GearmanClient, set all included servers.
       *
       * Explicitly set servers win over the default ones.
       *
       * @param \GearmanClient $gearmanClient Object to include servers
       *
       * @return GearmanClient Returns self object
       */
      protected function assignServers(\GearmanClient $gearmanClient)
      {
          $servers = empty($this->servers)
              ? $this->defaultServers
              : $this->servers;

          /**
           * We include each server into gearman client
           */
          foreach ($servers as $server) {
              $gearmanClient->addServer($server['host'], $server['port']);
          }

          return $this;
      }

      /**
       * Hands out the pending native client, configured with its servers, for
       * exactly one use.
       *
       * The instance is detached from this service before anything else is
       * done with it, so a failure raised while configuring or using it cannot
       * leave an already-configured client behind to be reused (and get its
       * servers added a second time) by the next call.
       *
       * @return \GearmanClient Client ready to be used
       */
      private function acquireClient()
      {
          $gearmanClient = $this->getNativeClient();
          $this->gearmanClient = null;
          $this->assignServers($gearmanClient);

          return $gearmanClient;
      }

      /**
       * Job methods
       */

      /**
       * Runs a single task and returns some result, depending on the method
       * called. The method called is the default callable method set in the
       * gearman settings or overwritten in the work or job annotations.
       *
       * @param string $name   A GearmanBundle registered function the worker is to execute
       * @param string $params Parameters to send to job as string
       * @param string $unique A unique ID used to identify a particular task
       *
       * @return mixed result depending on the method called. If worker is not valid, return false
       */
      public function callJob($name, $params = '', $unique = null)
      {
          $worker = $this->getJob($name);

          // enqueue() and runTasks() both treat a falsy lookup result as
          // "job not registered"; do the same here instead of reading the
          // default method out of a non-array.
          if (!$worker) {
              return false;
          }

          return $this->enqueue($name, $params, $worker['job']['defaultMethod'], $unique);
      }

      /**
       * Runs a single task and returns a string representation of the result.
       * It is up to the GearmanClient and GearmanWorker to agree on the format of
       * the result.
       *
       * The GearmanClient::do() method is deprecated as of pecl/gearman 1.0.0,
       * so this method actually performs a GearmanClient::doNormal() call.
       * Use doNormalJob() instead.
       *
       * @param string $name   A GearmanBundle registered function the worker is to execute
       * @param string $params Parameters to send to job as string
       * @param string $unique A unique ID used to identify a particular task
       *
       * @return string A string representing the results of running a task.
       * @deprecated
       */
      public function doJob($name, $params = '', $unique = null)
      {
          return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DONORMAL, $unique);
      }

      /**
       * Runs a single task and returns a string representation of the result.
       * It is up to the GearmanClient and GearmanWorker to agree on the format of
       * the result.
       *
       * @param string $name   A GearmanBundle registered function the worker is to execute
       * @param string $params Parameters to send to job as string
       * @param string $unique A unique ID used to identify a particular task
       *
       * @return string A string representing the results of running a task.
       */
      public function doNormalJob($name, $params = '', $unique = null)
      {
          return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DONORMAL, $unique);
      }

      /**
       * Runs a task in the background, returning a job handle which can be used
       * to get the status of the running task.
       *
       * @param string $name   A GearmanBundle registered function the worker is to execute
       * @param string $params Parameters to send to job as string
       * @param string $unique A unique ID used to identify a particular task
       *
       * @return string Job handle for the submitted task.
       */
      public function doBackgroundJob($name, $params = '', $unique = null)
      {
          return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DOBACKGROUND, $unique);
      }

      /**
       * Runs a single high priority task and returns a string representation of
       * the result.
       *
       * It is up to the GearmanClient and GearmanWorker to agree on the format of
       * the result.
       *
       * High priority tasks will get precedence over normal and low priority
       * tasks in the job queue.
       *
       * @param string $name   A GearmanBundle registered function the worker is to execute
       * @param string $params Parameters to send to job as string
       * @param string $unique A unique ID used to identify a particular task
       *
       * @return string A string representing the results of running a task.
       */
      public function doHighJob($name, $params = '', $unique = null)
      {
          return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DOHIGH, $unique);
      }

      /**
       * Runs a high priority task in the background, returning a job handle which
       * can be used to get the status of the running task.
       *
       * High priority tasks take precedence over normal and low priority tasks in
       * the job queue.
       *
       * @param string $name   A GearmanBundle registered function the worker is to execute
       * @param string $params Parameters to send to job as string
       * @param string $unique A unique ID used to identify a particular task
       *
       * @return string The job handle for the submitted task.
       */
      public function doHighBackgroundJob($name, $params = '', $unique = null)
      {
          return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DOHIGHBACKGROUND, $unique);
      }

      /**
       * Runs a single low priority task and returns a string representation of
       * the result.
       *
       * It is up to the GearmanClient and GearmanWorker to agree on the format of
       * the result.
       *
       * Normal and high priority tasks will get precedence over low priority
       * tasks in the job queue.
       *
       * @param string $name   A GearmanBundle registered function the worker is to execute
       * @param string $params Parameters to send to job as string
       * @param string $unique A unique ID used to identify a particular task
       *
       * @return string A string representing the results of running a task.
       */
      public function doLowJob($name, $params = '', $unique = null)
      {
          return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DOLOW, $unique);
      }

      /**
       * Runs a low priority task in the background, returning a job handle which
       * can be used to get the status of the running task.
       *
       * Normal and high priority tasks will get precedence over low priority
       * tasks in the job queue.
       *
       * @param string $name   A GearmanBundle registered function the worker is to execute
       * @param string $params Parameters to send to job as string
       * @param string $unique A unique ID used to identify a particular task
       *
       * @return string The job handle for the submitted task.
       */
      public function doLowBackgroundJob($name, $params = '', $unique = null)
      {
          return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DOLOWBACKGROUND, $unique);
      }

      /**
       * Fetches the status of a given background job.
       *
       * @param string $idJob The job handle string
       *
       * @return JobStatus Job status
       */
      public function getJobStatus($idJob)
      {
          $gearmanClient = $this->acquireClient();

          return new JobStatus($gearmanClient->jobStatus($idJob));
      }

      /**
       * Gets the return code from the last run job.
       *
       * @return int
       */
      public function getReturnCode()
      {
          return $this->returnCode;
      }

      /**
       * Task methods
       */

      /**
       * Adds a task to be run in parallel with other tasks.
       * Call this method for all the tasks to be run in parallel, then call
       * GearmanClient::runTasks() to perform the work.
       *
       * Note that enough workers need to be available for the tasks to all run in
       * parallel.
       *
       * @param string $name     A GearmanBundle registered function to be executed
       * @param string $params   Parameters to send to task as string
       * @param Mixed  &$context Application context to associate with a task
       * @param string $unique   A unique ID used to identify a particular task
       *
       * @return GearmanClient Return this object
       */
      public function addTask($name, $params = '', &$context = null, $unique = null)
      {
          $this->enqueueTask($name, $params, $context, $unique, GearmanMethods::GEARMAN_METHOD_ADDTASK);

          return $this;
      }

      /**
       * Adds a high priority task to be run in parallel with other tasks.
       * Call this method for all the high priority tasks to be run in parallel,
       * then call GearmanClient::runTasks() to perform the work.
       *
       * Tasks with a high priority will be selected from the queue before those
       * of normal or low priority.
       *
       * @param string $name     A GearmanBundle registered function to be executed
       * @param string $params   Parameters to send to task as string
       * @param Mixed  &$context Application context to associate with a task
       * @param string $unique   A unique ID used to identify a particular task
       *
       * @return GearmanClient Return this object
       */
      public function addTaskHigh($name, $params = '', &$context = null, $unique = null)
      {
          $this->enqueueTask($name, $params, $context, $unique, GearmanMethods::GEARMAN_METHOD_ADDTASKHIGH);

          return $this;
      }

      /**
       * Adds a low priority task to be run in parallel with other tasks.
       *
       * Call this method for all the tasks to be run in parallel, then call
       * GearmanClient::runTasks() to perform the work.
       *
       * Tasks with a low priority will be selected from the queue after those of
       * normal or high priority.
       *
       * @param string $name     A GearmanBundle registered function to be executed
       * @param string $params   Parameters to send to task as string
       * @param Mixed  &$context Application context to associate with a task
       * @param string $unique   A unique ID used to identify a particular task
       *
       * @return GearmanClient Return this object
       */
      public function addTaskLow($name, $params = '', &$context = null, $unique = null)
      {
          $this->enqueueTask($name, $params, $context, $unique, GearmanMethods::GEARMAN_METHOD_ADDTASKLOW);

          return $this;
      }

      /**
       * Adds a background task to be run in parallel with other tasks
       * Call this method for all the tasks to be run in parallel, then call
       * GearmanClient::runTasks() to perform the work.
       *
       * @param string $name     A GearmanBundle registered function to be executed
       * @param string $params   Parameters to send to task as string
       * @param Mixed  &$context Application context to associate with a task
       * @param string $unique   A unique ID used to identify a particular task
       *
       * @return GearmanClient Return this object
       */
      public function addTaskBackground($name, $params = '', &$context = null, $unique = null)
      {
          $this->enqueueTask($name, $params, $context, $unique, GearmanMethods::GEARMAN_METHOD_ADDTASKBACKGROUND);

          return $this;
      }

      /**
       * Adds a high priority background task to be run in parallel with other
       * tasks.
       *
       * Call this method for all the tasks to be run in parallel, then call
       * GearmanClient::runTasks() to perform the work.
       *
       * Tasks with a high priority will be selected from the queue before those
       * of normal or low priority.
       *
       * @param string $name     A GearmanBundle registered function to be executed
       * @param string $params   Parameters to send to task as string
       * @param Mixed  &$context Application context to associate with a task
       * @param string $unique   A unique ID used to identify a particular task
       *
       * @return GearmanClient Return this object
       */
      public function addTaskHighBackground($name, $params = '', &$context = null, $unique = null)
      {
          $this->enqueueTask($name, $params, $context, $unique, GearmanMethods::GEARMAN_METHOD_ADDTASKHIGHBACKGROUND);

          return $this;
      }

      /**
       * Adds a low priority background task to be run in parallel with other
       * tasks.
       *
       * Call this method for all the tasks to be run in parallel, then call
       * GearmanClient::runTasks() to perform the work.
       *
       * Tasks with a low priority will be selected from the queue after those of
       * normal or high priority.
       *
       * @param string $name     A GearmanBundle registered function to be executed
       * @param string $params   Parameters to send to task as string
       * @param Mixed  &$context Application context to associate with a task
       * @param string $unique   A unique ID used to identify a particular task
       *
       * @return GearmanClient Return this object
       */
      public function addTaskLowBackground($name, $params = '', &$context = null, $unique = null)
      {
          $this->enqueueTask($name, $params, $context, $unique, GearmanMethods::GEARMAN_METHOD_ADDTASKLOWBACKGROUND);

          return $this;
      }

      /**
       * Adds a task into the structure of tasks with included type of call.
       *
       * The context is stored wrapped in an array holding a reference to the
       * caller's variable, so the caller's variable stays linked to the
       * queued task.
       *
       * @param string $name     A GearmanBundle registered function to be executed
       * @param string $params   Parameters to send to task as string
       * @param Mixed  &$context Application context to associate with a task
       * @param string $unique   A unique ID used to identify a particular task
       * @param string $method   Method to perform
       *
       * @return GearmanClient Return this object
       */
      protected function enqueueTask($name, $params, &$context, $unique, $method)
      {
          $contextReference = array('context' => &$context);

          $task = array(
              'name'    => $name,
              'params'  => $params,
              'context' => $contextReference,
              'unique'  => $this->uniqueJobIdentifierGenerator->generateUniqueKey($name, $params, $unique, $method),
              'method'  => $method,
          );

          $this->addTaskToStructure($task);

          return $this;
      }

      /**
       * Appends a task structure into taskStructure array
       *
       * @param array $task Task structure
       *
       * @return GearmanClient Return this object
       */
      protected function addTaskToStructure(array $task)
      {
          $this->taskStructure[] = $task;

          return $this;
      }

      /**
       * For a set of tasks previously added with
       *
       * GearmanClient::addTask(),
       * GearmanClient::addTaskHigh(),
       * GearmanClient::addTaskLow(),
       * GearmanClient::addTaskBackground(),
       * GearmanClient::addTaskHighBackground(),
       * GearmanClient::addTaskLowBackground(),
       *
       * this call starts running the tasks in parallel.
       * Note that enough workers need to be available for the tasks to all run in parallel
       *
       * Tasks whose job name cannot be resolved are skipped. The task structure
       * is emptied once every task has been handed to the native client.
       *
       * @return boolean run tasks result
       */
      public function runTasks()
      {
          $gearmanClient = $this->acquireClient();

          // empty() keeps the original truthiness check but tolerates settings
          // that do not define the 'callbacks' key.
          if (!empty($this->settings['callbacks'])) {
              $this->gearmanCallbacksDispatcher->assignTaskCallbacks($gearmanClient);
          }

          foreach ($this->taskStructure as $task) {
              $type = $task['method'];
              $worker = $this->getJob($task['name']);

              if (false !== $worker) {
                  $gearmanClient->$type(
                      $worker['job']['realCallableName'],
                      $task['params'],
                      $task['context'],
                      $task['unique']
                  );
              }
          }

          $this->initTaskStructure();

          return $gearmanClient->runTasks();
      }
  }