zoukankan      html  css  js  c++  java
  • [阿里云]Datahub测试使用记录

    由于需要测试阿里云Datahub功能,因此测了一下Datahub的一些功能

    DATAHUB:
    简介:
    阿里云的流式数据(streaming)处理平台
    对流式数据的发布(publish)订阅(subscribe)和分发功能
     
    主要功能:
    采集实时数据,如移动设备,传感器,网站服务等
    使用脚本或流计算引擎来处理写入datahub的数据
    最后生成实时图表/报警信息等
     
    术语:
    project:项目,包含多个topic
    topic:可以表示一种类型的流,订阅和发布单位
    shard:topic的并发通道
    record:用户数据与datahub端交互的基本单位
    recordtype:topic的数据类型,支持tuple和blob
    DataConnect:把datahub中的流式数据同步到其他云产品中的功能,现在支持odps/oss/es/mysql
     
    操作过程
    首先在新建project,注意管理员账号
    注意授权信息
    参考<授权信息管理>
    <https://help.aliyun.com/document_detail/47442.html?spm=a2c4g.11186623.6.544.371f1a12NmNa1w>
     
    然后进入后,创建topic
    schema是指column,可以选择多种数据类型
     
    新建DataConnect,来设置下游数据
     
    设置maxcompute连接
     
    使用python来插入数据到topic
    import sys
    import traceback
    from datahub import DataHub
    from datahub.exceptions import ResourceExistException
    from datahub.models import FieldType, RecordSchema, TupleRecord, BlobRecord, CursorType, RecordType
    
    access_id = 
    access_key = 
    endpoint = 'https://dh-cn-shanghai.aliyuncs.com'
    dh = DataHub(access_id, access_key, endpoint)
    
    
    ##写入
    project_name=
    topic_name = 
    try:
        # block等待所有shard状态ready
        dh.wait_shards_ready(project_name, topic_name)
        print("shards all ready!!!")
        print("=======================================
    
    ")
        topic_result = dh.get_topic(project_name, topic_name)
        print(topic_result)
        if topic_result.record_type != RecordType.TUPLE:
            print("topic type illegal!")
            sys.exit(-1)
        print("topic type normal")
        print("=======================================
    
    ")
        record_schema = topic_result.record_schema
        records0 = []
        record0 = TupleRecord(schema=record_schema, values=['1', '2yc1', '30.01', '4True', '5455869335000000','6','1455869335000000'])
        record0.shard_id = '0'
        record0.put_attribute('AK', '47')
        records0.append(record0)
        for i in range (1,10000):
            record2 = TupleRecord(schema=record_schema)
            record2.set_value(0, str(i))
            record2.set_value(1, str(i)+'yc3')
            record2.set_value(2,  str(i+1.1))
            record2.set_value(3, str(i))
            record2.set_value(4, '1455869335000011')
            record2.set_value(5, '20180913_1115')
            record2.set_value(6, int(time.time())*1000000)
            record2.attributes = {'key': 'value'}
            record2.partition_key = 'EVENT_TIME'
            records0.append(record2)
    
        put_result = dh.put_records(project_name, topic_name, records0)
        print(put_result)
        print("put tuple %d records, failed count: %d" %(len(records0), put_result.failed_record_count))
        # failed_record_count如果大于0最好对failed record再进行重试
        print('结束')
        print
        print("=======================================
    
    ")
    except DatahubException as e:
        print(e)
        sys.exit(-1)

    进行验证数据导入

    maxcompute默认是五分钟或者50M触发一次同步,如果需要实时的就要rds登场了
    这样就测试完成了.后期进行压测,待续..
     
     
     
     
     
  • 相关阅读:
    Jenkins发布Java项目
    自动发布项目(支持部署,回退功能)
    Gitlab Server
    1一站式管理所有SpringBoot启动类,Services服务窗口
    Navicat 连接MySQL8.0.23 出现2059错误
    2命令模式
    1模板方法模式
    7享元模式
    6外观模式
    5桥梁模式
  • 原文地址:https://www.cnblogs.com/castlevania/p/9640126.html
Copyright © 2011-2022 走看看