zoukankan      html  css  js  c++  java
  • MySQL导入MongoDB

    一、MongoDB的导入导出

     mongoDB的导入导出,分为mongoDB官方提供的工具类,和第三方的工具类。下面依次介绍下:

    1.1、mongoDB提供的工具

    1.1.1、mongoimport工具

    源数据只接受json、csv等格式的源文件。

    第一步:将user表从MySQL中导出,右键,点击导出向导,选择格式为xlsx。
    第二步:导出完成后,双击打开user.xlsx,将user.xlsx另存为csv格式的文件。(切记不可直接修改后缀名,会导致乱码,无法导入到MongoDB中,血的教训)
    第三步:由于表中数据可能包含中文,导入MongoDB时会造成乱码问题,所以将user.csv文件用任意编辑器打开,将编码转为UTF-8后保存。(推荐使用notepad++)
    第四步:如果MongoDB没有创建用户,使用
    mongoimport --db 数据库名 --collection 集合名 --type csv --headerline --ignoreBlanks --file CSV文件存放路径(例如:我是存放在桌面,所以路径为:C:UsersAdministratorDesktopuser.csv)
    如果MongoDB创建了用户,需要权限才能操作,使用
    mongoimport -u MongoDB账号 -p MongoDB密码 --db 数据库名 --collection 集合名 --type csv --headerline --ignoreBlanks --file CSV文件存放路径(例如:我是存放在桌面,所以路径为:C:UsersAdministratorDesktopuser.csv)

    1.1.2、mongoexport工具

     借助csv文件将数据从MongoDB导入到MySQL中。
    MongoDB提供了mongoexpert命令可将数据库中的数据导出成json和csv两种格式的文件。进入路径MongoDB/bin
    输入命令:mongoexpert -h [主机ip] -d [数据库名] -c [collection名] --csv -f [fields名,多个用","隔开] -o [输出的文件名]
    如此可以将数据导入到csv文件中,然后利用MySQL的导入数据命令将文件导入到MySQL中。

    1.2、第三方工具

     网上搜集了下从MySQL导入MongoDB的方案,有如下几种:

    1. logstash-output-mongodb实现;
    2. 用python脚本实现;
      # 把mysql数据库中的数据导入mongodb中
      import pymysql
      import pymongo
      
      # 创建mysql的数据库连接
      con = pymysql.connect(host='localhost', port=3306, user='root', password='123456', db='pp')
      # 获取游标
      cur = con.cursor(cursor=pymysql.cursors.DictCursor)
      # 查询student表
      try:
        cur.execute('select * from student')
        # 创建mongodb数据库连接
        client = pymongo.MongoClient(host='localhost', port=27017)
        # 获取数据库
        db = client['pp']#或者db=client.pp,相当于数据库中的use pp;
        for row in cur.fetchall():
          row['birthday'] = str(row['birthday']) #因为mongodb没有datetime类型,因此必须先转为字符串才能导入mongodb,否则可省略此步
          db.student.insert_one(row)
      except Exception as e:
        print(e)
      finally:
        con.close()
        client.close()
      #The achievement is attributed to teacher Peng! 
    3. binlog数据读取同步,如:tungsten replicator,canal;

       tungsten见《ETL之Tungsten Replicator》;

          canal

    二、示例

    2.1、微服务架构下的数据查询问题

      微服务拆分主要分两种方式:拆分业务系统不拆分数据库,拆分业务系统拆分库。如果数据规模小的话大可不必拆分数据库,因为拆分数据看必将面对多维度数据查询,跨进程之间的事务等问题。而我所在公司随着业务发展单数据库实例已经不能满足业务需要,所以选择了拆分业务系统同时拆分数据库的模式,所以也面临着以上的问题。当前系统架构和存储结构如下:

    解决思路 
    要对多数据库数据进行查询,首先就需要将数据库同步到一起以方便查询,为了满足大数据量数据需求,所以优先选择NOSQL数据库做同步库:
    NOSQL数据库基本无法进行关联查询,所以需要将关系数据进行拼接操作,转换成非关系型数据
    业务多维度查询需要实时性,所以需要选择NOSQL中实时性相对比较好的数据库:MongoDB 
    根据以上思路,总结数据整合架构如下图所示:

    解决方案 
    目前网上一些数据同步案例分两种:MQ消息同步和binlog数据读取同步。

    • MQ消息同步:先说MQ消息同步,该同步方式我所在公司试用过一段时间,发现以下问题:

        (1)数据围绕业务进行,对业务关键性数据操作发送MQ消息,对业务系统依赖性比较高 
        (2)对于数据库中存量数据需要单独处理 
        (3)对于工具表还需要单独维护同步 
        (4)每次新增数据表都需要重新添加MQ逻辑 
        考虑到以上问题,用MQ方式同步数据并不是最优解决办法

    • binlog数据读取同步:使用binlog 数据读取方式目前有一些成熟方案,比如tungsten replicator,但这些同步工具只能实现数据1:1复制,数据复制过程自定义逻辑添加比较麻烦,不支持分库分表数据归集操作。综上所述,最优方案应该是读取后binlog后自行处理后续数据逻辑。目前binlog读取binlog工具中最成熟的方案应该就是alibaba开源的canal了。

     通过canal从MySQL同步到MongoDB

    我使用的是canal的HA模式,由zookeeper选举可用实例,每个数据库一个instance,服务端配置如下:

    目录:

    conf
        database1
            -instance.properties
        database2
            -instance.properties
        canal.properties

    instance.properties

    canal.instance.mysql.slaveId = 1001
    canal.instance.master.address = X.X.X.X:3306
    canal.instance.master.journal.name = 
    canal.instance.master.position = 
    canal.instance.master.timestamp = 
    canal.instance.dbUsername = canal
    canal.instance.dbPassword = canal
    canal.instance.defaultDatabaseName =
    canal.instance.connectionCharset = UTF-8
    canal.instance.filter.regex = .*\..*
    canal.instance.filter.black.regex =

    canal.properties

    canal.id= 1
    canal.ip=X.X.X.X
    canal.port= 11111
    canal.zkServers=X.X.X.X:2181,X.X.X.X:2181,X.X.X.X:2181
    canal.zookeeper.flush.period = 1000
    canal.file.data.dir = ${canal.conf.dir}
    canal.file.flush.period = 1000
    canal.instance.memory.buffer.size = 16384
    canal.instance.memory.buffer.memunit = 1024 
    canal.instance.memory.batch.mode = MEMSIZE
    canal.instance.detecting.enable = true
    canal.instance.detecting.sql = select 1
    canal.instance.detecting.interval.time = 3
    canal.instance.detecting.retry.threshold = 3
    canal.instance.detecting.heartbeatHaEnable = false
    canal.instance.transaction.size =  1024
    canal.instance.fallbackIntervalInSeconds = 60
    canal.instance.network.receiveBufferSize = 16384
    canal.instance.network.sendBufferSize = 16384
    canal.instance.network.soTimeout = 30
    canal.instance.filter.query.dcl = true
    canal.instance.filter.query.dml = false
    canal.instance.filter.query.ddl = false
    canal.instance.filter.table.error = false
    canal.instance.filter.rows = false
    canal.instance.binlog.format = ROW,STATEMENT,MIXED 
    canal.instance.binlog.image = FULL,MINIMAL,NOBLOB
    canal.instance.get.ddl.isolation = false
    canal.destinations= example,p4-test
    canal.conf.dir = ../conf
    canal.auto.scan = true
    canal.auto.scan.interval = 5
    canal.instance.global.mode = spring 
    canal.instance.global.lazy = false
    canal.instance.global.spring.xml = classpath:spring/default-instance.xml
    • 3

    部署数据流如下:

    tip: 
    虽然canal同时支持mixed和row类型的binlog日志,但是获取行数据时如果是mixed类型的日志则获取不到表名,所以本方案暂只支持row格式的binlog

    数据同步

    创建canal client应用订阅canal读取的binlog数据

    1.开启多instance 订阅,订阅多个instance

    public void initCanalStart() {
        List<String> destinations = canalProperties.getDestination();
        final List<CanalClient> canalClientList = new ArrayList<>();
        if (destinations != null && destinations.size() > 0) {
            for (String destination : destinations) {
                // 基于zookeeper动态获取canal server的地址,建立链接,其中一台server发生crash,可以支持failover
                CanalConnector connector = CanalConnectors.newClusterConnector(canalProperties.getZkServers(), destination, "", "");
                CanalClient client = new CanalClient(destination, connector);
                canalClientList.add(client);
                client.start();
            }
        }
        Runtime.getRuntime().addShutdownHook(new Thread() {
            public void run() {
                try {
                    logger.info("## stop the canal client");
                    for (CanalClient canalClient : canalClientList) {
                        canalClient.stop();
                    }
                } catch (Throwable e) {
                    logger.warn("##something goes wrong when stopping canal:", e);
                } finally {
                    logger.info("## canal client is down.");
                }
            }
        });
    }

    订阅消息处理

    private void process() {
        int batchSize = 5 * 1024;
        while (running) {
            try {
                MDC.put("destination", destination);
                connector.connect();
                connector.subscribe();
                while (running) {
                    Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                    long batchId = message.getId();
                    int size = message.getEntries().size();
                    if (batchId != -1 && size > 0) {
                        saveEntry(message.getEntries());
                    }
                    connector.ack(batchId); // 提交确认
                    // connector.rollback(batchId); // 处理失败, 回滚数据
                }
            } catch (Exception e) {
                logger.error("process error!", e);
            } finally {
                connector.disconnect();
                MDC.remove("destination");
            }
        }
    }

    根据数据库事件处理消息,过滤消息列表,对数据变动进行处理,用到信息为:

    insert :schemaName,tableName,beforeColumnsList 
    update :schemaName,tableName,afterColumnsList 
    delete :schemaName,tableName,afterColumnsList

    RowChange rowChage = null;
        try {
            rowChage = RowChange.parseFrom(entry.getStoreValue());
        } catch (Exception e) {
            throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
        }
        EventType eventType = rowChage.getEventType();
        logger.info(row_format,
                entry.getHeader().getLogfileName(),
                String.valueOf(entry.getHeader().getLogfileOffset()), entry.getHeader().getSchemaName(),
                entry.getHeader().getTableName(), eventType,
                String.valueOf(entry.getHeader().getExecuteTime()), String.valueOf(delayTime));
        if (eventType == EventType.QUERY || rowChage.getIsDdl()) {
            logger.info(" sql ----> " + rowChage.getSql());
            continue;
        }
        DataService dataService = SpringUtil.getBean(DataService.class);
        for (RowData rowData : rowChage.getRowDatasList()) {
            if (eventType == EventType.DELETE) {
                dataService.delete(rowData.getBeforeColumnsList(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName());
            } else if (eventType == EventType.INSERT) {
                dataService.insert(rowData.getAfterColumnsList(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName());
            } else if (eventType == EventType.UPDATE) {
                dataService.update(rowData.getAfterColumnsList(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName());
            } else {
                logger.info("未知数据变动类型:{}", eventType);
            }
        }
    }

    ColumnsList转换成MongoTemplate 可用的数据类:DBObject,顺便做下数据类型转换

    public static DBObject columnToJson(List<CanalEntry.Column> columns) {
        DBObject obj = new BasicDBObject();
        try {
            for (CanalEntry.Column column : columns) {
                String mysqlType = column.getMysqlType();
                //int类型,长度11以下为Integer,以上为long
                if (mysqlType.startsWith("int")) {
                    int lenBegin = mysqlType.indexOf('(');
                    int lenEnd = mysqlType.indexOf(')');
                    if (lenBegin > 0 && lenEnd > 0) {
                        int length = Integer.parseInt(mysqlType.substring(lenBegin + 1, lenEnd));
                        if (length > 10) {
                            obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : Long.parseLong(column.getValue()));
                            continue;
                        }
                    }
                    obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : Integer.parseInt(column.getValue()));
                } else if (mysqlType.startsWith("bigint")) {
                    obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : Long.parseLong(column.getValue()));
                } else if (mysqlType.startsWith("decimal")) {
                    int lenBegin = mysqlType.indexOf('(');
                    int lenCenter = mysqlType.indexOf(',');
                    int lenEnd = mysqlType.indexOf(')');
                    if (lenBegin > 0 && lenEnd > 0 && lenCenter > 0) {
                        int length = Integer.parseInt(mysqlType.substring(lenCenter + 1, lenEnd));
                        if (length == 0) {
                            obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : Long.parseLong(column.getValue()));
                            continue;
                        }
                    }
                    obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : Double.parseDouble(column.getValue()));
                } else if (mysqlType.equals("datetime") || mysqlType.equals("timestamp")) {
                    obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : DATE_TIME_FORMAT.parse(column.getValue()));
                } else if (mysqlType.equals("date")) {
                    obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : DATE_FORMAT.parse(column.getValue()));
                } else if (mysqlType.equals("time")) {
                    obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : TIME_FORMAT.parse(column.getValue()));
                } else {
                    obj.put(column.getName(), column.getValue());
                }
            }
        } catch (ParseException e) {
            e.printStackTrace();
        }
        return obj;
    }

    tip: 
    DBObject对象如果同时用于保存原始数据和组合数据或其他数据,使用时应该深度拷贝对象生成副本,然后使用副本

    数据拼接

    我们获取了数据库数据后做拼接操作,比如两张用户表:

    user_info:{id,user_no,user_name,user_password}
    user_other_info:{id,user_no,idcard,realname}
    • 1
    • 2

    拼接后mongo数据为:

    user:{_id,user_no,userInfo:{id,user_no,user_name,user_password},userOtherInfo:{id,user_no,idcard,realname})
    • 1

    接收到的数据信息很多,如何才能简单的触发数据拼接操作呢?

    先看我们能获取的信息:schemaName,tableName,DBObject,Event(insert,update,delete)

    将这些信息标识拼接起来看看:/schemaName/tableName/Event(DBObject),没错,就是一个标准的restful链接。只要我们实现一个简单的springMVC 就能自动获取需要的数据信息进行拼接操作。

    先实现@Controller,定义名称为Schema,value对应schemaName

    @Target({ElementType.TYPE})
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    @Component
    public  @interface Schema {
     String value() default "";
    }

    然后实现@RequestMapping,定义名称为Table,直接使用Canal中的EventType 对应RequestMethod

    @Target({ElementType.METHOD, ElementType.TYPE})
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    public  @interface Table {
        String value() default "";
        CanalEntry.EventType[] event() default {};
    }

    然后创建springUtil,实现接口ApplicationContextAware,应用启动 加载的时候初始化两个Map:intanceMap,handlerMap

    private static ApplicationContext applicationContext = null;
    //库名和数据处理Bean映射Map
    private static Map<String, Object> instanceMap = new HashMap<String, Object>();
    //路劲和数据处理Method映射Map
    private static Map<String, Method> handlerMap = new HashMap<String, Method>();
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) {
        if (SpringUtil.applicationContext == null) {
            SpringUtil.applicationContext = applicationContext;
            //初始化instanceMap数据
            instanceMap();
            //初始化handlerMap数据
            handlerMap();
        }
    }
    private void instanceMap() {
        Map<String, Object> beans = applicationContext.getBeansWithAnnotation(Schema.class);
        for (Object bean : beans.values()) {
            Class<?> clazz = bean.getClass();
            Object instance = applicationContext.getBean(clazz);
            Schema schema = clazz.getAnnotation(Schema.class);
            String key = schema.value();
            instanceMap.put(key, instance);
            logger.info("instanceMap [{}:{}]", key, bean == null ? "null" : clazz.getName());
        }
    }
    private void handlerMap() {
        if (instanceMap.size() <= 0)
            return;
        for (Map.Entry<String, Object> entry : instanceMap.entrySet()) {
            if (entry.getValue().getClass().isAnnotationPresent(Schema.class)) {
                Schema schema = entry.getValue().getClass().getAnnotation(Schema.class);
                String schemeName = schema.value();
                Method[] methods = entry.getValue().getClass().getMethods();
                for (Method method : methods) {
                    if (method.isAnnotationPresent(Table.class)) {
                        Table table = method.getAnnotation(Table.class);
                        String tName = table.value();
                        CanalEntry.EventType[] events = table.event();
                        //未标明数据事件类型的方法不做映射
                        if (events.length < 1) {
                            continue;
                        }
                        //同一个方法可以映射多张表
                        for (int i = 0; i < events.length; i++) {
                            String path = "/" + schemeName + "/" + tName + "/" + events[i].getNumber();
                            handlerMap.put(path, method);
                            logger.info("handlerMap [{}:{}]", path, method.getName());
                        }
                    } else {
                        continue;
                    }
                }
            } else {
                continue;
            }
        }
    }

    调用方法:

    public static void doEvent(String path, DBObject obj) throws Exception {
        String[] pathArray = path.split("/");
        if (pathArray.length != 4) {
            logger.info("path 格式不正确:{}", path);
            return;
        }
        Method method = handlerMap.get(path);
        Object schema = instanceMap.get(pathArray[1]);
        //查找不到映射Bean和Method不做处理
        if (method == null || schema == null) {
            return;
        }
        try {
            long begin = System.currentTimeMillis();
            logger.info("integrate data:{},{}", path, obj);
            method.invoke(schema, new Object[]{obj});
            logger.info("integrate data consume: {}ms:", System.currentTimeMillis() - begin);
        } catch (Exception e) {
            logger.error("调用组合逻辑异常", e);
            throw new Exception(e.getCause());
        }
    }

    数据拼接消息处理:

    @Schema("demo_user")
    public class UserService {
        @Table(value = "user_info", event = {CanalEntry.EventType.INSERT, CanalEntry.EventType.UPDATE})
        public void saveUser_UserInfo(DBObject userInfo) {
            String userNo = userInfo.get("user_no") == null ? null : userInfo.get("user_no").toString();
            DBCollection collection = completeMongoTemplate.getCollection("user");
            DBObject queryObject = new BasicDBObject("user_no", userNo);
            DBObject user = collection.findOne(queryObject);
            if (user == null) {
                user = new BasicDBObject();
                user.put("user_no", userNo);
                user.put("userInfo", userInfo);
                collection.insert(user);
            } else {
                DBObject updateObj = new BasicDBObject("userInfo", userInfo);
                DBObject update = new BasicDBObject("$set", updateObj);
                collection.update(queryObject, update);
            }
        }
    }

    示例源码 
    https://github.com/zhangtr/canal-mongo

    logstash-output-mongodb实现Mysql到Mongodb数据同步

    本文主要讲解如何通过logstash-output-mongodb插件实现Mysql与Mongodb数据的同步。源数据存储在Mysql,目标数据库为非关系型数据库Mongodb。

    0、前提
    1)已经安装好源数据库:Mysql;
    2)已经安装好目的数据库:Mongodb;
    3)已经安装好logstash及相关插件logstash-output-mongodb
    安装步骤参考:http://blog.csdn.net/laoyang360/article/details/65448962

    1、同步conf配置详解
    [root@la bin]# cat ./logstash_jdbc_mongo/jdbc_mongo.conf
    input {
      stdin {
      }
      jdbc {
      ‘# 源Mysql数据库地址
      jdbc_connection_string => "jdbc:mysql://20.18.11.4:3306/tech?zeroDateTimeBehavior=convertToNull"
      #源Mysql的用户名和密码
      jdbc_user => "root"
      jdbc_password => "rot123"

      ‘ #true代表记录最后存储的关联列值
      record_last_run => "true"
      use_column_value => "true"
      tracking_column => "id"
      ‘ #存储位置
      last_run_metadata_path => "/opt/logstash/bin/logstash_jdbc_mongo/mongo_info"
      clean_run => "false"

      ‘ #jdbc路径地址
      jdbc_driver_library => "/home/lib/mysql-connector-java-5.1.38.jar"
      ‘ # the name of the driver class for mysql
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      jdbc_paging_enabled => "true"
      jdbc_page_size => "500"
    ‘ #以下对应着要执行的sql的绝对路径。
      statement_filepath => "/opt/logstash/bin/logstash_jdbc_mongo/jdbc_mongo.sql"
    ‘ #定时字段 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
    schedule => "* * * * *"
      }
    }

    filter {
    json {
      source => "message"
      remove_field => ["message"]
      }
    }

    ’#输出mongodb的配置
    output {
      stdout { codec => rubydebug }
      mongodb {
    ‘#对应mongodb的输出集合
      collection => "N_CLASS"
    ’#对应mongodb的输出数据库名称
      database => "data"
      uri => "mongodb://110.0.12.45:27017"
      }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    2、需要同步表的sql
    [root@lalogstash_jdbc_mongo]# cat jdbc_mongo.sql
    select
    *
    from n_class
    where n_class.id > :sql_last_value
    1
    2
    3
    4
    5
    3、小结
    以上实现了Mysql数据库中的一个库表table 与 Mongodb中的一个集合collection之间的全量、增量同步操作。
    全量的实现:通过sql语句实现;
    增量的实现:通过定时、sql_last_value实现。
    同步的时候,无需在mongodb做任何操作,同步后,既可以在mongodb的windows客户端:Robomongo看到同步后新增的集合collection。

    原文:https://blog.csdn.net/laoyang360/article/details/65449127

  • 相关阅读:
    【转】VS2010中 C++创建DLL图解
    [转]error: 'retainCount' is unavailable: not available in automatic reference counting mode
    [转]关于NSAutoreleasePool' is unavailable: not available in automatic reference counting mode的解决方法
    【转】 Tomcat v7.0 Server at localhost was unable to start within 45
    【转】Server Tomcat v7.0 Server at localhost was unable to start within 45 seconds. If
    【转】SVN管理多个项目版本库
    【转】eclipse安装SVN插件的两种方法
    【转】MYSQL启用日志,和查看日志
    【转】Repository has not been enabled to accept revision propchanges
    【转】SVN库的迁移
  • 原文地址:https://www.cnblogs.com/duanxz/p/3539213.html
Copyright © 2011-2022 走看看