zoukankan      html  css  js  c++  java
  • 【ODPS】阿里云ODPS中带分区的表操作

    1.创建分区表:

    分区表有自己的分区列,而分区表则没有。

    public static void createTableWithPartition(Odps odps, String createTableName)
    			throws Exception {
    		Tables tables = odps.tables();// /获取表示ODPS全部Table的集合对象
    		boolean a = tables.exists(createTableName);// 推断指定表test_table_jyl是否存在
    		if (a) {
    			System.out.println("指定表存在");
    			Table table = tables.get(createTableName);
    			tables.delete(createTableName);//存在就删除
    		} else {
    			System.out.println("指定表不存在");
    		}
    		System.out.println("-------------------------------------------------");
    		
    		/* 创建表 */
    		if (tables.exists(createTableName)) {
    			System.out.println("指定表存在,无法创建");
    		} else {
    			System.out.println("指定表不存在,能够创建");
    			/* TableSchema表示ODPS中表的定义 */
    			TableSchema tableSchema = new TableSchema();
    			/* 加入列 */
    			Column col; // Column表示ODPS中表的列定义
    			col = new Column("id", OdpsType.STRING, "ID");
    			tableSchema.addColumn(col);
    			col = new Column("name", OdpsType.STRING, "姓名");
    			tableSchema.addColumn(col);
    			col = new Column("sex", OdpsType.BIGINT, "性别");
    			tableSchema.addColumn(col);
    			col = new Column("birthday", OdpsType.DATETIME, "生日");
    			tableSchema.addColumn(col);
    
    			/* 加入分区列 */
    			col = new Column("province ", OdpsType.STRING, "省(分区列)");
    			tableSchema.addPartitionColumn(col);
    			
    			tables.create(createTableName, tableSchema);//创建表
    			System.out.println("表【" + createTableName + "】创建成功");
    		}
    		System.out.println("-------------------------------------------------");
    
    	}


    2.分区表数据上传:

    分区表上传数据必须指定分区。所以上传数据前必须保证存在分区,不存在就创建一个,创建分区有两种方法

    /*PartitionSpec类表示一个特定分区的定义*/
    		String partitionColumn="province";//表中的分区列
    		/*第一种,直接调用带參构造函数,
    		 * 參数格式:分区定义字符串。比方: pt='1',ds='2'
    		 */
    		PartitionSpec partitionSpec1 = new PartitionSpec(partitionColumn+"='hubei'");
    		
    		/*另外一种,调用布带參数构造函数,再调用队形set方法。

    */ PartitionSpec partitionSpec2 = new PartitionSpec(); partitionSpec2.set(partitionColumn, "hubei");


    TableTunnel类中有两个创建创建上传会话方法:

    createUploadSession

    public TableTunnel.UploadSession createUploadSession(String projectName,
                                                         String tableName)
                                                  throws TunnelException
    在非分区表上创建上传会话

    Parameters:
    projectName - Project名称
    tableName - 表名,非视图
    Returns:
    TableTunnel.UploadSession
    Throws:
    TunnelException

    createUploadSession

    public TableTunnel.UploadSession createUploadSession(String projectName,
                                                         String tableName,
                                                         PartitionSpec partitionSpec)
                                                  throws TunnelException
    在分区表上创建上传会话

    注: 分区必须为最末级分区,如表有两级分区pt,ds, 则必须所有指定值, 不支持仅仅指定当中一个值

    Parameters:
    projectName - Project名
    tableName - 表名,非视图
    partitionSpec - 指定分区 PartitionSpec
    Returns:
    TableTunnel.UploadSession
    Throws:
    TunnelException


    分区表必须使用带分区的构造方法。还必须保证该分区存在,否则会报异常。

    public static void uploadDataToYun(Odps odps, String project, String tableName)
    			throws Exception {
    		TableTunnel tunnel = new TableTunnel(odps);
    		tunnel.setEndpoint(TUNNEL_URL);// 设置TunnelServer地址,没有设置TunnelServer地址的情况下自己主动选择
    		
    		/*PartitionSpec类表示一个特定分区的定义*/
    		String partitionColumn="province";//表中的分区列
    		PartitionSpec partitionSpec = new PartitionSpec();
    		partitionSpec.set(partitionColumn, "hubei");
    		
    		Table table = odps.tables().get(tableName);//获取当前表
    		boolean a= table.hasPartition(partitionSpec);//推断上述定义分区在表中是否存在
    		if(a){
    			System.out.println("分区已经存在,能够直接上传数据");
    		}else{
    			System.out.println("分区不存在,先创建分区再上传数据");
    			table.createPartition(partitionSpec);
    		}
    		
    		/*在分区表上创建上传会话*/
    		TableTunnel.UploadSession uploadSession = tunnel.createUploadSession(
    				project, tableName,partitionSpec);
    
    		RecordWriter rw = uploadSession.openRecordWriter(1);
    		Column[] columns = new Column[4];
    		columns[0] = new Column("id", OdpsType.STRING);
    		columns[1] = new Column("name", OdpsType.STRING);
    		columns[2] = new Column("sex", OdpsType.BIGINT);
    		columns[3] = new Column("birthday", OdpsType.DATETIME);
    		Record r = new ArrayRecord(columns);
    		
    		r.setString("id", "3");
    		r.setString("name", "name3");
        	r.setBigint("sex", (long) 2);
        	Date date = new Date();
        	r.setDatetime("birthday", date);
        	rw.write(r);
        	rw.close();//一定要close,不然无法commit
    
    		Long[] blocks = uploadSession.getBlockList();
    		uploadSession.commit(blocks);
    		System.out.println("数据上传成功");
    	}


    3.測试类:

    	private static final String ACCESS_ID = "***********";
    	private static final String ACCESS_KEY = "***************";
    	private static final String PROJECT_NAME = "*************";
    	private static final String TUNNEL_URL = "http://dt.odps.aliyun.com";
    	private static final String ODPS_URL = "http://service.odps.aliyun.com/api";
    	
    	public static void main(String args[]) throws Exception {
    
    		/* 先构建阿里云帐号 */
    		Account account = new AliyunAccount(ACCESS_ID, ACCESS_KEY);
    		
    		/* Odps类是ODPS SDK的入口 */
    		Odps odps = new Odps(account);
    		odps.setDefaultProject(PROJECT_NAME);// 指定默认使用的Project名称
    		odps.setEndpoint(ODPS_URL);// 设置ODPS服务的地址
    		
    		String tableName="test_table_jyl";
    		/*创建带分区的表*/
    		createTableWithPartition(odps,tableName);
    		
    		/*上传数据*/
    		uploadDataToYun(odps, PROJECT_NAME, tableName);
    	}



  • 相关阅读:
    Rex 密钥认证
    MQTT协议之moquette 安装使用
    开源MQTT中间件:moquette
    Hazelcast入门简介
    Maven和Gradle对比
    rex 上传文件并远程执行
    myeclipse配置gradle插件
    ansible 新手上路
    CentOS release 6.5 (Final) 安装ansible
    spring boot 使用profile来分区配置
  • 原文地址:https://www.cnblogs.com/slgkaifa/p/6944016.html
Copyright © 2011-2022 走看看