美文网首页
RabbitMQ与Laravel项目中结合

RabbitMQ与Laravel项目中结合

作者: 起个名字好难_98 | 来源:发表于2019-11-21 17:35 被阅读0次

搭建RabbitMQ环境不在此文范围内,后面会单独出搭建的教程资料

与Laravel的结合使用

1.composer引入官方php-amqplib/php-amqplib包

2.封装消息生产者

/**
 * 入消息队列
 *
 * @param $queue string 队列名
 * @param $data mixed 数据
 */
public static function pushMessageQueue($queue, $data = null)
{
    $connection  = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest');
    $channel = $connection->channel();

    $table = new AMQPTable([
        'x-queue-type' => 'classic'
    ]);

    $channel->queue_declare($queue, false, true, false, false, false, $table);

    $message = new AMQPMessage(json_encode($data, JSON_UNESCAPED_UNICODE));
    $channel->basic_publish($message, '', $queue);

    $channel->close();
    try {
        $connection->close();
    } catch (\Exception $e) {
    }
}

3.控制器调用封装好的入队方法

IndexController.php
SystemService::pushMessageQueue('other', ['date' => date('Y-m-d H:i:s')]);

4.封装消费者基类

<?php

namespace App\Console\Commands\Queue;

use PhpAmqpLib\Wire\AMQPTable;
use Illuminate\Console\Command;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Connection\AMQPStreamConnection;

class Base extends Command
{
    /**
     * rabbitMQ队列名称
     *
     * @var string
     */
    protected $queue = '';

    /**
     * rabbitMQ连接
     *
     * @var AMQPStreamConnection|null
     */
    protected $connection = null;

    /**
     * 连接频道
     *
     * @var \PhpAmqpLib\Channel\AMQPChannel|null
     */
    protected $channel = null;

    public function __construct()
    {
        parent::__construct();

        if (!empty($this->queue)) {
            $this->connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest');
            $this->channel = $this->connection->channel();
            $this->channel->queue_declare($this->queue, false, true, false, false, false, new AMQPTable([
                'x-queue-type' => 'classic'
            ]));

            $this->channel->basic_consume($this->queue, '', false, false, false, false, $this->handle());

            while (count($this->channel->callbacks)) {
                try {
                    $this->channel->wait();
                } catch (\ErrorException $exception) {
                }
            }
        }
    }

    public function handle()
    {
        return function ($message) {

        };
    }

    /**
     * 确认消息
     *
     * @param $message AMQPMessage 当前消息
     */
    protected function ack($message)
    {
        $this->channel->basic_ack($message->delivery_info['delivery_tag']);
    }

    /**
     * 拒收消息
     *
     * @param $message AMQPMessage 当前消息
     * @param $multiple bool 是否应用于多消息
     * @param $requeue bool 是否requeue
     */
    protected function nack($message, $multiple = false, $requeue = false)
    {
        $this->channel->basic_nack($message->delivery_info['delivery_tag'], $multiple, $requeue);
    }

    /**
     * 拒绝消息并选择是否重新入队
     *
     * @param $message AMQPMessage 当前消息
     * @param $requeue bool 是否requeue true则重新入队列(该消费者还是会消费到该条被reject的消息),否则丢弃或者进入死信队列。
     */
    protected function reject($message, $requeue = false)
    {
        $this->channel->basic_reject($message->delivery_info['delivery_tag'], $requeue);
    }

    /**
     * 是否恢复消息到队列
     *
     * @param $requeue bool true则重新入队列并且尽可能的将之前recover的消息投递给其他消费者消费,而不是自己再次消费,false则消息会重新被投递给自己
     */
    protected function recover($requeue = false)
    {
        $this->channel->basic_recover($requeue);
    }
}

5.other队列消费者

<?php

namespace App\Console\Commands\Queue;

class Other extends Base
{
    protected $queue = 'other';

    protected $signature = 'command:other';

    protected $description = '队列测试';

    public function handle()
    {
        return function ($message) {
            echo '收到消息:'.$message->body.PHP_EOL;
            // 业务....
            sleep(2);
            $this->ack($message);
        };
    }
}

6.Kernel文件进行注册artison命令

<?php

namespace App\Console;

use App\Console\Commands\Queue\Other;
use Illuminate\Console\Scheduling\Schedule;
use Illuminate\Foundation\Console\Kernel as ConsoleKernel;

class Kernel extends ConsoleKernel
{
    /**
     * The Artisan commands provided by your application.
     *
     * @var array
     */
    protected $commands = [
        Other::class
    ];

    /**
     * Define the application's command schedule.
     *
     * @param \Illuminate\Console\Scheduling\Schedule $schedule
     * @return void
     */
    protected function schedule(Schedule $schedule)
    {
        // $schedule->command('inspire')
        //          ->hourly();
    }

    /**
     * Register the commands for the application.
     *
     * @return void
     */
    protected function commands()
    {
        $this->load(__DIR__.'/Commands');

        require base_path('routes/console.php');
    }
}

7.代码写完了,运行一下看看效果

7.1模拟请求入队,直接请求对应控制器
7.2消费者输出
image.png
7.3RabbitMQ控制台监控
image.png
Tips:laravel需要注意下这里,去除composer执行完毕的自动发现包(php artsion package:discover)否则composer install/update会一直阻塞在消费队列监听。修改后如下图:
image.png

相关文章

网友评论

      本文标题:RabbitMQ与Laravel项目中结合

      本文链接:https://www.haomeiwen.com/subject/gyitwctx.html