GearmanClient.php 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476
  1. <?php
  2. /**
  3. * Gearman Bundle for Symfony2
  4. *
  5. * @author Marc Morera <yuhu@mmoreram.com>
  6. * @since 2013
  7. */
  8. namespace Mmoreram\GearmanBundle\Service;
  9. use Mmoreram\GearmanBundle\Service\Abstracts\AbstractGearmanService;
  10. use Mmoreram\GearmanBundle\Service\GearmanInterface;
  11. use Mmoreram\GearmanBundle\Exceptions\NoCallableGearmanMethodException;
  /**
   * Gearman client service of the bundle.
   *
   * Resolves bundle-registered job names into their real Gearman function names
   * (through getJob(), which is not defined in this class and is presumably
   * inherited from AbstractGearmanService) and forwards the call to a freshly
   * created native \GearmanClient.
   *
   * Two families of calls are offered:
   *  - "job" methods (do*Job), which are sent to the server immediately;
   *  - "task" methods (addTask*), which are only queued locally and are sent
   *    all together when runTasks() is called.
   *
   * @todo Find the way of reducing number of class methods
   */
  class GearmanClient extends AbstractGearmanService
  {
      /**
       * Server to connect to, stored as array($servername, $port).
       *
       * When empty (never set, or reset by clearServers()), the native client is
       * configured with a plain addServer() call, i.e. with the extension defaults.
       *
       * @var array|null
       */
      public $server;

      /**
       * Tasks queued by the addTask*() methods and not yet sent by runTasks().
       *
       * Each element is an array with the keys 'name', 'params', 'context',
       * 'unique' and 'method'.
       *
       * @var array
       */
      public $taskStructure = array();

      /**
       * Runs a single job using the default method configured for it.
       *
       * The method is built from the 'defaultMethod' entry of the job definition
       * plus the 'Job' suffix (for example 'doBackground' => doBackgroundJob()).
       *
       * @param string $name   A GearmanBundle registered function the worker is to execute
       * @param mixed  $params Parameters to send to job
       *
       * @return mixed Result of the resolved method
       *
       * @throws NoCallableGearmanMethodException When the resolved method does not exist
       *                                          in this class (this includes the case of
       *                                          a job without usable definition)
       */
      public function callJob($name, $params = array())
      {
          $worker = $this->getJob($name);

          // The other callers of getJob() in this class treat `false` as "unknown
          // job". Do not index into a non-array here: an unusable definition
          // resolves to the bare 'Job' suffix, which is rejected just below.
          $defaultMethod = '';
          if (is_array($worker) && isset($worker['job']['defaultMethod'])) {
              $defaultMethod = $worker['job']['defaultMethod'];
          }

          $methodCallable = $defaultMethod . 'Job';

          if (!method_exists($this, $methodCallable)) {
              throw new NoCallableGearmanMethodException($methodCallable);
          }

          return $this->$methodCallable($name, $params);
      }

      /**
       * Resolves the worker owning the given job and sends the job with the
       * requested native client method.
       *
       * @param string $jobName A GearmanBundle registered function the worker is to execute
       * @param mixed  $params  Parameters to send to job
       * @param string $method  Native \GearmanClient method to execute
       * @param string $unique  A unique ID used to identify a particular task
       *
       * @return mixed Result of the native call, or false when getJob() returns false
       */
      private function enqueue($jobName, $params, $method, $unique)
      {
          $worker = $this->getJob($jobName);

          if (false === $worker) {
              return false;
          }

          return $this->doEnqueue($worker, $params, $method, $unique);
      }

      /**
       * Executes a native \GearmanClient call given a worker, params and a method.
       * If no method is given, it performs a "do" call.
       *
       * If the call is asynchronous, the result is a job handle; otherwise it is
       * the job result.
       *
       * @param array  $worker Worker definition; $worker['job']['realCallableName'] is used
       * @param mixed  $params Parameters to send to job
       * @param string $method Native \GearmanClient method to execute
       * @param string $unique A unique ID used to identify a particular task
       *
       * @return mixed Result of the native \GearmanClient call
       */
      private function doEnqueue(array $worker, $params = '', $method = 'do', $unique = null)
      {
          $gmclient = new \GearmanClient();
          $this->assignServers($gmclient);

          // do() is deprecated in favour of doNormal() as of pecl/gearman 1.0.0.
          // If the installed extension does not expose do() at all, use its
          // documented replacement instead of failing on an undefined method.
          if ('do' === $method && !method_exists($gmclient, 'do')) {
              $method = 'doNormal';
          }

          return $gmclient->$method($worker['job']['realCallableName'], $params, $unique);
      }

      /**
       * Sets the Gearman server to connect to.
       *
       * @param string  $servername Server name (must be ip)
       * @param integer $port       Port of server. By default 4730
       *
       * @return GearmanClient Returns self object
       */
      public function setServer($servername, $port = 4730)
      {
          $this->server = array($servername, $port);

          return $this;
      }

      /**
       * Registers the configured server in the given native client.
       *
       * Falls back to an argument-less addServer() call when no server is set.
       *
       * @param \GearmanClient $gearmanClient Native client to configure
       *
       * @return GearmanClient Returns self object
       */
      private function assignServers(\GearmanClient $gearmanClient)
      {
          if (empty($this->server)) {
              $gearmanClient->addServer();
          } else {
              $gearmanClient->addServer($this->server[0], $this->server[1]);
          }

          return $this;
      }

      /**
       * Clears the server slot, so that later calls use the addServer() defaults.
       *
       * @return GearmanClient Returns self object
       */
      public function clearServers()
      {
          $this->server = null;

          return $this;
      }

      /**
       * Job methods
       *
       * NOTE(review): every job method keeps array() as default for $params for
       * backward compatibility, and passes $params untouched to the native client.
       * The native methods document the workload as a string - confirm that callers
       * always provide an already serialized payload.
       */

      /**
       * Runs a single task and returns a string representation of the result.
       * It is up to the GearmanClient and GearmanWorker to agree on the format of the result.
       * The GearmanClient::do() method is deprecated as of pecl/gearman 1.0.0. Use GearmanClient::doNormal().
       *
       * @param string $name   A GearmanBundle registered function the worker is to execute
       * @param mixed  $params Parameters to send to job
       * @param string $unique A unique ID used to identify a particular task
       *
       * @return string|false A string representing the results of running a task,
       *                      or false when getJob() returns false
       *
       * @deprecated Use doNormalJob() instead
       */
      public function doJob($name, $params = array(), $unique = null)
      {
          return $this->enqueue($name, $params, 'do', $unique);
      }

      /**
       * Runs a single task and returns a string representation of the result.
       * It is up to the GearmanClient and GearmanWorker to agree on the format of the result.
       *
       * @param string $name   A GearmanBundle registered function the worker is to execute
       * @param mixed  $params Parameters to send to job
       * @param string $unique A unique ID used to identify a particular task
       *
       * @return string|false A string representing the results of running a task,
       *                      or false when getJob() returns false
       */
      public function doNormalJob($name, $params = array(), $unique = null)
      {
          return $this->enqueue($name, $params, 'doNormal', $unique);
      }

      /**
       * Runs a task in the background, returning a job handle which
       * can be used to get the status of the running task.
       *
       * @param string $name   A GearmanBundle registered function the worker is to execute
       * @param mixed  $params Parameters to send to job
       * @param string $unique A unique ID used to identify a particular task
       *
       * @return string|false Job handle for the submitted task,
       *                      or false when getJob() returns false
       */
      public function doBackgroundJob($name, $params = array(), $unique = null)
      {
          return $this->enqueue($name, $params, 'doBackground', $unique);
      }

      /**
       * Runs a single high priority task and returns a string representation of the result.
       * It is up to the GearmanClient and GearmanWorker to agree on the format of the result.
       * High priority tasks will get precedence over normal and low priority tasks in the job queue.
       *
       * @param string $name   A GearmanBundle registered function the worker is to execute
       * @param mixed  $params Parameters to send to job
       * @param string $unique A unique ID used to identify a particular task
       *
       * @return string|false A string representing the results of running a task,
       *                      or false when getJob() returns false
       */
      public function doHighJob($name, $params = array(), $unique = null)
      {
          return $this->enqueue($name, $params, 'doHigh', $unique);
      }

      /**
       * Runs a high priority task in the background, returning a job handle which
       * can be used to get the status of the running task.
       * High priority tasks take precedence over normal and low priority tasks in the job queue.
       *
       * @param string $name   A GearmanBundle registered function the worker is to execute
       * @param mixed  $params Parameters to send to job
       * @param string $unique A unique ID used to identify a particular task
       *
       * @return string|false The job handle for the submitted task,
       *                      or false when getJob() returns false
       */
      public function doHighBackgroundJob($name, $params = array(), $unique = null)
      {
          return $this->enqueue($name, $params, 'doHighBackground', $unique);
      }

      /**
       * Runs a single low priority task and returns a string representation of the result.
       * It is up to the GearmanClient and GearmanWorker to agree on the format of the result.
       * Normal and high priority tasks will get precedence over low priority tasks in the job queue.
       *
       * @param string $name   A GearmanBundle registered function the worker is to execute
       * @param mixed  $params Parameters to send to job
       * @param string $unique A unique ID used to identify a particular task
       *
       * @return string|false A string representing the results of running a task,
       *                      or false when getJob() returns false
       */
      public function doLowJob($name, $params = array(), $unique = null)
      {
          return $this->enqueue($name, $params, 'doLow', $unique);
      }

      /**
       * Runs a low priority task in the background, returning a job handle which
       * can be used to get the status of the running task.
       * Normal and high priority tasks will get precedence over low priority tasks in the job queue.
       *
       * @param string $name   A GearmanBundle registered function the worker is to execute
       * @param mixed  $params Parameters to send to job
       * @param string $unique A unique ID used to identify a particular task
       *
       * @return string|false The job handle for the submitted task,
       *                      or false when getJob() returns false
       */
      public function doLowBackgroundJob($name, $params = array(), $unique = null)
      {
          return $this->enqueue($name, $params, 'doLowBackground', $unique);
      }

      /**
       * Task methods
       *
       * None of the addTask*() methods contacts the server: they only queue the
       * task in $taskStructure. The queued tasks are sent by runTasks().
       *
       * $context is kept by reference until runTasks() is called, so the value
       * sent is the one the caller's variable holds at that moment. Use a distinct
       * variable per task when different contexts are needed.
       */

      /**
       * Adds a task to be run in parallel with other tasks.
       * Call this method for all the tasks to be run in parallel, then call runTasks() to perform the work.
       * Note that enough workers need to be available for the tasks to all run in parallel.
       *
       * @param string $name     A GearmanBundle registered function to be executed
       * @param mixed  $params   Parameters to send to task
       * @param mixed  &$context Application context to associate with a task
       * @param string $unique   A unique ID used to identify a particular task
       *
       * @return GearmanClient Return this object
       */
      public function addTask($name, $params = array(), &$context = null, $unique = null)
      {
          $this->enqueueTask($name, $params, $context, $unique, 'addTask');

          return $this;
      }

      /**
       * Adds a high priority task to be run in parallel with other tasks.
       * Call this method for all the high priority tasks to be run in parallel, then call runTasks() to perform the work.
       * Tasks with a high priority will be selected from the queue before those of normal or low priority.
       *
       * @param string $name     A GearmanBundle registered function to be executed
       * @param mixed  $params   Parameters to send to task
       * @param mixed  &$context Application context to associate with a task
       * @param string $unique   A unique ID used to identify a particular task
       *
       * @return GearmanClient Return this object
       */
      public function addTaskHigh($name, $params = array(), &$context = null, $unique = null)
      {
          $this->enqueueTask($name, $params, $context, $unique, 'addTaskHigh');

          return $this;
      }

      /**
       * Adds a low priority task to be run in parallel with other tasks.
       * Call this method for all the tasks to be run in parallel, then call runTasks() to perform the work.
       * Tasks with a low priority will be selected from the queue after those of normal or high priority.
       *
       * @param string $name     A GearmanBundle registered function to be executed
       * @param mixed  $params   Parameters to send to task
       * @param mixed  &$context Application context to associate with a task
       * @param string $unique   A unique ID used to identify a particular task
       *
       * @return GearmanClient Return this object
       */
      public function addTaskLow($name, $params = array(), &$context = null, $unique = null)
      {
          $this->enqueueTask($name, $params, $context, $unique, 'addTaskLow');

          return $this;
      }

      /**
       * Adds a background task to be run in parallel with other tasks.
       * Call this method for all the tasks to be run in parallel, then call runTasks() to perform the work.
       *
       * @param string $name     A GearmanBundle registered function to be executed
       * @param mixed  $params   Parameters to send to task
       * @param mixed  &$context Application context to associate with a task
       * @param string $unique   A unique ID used to identify a particular task
       *
       * @return GearmanClient Return this object
       */
      public function addTaskBackground($name, $params = array(), &$context = null, $unique = null)
      {
          $this->enqueueTask($name, $params, $context, $unique, 'addTaskBackground');

          return $this;
      }

      /**
       * Adds a high priority background task to be run in parallel with other tasks.
       * Call this method for all the tasks to be run in parallel, then call runTasks() to perform the work.
       * Tasks with a high priority will be selected from the queue before those of normal or low priority.
       *
       * @param string $name     A GearmanBundle registered function to be executed
       * @param mixed  $params   Parameters to send to task
       * @param mixed  &$context Application context to associate with a task
       * @param string $unique   A unique ID used to identify a particular task
       *
       * @return GearmanClient Return this object
       */
      public function addTaskHighBackground($name, $params = array(), &$context = null, $unique = null)
      {
          $this->enqueueTask($name, $params, $context, $unique, 'addTaskHighBackground');

          return $this;
      }

      /**
       * Adds a low priority background task to be run in parallel with other tasks.
       * Call this method for all the tasks to be run in parallel, then call runTasks() to perform the work.
       * Tasks with a low priority will be selected from the queue after those of normal or high priority.
       *
       * @param string $name     A GearmanBundle registered function to be executed
       * @param mixed  $params   Parameters to send to task
       * @param mixed  &$context Application context to associate with a task
       * @param string $unique   A unique ID used to identify a particular task
       *
       * @return GearmanClient Return this object
       */
      public function addTaskLowBackground($name, $params = array(), &$context = null, $unique = null)
      {
          $this->enqueueTask($name, $params, $context, $unique, 'addTaskLowBackground');

          return $this;
      }

      /**
       * Builds the task definition, including the type of call, and queues it.
       *
       * @param string $name     A GearmanBundle registered function to be executed
       * @param mixed  $params   Parameters to send to task
       * @param mixed  &$context Application context to associate with a task
       * @param string $unique   A unique ID used to identify a particular task
       * @param string $method   Native \GearmanClient method to perform in runTasks()
       *
       * @return GearmanClient Return this object
       */
      private function enqueueTask($name, $params, &$context, $unique, $method)
      {
          $task = array(
              'name'    => $name,
              'params'  => $params,
              // Stored by reference: the public addTask*() methods receive the
              // context by reference, and a plain copy here would silently break
              // that link before the task reaches the native client.
              'context' => &$context,
              'unique'  => $unique,
              'method'  => $method,
          );

          $this->addTaskToStructure($task);

          return $this;
      }

      /**
       * Appends a task definition to the taskStructure array.
       *
       * @param array $task Task definition, as built by enqueueTask()
       *
       * @return GearmanClient Return this object
       */
      private function addTaskToStructure(array $task)
      {
          $this->taskStructure[] = $task;

          return $this;
      }

      /**
       * Sends every task previously queued with addTask(), addTaskHigh(), addTaskLow(),
       * addTaskBackground(), addTaskHighBackground() or addTaskLowBackground() to a
       * new native client and runs them in parallel.
       * Note that enough workers need to be available for the tasks to all run in parallel.
       *
       * Tasks whose job is unknown (getJob() returns false) are silently skipped.
       * The local queue is emptied before the native runTasks() call, whatever its result.
       *
       * @return boolean Result of the native \GearmanClient::runTasks() call
       */
      public function runTasks()
      {
          $gearmanClient = new \GearmanClient();
          $this->assignServers($gearmanClient);

          foreach ($this->taskStructure as $task) {
              $worker = $this->getJob($task['name']);

              if (false === $worker) {
                  continue;
              }

              // 'method' holds the name of the native addTask*() variant to call.
              $type = $task['method'];
              $gearmanClient->$type(
                  $worker['job']['realCallableName'],
                  $task['params'],
                  $task['context'],
                  $task['unique']
              );
          }

          $this->taskStructure = array();

          return $gearmanClient->runTasks();
      }
  }