GearmanClient.php 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569
  1. <?php
  2. /**
  3. * Gearman Bundle for Symfony2
  4. *
  5. * @author Marc Morera <yuhu@mmoreram.com>
  6. * @since 2013
  7. */
  8. namespace Mmoreram\GearmanBundle\Service;
  9. use Mmoreram\GearmanBundle\Service\Abstracts\AbstractGearmanService;
  10. use Mmoreram\GearmanBundle\GearmanMethods;
  11. use Mmoreram\GearmanBundle\Module\JobStatus;
  12. /**
  13. * GearmanClient. Implementation of AbstractGearmanService
  14. */
  15. class GearmanClient extends AbstractGearmanService
  16. {
  17. /**
  18. * @var GearmanCallbacks
  19. *
  20. * Gearman callbacks
  21. */
  22. protected $gearmanCallbacks;
  23. /**
  24. * @var array
  25. *
  26. * Server set to define in what server must connect to
  27. */
  28. protected $servers = array();
  29. /**
  30. * @var array
  31. *
  32. * task structure to store all about called tasks
  33. */
  34. protected $taskStructure = array();
  35. /**
  36. * @var array
  37. *
  38. * Set default servers
  39. */
  40. protected $defaultServers;
  41. /**
  42. * Set default servers
  43. *
  44. * @param array $defaultServers Default servers
  45. *
  46. * @return GearmanClient self Object
  47. */
  48. public function setDefaultServers(array $defaultServers)
  49. {
  50. $this->defaultServers = $defaultServers;
  51. return $this;
  52. }
  53. /**
  54. * Set gearman callbacks
  55. *
  56. * @param GearmanCallbacks $gearmanCallbacks Gearman callbacks
  57. *
  58. * @return GearmanClient self Object
  59. */
  60. public function setGearmanCallbacks(GearmanCallbacks $gearmanCallbacks)
  61. {
  62. $this->gearmanCallbacks = $gearmanCallbacks;
  63. return $this;
  64. }
  65. /**
  66. * Set server to client. Empty all servers and set this one
  67. *
  68. * @param type $servername Server name (must be ip)
  69. * @param type $port Port of server. By default 4730
  70. *
  71. * @return GearmanClient Returns self object
  72. */
  73. public function setServer($servername, $port = 4730)
  74. {
  75. $this
  76. ->clearServers()
  77. ->addServer($servername, $port);
  78. return $this;
  79. }
  80. /**
  81. * Add server to client
  82. *
  83. * @param type $servername Server name (must be ip)
  84. * @param type $port Port of server. By default 4730
  85. *
  86. * @return GearmanClient Returns self object
  87. */
  88. public function addServer($servername, $port = 4730)
  89. {
  90. $this->servers[] = array(
  91. 'host' => $servername,
  92. 'port' => $port,
  93. );
  94. return $this;
  95. }
  96. /**
  97. * Clear server list
  98. *
  99. * @return GearmanClient Returns self object
  100. */
  101. public function clearServers()
  102. {
  103. $this->servers = array();
  104. return $this;
  105. }
  106. /**
  107. * Get real worker from job name and enqueues the action given one
  108. * method.
  109. *
  110. * @param string $jobName A GearmanBundle registered function the worker is to execute
  111. * @param string $params Parameters to send to job as string
  112. * @param string $method Method to execute
  113. * @param string $unique A unique ID used to identify a particular task
  114. *
  115. * @return mixed Return result of the call
  116. */
  117. private function enqueue($jobName, $params = '', $method = null, $unique = null)
  118. {
  119. $worker = $this->getJob($jobName);
  120. if (false !== $worker) {
  121. return $this->doEnqueue($worker, $params, $method, $unique);
  122. }
  123. return false;
  124. }
  125. /**
  126. * Execute a GearmanClient call given a worker, params and a method.
  127. *
  128. * If he GarmanClient call is asyncronous, result value will be a handler.
  129. * Otherwise, will return job result.
  130. *
  131. * @param array $worker Worker definition
  132. * @param string $params Parameters to send to job as string
  133. * @param string $method Method to execute
  134. * @param string $unique A unique ID used to identify a particular task
  135. *
  136. * @return mixed Return result of the GearmanClient call
  137. */
  138. private function doEnqueue(Array $worker, $params = '', $method = null, $unique = null)
  139. {
  140. $gearmanClient = new \GearmanClient();
  141. $this->assignServers($gearmanClient);
  142. return $gearmanClient->$method($worker['job']['realCallableName'], $params, $unique);
  143. }
  144. /**
  145. * Given a GearmanClient, set all included servers
  146. *
  147. * @param GearmanClient $gearmanClient Object to include servers
  148. *
  149. * @return GearmanClient Returns self object
  150. */
  151. private function assignServers(\GearmanClient $gearmanClient)
  152. {
  153. $servers = $this->defaultServers;
  154. if (!empty($this->servers)) {
  155. $servers = $this->servers;
  156. }
  157. /**
  158. * We include each server into gearman client
  159. */
  160. foreach ($servers as $server) {
  161. $gearmanClient->addServer($server['host'], $server['port']);
  162. }
  163. return $this;
  164. }
  165. /**
  166. * Job methods
  167. */
  168. /**
  169. * Runs a single task and returns some result, depending of method called.
  170. * Method called depends of default callable method setted on gearman settings
  171. * or overwritted on work or job annotations
  172. *
  173. * @param string $name A GearmanBundle registered function the worker is to execute
  174. * @param string $params Parameters to send to job as string
  175. * @param string $unique A unique ID used to identify a particular task
  176. *
  177. * @return mixed result depending of method called.
  178. */
  179. public function callJob($name, $params = '', $unique = null)
  180. {
  181. $worker = $this->getJob($name);
  182. $methodCallable = $worker['job']['defaultMethod'] . 'Job';
  183. return $this->enqueue($name, $params, $methodCallable, $unique);
  184. }
  185. /**
  186. * Runs a single task and returns a string representation of the result.
  187. * It is up to the GearmanClient and GearmanWorker to agree on the format of the result.
  188. * The GearmanClient::do() method is deprecated as of pecl/gearman 1.0.0. Use GearmanClient::doNormal().
  189. *
  190. * @param string $name A GearmanBundle registered function the worker is to execute
  191. * @param string $params Parameters to send to job as string
  192. * @param string $unique A unique ID used to identify a particular task
  193. *
  194. * @return string A string representing the results of running a task.
  195. * @deprecated
  196. */
  197. public function doJob($name, $params = '', $unique = null)
  198. {
  199. return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DO, $unique);
  200. }
  201. /**
  202. * Runs a single task and returns a string representation of the result.
  203. * It is up to the GearmanClient and GearmanWorker to agree on the format of the result.
  204. *
  205. * @param string $name A GearmanBundle registered function the worker is to execute
  206. * @param string $params Parameters to send to job as string
  207. * @param string $unique A unique ID used to identify a particular task
  208. *
  209. * @return string A string representing the results of running a task.
  210. */
  211. public function doNormalJob($name, $params = '', $unique = null)
  212. {
  213. return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DONORMAL, $unique);
  214. }
  215. /**
  216. * Runs a task in the background, returning a job handle which
  217. * can be used to get the status of the running task.
  218. *
  219. * @param string $name A GearmanBundle registered function the worker is to execute
  220. * @param string $params Parameters to send to job as string
  221. * @param string $unique A unique ID used to identify a particular task
  222. *
  223. * @return string Job handle for the submitted task.
  224. */
  225. public function doBackgroundJob($name, $params = '', $unique = null)
  226. {
  227. return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DOBACKGROUND, $unique);
  228. }
  229. /**
  230. * Runs a single high priority task and returns a string representation of the result.
  231. * It is up to the GearmanClient and GearmanWorker to agree on the format of the result.
  232. * High priority tasks will get precedence over normal and low priority tasks in the job queue.
  233. *
  234. * @param string $name A GearmanBundle registered function the worker is to execute
  235. * @param string $params Parameters to send to job as string
  236. * @param string $unique A unique ID used to identify a particular task
  237. *
  238. * @return string A string representing the results of running a task.
  239. */
  240. public function doHighJob($name, $params = '', $unique = null)
  241. {
  242. return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DOHIGH, $unique);
  243. }
  244. /**
  245. * Runs a high priority task in the background, returning a job handle which can be used to get the status of the running task.
  246. * High priority tasks take precedence over normal and low priority tasks in the job queue.
  247. *
  248. * @param string $name A GearmanBundle registered function the worker is to execute
  249. * @param string $params Parameters to send to job as string
  250. * @param string $unique A unique ID used to identify a particular task
  251. *
  252. * @return string The job handle for the submitted task.
  253. */
  254. public function doHighBackgroundJob($name, $params = '', $unique = null)
  255. {
  256. return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DOHIGHBACKGROUND, $unique);
  257. }
  258. /**
  259. * Runs a single low priority task and returns a string representation of the result.
  260. * It is up to the GearmanClient and GearmanWorker to agree on the format of the result.
  261. * Normal and high priority tasks will get precedence over low priority tasks in the job queue.
  262. *
  263. * @param string $name A GearmanBundle registered function the worker is to execute
  264. * @param string $params Parameters to send to job as string
  265. * @param string $unique A unique ID used to identify a particular task
  266. *
  267. * @return string A string representing the results of running a task.
  268. */
  269. public function doLowJob($name, $params = '', $unique = null)
  270. {
  271. return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DOLOW, $unique);
  272. }
  273. /**
  274. * Runs a low priority task in the background, returning a job handle which can be used to get the status of the running task.
  275. * Normal and high priority tasks will get precedence over low priority tasks in the job queue.
  276. *
  277. * @param string $name A GearmanBundle registered function the worker is to execute
  278. * @param string $params Parameters to send to job as string
  279. * @param string $unique A unique ID used to identify a particular task
  280. *
  281. * @return string The job handle for the submitted task.
  282. */
  283. public function doLowBackgroundJob($name, $params = '', $unique = null)
  284. {
  285. return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DOLOWBACKGROUND, $unique);
  286. }
  287. /**
  288. * Fetches the Status of a special Background Job.
  289. *
  290. * @param string $idJob The job handle string
  291. *
  292. * @return JobStatus Job status
  293. */
  294. public function getJobStatus($idJob)
  295. {
  296. $gearmanClient = new \GearmanClient();
  297. $this->assignServers($gearmanClient);
  298. $statusData = $gearmanClient->jobStatus($idJob);
  299. $jobStatus = new JobStatus($statusData);
  300. return $jobStatus;
  301. }
  302. /**
  303. * Task methods
  304. */
  305. /**
  306. * Adds a task to be run in parallel with other tasks.
  307. * Call this method for all the tasks to be run in parallel, then call GearmanClient::runTasks() to perform the work.
  308. * Note that enough workers need to be available for the tasks to all run in parallel.
  309. *
  310. * @param string $name A GermanBundle registered function to be executed
  311. * @param string $params Parameters to send to task as string
  312. * @param Mixed &$context Application context to associate with a task
  313. * @param string $unique A unique ID used to identify a particular task
  314. *
  315. * @return GearmanClient Return this object
  316. */
  317. public function addTask($name, $params = '', &$context = null, $unique = null)
  318. {
  319. $this->enqueueTask($name, $params, $context, $unique, GearmanMethods::GEARMAN_METHOD_ADDTASK);
  320. return $this;
  321. }
  322. /**
  323. * Adds a high priority task to be run in parallel with other tasks.
  324. * Call this method for all the high priority tasks to be run in parallel, then call GearmanClient::runTasks() to perform the work.
  325. * Tasks with a high priority will be selected from the queue before those of normal or low priority.
  326. *
  327. * @param string $name A GermanBundle registered function to be executed
  328. * @param string $params Parameters to send to task as string
  329. * @param Mixed &$context Application context to associate with a task
  330. * @param string $unique A unique ID used to identify a particular task
  331. *
  332. * @return GearmanClient Return this object
  333. */
  334. public function addTaskHigh($name, $params = '', &$context = null, $unique = null)
  335. {
  336. $this->enqueueTask($name, $params, $context, $unique, GearmanMethods::GEARMAN_METHOD_ADDTASKHIGH);
  337. return $this;
  338. }
  339. /**
  340. * Adds a low priority background task to be run in parallel with other tasks.
  341. * Call this method for all the tasks to be run in parallel, then call GearmanClient::runTasks() to perform the work.
  342. * Tasks with a low priority will be selected from the queue after those of normal or low priority.
  343. *
  344. * @param string $name A GermanBundle registered function to be executed
  345. * @param string $params Parameters to send to task as string
  346. * @param Mixed &$context Application context to associate with a task
  347. * @param string $unique A unique ID used to identify a particular task
  348. *
  349. * @return GearmanClient Return this object
  350. */
  351. public function addTaskLow($name, $params = '', &$context = null, $unique = null)
  352. {
  353. $this->enqueueTask($name, $params, $context, $unique, GearmanMethods::GEARMAN_METHOD_ADDTASKLOW);
  354. return $this;
  355. }
  356. /**
  357. * Adds a background task to be run in parallel with other tasks
  358. * Call this method for all the tasks to be run in parallel, then call GearmanClient::runTasks() to perform the work.
  359. *
  360. * @param string $name A GermanBundle registered function to be executed
  361. * @param string $params Parameters to send to task as string
  362. * @param Mixed &$context Application context to associate with a task
  363. * @param string $unique A unique ID used to identify a particular task
  364. *
  365. * @return GearmanClient Return this object
  366. */
  367. public function addTaskBackground($name, $params = '', &$context = null, $unique = null)
  368. {
  369. $this->enqueueTask($name, $params, $context, $unique, GearmanMethods::GEARMAN_METHOD_ADDTASKBACKGROUND);
  370. return $this;
  371. }
  372. /**
  373. * Adds a high priority background task to be run in parallel with other tasks.
  374. * Call this method for all the tasks to be run in parallel, then call GearmanClient::runTasks() to perform the work.
  375. * Tasks with a high priority will be selected from the queue before those of normal or low priority.
  376. *
  377. * @param string $name A GermanBundle registered function to be executed
  378. * @param string $params Parameters to send to task as string
  379. * @param Mixed &$context Application context to associate with a task
  380. * @param string $unique A unique ID used to identify a particular task
  381. *
  382. * @return GearmanClient Return this object
  383. */
  384. public function addTaskHighBackground($name, $params = '', &$context = null, $unique = null)
  385. {
  386. $this->enqueueTask($name, $params, $context, $unique, GearmanMethods::GEARMAN_METHOD_ADDTASKHIGHBACKGROUND);
  387. return $this;
  388. }
  389. /**
  390. * Adds a low priority background task to be run in parallel with other tasks.
  391. * Call this method for all the tasks to be run in parallel, then call GearmanClient::runTasks() to perform the work.
  392. * Tasks with a low priority will be selected from the queue after those of normal or high priority.
  393. *
  394. * @param string $name A GermanBundle registered function to be executed
  395. * @param string $params Parameters to send to task as string
  396. * @param Mixed &$context Application context to associate with a task
  397. * @param string $unique A unique ID used to identify a particular task
  398. *
  399. * @return GearmanClient Return this object
  400. */
  401. public function addTaskLowBackground($name, $params = '', &$context = null, $unique = null)
  402. {
  403. $this->enqueueTask($name, $params, $context, $unique, GearmanMethods::GEARMAN_METHOD_ADDTASKLOWBACKGROUND);
  404. return $this;
  405. }
  406. /**
  407. * Adds a task into the structure of tasks with included type of call
  408. *
  409. * @param string $name A GermanBundle registered function to be executed
  410. * @param string $params Parameters to send to task as string
  411. * @param Mixed $context Application context to associate with a task
  412. * @param string $unique A unique ID used to identify a particular task
  413. * @param string $method Method to perform
  414. *
  415. * @return GearmanClient Return this object
  416. */
  417. private function enqueueTask($name, $params, $context, $unique, $method)
  418. {
  419. $task = array(
  420. 'name' => $name,
  421. 'params' => $params,
  422. 'context' => $context,
  423. 'unique' => $unique,
  424. 'method' => $method,
  425. );
  426. $this->addTaskToStructure($task);
  427. return $this;
  428. }
  429. /**
  430. * Appends a task structure into taskStructure array
  431. *
  432. * @param array $task Task structure
  433. *
  434. * @return GearmanClient Return this object
  435. */
  436. private function addTaskToStructure(array $task)
  437. {
  438. $this->taskStructure[] = $task;
  439. return $this;
  440. }
  441. /**
  442. * For a set of tasks previously added with GearmanClient::addTask(), GearmanClient::addTaskHigh(),
  443. * GearmanClient::addTaskLow(), GearmanClient::addTaskBackground(), GearmanClient::addTaskHighBackground(),
  444. * or GearmanClient::addTaskLowBackground(), this call starts running the tasks in parallel.
  445. * Note that enough workers need to be available for the tasks to all run in parallel
  446. *
  447. * @return boolean run tasks result
  448. */
  449. public function runTasks()
  450. {
  451. $gearmanClient = new \GearmanClient();
  452. $this->assignServers($gearmanClient);
  453. $this->gearmanCallbacks->assignTaskCallbacks($gearmanClient);
  454. foreach ($this->taskStructure as $task) {
  455. $type = $task['method'];
  456. $jobName = $task['name'];
  457. $worker = $this->getJob($jobName);
  458. if (false !== $worker) {
  459. $gearmanClient->$type($worker['job']['realCallableName'], $task['params'], $task['context'], $task['unique']);
  460. }
  461. }
  462. $this->taskStructure = array();
  463. return $gearmanClient->runTasks();
  464. }
  465. }