zoukankan      html  css  js  c++  java
  • 在MaxCompute中利用bitmap进行数据处理

    很多数据开发者使用bitmap技术对用户数据进行编码和压缩，然后借助bitmap与/或/非运算极高的处理速度，实现类似用户画像标签的人群筛选、运营分析中的7日活跃等分析。
    本文给出一个使用MaxCompute MapReduce对不同日期的活跃用户ID进行bitmap编码和计算的样例，供感兴趣的用户进一步了解、分析，并应用到自己的场景中。

    
    import com.aliyun.odps.OdpsException;
    import com.aliyun.odps.data.Record;
    import com.aliyun.odps.data.TableInfo;
    import com.aliyun.odps.mapred.JobClient;
    import com.aliyun.odps.mapred.MapperBase;
    import com.aliyun.odps.mapred.ReducerBase;
    import com.aliyun.odps.mapred.conf.JobConf;
    import com.aliyun.odps.mapred.utils.InputUtils;
    import com.aliyun.odps.mapred.utils.OutputUtils;
    import com.aliyun.odps.mapred.utils.SchemaUtils;
    import org.roaringbitmap.RoaringBitmap;
    import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
    
    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;
    import java.nio.ByteBuffer;
    import java.util.Base64;
    import java.util.Iterator;
    
    /**
     * MaxCompute MapReduce example that groups user ids by active date and encodes each group as a
     * Base64 string of a serialized {@link RoaringBitmap}.
     *
     * <p>Input table {@code bitmap_source(id, active_date)}; output table
     * {@code bitmap_target(active_date, bit_map)}.
     */
    public class bitmapDemo2
    {

        /**
         * Serializes {@code bitmap} with {@link RoaringBitmap#serialize} and Base64-encodes the bytes
         * so the bitmap can travel through a STRING column.
         *
         * @param bitmap the bitmap to encode
         * @return Base64 text of the serialized bitmap
         * @throws IOException if serialization fails
         */
        private static String encodeBitmap(RoaringBitmap bitmap) throws IOException {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream(bitmap.serializedSizeInBytes());
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                bitmap.serialize(out);
            }
            return Base64.getEncoder().encodeToString(bytes.toByteArray());
        }

        /**
         * Inverse of {@link #encodeBitmap}: Base64-decodes {@code encoded} and deserializes it into a
         * mutable {@link RoaringBitmap}.
         *
         * @param encoded Base64 text produced by {@link #encodeBitmap}
         * @return the decoded bitmap
         */
        private static RoaringBitmap decodeBitmap(String encoded) {
            ByteBuffer buffer = ByteBuffer.wrap(Base64.getDecoder().decode(encoded));
            return new RoaringBitmap(new ImmutableRoaringBitmap(buffer));
        }

        /** Emits (active_date, encoded single-id bitmap) for every input row. */
        public static class BitMapper extends MapperBase {

            Record key;
            Record value;

            @Override
            public void setup(TaskContext context) throws IOException {
                key = context.createMapOutputKeyRecord();
                value = context.createMapOutputValueRecord();
            }

            @Override
            public void map(long recordNum, Record record, TaskContext context)
                    throws IOException
            {
                Long id = record.getBigint("id");
                if (id == null) {
                    // A NULL id cannot be encoded; skip the row rather than failing on unboxing.
                    return;
                }
                // RoaringBitmap holds 32-bit unsigned values. Any id outside [0, 2^32) would be
                // silently truncated by the int cast and collide with a different user id.
                if ((id >>> 32) != 0) {
                    throw new IllegalArgumentException("id out of 32-bit unsigned range: " + id);
                }

                RoaringBitmap bitmap = new RoaringBitmap();
                bitmap.add(id.intValue());

                // The active date is the grouping key.
                key.set(new Object[] {record.getString("active_date")});
                value.set(new Object[] {encodeBitmap(bitmap)});
                context.write(key, value);
            }
        }

        /** ORs together all bitmaps that share an active date and writes one row per date. */
        public static class BitReducer extends ReducerBase {
            private Record result = null;

            @Override
            public void setup(TaskContext context) throws IOException {
                result = context.createOutputRecord();
            }

            @Override
            public void reduce(Record key, Iterator<Record> values, TaskContext context) throws IOException {
                RoaringBitmap merged = new RoaringBitmap();
                while (values.hasNext())
                {
                    Record val = values.next();
                    merged.or(decodeBitmap((String) val.get(0)));
                }
                result.set(0, key.get(0));
                result.set(1, encodeBitmap(merged));
                context.write(result);
            }
        }

        public static void main( String[] args ) throws OdpsException
        {

            System.out.println("begin.........");
            JobConf job = new JobConf();

            job.setMapperClass(BitMapper.class);
            job.setReducerClass(BitReducer.class);

            job.setMapOutputKeySchema(SchemaUtils.fromString("active_date:string"));
            job.setMapOutputValueSchema(SchemaUtils.fromString("id:string"));

            InputUtils.addTable(TableInfo.builder().tableName("bitmap_source").cols(new String[] {"id","active_date"}).build(), job);
            // Sample input (bitmap_source):
            //        +------------+-------------+
            //        | id         | active_date |
            //        +------------+-------------+
            //        | 1          | 20190729    |
            //        | 2          | 20190729    |
            //        | 3          | 20190730    |
            //        | 4          | 20190801    |
            //        | 5          | 20190801    |
            //        +------------+-------------+
            OutputUtils.addTable(TableInfo.builder().tableName("bitmap_target").build(), job);
            // Resulting output (bitmap_target):
            //        +-------------+------------------------------+
            //        | active_date | bit_map                      |
            //        +-------------+------------------------------+
            //        | 20190729    | OjAAAAEAAAAAAAEAEAAAAAEAAgA= |
            //        | 20190730    | OjAAAAEAAAAAAAAAEAAAAAMA     |
            //        | 20190801    | OjAAAAEAAAAAAAEAEAAAAAQABQA= |
            //        +-------------+------------------------------+

            JobClient.runJob(job);
        }
    }
    

    将Java应用打包并上传到MaxCompute项目后，即可在MaxCompute中调用该MR作业：以日期作为key对用户ID进行bitmap编码，再对同一日期的bitmap取OR（也可根据需要取AND，例如留存分析场景），最后将处理后的数据写入目标表，供后续处理使用。注意：RoaringBitmap只能存储32位无符号整数，因此用户ID必须在0到2^32-1之间。

    本文作者:圣远

    原文链接

    本文为云栖社区原创内容,未经允许不得转载。

  • 相关阅读:
    Leetcode Binary Tree Preorder Traversal
    Leetcode Minimum Depth of Binary Tree
    Leetcode 148. Sort List
    Leetcode 61. Rotate List
    Leetcode 86. Partition List
    Leetcode 21. Merge Two Sorted Lists
    Leetcode 143. Reorder List
    J2EE项目应用开发过程中的易错点
    JNDI初认识
    奔腾的代码
  • 原文地址:https://www.cnblogs.com/zhaowei121/p/11315477.html
Copyright © 2011-2022 走看看