美文网首页
php使用kafka

php使用kafka

作者: 段义纬 | 来源:发表于2021-07-28 10:37 被阅读0次

Ⅰ、kafka扩展安装参考:https://www.jianshu.com/p/c3eea6f43c4d

Ⅱ、代码
1、生产者

<?php
/**
 * Thin wrapper around RdKafka\Producer bound to a single topic.
 *
 * The producer handle and the topic are kept in static properties, so each
 * call to init() replaces the producer/topic used by every instance of this
 * class in the current process.
 */
class KafkaProducer
{
    const TAG = 'KafkaProducer';

    /** @var RdKafka\ProducerTopic|null Topic handle created by init(). */
    private static $topic;

    /** @var RdKafka\Producer|null Producer handle created by init(). */
    private static $producer;

    /**
     * Configure the shared producer and bind it to one topic.
     *
     * @param string     $brokerList Value for "metadata.broker.list".
     * @param string     $topic      Name of the topic to produce to.
     * @param array|null $isSsl      Optional SASL credentials with the keys
     *                               'username' and 'password'. When given,
     *                               security.protocol and sasl.mechanism are
     *                               read from the KAFKA_SECURITY_PROTOCOL and
     *                               KAFKA_SASL_MECHANISM constants, which must
     *                               be defined outside this class.
     * @return KafkaProducer
     * @throws Exception When the rdkafka extension is not loaded.
     */
    public static function init($brokerList, $topic, $isSsl = null)
    {
        if (!extension_loaded('rdkafka')) {
            throw new Exception("no rdkafka extension", 1);
        }

        // Created up front so the delivery-report callback can reuse log().
        $instance = new self();

        $conf = new RdKafka\Conf();

        // Conf::set() takes string values, so numbers are passed as strings.
        $conf->set('metadata.broker.list', $brokerList);
        $conf->set('broker.version.fallback', '1.0.0');
        $conf->set('compression.type', 'lz4');
        $conf->set('retries', '3');
        $conf->set('retry.backoff.ms', '1000');

        // Delivery report: record messages that could not be delivered
        // instead of dropping the failure silently.
        $conf->setDrMsgCb(function ($kafka, $message) use ($instance) {
            if (RD_KAFKA_RESP_ERR_NO_ERROR !== $message->err) {
                $instance->log(array(
                    'msg' => $message->payload,
                    'err_msg' => 'delivery failed: ' . rd_kafka_err2str($message->err),
                ));
            }
        });
        $conf->setErrorCb(function ($kafka, $err, $reason) {
            throw new Exception('推送失败:' . rd_kafka_err2str($err) . '。缘由:' . $reason);
        });

        if ($isSsl) {
            $conf->set('security.protocol', KAFKA_SECURITY_PROTOCOL);
            $conf->set('sasl.mechanism', KAFKA_SASL_MECHANISM);
            $conf->set('sasl.username', $isSsl['username']);
            $conf->set('sasl.password', $isSsl['password']);
        }

        // self:: rather than static:: because the properties are private and
        // late static binding would fail when called through a subclass.
        self::$producer = new RdKafka\Producer($conf);
        self::$topic = self::$producer->newTopic($topic);

        return $instance;
    }

    /**
     * JSON-encode a message, produce it to the configured topic and flush.
     *
     * @param mixed $message Any value accepted by json_encode().
     * @param int   $retries Maximum number of flush() attempts (10 s each).
     *                       With 0 no flush is attempted at all.
     * @throws Exception When init() was not called, the message cannot be
     *                   encoded, or the flush keeps failing. Failures raised
     *                   while producing are logged before being rethrown.
     */
    public function producer($message, $retries = 10)
    {
        // Without this guard the call below is a fatal "member function on
        // null" error that callers catching Exception cannot handle.
        if (null === self::$producer || null === self::$topic) {
            throw new \RuntimeException('KafkaProducer::init() must be called before producer()');
        }

        try {
            $payload = json_encode($message);
            if (false === $payload) {
                throw new \RuntimeException('Unable to JSON-encode message: ' . json_last_error_msg());
            }

            self::$topic->produce(RD_KAFKA_PARTITION_UA, 0, $payload);

            self::$producer->poll(0);

            // The rdkafka version deployed in production has no flush().
            if (!method_exists(self::$producer, 'flush')) {
                return;
            }

            $result = RD_KAFKA_RESP_ERR_NO_ERROR;

            for ($flushRetries = 0; $flushRetries < $retries; $flushRetries++) {
                $result = self::$producer->flush(10000);
                if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
                    break;
                }
            }

            if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
                throw new \RuntimeException('Was unable to flush, messages might be lost!');
            }
        } catch (Exception $e) {
            $this->log(array('msg' => json_encode($message), 'err_msg' => $e->getMessage()));
            throw $e;
        }
    }

    /**
     * Append a timestamped, var_export()-ed entry to the Kafka error log.
     *
     * The log file is ROOT_DIR . '/log/KafkaError.log'; ROOT_DIR must be
     * defined outside this class.
     *
     * @param mixed $message Value to record.
     */
    public function log($message)
    {
        $message = PHP_EOL . "时间:" . date('Y-m-d H:i:s') . PHP_EOL . var_export($message, true) . PHP_EOL;
        error_log($message, 3, ROOT_DIR . '/log/KafkaError.log');
    }
}

2、消费者

<?php
require_once dirname(__FILE__) . '/../constant.php';

/**
 * Thin wrapper around the high-level RdKafka\KafkaConsumer subscribed to a
 * single topic.
 *
 * The consumer handle is kept in a static property, so each call to init()
 * replaces the consumer used by the whole process.
 */
class KafkaConsumer
{
    /** @var RdKafka\KafkaConsumer|null Consumer handle created by init(). */
    private static $consumer;

    /**
     * Configure the shared consumer and subscribe it to one topic.
     *
     * @param string     $brokerList Value for "metadata.broker.list".
     * @param string     $groupId    Consumer group id.
     * @param string     $topic      Name of the topic to subscribe to.
     * @param array|null $isSsl      Optional SASL credentials with the keys
     *                               'username' and 'password' (on the
     *                               author's company cluster every topic
     *                               needs its own authentication user and
     *                               password). When given, security.protocol
     *                               and sasl.mechanism are read from the
     *                               KAFKA_SECURITY_PROTOCOL and
     *                               KAFKA_SASL_MECHANISM constants, which
     *                               must be defined outside this class.
     * @return KafkaConsumer
     * @throws Exception When the rdkafka extension is not loaded.
     */
    public static function init($brokerList, $groupId, $topic, $isSsl = null)
    {
        if (!extension_loaded('rdkafka')) {
            throw new Exception("no rdkafka extension", 1);
        }
        $conf = new \RdKafka\Conf();
        $conf->setRebalanceCb(function (\RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
            switch ($err) {
                case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                    $kafka->assign($partitions);
                    break;

                case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
                    $kafka->assign(null);
                    break;

                default:
                    // Readable text as the message, numeric code as the code.
                    throw new \Exception(rd_kafka_err2str($err), $err);
            }
        });
        if ($isSsl) {
            $conf->set('security.protocol', KAFKA_SECURITY_PROTOCOL);
            $conf->set('sasl.mechanism', KAFKA_SASL_MECHANISM);
            $conf->set('sasl.username', $isSsl['username']);
            $conf->set('sasl.password', $isSsl['password']);
        }
        $conf->setErrorCb(function ($kafka, $err, $reason) {
            throw new Exception('消费失败:' . rd_kafka_err2str($err) . '。缘由:' . $reason);
        });

        // Conf::set() takes string values, so numbers and booleans are
        // passed in their string form.
        $conf->set('group.id', $groupId);
        $conf->set('metadata.broker.list', $brokerList);
        // Topic metadata refresh interval in milliseconds; -1 disables it.
        $conf->set('topic.metadata.refresh.interval.ms', '60000');
        // Enable TCP keep-alive on broker sockets.
        $conf->set('socket.keepalive.enable', 'true');

        $conf->set('auto.commit.interval.ms', '100');
        // Alternative value: 'smallest'.
        $conf->set('auto.offset.reset', 'latest');

        // self:: rather than static:: because the property is private and
        // late static binding would fail when called through a subclass.
        self::$consumer = new \RdKafka\KafkaConsumer($conf);
        self::$consumer->subscribe([$topic]);

        return new self();
    }

    /**
     * Wait up to 120 seconds for one message and hand it to the callback.
     *
     * Reaching the end of a partition or timing out is a normal idle
     * condition: the method returns without invoking the callback so a
     * polling loop can simply call it again.
     *
     * @param callable|null $callback Invoked with the received message.
     * @throws Exception When init() was not called, the broker reports an
     *                   error, or the callback throws. Failures raised while
     *                   consuming are logged before being rethrown.
     */
    public static function consumer($callback = null)
    {
        // Without this guard the call below is a fatal "member function on
        // null" error that callers catching Exception cannot handle.
        if (null === self::$consumer) {
            throw new \RuntimeException('KafkaConsumer::init() must be called before consumer()');
        }

        try {
            $message = self::$consumer->consume(120 * 1000);
            switch ($message->err) {
                case RD_KAFKA_RESP_ERR_NO_ERROR:
                    if ($callback) {
                        call_user_func($callback, $message);
                    }
                    return;

                case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                case RD_KAFKA_RESP_ERR__TIMED_OUT:
                    // Nothing to deliver right now; not a failure.
                    return;

                default:
                    throw new \Exception($message->errstr(), $message->err);
            }
        } catch (Exception $e) {
            self::logError($e->getMessage());
            throw $e;
        }
    }

    /**
     * Append a timestamped, var_export()-ed entry to the Kafka error log.
     *
     * The log file is ROOT_DIR . '/log/KafkaError.log'; ROOT_DIR must be
     * defined outside this class.
     *
     * @param mixed $detail Value to record.
     */
    private static function logError($detail)
    {
        $entry = PHP_EOL . "时间:" . date('Y-m-d H:i:s') . PHP_EOL . var_export($detail, true) . PHP_EOL;
        error_log($entry, 3, ROOT_DIR . '/log/KafkaError.log');
    }
}

3、生产者调用

// Example: publish ten JSON messages to the "test_cluster" topic, allowing
// at most two flush attempts per message, and dump the text of any failure.
try {
    $brokerList = '127.0.0.1:9092';
    $credentials = array('username' => 'writer', 'password' => '123456');
    $producer = KafkaProducer::init($brokerList, 'test_cluster', $credentials);

    $i = 0;
    while ($i < 10) {
        $payload = array('bc' => 201700, 'numd' => $i);
        $producer->producer($payload, 2);
        $i++;
    }
} catch (Exception $e) {
    var_dump($e->getMessage());
}

4、消费者调用

// Example: join "test-consumer-group", subscribe to "test_cluster" and dump
// every consumed message until an exception ends the loop; the exception
// text is dumped as well.
try {
    $brokerList = '127.0.0.1:9092';
    $credentials = array('username' => 'reader', 'password' => '123456');
    $kc = KafkaConsumer::init($brokerList, 'test-consumer-group', 'test_cluster', $credentials);

    $dumpMessage = function ($message) {
        var_dump($message);
    };

    for (;;) {
        $kc->consumer($dumpMessage);
    }
} catch (Exception $e) {
    var_dump($e->getMessage());
}

相关文章

  • 使用php连接操作kafka

    使用php连接操作kafka,从安装kafka到引入php扩展来操作kafka。 一、安装 注:需安装JDK 1....

  • php 使用kafka

    准备工作 安装librdkafka 库 安装php-rdkafka 扩展 编码实现 生产者 代码实现 检验发送是否...

  • php使用kafka

    Ⅰ、kafka扩展安装参考[https://www.jianshu.com/p/c3eea6f43c4d] Ⅱ、代...

  • 消息中间件Kafka - PHP操作使用Kafka

    PHP使用Kafka 我们需要安装libkafka和rdkafka 安装libkafka 下载去GitHub上克隆...

  • PHP安装使用kafka

    1.安装java环境并配置环境变量2.下载kafka安装包并解压3.安装librdkafka库 4.安装php-r...

  • php中使用kafka

    早想用消息队列了,但是一直没有动手今天搞起来 安装kafka wget https://mirror.bit.ed...

  • PHP 中使用Kafka

    3个问题 安装 :a. 看kafka官方文档会知道,kafka主要给java用的,其他的语言多是通过c/c++的a...

  • PHP使用kafka入门

    第一步,kafka安装使用入门 0:环境准备,Linux环境(CentOS为例),wget命令,git命令,jav...

  • kafka php版本

    https://github.com/search?l=PHP&q=kafka-php&type=Reposito...

  • See-KafKa 简单舒适的PHP-KafKa拓展

    See-KafKa 简单舒适的PHP-KafKa拓展 前言 (Simple 简单 easy 容易 expand 的...

网友评论

      本文标题:php使用kafka

      本文链接:https://www.haomeiwen.com/subject/aeqzmltx.html