GearmanClient.php 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648
  1. <?php
  2. /**
  3. * Gearman Bundle for Symfony2
  4. *
  5. * @author Marc Morera <yuhu@mmoreram.com>
  6. * @since 2013
  7. */
  8. namespace Mmoreram\GearmanBundle\Service;
  9. use Mmoreram\GearmanBundle\Service\Abstracts\AbstractGearmanService;
  10. use Mmoreram\GearmanBundle\GearmanMethods;
  11. use Mmoreram\GearmanBundle\Module\JobStatus;
  12. use Mmoreram\GearmanBundle\Generator\UniqueJobIdentifierGenerator;
  13. use Mmoreram\GearmanBundle\Dispatcher\GearmanCallbacksDispatcher;
  14. /**
  15. * GearmanClient. Implementation of AbstractGearmanService
  16. */
  17. class GearmanClient extends AbstractGearmanService
  18. {
  19. /**
  20. * @var GearmanCallbacksDispatcher
  21. *
  22. * Gearman callbacks dispatcher
  23. */
  24. protected $gearmanCallbacksDispatcher;
  25. /**
  26. * @var array
  27. *
  28. * Servers this client must connect to; when empty, the default servers are used
  29. */
  30. protected $servers = array();
  31. /**
  32. * @var array
  33. *
  34. * Tasks queued through the addTask*() methods, kept until runTasks() is called
  35. */
  36. protected $taskStructure = array();
  37. /**
  38. * @var array
  39. *
  40. * Set default servers
  41. */
  42. protected $defaultServers;
  43. /**
  44. * @var array
  45. *
  46. * Set default settings
  47. */
  48. protected $settings;
  49. /**
  50. * @var UniqueJobIdentifierGenerator
  51. *
  52. * Unique job identifier generator
  53. */
  54. protected $uniqueJobIdentifierGenerator;
  /**
   * Resets the list of pending tasks to an empty array.
   *
   * @return GearmanClient This instance, to allow method chaining
   */
  public function initTaskStructure()
  {
    $this->taskStructure = array();

    return $this;
  }
  /**
   * Stores the fallback server list.
   *
   * These servers are the ones handed to the Gearman client whenever no
   * server has been added explicitly through addServer()/setServer().
   *
   * @param array $defaultServers List of servers, each one an array with
   *                              'host' and 'port' keys
   *
   * @return GearmanClient This instance, to allow method chaining
   */
  public function setDefaultServers(array $defaultServers)
  {
    $this->defaultServers = $defaultServers;

    return $this;
  }
  /**
   * Injects the generator used to build the unique key of every job and task.
   *
   * @param UniqueJobIdentifierGenerator $uniqueJobIdentifierGenerator Unique job identifier generator
   *
   * @return GearmanClient This instance, to allow method chaining
   */
  public function setUniqueJobIdentifierGenerator(UniqueJobIdentifierGenerator $uniqueJobIdentifierGenerator)
  {
    $this->uniqueJobIdentifierGenerator = $uniqueJobIdentifierGenerator;

    return $this;
  }
  /**
   * Injects the dispatcher that runTasks() uses to assign task callbacks.
   *
   * @param GearmanCallbacksDispatcher $gearmanCallbacksDispatcher Gearman callbacks dispatcher
   *
   * @return GearmanClient This instance, to allow method chaining
   */
  public function setGearmanCallbacksDispatcher(GearmanCallbacksDispatcher $gearmanCallbacksDispatcher)
  {
    $this->gearmanCallbacksDispatcher = $gearmanCallbacksDispatcher;

    return $this;
  }
  /**
   * Stores the default settings.
   *
   * runTasks() reads the 'callbacks' entry of these settings to decide
   * whether task callbacks must be assigned.
   *
   * @param array $settings Default settings
   *
   * @return GearmanClient This instance, to allow method chaining
   */
  public function setDefaultSettings($settings)
  {
    $this->settings = $settings;

    return $this;
  }
  /**
   * Makes the given server the only one this client will use.
   *
   * Any server added before is discarded first.
   *
   * @param string $servername Server name (must be an IP)
   * @param int    $port       Server port. 4730 by default
   *
   * @return GearmanClient This instance, to allow method chaining
   */
  public function setServer($servername, $port = 4730)
  {
    // Both helpers return this same instance, so the chain can be unrolled.
    $this->clearServers();
    $this->addServer($servername, $port);

    return $this;
  }
  /**
   * Appends a server to the list of servers this client will use.
   *
   * @param string $servername Server name (must be an IP)
   * @param int    $port       Server port. 4730 by default
   *
   * @return GearmanClient This instance, to allow method chaining
   */
  public function addServer($servername, $port = 4730)
  {
    $server = array(
      'host' => $servername,
      'port' => $port,
    );

    $this->servers[] = $server;

    return $this;
  }
  /**
   * Empties the list of explicitly added servers.
   *
   * @return GearmanClient This instance, to allow method chaining
   */
  public function clearServers()
  {
    $this->servers = array();

    return $this;
  }
  /**
   * Resolves the job definition for a job name and submits the job with the
   * requested method.
   *
   * The unique key is always generated, even when the job cannot be resolved.
   *
   * @param string $jobName A GearmanBundle registered function the worker is to execute
   * @param string $params  Parameters to send to job as string
   * @param string $method  Name of the \GearmanClient method to call
   * @param string $unique  A unique ID used to identify a particular task
   *
   * @return mixed Result of the call, or false when the job cannot be resolved
   */
  private function enqueue($jobName, $params, $method, $unique)
  {
    $worker = $this->getJob($jobName);
    $uniqueKey = $this->uniqueJobIdentifierGenerator->generateUniqueKey($jobName, $params, $unique, $method);

    if (!$worker) {
      return false;
    }

    return $this->doEnqueue($worker, $params, $method, $uniqueKey);
  }
  /**
   * Executes a \GearmanClient call given a worker definition, the parameters
   * and the method to call.
   *
   * If the \GearmanClient call is asynchronous, the returned value will be a
   * job handle. Otherwise, it will be the job result.
   *
   * @param array  $worker Worker definition; its ['job']['realCallableName']
   *                       entry is the function name sent to Gearman
   * @param string $params Parameters to send to job as string
   * @param string $method Name of the \GearmanClient method to call
   * @param string $unique A unique ID used to identify a particular task
   *
   * @return mixed Result of the \GearmanClient call
   */
  private function doEnqueue(array $worker, $params, $method, $unique)
  {
    // A fresh native client is built for every call.
    $client = new \GearmanClient();
    $this->assignServers($client);

    $callableName = $worker['job']['realCallableName'];

    return $client->$method($callableName, $params, $unique);
  }
  /**
   * Registers the configured servers into the given \GearmanClient.
   *
   * Explicitly added servers take precedence; the default servers are only
   * used when none has been added.
   *
   * @param \GearmanClient $gearmanClient Client that receives the servers
   *
   * @return GearmanClient This instance, to allow method chaining
   */
  private function assignServers(\GearmanClient $gearmanClient)
  {
    $servers = !empty($this->servers)
      ? $this->servers
      : $this->defaultServers;

    // $defaultServers has no initial value, so it is null until
    // setDefaultServers() is called. Bail out instead of iterating a
    // non-array, which would raise a warning.
    if (!is_array($servers)) {
      return $this;
    }

    foreach ($servers as $server) {
      $gearmanClient->addServer($server['host'], $server['port']);
    }

    return $this;
  }
  215. /**
  216. * Job methods
  217. */
  /**
   * Runs a single job using the default method configured for that job.
   *
   * The method is read from the 'defaultMethod' entry of the job definition,
   * so the kind of result depends on which method that is.
   *
   * @param string $name   A GearmanBundle registered function the worker is to execute
   * @param string $params Parameters to send to job as string
   * @param string $unique A unique ID used to identify a particular task
   *
   * @return mixed Result depending on the method called, or false when the
   *               job cannot be resolved
   */
  public function callJob($name, $params = '', $unique = null)
  {
    $worker = $this->getJob($name);

    // enqueue() and runTasks() both treat a falsy getJob() result as "job not
    // found". Stop here with the same false result enqueue() would give,
    // rather than indexing into a non-array and enqueueing a null method.
    if (!$worker) {
      return false;
    }

    return $this->enqueue($name, $params, $worker['job']['defaultMethod'], $unique);
  }
  /**
   * Runs a single task and returns a string representation of the result.
   * It is up to the GearmanClient and GearmanWorker to agree on the format of
   * the result.
   *
   * GearmanClient::do() is deprecated as of pecl/gearman 1.0.0, so this
   * method submits the job exactly like doNormalJob() does.
   *
   * @param string $name   A GearmanBundle registered function the worker is to execute
   * @param string $params Parameters to send to job as string
   * @param string $unique A unique ID used to identify a particular task
   *
   * @return string|false A string representing the results of running a task,
   *                      or false when the job cannot be resolved
   *
   * @deprecated Use doNormalJob() instead
   */
  public function doJob($name, $params = '', $unique = null)
  {
    $method = GearmanMethods::GEARMAN_METHOD_DONORMAL;

    return $this->enqueue($name, $params, $method, $unique);
  }
  /**
   * Runs a single task and returns a string representation of the result.
   * It is up to the GearmanClient and GearmanWorker to agree on the format of
   * the result.
   *
   * @param string $name   A GearmanBundle registered function the worker is to execute
   * @param string $params Parameters to send to job as string
   * @param string $unique A unique ID used to identify a particular task
   *
   * @return string|false A string representing the results of running a task,
   *                      or false when the job cannot be resolved
   */
  public function doNormalJob($name, $params = '', $unique = null)
  {
    $method = GearmanMethods::GEARMAN_METHOD_DONORMAL;

    return $this->enqueue($name, $params, $method, $unique);
  }
  /**
   * Runs a task in the background, returning a job handle which can be used
   * to get the status of the running task (see getJobStatus()).
   *
   * @param string $name   A GearmanBundle registered function the worker is to execute
   * @param string $params Parameters to send to job as string
   * @param string $unique A unique ID used to identify a particular task
   *
   * @return string|false Job handle for the submitted task, or false when the
   *                      job cannot be resolved
   */
  public function doBackgroundJob($name, $params = '', $unique = null)
  {
    $method = GearmanMethods::GEARMAN_METHOD_DOBACKGROUND;

    return $this->enqueue($name, $params, $method, $unique);
  }
  /**
   * Runs a single high priority task and returns a string representation of
   * the result.
   *
   * It is up to the GearmanClient and GearmanWorker to agree on the format of
   * the result.
   *
   * High priority tasks get precedence over normal and low priority tasks in
   * the job queue.
   *
   * @param string $name   A GearmanBundle registered function the worker is to execute
   * @param string $params Parameters to send to job as string
   * @param string $unique A unique ID used to identify a particular task
   *
   * @return string|false A string representing the results of running a task,
   *                      or false when the job cannot be resolved
   */
  public function doHighJob($name, $params = '', $unique = null)
  {
    $method = GearmanMethods::GEARMAN_METHOD_DOHIGH;

    return $this->enqueue($name, $params, $method, $unique);
  }
  /**
   * Runs a high priority task in the background, returning a job handle which
   * can be used to get the status of the running task.
   *
   * High priority tasks take precedence over normal and low priority tasks in
   * the job queue.
   *
   * @param string $name   A GearmanBundle registered function the worker is to execute
   * @param string $params Parameters to send to job as string
   * @param string $unique A unique ID used to identify a particular task
   *
   * @return string|false The job handle for the submitted task, or false when
   *                      the job cannot be resolved
   */
  public function doHighBackgroundJob($name, $params = '', $unique = null)
  {
    $method = GearmanMethods::GEARMAN_METHOD_DOHIGHBACKGROUND;

    return $this->enqueue($name, $params, $method, $unique);
  }
  /**
   * Runs a single low priority task and returns a string representation of
   * the result.
   *
   * It is up to the GearmanClient and GearmanWorker to agree on the format of
   * the result.
   *
   * Normal and high priority tasks get precedence over low priority tasks in
   * the job queue.
   *
   * @param string $name   A GearmanBundle registered function the worker is to execute
   * @param string $params Parameters to send to job as string
   * @param string $unique A unique ID used to identify a particular task
   *
   * @return string|false A string representing the results of running a task,
   *                      or false when the job cannot be resolved
   */
  public function doLowJob($name, $params = '', $unique = null)
  {
    $method = GearmanMethods::GEARMAN_METHOD_DOLOW;

    return $this->enqueue($name, $params, $method, $unique);
  }
  /**
   * Runs a low priority task in the background, returning a job handle which
   * can be used to get the status of the running task.
   *
   * Normal and high priority tasks get precedence over low priority tasks in
   * the job queue.
   *
   * @param string $name   A GearmanBundle registered function the worker is to execute
   * @param string $params Parameters to send to job as string
   * @param string $unique A unique ID used to identify a particular task
   *
   * @return string|false The job handle for the submitted task, or false when
   *                      the job cannot be resolved
   */
  public function doLowBackgroundJob($name, $params = '', $unique = null)
  {
    $method = GearmanMethods::GEARMAN_METHOD_DOLOWBACKGROUND;

    return $this->enqueue($name, $params, $method, $unique);
  }
  /**
   * Fetches the status of a background job.
   *
   * A fresh \GearmanClient is connected to the configured servers, asked for
   * the status of the handle, and the raw answer is wrapped in a JobStatus.
   *
   * @param string $idJob The job handle string
   *
   * @return JobStatus Job status
   */
  public function getJobStatus($idJob)
  {
    $client = new \GearmanClient();
    $this->assignServers($client);

    return new JobStatus($client->jobStatus($idJob));
  }
  372. /**
  373. * Task methods
  374. */
  /**
   * Queues a task to be run in parallel with other tasks.
   * Call this method for all the tasks to be run in parallel, then call
   * GearmanClient::runTasks() to perform the work.
   *
   * Note that enough workers need to be available for the tasks to all run in
   * parallel.
   *
   * @param string $name     A GearmanBundle registered function to be executed
   * @param string $params   Parameters to send to task as string
   * @param mixed  &$context Application context to associate with a task
   * @param string $unique   A unique ID used to identify a particular task
   *
   * @return GearmanClient This instance, to allow method chaining
   */
  public function addTask($name, $params = '', &$context = null, $unique = null)
  {
    $method = GearmanMethods::GEARMAN_METHOD_ADDTASK;
    $this->enqueueTask($name, $params, $context, $unique, $method);

    return $this;
  }
  /**
   * Queues a high priority task to be run in parallel with other tasks.
   * Call this method for all the high priority tasks to be run in parallel,
   * then call GearmanClient::runTasks() to perform the work.
   *
   * Tasks with a high priority will be selected from the queue before those
   * of normal or low priority.
   *
   * @param string $name     A GearmanBundle registered function to be executed
   * @param string $params   Parameters to send to task as string
   * @param mixed  &$context Application context to associate with a task
   * @param string $unique   A unique ID used to identify a particular task
   *
   * @return GearmanClient This instance, to allow method chaining
   */
  public function addTaskHigh($name, $params = '', &$context = null, $unique = null)
  {
    $method = GearmanMethods::GEARMAN_METHOD_ADDTASKHIGH;
    $this->enqueueTask($name, $params, $context, $unique, $method);

    return $this;
  }
  /**
   * Queues a low priority task to be run in parallel with other tasks.
   *
   * Call this method for all the tasks to be run in parallel, then call
   * GearmanClient::runTasks() to perform the work.
   *
   * Tasks with a low priority will be selected from the queue after those of
   * normal or high priority.
   *
   * @param string $name     A GearmanBundle registered function to be executed
   * @param string $params   Parameters to send to task as string
   * @param mixed  &$context Application context to associate with a task
   * @param string $unique   A unique ID used to identify a particular task
   *
   * @return GearmanClient This instance, to allow method chaining
   */
  public function addTaskLow($name, $params = '', &$context = null, $unique = null)
  {
    $method = GearmanMethods::GEARMAN_METHOD_ADDTASKLOW;
    $this->enqueueTask($name, $params, $context, $unique, $method);

    return $this;
  }
  /**
   * Queues a background task to be run in parallel with other tasks.
   * Call this method for all the tasks to be run in parallel, then call
   * GearmanClient::runTasks() to perform the work.
   *
   * @param string $name     A GearmanBundle registered function to be executed
   * @param string $params   Parameters to send to task as string
   * @param mixed  &$context Application context to associate with a task
   * @param string $unique   A unique ID used to identify a particular task
   *
   * @return GearmanClient This instance, to allow method chaining
   */
  public function addTaskBackground($name, $params = '', &$context = null, $unique = null)
  {
    $method = GearmanMethods::GEARMAN_METHOD_ADDTASKBACKGROUND;
    $this->enqueueTask($name, $params, $context, $unique, $method);

    return $this;
  }
  /**
   * Queues a high priority background task to be run in parallel with other
   * tasks.
   *
   * Call this method for all the tasks to be run in parallel, then call
   * GearmanClient::runTasks() to perform the work.
   *
   * Tasks with a high priority will be selected from the queue before those
   * of normal or low priority.
   *
   * @param string $name     A GearmanBundle registered function to be executed
   * @param string $params   Parameters to send to task as string
   * @param mixed  &$context Application context to associate with a task
   * @param string $unique   A unique ID used to identify a particular task
   *
   * @return GearmanClient This instance, to allow method chaining
   */
  public function addTaskHighBackground($name, $params = '', &$context = null, $unique = null)
  {
    $method = GearmanMethods::GEARMAN_METHOD_ADDTASKHIGHBACKGROUND;
    $this->enqueueTask($name, $params, $context, $unique, $method);

    return $this;
  }
  /**
   * Queues a low priority background task to be run in parallel with other
   * tasks.
   *
   * Call this method for all the tasks to be run in parallel, then call
   * GearmanClient::runTasks() to perform the work.
   *
   * Tasks with a low priority will be selected from the queue after those of
   * normal or high priority.
   *
   * @param string $name     A GearmanBundle registered function to be executed
   * @param string $params   Parameters to send to task as string
   * @param mixed  &$context Application context to associate with a task
   * @param string $unique   A unique ID used to identify a particular task
   *
   * @return GearmanClient This instance, to allow method chaining
   */
  public function addTaskLowBackground($name, $params = '', &$context = null, $unique = null)
  {
    $method = GearmanMethods::GEARMAN_METHOD_ADDTASKLOWBACKGROUND;
    $this->enqueueTask($name, $params, $context, $unique, $method);

    return $this;
  }
  /**
   * Builds a task entry, tagged with the method that must submit it, and
   * appends it to the pending task list.
   *
   * The context is received by value, so the entry keeps the value the
   * context had when the task was added.
   *
   * @param string $name    A GearmanBundle registered function to be executed
   * @param string $params  Parameters to send to task as string
   * @param mixed  $context Application context to associate with a task
   * @param string $unique  A unique ID used to identify a particular task
   * @param string $method  Name of the \GearmanClient method to call
   *
   * @return GearmanClient This instance, to allow method chaining
   */
  private function enqueueTask($name, $params, $context, $unique, $method)
  {
    $uniqueKey = $this->uniqueJobIdentifierGenerator->generateUniqueKey($name, $params, $unique, $method);

    $this->addTaskToStructure(array(
      'name' => $name,
      'params' => $params,
      'context' => $context,
      'unique' => $uniqueKey,
      'method' => $method,
    ));

    return $this;
  }
  /**
   * Appends a task entry to the end of the pending task list.
   *
   * @param array $task Task entry, as built by enqueueTask()
   *
   * @return GearmanClient This instance, to allow method chaining
   */
  private function addTaskToStructure(array $task)
  {
    $this->taskStructure[] = $task;

    return $this;
  }
  /**
   * Starts running, in parallel, the set of tasks previously added with
   *
   * GearmanClient::addTask(),
   * GearmanClient::addTaskHigh(),
   * GearmanClient::addTaskLow(),
   * GearmanClient::addTaskBackground(),
   * GearmanClient::addTaskHighBackground(),
   * GearmanClient::addTaskLowBackground().
   *
   * Tasks whose job cannot be resolved are skipped. The pending task list is
   * emptied before the tasks are run, whatever the outcome of the run.
   *
   * Note that enough workers need to be available for the tasks to all run in
   * parallel.
   *
   * @return boolean Result of running the tasks
   */
  public function runTasks()
  {
    $gearmanClient = new \GearmanClient();
    $this->assignServers($gearmanClient);

    // !empty() keeps the truthiness test while tolerating settings that were
    // never set or that lack the 'callbacks' key.
    if (!empty($this->settings['callbacks'])) {
      $this->gearmanCallbacksDispatcher->assignTaskCallbacks($gearmanClient);
    }

    foreach ($this->taskStructure as $task) {
      $worker = $this->getJob($task['name']);

      if (false === $worker) {
        continue;
      }

      // The stored method name selects which \GearmanClient::addTask*()
      // variant registers the task.
      $type = $task['method'];
      $gearmanClient->$type(
        $worker['job']['realCallableName'],
        $task['params'],
        $task['context'],
        $task['unique']
      );
    }

    $this->initTaskStructure();

    return $gearmanClient->runTasks();
  }
  571. }