Browse Source

Command AMQP producer y consumer

Guillermo Espinoza 7 years ago
parent
commit
6c772fcac0

+ 46 - 0
Command/AMQPRemoteCommand.php

@@ -0,0 +1,46 @@
+<?php
+
+namespace WorkflowBundle\Command;
+
+use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
+use Symfony\Component\Console\Input\InputInterface;
+use Symfony\Component\Console\Input\InputOption;
+use Symfony\Component\Console\Output\OutputInterface;
+
+class AMQPRemoteCommand extends ContainerAwareCommand
+{
+
+    protected function configure()
+    {
+        $this
+            ->setName('amqp:remote')
+            ->setDescription('Run a command on an AMQP Consumer')
+            ->setHelp('Run a command on an AMQP Consumer')
+            ->addArgument('name', InputOption::VALUE_REQUIRED, 'Command to execute')
+            ->addOption('args', null, InputOption::VALUE_OPTIONAL|InputOption::VALUE_IS_ARRAY, 'Optional Commands arguments. e.g. --args=key:value --args=key1:value1')
+        ;
+    }
+
+    /**
+     * @param InputInterface $input
+     * @param OutputInterface $output
+     */
+    protected function execute(InputInterface $input, OutputInterface $output)
+    {
+        $name = $input->getArgument('name');
+        $args = $input->getOption('args');
+        if ($name) {
+            $msg = array(
+                'name' => $name,
+                'args' => $args,
+            );
+            $producer = $this->getContainer()->get('old_sound_rabbit_mq.command_producer_producer');
+            $producer->publish(serialize(compact('name', 'args')));
+            
+            $output->writeln('<info>Command executed!</info>');
+        } else {
+            $output->writeln('<error>Enter a valid command</error>');
+        }
+    }
+
+}

+ 12 - 0
Resources/config/rabbit_mq/config.yml

@@ -24,9 +24,21 @@ old_sound_rabbit_mq:
             connection:       default
             exchange_options: {name: 'send', type: direct}
             service_alias:    fd_tasklogger_service # no alias by default
+        
+        command_producer:
+            connection:       default
+            exchange_options: {name: 'command', type: direct}
+            service_alias:    command_producer_service # no alias by default
+            
     consumers:
         flowdat_tasklogger:
             connection:       default
             exchange_options: {name: 'send', type: direct}
             queue_options:    {name: 'send'}
             callback:         flowdat_tasklogger_service # sf service id
+            
+        command_consumer:
+            connection:       default
+            exchange_options: {name: 'command', type: direct}
+            queue_options:    {name: 'command'}
+            callback:         command_consumer_service # sf service id

+ 5 - 0
Resources/config/rabbit_mq/services.yml

@@ -1,6 +1,7 @@
 parameters:
     tasklogger_service.class: WorkflowBundle\Services\TaskLoggerService
     workflow.producer_service.class: WorkflowBundle\Services\ProducerService
+    command_consumer_service.class: WorkflowBundle\Services\CommandConsumer
 
 services:
     flowdat_tasklogger_service:
@@ -9,3 +10,7 @@ services:
     workflow.producer_service:
         class: '%workflow.producer_service.class%'
         arguments: ['@service_container']
+        
+    command_consumer_service:
+        class: '%command_consumer_service.class%'
+        arguments: ['@service_container']

+ 74 - 0
Services/CommandConsumer.php

@@ -0,0 +1,74 @@
+<?php
+
+namespace WorkflowBundle\Services;
+
+use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
+use PhpAmqpLib\Message\AMQPMessage;
+use Symfony\Bundle\FrameworkBundle\Console\Application;
+use Symfony\Component\DependencyInjection\ContainerInterface;
+use Symfony\Component\Console\Input\ArrayInput;
+use Symfony\Component\Console\Output\BufferedOutput;
+
+class CommandConsumer
+{
+
+    /**
+     * @var ContainerInterface
+     */
+    protected $serviceContainer;
+
+
+    /**
+     * @param ContainerInterface $serviceContainer
+     */
+    public function __construct(ContainerInterface $serviceContainer)
+    {
+        $this->serviceContainer = $serviceContainer;
+    }
+
+    /**
+     * $msg will be an instance of `PhpAmqpLib\Message\AMQPMessage` 
+     * with the $msg->body being the data sent over RabbitMQ.
+     * 
+     * @param AMQPMessage $msg
+     */
+    public function execute(AMQPMessage $msg)
+    {
+        $msgBody = unserialize($msg->getBody());
+        if (isset($msgBody['name']) && isset($msgBody['args'])) {
+            // command name and args
+            $name = $msgBody['name'];
+            $args = $msgBody['args'];
+
+            $kernel = $this->serviceContainer->get('kernel');
+            $application = new Application($kernel);
+            $application->setAutoExit(false);
+
+            $input = array('command' => $name,);
+            foreach ($args as $arg) {
+                $pieces = explode(':', $arg);
+                if (isset($pieces[0]) && isset($pieces[1])) {
+                    $input["--{$pieces[0]}"] = $pieces[1];
+                } elseif (isset($pieces[0])) {
+                    $input[] = $pieces[0];
+                }
+            }
+            
+            $input = new ArrayInput($input);
+
+            $output = new BufferedOutput();
+
+            $application->run($input, $output);
+
+            // return the output
+            $content = $output->fetch();
+            
+            echo $content;
+
+            return true;
+        }
+
+        return false;
+    }
+
+}