实现的需求是从 RabbitMQ 读取 JSON 格式的消息,处理结果输出到 MySQL。
主要参考了 这篇博客 和 Apache Flink 中文文档 。
编程语言: Scala 2.12.10
构建工具: sbt 1.3.0
IDE:IntelliJ IDEA Community 2019.1
开发环境的搭建可以参考 这篇博客 => 通过 IntelliJ IDEA 打包 Flink Scala 项目 。
1. 通过 IntelliJ IDEA 创建 Scala -> sbt 项目
sbt 选择 1.3.0
Scala 选择 2.12.10
2. 在 build.sbt 中添加引用
主要使用如下几个包:
- genson-scala:Json 序列化/反序列化
- druid:阿里的数据库连接池
- mysql-connector-java:MySQL 的 Connector
- flink-connector-rabbitmq:RabbitMQ 的 Connector
build.sbt :
sbtPlugin := true
name := "octopus-behavior-analysis"
version := "0.1"
//scalaVersion := "2.12.10"
val flinkVersion = "1.9.0"
val flinkDependencies = Seq(
"org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
"com.owlike" %% "genson-scala" % "1.6" % "compile",
"com.alibaba" % "druid" % "1.1.20" % "compile",
"mysql" % "mysql-connector-java" % "8.0.17" % "compile",
"org.apache.flink" %% "flink-connector-rabbitmq" % flinkVersion % "compile")
lazy val root = (project in file(".")).
settings(
libraryDependencies ++= flinkDependencies
)
关于 scalaVersion 的设置为什么要注释掉详见 这篇博客。
3. RabbitMQStreamWordCount.scala
使用 RMQSource
从 RabbitMQ 队列中读取消息。
由于消息格式为 Json,这里使用的 AbstractDeserializationSchema
自定义了反序列化处理(如果是简单的字符串可使用 SimpleStringSchema
)。
反序列化处理使用了 genson 的 fromJson
方法。
之后就可以使用 DataStream
的 API 做各种转换了,这里仅统计了消息中 id 出现的次数,比较简单,仅作参考。
package octopus.ba
import com.owlike.genson.defaultGenson._
import octopus.ba.config.RabbitMQConfig
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig
object RabbitMQStreamWordCount {
def main(args: Array[String]) {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val connectionConfig = new RMQConnectionConfig.Builder()
.setHost(RabbitMQConfig.host) // 例:192.168.0.1
.setPort(RabbitMQConfig.port) // 一般使用默认端口 5672
.setUserName(RabbitMQConfig.userName)
.setPassword(RabbitMQConfig.password)
.setVirtualHost(RabbitMQConfig.virtualHost) // 如果没有配置的话,则设置为默认的虚拟Host "/"
.build()
val stream = env
.addSource(new RMQSource[RabbitMQMessageModel](
connectionConfig,
"queue_name",
true,
new AbstractDeserializationSchema[RabbitMQMessageModel]() {
override def deserialize(bytes: Array[Byte]): RabbitMQMessageModel = fromJson[RabbitMQMessageModel](new String(bytes))
} ))
.setParallelism(1)
stream.addSink(new SinkVisitLineLog)
val counts = stream