GearmanClient.php 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554
  1. <?php
  2. /**
  3. * Gearman Bundle for Symfony2
  4. *
  5. * @author Marc Morera <yuhu@mmoreram.com>
  6. * @since 2013
  7. */
  8. namespace Mmoreram\GearmanBundle\Service;
  9. use Mmoreram\GearmanBundle\Service\Abstracts\AbstractGearmanService;
  10. use Mmoreram\GearmanBundle\Service\GearmanInterface;
  11. use Mmoreram\GearmanBundle\Exceptions\NoCallableGearmanMethodException;
  12. use Mmoreram\GearmanBundle\GearmanMethods;
  13. /**
  14. * Implementation of GearmanInterface
  15. *
  16. * @todo Find the way of reducing number of class methods
  17. */
  18. class GearmanClient extends AbstractGearmanService
  19. {
  20. /**
  21. * @var GearmanCallbacks
  22. *
  23. * Gearman callbacks
  24. */
  25. private $gearmanCallbacks;
  26. /**
  27. * @var array
  28. *
  29. * Server set to define in what server must connect to
  30. */
  31. private $servers = array();
  32. /**
  33. * @var array
  34. *
  35. * task structure to store all about called tasks
  36. */
  37. private $taskStructure = array();
  38. /**
  39. * @var array
  40. *
  41. * Set default servers
  42. */
  43. private $defaultServers;
  44. /**
  45. * Set default servers
  46. *
  47. * @param array $defaultServers Default servers
  48. *
  49. * @return GearmanClient self Object
  50. */
  51. public function setDefaultServers(array $defaultServers)
  52. {
  53. $this->defaultServers = $defaultServers;
  54. return $this;
  55. }
  56. /**
  57. * Set gearman callbacks
  58. *
  59. * @param GearmanCallbacks $gearmanCallbacks Gearman callbacks
  60. *
  61. * @return GearmanClient self Object
  62. */
  63. public function setGearmanCallbacks(GearmanCallbacks $gearmanCallbacks)
  64. {
  65. $this->gearmanCallbacks = $gearmanCallbacks;
  66. return $this;
  67. }
  68. /**
  69. * Set server to client. Empty all servers and set this one
  70. *
  71. * @param type $servername Server name (must be ip)
  72. * @param type $port Port of server. By default 4730
  73. *
  74. * @return GearmanClient Returns self object
  75. */
  76. public function setServer($servername, $port = 4730)
  77. {
  78. $this
  79. ->clearServers()
  80. ->addServer($servername, $port);
  81. return $this;
  82. }
  83. /**
  84. * Add server to client
  85. *
  86. * @param type $servername Server name (must be ip)
  87. * @param type $port Port of server. By default 4730
  88. *
  89. * @return GearmanClient Returns self object
  90. */
  91. public function addServer($servername, $port = 4730)
  92. {
  93. $this->servers[] = array(
  94. 'host' => $servername,
  95. 'port' => $port,
  96. );
  97. return $this;
  98. }
  99. /**
  100. * Clear server list
  101. *
  102. * @return GearmanClient Returns self object
  103. */
  104. public function clearServers()
  105. {
  106. $this->servers = array();
  107. return $this;
  108. }
  109. /**
  110. * Get real worker from job name and enqueues the action given one
  111. * method.
  112. *
  113. * @param string $jobName A GearmanBundle registered function the worker is to execute
  114. * @param string $params Parameters to send to job as string
  115. * @param string $method Method to execute
  116. * @param string $unique A unique ID used to identify a particular task
  117. *
  118. * @return mixed Return result of the call
  119. */
  120. private function enqueue($jobName, $params = '', $method, $unique)
  121. {
  122. $worker = $this->getJob($jobName);
  123. if (false !== $worker) {
  124. return $this->doEnqueue($worker, $params, $method, $unique);
  125. }
  126. return false;
  127. }
  128. /**
  129. * Execute a GearmanClient call given a worker, params and a method.
  130. *
  131. * If he GarmanClient call is asyncronous, result value will be a handler.
  132. * Otherwise, will return job result.
  133. *
  134. * @param array $worker Worker definition
  135. * @param string $params Parameters to send to job as string
  136. * @param string $method Method to execute
  137. * @param string $unique A unique ID used to identify a particular task
  138. *
  139. * @return mixed Return result of the GearmanClient call
  140. */
  141. private function doEnqueue(Array $worker, $params = '', $method = null, $unique = null)
  142. {
  143. $gearmanClient = new \GearmanClient();
  144. $this->assignServers($gearmanClient);
  145. return $gearmanClient->$method($worker['job']['realCallableName'], $params, $unique);
  146. }
  147. /**
  148. * Given a GearmanClient, set all included servers
  149. *
  150. * @param GearmanClient $gearmanClient Object to include servers
  151. *
  152. * @return GearmanClient Returns self object
  153. */
  154. private function assignServers(\GearmanClient $gearmanClient)
  155. {
  156. $servers = $this->defaultServers;
  157. if (!empty($this->servers)) {
  158. $servers = $this->servers;
  159. }
  160. /**
  161. * We include each server into gearman client
  162. */
  163. foreach ($servers as $server) {
  164. $gearmanClient->addServer($server['host'], $server['port']);
  165. }
  166. return $this;
  167. }
  168. /**
  169. * Job methods
  170. */
  171. /**
  172. * Runs a single task and returns some result, depending of method called.
  173. * Method called depends of default callable method setted on gearman settings
  174. * or overwritted on work or job annotations
  175. *
  176. * @param string $name A GearmanBundle registered function the worker is to execute
  177. * @param string $params Parameters to send to job as string
  178. * @param string $unique A unique ID used to identify a particular task
  179. *
  180. * @return mixed result depending of method called.
  181. */
  182. public function callJob($name, $params = '', $unique = null)
  183. {
  184. $worker = $this->getJob($name);
  185. $methodCallable = $worker['job']['defaultMethod'] . 'Job';
  186. return $this->enqueue($name, $params, $methodCallable, $unique);
  187. }
  188. /**
  189. * Runs a single task and returns a string representation of the result.
  190. * It is up to the GearmanClient and GearmanWorker to agree on the format of the result.
  191. * The GearmanClient::do() method is deprecated as of pecl/gearman 1.0.0. Use GearmanClient::doNormal().
  192. *
  193. * @param string $name A GearmanBundle registered function the worker is to execute
  194. * @param string $params Parameters to send to job as string
  195. * @param string $unique A unique ID used to identify a particular task
  196. *
  197. * @return string A string representing the results of running a task.
  198. * @deprecated
  199. */
  200. public function doJob($name, $params = '', $unique = null)
  201. {
  202. return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DO, $unique);
  203. }
  204. /**
  205. * Runs a single task and returns a string representation of the result.
  206. * It is up to the GearmanClient and GearmanWorker to agree on the format of the result.
  207. *
  208. * @param string $name A GearmanBundle registered function the worker is to execute
  209. * @param string $params Parameters to send to job as string
  210. * @param string $unique A unique ID used to identify a particular task
  211. *
  212. * @return string A string representing the results of running a task.
  213. */
  214. public function doNormalJob($name, $params = '', $unique = null)
  215. {
  216. return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DONORMAL, $unique);
  217. }
  218. /**
  219. * Runs a task in the background, returning a job handle which
  220. * can be used to get the status of the running task.
  221. *
  222. * @param string $name A GearmanBundle registered function the worker is to execute
  223. * @param string $params Parameters to send to job as string
  224. * @param string $unique A unique ID used to identify a particular task
  225. *
  226. * @return string Job handle for the submitted task.
  227. */
  228. public function doBackgroundJob($name, $params = '', $unique = null)
  229. {
  230. return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DOBACKGROUND, $unique);
  231. }
  232. /**
  233. * Runs a single high priority task and returns a string representation of the result.
  234. * It is up to the GearmanClient and GearmanWorker to agree on the format of the result.
  235. * High priority tasks will get precedence over normal and low priority tasks in the job queue.
  236. *
  237. * @param string $name A GearmanBundle registered function the worker is to execute
  238. * @param string $params Parameters to send to job as string
  239. * @param string $unique A unique ID used to identify a particular task
  240. *
  241. * @return string A string representing the results of running a task.
  242. */
  243. public function doHighJob($name, $params = '', $unique = null)
  244. {
  245. return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DOHIGH, $unique);
  246. }
  247. /**
  248. * Runs a high priority task in the background, returning a job handle which can be used to get the status of the running task.
  249. * High priority tasks take precedence over normal and low priority tasks in the job queue.
  250. *
  251. * @param string $name A GearmanBundle registered function the worker is to execute
  252. * @param string $params Parameters to send to job as string
  253. * @param string $unique A unique ID used to identify a particular task
  254. *
  255. * @return string The job handle for the submitted task.
  256. */
  257. public function doHighBackgroundJob($name, $params = '', $unique = null)
  258. {
  259. return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DOHIGHBACKGROUND, $unique);
  260. }
  261. /**
  262. * Runs a single low priority task and returns a string representation of the result.
  263. * It is up to the GearmanClient and GearmanWorker to agree on the format of the result.
  264. * Normal and high priority tasks will get precedence over low priority tasks in the job queue.
  265. *
  266. * @param string $name A GearmanBundle registered function the worker is to execute
  267. * @param string $params Parameters to send to job as string
  268. * @param string $unique A unique ID used to identify a particular task
  269. *
  270. * @return string A string representing the results of running a task.
  271. */
  272. public function doLowJob($name, $params = '', $unique = null)
  273. {
  274. return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DOLOW, $unique);
  275. }
  276. /**
  277. * Runs a low priority task in the background, returning a job handle which can be used to get the status of the running task.
  278. * Normal and high priority tasks will get precedence over low priority tasks in the job queue.
  279. *
  280. * @param string $name A GearmanBundle registered function the worker is to execute
  281. * @param string $params Parameters to send to job as string
  282. * @param string $unique A unique ID used to identify a particular task
  283. *
  284. * @return string The job handle for the submitted task.
  285. */
  286. public function doLowBackgroundJob($name, $params = '', $unique = null)
  287. {
  288. return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DOLOWBACKGROUND, $unique);
  289. }
  290. /**
  291. * Task methods
  292. */
  293. /**
  294. * Adds a task to be run in parallel with other tasks.
  295. * Call this method for all the tasks to be run in parallel, then call GearmanClient::runTasks() to perform the work.
  296. * Note that enough workers need to be available for the tasks to all run in parallel.
  297. *
  298. * @param string $name A GermanBundle registered function to be executed
  299. * @param string $params Parameters to send to task as string
  300. * @param Mixed &$context Application context to associate with a task
  301. * @param string $unique A unique ID used to identify a particular task
  302. *
  303. * @return GearmanClient Return this object
  304. */
  305. public function addTask($name, $params = '', &$context = null, $unique = null)
  306. {
  307. $this->enqueueTask($name, $params, $context, $unique, GearmanMethods::GEARMAN_METHOD_ADDTASK);
  308. return $this;
  309. }
  310. /**
  311. * Adds a high priority task to be run in parallel with other tasks.
  312. * Call this method for all the high priority tasks to be run in parallel, then call GearmanClient::runTasks() to perform the work.
  313. * Tasks with a high priority will be selected from the queue before those of normal or low priority.
  314. *
  315. * @param string $name A GermanBundle registered function to be executed
  316. * @param string $params Parameters to send to task as string
  317. * @param Mixed &$context Application context to associate with a task
  318. * @param string $unique A unique ID used to identify a particular task
  319. *
  320. * @return GearmanClient Return this object
  321. */
  322. public function addTaskHigh($name, $params = '', &$context = null, $unique = null)
  323. {
  324. $this->enqueueTask($name, $params, $context, $unique, GearmanMethods::GEARMAN_METHOD_ADDTASKHIGH);
  325. return $this;
  326. }
  327. /**
  328. * Adds a low priority background task to be run in parallel with other tasks.
  329. * Call this method for all the tasks to be run in parallel, then call GearmanClient::runTasks() to perform the work.
  330. * Tasks with a low priority will be selected from the queue after those of normal or low priority.
  331. *
  332. * @param string $name A GermanBundle registered function to be executed
  333. * @param string $params Parameters to send to task as string
  334. * @param Mixed &$context Application context to associate with a task
  335. * @param string $unique A unique ID used to identify a particular task
  336. *
  337. * @return GearmanClient Return this object
  338. */
  339. public function addTaskLow($name, $params = '', &$context = null, $unique = null)
  340. {
  341. $this->enqueueTask($name, $params, $context, $unique, GearmanMethods::GEARMAN_METHOD_ADDTASKLOW);
  342. return $this;
  343. }
  344. /**
  345. * Adds a background task to be run in parallel with other tasks
  346. * Call this method for all the tasks to be run in parallel, then call GearmanClient::runTasks() to perform the work.
  347. *
  348. * @param string $name A GermanBundle registered function to be executed
  349. * @param string $params Parameters to send to task as string
  350. * @param Mixed &$context Application context to associate with a task
  351. * @param string $unique A unique ID used to identify a particular task
  352. *
  353. * @return GearmanClient Return this object
  354. */
  355. public function addTaskBackground($name, $params = '', &$context = null, $unique = null)
  356. {
  357. $this->enqueueTask($name, $params, $context, $unique, GearmanMethods::GEARMAN_METHOD_ADDTASKBACKGROUND);
  358. return $this;
  359. }
  360. /**
  361. * Adds a high priority background task to be run in parallel with other tasks.
  362. * Call this method for all the tasks to be run in parallel, then call GearmanClient::runTasks() to perform the work.
  363. * Tasks with a high priority will be selected from the queue before those of normal or low priority.
  364. *
  365. * @param string $name A GermanBundle registered function to be executed
  366. * @param string $params Parameters to send to task as string
  367. * @param Mixed &$context Application context to associate with a task
  368. * @param string $unique A unique ID used to identify a particular task
  369. *
  370. * @return GearmanClient Return this object
  371. */
  372. public function addTaskHighBackground($name, $params = '', &$context = null, $unique = null)
  373. {
  374. $this->enqueueTask($name, $params, $context, $unique, GearmanMethods::GEARMAN_METHOD_ADDTASKHIGHBACKGROUND);
  375. return $this;
  376. }
  377. /**
  378. * Adds a low priority background task to be run in parallel with other tasks.
  379. * Call this method for all the tasks to be run in parallel, then call GearmanClient::runTasks() to perform the work.
  380. * Tasks with a low priority will be selected from the queue after those of normal or high priority.
  381. *
  382. * @param string $name A GermanBundle registered function to be executed
  383. * @param string $params Parameters to send to task as string
  384. * @param Mixed &$context Application context to associate with a task
  385. * @param string $unique A unique ID used to identify a particular task
  386. *
  387. * @return GearmanClient Return this object
  388. */
  389. public function addTaskLowBackground($name, $params = '', &$context = null, $unique = null)
  390. {
  391. $this->enqueueTask($name, $params, $context, $unique, GearmanMethods::GEARMAN_METHOD_ADDTASKLOWBACKGROUND);
  392. return $this;
  393. }
  394. /**
  395. * Adds a task into the structure of tasks with included type of call
  396. *
  397. * @param string $name A GermanBundle registered function to be executed
  398. * @param string $params Parameters to send to task as string
  399. * @param Mixed $context Application context to associate with a task
  400. * @param string $unique A unique ID used to identify a particular task
  401. * @param string $method Method to perform
  402. *
  403. * @return GearmanClient Return this object
  404. */
  405. private function enqueueTask($name, $params, $context, $unique, $method)
  406. {
  407. $task = array(
  408. 'name' => $name,
  409. 'params' => $params,
  410. 'context' => $context,
  411. 'unique' => $unique,
  412. 'method' => $method,
  413. );
  414. $this->addTaskToStructure($task);
  415. return $this;
  416. }
  417. /**
  418. * Appends a task structure into taskStructure array
  419. *
  420. * @param array $task Task structure
  421. *
  422. * @return GearmanClient Return this object
  423. */
  424. private function addTaskToStructure(array $task)
  425. {
  426. $this->taskStructure[] = $task;
  427. return $this;
  428. }
  429. /**
  430. * For a set of tasks previously added with GearmanClient::addTask(), GearmanClient::addTaskHigh(),
  431. * GearmanClient::addTaskLow(), GearmanClient::addTaskBackground(), GearmanClient::addTaskHighBackground(),
  432. * or GearmanClient::addTaskLowBackground(), this call starts running the tasks in parallel.
  433. * Note that enough workers need to be available for the tasks to all run in parallel
  434. *
  435. * @return boolean run tasks result
  436. */
  437. public function runTasks()
  438. {
  439. $taskStructure = $this->taskStructure;
  440. $gearmanClient = new \GearmanClient();
  441. $this->assignServers($gearmanClient);
  442. $this->gearmanCallbacks->assignTaskCallbacks($gearmanClient);
  443. foreach ($this->taskStructure as $task) {
  444. $type = $task['method'];
  445. $jobName = $task['name'];
  446. $worker = $this->getJob($jobName);
  447. if (false !== $worker) {
  448. $gearmanClient->$type($worker['job']['realCallableName'], $task['params'], $task['context'], $task['unique']);
  449. }
  450. }
  451. $this->taskStructure = array();
  452. return $gearmanClient->runTasks();
  453. }
  454. }