zoukankan      html  css  js  c++  java
  • php 使用 ElasticSearch/es 的最佳实践

    PHP 使用elasticsearch

    composer require elasticsearch/elasticsearch

    会自动加载合适的版本!我的php是7.1的,它会自动加载7.9的elasticsearch版本!

    elasticsearch 的安装:

    https://www.cnblogs.com/-mrl/p/13854210.html

    elasticsearch的简单使用:

    例子1:

    <?php
    
    require 'vendor/autoload.php';
    use ElasticsearchClientBuilder;
    
    //https://www.elastic.co/guide/cn/elasticsearch/php/current/_quickstart.html#_quickstart
    //$client = ClientBuilder::create()->build();
    //如果没有设置主机地址默认为127.0.0.1:9200
    $client = ElasticsearchClientBuilder::create()->setHosts(['127.0.0.1:9200'])->build();
    //var_dump($client);
    //索引一个文档
    echo '索引一个文档'.PHP_EOL;
    $params = [
        'index' => 'my_index',
        'type' => 'my_type',
        'id' => 'my_id',
        'body' => ['testField' => 'abc']
    ];
    $response = $client->index($params);
    pr($response);
    
    //获取一个文档
    echo '获取一个文档'.PHP_EOL;
    $params = [
        'index' => 'my_index',
        'type' => 'my_type',
        'id' => 'my_id'
    ];
    
    $response = $client->get($params);
    pr($response);
    echo '搜索一个文档'.PHP_EOL;
    //搜索一个文档
    $params = [
        'index' => 'my_index',
        'type' => 'my_type',
        'body' => [
            'query' => [
                'match' => [
                    'testField' => 'abc'
                ]
            ]
        ]
    ];
    
    $response = $client->search($params);
    pr($response);
    
    
    //DIE;
    echo '删除一个文档'.PHP_EOL;
    //删除一个文档
    $params = [
        'index' => 'my_index',
        'type' => 'my_type',
        'id' => 'my_id'
    ];
    $response = $client->delete($params);
    pr($response);
    
    echo '删除一个索引'.PHP_EOL;
    //删除一个索引
    $deleteParams = [
        'index' => 'my_index'
    ];
    $response = $client->indices()->delete($deleteParams);
    pr($response);
    
    
    echo '创建一个索引'.PHP_EOL;
    //创建一个索引
    $params = [
        'index' => 'my_index',
        'body' => [
            'settings' => [
                'number_of_shards' => 2,
                'number_of_replicas' => 0
            ]
        ]
    ];
    
    $response = $client->indices()->create($params);
    pr($response);
    
    function pr($response){
        echo '<pre>';
        print_r($response);
        echo '</pre>';
    }
    View Code

    例子2:

    先在 MySQL 中创建一张数据表 product,建表语句如下:

    CREATE TABLE `product` (
      `id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT '商品 ID',
      `title` varchar(64) NOT NULL DEFAULT '' COMMENT '商品名称',
      `long_title` varchar(64) NOT NULL DEFAULT '' COMMENT '商品长名称',
      `sku` varchar(32) NOT NULL DEFAULT '' COMMENT '商品 SKU',
      `money` int(10) NOT NULL DEFAULT '0' COMMENT '商品价格',
      `sales` int(11) NOT NULL DEFAULT '0' COMMENT '商品销量',
      `created_at` datetime DEFAULT NULL COMMENT '创建时间',
      `updated_at` datetime DEFAULT NULL COMMENT '修改时间',
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=15 DEFAULT CHARSET=utf8mb4;

    创建test.php

    <?php
    
    require 'vendor/autoload.php';
    require 'Db.php';
    
    use ElasticsearchClientBuilder;
    
    const ES_INDEX = 'product_index';
    const ES_TYPE = 'product';
    
    
    $nowTime = date("Y-m-d H:i:s");
    //https://www.elastic.co/guide/cn/elasticsearch/php/current/_quickstart.html#_quickstart
    //$client = ClientBuilder::create()->build();
    //如果没有设置主机地址默认为127.0.0.1:9200
    $client = ElasticsearchClientBuilder::create()->setHosts(['127.0.0.1:9200'])->build();
    //var_dump($client);
    $db = new Db();
    //print_r($db);die;
    //http://localhost/es/test.php?do=create
    if (isset($_GET['do']) && $_GET['do'] == 'create') {    // 只能创建一次
        //通过设置mapping结构创建一个索引库(相当于mysql创建一个数据库)
        $params = [
            'index' => ES_INDEX,  //索引名(相当于mysql的数据库)
            /*'client' => [
                'ignore' => 404
            ],*/
            'body' => [
                'settings' => [
                    'number_of_shards' => 2,  #分片数
                    'number_of_replicas' => 0,
                ],
                'mappings' => [
                    'properties' => [ //文档类型设置(相当于mysql的数据类型)
                        'product_id' => [
                            'type' => 'integer',
                        ],
                        'title' => [
                            'type' => 'text'
                        ],
                        'long_title' => [
                            'type' => 'text'
                        ],
                        'sku' => [
                            'type' => 'text'
                        ],
                        'money' => [
                            'type' => 'integer'
                        ],
                        'sales' => [
                            'type' => 'integer'
                        ],
                        'created_at' => [
                            'type' => 'date',
                            'format' => 'yyyy-MM-dd HH:mm:ss || yyyy-MM-dd || epoch_millis',
                        ],
                        'updated_at' => [
                            'type' => 'date',
                            'format' => 'yyyy-MM-dd HH:mm:ss || yyyy-MM-dd || epoch_millis',
                        ],
                    ]
                ]
            ]
        ];
        try {
            echo '创建一个索引' . PHP_EOL;
            //创建库索引
            $response = $client->indices()->create($params);
            pr($response);
        } catch (ElasticsearchCommonExceptionsBadRequest400Exception $e) {
            $msg = $e->getMessage();
            $msg = json_decode($msg, true);
            pr($msg);
        }
    }
    //http://localhost/es/test.php?do=index
    if (isset($_GET['do']) && $_GET['do'] == 'index') {
        /**
         * 创建商品数据
         */
        // 商品数据写入 DB
        $title = ['苹果', '香蕉', '橙子', '马蹄', '草莓'];
        $data = [
            'title' => $title[array_rand($title, 1)],
            'long_title' => 'long_title' . $nowTime,
            'sku' => rand(1, 3),
            'money' => rand(1, 10000000),
            'sales' => rand(1, 1000),
            'created_at' => $nowTime,
            'updated_at' => $nowTime
        ];
        $productId = $db->table('product')->insert($data);
        if ($productId) {
            $body = array_merge($data, [
                'product_id' => $productId,
            ]);
            $params = [
                'body' => $body,
                'id' => $productId, //(提供id,则会更新对应id的记录。若没有提供,则会生成一条文档)#可以手动指定id,也可以不指定随机生成
                'index' => ES_INDEX,
            ];
            try {
                // 商品数据写入 ES
                $response = $client->index($params);
                pr($response);
            } catch (ElasticsearchCommonExceptionsBadRequest400Exception $e) {
                $msg = $e->getMessage();
                $msg = json_decode($msg, true);
                pr($msg);
            }
        }
    }
    //http://localhost/es/test.php?do=delete&id=14
    if (isset($_GET['do']) && $_GET['do'] == 'delete' && isset($_GET['id'])) {
        $res = $db->table('product')->where("id = {$_GET['id']}")->delete();
        if ($res) {
            //删除一个文档
            $params = [
                'index' => ES_INDEX,
                'id' => $_GET['id']
            ];
            try {
                echo '删除一个文档' . PHP_EOL;
                // 商品数据写入 ES
                $response = $client->delete($params);
                pr($response);
            } catch (ElasticsearchCommonExceptionsBadRequest400Exception $e) {
                $msg = $e->getMessage();
                $msg = json_decode($msg, true);
                pr($msg);
            }
        }
    }
    //http://localhost/es/test.php?do=deleteindex
    if (isset($_GET['do']) && $_GET['do'] == 'deleteindex') {
        //删除一个索引
        $params = [
            'index' => ES_INDEX,
            //'index' => 'test_index*', //删除以test_index开始的所有索引
        ];
        try {
            echo '删除一个索引' . PHP_EOL;
            // 商品数据写入 ES
            $response = $client->indices()->delete($params);
            pr($response);
        } catch (ElasticsearchCommonExceptionsBadRequest400Exception $e) {
            $msg = $e->getMessage();
            $msg = json_decode($msg, true);
            pr($msg);
        }
    }
    //http://localhost/es/test.php?do=update&id=14
    if (isset($_GET['do']) && $_GET['do'] == 'update' && isset($_GET['id'])) {
        $title = ['苹果', '香蕉', '橙子', '马蹄', '草莓'];
        $data = [
            'title' => $title[array_rand($title, 1)],
            'long_title' => 'long_title' . $nowTime,
            'sku' => rand(1, 3),
            'money' => rand(1, 10000000),
            'sales' => rand(1, 1000),
            'created_at' => $nowTime,
            'updated_at' => $nowTime
        ];
        $res = $db->table('product')->where("id = {$_GET['id']}")->update($data);
        if ($res) {
            //更新一个文档
            $body = array_merge($data, [
                'product_id' => $_GET['id'],
            ]);
            $params = [
                'body' => [
                    'doc' => $body
                ],
                'id' => $_GET['id'],
                'index' => ES_INDEX,
            ];
            try {
                echo '更新一个文档' . PHP_EOL;
                // 商品数据写入 ES
                $response = $client->update($params);
                pr($response);
            } catch (ElasticsearchCommonExceptionsBadRequest400Exception $e) {
                $msg = $e->getMessage();
                $msg = json_decode($msg, true);
                pr($msg);
            }
        }
    }
    //http://localhost/es/test.php?do=get&id=13
    if (isset($_GET['do']) && $_GET['do'] == 'get' && isset($_GET['id'])) {
        $params = [
            'index' => ES_INDEX,
            'id' => $_GET['id']
        ];
        try {
            echo '获取一个文档' . PHP_EOL;
            $response = $client->get($params);
            pr($response);
        } catch (ElasticsearchCommonExceptionsBadRequest400Exception $e) {
            $msg = $e->getMessage();
            $msg = json_decode($msg, true);
            pr($msg);
        }
    }
    //http://localhost/es/test.php?do=mget&ids=24,25
    if (isset($_GET['do']) && $_GET['do'] == 'mget' && isset($_GET['ids'])) {
        $params = [
            'index' => ES_INDEX,
            'body' => ['ids' => explode(',', $_GET['ids'])],
            '_source' => ['money', 'title'], // 请求指定的字段
        ];
        try {
            echo '获取多个文档' . PHP_EOL;
            $response = $client->mget($params);
            pr($response);
        } catch (ElasticsearchCommonExceptionsBadRequest400Exception $e) {
            $msg = $e->getMessage();
            $msg = json_decode($msg, true);
            pr($msg);
        }
    }
    //http://localhost/es/test.php?do=search
    if (isset($_GET['do']) && $_GET['do'] == 'search') {
        //$query1.相当于sql语句:select * from product where title='香蕉' limit 0,10;
        $query1 = [
            'match' => [
                'title' => '香蕉'
            ]
        ];
        $params = [
            'index' => [ES_INDEX],      //['my_index1', 'my_index2'],可以通过这种形式进行跨库查询
            '_source' => ['money', 'title','sku'], // 请求指定的字段
            'body' => [
                'query' => $query1,
                'sort' => [['money' => ['order' => 'desc']]],     //排序
                'from' => 0,
                'size' => 10
            ]
        ];
        try {
            echo '搜索文档' . PHP_EOL;
            $response = $client->search($params);
            pr($response);
            $data = $response['hits']['hits'];
            pr($data);
        } catch (ElasticsearchCommonExceptionsBadRequest400Exception $e) {
            $msg = $e->getMessage();
            $msg = json_decode($msg, true);
            pr($msg);
        }
    }
    //http://localhost/es/test.php?do=putSettings
    if (isset($_GET['do']) && $_GET['do'] == 'putSettings') {
        $params = [
            'index' => ES_INDEX,
            'body' => [
                'settings' => [
                    'number_of_replicas' => 2,
                ]
            ]
        ];
        try {
            echo '更改索引的配置参数' . PHP_EOL;
            $response = $client->indices()->putSettings($params);
            pr($response);
        } catch (ElasticsearchCommonExceptionsBadRequest400Exception $e) {
            $msg = $e->getMessage();
            $msg = json_decode($msg, true);
            pr($msg);
        }
    }
    //http://localhost/es/test.php?do=getSource&id=24
    if (isset($_GET['do']) && $_GET['do'] == 'getSource') {
        $params = [
            'index' => ES_INDEX,
            'id' => $_GET['id']
        ];
        try {
            echo '获取指定文档的sourse内容(即字段的信息)' . PHP_EOL;
            $response = $client->getSource($params);
            pr($response);
        } catch (ElasticsearchCommonExceptionsBadRequest400Exception $e) {
            $msg = $e->getMessage();
            $msg = json_decode($msg, true);
            pr($msg);
        }
    }
    //http://localhost/es/test.php?do=getSettings
    if (isset($_GET['do']) && $_GET['do'] == 'getSettings') {
        $params = [
            'index' => [ES_INDEX],
        ];
        try {
            echo '更改索引的配置参数' . PHP_EOL;
            $response = $client->indices()->getSettings($params);
            pr($response);
        } catch (ElasticsearchCommonExceptionsBadRequest400Exception $e) {
            $msg = $e->getMessage();
            $msg = json_decode($msg, true);
            pr($msg);
        } catch (Exception $e) {
            pr($e);
        }
    }
    //http://localhost/es/test.php?do=exists&id=13
    if (isset($_GET['do']) && $_GET['do'] == 'exists' && isset($_GET['id'])) {
        $params = [
            'index' => ES_INDEX,
            'id' => $_GET['id']
        ];
        try {
            echo '判断文档是否存在' . PHP_EOL;
            $response = $client->exists($params);
            pr($response);
        } catch (ElasticsearchCommonExceptionsBadRequest400Exception $e) {
            $msg = $e->getMessage();
            $msg = json_decode($msg, true);
            pr($msg);
        }
    }
    //http://localhost/es/test.php?do=getMapping
    if (isset($_GET['do']) && $_GET['do'] == 'getMapping') {
        $params = [
            'index' => ES_INDEX,
        ];
        try {
            echo '获取查看映射' . PHP_EOL;
            $response = $client->indices()->getMapping($params);
            pr($response);
        } catch (ElasticsearchCommonExceptionsBadRequest400Exception $e) {
            $msg = $e->getMessage();
            $msg = json_decode($msg, true);
            pr($msg);
        }
    }
    
    function pr($response)
    {
        echo '<pre>';
        print_r($response);
        echo '</pre>';
    }
    View Code

    创建Db.php

    <?php
    header("Content-Type:text/html;charset=utf-8");
    
    /**
     *php操作mysql的工具类
     */
    class Db
    {
        private $_db = null;//数据库连接句柄
        private $_table = null;//表名
        private $_where = null;//where条件
        private $_order = null;//order排序
        private $_limit = null;//limit限定查询
        private $_group = null;//group分组
        private $_configs = array(
            'hostname' => 'localhost',
            'dbname' => 'test',
            'username' => 'root',
            'password' => 'root'
        );//数据库配置
    
        /**
         * 构造函数,连接数据库
         */
        public function __construct()
        {
            $link = $this->_db;
            if (!$link) {
                $db = mysqli_connect($this->_configs['hostname'], $this->_configs['username'], $this->_configs['password'], $this->_configs['dbname']);
                mysqli_query($db, "set names utf8");
                if (!$db) {
                    $this->ShowException("错误信息" . mysqli_connect_error());
                }
                $this->_db = $db;
            }
        }
    
        /**
         * 获取所有数据
         *
         * @param <type> $table The table
         *
         * @return     boolean  All.
         */
        public function getAll($table = null)
        {
            $link = $this->_db;
            if (!$link) return false;
            $sql = "SELECT * FROM {$table}";
            $data = mysqli_fetch_all($this->execute($sql));
            return $data;
        }
    
        public function table($table)
        {
            $this->_table = $table;
            return $this;
        }
    
        /**
         * 实现查询操作
         *
         * @param string $fields The fields
         *
         * @return     boolean  ( description_of_the_return_value )
         */
        public function select($fields = "*")
        {
            $fieldsStr = '';
            $link = $this->_db;
            if (!$link) return false;
            if (is_array($fields)) {
                $fieldsStr = implode(',', $fields);
            } elseif (is_string($fields) && !empty($fields)) {
                $fieldsStr = $fields;
            }
            $sql = "SELECT {$fields} FROM {$this->_table} {$this->_where} {$this->_order} {$this->_limit}";
            $data = mysqli_fetch_all($this->execute($sql));
            return $data;
        }
    
        /**
         * order排序
         *
         * @param string $order The order
         *
         * @return     boolean  ( description_of_the_return_value )
         */
        public function order($order = '')
        {
            $orderStr = '';
            $link = $this->_db;
            if (!$link) return false;
            if (is_string($order) && !empty($order)) {
                $orderStr = "ORDER BY " . $order;
            }
            $this->_order = $orderStr;
            return $this;
        }
    
        /**
         * where条件
         *
         * @param string $where The where
         *
         * @return     <type>  ( description_of_the_return_value )
         */
        public function where($where = '')
        {
            $whereStr = '';
            $link = $this->_db;
            if (!$link) return $link;
            if (is_array($where)) {
                foreach ($where as $key => $value) {
                    if ($value == end($where)) {
                        $whereStr .= "`" . $key . "` = '" . $value . "'";
                    } else {
                        $whereStr .= "`" . $key . "` = '" . $value . "' AND ";
                    }
                }
                $whereStr = "WHERE " . $whereStr;
            } elseif (is_string($where) && !empty($where)) {
                $whereStr = "WHERE " . $where;
            }
            $this->_where = $whereStr;
            return $this;
        }
    
        /**
         * group分组
         *
         * @param string $group The group
         *
         * @return     boolean  ( description_of_the_return_value )
         */
        public function group($group = '')
        {
            $groupStr = '';
            $link = $this->_db;
            if (!$link) return false;
            if (is_array($group)) {
                $groupStr = "GROUP BY " . implode(',', $group);
            } elseif (is_string($group) && !empty($group)) {
                $groupStr = "GROUP BY " . $group;
            }
            $this->_group = $groupStr;
            return $this;
        }
    
        /**
         * limit限定查询
         *
         * @param string $limit The limit
         *
         * @return     <type>  ( description_of_the_return_value )
         */
        public function limit($limit = '')
        {
            $limitStr = '';
            $link = $this->_db;
            if (!$link) return $link;
            if (is_string($limit) || !empty($limit)) {
                $limitStr = "LIMIT " . $limit;
            } elseif (is_numeric($limit)) {
                $limitStr = "LIMIT " . $limit;
            }
            $this->_limit = $limitStr;
            return $this;
        }
    
        /**
         * 执行sql语句
         *
         * @param <type> $sql The sql
         *
         * @return     boolean  ( description_of_the_return_value )
         */
        public function execute($sql = null)
        {
            $link = $this->_db;
            if (!$link) return false;
            $res = mysqli_query($this->_db, $sql);
            if (!$res) {
                $errors = mysqli_error_list($this->_db);
                $this->ShowException("报错啦!<br/>错误号:" . $errors[0]['errno'] . "<br/>SQL错误状态:" . $errors[0]['sqlstate'] . "<br/>错误信息:" . $errors[0]['error']);
                die();
            }
            return $res;
        }
    
        /**
         * 插入数据
         *
         * @param <type> $data The data
         *
         * @return     boolean  ( description_of_the_return_value )
         */
        public function insert($data)
        {
            $link = $this->_db;
            if (!$link) return false;
            if (is_array($data)) {
                $keys = '';
                $values = '';
                foreach ($data as $key => $value) {
                    $keys .= "`" . $key . "`,";
                    $values .= "'" . $value . "',";
                }
                $keys = rtrim($keys, ',');
                $values = rtrim($values, ',');
            }
            $sql = "INSERT INTO `{$this->_table}`({$keys}) VALUES({$values})";
            mysqli_query($this->_db, $sql);
            $insertId = mysqli_insert_id($this->_db);
            return $insertId;
        }
    
        /**
         * 更新数据
         *
         * @param <type> $data The data
         *
         * @return     <type>  ( description_of_the_return_value )
         */
        public function update($data)
        {
            $link = $this->_db;
            if (!$link) return $link;
            if (is_array($data)) {
                $dataStr = '';
                foreach ($data as $key => $value) {
                    $dataStr .= "`" . $key . "`='" . $value . "',";
                }
                $dataStr = rtrim($dataStr, ',');
            }
            $sql = "UPDATE `{$this->_table}` SET {$dataStr} {$this->_where} {$this->_order} {$this->_limit}";
            $res = $this->execute($sql);
            return $res;
        }
    
        /**
         * 删除数据
         *
         * @return     <type>  ( description_of_the_return_value )
         */
        public function delete()
        {
            $link = $this->_db;
            if (!$link) return $link;
            $sql = "DELETE FROM `{$this->_table}` {$this->_where}";
            $res = $this->execute($sql);
            return $res;
        }
    
        /**
         * 异常信息输出
         *
         * @param <type> $var The variable
         */
        private function ShowException($var)
        {
            if (is_bool($var)) {
                var_dump($var);
            } else if (is_null($var)) {
                var_dump(NULL);
            } else {
                echo "<pre style='position:relative;z-index:1000;padding:10px;border-radius:5px;background:#F5F5F5;border:1px solid #aaa;font-size:14px;line-height:18px;opacity:0.9;'>" . print_r($var, true) . "</pre>";
            }
        }
    
    }
    View Code

    注意:

    创建好索引之后,我们就从 MySQL 将数据同步到 ES,同步的方案有如下三种:

    1、可以直接在存储入 MySQL 之后,就直接写入 ES。

    2、通过 Logstash 定时,从 MySQL 数据库中拉取数据同步到 ES。

    3、可以通过第三方中间件(例如:canal、go-mysql-elasticsearch),拉取 MySQL 的 binlog 日志,之后中间件解析日志将数据同步到 ES。

    业务发展到中后时期的时候,可能发现字段越来越多了,这个时候想要删除一些字段。 但是,在 ES 中的 Mapping 中是不能直接删除字段的,只能重新创建。 很多情况,我们还是不建议去删除字段,因为这会增加很多不必要的成本以及带来的风险。 如果,为了节省存储空间,Boss 一定要删除字段。那就按照下面的方法,也是可以实现的。

    1、创建一个新的索引

    2、创建新的映射关系 mapping

    3、将原索引的数据到入到新索引

    4、新索引创建原索引一致的别名 5、删除原索引

  • 相关阅读:
    悲观锁乐观锁实战
    悲观锁
    乐观锁
    mysql数据库怎么设置乐观锁
    猴子吃桃问题
    算法题
    面试总结
    分布式系统理论(二):一致性协议Paxos
    职工工资管理
    79-WordSearch
  • 原文地址:https://www.cnblogs.com/-mrl/p/13858629.html
Copyright © 2011-2022 走看看