由于需要测试阿里云Datahub功能,因此测了一下Datahub的一些功能
DATAHUB:
简介:
阿里云的流式数据(streaming)处理平台
对流式数据的发布(publish)订阅(subscribe)和分发功能
主要功能:
采集实时数据,如移动设备,传感器,网站服务等
使用脚本或流计算引擎来处理写入datahub的数据
最后生成实时图表/报警信息等
术语:
project:项目,包含多个topic
topic:可以表示一种类型的流,订阅和发布单位
shard:topic的并发通道
record:用户数据与datahub端交互的基本单位
recordtype:topic的数据类型,支持tuple和blob
DataConnect:把datahub中的流式数据同步到其他云产品中的功能,现在支持odps/oss/es/mysql
操作过程
首先在新建project,注意管理员账号
注意授权信息
参考<授权信息管理>
<https://help.aliyun.com/document_detail/47442.html?spm=a2c4g.11186623.6.544.371f1a12NmNa1w>
然后进入后,创建topic
schema是指column,可以选择多种数据类型
新建DataConnect,来设置下游数据
设置maxcompute连接
使用python来插入数据到topic
import sys import traceback from datahub import DataHub from datahub.exceptions import ResourceExistException from datahub.models import FieldType, RecordSchema, TupleRecord, BlobRecord, CursorType, RecordType access_id = access_key = endpoint = 'https://dh-cn-shanghai.aliyuncs.com' dh = DataHub(access_id, access_key, endpoint) ##写入 project_name= topic_name = try: # block等待所有shard状态ready dh.wait_shards_ready(project_name, topic_name) print("shards all ready!!!") print("======================================= ") topic_result = dh.get_topic(project_name, topic_name) print(topic_result) if topic_result.record_type != RecordType.TUPLE: print("topic type illegal!") sys.exit(-1) print("topic type normal") print("======================================= ") record_schema = topic_result.record_schema records0 = [] record0 = TupleRecord(schema=record_schema, values=['1', '2yc1', '30.01', '4True', '5455869335000000','6','1455869335000000']) record0.shard_id = '0' record0.put_attribute('AK', '47') records0.append(record0) for i in range (1,10000): record2 = TupleRecord(schema=record_schema) record2.set_value(0, str(i)) record2.set_value(1, str(i)+'yc3') record2.set_value(2, str(i+1.1)) record2.set_value(3, str(i)) record2.set_value(4, '1455869335000011') record2.set_value(5, '20180913_1115') record2.set_value(6, int(time.time())*1000000) record2.attributes = {'key': 'value'} record2.partition_key = 'EVENT_TIME' records0.append(record2) put_result = dh.put_records(project_name, topic_name, records0) print(put_result) print("put tuple %d records, failed count: %d" %(len(records0), put_result.failed_record_count)) # failed_record_count如果大于0最好对failed record再进行重试 print('结束') print print("======================================= ") except DatahubException as e: print(e) sys.exit(-1)
进行验证数据导入
maxcompute默认是五分钟或者50M触发一次同步,如果需要实时的就要rds登场了
这样就测试完成了.后期进行压测,待续..