GearmanClient.php 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631
  1. <?php
  2. /**
  3. * Gearman Bundle for Symfony2
  4. *
  5. * @author Marc Morera <yuhu@mmoreram.com>
  6. * @since 2013
  7. */
  8. namespace Mmoreram\GearmanBundle\Service;
  9. use Mmoreram\GearmanBundle\Service\Abstracts\AbstractGearmanService;
  10. use Mmoreram\GearmanBundle\GearmanMethods;
  11. use Mmoreram\GearmanBundle\Module\JobStatus;
  12. use Mmoreram\GearmanBundle\Generator\UniqueJobIdentifierGenerator;
  13. use Mmoreram\GearmanBundle\Dispatcher\GearmanCallbacksDispatcher;
  14. /**
  15. * GearmanClient. Implementation of AbstractGearmanService
  16. */
  17. class GearmanClient extends AbstractGearmanService
  18. {
  19. /**
  20. * @var GearmanCallbacksDispatcher
  21. *
  22. * Gearman callbacks dispatcher
  23. */
  24. protected $gearmanCallbacksDisparcher;
  25. /**
  26. * @var array
  27. *
  28. * Server set to define in what server must connect to
  29. */
  30. protected $servers = array();
  31. /**
  32. * @var array
  33. *
  34. * task structure to store all about called tasks
  35. */
  36. protected $taskStructure = array();
  37. /**
  38. * @var array
  39. *
  40. * Set default servers
  41. */
  42. protected $defaultServers;
  43. /**
  44. * @var array
  45. *
  46. * Set default settings
  47. */
  48. protected $settings;
  49. /**
  50. * @var UniqueJobIdentifierGenerator
  51. *
  52. * Unique Job Intefier Generator
  53. */
  54. protected $uniqueJobIdentifierGenerator;
  55. /**
  56. * Init tasks structure
  57. *
  58. * @return GearmanClient self Object
  59. */
  60. public function initTaskStructure()
  61. {
  62. $this->taskStructure = array();
  63. return $this;
  64. }
  65. /**
  66. * Set default servers
  67. *
  68. * @param array $defaultServers Default servers
  69. *
  70. * @return GearmanClient self Object
  71. */
  72. public function setDefaultServers(array $defaultServers)
  73. {
  74. $this->defaultServers = $defaultServers;
  75. return $this;
  76. }
  77. /**
  78. * Set UniqueJobIdentifierGenerator object
  79. *
  80. * @param UniqueJobIdentifierGenerator $uniqueJobIdentifierGenerator Unique Job Intefier Generator
  81. *
  82. * @return GearmanClient self Object
  83. */
  84. public function setUniqueJobIdentifierGenerator(UniqueJobIdentifierGenerator $uniqueJobIdentifierGenerator)
  85. {
  86. $this->uniqueJobIdentifierGenerator = $uniqueJobIdentifierGenerator;
  87. return $this;
  88. }
  89. /**
  90. * Set gearman callbacks
  91. *
  92. * @param GearmanCallbacksDispatcher $gearmanCallbacksDispatcher Gearman callbacks dispatcher
  93. *
  94. * @return GearmanClient self Object
  95. */
  96. public function setGearmanCallbacksDispatcher(GearmanCallbacksDispatcher $gearmanCallbacksDispatcher)
  97. {
  98. $this->gearmanCallbacksDispatcher = $gearmanCallbacksDispatcher;
  99. return $this;
  100. }
  101. /**
  102. * Set default settings
  103. *
  104. * @param array $settings
  105. *
  106. * @return GearmanClient self Object
  107. */
  108. public function setDefaultSettings($settings)
  109. {
  110. $this->settings = $settings;
  111. return $this;
  112. }
  113. /**
  114. * Set server to client. Empty all servers and set this one
  115. *
  116. * @param type $servername Server name (must be ip)
  117. * @param type $port Port of server. By default 4730
  118. *
  119. * @return GearmanClient Returns self object
  120. */
  121. public function setServer($servername, $port = 4730)
  122. {
  123. $this
  124. ->clearServers()
  125. ->addServer($servername, $port);
  126. return $this;
  127. }
  128. /**
  129. * Add server to client
  130. *
  131. * @param type $servername Server name (must be ip)
  132. * @param type $port Port of server. By default 4730
  133. *
  134. * @return GearmanClient Returns self object
  135. */
  136. public function addServer($servername, $port = 4730)
  137. {
  138. $this->servers[] = array(
  139. 'host' => $servername,
  140. 'port' => $port,
  141. );
  142. return $this;
  143. }
  144. /**
  145. * Clear server list
  146. *
  147. * @return GearmanClient Returns self object
  148. */
  149. public function clearServers()
  150. {
  151. $this->servers = array();
  152. return $this;
  153. }
  154. /**
  155. * Get real worker from job name and enqueues the action given one
  156. * method.
  157. *
  158. * @param string $jobName A GearmanBundle registered function the worker is to execute
  159. * @param string $params Parameters to send to job as string
  160. * @param string $method Method to execute
  161. * @param string $unique A unique ID used to identify a particular task
  162. *
  163. * @return mixed Return result of the call. If worker is not valid, return false
  164. */
  165. private function enqueue($jobName, $params, $method, $unique)
  166. {
  167. $worker = $this->getJob($jobName);
  168. $unique = $this->uniqueJobIdentifierGenerator->generateUniqueKey($jobName, $params, $unique, $method);
  169. return $worker
  170. ? $this->doEnqueue($worker, $params, $method, $unique)
  171. : false;
  172. }
  173. /**
  174. * Execute a GearmanClient call given a worker, params and a method.
  175. *
  176. * If he GarmanClient call is asyncronous, result value will be a handler.
  177. * Otherwise, will return job result.
  178. *
  179. * @param array $worker Worker definition
  180. * @param string $params Parameters to send to job as string
  181. * @param string $method Method to execute
  182. * @param string $unique A unique ID used to identify a particular task
  183. *
  184. * @return mixed Return result of the GearmanClient call
  185. */
  186. private function doEnqueue(array $worker, $params, $method, $unique)
  187. {
  188. $gearmanClient = new \GearmanClient();
  189. $this->assignServers($gearmanClient);
  190. return $gearmanClient->$method($worker['job']['realCallableName'], $params, $unique);
  191. }
  192. /**
  193. * Given a GearmanClient, set all included servers
  194. *
  195. * @param GearmanClient $gearmanClient Object to include servers
  196. *
  197. * @return GearmanClient Returns self object
  198. */
  199. private function assignServers(\GearmanClient $gearmanClient)
  200. {
  201. $servers = $this->defaultServers;
  202. if (!empty($this->servers)) {
  203. $servers = $this->servers;
  204. }
  205. /**
  206. * We include each server into gearman client
  207. */
  208. foreach ($servers as $server) {
  209. $gearmanClient->addServer($server['host'], $server['port']);
  210. }
  211. return $this;
  212. }
  213. /**
  214. * Job methods
  215. */
  216. /**
  217. * Runs a single task and returns some result, depending of method called.
  218. * Method called depends of default callable method setted on gearman settings
  219. * or overwritted on work or job annotations
  220. *
  221. * @param string $name A GearmanBundle registered function the worker is to execute
  222. * @param string $params Parameters to send to job as string
  223. * @param string $unique A unique ID used to identify a particular task
  224. *
  225. * @return mixed result depending of method called.
  226. */
  227. public function callJob($name, $params = '', $unique = null)
  228. {
  229. $worker = $this->getJob($name);
  230. $methodCallable = $worker['job']['defaultMethod'] . 'Job';
  231. return $this->enqueue($name, $params, $methodCallable, $unique);
  232. }
  233. /**
  234. * Runs a single task and returns a string representation of the result.
  235. * It is up to the GearmanClient and GearmanWorker to agree on the format of the result.
  236. * The GearmanClient::do() method is deprecated as of pecl/gearman 1.0.0. Use GearmanClient::doNormal().
  237. *
  238. * @param string $name A GearmanBundle registered function the worker is to execute
  239. * @param string $params Parameters to send to job as string
  240. * @param string $unique A unique ID used to identify a particular task
  241. *
  242. * @return string A string representing the results of running a task.
  243. * @deprecated
  244. */
  245. public function doJob($name, $params = '', $unique = null)
  246. {
  247. return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DO, $unique);
  248. }
  249. /**
  250. * Runs a single task and returns a string representation of the result.
  251. * It is up to the GearmanClient and GearmanWorker to agree on the format of the result.
  252. *
  253. * @param string $name A GearmanBundle registered function the worker is to execute
  254. * @param string $params Parameters to send to job as string
  255. * @param string $unique A unique ID used to identify a particular task
  256. *
  257. * @return string A string representing the results of running a task.
  258. */
  259. public function doNormalJob($name, $params = '', $unique = null)
  260. {
  261. return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DONORMAL, $unique);
  262. }
  263. /**
  264. * Runs a task in the background, returning a job handle which
  265. * can be used to get the status of the running task.
  266. *
  267. * @param string $name A GearmanBundle registered function the worker is to execute
  268. * @param string $params Parameters to send to job as string
  269. * @param string $unique A unique ID used to identify a particular task
  270. *
  271. * @return string Job handle for the submitted task.
  272. */
  273. public function doBackgroundJob($name, $params = '', $unique = null)
  274. {
  275. return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DOBACKGROUND, $unique);
  276. }
  277. /**
  278. * Runs a single high priority task and returns a string representation of the result.
  279. * It is up to the GearmanClient and GearmanWorker to agree on the format of the result.
  280. * High priority tasks will get precedence over normal and low priority tasks in the job queue.
  281. *
  282. * @param string $name A GearmanBundle registered function the worker is to execute
  283. * @param string $params Parameters to send to job as string
  284. * @param string $unique A unique ID used to identify a particular task
  285. *
  286. * @return string A string representing the results of running a task.
  287. */
  288. public function doHighJob($name, $params = '', $unique = null)
  289. {
  290. return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DOHIGH, $unique);
  291. }
  292. /**
  293. * Runs a high priority task in the background, returning a job handle which can be used to get the status of the running task.
  294. * High priority tasks take precedence over normal and low priority tasks in the job queue.
  295. *
  296. * @param string $name A GearmanBundle registered function the worker is to execute
  297. * @param string $params Parameters to send to job as string
  298. * @param string $unique A unique ID used to identify a particular task
  299. *
  300. * @return string The job handle for the submitted task.
  301. */
  302. public function doHighBackgroundJob($name, $params = '', $unique = null)
  303. {
  304. return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DOHIGHBACKGROUND, $unique);
  305. }
  306. /**
  307. * Runs a single low priority task and returns a string representation of the result.
  308. * It is up to the GearmanClient and GearmanWorker to agree on the format of the result.
  309. * Normal and high priority tasks will get precedence over low priority tasks in the job queue.
  310. *
  311. * @param string $name A GearmanBundle registered function the worker is to execute
  312. * @param string $params Parameters to send to job as string
  313. * @param string $unique A unique ID used to identify a particular task
  314. *
  315. * @return string A string representing the results of running a task.
  316. */
  317. public function doLowJob($name, $params = '', $unique = null)
  318. {
  319. return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DOLOW, $unique);
  320. }
  321. /**
  322. * Runs a low priority task in the background, returning a job handle which can be used to get the status of the running task.
  323. * Normal and high priority tasks will get precedence over low priority tasks in the job queue.
  324. *
  325. * @param string $name A GearmanBundle registered function the worker is to execute
  326. * @param string $params Parameters to send to job as string
  327. * @param string $unique A unique ID used to identify a particular task
  328. *
  329. * @return string The job handle for the submitted task.
  330. */
  331. public function doLowBackgroundJob($name, $params = '', $unique = null)
  332. {
  333. return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DOLOWBACKGROUND, $unique);
  334. }
  335. /**
  336. * Fetches the Status of a special Background Job.
  337. *
  338. * @param string $idJob The job handle string
  339. *
  340. * @return JobStatus Job status
  341. */
  342. public function getJobStatus($idJob)
  343. {
  344. $gearmanClient = new \GearmanClient();
  345. $this->assignServers($gearmanClient);
  346. $statusData = $gearmanClient->jobStatus($idJob);
  347. $jobStatus = new JobStatus($statusData);
  348. return $jobStatus;
  349. }
  350. /**
  351. * Task methods
  352. */
  353. /**
  354. * Adds a task to be run in parallel with other tasks.
  355. * Call this method for all the tasks to be run in parallel, then call GearmanClient::runTasks() to perform the work.
  356. * Note that enough workers need to be available for the tasks to all run in parallel.
  357. *
  358. * @param string $name A GermanBundle registered function to be executed
  359. * @param string $params Parameters to send to task as string
  360. * @param Mixed &$context Application context to associate with a task
  361. * @param string $unique A unique ID used to identify a particular task
  362. *
  363. * @return GearmanClient Return this object
  364. */
  365. public function addTask($name, $params = '', &$context = null, $unique = null)
  366. {
  367. $this->enqueueTask($name, $params, $context, $unique, GearmanMethods::GEARMAN_METHOD_ADDTASK);
  368. return $this;
  369. }
  370. /**
  371. * Adds a high priority task to be run in parallel with other tasks.
  372. * Call this method for all the high priority tasks to be run in parallel, then call GearmanClient::runTasks() to perform the work.
  373. * Tasks with a high priority will be selected from the queue before those of normal or low priority.
  374. *
  375. * @param string $name A GermanBundle registered function to be executed
  376. * @param string $params Parameters to send to task as string
  377. * @param Mixed &$context Application context to associate with a task
  378. * @param string $unique A unique ID used to identify a particular task
  379. *
  380. * @return GearmanClient Return this object
  381. */
  382. public function addTaskHigh($name, $params = '', &$context = null, $unique = null)
  383. {
  384. $this->enqueueTask($name, $params, $context, $unique, GearmanMethods::GEARMAN_METHOD_ADDTASKHIGH);
  385. return $this;
  386. }
  387. /**
  388. * Adds a low priority background task to be run in parallel with other tasks.
  389. * Call this method for all the tasks to be run in parallel, then call GearmanClient::runTasks() to perform the work.
  390. * Tasks with a low priority will be selected from the queue after those of normal or low priority.
  391. *
  392. * @param string $name A GermanBundle registered function to be executed
  393. * @param string $params Parameters to send to task as string
  394. * @param Mixed &$context Application context to associate with a task
  395. * @param string $unique A unique ID used to identify a particular task
  396. *
  397. * @return GearmanClient Return this object
  398. */
  399. public function addTaskLow($name, $params = '', &$context = null, $unique = null)
  400. {
  401. $this->enqueueTask($name, $params, $context, $unique, GearmanMethods::GEARMAN_METHOD_ADDTASKLOW);
  402. return $this;
  403. }
  404. /**
  405. * Adds a background task to be run in parallel with other tasks
  406. * Call this method for all the tasks to be run in parallel, then call GearmanClient::runTasks() to perform the work.
  407. *
  408. * @param string $name A GermanBundle registered function to be executed
  409. * @param string $params Parameters to send to task as string
  410. * @param Mixed &$context Application context to associate with a task
  411. * @param string $unique A unique ID used to identify a particular task
  412. *
  413. * @return GearmanClient Return this object
  414. */
  415. public function addTaskBackground($name, $params = '', &$context = null, $unique = null)
  416. {
  417. $this->enqueueTask($name, $params, $context, $unique, GearmanMethods::GEARMAN_METHOD_ADDTASKBACKGROUND);
  418. return $this;
  419. }
  420. /**
  421. * Adds a high priority background task to be run in parallel with other tasks.
  422. * Call this method for all the tasks to be run in parallel, then call GearmanClient::runTasks() to perform the work.
  423. * Tasks with a high priority will be selected from the queue before those of normal or low priority.
  424. *
  425. * @param string $name A GermanBundle registered function to be executed
  426. * @param string $params Parameters to send to task as string
  427. * @param Mixed &$context Application context to associate with a task
  428. * @param string $unique A unique ID used to identify a particular task
  429. *
  430. * @return GearmanClient Return this object
  431. */
  432. public function addTaskHighBackground($name, $params = '', &$context = null, $unique = null)
  433. {
  434. $this->enqueueTask($name, $params, $context, $unique, GearmanMethods::GEARMAN_METHOD_ADDTASKHIGHBACKGROUND);
  435. return $this;
  436. }
  437. /**
  438. * Adds a low priority background task to be run in parallel with other tasks.
  439. * Call this method for all the tasks to be run in parallel, then call GearmanClient::runTasks() to perform the work.
  440. * Tasks with a low priority will be selected from the queue after those of normal or high priority.
  441. *
  442. * @param string $name A GermanBundle registered function to be executed
  443. * @param string $params Parameters to send to task as string
  444. * @param Mixed &$context Application context to associate with a task
  445. * @param string $unique A unique ID used to identify a particular task
  446. *
  447. * @return GearmanClient Return this object
  448. */
  449. public function addTaskLowBackground($name, $params = '', &$context = null, $unique = null)
  450. {
  451. $this->enqueueTask($name, $params, $context, $unique, GearmanMethods::GEARMAN_METHOD_ADDTASKLOWBACKGROUND);
  452. return $this;
  453. }
  454. /**
  455. * Adds a task into the structure of tasks with included type of call
  456. *
  457. * @param string $name A GermanBundle registered function to be executed
  458. * @param string $params Parameters to send to task as string
  459. * @param Mixed $context Application context to associate with a task
  460. * @param string $unique A unique ID used to identify a particular task
  461. * @param string $method Method to perform
  462. *
  463. * @return GearmanClient Return this object
  464. */
  465. private function enqueueTask($name, $params, $context, $unique, $method)
  466. {
  467. $task = array(
  468. 'name' => $name,
  469. 'params' => $params,
  470. 'context' => $context,
  471. 'unique' => $this->uniqueJobIdentifierGenerator->generateUniqueKey($name, $params, $unique, $method),
  472. 'method' => $method,
  473. );
  474. $this->addTaskToStructure($task);
  475. return $this;
  476. }
  477. /**
  478. * Appends a task structure into taskStructure array
  479. *
  480. * @param array $task Task structure
  481. *
  482. * @return GearmanClient Return this object
  483. */
  484. private function addTaskToStructure(array $task)
  485. {
  486. $this->taskStructure[] = $task;
  487. return $this;
  488. }
  489. /**
  490. * For a set of tasks previously added with GearmanClient::addTask(), GearmanClient::addTaskHigh(),
  491. * GearmanClient::addTaskLow(), GearmanClient::addTaskBackground(), GearmanClient::addTaskHighBackground(),
  492. * or GearmanClient::addTaskLowBackground(), this call starts running the tasks in parallel.
  493. * Note that enough workers need to be available for the tasks to all run in parallel
  494. *
  495. * @return boolean run tasks result
  496. */
  497. public function runTasks()
  498. {
  499. $gearmanClient = new \GearmanClient();
  500. $this->assignServers($gearmanClient);
  501. if ($this->settings['callbacks']) {
  502. $this->gearmanCallbacksDispatcher->assignTaskCallbacks($gearmanClient);
  503. }
  504. foreach ($this->taskStructure as $task) {
  505. $type = $task['method'];
  506. $jobName = $task['name'];
  507. $worker = $this->getJob($jobName);
  508. if (false !== $worker) {
  509. $gearmanClient->$type($worker['job']['realCallableName'], $task['params'], $task['context'], $task['unique']);
  510. }
  511. }
  512. $this->initTaskStructure();
  513. return $gearmanClient->runTasks();
  514. }
  515. }