zoukankan      html  css  js  c++  java
  • FLINK实例(11):CONNECTORS(10)hbase 读写 (一)

    1 工程目录

     pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.atguigu.flinktutorial</groupId>
        <artifactId>flinktutorial</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <repositories>
            <repository>
                <id>apache.snapshots</id>
                <name>Apache Development Snapshot Repository</name>
                <url>https://repository.apache.org/content/repositories/snapshots/</url>
                <releases>
                    <enabled>false</enabled>
                </releases>
                <snapshots>
                    <enabled>true</enabled>
                </snapshots>
            </repository>
        </repositories>
    
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <flink.version>1.10.0</flink.version>
            <scala.binary.version>2.11</scala.binary.version>
            <scala.version>2.11.12</scala.version>
        </properties>
    
        <dependencies>
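            <!-- Apache Flink and connector dependencies. -->
            <!-- NOTE: the <scope>provided</scope> markers further down are commented out, so these dependencies are bundled into the shaded JAR (convenient when running in the IDE). Re-enable "provided" if the target cluster already ships Flink. -->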
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-hbase_2.11</artifactId>
                <version>1.10.0</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.apache.hadoop</groupId>
                        <artifactId>hadoop-core</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>org.apache.hadoop</groupId>
                        <artifactId>hadoop-common</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>org.apache.hadoop</groupId>
                        <artifactId>hadoop-client</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>org.apache.hadoop</groupId>
                        <artifactId>hadoop-yarn-common</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>org.apache.hadoop</groupId>
                        <artifactId>hadoop-mapreduce-client-core</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>org.apache.hadoop</groupId>
                        <artifactId>hadoop-auth</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
    
    
            <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.1.3</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <!--shaded主要是解决jar包冲突-->
                <artifactId>hbase-shaded-client</artifactId>
                <version>1.3.1</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.phoenix</groupId>
                <artifactId>phoenix-core</artifactId>
                <version>5.0.0-HBase-2.0</version>
                <exclusions>
                    <exclusion>
                        <artifactId>hbase-client</artifactId>
                        <groupId>org.apache.hbase</groupId>
                    </exclusion>
                    <exclusion>
                        <artifactId>commons-cli</artifactId>
                        <groupId>commons-cli</groupId>
                    </exclusion>
                    <exclusion>
                        <artifactId>commons-codec</artifactId>
                        <groupId>commons-codec</groupId>
                    </exclusion>
                    <exclusion>
                        <artifactId>commons-io</artifactId>
                        <groupId>commons-io</groupId>
                    </exclusion>
                    <exclusion>
                        <artifactId>jackson-annotations</artifactId>
                        <groupId>com.fasterxml.jackson.core</groupId>
                    </exclusion>
                    <exclusion>
                        <artifactId>jackson-databind</artifactId>
                        <groupId>com.fasterxml.jackson.core</groupId>
                    </exclusion>
                    <exclusion>
                        <artifactId>jsr305</artifactId>
                        <groupId>com.google.code.findbugs</groupId>
                    </exclusion>
                    <exclusion>
                        <artifactId>guava</artifactId>
                        <groupId>com.google.guava</groupId>
                    </exclusion>
                    <exclusion>
                        <artifactId>guice</artifactId>
                        <groupId>com.google.inject</groupId>
                    </exclusion>
                    <exclusion>
                        <artifactId>guice-servlet</artifactId>
                        <groupId>com.google.inject.extensions</groupId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-server</artifactId>
                <version>2.0.5</version>
                <exclusions>
                    <exclusion>
                        <artifactId>commons-codec</artifactId>
                        <groupId>commons-codec</groupId>
                    </exclusion>
                    <exclusion>
                        <artifactId>commons-io</artifactId>
                        <groupId>commons-io</groupId>
                    </exclusion>
                    <exclusion>
                        <artifactId>jackson-databind</artifactId>
                        <groupId>com.fasterxml.jackson.core</groupId>
                    </exclusion>
                    <exclusion>
                        <artifactId>jersey-server</artifactId>
                        <groupId>com.sun.jersey</groupId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>com.lmax</groupId>
                <artifactId>disruptor</artifactId>
                <version>3.3.6</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-client</artifactId>
                <version>2.0.5</version>
                <exclusions>
                    <exclusion>
                        <artifactId>commons-codec</artifactId>
                        <groupId>commons-codec</groupId>
                    </exclusion>
                    <exclusion>
                        <artifactId>jackson-databind</artifactId>
                        <groupId>com.fasterxml.jackson.core</groupId>
                    </exclusion>
                </exclusions>
            </dependency>
    
                <dependency>
                    <groupId>redis.clients</groupId>
                    <artifactId>jedis</artifactId>
                    <version>2.8.1</version>
                </dependency>
    
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.56</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-jdbc_2.11</artifactId>
                <version>1.9.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-scala_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <exclusions>
                    <exclusion>
                        <artifactId>jsr305</artifactId>
                        <groupId>com.google.code.findbugs</groupId>
                    </exclusion>
                </exclusions>
                <!--<scope>provided</scope>-->
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <exclusions>
                    <exclusion>
                        <artifactId>commons-cli</artifactId>
                        <groupId>commons-cli</groupId>
                    </exclusion>
                    <exclusion>
                        <artifactId>commons-io</artifactId>
                        <groupId>commons-io</groupId>
                    </exclusion>
                    <exclusion>
                        <artifactId>jsr305</artifactId>
                        <groupId>com.google.code.findbugs</groupId>
                    </exclusion>
                </exclusions>
                <!--<scope>provided</scope>-->
            </dependency>
    
            <!-- Scala Library, provided by Flink as well. -->
            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-library</artifactId>
                <version>${scala.version}</version>
                <!--<scope>provided</scope>-->
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
                <version>1.10.0</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.bahir</groupId>
                <artifactId>flink-connector-redis_2.11</artifactId>
                <version>1.0</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
                <version>1.10.0</version>
                <exclusions>
                    <exclusion>
                        <artifactId>commons-codec</artifactId>
                        <groupId>commons-codec</groupId>
                    </exclusion>
                    <exclusion>
                        <artifactId>jackson-core</artifactId>
                        <groupId>com.fasterxml.jackson.core</groupId>
                    </exclusion>
                </exclusions>
            </dependency>
    
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>5.1.44</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-cep-scala_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
            <!-- Add connector dependencies here. They must be in the default scope (compile). -->
    
            <!-- Example:
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            -->
    
            <!-- Add logging framework, to produce console output when running in the IDE. -->
            <!-- These dependencies are excluded from the application JAR by default. -->
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
                <version>1.7.7</version>
                <scope>runtime</scope>
            </dependency>
            <dependency>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
                <version>1.2.17</version>
                <scope>runtime</scope>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
                <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <version>3.1.1</version>
                    <executions>
                        <!-- Run shade goal on package phase -->
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>shade</goal>
                            </goals>
                            <configuration>
                                <artifactSet>
                                    <excludes>
                                        <exclude>org.apache.flink:force-shading</exclude>
                                        <exclude>com.google.code.findbugs:jsr305</exclude>
                                        <exclude>org.slf4j:*</exclude>
                                        <exclude>log4j:*</exclude>
                                    </excludes>
                                </artifactSet>
                                <filters>
                                    <filter>
                                        <!-- Do not copy the signatures in the META-INF folder.
                                        Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                        <artifact>*:*</artifact>
                                        <excludes>
                                            <exclude>META-INF/*.SF</exclude>
                                            <exclude>META-INF/*.DSA</exclude>
                                            <exclude>META-INF/*.RSA</exclude>
                                        </excludes>
                                    </filter>
                                </filters>
                                <transformers>
                                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                        <mainClass>com.atguigu.WordCountFromBatch</mainClass>
                                    </transformer>
                                </transformers>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
    
                <!-- Java Compiler -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.1</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
    
                <!-- Scala Compiler -->
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.2</version>
                    <executions>
                        <execution>
                            <goals>
                                <goal>compile</goal>
                                <goal>testCompile</goal>
                            </goals>
                        </execution>
                    </executions>
                    <configuration>
                        <args>
                            <arg>-nobootcp</arg>
                        </args>
                    </configuration>
                </plugin>
    
                <!-- Eclipse Scala Integration -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-eclipse-plugin</artifactId>
                    <version>2.8</version>
                    <configuration>
                        <downloadSources>true</downloadSources>
                        <projectnatures>
                            <projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
                            <projectnature>org.eclipse.jdt.core.javanature</projectnature>
                        </projectnatures>
                        <buildcommands>
                            <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
                        </buildcommands>
                        <classpathContainers>
                            <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
                            <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
                        </classpathContainers>
                        <excludes>
                            <exclude>org.scala-lang:scala-library</exclude>
                            <exclude>org.scala-lang:scala-compiler</exclude>
                        </excludes>
                        <sourceIncludes>
                            <sourceInclude>**/*.scala</sourceInclude>
                            <sourceInclude>**/*.java</sourceInclude>
                        </sourceIncludes>
                    </configuration>
                </plugin>
                <plugin>
                    <groupId>org.codehaus.mojo</groupId>
                    <artifactId>build-helper-maven-plugin</artifactId>
                    <version>1.7</version>
                    <executions>
                        <!-- Add src/main/scala to eclipse build path -->
                        <execution>
                            <id>add-source</id>
                            <phase>generate-sources</phase>
                            <goals>
                                <goal>add-source</goal>
                            </goals>
                            <configuration>
                                <sources>
                                    <source>src/main/scala</source>
                                </sources>
                            </configuration>
                        </execution>
                        <!-- Add src/test/scala to eclipse build path -->
                        <execution>
                            <id>add-test-source</id>
                            <phase>generate-test-sources</phase>
                            <goals>
                                <goal>add-test-source</goal>
                            </goals>
                            <configuration>
                                <sources>
                                    <source>src/test/scala</source>
                                </sources>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    
    
    
    
    </project>
    View Code

    2 flink 读取hbase

    HbaseUtil(scala读写hbase)

    package com.atguigu.flink.utils
    
    import java.io.IOException
    
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.hbase._
    import org.apache.hadoop.hbase.client._
    import org.apache.hadoop.hbase.util.Bytes
    
    import scala.collection.JavaConverters._
    
    
    /**
     * Small collection of HBase helpers (configuration, connection, table administration,
     * reads and writes) shared by the Flink source and sink examples.
     *
     * Ownership: callers own every [[Connection]], [[Table]] and [[ResultScanner]] they pass in
     * or receive and must close them. The helpers only close the [[Admin]] handles they open.
     */
    object HbaseUtil {

      /** ZooKeeper quorum used when the caller does not supply one. */
      private val DefaultQuorum = "192.168.1.122:2181,192.168.1.133:2181,192.168.1.144:2181"

      /** Manual smoke test: prints every cell of row "1001" of table "student". */
      def main(args: Array[String]): Unit = {
        val conf = getHbaseConf()
        val hbconn = getHbaseConn(conf)
        try {
          val table: Table = hbconn.getTable(TableName.valueOf("student"))
          try {
            // Look up the row whose row key is 1001.
            getSingleRow(table, "1001")
            // Alternative: scan the whole table.
            // queryAll(table)
          } finally {
            table.close()
          }
        } finally {
          hbconn.close()
        }
      }

      /**
       * Builds the HBase client configuration.
       *
       * @param quorum comma-separated `host:port` list for `hbase.zookeeper.quorum`;
       *               defaults to the cluster addresses used throughout this tutorial
       * @return a fresh configuration with the quorum applied
       */
      def getHbaseConf(quorum: String = DefaultQuorum): Configuration = {
        val configuration = HBaseConfiguration.create()
        configuration.set("hbase.zookeeper.quorum", quorum)
        configuration
      }

      /**
       * Opens a new HBase connection.
       *
       * @param conf client configuration, e.g. from [[getHbaseConf]]
       * @return the connection; the caller must close it
       */
      def getHbaseConn(conf: Configuration): Connection =
        ConnectionFactory.createConnection(conf)

      /**
       * Creates a table with the given column families unless it already exists.
       *
       * @param conn          open connection
       * @param tableName     name of the table to create
       * @param columnFamilys column family names to add to the new table
       */
      def createTable(conn: Connection, tableName: String, columnFamilys: Array[String]): Unit = {
        val admin: Admin = conn.getAdmin
        // try/finally so the Admin handle is released even when an HBase call throws.
        try {
          val tName = TableName.valueOf(tableName)
          if (!admin.tableExists(tName)) {
            val descriptor = new HTableDescriptor(tName)
            columnFamilys.foreach(family => descriptor.addFamily(new HColumnDescriptor(family)))
            admin.createTable(descriptor)
            println("create successful!!")
          } else {
            print("table already existed")
          }
        } finally {
          admin.close()
        }
      }

      /**
       * Drops a table if it exists.
       *
       * @param conn      open connection
       * @param tableName name of the table to drop
       */
      def dropTable(conn: Connection, tableName: String): Unit = {
        val admin: Admin = conn.getAdmin
        try {
          val tName = TableName.valueOf(tableName)
          if (admin.tableExists(tName)) {
            // A table must be disabled before deletion; disabling an already-disabled
            // table throws, so only disable when it is currently enabled.
            if (admin.isTableEnabled(tName)) {
              admin.disableTable(tName)
            }
            admin.deleteTable(tName)
          } else {
            print("table does not exist")
          }
        } finally {
          admin.close()
        }
      }

      /**
       * Looks up a table handle.
       *
       * @param conn      open connection
       * @param tableName name of the table
       * @return the table handle (caller closes it), or `null` when the table does not exist;
       *         `null` is kept for compatibility with existing callers
       */
      def getHbaseTable(conn: Connection, tableName: String): Table = {
        val admin = conn.getAdmin
        try {
          val tName = TableName.valueOf(tableName)
          if (admin.tableExists(tName)) {
            conn.getTable(tName)
          } else {
            print("table does not exist")
            null
          }
        } finally {
          admin.close()
        }
      }

      /**
       * Writes a single cell.
       *
       * @param table        target table
       * @param rowKey       row key
       * @param columnFamily column family name
       * @param quorm        column qualifier
       * @param value        cell value; `null` is stored as an empty value
       */
      def addRowData(table: Table, rowKey: String, columnFamily: String, quorm: String, value: String): Unit = {
        // Bytes.toBytes always encodes as UTF-8, unlike String.getBytes which depends on the
        // JVM default charset and would disagree with the UTF-8 decoding used by the readers.
        val cellValue = Option(value).getOrElse("")
        val rowPut: Put = new Put(Bytes.toBytes(rowKey))
        rowPut.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(quorm), Bytes.toBytes(cellValue))
        table.put(rowPut)
      }

      /**
       * Scans the whole table and prints, per row, the row key and its cell values joined by "_".
       *
       * To restrict the range, build the scan with start/stop row keys instead: the start key is
       * inclusive and the stop key exclusive (append a trailing 0x00 byte to the stop key to
       * include it).
       *
       * @param table table to scan
       * @return the scanner that was iterated here; it has already been consumed by the
       *         printing loop and is still open, so the caller should close it
       */
      def queryAll(table: Table): ResultScanner = {
        val results: ResultScanner = table.getScanner(new Scan())
        for (result <- results.asScala) {
          val rowKey = Bytes.toString(result.getRow)
          // rawCells() may be null for a result without cells.
          val cells = Option(result.rawCells()).getOrElse(Array.empty[Cell])
          val values = cells
            .map(cell => Bytes.toString(cell.getValueArray, cell.getValueOffset, cell.getValueLength))
            .mkString("_")
          println(s"rowkey=$rowKey values=$values")
        }
        results
      }

      /**
       * Fetches one row and prints family, qualifier, timestamp, row key and value of each cell.
       *
       * @param table  table to read from
       * @param rowKey row key to look up
       * @return the HBase result; it holds no cells when the row does not exist
       */
      def getSingleRow(table: Table, rowKey: String): Result = {
        val result: Result = table.get(new Get(Bytes.toBytes(rowKey)))

        // To read one specific column instead:
        // val value = result.getValue(Bytes.toBytes("info"), Bytes.toBytes("age"))

        // rawCells() may be null when the row was not found; treat that as "no cells".
        val cells = Option(result.rawCells()).getOrElse(Array.empty[Cell])
        for (rowKv <- cells) {
          // column family
          println("Famiily:" + Bytes.toString(rowKv.getFamilyArray, rowKv.getFamilyOffset, rowKv.getFamilyLength))
          // column qualifier
          println("Qualifier:" + Bytes.toString(rowKv.getQualifierArray, rowKv.getQualifierOffset, rowKv.getQualifierLength))
          // cell timestamp
          println("TimeStamp:" + rowKv.getTimestamp)
          // row key
          println("rowkey:" + Bytes.toString(rowKv.getRowArray, rowKv.getRowOffset, rowKv.getRowLength))
          // cell value
          println("Value:" + Bytes.toString(rowKv.getValueArray, rowKv.getValueOffset, rowKv.getValueLength))
        }
        result
      }

    }
    View Code

    HbaseSource

    package com.atguigu.flink.source
    
    import com.atguigu.flink.bean.SensorReading
    import com.atguigu.flink.utils.HbaseUtil
    import org.apache.flink.configuration
    import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.hbase._
    import org.apache.hadoop.hbase.client._
    import org.apache.hadoop.hbase.util.Bytes
    import scala.collection.JavaConverters._
    
    /**
     * Flink source that scans the HBase table "sensor" once and emits one [[SensorReading]]
     * per row, built from the columns `info:id`, `info:timestamp` and `info:timepreture`.
     *
     * The source finishes when the scan is exhausted or when [[cancel]] is called.
     */
    class HbaseSource extends RichSourceFunction[SensorReading]{

      // Both handles are created in open() on the task manager, hence the null initial values.
      private var conn: Connection = null
      private var table: Table = null

      // Flipped by cancel(), which Flink calls from a different thread than run().
      @volatile private var running: Boolean = true

      /**
       * Opens the HBase connection and the "sensor" table handle.
       *
       * @param parameters Flink configuration (unused)
       */
      override def open(parameters: configuration.Configuration): Unit = {
        val conf = HbaseUtil.getHbaseConf()
        conn = HbaseUtil.getHbaseConn(conf)
        table = conn.getTable(TableName.valueOf("sensor"))
      }

      /**
       * Scans the table and emits one record per complete row.
       *
       * Rows that lack any of the three expected columns are skipped with a console message
       * instead of failing the job with a NullPointerException. Malformed numeric values
       * still fail with NumberFormatException.
       *
       * Note: `run` is declared on the Java interface `SourceFunction`, so IDEA's
       * "override methods" (Ctrl+O) may not offer it; write the override by hand.
       *
       * @param sourceContext context used to emit records
       */
      override def run(sourceContext: SourceFunction.SourceContext[SensorReading]): Unit = {
        val scanner: ResultScanner = table.getScanner(new Scan())
        try {
          val rows = scanner.iterator()
          while (running && rows.hasNext) {
            val row = rows.next()
            // The scan result already carries the row's cells, so no extra Get is needed.
            toSensorReading(row) match {
              case Some(reading) => sourceContext.collect(reading)
              case None =>
                println(s"skipping row ${Bytes.toString(row.getRow)}: missing info:id, info:timestamp or info:timepreture")
            }
          }
        } finally {
          scanner.close()
        }
      }

      /** Converts one HBase row to a reading; `None` when a required column is absent. */
      private def toSensorReading(row: Result): Option[SensorReading] = {
        val family = Bytes.toBytes("info")
        // getValue returns null for a missing column; Option(...) turns that into None.
        def column(qualifier: String): Option[String] =
          Option(row.getValue(family, Bytes.toBytes(qualifier))).map(bytes => Bytes.toString(bytes))

        for {
          id          <- column("id")
          curTime     <- column("timestamp")
          timepreture <- column("timepreture")
        } yield SensorReading(id, curTime.toLong, timepreture.toDouble)
      }

      /** Asks the scan loop in [[run]] to stop after the row currently being processed. */
      override def cancel(): Unit = {
        running = false
      }

      /**
       * Closes the table handle and the connection. Each is closed independently so that a
       * failure closing the table cannot leak the connection; failures are printed, not rethrown.
       */
      override def close(): Unit = {
        def closeQuietly(resource: AutoCloseable): Unit =
          if (resource != null) {
            try resource.close()
            catch {
              case _root_.scala.util.control.NonFatal(e) => println(e.getMessage)
            }
          }

        closeQuietly(table)
        closeQuietly(conn)
      }
    }

    主程序 HbaseSourceSinkApp

    package com.atguigu.flink.app
    
    import com.atguigu.flink.bean.SensorReading
    import com.atguigu.flink.sink.HbaseSink
    import org.apache.flink.streaming.api.scala
    import org.apache.flink.streaming.api.scala._
    import com.atguigu.flink.source.HbaseSource
    
    /** Job entry point: reads sensor rows from HBase, writes them back to HBase and prints them. */
    object HbaseSourceSinkApp {
      def main(args: Array[String]): Unit = {
        val environment = StreamExecutionEnvironment.getExecutionEnvironment
        // Default parallelism of 1 for every operator in this job.
        environment.setParallelism(1)

        // Input side: the custom HBase source.
        val readings: DataStream[SensorReading] = environment.addSource(new HbaseSource)

        // Output side: the custom HBase sink, plus a console echo of the same stream.
        readings.addSink(new HbaseSink)
        readings.print()

        // Nothing runs until the job graph is submitted.
        environment.execute()
      }
    }

    3 flink 写入 hbase

    HbaseSink

    package com.atguigu.flink.sink
    
    import com.atguigu.flink.bean.SensorReading
    import com.atguigu.flink.utils.HbaseUtil
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.sink._
    import org.apache.hadoop.hbase._
    import org.apache.hadoop.hbase.client._
    import org.apache.hadoop.hbase.util.Bytes
    
    /**
     * @Author: Yang JianQiu
     * @Date: 2019/3/1 1:34
     *
     *        Flink sink that writes [[SensorReading]] records to the HBase table "psensor"
     *        by extending RichSinkFunction.
     *
     *        Flink hands records to the sink one at a time. Flushing to HBase on every record
     *        puts heavy load on the cluster, so writes go through a [[BufferedMutator]] and are
     *        flushed in batches: when the write buffer fills up, every `flushEvery` records,
     *        and once more when the sink is closed.
     *
     *        Row keys come from a local counter starting at 2000, so they are not derived from
     *        the record; replaying the same input after a restart does not necessarily hit the
     *        same rows.
     */


    class HbaseSink extends RichSinkFunction[SensorReading]{
      // Created in open() on the task manager, hence the null initial values.
      private var conn: Connection = null
      var mutator: BufferedMutator = null
      // Number of puts handed to the mutator since the last explicit flush.
      var count = 0
      // Next row key to use (demo only).
      var rowKey_test = 2000

      // Explicit flush after this many records.
      private val flushEvery = 2

      /**
       * Opens the HBase connection and a buffered mutator for table "psensor".
       *
       * @param parameters Flink configuration (unused)
       */
      override def open(parameters: Configuration): Unit ={
        val conf = HbaseUtil.getHbaseConf()
        conn = HbaseUtil.getHbaseConn(conf)
        val params: BufferedMutatorParams = new BufferedMutatorParams(TableName.valueOf("psensor"))
        // 1 MiB client-side write buffer; HBase flushes automatically once it is full.
        params.writeBufferSize(1024 * 1024)
        mutator = conn.getBufferedMutator(params)
        count = 0
      }

      /**
       * Buffers one record as a Put with columns `info:id`, `info:timestamp` and
       * `info:timeperature`, flushing once `flushEvery` records have accumulated.
       *
       * @param value record to write
       */
      override def invoke(value: SensorReading): Unit = {
        val family = Bytes.toBytes("info")

        val put: Put = new Put(Bytes.toBytes(rowKey_test.toString))
        put.addColumn(family, Bytes.toBytes("id"), Bytes.toBytes(value.id))
        put.addColumn(family, Bytes.toBytes("timestamp"), Bytes.toBytes(value.timestamp.toString))
        put.addColumn(family, Bytes.toBytes("timeperature"), Bytes.toBytes(value.timepreture.toString))
        mutator.mutate(put)
        rowKey_test += 1

        // Count first, then compare, so the flush happens on exactly every flushEvery-th record.
        count += 1
        if (count >= flushEvery) {
          mutator.flush()
          count = 0
        }
      }

      /**
       * Flushes any still-buffered puts by closing the mutator, then closes the connection.
       * The connection is closed even when closing the mutator fails.
       */
      override def close(): Unit = {
        try {
          // BufferedMutator.close() performs a final flush; without it buffered records are lost.
          if (mutator != null) mutator.close()
        } finally {
          if (conn != null) conn.close()
        }
      }


    }

    主程序 HbaseSourceSinkApp

    package com.atguigu.flink.app
    
    import com.atguigu.flink.bean.SensorReading
    import com.atguigu.flink.sink.HbaseSink
    import org.apache.flink.streaming.api.scala
    import org.apache.flink.streaming.api.scala._
    import com.atguigu.flink.source.HbaseSource
    
    /** Driver program: pipes the HBase source into the HBase sink and echoes the stream to stdout. */
    object HbaseSourceSinkApp {
      def main(args: Array[String]): Unit = {
        val flinkEnv = StreamExecutionEnvironment.getExecutionEnvironment
        // Use a default parallelism of 1 for the whole job.
        flinkEnv.setParallelism(1)

        // Register the HBase-backed source as the job's input.
        val sensorStream: DataStream[SensorReading] = flinkEnv.addSource(new HbaseSource)

        // Register the HBase-backed sink as the job's output.
        sensorStream.addSink(new HbaseSink)

        // Also print every record.
        sensorStream.print()

        // Submit and run the job.
        flinkEnv.execute()
      }
    }

    注意:端到端的一致性

      像Cassandra、HBase和Redis这样的KV数据库一般经常用来作为Sink,用以实现端到端的Exactly-Once。需要注意的是,并不是说一个KV数据库就百分百支持幂等写。幂等写对KV对有要求,那就是Key-Value必须是可确定性(Deterministic)计算的。假如我们设计的Key是:name + curTimestamp,每次执行数据重发时,生成的Key都不相同,会产生多次结果,整个操作不是幂等的。因此,为了追求端到端的Exactly-Once,我们设计业务逻辑时要尽量使用确定性的计算逻辑和数据模型。 

    本文来自博客园,作者:秋华,转载请注明原文链接:https://www.cnblogs.com/qiu-hua/p/13680660.html

  • 相关阅读:
    JavaBean 与 EJB 的区别
    MFC选项卡的实现
    MFC的图片按钮
    windows 下使用 MinGW + msys 编译 ffmpeg
    在windows使用vs2008编译live555
    C89 和 C99 标准比较
    11.求二元查找树的镜像[MirrorOfBST]
    10.排序数组中和为给定值的两个数字[Find2NumbersWithGivenSum]
    9.链表中倒数第k个结点[FindReverseKthLinkedListNode]
    8.另类方法求1+2+...+n[AnotherMethodOfCalculateSumN]
  • 原文地址:https://www.cnblogs.com/qiu-hua/p/13680660.html
Copyright © 2011-2022 走看看