123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697 |
- <?php
- /**
- * Gearman Bundle for Symfony2
- *
- * For the full copyright and license information, please view the LICENSE
- * file that was distributed with this source code.
- *
- * Feel free to edit as you please, and have fun.
- *
- * @author Marc Morera <yuhu@mmoreram.com>
- */
- namespace Mmoreram\GearmanBundle\Service;
- use Mmoreram\GearmanBundle\Dispatcher\GearmanCallbacksDispatcher;
- use Mmoreram\GearmanBundle\GearmanMethods;
- use Mmoreram\GearmanBundle\Generator\UniqueJobIdentifierGenerator;
- use Mmoreram\GearmanBundle\Module\JobStatus;
- use Mmoreram\GearmanBundle\Service\Abstracts\AbstractGearmanService;
- /**
- * GearmanClient. Implementation of AbstractGearmanService
- *
- * @since 2.3.1
- */
- class GearmanClient extends AbstractGearmanService
- {
- /**
- * @var \GearmanClient
- *
- * Gearman native client instance
- */
- protected $gearmanClient;
- /**
- * @var GearmanCallbacksDispatcher
- *
- * Gearman callbacks dispatcher
- */
- protected $gearmanCallbacksDispatcher;
- /**
- * @var array
- *
- * Server set to define in what server must connect to
- */
- protected $servers = array();
- /**
- * @var array
- *
- * task structure to store all about called tasks
- */
- protected $taskStructure = array();
- /**
- * @var array
- *
- * Set default servers
- */
- protected $defaultServers;
- /**
- * @var array
- *
- * Set default settings
- */
- protected $settings;
- /**
- * @var UniqueJobIdentifierGenerator
- *
- * Unique Job Intefier Generator
- */
- protected $uniqueJobIdentifierGenerator;
- /**
- * @var int
- *
- * Return code from internal client.
- */
- protected $returnCode;
- /**
- * @return \GearmanClient
- */
- public function getNativeClient(){
- if ($this->gearmanClient === null){
- $this->gearmanClient = new \GearmanClient();
- }
- return $this->gearmanClient;
- }
- /**
- * Init tasks structure
- *
- * @return GearmanClient self Object
- */
- public function initTaskStructure()
- {
- $this->taskStructure = array();
- return $this;
- }
- /**
- * Set default servers
- *
- * @param array $defaultServers Default servers
- *
- * @return GearmanClient self Object
- */
- public function setDefaultServers(array $defaultServers)
- {
- $this->defaultServers = $defaultServers;
- return $this;
- }
- /**
- * Set UniqueJobIdentifierGenerator object
- *
- * @param UniqueJobIdentifierGenerator $uniqueJobIdentifierGenerator Unique Job Intefier Generator
- *
- * @return GearmanClient self Object
- */
- public function setUniqueJobIdentifierGenerator(UniqueJobIdentifierGenerator $uniqueJobIdentifierGenerator)
- {
- $this->uniqueJobIdentifierGenerator = $uniqueJobIdentifierGenerator;
- return $this;
- }
- /**
- * Set gearman callbacks
- *
- * @param GearmanCallbacksDispatcher $gearmanCallbacksDispatcher Gearman callbacks dispatcher
- *
- * @return GearmanClient self Object
- */
- public function setGearmanCallbacksDispatcher(GearmanCallbacksDispatcher $gearmanCallbacksDispatcher)
- {
- $this->gearmanCallbacksDispatcher = $gearmanCallbacksDispatcher;
- return $this;
- }
- /**
- * Set default settings
- *
- * @param array $settings
- *
- * @return GearmanClient self Object
- */
- public function setDefaultSettings($settings)
- {
- $this->settings = $settings;
- return $this;
- }
- /**
- * Set server to client. Empty all servers and set this one
- *
- * @param string $servername Server name (must be ip)
- * @param int $port Port of server. By default 4730
- *
- * @return GearmanClient Returns self object
- */
- public function setServer($servername, $port = 4730)
- {
- $this
- ->clearServers()
- ->addServer($servername, $port);
- return $this;
- }
- /**
- * Add server to client
- *
- * @param string $servername Server name (must be ip)
- * @param int $port Port of server. By default 4730
- *
- * @return GearmanClient Returns self object
- */
- public function addServer($servername, $port = 4730)
- {
- $this->servers[] = array(
- 'host' => $servername,
- 'port' => $port,
- );
- return $this;
- }
- /**
- * Clear server list
- *
- * @return GearmanClient Returns self object
- */
- public function clearServers()
- {
- $this->servers = array();
- return $this;
- }
- /**
- * Get real worker from job name and enqueues the action given one
- * method.
- *
- * @param string $jobName A GearmanBundle registered function the worker is to execute
- * @param string $params Parameters to send to job as string
- * @param string $method Method to execute
- * @param string $unique A unique ID used to identify a particular task
- *
- * @return mixed Return result of the call. If worker is not valid, return false
- */
- protected function enqueue($jobName, $params, $method, $unique)
- {
- $worker = $this->getJob($jobName);
- $unique = $this
- ->uniqueJobIdentifierGenerator
- ->generateUniqueKey($jobName, $params, $unique, $method);
- return $worker
- ? $this->doEnqueue($worker, $params, $method, $unique)
- : false;
- }
- /**
- * Execute a GearmanClient call given a worker, params and a method.
- *
- * If he GarmanClient call is asyncronous, result value will be a handler.
- * Otherwise, will return job result.
- *
- * @param array $worker Worker definition
- * @param string $params Parameters to send to job as string
- * @param string $method Method to execute
- * @param string $unique A unique ID used to identify a particular task
- *
- * @return mixed Return result of the GearmanClient call
- */
- protected function doEnqueue(array $worker, $params, $method, $unique)
- {
- $gearmanClient = $this->getNativeClient();
- $this->assignServers($gearmanClient);
- $result = $gearmanClient->$method($worker['job']['realCallableName'], $params, $unique);
- $this->returnCode = $gearmanClient->returnCode();
- $this->gearmanClient = null;
- return $result;
- }
- /**
- * Given a GearmanClient, set all included servers
- *
- * @param \GearmanClient $gearmanClient Object to include servers
- *
- * @return GearmanClient Returns self object
- */
- protected function assignServers(\GearmanClient $gearmanClient)
- {
- $servers = $this->defaultServers;
- if (!empty($this->servers)) {
- $servers = $this->servers;
- }
- /**
- * We include each server into gearman client
- */
- foreach ($servers as $server) {
- $gearmanClient->addServer($server['host'], $server['port']);
- }
- return $this;
- }
- /**
- * Job methods
- */
- /**
- * Runs a single task and returns some result, depending of method called.
- * Method called depends of default callable method setted on gearman
- * settings or overwritted on work or job annotations
- *
- * @param string $name A GearmanBundle registered function the worker is to execute
- * @param string $params Parameters to send to job as string
- * @param string $unique A unique ID used to identify a particular task
- *
- * @return mixed result depending of method called.
- */
- public function callJob($name, $params = '', $unique = null)
- {
- $worker = $this->getJob($name);
- $methodCallable = $worker['job']['defaultMethod'];
- return $this->enqueue($name, $params, $methodCallable, $unique);
- }
- /**
- * Runs a single task and returns a string representation of the result.
- * It is up to the GearmanClient and GearmanWorker to agree on the format of
- * the result.
- *
- * The GearmanClient::do() method is deprecated as of pecl/gearman 1.0.0.
- * Use GearmanClient::doNormal().
- *
- * @param string $name A GearmanBundle registered function the worker is to execute
- * @param string $params Parameters to send to job as string
- * @param string $unique A unique ID used to identify a particular task
- *
- * @return string A string representing the results of running a task.
- * @deprecated
- */
- public function doJob($name, $params = '', $unique = null)
- {
- return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DONORMAL, $unique);
- }
- /**
- * Runs a single task and returns a string representation of the result.
- * It is up to the GearmanClient and GearmanWorker to agree on the format of
- * the result.
- *
- * @param string $name A GearmanBundle registered function the worker is to execute
- * @param string $params Parameters to send to job as string
- * @param string $unique A unique ID used to identify a particular task
- *
- * @return string A string representing the results of running a task.
- */
- public function doNormalJob($name, $params = '', $unique = null)
- {
- return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DONORMAL, $unique);
- }
- /**
- * Runs a task in the background, returning a job handle which can be used
- * to get the status of the running task.
- *
- * @param string $name A GearmanBundle registered function the worker is to execute
- * @param string $params Parameters to send to job as string
- * @param string $unique A unique ID used to identify a particular task
- *
- * @return string Job handle for the submitted task.
- */
- public function doBackgroundJob($name, $params = '', $unique = null)
- {
- return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DOBACKGROUND, $unique);
- }
- /**
- * Runs a single high priority task and returns a string representation of
- * the result.
- *
- * It is up to the GearmanClient and GearmanWorker to agree on the format of
- * the result.
- *
- * High priority tasks will get precedence over normal and low priority
- * tasks in the job queue.
- *
- * @param string $name A GearmanBundle registered function the worker is to execute
- * @param string $params Parameters to send to job as string
- * @param string $unique A unique ID used to identify a particular task
- *
- * @return string A string representing the results of running a task.
- */
- public function doHighJob($name, $params = '', $unique = null)
- {
- return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DOHIGH, $unique);
- }
- /**
- * Runs a high priority task in the background, returning a job handle which
- * can be used to get the status of the running task.
- *
- * High priority tasks take precedence over normal and low priority tasks in
- * the job queue.
- *
- * @param string $name A GearmanBundle registered function the worker is to execute
- * @param string $params Parameters to send to job as string
- * @param string $unique A unique ID used to identify a particular task
- *
- * @return string The job handle for the submitted task.
- */
- public function doHighBackgroundJob($name, $params = '', $unique = null)
- {
- return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DOHIGHBACKGROUND, $unique);
- }
- /**
- * Runs a single low priority task and returns a string representation of
- * the result.
- *
- * It is up to the GearmanClient and GearmanWorker to agree on the format of
- * the result.
- *
- * Normal and high priority tasks will get precedence over low priority
- * tasks in the job queue.
- *
- * @param string $name A GearmanBundle registered function the worker is to execute
- * @param string $params Parameters to send to job as string
- * @param string $unique A unique ID used to identify a particular task
- *
- * @return string A string representing the results of running a task.
- */
- public function doLowJob($name, $params = '', $unique = null)
- {
- return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DOLOW, $unique);
- }
- /**
- * Runs a low priority task in the background, returning a job handle which
- * can be used to get the status of the running task.
- *
- * Normal and high priority tasks will get precedence over low priority
- * tasks in the job queue.
- *
- * @param string $name A GearmanBundle registered function the worker is to execute
- * @param string $params Parameters to send to job as string
- * @param string $unique A unique ID used to identify a particular task
- *
- * @return string The job handle for the submitted task.
- */
- public function doLowBackgroundJob($name, $params = '', $unique = null)
- {
- return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DOLOWBACKGROUND, $unique);
- }
- /**
- * Fetches the Status of a special Background Job.
- *
- * @param string $idJob The job handle string
- *
- * @return JobStatus Job status
- */
- public function getJobStatus($idJob)
- {
- $gearmanClient = $this->getNativeClient();
- $this->assignServers($gearmanClient);
- $statusData = $gearmanClient->jobStatus($idJob);
- $jobStatus = new JobStatus($statusData);
- $this->gearmanClient = null;
- return $jobStatus;
- }
- /**
- * Gets the return code from the last run job.
- *
- * @return int
- */
- public function getReturnCode()
- {
- return $this->returnCode;
- }
- /**
- * Task methods
- */
- /**
- * Adds a task to be run in parallel with other tasks.
- * Call this method for all the tasks to be run in parallel, then call
- * GearmanClient::runTasks() to perform the work.
- *
- * Note that enough workers need to be available for the tasks to all run in
- * parallel.
- *
- * @param string $name A GermanBundle registered function to be executed
- * @param string $params Parameters to send to task as string
- * @param Mixed &$context Application context to associate with a task
- * @param string $unique A unique ID used to identify a particular task
- *
- * @return GearmanClient Return this object
- */
- public function addTask($name, $params = '', &$context = null, $unique = null)
- {
- $this->enqueueTask($name, $params, $context, $unique, GearmanMethods::GEARMAN_METHOD_ADDTASK);
- return $this;
- }
- /**
- * Adds a high priority task to be run in parallel with other tasks.
- * Call this method for all the high priority tasks to be run in parallel,
- * then call GearmanClient::runTasks() to perform the work.
- *
- * Tasks with a high priority will be selected from the queue before those
- * of normal or low priority.
- *
- * @param string $name A GermanBundle registered function to be executed
- * @param string $params Parameters to send to task as string
- * @param Mixed &$context Application context to associate with a task
- * @param string $unique A unique ID used to identify a particular task
- *
- * @return GearmanClient Return this object
- */
- public function addTaskHigh($name, $params = '', &$context = null, $unique = null)
- {
- $this->enqueueTask($name, $params, $context, $unique, GearmanMethods::GEARMAN_METHOD_ADDTASKHIGH);
- return $this;
- }
- /**
- * Adds a low priority background task to be run in parallel with other
- * tasks.
- *
- * Call this method for all the tasks to be run in parallel, then call
- * GearmanClient::runTasks() to perform the work.
- *
- * Tasks with a low priority will be selected from the queue after those of
- * normal or low priority.
- *
- * @param string $name A GermanBundle registered function to be executed
- * @param string $params Parameters to send to task as string
- * @param Mixed &$context Application context to associate with a task
- * @param string $unique A unique ID used to identify a particular task
- *
- * @return GearmanClient Return this object
- */
- public function addTaskLow($name, $params = '', &$context = null, $unique = null)
- {
- $this->enqueueTask($name, $params, $context, $unique, GearmanMethods::GEARMAN_METHOD_ADDTASKLOW);
- return $this;
- }
- /**
- * Adds a background task to be run in parallel with other tasks
- * Call this method for all the tasks to be run in parallel, then call
- * GearmanClient::runTasks() to perform the work.
- *
- * @param string $name A GermanBundle registered function to be executed
- * @param string $params Parameters to send to task as string
- * @param Mixed &$context Application context to associate with a task
- * @param string $unique A unique ID used to identify a particular task
- *
- * @return GearmanClient Return this object
- */
- public function addTaskBackground($name, $params = '', &$context = null, $unique = null)
- {
- $this->enqueueTask($name, $params, $context, $unique, GearmanMethods::GEARMAN_METHOD_ADDTASKBACKGROUND);
- return $this;
- }
- /**
- * Adds a high priority background task to be run in parallel with other
- * tasks.
- *
- * Call this method for all the tasks to be run in parallel, then call
- * GearmanClient::runTasks() to perform the work.
- *
- * Tasks with a high priority will be selected from the queue before those
- * of normal or low priority.
- *
- * @param string $name A GermanBundle registered function to be executed
- * @param string $params Parameters to send to task as string
- * @param Mixed &$context Application context to associate with a task
- * @param string $unique A unique ID used to identify a particular task
- *
- * @return GearmanClient Return this object
- */
- public function addTaskHighBackground($name, $params = '', &$context = null, $unique = null)
- {
- $this->enqueueTask($name, $params, $context, $unique, GearmanMethods::GEARMAN_METHOD_ADDTASKHIGHBACKGROUND);
- return $this;
- }
- /**
- * Adds a low priority background task to be run in parallel with other
- * tasks.
- *
- * Call this method for all the tasks to be run in parallel, then call
- * GearmanClient::runTasks() to perform the work.
- *
- * Tasks with a low priority will be selected from the queue after those of
- * normal or high priority.
- *
- * @param string $name A GermanBundle registered function to be executed
- * @param string $params Parameters to send to task as string
- * @param Mixed &$context Application context to associate with a task
- * @param string $unique A unique ID used to identify a particular task
- *
- * @return GearmanClient Return this object
- */
- public function addTaskLowBackground($name, $params = '', &$context = null, $unique = null)
- {
- $this->enqueueTask($name, $params, $context, $unique, GearmanMethods::GEARMAN_METHOD_ADDTASKLOWBACKGROUND);
- return $this;
- }
- /**
- * Adds a task into the structure of tasks with included type of call
- *
- * @param string $name A GermanBundle registered function to be executed
- * @param string $params Parameters to send to task as string
- * @param Mixed $context Application context to associate with a task
- * @param string $unique A unique ID used to identify a particular task
- * @param string $method Method to perform
- *
- * @return GearmanClient Return this object
- */
- protected function enqueueTask($name, $params, &$context, $unique, $method)
- {
- $contextReference = array('context' => &$context);
- $task = array(
- 'name' => $name,
- 'params' => $params,
- 'context' => $contextReference,
- 'unique' => $this->uniqueJobIdentifierGenerator->generateUniqueKey($name, $params, $unique, $method),
- 'method' => $method,
- );
- $this->addTaskToStructure($task);
- return $this;
- }
- /**
- * Appends a task structure into taskStructure array
- *
- * @param array $task Task structure
- *
- * @return GearmanClient Return this object
- */
- protected function addTaskToStructure(array $task)
- {
- $this->taskStructure[] = $task;
- return $this;
- }
- /**
- * For a set of tasks previously added with
- *
- * GearmanClient::addTask(),
- * GearmanClient::addTaskHigh(),
- * GearmanClient::addTaskLow(),
- * GearmanClient::addTaskBackground(),
- * GearmanClient::addTaskHighBackground(),
- * GearmanClient::addTaskLowBackground(),
- *
- * this call starts running the tasks in parallel.
- * Note that enough workers need to be available for the tasks to all run in parallel
- *
- * @return boolean run tasks result
- */
- public function runTasks()
- {
- $gearmanClient = $this->getNativeClient();
- $this->assignServers($gearmanClient);
- if ($this->settings['callbacks']) {
- $this->gearmanCallbacksDispatcher->assignTaskCallbacks($gearmanClient);
- }
- foreach ($this->taskStructure as $task) {
- $type = $task['method'];
- $jobName = $task['name'];
- $worker = $this->getJob($jobName);
- if (false !== $worker) {
- $gearmanClient->$type(
- $worker['job']['realCallableName'],
- $task['params'],
- $task['context'],
- $task['unique']
- );
- }
- }
- $this->initTaskStructure();
- $this->gearmanClient = null;
- return $gearmanClient->runTasks();
- }
- }
|