GearmanClient.php 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654
  1. <?php
  2. /**
  3. * Gearman Bundle for Symfony2
  4. *
  5. * For the full copyright and license information, please view the LICENSE
  6. * file that was distributed with this source code.
  7. *
  8. * Feel free to edit as you please, and have fun.
  9. *
  10. * @author Marc Morera <yuhu@mmoreram.com>
  11. */
  12. namespace Mmoreram\GearmanBundle\Service;
  13. use Mmoreram\GearmanBundle\Service\Abstracts\AbstractGearmanService;
  14. use Mmoreram\GearmanBundle\GearmanMethods;
  15. use Mmoreram\GearmanBundle\Module\JobStatus;
  16. use Mmoreram\GearmanBundle\Generator\UniqueJobIdentifierGenerator;
  17. use Mmoreram\GearmanBundle\Dispatcher\GearmanCallbacksDispatcher;
  /**
   * GearmanClient. Implementation of AbstractGearmanService.
   *
   * Client-side facade used to send work to Gearman:
   *
   * - Jobs (callJob() and the do*Job() family) are submitted right away; each
   *   call builds a fresh \GearmanClient, attaches the servers and invokes
   *   the requested method on it.
   * - Tasks (the addTask*() family) are only queued in memory; runTasks()
   *   submits the whole queue through a single \GearmanClient and then
   *   empties it.
   *
   * Servers added with setServer()/addServer() take precedence over the
   * default servers injected with setDefaultServers().
   *
   * @since 2.3.1
   */
  class GearmanClient extends AbstractGearmanService
  {
      /**
       * @var GearmanCallbacksDispatcher
       *
       * Gearman callbacks dispatcher
       */
      protected $gearmanCallbacksDispatcher;

      /**
       * @var array
       *
       * Servers explicitly added to this client. Each entry is an array with
       * the keys 'host' and 'port'. When not empty, they are used instead of
       * the default servers.
       */
      protected $servers = array();

      /**
       * @var array
       *
       * Queue of tasks waiting for runTasks(). Each entry is an array with the
       * keys 'name', 'params', 'context', 'unique' and 'method'.
       */
      protected $taskStructure = array();

      /**
       * @var array
       *
       * Default servers, used when no server has been explicitly added.
       * Stays null until setDefaultServers() is called.
       */
      protected $defaultServers;

      /**
       * @var array
       *
       * Default settings. Stays null until setDefaultSettings() is called.
       */
      protected $settings;

      /**
       * @var UniqueJobIdentifierGenerator
       *
       * Unique Job Identifier Generator
       */
      protected $uniqueJobIdentifierGenerator;

      /**
       * Init tasks structure, discarding every task queued so far.
       *
       * @return GearmanClient self Object
       */
      public function initTaskStructure()
      {
          $this->taskStructure = array();

          return $this;
      }

      /**
       * Set default servers
       *
       * @param array $defaultServers Default servers, each one an array with
       *                              the keys 'host' and 'port'
       *
       * @return GearmanClient self Object
       */
      public function setDefaultServers(array $defaultServers)
      {
          $this->defaultServers = $defaultServers;

          return $this;
      }

      /**
       * Set UniqueJobIdentifierGenerator object
       *
       * @param UniqueJobIdentifierGenerator $uniqueJobIdentifierGenerator Unique Job Identifier Generator
       *
       * @return GearmanClient self Object
       */
      public function setUniqueJobIdentifierGenerator(UniqueJobIdentifierGenerator $uniqueJobIdentifierGenerator)
      {
          $this->uniqueJobIdentifierGenerator = $uniqueJobIdentifierGenerator;

          return $this;
      }

      /**
       * Set gearman callbacks
       *
       * @param GearmanCallbacksDispatcher $gearmanCallbacksDispatcher Gearman callbacks dispatcher
       *
       * @return GearmanClient self Object
       */
      public function setGearmanCallbacksDispatcher(GearmanCallbacksDispatcher $gearmanCallbacksDispatcher)
      {
          $this->gearmanCallbacksDispatcher = $gearmanCallbacksDispatcher;

          return $this;
      }

      /**
       * Set default settings
       *
       * @param array $settings Settings; runTasks() reads the 'callbacks' key
       *
       * @return GearmanClient self Object
       */
      public function setDefaultSettings($settings)
      {
          $this->settings = $settings;

          return $this;
      }

      /**
       * Set server to client. Empty all servers and set this one
       *
       * @param string $servername Server name (the original documentation asks for an IP)
       * @param int    $port       Port of server. By default 4730
       *
       * @return GearmanClient Returns self object
       */
      public function setServer($servername, $port = 4730)
      {
          $this
              ->clearServers()
              ->addServer($servername, $port);

          return $this;
      }

      /**
       * Add server to client
       *
       * @param string $servername Server name (the original documentation asks for an IP)
       * @param int    $port       Port of server. By default 4730
       *
       * @return GearmanClient Returns self object
       */
      public function addServer($servername, $port = 4730)
      {
          $this->servers[] = array(
              'host' => $servername,
              'port' => $port,
          );

          return $this;
      }

      /**
       * Clear server list
       *
       * Only the explicitly added servers are removed; default servers are
       * left untouched.
       *
       * @return GearmanClient Returns self object
       */
      public function clearServers()
      {
          $this->servers = array();

          return $this;
      }

      /**
       * Get real worker from job name and enqueues the action given one
       * method.
       *
       * @param string $jobName A GearmanBundle registered function the worker is to execute
       * @param string $params  Parameters to send to job as string
       * @param string $method  Method to execute
       * @param string $unique  A unique ID used to identify a particular task
       *
       * @return mixed Return result of the call. If worker is not valid, return false
       */
      protected function enqueue($jobName, $params, $method, $unique)
      {
          $worker = $this->getJob($jobName);

          // Validate the worker first: there is no point in building a unique
          // key for a job that is not going to be sent.
          if (!$worker) {
              return false;
          }

          $unique = $this
              ->uniqueJobIdentifierGenerator
              ->generateUniqueKey($jobName, $params, $unique, $method);

          return $this->doEnqueue($worker, $params, $method, $unique);
      }

      /**
       * Execute a GearmanClient call given a worker, params and a method.
       *
       * If the GearmanClient call is asynchronous, result value will be a
       * handler. Otherwise, will return job result.
       *
       * @param array  $worker Worker definition; $worker['job']['realCallableName'] is the function name sent to Gearman
       * @param string $params Parameters to send to job as string
       * @param string $method Name of the \GearmanClient method to call
       * @param string $unique A unique ID used to identify a particular task
       *
       * @return mixed Return result of the GearmanClient call
       */
      protected function doEnqueue(array $worker, $params, $method, $unique)
      {
          $gearmanClient = new \GearmanClient();
          $this->assignServers($gearmanClient);

          // $method holds a method name, so this is a dynamic method call.
          return $gearmanClient->$method($worker['job']['realCallableName'], $params, $unique);
      }

      /**
       * Given a GearmanClient, set all included servers
       *
       * Explicitly added servers win over the default ones. When neither list
       * has been configured, no server is added to the client.
       *
       * @param \GearmanClient $gearmanClient Object to include servers
       *
       * @return GearmanClient Returns self object
       */
      protected function assignServers(\GearmanClient $gearmanClient)
      {
          $servers = !empty($this->servers)
              ? $this->servers
              : $this->defaultServers;

          // defaultServers is null until setDefaultServers() is called;
          // iterating null would raise a warning.
          if (empty($servers)) {
              return $this;
          }

          /**
           * We include each server into gearman client
           */
          foreach ($servers as $server) {
              $gearmanClient->addServer($server['host'], $server['port']);
          }

          return $this;
      }

      /**
       * Job methods
       */

      /**
       * Runs a single task and returns some result, depending of method called.
       * Method called depends on the default callable method set in the gearman
       * settings or overwritten on work or job annotations
       *
       * @param string $name   A GearmanBundle registered function the worker is to execute
       * @param string $params Parameters to send to job as string
       * @param string $unique A unique ID used to identify a particular task
       *
       * @return mixed result depending of method called. If worker is not valid, return false
       */
      public function callJob($name, $params = '', $unique = null)
      {
          $worker = $this->getJob($name);

          // Without a valid worker there is no default method to read; bail
          // out with the same false that enqueue() returns for unknown jobs.
          if (!$worker) {
              return false;
          }

          $methodCallable = $worker['job']['defaultMethod'];

          return $this->enqueue($name, $params, $methodCallable, $unique);
      }

      /**
       * Runs a single task and returns a string representation of the result.
       * It is up to the GearmanClient and GearmanWorker to agree on the format of
       * the result.
       *
       * The GearmanClient::do() method is deprecated as of pecl/gearman 1.0.0.
       * Use GearmanClient::doNormal().
       *
       * @param string $name   A GearmanBundle registered function the worker is to execute
       * @param string $params Parameters to send to job as string
       * @param string $unique A unique ID used to identify a particular task
       *
       * @return string A string representing the results of running a task.
       * @deprecated Behaves exactly like doNormalJob(); use that method instead.
       */
      public function doJob($name, $params = '', $unique = null)
      {
          return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DONORMAL, $unique);
      }

      /**
       * Runs a single task and returns a string representation of the result.
       * It is up to the GearmanClient and GearmanWorker to agree on the format of
       * the result.
       *
       * @param string $name   A GearmanBundle registered function the worker is to execute
       * @param string $params Parameters to send to job as string
       * @param string $unique A unique ID used to identify a particular task
       *
       * @return string A string representing the results of running a task.
       */
      public function doNormalJob($name, $params = '', $unique = null)
      {
          return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DONORMAL, $unique);
      }

      /**
       * Runs a task in the background, returning a job handle which can be used
       * to get the status of the running task.
       *
       * @param string $name   A GearmanBundle registered function the worker is to execute
       * @param string $params Parameters to send to job as string
       * @param string $unique A unique ID used to identify a particular task
       *
       * @return string Job handle for the submitted task.
       */
      public function doBackgroundJob($name, $params = '', $unique = null)
      {
          return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DOBACKGROUND, $unique);
      }

      /**
       * Runs a single high priority task and returns a string representation of
       * the result.
       *
       * It is up to the GearmanClient and GearmanWorker to agree on the format of
       * the result.
       *
       * High priority tasks will get precedence over normal and low priority
       * tasks in the job queue.
       *
       * @param string $name   A GearmanBundle registered function the worker is to execute
       * @param string $params Parameters to send to job as string
       * @param string $unique A unique ID used to identify a particular task
       *
       * @return string A string representing the results of running a task.
       */
      public function doHighJob($name, $params = '', $unique = null)
      {
          return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DOHIGH, $unique);
      }

      /**
       * Runs a high priority task in the background, returning a job handle which
       * can be used to get the status of the running task.
       *
       * High priority tasks take precedence over normal and low priority tasks in
       * the job queue.
       *
       * @param string $name   A GearmanBundle registered function the worker is to execute
       * @param string $params Parameters to send to job as string
       * @param string $unique A unique ID used to identify a particular task
       *
       * @return string The job handle for the submitted task.
       */
      public function doHighBackgroundJob($name, $params = '', $unique = null)
      {
          return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DOHIGHBACKGROUND, $unique);
      }

      /**
       * Runs a single low priority task and returns a string representation of
       * the result.
       *
       * It is up to the GearmanClient and GearmanWorker to agree on the format of
       * the result.
       *
       * Normal and high priority tasks will get precedence over low priority
       * tasks in the job queue.
       *
       * @param string $name   A GearmanBundle registered function the worker is to execute
       * @param string $params Parameters to send to job as string
       * @param string $unique A unique ID used to identify a particular task
       *
       * @return string A string representing the results of running a task.
       */
      public function doLowJob($name, $params = '', $unique = null)
      {
          return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DOLOW, $unique);
      }

      /**
       * Runs a low priority task in the background, returning a job handle which
       * can be used to get the status of the running task.
       *
       * Normal and high priority tasks will get precedence over low priority
       * tasks in the job queue.
       *
       * @param string $name   A GearmanBundle registered function the worker is to execute
       * @param string $params Parameters to send to job as string
       * @param string $unique A unique ID used to identify a particular task
       *
       * @return string The job handle for the submitted task.
       */
      public function doLowBackgroundJob($name, $params = '', $unique = null)
      {
          return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DOLOWBACKGROUND, $unique);
      }

      /**
       * Fetches the Status of a special Background Job.
       *
       * A new \GearmanClient is created for the query, so the job handle
       * can come from any previous call.
       *
       * @param string $idJob The job handle string
       *
       * @return JobStatus Job status
       */
      public function getJobStatus($idJob)
      {
          $gearmanClient = new \GearmanClient();
          $this->assignServers($gearmanClient);

          return new JobStatus($gearmanClient->jobStatus($idJob));
      }

      /**
       * Task methods
       */

      /**
       * Adds a task to be run in parallel with other tasks.
       * Call this method for all the tasks to be run in parallel, then call
       * GearmanClient::runTasks() to perform the work.
       *
       * Note that enough workers need to be available for the tasks to all run in
       * parallel.
       *
       * @param string $name     A GearmanBundle registered function to be executed
       * @param string $params   Parameters to send to task as string
       * @param Mixed  &$context Application context to associate with a task
       * @param string $unique   A unique ID used to identify a particular task
       *
       * @return GearmanClient Return this object
       */
      public function addTask($name, $params = '', &$context = null, $unique = null)
      {
          $this->enqueueTask($name, $params, $context, $unique, GearmanMethods::GEARMAN_METHOD_ADDTASK);

          return $this;
      }

      /**
       * Adds a high priority task to be run in parallel with other tasks.
       * Call this method for all the high priority tasks to be run in parallel,
       * then call GearmanClient::runTasks() to perform the work.
       *
       * Tasks with a high priority will be selected from the queue before those
       * of normal or low priority.
       *
       * @param string $name     A GearmanBundle registered function to be executed
       * @param string $params   Parameters to send to task as string
       * @param Mixed  &$context Application context to associate with a task
       * @param string $unique   A unique ID used to identify a particular task
       *
       * @return GearmanClient Return this object
       */
      public function addTaskHigh($name, $params = '', &$context = null, $unique = null)
      {
          $this->enqueueTask($name, $params, $context, $unique, GearmanMethods::GEARMAN_METHOD_ADDTASKHIGH);

          return $this;
      }

      /**
       * Adds a low priority task to be run in parallel with other tasks.
       *
       * Call this method for all the tasks to be run in parallel, then call
       * GearmanClient::runTasks() to perform the work.
       *
       * Tasks with a low priority will be selected from the queue after those of
       * normal or high priority.
       *
       * @param string $name     A GearmanBundle registered function to be executed
       * @param string $params   Parameters to send to task as string
       * @param Mixed  &$context Application context to associate with a task
       * @param string $unique   A unique ID used to identify a particular task
       *
       * @return GearmanClient Return this object
       */
      public function addTaskLow($name, $params = '', &$context = null, $unique = null)
      {
          $this->enqueueTask($name, $params, $context, $unique, GearmanMethods::GEARMAN_METHOD_ADDTASKLOW);

          return $this;
      }

      /**
       * Adds a background task to be run in parallel with other tasks
       * Call this method for all the tasks to be run in parallel, then call
       * GearmanClient::runTasks() to perform the work.
       *
       * @param string $name     A GearmanBundle registered function to be executed
       * @param string $params   Parameters to send to task as string
       * @param Mixed  &$context Application context to associate with a task
       * @param string $unique   A unique ID used to identify a particular task
       *
       * @return GearmanClient Return this object
       */
      public function addTaskBackground($name, $params = '', &$context = null, $unique = null)
      {
          $this->enqueueTask($name, $params, $context, $unique, GearmanMethods::GEARMAN_METHOD_ADDTASKBACKGROUND);

          return $this;
      }

      /**
       * Adds a high priority background task to be run in parallel with other
       * tasks.
       *
       * Call this method for all the tasks to be run in parallel, then call
       * GearmanClient::runTasks() to perform the work.
       *
       * Tasks with a high priority will be selected from the queue before those
       * of normal or low priority.
       *
       * @param string $name     A GearmanBundle registered function to be executed
       * @param string $params   Parameters to send to task as string
       * @param Mixed  &$context Application context to associate with a task
       * @param string $unique   A unique ID used to identify a particular task
       *
       * @return GearmanClient Return this object
       */
      public function addTaskHighBackground($name, $params = '', &$context = null, $unique = null)
      {
          $this->enqueueTask($name, $params, $context, $unique, GearmanMethods::GEARMAN_METHOD_ADDTASKHIGHBACKGROUND);

          return $this;
      }

      /**
       * Adds a low priority background task to be run in parallel with other
       * tasks.
       *
       * Call this method for all the tasks to be run in parallel, then call
       * GearmanClient::runTasks() to perform the work.
       *
       * Tasks with a low priority will be selected from the queue after those of
       * normal or high priority.
       *
       * @param string $name     A GearmanBundle registered function to be executed
       * @param string $params   Parameters to send to task as string
       * @param Mixed  &$context Application context to associate with a task
       * @param string $unique   A unique ID used to identify a particular task
       *
       * @return GearmanClient Return this object
       */
      public function addTaskLowBackground($name, $params = '', &$context = null, $unique = null)
      {
          $this->enqueueTask($name, $params, $context, $unique, GearmanMethods::GEARMAN_METHOD_ADDTASKLOWBACKGROUND);

          return $this;
      }

      /**
       * Adds a task into the structure of tasks with included type of call
       *
       * The unique key is generated here, at queue time, not when the task is
       * finally submitted by runTasks().
       *
       * NOTE(review): $context is received by value, so the reference accepted
       * by the public addTask*() methods is not preserved in the queued task.
       * The signature is left as is to stay compatible with subclasses.
       *
       * @param string $name    A GearmanBundle registered function to be executed
       * @param string $params  Parameters to send to task as string
       * @param Mixed  $context Application context to associate with a task
       * @param string $unique  A unique ID used to identify a particular task
       * @param string $method  Method to perform
       *
       * @return GearmanClient Return this object
       */
      protected function enqueueTask($name, $params, $context, $unique, $method)
      {
          $uniqueKey = $this
              ->uniqueJobIdentifierGenerator
              ->generateUniqueKey($name, $params, $unique, $method);

          $this->addTaskToStructure(array(
              'name'    => $name,
              'params'  => $params,
              'context' => $context,
              'unique'  => $uniqueKey,
              'method'  => $method,
          ));

          return $this;
      }

      /**
       * Appends a task structure into taskStructure array
       *
       * @param array $task Task structure
       *
       * @return GearmanClient Return this object
       */
      protected function addTaskToStructure(array $task)
      {
          $this->taskStructure[] = $task;

          return $this;
      }

      /**
       * For a set of tasks previously added with
       *
       * GearmanClient::addTask(),
       * GearmanClient::addTaskHigh(),
       * GearmanClient::addTaskLow(),
       * GearmanClient::addTaskBackground(),
       * GearmanClient::addTaskHighBackground(),
       * GearmanClient::addTaskLowBackground(),
       *
       * this call starts running the tasks in parallel.
       * Note that enough workers need to be available for the tasks to all run in parallel
       *
       * Tasks whose job lookup yields false are skipped. The queue is emptied
       * before the tasks are run, so every call starts from a clean structure.
       *
       * @return boolean run tasks result
       */
      public function runTasks()
      {
          $gearmanClient = new \GearmanClient();
          $this->assignServers($gearmanClient);

          // settings may be unset or lack the 'callbacks' key; treat both
          // cases as "callbacks disabled" instead of raising a notice.
          if (!empty($this->settings['callbacks'])) {
              $this->gearmanCallbacksDispatcher->assignTaskCallbacks($gearmanClient);
          }

          foreach ($this->taskStructure as $task) {
              $type = $task['method'];
              $worker = $this->getJob($task['name']);

              if (false === $worker) {
                  continue;
              }

              // $type holds the name of the \GearmanClient addTask* method.
              $gearmanClient->$type(
                  $worker['job']['realCallableName'],
                  $task['params'],
                  $task['context'],
                  $task['unique']
              );
          }

          $this->initTaskStructure();

          return $gearmanClient->runTasks();
      }
  }