GearmanClient.php 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551
  1. <?php
  2. /**
  3. * Gearman Bundle for Symfony2
  4. *
  5. * @author Marc Morera <yuhu@mmoreram.com>
  6. * @since 2013
  7. */
  8. namespace Mmoreram\GearmanBundle\Service;
  9. use Mmoreram\GearmanBundle\Service\Abstracts\AbstractGearmanService;
  10. use Mmoreram\GearmanBundle\Service\GearmanInterface;
  11. use Mmoreram\GearmanBundle\Exceptions\NoCallableGearmanMethodException;
  12. use Mmoreram\GearmanBundle\GearmanMethods;
  /**
   * Client-side entry point of the bundle.
   *
   * Offers three groups of operations:
   *  - server selection (setServer / addServer / clearServers),
   *  - single job execution (callJob and the do*Job family),
   *  - task batching (the addTask* family followed by runTasks).
   *
   * A fresh native \GearmanClient is created for every job call and for
   * every runTasks() call; this object only stores configuration and the
   * pending task queue.
   */
  class GearmanClient extends AbstractGearmanService
  {
      /**
       * @var array
       *
       * Wrapper methods callJob() is allowed to dispatch to, indexed by their
       * lower-cased name (PHP method names are case-insensitive).
       */
      private static $jobMethods = array(
          'dojob'               => 'doJob',
          'donormaljob'         => 'doNormalJob',
          'dobackgroundjob'     => 'doBackgroundJob',
          'dohighjob'           => 'doHighJob',
          'dohighbackgroundjob' => 'doHighBackgroundJob',
          'dolowjob'            => 'doLowJob',
          'dolowbackgroundjob'  => 'doLowBackgroundJob',
      );

      /**
       * @var GearmanCallbacks|null
       *
       * Gearman callbacks, attached to the native client by runTasks().
       * Null until setGearmanCallbacks() is called.
       */
      private $gearmanCallbacks;

      /**
       * @var array
       *
       * Servers explicitly selected for this client. Each entry is an array
       * with 'host' and 'port' keys. When not empty, it takes precedence over
       * the default servers.
       */
      private $servers = array();

      /**
       * @var array
       *
       * Tasks queued through the addTask* methods, waiting for runTasks()
       */
      private $taskStructure = array();

      /**
       * @var array
       *
       * Servers used when no server has been explicitly selected. Entries are
       * read with the same 'host' / 'port' keys as $servers.
       */
      private $defaultServers = array();

      /**
       * Set default servers
       *
       * @param array $defaultServers Default servers, each one an array with
       *                              'host' and 'port' keys
       *
       * @return GearmanClient self Object
       */
      public function setDefaultServers(array $defaultServers)
      {
          $this->defaultServers = $defaultServers;

          return $this;
      }

      /**
       * Set gearman callbacks
       *
       * @param GearmanCallbacks $gearmanCallbacks Gearman callbacks
       *
       * @return GearmanClient self Object
       */
      public function setGearmanCallbacks(GearmanCallbacks $gearmanCallbacks)
      {
          $this->gearmanCallbacks = $gearmanCallbacks;

          return $this;
      }

      /**
       * Set server to client. Empty all servers and set this one
       *
       * @param string  $servername Server name (must be ip)
       * @param integer $port       Port of server. By default 4730
       *
       * @return GearmanClient Returns self object
       */
      public function setServer($servername, $port = 4730)
      {
          $this
              ->clearServers()
              ->addServer($servername, $port);

          return $this;
      }

      /**
       * Add server to client
       *
       * @param string  $servername Server name (must be ip)
       * @param integer $port       Port of server. By default 4730
       *
       * @return GearmanClient Returns self object
       */
      public function addServer($servername, $port = 4730)
      {
          $this->servers[] = array(
              'host' => $servername,
              'port' => $port,
          );

          return $this;
      }

      /**
       * Clear server list
       *
       * After this call the default servers are used again, until a new
       * server is added.
       *
       * @return GearmanClient Returns self object
       */
      public function clearServers()
      {
          $this->servers = array();

          return $this;
      }

      /**
       * Resolve the worker definition of a job name and run the job with the
       * given native client method.
       *
       * @param string $jobName A GearmanBundle registered function the worker is to execute
       * @param string $params  Parameters to send to job as string
       * @param string $method  Name of the native \GearmanClient method to execute
       * @param string $unique  A unique ID used to identify a particular task
       *
       * @return mixed Result of the call, or false when getJob() returns false
       */
      private function enqueue($jobName, $params = '', $method = null, $unique = null)
      {
          $worker = $this->getJob($jobName);

          if (false !== $worker) {
              return $this->doEnqueue($worker, $params, $method, $unique);
          }

          return false;
      }

      /**
       * Execute a GearmanClient call given a worker, params and a method.
       *
       * If the GearmanClient call is asynchronous, result value will be a handle.
       * Otherwise, will return job result.
       *
       * @param array  $worker Worker definition; $worker['job']['realCallableName'] is
       *                       the function name sent to the server
       * @param string $params Parameters to send to job as string
       * @param string $method Name of the native \GearmanClient method to execute
       * @param string $unique A unique ID used to identify a particular task
       *
       * @return mixed Return result of the GearmanClient call
       */
      private function doEnqueue(array $worker, $params = '', $method = null, $unique = null)
      {
          $gearmanClient = new \GearmanClient();
          $this->assignServers($gearmanClient);

          return $gearmanClient->$method($worker['job']['realCallableName'], $params, $unique);
      }

      /**
       * Given a native GearmanClient, add every configured server to it.
       *
       * Explicitly selected servers win; default servers are used only when
       * no server has been selected.
       *
       * @param \GearmanClient $gearmanClient Object to include servers
       *
       * @return GearmanClient Returns self object
       */
      private function assignServers(\GearmanClient $gearmanClient)
      {
          $servers = $this->defaultServers;

          if (!empty($this->servers)) {
              $servers = $this->servers;
          }

          /**
           * We include each server into gearman client
           */
          foreach ($servers as $server) {
              $gearmanClient->addServer($server['host'], $server['port']);
          }

          return $this;
      }

      /**
       * Job methods
       */

      /**
       * Runs a single job using the default method of its definition.
       *
       * The value of $worker['job']['defaultMethod'] suffixed with 'Job' must
       * name one of the do*Job methods of this class (for instance a default
       * method of 'doBackground' dispatches to doBackgroundJob()).
       *
       * @param string $name   A GearmanBundle registered function the worker is to execute
       * @param string $params Parameters to send to job as string
       * @param string $unique A unique ID used to identify a particular task
       *
       * @return mixed result depending of method called, or false when
       *               getJob() returns false for $name
       *
       * @throws NoCallableGearmanMethodException When the default method does not
       *                                          map to a do*Job method of this class
       */
      public function callJob($name, $params = '', $unique = null)
      {
          $worker = $this->getJob($name);

          // Same contract as enqueue(): a job that cannot be resolved yields false.
          if (false === $worker) {
              return false;
          }

          $defaultMethod = isset($worker['job']['defaultMethod'])
              ? $worker['job']['defaultMethod']
              : '';
          $methodCallable = $defaultMethod . 'Job';
          $methodKey = strtolower($methodCallable);

          // The suffixed name designates a method of this wrapper, not of the
          // native \GearmanClient (which has no *Job methods), so dispatch
          // here. The whitelist keeps callJob() and getJob() out of reach.
          if (!isset(self::$jobMethods[$methodKey])) {
              throw new NoCallableGearmanMethodException(
                  sprintf('Gearman method "%s" is not callable', $methodCallable)
              );
          }

          $wrapperMethod = self::$jobMethods[$methodKey];

          return $this->$wrapperMethod($name, $params, $unique);
      }

      /**
       * Runs a single task and returns a string representation of the result.
       * It is up to the GearmanClient and GearmanWorker to agree on the format of the result.
       * The GearmanClient::do() method is deprecated as of pecl/gearman 1.0.0. Use GearmanClient::doNormal().
       *
       * @param string $name   A GearmanBundle registered function the worker is to execute
       * @param string $params Parameters to send to job as string
       * @param string $unique A unique ID used to identify a particular task
       *
       * @return string|boolean A string representing the results of running a task,
       *                        or false when the job cannot be resolved
       *
       * @deprecated Use doNormalJob() instead
       */
      public function doJob($name, $params = '', $unique = null)
      {
          return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DO, $unique);
      }

      /**
       * Runs a single task and returns a string representation of the result.
       * It is up to the GearmanClient and GearmanWorker to agree on the format of the result.
       *
       * @param string $name   A GearmanBundle registered function the worker is to execute
       * @param string $params Parameters to send to job as string
       * @param string $unique A unique ID used to identify a particular task
       *
       * @return string|boolean A string representing the results of running a task,
       *                        or false when the job cannot be resolved
       */
      public function doNormalJob($name, $params = '', $unique = null)
      {
          return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DONORMAL, $unique);
      }

      /**
       * Runs a task in the background, returning a job handle which
       * can be used to get the status of the running task.
       *
       * @param string $name   A GearmanBundle registered function the worker is to execute
       * @param string $params Parameters to send to job as string
       * @param string $unique A unique ID used to identify a particular task
       *
       * @return string|boolean Job handle for the submitted task, or false when
       *                        the job cannot be resolved
       */
      public function doBackgroundJob($name, $params = '', $unique = null)
      {
          return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DOBACKGROUND, $unique);
      }

      /**
       * Runs a single high priority task and returns a string representation of the result.
       * It is up to the GearmanClient and GearmanWorker to agree on the format of the result.
       * High priority tasks will get precedence over normal and low priority tasks in the job queue.
       *
       * @param string $name   A GearmanBundle registered function the worker is to execute
       * @param string $params Parameters to send to job as string
       * @param string $unique A unique ID used to identify a particular task
       *
       * @return string|boolean A string representing the results of running a task,
       *                        or false when the job cannot be resolved
       */
      public function doHighJob($name, $params = '', $unique = null)
      {
          return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DOHIGH, $unique);
      }

      /**
       * Runs a high priority task in the background, returning a job handle which can be used to get the status of the running task.
       * High priority tasks take precedence over normal and low priority tasks in the job queue.
       *
       * @param string $name   A GearmanBundle registered function the worker is to execute
       * @param string $params Parameters to send to job as string
       * @param string $unique A unique ID used to identify a particular task
       *
       * @return string|boolean The job handle for the submitted task, or false
       *                        when the job cannot be resolved
       */
      public function doHighBackgroundJob($name, $params = '', $unique = null)
      {
          return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DOHIGHBACKGROUND, $unique);
      }

      /**
       * Runs a single low priority task and returns a string representation of the result.
       * It is up to the GearmanClient and GearmanWorker to agree on the format of the result.
       * Normal and high priority tasks will get precedence over low priority tasks in the job queue.
       *
       * @param string $name   A GearmanBundle registered function the worker is to execute
       * @param string $params Parameters to send to job as string
       * @param string $unique A unique ID used to identify a particular task
       *
       * @return string|boolean A string representing the results of running a task,
       *                        or false when the job cannot be resolved
       */
      public function doLowJob($name, $params = '', $unique = null)
      {
          return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DOLOW, $unique);
      }

      /**
       * Runs a low priority task in the background, returning a job handle which can be used to get the status of the running task.
       * Normal and high priority tasks will get precedence over low priority tasks in the job queue.
       *
       * @param string $name   A GearmanBundle registered function the worker is to execute
       * @param string $params Parameters to send to job as string
       * @param string $unique A unique ID used to identify a particular task
       *
       * @return string|boolean The job handle for the submitted task, or false
       *                        when the job cannot be resolved
       */
      public function doLowBackgroundJob($name, $params = '', $unique = null)
      {
          return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DOLOWBACKGROUND, $unique);
      }

      /**
       * Task methods
       *
       * Every addTask* method only queues the task inside this object; nothing
       * is sent to a server until runTasks() is called. Although $context is
       * declared by reference, its value at the time of the call is what gets
       * stored: later changes to the caller's variable are not seen.
       */

      /**
       * Adds a task to be run in parallel with other tasks.
       * Call this method for all the tasks to be run in parallel, then call GearmanClient::runTasks() to perform the work.
       * Note that enough workers need to be available for the tasks to all run in parallel.
       *
       * @param string $name     A GearmanBundle registered function to be executed
       * @param string $params   Parameters to send to task as string
       * @param mixed  &$context Application context to associate with a task
       * @param string $unique   A unique ID used to identify a particular task
       *
       * @return GearmanClient Return this object
       */
      public function addTask($name, $params = '', &$context = null, $unique = null)
      {
          $this->enqueueTask($name, $params, $context, $unique, GearmanMethods::GEARMAN_METHOD_ADDTASK);

          return $this;
      }

      /**
       * Adds a high priority task to be run in parallel with other tasks.
       * Call this method for all the high priority tasks to be run in parallel, then call GearmanClient::runTasks() to perform the work.
       * Tasks with a high priority will be selected from the queue before those of normal or low priority.
       *
       * @param string $name     A GearmanBundle registered function to be executed
       * @param string $params   Parameters to send to task as string
       * @param mixed  &$context Application context to associate with a task
       * @param string $unique   A unique ID used to identify a particular task
       *
       * @return GearmanClient Return this object
       */
      public function addTaskHigh($name, $params = '', &$context = null, $unique = null)
      {
          $this->enqueueTask($name, $params, $context, $unique, GearmanMethods::GEARMAN_METHOD_ADDTASKHIGH);

          return $this;
      }

      /**
       * Adds a low priority task to be run in parallel with other tasks.
       * Call this method for all the tasks to be run in parallel, then call GearmanClient::runTasks() to perform the work.
       * Tasks with a low priority will be selected from the queue after those of normal or high priority.
       *
       * @param string $name     A GearmanBundle registered function to be executed
       * @param string $params   Parameters to send to task as string
       * @param mixed  &$context Application context to associate with a task
       * @param string $unique   A unique ID used to identify a particular task
       *
       * @return GearmanClient Return this object
       */
      public function addTaskLow($name, $params = '', &$context = null, $unique = null)
      {
          $this->enqueueTask($name, $params, $context, $unique, GearmanMethods::GEARMAN_METHOD_ADDTASKLOW);

          return $this;
      }

      /**
       * Adds a background task to be run in parallel with other tasks
       * Call this method for all the tasks to be run in parallel, then call GearmanClient::runTasks() to perform the work.
       *
       * @param string $name     A GearmanBundle registered function to be executed
       * @param string $params   Parameters to send to task as string
       * @param mixed  &$context Application context to associate with a task
       * @param string $unique   A unique ID used to identify a particular task
       *
       * @return GearmanClient Return this object
       */
      public function addTaskBackground($name, $params = '', &$context = null, $unique = null)
      {
          $this->enqueueTask($name, $params, $context, $unique, GearmanMethods::GEARMAN_METHOD_ADDTASKBACKGROUND);

          return $this;
      }

      /**
       * Adds a high priority background task to be run in parallel with other tasks.
       * Call this method for all the tasks to be run in parallel, then call GearmanClient::runTasks() to perform the work.
       * Tasks with a high priority will be selected from the queue before those of normal or low priority.
       *
       * @param string $name     A GearmanBundle registered function to be executed
       * @param string $params   Parameters to send to task as string
       * @param mixed  &$context Application context to associate with a task
       * @param string $unique   A unique ID used to identify a particular task
       *
       * @return GearmanClient Return this object
       */
      public function addTaskHighBackground($name, $params = '', &$context = null, $unique = null)
      {
          $this->enqueueTask($name, $params, $context, $unique, GearmanMethods::GEARMAN_METHOD_ADDTASKHIGHBACKGROUND);

          return $this;
      }

      /**
       * Adds a low priority background task to be run in parallel with other tasks.
       * Call this method for all the tasks to be run in parallel, then call GearmanClient::runTasks() to perform the work.
       * Tasks with a low priority will be selected from the queue after those of normal or high priority.
       *
       * @param string $name     A GearmanBundle registered function to be executed
       * @param string $params   Parameters to send to task as string
       * @param mixed  &$context Application context to associate with a task
       * @param string $unique   A unique ID used to identify a particular task
       *
       * @return GearmanClient Return this object
       */
      public function addTaskLowBackground($name, $params = '', &$context = null, $unique = null)
      {
          $this->enqueueTask($name, $params, $context, $unique, GearmanMethods::GEARMAN_METHOD_ADDTASKLOWBACKGROUND);

          return $this;
      }

      /**
       * Adds a task into the structure of tasks with included type of call
       *
       * @param string $name    A GearmanBundle registered function to be executed
       * @param string $params  Parameters to send to task as string
       * @param mixed  $context Application context to associate with a task (stored by value)
       * @param string $unique  A unique ID used to identify a particular task
       * @param string $method  Name of the native \GearmanClient method runTasks() will call
       *
       * @return GearmanClient Return this object
       */
      private function enqueueTask($name, $params, $context, $unique, $method)
      {
          $task = array(
              'name'    => $name,
              'params'  => $params,
              'context' => $context,
              'unique'  => $unique,
              'method'  => $method,
          );

          $this->addTaskToStructure($task);

          return $this;
      }

      /**
       * Appends a task structure into taskStructure array
       *
       * @param array $task Task structure, as built by enqueueTask()
       *
       * @return GearmanClient Return this object
       */
      private function addTaskToStructure(array $task)
      {
          $this->taskStructure[] = $task;

          return $this;
      }

      /**
       * For a set of tasks previously added with GearmanClient::addTask(), GearmanClient::addTaskHigh(),
       * GearmanClient::addTaskLow(), GearmanClient::addTaskBackground(), GearmanClient::addTaskHighBackground(),
       * or GearmanClient::addTaskLowBackground(), this call starts running the tasks in parallel.
       * Note that enough workers need to be available for the tasks to all run in parallel
       *
       * Queued tasks whose job name cannot be resolved are skipped. The queue
       * is emptied before the tasks are run, so each task is submitted once.
       *
       * @return boolean run tasks result
       */
      public function runTasks()
      {
          $gearmanClient = new \GearmanClient();
          $this->assignServers($gearmanClient);

          // Callbacks are injected through a setter and may be missing; in that
          // case the tasks still run, only without callbacks attached.
          if (null !== $this->gearmanCallbacks) {
              $this->gearmanCallbacks->assignTaskCallbacks($gearmanClient);
          }

          foreach ($this->taskStructure as $task) {
              $type = $task['method'];
              $jobName = $task['name'];
              $worker = $this->getJob($jobName);

              if (false !== $worker) {
                  $gearmanClient->$type(
                      $worker['job']['realCallableName'],
                      $task['params'],
                      $task['context'],
                      $task['unique']
                  );
              }
          }

          $this->taskStructure = array();

          return $gearmanClient->runTasks();
      }
  }