zoukankan      html  css  js  c++  java
  • PHP 中使用 ElasticSearch 的最佳实践 (下)

    引言

    上一篇文章中,我们使用同步的方式将数据写入 ElasticSearch。本文主要介绍如何借助 RabbitMQ,以异步的方式将数据同步到 ElasticSearch。

    部分实践代码

    创建商品

    /**
     * Create a product: store it in the database, then publish it to RabbitMQ
     * so that it can be indexed asynchronously.
     *
     * @param Request $request Request carrying the product fields.
     * @return \Illuminate\Http\JsonResponse Always {"code": 0, "msg": "success"}.
     */
    public function createProduct(Request $request)
    {
        // Collect the submitted product attributes, keyed by column name.
        $attributes = [];
        foreach ([
            ProductModel::TITLE,
            ProductModel::LONG_TITLE,
            ProductModel::DESCRIPTION,
            ProductModel::SKU,
            ProductModel::PRICE,
            ProductModel::SALES,
        ] as $field) {
            $attributes[$field] = $request->request->get($field);
        }

        // A new row gets the same creation and modification timestamp.
        $timestamp = date("Y-m-d H:i:s");
        $attributes[ProductModel::CREATED_AT] = $timestamp;
        $attributes[ProductModel::UPDATED_AT] = $timestamp;

        // Write the product to the DB; the returned value is used as the product id.
        $productId = DB::table(ProductModel::TABLE_NAME)->insertGetId($attributes);

        // Message payload: the new id first, followed by the stored attributes.
        $message = [ProductModel::PRODUCT_ID => $productId] + $attributes;

        // Hand the payload over to RabbitMQ under the "create" routing key.
        $this->routingKey = self::PRODUCT_CREATE;
        $this->publishMsg($message);

        return Response()->json(['code' => 0, 'msg' => 'success']);
    }
    

    删除商品

    /**
     * Delete a product: remove it from the database, then publish the
     * deletion to RabbitMQ.
     *
     * @param Request $request Request carrying the product id.
     * @return \Illuminate\Http\JsonResponse Always {"code": 0, "msg": "success"}.
     */
    public function deleteProduct(Request $request)
    {
        $id = $request->request->get(ProductModel::PRODUCT_ID);

        // Remove the matching row from the DB.
        $query = DB::table(ProductModel::TABLE_NAME)->where(ProductModel::PRODUCT_ID, $id);
        $query->delete();

        // Only the id is published for a deletion.
        $this->routingKey = self::PRODUCT_DELETE;
        $this->publishMsg([
            ProductModel::PRODUCT_ID => $id,
        ]);

        return Response()->json(['code' => 0, 'msg' => 'success']);
    }
    

    更新商品

    /**
     * Update a product: write the new field values to the database, then
     * publish them to RabbitMQ.
     *
     * @param Request $request Request carrying the product id and fields.
     * @return \Illuminate\Http\JsonResponse Always {"code": 0, "msg": "success"}.
     */
    public function updateProduct(Request $request)
    {
        $productId = $request->request->get(ProductModel::PRODUCT_ID);

        // Collect the submitted field values, keyed by column name.
        $changes = [];
        foreach ([
            ProductModel::TITLE,
            ProductModel::LONG_TITLE,
            ProductModel::DESCRIPTION,
            ProductModel::SKU,
            ProductModel::PRICE,
            ProductModel::SALES,
        ] as $field) {
            $changes[$field] = $request->request->get($field);
        }
        $changes[ProductModel::UPDATED_AT] = date("Y-m-d H:i:s");

        // Apply the changes to the matching DB row.
        DB::table(ProductModel::TABLE_NAME)
            ->where(ProductModel::PRODUCT_ID, $productId)
            ->update($changes);

        // Message payload: the product id first, followed by the updated fields.
        // Note that no creation timestamp is part of this payload.
        $message = [ProductModel::PRODUCT_ID => $productId] + $changes;

        // Hand the payload over to RabbitMQ under the "update" routing key.
        $this->routingKey = self::PRODUCT_UPDATE;
        $this->publishMsg($message);

        return Response()->json(['code' => 0, 'msg' => 'success']);
    }
    

    获取单个商品数据

    /**
     * Fetch a single product from the search index and return it.
     *
     * The response keeps the original "code" and "msg" keys and adds "data",
     * holding the stored document (or null when the response has no "_source").
     *
     * NOTE(review): assumes $this->client is an elasticsearch-php client whose
     * get() returns an array with the document under "_source" - confirm.
     * A missing document may make the client throw; that is not handled here.
     *
     * @param Request $request Request carrying the product id.
     * @return \Illuminate\Http\JsonResponse
     */
    public function getProductInfo(Request $request)
    {
        $productId = $request->request->get(ProductModel::PRODUCT_ID);

        $params = [
            'id'    => $productId,
            'index' => self::INDEX,
            'type'  => self::TYPE,
        ];

        // Keep the lookup result: previously it was discarded and the caller
        // only ever received the bare success envelope.
        $result = $this->client->get($params);
        $product = isset($result['_source']) ? $result['_source'] : null;

        return Response()->json(['code' => 0, 'msg' => 'success', 'data' => $product]);
    }
    

    搜索商品数据

    /**
     * Search products in the index and return the matching documents.
     *
     * The response keeps the original "code" and "msg" keys and adds "data",
     * a list with the "_source" of every returned hit. No query is passed, so
     * the search is not filtered by anything in the request.
     *
     * NOTE(review): assumes $this->client is an elasticsearch-php client whose
     * search() returns the hits under ['hits']['hits'] - confirm.
     *
     * @param Request $request Current request (not read by this method).
     * @return \Illuminate\Http\JsonResponse
     */
    public function getProductList(Request $request)
    {
        $params = [
            'index' => self::INDEX,
            'type'  => self::TYPE,
        ];

        // Keep the search result: previously it was discarded and the caller
        // only ever received the bare success envelope.
        $result = $this->client->search($params);
        $hits = isset($result['hits']['hits']) ? $result['hits']['hits'] : [];

        $products = [];
        foreach ($hits as $hit) {
            if (isset($hit['_source'])) {
                $products[] = $hit['_source'];
            }
        }

        return Response()->json(['code' => 0, 'msg' => 'success', 'data' => $products]);
    }
    

    通过订阅的方式,同步数据到 ElasticSearch

    /**
     * Execute the console command.
     *
     * Listens for messages and dispatches each one to the handler registered
     * in $this->mapping under the message's routing key. A message is
     * acknowledged after its handler returns. A message that cannot be
     * processed (payload is not an array, or no handler is registered for its
     * routing key) is rejected without requeue so it cannot crash the consumer
     * or be redelivered forever.
     *
     * @return void
     */
    public function handle()
    {
        $this->listen(function ($msg) {
            $channel = $msg->delivery_info['channel'];
            $deliveryTag = $msg->delivery_info['delivery_tag'];
            $routingKey = $msg->delivery_info['routing_key'];

            // SECURITY: unserialize() on queue data can instantiate arbitrary
            // classes. The payloads handled here are plain arrays, so object
            // creation is disabled. NOTE(review): consider JSON for the
            // message format (needs a matching change in the publisher).
            $body = unserialize($msg->body, ['allowed_classes' => false]);

            if (!is_array($body) || !isset($this->mapping[$routingKey])) {
                // Drop the message instead of calling a missing handler.
                $channel->basic_reject($deliveryTag, false);
                return;
            }

            $this->mapping[$routingKey]($body);

            // With no_ack=false every message must be acknowledged explicitly;
            // otherwise unacknowledged messages pile up and memory may run out.
            $channel->basic_ack($deliveryTag);
        });
    }
    
    /**
     * Write a product document to ES.
     *
     * @param array $body Message payload; must contain the product id, all
     *                    product fields and both timestamps.
     */
    protected function createProduct($body)
    {
        // Copy the expected fields from the payload into the document, in order.
        $document = [];
        foreach ([
            ProductModel::PRODUCT_ID,
            ProductModel::TITLE,
            ProductModel::LONG_TITLE,
            ProductModel::DESCRIPTION,
            ProductModel::SKU,
            ProductModel::PRICE,
            ProductModel::SALES,
            ProductModel::CREATED_AT,
            ProductModel::UPDATED_AT,
        ] as $field) {
            $document[$field] = $body[$field];
        }

        // The product id doubles as the document id.
        $this->client->create([
            'body'  => $document,
            'id'    => $body[ProductModel::PRODUCT_ID],
            'index' => self::INDEX,
            'type'  => self::TYPE,
        ]);
    }
    
    /**
     * Update a product document in ES.
     *
     * The changed fields are sent as a partial document under the "doc" key,
     * which is the shape the Elasticsearch Update API expects; sending the
     * fields at the top level of the body is rejected by the server.
     *
     * The creation timestamp is only included when the payload carries it:
     * the update message built by the publisher does not contain it, and
     * reading it unconditionally raised an undefined-index error.
     *
     * @param array $body Message payload; must contain the product id, the
     *                    product fields and the update timestamp.
     */
    protected function updateProduct($body)
    {
        $doc = [
            ProductModel::PRODUCT_ID    => $body[ProductModel::PRODUCT_ID],
            ProductModel::TITLE         => $body[ProductModel::TITLE],
            ProductModel::LONG_TITLE    => $body[ProductModel::LONG_TITLE],
            ProductModel::DESCRIPTION   => $body[ProductModel::DESCRIPTION],
            ProductModel::SKU           => $body[ProductModel::SKU],
            ProductModel::PRICE         => $body[ProductModel::PRICE],
            ProductModel::SALES         => $body[ProductModel::SALES],
            ProductModel::UPDATED_AT    => $body[ProductModel::UPDATED_AT]
        ];

        // Leave the stored creation timestamp untouched unless one was sent.
        if (isset($body[ProductModel::CREATED_AT])) {
            $doc[ProductModel::CREATED_AT] = $body[ProductModel::CREATED_AT];
        }

        $params = [
            'body' => [
                'doc' => $doc,
            ],
            'id'    => $body[ProductModel::PRODUCT_ID],
            'index' => self::INDEX,
            'type'  => self::TYPE,
        ];

        // Apply the partial update to the ES document.
        $this->client->update($params);
    }
    
    /**
     * Delete a product document from ES.
     *
     * @param array $body Message payload; must contain the product id.
     */
    protected function deleteProduct($body)
    {
        // The product id doubles as the document id.
        $this->client->delete([
            'id'    => $body[ProductModel::PRODUCT_ID],
            'index' => self::INDEX,
            'type'  => self::TYPE,
        ]);
    }
    

    小结

    通过异步的方式同步数据到 ElasticSearch,可以提高系统的并发处理能力。

    附:Github 代码

  • 相关阅读:
    5 Longest Palindromic Substring(easy)
    4 Median of Two Sorted Arrays(medium)
    前言
    3 Longest Substring Without Repeating Characters(medium)
    JavaScript&jQuery学习笔记
    禅道、jenkins部署记录
    jenkins部署自动化项目备注
    pytest
    我的第一个py爬虫-小白(beatifulsoup)
    单元测试python unittest
  • 原文地址:https://www.cnblogs.com/yxhblogs/p/13419554.html
Copyright © 2011-2022 走看看