GearmanClient.php 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565
  1. <?php
  2. /**
  3. * Gearman Bundle for Symfony2
  4. *
  5. * @author Marc Morera <yuhu@mmoreram.com>
  6. * @since 2013
  7. */
  8. namespace Mmoreram\GearmanBundle\Service;
  9. use Mmoreram\GearmanBundle\Service\Abstracts\AbstractGearmanService;
  10. use Mmoreram\GearmanBundle\GearmanMethods;
  11. /**
  12. * GearmanClient. Implementation of AbstractGearmanService
  13. */
  14. class GearmanClient extends AbstractGearmanService
  15. {
  16. /**
  17. * @var GearmanCallbacks
  18. *
  19. * Gearman callbacks
  20. */
  21. private $gearmanCallbacks;
  22. /**
  23. * @var array
  24. *
  25. * Server set to define in what server must connect to
  26. */
  27. private $servers = array();
  28. /**
  29. * @var array
  30. *
  31. * task structure to store all about called tasks
  32. */
  33. private $taskStructure = array();
  34. /**
  35. * @var array
  36. *
  37. * Set default servers
  38. */
  39. private $defaultServers;
  40. /**
  41. * Set default servers
  42. *
  43. * @param array $defaultServers Default servers
  44. *
  45. * @return GearmanClient self Object
  46. */
  47. public function setDefaultServers(array $defaultServers)
  48. {
  49. $this->defaultServers = $defaultServers;
  50. return $this;
  51. }
  52. /**
  53. * Set gearman callbacks
  54. *
  55. * @param GearmanCallbacks $gearmanCallbacks Gearman callbacks
  56. *
  57. * @return GearmanClient self Object
  58. */
  59. public function setGearmanCallbacks(GearmanCallbacks $gearmanCallbacks)
  60. {
  61. $this->gearmanCallbacks = $gearmanCallbacks;
  62. return $this;
  63. }
  64. /**
  65. * Set server to client. Empty all servers and set this one
  66. *
  67. * @param type $servername Server name (must be ip)
  68. * @param type $port Port of server. By default 4730
  69. *
  70. * @return GearmanClient Returns self object
  71. */
  72. public function setServer($servername, $port = 4730)
  73. {
  74. $this
  75. ->clearServers()
  76. ->addServer($servername, $port);
  77. return $this;
  78. }
  79. /**
  80. * Add server to client
  81. *
  82. * @param type $servername Server name (must be ip)
  83. * @param type $port Port of server. By default 4730
  84. *
  85. * @return GearmanClient Returns self object
  86. */
  87. public function addServer($servername, $port = 4730)
  88. {
  89. $this->servers[] = array(
  90. 'host' => $servername,
  91. 'port' => $port,
  92. );
  93. return $this;
  94. }
  95. /**
  96. * Clear server list
  97. *
  98. * @return GearmanClient Returns self object
  99. */
  100. public function clearServers()
  101. {
  102. $this->servers = array();
  103. return $this;
  104. }
  105. /**
  106. * Get real worker from job name and enqueues the action given one
  107. * method.
  108. *
  109. * @param string $jobName A GearmanBundle registered function the worker is to execute
  110. * @param string $params Parameters to send to job as string
  111. * @param string $method Method to execute
  112. * @param string $unique A unique ID used to identify a particular task
  113. *
  114. * @return mixed Return result of the call
  115. */
  116. private function enqueue($jobName, $params = '', $method = null, $unique = null)
  117. {
  118. $worker = $this->getJob($jobName);
  119. if (false !== $worker) {
  120. return $this->doEnqueue($worker, $params, $method, $unique);
  121. }
  122. return false;
  123. }
  124. /**
  125. * Execute a GearmanClient call given a worker, params and a method.
  126. *
  127. * If he GarmanClient call is asyncronous, result value will be a handler.
  128. * Otherwise, will return job result.
  129. *
  130. * @param array $worker Worker definition
  131. * @param string $params Parameters to send to job as string
  132. * @param string $method Method to execute
  133. * @param string $unique A unique ID used to identify a particular task
  134. *
  135. * @return mixed Return result of the GearmanClient call
  136. */
  137. private function doEnqueue(Array $worker, $params = '', $method = null, $unique = null)
  138. {
  139. $gearmanClient = new \GearmanClient();
  140. $this->assignServers($gearmanClient);
  141. return $gearmanClient->$method($worker['job']['realCallableName'], $params, $unique);
  142. }
  143. /**
  144. * Given a GearmanClient, set all included servers
  145. *
  146. * @param GearmanClient $gearmanClient Object to include servers
  147. *
  148. * @return GearmanClient Returns self object
  149. */
  150. private function assignServers(\GearmanClient $gearmanClient)
  151. {
  152. $servers = $this->defaultServers;
  153. if (!empty($this->servers)) {
  154. $servers = $this->servers;
  155. }
  156. /**
  157. * We include each server into gearman client
  158. */
  159. foreach ($servers as $server) {
  160. $gearmanClient->addServer($server['host'], $server['port']);
  161. }
  162. return $this;
  163. }
  164. /**
  165. * Job methods
  166. */
  167. /**
  168. * Runs a single task and returns some result, depending of method called.
  169. * Method called depends of default callable method setted on gearman settings
  170. * or overwritted on work or job annotations
  171. *
  172. * @param string $name A GearmanBundle registered function the worker is to execute
  173. * @param string $params Parameters to send to job as string
  174. * @param string $unique A unique ID used to identify a particular task
  175. *
  176. * @return mixed result depending of method called.
  177. */
  178. public function callJob($name, $params = '', $unique = null)
  179. {
  180. $worker = $this->getJob($name);
  181. $methodCallable = $worker['job']['defaultMethod'] . 'Job';
  182. return $this->enqueue($name, $params, $methodCallable, $unique);
  183. }
  184. /**
  185. * Runs a single task and returns a string representation of the result.
  186. * It is up to the GearmanClient and GearmanWorker to agree on the format of the result.
  187. * The GearmanClient::do() method is deprecated as of pecl/gearman 1.0.0. Use GearmanClient::doNormal().
  188. *
  189. * @param string $name A GearmanBundle registered function the worker is to execute
  190. * @param string $params Parameters to send to job as string
  191. * @param string $unique A unique ID used to identify a particular task
  192. *
  193. * @return string A string representing the results of running a task.
  194. * @deprecated
  195. */
  196. public function doJob($name, $params = '', $unique = null)
  197. {
  198. return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DO, $unique);
  199. }
  200. /**
  201. * Runs a single task and returns a string representation of the result.
  202. * It is up to the GearmanClient and GearmanWorker to agree on the format of the result.
  203. *
  204. * @param string $name A GearmanBundle registered function the worker is to execute
  205. * @param string $params Parameters to send to job as string
  206. * @param string $unique A unique ID used to identify a particular task
  207. *
  208. * @return string A string representing the results of running a task.
  209. */
  210. public function doNormalJob($name, $params = '', $unique = null)
  211. {
  212. return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DONORMAL, $unique);
  213. }
  214. /**
  215. * Runs a task in the background, returning a job handle which
  216. * can be used to get the status of the running task.
  217. *
  218. * @param string $name A GearmanBundle registered function the worker is to execute
  219. * @param string $params Parameters to send to job as string
  220. * @param string $unique A unique ID used to identify a particular task
  221. *
  222. * @return string Job handle for the submitted task.
  223. */
  224. public function doBackgroundJob($name, $params = '', $unique = null)
  225. {
  226. return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DOBACKGROUND, $unique);
  227. }
  228. /**
  229. * Runs a single high priority task and returns a string representation of the result.
  230. * It is up to the GearmanClient and GearmanWorker to agree on the format of the result.
  231. * High priority tasks will get precedence over normal and low priority tasks in the job queue.
  232. *
  233. * @param string $name A GearmanBundle registered function the worker is to execute
  234. * @param string $params Parameters to send to job as string
  235. * @param string $unique A unique ID used to identify a particular task
  236. *
  237. * @return string A string representing the results of running a task.
  238. */
  239. public function doHighJob($name, $params = '', $unique = null)
  240. {
  241. return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DOHIGH, $unique);
  242. }
  243. /**
  244. * Runs a high priority task in the background, returning a job handle which can be used to get the status of the running task.
  245. * High priority tasks take precedence over normal and low priority tasks in the job queue.
  246. *
  247. * @param string $name A GearmanBundle registered function the worker is to execute
  248. * @param string $params Parameters to send to job as string
  249. * @param string $unique A unique ID used to identify a particular task
  250. *
  251. * @return string The job handle for the submitted task.
  252. */
  253. public function doHighBackgroundJob($name, $params = '', $unique = null)
  254. {
  255. return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DOHIGHBACKGROUND, $unique);
  256. }
  257. /**
  258. * Runs a single low priority task and returns a string representation of the result.
  259. * It is up to the GearmanClient and GearmanWorker to agree on the format of the result.
  260. * Normal and high priority tasks will get precedence over low priority tasks in the job queue.
  261. *
  262. * @param string $name A GearmanBundle registered function the worker is to execute
  263. * @param string $params Parameters to send to job as string
  264. * @param string $unique A unique ID used to identify a particular task
  265. *
  266. * @return string A string representing the results of running a task.
  267. */
  268. public function doLowJob($name, $params = '', $unique = null)
  269. {
  270. return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DOLOW, $unique);
  271. }
  272. /**
  273. * Runs a low priority task in the background, returning a job handle which can be used to get the status of the running task.
  274. * Normal and high priority tasks will get precedence over low priority tasks in the job queue.
  275. *
  276. * @param string $name A GearmanBundle registered function the worker is to execute
  277. * @param string $params Parameters to send to job as string
  278. * @param string $unique A unique ID used to identify a particular task
  279. *
  280. * @return string The job handle for the submitted task.
  281. */
  282. public function doLowBackgroundJob($name, $params = '', $unique = null)
  283. {
  284. return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DOLOWBACKGROUND, $unique);
  285. }
  286. /**
  287. * Task methods
  288. */
  289. /**
  290. * Adds a task to be run in parallel with other tasks.
  291. * Call this method for all the tasks to be run in parallel, then call GearmanClient::runTasks() to perform the work.
  292. * Note that enough workers need to be available for the tasks to all run in parallel.
  293. *
  294. * @param string $name A GermanBundle registered function to be executed
  295. * @param string $params Parameters to send to task as string
  296. * @param Mixed &$context Application context to associate with a task
  297. * @param string $unique A unique ID used to identify a particular task
  298. *
  299. * @return GearmanClient Return this object
  300. */
  301. public function addTask($name, $params = '', &$context = null, $unique = null)
  302. {
  303. $this->enqueueTask($name, $params, $context, $unique, GearmanMethods::GEARMAN_METHOD_ADDTASK);
  304. return $this;
  305. }
  306. /**
  307. * Adds a high priority task to be run in parallel with other tasks.
  308. * Call this method for all the high priority tasks to be run in parallel, then call GearmanClient::runTasks() to perform the work.
  309. * Tasks with a high priority will be selected from the queue before those of normal or low priority.
  310. *
  311. * @param string $name A GermanBundle registered function to be executed
  312. * @param string $params Parameters to send to task as string
  313. * @param Mixed &$context Application context to associate with a task
  314. * @param string $unique A unique ID used to identify a particular task
  315. *
  316. * @return GearmanClient Return this object
  317. */
  318. public function addTaskHigh($name, $params = '', &$context = null, $unique = null)
  319. {
  320. $this->enqueueTask($name, $params, $context, $unique, GearmanMethods::GEARMAN_METHOD_ADDTASKHIGH);
  321. return $this;
  322. }
  323. /**
  324. * Adds a low priority background task to be run in parallel with other tasks.
  325. * Call this method for all the tasks to be run in parallel, then call GearmanClient::runTasks() to perform the work.
  326. * Tasks with a low priority will be selected from the queue after those of normal or low priority.
  327. *
  328. * @param string $name A GermanBundle registered function to be executed
  329. * @param string $params Parameters to send to task as string
  330. * @param Mixed &$context Application context to associate with a task
  331. * @param string $unique A unique ID used to identify a particular task
  332. *
  333. * @return GearmanClient Return this object
  334. */
  335. public function addTaskLow($name, $params = '', &$context = null, $unique = null)
  336. {
  337. $this->enqueueTask($name, $params, $context, $unique, GearmanMethods::GEARMAN_METHOD_ADDTASKLOW);
  338. return $this;
  339. }
  340. /**
  341. * Adds a background task to be run in parallel with other tasks
  342. * Call this method for all the tasks to be run in parallel, then call GearmanClient::runTasks() to perform the work.
  343. *
  344. * @param string $name A GermanBundle registered function to be executed
  345. * @param string $params Parameters to send to task as string
  346. * @param Mixed &$context Application context to associate with a task
  347. * @param string $unique A unique ID used to identify a particular task
  348. *
  349. * @return GearmanClient Return this object
  350. */
  351. public function addTaskBackground($name, $params = '', &$context = null, $unique = null)
  352. {
  353. $this->enqueueTask($name, $params, $context, $unique, GearmanMethods::GEARMAN_METHOD_ADDTASKBACKGROUND);
  354. return $this;
  355. }
  356. /**
  357. * Adds a high priority background task to be run in parallel with other tasks.
  358. * Call this method for all the tasks to be run in parallel, then call GearmanClient::runTasks() to perform the work.
  359. * Tasks with a high priority will be selected from the queue before those of normal or low priority.
  360. *
  361. * @param string $name A GermanBundle registered function to be executed
  362. * @param string $params Parameters to send to task as string
  363. * @param Mixed &$context Application context to associate with a task
  364. * @param string $unique A unique ID used to identify a particular task
  365. *
  366. * @return GearmanClient Return this object
  367. */
  368. public function addTaskHighBackground($name, $params = '', &$context = null, $unique = null)
  369. {
  370. $this->enqueueTask($name, $params, $context, $unique, GearmanMethods::GEARMAN_METHOD_ADDTASKHIGHBACKGROUND);
  371. return $this;
  372. }
  373. /**
  374. * Adds a low priority background task to be run in parallel with other tasks.
  375. * Call this method for all the tasks to be run in parallel, then call GearmanClient::runTasks() to perform the work.
  376. * Tasks with a low priority will be selected from the queue after those of normal or high priority.
  377. *
  378. * @param string $name A GermanBundle registered function to be executed
  379. * @param string $params Parameters to send to task as string
  380. * @param Mixed &$context Application context to associate with a task
  381. * @param string $unique A unique ID used to identify a particular task
  382. *
  383. * @return GearmanClient Return this object
  384. */
  385. public function addTaskLowBackground($name, $params = '', &$context = null, $unique = null)
  386. {
  387. $this->enqueueTask($name, $params, $context, $unique, GearmanMethods::GEARMAN_METHOD_ADDTASKLOWBACKGROUND);
  388. return $this;
  389. }
  390. /**
  391. * Adds a task into the structure of tasks with included type of call
  392. *
  393. * @param string $name A GermanBundle registered function to be executed
  394. * @param string $params Parameters to send to task as string
  395. * @param Mixed $context Application context to associate with a task
  396. * @param string $unique A unique ID used to identify a particular task
  397. * @param string $method Method to perform
  398. *
  399. * @return GearmanClient Return this object
  400. */
  401. private function enqueueTask($name, $params, $context, $unique, $method)
  402. {
  403. $task = array(
  404. 'name' => $name,
  405. 'params' => $params,
  406. 'context' => $context,
  407. 'unique' => $unique,
  408. 'method' => $method,
  409. );
  410. $this->addTaskToStructure($task);
  411. return $this;
  412. }
  413. /**
  414. * Appends a task structure into taskStructure array
  415. *
  416. * @param array $task Task structure
  417. *
  418. * @return GearmanClient Return this object
  419. */
  420. private function addTaskToStructure(array $task)
  421. {
  422. $this->taskStructure[] = $task;
  423. return $this;
  424. }
  425. /**
  426. * For a set of tasks previously added with GearmanClient::addTask(), GearmanClient::addTaskHigh(),
  427. * GearmanClient::addTaskLow(), GearmanClient::addTaskBackground(), GearmanClient::addTaskHighBackground(),
  428. * or GearmanClient::addTaskLowBackground(), this call starts running the tasks in parallel.
  429. * Note that enough workers need to be available for the tasks to all run in parallel
  430. *
  431. * @return boolean run tasks result
  432. */
  433. public function runTasks()
  434. {
  435. $gearmanClient = new \GearmanClient();
  436. $this->assignServers($gearmanClient);
  437. $this->gearmanCallbacks->assignTaskCallbacks($gearmanClient);
  438. foreach ($this->taskStructure as $task) {
  439. $type = $task['method'];
  440. $jobName = $task['name'];
  441. $worker = $this->getJob($jobName);
  442. if (false !== $worker) {
  443. $gearmanClient->$type($worker['job']['realCallableName'], $task['params'], $task['context'], $task['unique']);
  444. }
  445. }
  446. $this->taskStructure = array();
  447. return $gearmanClient->runTasks();
  448. }
  449. /**
  450. * Fetches the Status of a special Job. You need the Job handle for the Task you want to check.
  451. * As long as the Job is known by the Gearmen-Servers it will return a true
  452. *
  453. * @param string $backgroundId the job handle string
  454. * @return bool
  455. */
  456. public function jobIsRunning($backgroundId)
  457. {
  458. $gearmanClient = new \GearmanClient();
  459. $this->assignServers($gearmanClient);
  460. $statusData = $gearmanClient->jobStatus($backgroundId);
  461. return isset($statusData[0]) && $statusData[0];
  462. }
  463. }