GearmanClient.php 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593
  1. <?php
  2. /**
  3. * Gearman Bundle for Symfony2
  4. *
  5. * @author Marc Morera <yuhu@mmoreram.com>
  6. * @since 2013
  7. */
  8. namespace Mmoreram\GearmanBundle\Service;
  9. use Mmoreram\GearmanBundle\Service\Abstracts\AbstractGearmanService;
  10. use Mmoreram\GearmanBundle\GearmanMethods;
  11. use Mmoreram\GearmanBundle\Module\JobStatus;
  12. /**
  13. * GearmanClient. Implementation of AbstractGearmanService
  14. */
  15. class GearmanClient extends AbstractGearmanService
  16. {
  /**
   * Callback registry. runTasks() asks it to attach its task callbacks to
   * the native client when the 'callbacks' setting is enabled.
   *
   * @var GearmanCallbacks
   */
  protected $gearmanCallbacks;

  /**
   * Servers registered explicitly through setServer() / addServer().
   * Every entry is an array with 'host' and 'port' keys. While this list
   * is not empty it takes precedence over $defaultServers.
   *
   * @var array
   */
  protected $servers = array();

  /**
   * Tasks queued by the addTask*() methods and consumed by runTasks().
   *
   * @var array
   */
  protected $taskStructure = array();

  /**
   * Fallback servers, used only while $servers is empty.
   * Has no initial value: it stays null until setDefaultServers() is called.
   *
   * @var array
   */
  protected $defaultServers;

  /**
   * Default settings; runTasks() reads the 'callbacks' entry.
   * Has no initial value: it stays null until setDefaultSettings() is called.
   *
   * @var array
   */
  protected $settings;
  /**
   * Stores the server list used whenever no server has been registered
   * explicitly through setServer() / addServer().
   *
   * Entries are later read as arrays with 'host' and 'port' keys.
   *
   * @param array $defaultServers Fallback server list
   *
   * @return GearmanClient self Object
   */
  public function setDefaultServers(array $defaultServers)
  {
    $this->defaultServers = $defaultServers;

    return $this;
  }
  /**
   * Injects the callbacks service that runTasks() uses to attach task
   * callbacks to the native client.
   *
   * @param GearmanCallbacks $gearmanCallbacks Callbacks service
   *
   * @return GearmanClient self Object
   */
  public function setGearmanCallbacks(GearmanCallbacks $gearmanCallbacks)
  {
    $this->gearmanCallbacks = $gearmanCallbacks;

    return $this;
  }
  /**
   * Stores the default settings.
   *
   * Only the 'callbacks' entry is read by this class (in runTasks()).
   * The argument is not type-hinted, so it is stored as given.
   *
   * @param array $settings Default settings
   *
   * @return GearmanClient self Object
   */
  public function setDefaultSettings($settings)
  {
    $this->settings = $settings;

    return $this;
  }
  /**
   * Replaces the whole explicit server list with a single server.
   *
   * @param string $servername Server name (must be ip)
   * @param int    $port       Port of server. By default 4730
   *
   * @return GearmanClient Returns self object
   */
  public function setServer($servername, $port = 4730)
  {
    // Drop whatever was registered before, then register the only server.
    $this->clearServers();
    $this->addServer($servername, $port);

    return $this;
  }
  /**
   * Appends one server to the explicit server list.
   *
   * @param string $servername Server name (must be ip)
   * @param int    $port       Port of server. By default 4730
   *
   * @return GearmanClient Returns self object
   */
  public function addServer($servername, $port = 4730)
  {
    $server = array();
    $server['host'] = $servername;
    $server['port'] = $port;

    $this->servers[] = $server;

    return $this;
  }
  /**
   * Empties the explicit server list, so that the default servers are
   * used again until a new server is added.
   *
   * @return GearmanClient Returns self object
   */
  public function clearServers()
  {
    $this->servers = array();

    return $this;
  }
  /**
   * Resolves the worker definition for a job name and, when one is found,
   * submits the job with the requested native client method.
   *
   * @param string $jobName A GearmanBundle registered function the worker is to execute
   * @param string $params  Parameters to send to job as string
   * @param string $method  Name of the native client method to call
   * @param string $unique  A unique ID used to identify a particular task
   *
   * @return mixed Result of the call, or false when getJob() yields false
   */
  private function enqueue($jobName, $params = '', $method = null, $unique = null)
  {
    $worker = $this->getJob($jobName);

    // No definition for this job name: nothing to submit.
    if (false === $worker) {
      return false;
    }

    return $this->doEnqueue($worker, $params, $method, $unique);
  }
  /**
   * Executes a native GearmanClient call given a worker, params and a method.
   *
   * A fresh native client is created for every call and the configured
   * servers are assigned to it before the job is submitted.
   *
   * If the GearmanClient call is asynchronous, the result is a job handle.
   * Otherwise, the job result is returned.
   *
   * @param array  $worker Worker definition; its ['job']['realCallableName'] is submitted
   * @param string $params Parameters to send to job as string
   * @param string $method Name of the native \GearmanClient method to call
   * @param string $unique A unique ID used to identify a particular task
   *
   * @return mixed Return result of the GearmanClient call
   *
   * @throws \InvalidArgumentException When $method is missing or is not a
   *                                   callable method of \GearmanClient
   */
  private function doEnqueue(array $worker, $params = '', $method = null, $unique = null)
  {
    $gearmanClient = new \GearmanClient();

    // $method defaults to null and is used as a dynamic method name below;
    // report a missing or unknown method with a descriptive exception
    // instead of letting the dynamic call fail fatally.
    if (!is_string($method) || !is_callable(array($gearmanClient, $method))) {
      throw new \InvalidArgumentException(sprintf(
        'Method "%s" is not available on \GearmanClient',
        is_scalar($method) ? $method : gettype($method)
      ));
    }

    $this->assignServers($gearmanClient);

    return $gearmanClient->$method($worker['job']['realCallableName'], $params, $unique);
  }
  /**
   * Adds every configured server to the given native client.
   *
   * Explicitly registered servers take precedence; the default servers
   * are used only while the explicit list is empty.
   *
   * @param \GearmanClient $gearmanClient Native client to add the servers to
   *
   * @return GearmanClient Returns self object
   */
  private function assignServers(\GearmanClient $gearmanClient)
  {
    $servers = empty($this->servers) ? $this->defaultServers : $this->servers;

    // $defaultServers has no initial value, so it is null until
    // setDefaultServers() is called; iterating it would raise a warning.
    if (!is_array($servers)) {
      return $this;
    }

    foreach ($servers as $server) {
      $gearmanClient->addServer($server['host'], $server['port']);
    }

    return $this;
  }
  183. /**
  184. * Job methods
  185. */
  /**
   * Runs a single job through the wrapper selected by the job's default
   * method, as set in the gearman settings or overwritten by work or job
   * annotations.
   *
   * The default method plus the "Job" suffix names one of this class's own
   * wrappers (doJob(), doNormalJob(), doBackgroundJob(), ...), so the call
   * is dispatched to that wrapper. Previously the suffixed name was invoked
   * on the native \GearmanClient, which has no "*Job" methods.
   *
   * NOTE(review): assumes defaultMethod holds values such as "doBackground";
   * confirm against the bundle configuration.
   *
   * @param string $name   A GearmanBundle registered function the worker is to execute
   * @param string $params Parameters to send to job as string
   * @param string $unique A unique ID used to identify a particular task
   *
   * @return mixed Result depending of method called, or false when
   *               getJob() yields false for $name
   *
   * @throws \InvalidArgumentException When the default method does not map
   *                                   to a callable wrapper of this class
   */
  public function callJob($name, $params = '', $unique = null)
  {
    $worker = $this->getJob($name);

    // Same outcome enqueue() gives for an unknown job, but without
    // indexing into a non-array value first.
    if (false === $worker) {
      return false;
    }

    $methodCallable = $worker['job']['defaultMethod'] . 'Job';

    // Method names are case-insensitive; refuse to dispatch back into
    // callJob() itself, which would recurse forever.
    $isSelf = 0 === strcasecmp($methodCallable, __FUNCTION__);

    if ($isSelf || !is_callable(array($this, $methodCallable))) {
      throw new \InvalidArgumentException(sprintf(
        'Default method "%s" of job "%s" cannot be called',
        $methodCallable,
        $name
      ));
    }

    return $this->$methodCallable($name, $params, $unique);
  }
  /**
   * Runs a single task and returns a string representation of the result.
   * The format of the result is agreed between client and worker.
   *
   * GearmanClient::do() is deprecated as of pecl/gearman 1.0.0; prefer
   * doNormalJob().
   *
   * @param string $name   A GearmanBundle registered function the worker is to execute
   * @param string $params Parameters to send to job as string
   * @param string $unique A unique ID used to identify a particular task
   *
   * @return string A string representing the results of running a task.
   * @deprecated
   */
  public function doJob($name, $params = '', $unique = null)
  {
    $nativeMethod = GearmanMethods::GEARMAN_METHOD_DO;

    return $this->enqueue($name, $params, $nativeMethod, $unique);
  }
  /**
   * Runs a single task and returns a string representation of the result.
   * The format of the result is agreed between client and worker.
   *
   * @param string $name   A GearmanBundle registered function the worker is to execute
   * @param string $params Parameters to send to job as string
   * @param string $unique A unique ID used to identify a particular task
   *
   * @return string A string representing the results of running a task.
   */
  public function doNormalJob($name, $params = '', $unique = null)
  {
    $nativeMethod = GearmanMethods::GEARMAN_METHOD_DONORMAL;

    return $this->enqueue($name, $params, $nativeMethod, $unique);
  }
  /**
   * Runs a task in the background and returns a job handle that can be
   * used to query the status of the running task.
   *
   * @param string $name   A GearmanBundle registered function the worker is to execute
   * @param string $params Parameters to send to job as string
   * @param string $unique A unique ID used to identify a particular task
   *
   * @return string Job handle for the submitted task.
   */
  public function doBackgroundJob($name, $params = '', $unique = null)
  {
    $nativeMethod = GearmanMethods::GEARMAN_METHOD_DOBACKGROUND;

    return $this->enqueue($name, $params, $nativeMethod, $unique);
  }
  /**
   * Runs a single high priority task and returns a string representation
   * of the result. The format of the result is agreed between client and
   * worker. High priority tasks get precedence over normal and low
   * priority tasks in the job queue.
   *
   * @param string $name   A GearmanBundle registered function the worker is to execute
   * @param string $params Parameters to send to job as string
   * @param string $unique A unique ID used to identify a particular task
   *
   * @return string A string representing the results of running a task.
   */
  public function doHighJob($name, $params = '', $unique = null)
  {
    $nativeMethod = GearmanMethods::GEARMAN_METHOD_DOHIGH;

    return $this->enqueue($name, $params, $nativeMethod, $unique);
  }
  /**
   * Runs a high priority task in the background and returns a job handle
   * that can be used to query the status of the running task. High
   * priority tasks take precedence over normal and low priority tasks in
   * the job queue.
   *
   * @param string $name   A GearmanBundle registered function the worker is to execute
   * @param string $params Parameters to send to job as string
   * @param string $unique A unique ID used to identify a particular task
   *
   * @return string The job handle for the submitted task.
   */
  public function doHighBackgroundJob($name, $params = '', $unique = null)
  {
    $nativeMethod = GearmanMethods::GEARMAN_METHOD_DOHIGHBACKGROUND;

    return $this->enqueue($name, $params, $nativeMethod, $unique);
  }
  /**
   * Runs a single low priority task and returns a string representation
   * of the result. The format of the result is agreed between client and
   * worker. Normal and high priority tasks get precedence over low
   * priority tasks in the job queue.
   *
   * @param string $name   A GearmanBundle registered function the worker is to execute
   * @param string $params Parameters to send to job as string
   * @param string $unique A unique ID used to identify a particular task
   *
   * @return string A string representing the results of running a task.
   */
  public function doLowJob($name, $params = '', $unique = null)
  {
    $nativeMethod = GearmanMethods::GEARMAN_METHOD_DOLOW;

    return $this->enqueue($name, $params, $nativeMethod, $unique);
  }
  /**
   * Runs a low priority task in the background and returns a job handle
   * that can be used to query the status of the running task. Normal and
   * high priority tasks get precedence over low priority tasks in the job
   * queue.
   *
   * @param string $name   A GearmanBundle registered function the worker is to execute
   * @param string $params Parameters to send to job as string
   * @param string $unique A unique ID used to identify a particular task
   *
   * @return string The job handle for the submitted task.
   */
  public function doLowBackgroundJob($name, $params = '', $unique = null)
  {
    $nativeMethod = GearmanMethods::GEARMAN_METHOD_DOLOWBACKGROUND;

    return $this->enqueue($name, $params, $nativeMethod, $unique);
  }
  /**
   * Fetches the status of a background job.
   *
   * A fresh native client with the configured servers is used for the
   * query; the raw status data is wrapped in a JobStatus object.
   *
   * @param string $idJob The job handle string
   *
   * @return JobStatus Job status
   */
  public function getJobStatus($idJob)
  {
    $gearmanClient = new \GearmanClient();
    $this->assignServers($gearmanClient);

    return new JobStatus($gearmanClient->jobStatus($idJob));
  }
  320. /**
  321. * Task methods
  322. */
  /**
   * Queues a task to be run in parallel with other tasks.
   * Queue every task first, then call runTasks() to perform the work.
   * Note that enough workers need to be available for the tasks to all
   * run in parallel.
   *
   * @param string $name     A GearmanBundle registered function to be executed
   * @param string $params   Parameters to send to task as string
   * @param mixed  &$context Application context to associate with a task
   * @param string $unique   A unique ID used to identify a particular task
   *
   * @return GearmanClient Return this object
   */
  public function addTask($name, $params = '', &$context = null, $unique = null)
  {
    $method = GearmanMethods::GEARMAN_METHOD_ADDTASK;

    // enqueueTask() returns this object.
    return $this->enqueueTask($name, $params, $context, $unique, $method);
  }
  /**
   * Queues a high priority task to be run in parallel with other tasks.
   * Queue every task first, then call runTasks() to perform the work.
   * Tasks with a high priority are selected from the queue before those
   * of normal or low priority.
   *
   * @param string $name     A GearmanBundle registered function to be executed
   * @param string $params   Parameters to send to task as string
   * @param mixed  &$context Application context to associate with a task
   * @param string $unique   A unique ID used to identify a particular task
   *
   * @return GearmanClient Return this object
   */
  public function addTaskHigh($name, $params = '', &$context = null, $unique = null)
  {
    $method = GearmanMethods::GEARMAN_METHOD_ADDTASKHIGH;

    // enqueueTask() returns this object.
    return $this->enqueueTask($name, $params, $context, $unique, $method);
  }
  /**
   * Queues a low priority task to be run in parallel with other tasks.
   * Queue every task first, then call runTasks() to perform the work.
   * Tasks with a low priority are selected from the queue after those of
   * normal or high priority.
   *
   * @param string $name     A GearmanBundle registered function to be executed
   * @param string $params   Parameters to send to task as string
   * @param mixed  &$context Application context to associate with a task
   * @param string $unique   A unique ID used to identify a particular task
   *
   * @return GearmanClient Return this object
   */
  public function addTaskLow($name, $params = '', &$context = null, $unique = null)
  {
    $method = GearmanMethods::GEARMAN_METHOD_ADDTASKLOW;

    // enqueueTask() returns this object.
    return $this->enqueueTask($name, $params, $context, $unique, $method);
  }
  /**
   * Queues a background task to be run in parallel with other tasks.
   * Queue every task first, then call runTasks() to perform the work.
   *
   * @param string $name     A GearmanBundle registered function to be executed
   * @param string $params   Parameters to send to task as string
   * @param mixed  &$context Application context to associate with a task
   * @param string $unique   A unique ID used to identify a particular task
   *
   * @return GearmanClient Return this object
   */
  public function addTaskBackground($name, $params = '', &$context = null, $unique = null)
  {
    $method = GearmanMethods::GEARMAN_METHOD_ADDTASKBACKGROUND;

    // enqueueTask() returns this object.
    return $this->enqueueTask($name, $params, $context, $unique, $method);
  }
  /**
   * Queues a high priority background task to be run in parallel with
   * other tasks. Queue every task first, then call runTasks() to perform
   * the work. Tasks with a high priority are selected from the queue
   * before those of normal or low priority.
   *
   * @param string $name     A GearmanBundle registered function to be executed
   * @param string $params   Parameters to send to task as string
   * @param mixed  &$context Application context to associate with a task
   * @param string $unique   A unique ID used to identify a particular task
   *
   * @return GearmanClient Return this object
   */
  public function addTaskHighBackground($name, $params = '', &$context = null, $unique = null)
  {
    $method = GearmanMethods::GEARMAN_METHOD_ADDTASKHIGHBACKGROUND;

    // enqueueTask() returns this object.
    return $this->enqueueTask($name, $params, $context, $unique, $method);
  }
  /**
   * Queues a low priority background task to be run in parallel with
   * other tasks. Queue every task first, then call runTasks() to perform
   * the work. Tasks with a low priority are selected from the queue after
   * those of normal or high priority.
   *
   * @param string $name     A GearmanBundle registered function to be executed
   * @param string $params   Parameters to send to task as string
   * @param mixed  &$context Application context to associate with a task
   * @param string $unique   A unique ID used to identify a particular task
   *
   * @return GearmanClient Return this object
   */
  public function addTaskLowBackground($name, $params = '', &$context = null, $unique = null)
  {
    $method = GearmanMethods::GEARMAN_METHOD_ADDTASKLOWBACKGROUND;

    // enqueueTask() returns this object.
    return $this->enqueueTask($name, $params, $context, $unique, $method);
  }
  /**
   * Builds the description of one task (name, params, context, unique id
   * and the native method to call) and appends it to the task structure.
   *
   * NOTE(review): $context is received and stored by value here, although
   * the public addTask*() methods accept it by reference; confirm whether
   * the reference is meant to reach the native client.
   *
   * @param string $name    A GearmanBundle registered function to be executed
   * @param string $params  Parameters to send to task as string
   * @param mixed  $context Application context to associate with a task
   * @param string $unique  A unique ID used to identify a particular task
   * @param string $method  Method to perform
   *
   * @return GearmanClient Return this object
   */
  private function enqueueTask($name, $params, $context, $unique, $method)
  {
    // addTaskToStructure() returns this object.
    return $this->addTaskToStructure(array(
      'name' => $name,
      'params' => $params,
      'context' => $context,
      'unique' => $unique,
      'method' => $method,
    ));
  }
  /**
   * Pushes one task description onto the end of the task structure.
   *
   * @param array $task Task structure
   *
   * @return GearmanClient Return this object
   */
  private function addTaskToStructure(array $task)
  {
    array_push($this->taskStructure, $task);

    return $this;
  }
  /**
   * Runs, in parallel, every task previously queued with addTask(),
   * addTaskHigh(), addTaskLow(), addTaskBackground(),
   * addTaskHighBackground() or addTaskLowBackground().
   * Note that enough workers need to be available for the tasks to all
   * run in parallel.
   *
   * A fresh native client is created, the configured servers are assigned
   * and, when the 'callbacks' setting is enabled, the task callbacks are
   * attached. Queued tasks whose job name cannot be resolved are skipped.
   * The queue is emptied before the tasks are run.
   *
   * @return boolean run tasks result
   */
  public function runTasks()
  {
    $gearmanClient = new \GearmanClient();
    $this->assignServers($gearmanClient);

    // $settings has no initial value and may lack the 'callbacks' key;
    // empty() reads it without raising a notice and keeps the same
    // truthiness test as before.
    if (!empty($this->settings['callbacks'])) {
      $this->gearmanCallbacks->assignTaskCallbacks($gearmanClient);
    }

    foreach ($this->taskStructure as $task) {
      $worker = $this->getJob($task['name']);

      // Unresolved job names are skipped silently, as before.
      if (false === $worker) {
        continue;
      }

      $type = $task['method'];
      $gearmanClient->$type(
        $worker['job']['realCallableName'],
        $task['params'],
        $task['context'],
        $task['unique']
      );
    }

    $this->taskStructure = array();

    return $gearmanClient->runTasks();
  }
  485. }