GearmanClient.php 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617
  1. <?php
  2. /**
  3. * Gearman Bundle for Symfony2
  4. *
  5. * @author Marc Morera <yuhu@mmoreram.com>
  6. * @since 2013
  7. */
  8. namespace Mmoreram\GearmanBundle\Service;
  9. use Mmoreram\GearmanBundle\Service\Abstracts\AbstractGearmanService;
  10. use Mmoreram\GearmanBundle\GearmanMethods;
  11. use Mmoreram\GearmanBundle\Module\JobStatus;
  /**
   * GearmanClient. Implementation of AbstractGearmanService
   *
   * Wraps the native \GearmanClient: resolves bundle job names through
   * getJob() (inherited), connects a fresh native client to the configured
   * servers for every call, and forwards the job/task to it.
   */
  class GearmanClient extends AbstractGearmanService
  {
      /**
       * @var GearmanCallbacks
       *
       * Gearman callbacks
       */
      protected $gearmanCallbacks;

      /**
       * @var array
       *
       * Servers explicitly set on this client. When not empty they take
       * precedence over the default servers.
       */
      protected $servers = array();

      /**
       * @var array
       *
       * Tasks queued by the addTask*() methods, pending a runTasks() call
       */
      protected $taskStructure = array();

      /**
       * @var array
       *
       * Default servers, used when no server has been explicitly set
       */
      protected $defaultServers;

      /**
       * @var array
       *
       * Default settings. Keys read by this class: 'generate_unique_key'
       * and 'callbacks'.
       */
      protected $settings;

      /**
       * Set default servers
       *
       * @param array $defaultServers Default servers, each one an array with 'host' and 'port' keys
       *
       * @return GearmanClient self Object
       */
      public function setDefaultServers(array $defaultServers)
      {
          $this->defaultServers = $defaultServers;

          return $this;
      }

      /**
       * Set gearman callbacks
       *
       * @param GearmanCallbacks $gearmanCallbacks Gearman callbacks
       *
       * @return GearmanClient self Object
       */
      public function setGearmanCallbacks(GearmanCallbacks $gearmanCallbacks)
      {
          $this->gearmanCallbacks = $gearmanCallbacks;

          return $this;
      }

      /**
       * Set default settings
       *
       * @param array $settings Settings
       *
       * @return GearmanClient self Object
       */
      public function setDefaultSettings($settings)
      {
          $this->settings = $settings;

          return $this;
      }

      /**
       * Set server to client. Empty all servers and set this one
       *
       * @param string  $servername Server name (must be ip)
       * @param integer $port       Port of server. By default 4730
       *
       * @return GearmanClient Returns self object
       */
      public function setServer($servername, $port = 4730)
      {
          $this
              ->clearServers()
              ->addServer($servername, $port);

          return $this;
      }

      /**
       * Add server to client
       *
       * @param string  $servername Server name (must be ip)
       * @param integer $port       Port of server. By default 4730
       *
       * @return GearmanClient Returns self object
       */
      public function addServer($servername, $port = 4730)
      {
          $this->servers[] = array(
              'host' => $servername,
              'port' => $port,
          );

          return $this;
      }

      /**
       * Clear server list
       *
       * @return GearmanClient Returns self object
       */
      public function clearServers()
      {
          $this->servers = array();

          return $this;
      }

      /**
       * Resolves the worker definition for a job name and enqueues the job
       * using the given native method.
       *
       * @param string $jobName A GearmanBundle registered function the worker is to execute
       * @param string $params  Parameters to send to job as string
       * @param string $method  Name of the native \GearmanClient method to execute
       * @param string $unique  A unique ID used to identify a particular task
       *
       * @return mixed Result of the call, or false when the job cannot be resolved
       */
      private function enqueue($jobName, $params = '', $method = null, $unique = null)
      {
          $worker = $this->getJob($jobName);

          if (false === $worker) {
              return false;
          }

          $unique = $this->resolveUniqueKey($jobName, $params, $unique);

          return $this->doEnqueue($worker, $params, $method, $unique);
      }

      /**
       * Execute a GearmanClient call given a worker, params and a method.
       *
       * If the GearmanClient call is asynchronous, result value will be a handler.
       * Otherwise, will return job result.
       *
       * @param array  $worker Worker definition; $worker['job']['realCallableName'] is the function name sent to the server
       * @param string $params Parameters to send to job as string
       * @param string $method Name of the native \GearmanClient method to execute
       * @param string $unique A unique ID used to identify a particular task
       *
       * @return mixed Return result of the GearmanClient call
       */
      private function doEnqueue(array $worker, $params = '', $method = null, $unique = null)
      {
          $gearmanClient = new \GearmanClient();
          $this->assignServers($gearmanClient);

          return $gearmanClient->$method($worker['job']['realCallableName'], $params, $unique);
      }

      /**
       * Given a native GearmanClient, adds all configured servers to it.
       *
       * Explicitly set servers win over the default ones.
       *
       * @param \GearmanClient $gearmanClient Object to include servers
       *
       * @return GearmanClient Returns self object
       */
      private function assignServers(\GearmanClient $gearmanClient)
      {
          $servers = empty($this->servers)
              ? $this->defaultServers
              : $this->servers;

          /**
           * We include each server into gearman client. The cast keeps the
           * loop safe when default servers were never configured (null).
           */
          foreach ((array) $servers as $server) {
              $gearmanClient->addServer($server['host'], $server['port']);
          }

          return $this;
      }

      /**
       * Returns the unique key to submit with a job or task.
       *
       * A key supplied by the caller is always respected. A key is only
       * generated when none was given and the 'generate_unique_key' setting
       * is enabled.
       *
       * @param string $name   Name of the Worker/Job
       * @param string $params Params
       * @param string $unique Unique key supplied by the caller, or null
       *
       * @return string|null Unique key to use
       */
      private function resolveUniqueKey($name, $params, $unique)
      {
          if (null === $unique && !empty($this->settings['generate_unique_key'])) {
              return $this->generateUniqueKey($name, $params);
          }

          return $unique;
      }

      /**
       * Job methods
       */

      /**
       * Runs a single task and returns some result, depending on the method called.
       * The method called is the default callable method of the resolved job
       * (its 'defaultMethod' value), dispatched to the matching *Job() method
       * of this class.
       *
       * @param string $name   A GearmanBundle registered function the worker is to execute
       * @param string $params Parameters to send to job as string
       * @param string $unique A unique ID used to identify a particular task
       *
       * @return mixed Result depending on the method called, or false when the job cannot be resolved
       */
      public function callJob($name, $params = '', $unique = null)
      {
          $worker = $this->getJob($name);

          if (false === $worker) {
              return false;
          }

          // "<defaultMethod>Job" names one of this class' own wrappers
          // (doJob, doBackgroundJob, ...); the native \GearmanClient has no
          // method with that suffix, so the call must go through $this.
          $methodCallable = $worker['job']['defaultMethod'] . 'Job';

          return $this->$methodCallable($name, $params, $unique);
      }

      /**
       * Runs a single task and returns a string representation of the result.
       * It is up to the GearmanClient and GearmanWorker to agree on the format of the result.
       * The GearmanClient::do() method is deprecated as of pecl/gearman 1.0.0. Use GearmanClient::doNormal().
       *
       * @param string $name   A GearmanBundle registered function the worker is to execute
       * @param string $params Parameters to send to job as string
       * @param string $unique A unique ID used to identify a particular task
       *
       * @return string A string representing the results of running a task.
       * @deprecated Use doNormalJob() instead
       */
      public function doJob($name, $params = '', $unique = null)
      {
          return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DO, $unique);
      }

      /**
       * Runs a single task and returns a string representation of the result.
       * It is up to the GearmanClient and GearmanWorker to agree on the format of the result.
       *
       * @param string $name   A GearmanBundle registered function the worker is to execute
       * @param string $params Parameters to send to job as string
       * @param string $unique A unique ID used to identify a particular task
       *
       * @return string A string representing the results of running a task.
       */
      public function doNormalJob($name, $params = '', $unique = null)
      {
          return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DONORMAL, $unique);
      }

      /**
       * Runs a task in the background, returning a job handle which
       * can be used to get the status of the running task.
       *
       * @param string $name   A GearmanBundle registered function the worker is to execute
       * @param string $params Parameters to send to job as string
       * @param string $unique A unique ID used to identify a particular task
       *
       * @return string Job handle for the submitted task.
       */
      public function doBackgroundJob($name, $params = '', $unique = null)
      {
          return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DOBACKGROUND, $unique);
      }

      /**
       * Runs a single high priority task and returns a string representation of the result.
       * It is up to the GearmanClient and GearmanWorker to agree on the format of the result.
       * High priority tasks will get precedence over normal and low priority tasks in the job queue.
       *
       * @param string $name   A GearmanBundle registered function the worker is to execute
       * @param string $params Parameters to send to job as string
       * @param string $unique A unique ID used to identify a particular task
       *
       * @return string A string representing the results of running a task.
       */
      public function doHighJob($name, $params = '', $unique = null)
      {
          return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DOHIGH, $unique);
      }

      /**
       * Runs a high priority task in the background, returning a job handle which can be used to get the status of the running task.
       * High priority tasks take precedence over normal and low priority tasks in the job queue.
       *
       * @param string $name   A GearmanBundle registered function the worker is to execute
       * @param string $params Parameters to send to job as string
       * @param string $unique A unique ID used to identify a particular task
       *
       * @return string The job handle for the submitted task.
       */
      public function doHighBackgroundJob($name, $params = '', $unique = null)
      {
          return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DOHIGHBACKGROUND, $unique);
      }

      /**
       * Runs a single low priority task and returns a string representation of the result.
       * It is up to the GearmanClient and GearmanWorker to agree on the format of the result.
       * Normal and high priority tasks will get precedence over low priority tasks in the job queue.
       *
       * @param string $name   A GearmanBundle registered function the worker is to execute
       * @param string $params Parameters to send to job as string
       * @param string $unique A unique ID used to identify a particular task
       *
       * @return string A string representing the results of running a task.
       */
      public function doLowJob($name, $params = '', $unique = null)
      {
          return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DOLOW, $unique);
      }

      /**
       * Runs a low priority task in the background, returning a job handle which can be used to get the status of the running task.
       * Normal and high priority tasks will get precedence over low priority tasks in the job queue.
       *
       * @param string $name   A GearmanBundle registered function the worker is to execute
       * @param string $params Parameters to send to job as string
       * @param string $unique A unique ID used to identify a particular task
       *
       * @return string The job handle for the submitted task.
       */
      public function doLowBackgroundJob($name, $params = '', $unique = null)
      {
          return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DOLOWBACKGROUND, $unique);
      }

      /**
       * Fetches the Status of a special Background Job.
       *
       * @param string $idJob The job handle string
       *
       * @return JobStatus Job status, built from the native jobStatus() result
       */
      public function getJobStatus($idJob)
      {
          $gearmanClient = new \GearmanClient();
          $this->assignServers($gearmanClient);

          return new JobStatus($gearmanClient->jobStatus($idJob));
      }

      /**
       * Task methods
       */

      /**
       * Adds a task to be run in parallel with other tasks.
       * Call this method for all the tasks to be run in parallel, then call GearmanClient::runTasks() to perform the work.
       * Note that enough workers need to be available for the tasks to all run in parallel.
       *
       * @param string $name     A GearmanBundle registered function to be executed
       * @param string $params   Parameters to send to task as string
       * @param Mixed  &$context Application context to associate with a task
       * @param string $unique   A unique ID used to identify a particular task
       *
       * @return GearmanClient Return this object
       */
      public function addTask($name, $params = '', &$context = null, $unique = null)
      {
          $this->enqueueTask($name, $params, $context, $unique, GearmanMethods::GEARMAN_METHOD_ADDTASK);

          return $this;
      }

      /**
       * Adds a high priority task to be run in parallel with other tasks.
       * Call this method for all the high priority tasks to be run in parallel, then call GearmanClient::runTasks() to perform the work.
       * Tasks with a high priority will be selected from the queue before those of normal or low priority.
       *
       * @param string $name     A GearmanBundle registered function to be executed
       * @param string $params   Parameters to send to task as string
       * @param Mixed  &$context Application context to associate with a task
       * @param string $unique   A unique ID used to identify a particular task
       *
       * @return GearmanClient Return this object
       */
      public function addTaskHigh($name, $params = '', &$context = null, $unique = null)
      {
          $this->enqueueTask($name, $params, $context, $unique, GearmanMethods::GEARMAN_METHOD_ADDTASKHIGH);

          return $this;
      }

      /**
       * Adds a low priority task to be run in parallel with other tasks.
       * Call this method for all the tasks to be run in parallel, then call GearmanClient::runTasks() to perform the work.
       * Tasks with a low priority will be selected from the queue after those of normal or high priority.
       *
       * @param string $name     A GearmanBundle registered function to be executed
       * @param string $params   Parameters to send to task as string
       * @param Mixed  &$context Application context to associate with a task
       * @param string $unique   A unique ID used to identify a particular task
       *
       * @return GearmanClient Return this object
       */
      public function addTaskLow($name, $params = '', &$context = null, $unique = null)
      {
          $this->enqueueTask($name, $params, $context, $unique, GearmanMethods::GEARMAN_METHOD_ADDTASKLOW);

          return $this;
      }

      /**
       * Adds a background task to be run in parallel with other tasks
       * Call this method for all the tasks to be run in parallel, then call GearmanClient::runTasks() to perform the work.
       *
       * @param string $name     A GearmanBundle registered function to be executed
       * @param string $params   Parameters to send to task as string
       * @param Mixed  &$context Application context to associate with a task
       * @param string $unique   A unique ID used to identify a particular task
       *
       * @return GearmanClient Return this object
       */
      public function addTaskBackground($name, $params = '', &$context = null, $unique = null)
      {
          $this->enqueueTask($name, $params, $context, $unique, GearmanMethods::GEARMAN_METHOD_ADDTASKBACKGROUND);

          return $this;
      }

      /**
       * Adds a high priority background task to be run in parallel with other tasks.
       * Call this method for all the tasks to be run in parallel, then call GearmanClient::runTasks() to perform the work.
       * Tasks with a high priority will be selected from the queue before those of normal or low priority.
       *
       * @param string $name     A GearmanBundle registered function to be executed
       * @param string $params   Parameters to send to task as string
       * @param Mixed  &$context Application context to associate with a task
       * @param string $unique   A unique ID used to identify a particular task
       *
       * @return GearmanClient Return this object
       */
      public function addTaskHighBackground($name, $params = '', &$context = null, $unique = null)
      {
          $this->enqueueTask($name, $params, $context, $unique, GearmanMethods::GEARMAN_METHOD_ADDTASKHIGHBACKGROUND);

          return $this;
      }

      /**
       * Adds a low priority background task to be run in parallel with other tasks.
       * Call this method for all the tasks to be run in parallel, then call GearmanClient::runTasks() to perform the work.
       * Tasks with a low priority will be selected from the queue after those of normal or high priority.
       *
       * @param string $name     A GearmanBundle registered function to be executed
       * @param string $params   Parameters to send to task as string
       * @param Mixed  &$context Application context to associate with a task
       * @param string $unique   A unique ID used to identify a particular task
       *
       * @return GearmanClient Return this object
       */
      public function addTaskLowBackground($name, $params = '', &$context = null, $unique = null)
      {
          $this->enqueueTask($name, $params, $context, $unique, GearmanMethods::GEARMAN_METHOD_ADDTASKLOWBACKGROUND);

          return $this;
      }

      /**
       * Adds a task into the structure of tasks with included type of call.
       *
       * Nothing is sent to the server here; the task is only queued until
       * runTasks() is called. The context is received by value, so the queued
       * task holds the value it had when the task was added.
       *
       * @param string $name    A GearmanBundle registered function to be executed
       * @param string $params  Parameters to send to task as string
       * @param Mixed  $context Application context to associate with a task
       * @param string $unique  A unique ID used to identify a particular task
       * @param string $method  Name of the native \GearmanClient method to perform
       *
       * @return GearmanClient Return this object
       */
      private function enqueueTask($name, $params, $context, $unique, $method)
      {
          $this->addTaskToStructure(array(
              'name'    => $name,
              'params'  => $params,
              'context' => $context,
              'unique'  => $this->resolveUniqueKey($name, $params, $unique),
              'method'  => $method,
          ));

          return $this;
      }

      /**
       * Appends a task structure into taskStructure array
       *
       * @param array $task Task structure
       *
       * @return GearmanClient Return this object
       */
      private function addTaskToStructure(array $task)
      {
          $this->taskStructure[] = $task;

          return $this;
      }

      /**
       * For a set of tasks previously added with GearmanClient::addTask(), GearmanClient::addTaskHigh(),
       * GearmanClient::addTaskLow(), GearmanClient::addTaskBackground(), GearmanClient::addTaskHighBackground(),
       * or GearmanClient::addTaskLowBackground(), this call starts running the tasks in parallel.
       * Note that enough workers need to be available for the tasks to all run in parallel
       *
       * Queued tasks whose job name cannot be resolved are skipped. The queue
       * is emptied before the tasks are run.
       *
       * @return boolean run tasks result
       */
      public function runTasks()
      {
          $gearmanClient = new \GearmanClient();
          $this->assignServers($gearmanClient);

          if (!empty($this->settings['callbacks'])) {
              $this->gearmanCallbacks->assignTaskCallbacks($gearmanClient);
          }

          foreach ($this->taskStructure as $task) {
              $type = $task['method'];
              $worker = $this->getJob($task['name']);

              if (false !== $worker) {
                  $gearmanClient->$type(
                      $worker['job']['realCallableName'],
                      $task['params'],
                      $task['context'],
                      $task['unique']
                  );
              }
          }

          $this->taskStructure = array();

          return $gearmanClient->runTasks();
      }

      /**
       * Generates a unique string from the job name and its params.
       *
       * The same name and params always produce the same key.
       *
       * @param string $name   Name of the Worker/Job
       * @param string $params Params
       *
       * @return string Unique string generated for given object and params
       */
      private function generateUniqueKey($name, $params = '')
      {
          return md5($name . $params);
      }
  }