zoukankan      html  css  js  c++  java
  • Knative 实战:基于 Knative Serverless 技术实现天气服务-下篇

    上一期我们介绍了如何基于 Knative Serverless 技术实现天气服务-上篇,首先我们先来回顾一下上篇介绍的内容:

    • 通过高德天气 API 接口,每隔 3 个小时定时发送定时事件,将国内城市未来 3 天的天气信息,存储更新到表格存储
    • 提供 RESTful API 查询天气信息

    接下来我们介绍如何通过表格存储提供的通道服务,实现 Knative 对接表格存储事件源,订阅并通过钉钉发送天气提醒通知。

    整体架构

    回顾一下整体架构:

    o1

    • 通过 CronJob 事件源,每隔 3 个小时定时发送定时事件,将国内城市未来 3 天的天气信息,存储更新到表格存储
    • 提供 RESTful API 查询天气信息
    • 通过表格存储提供的通道服务,实现 TableStore 事件源
    • 通过 Borker/Trigger 事件驱动模型,订阅天气信息
    • 根据订阅收到的天气信息进行钉钉消息通知。如明天下雨,提示带伞等

    基于 Knative 实现天气服务-下篇

    首先我们介绍一下表格存储提供的通道服务。通道服务(Tunnel Service)是基于表格存储数据接口之上的全增量一体化服务。通道服务为您提供了增量、全量、增量加全量三种类型的分布式数据实时消费通道。通过为数据表建立数据通道,您可以简单地实现对表中历史存量和新增数据的消费处理。通过数据通道可以进行数据同步、事件驱动、流式数据处理以及数据搬迁。这里事件驱动正好契合我们的场景。

    先看一下处理流程图:

    o2

    • 定义 TableStore 事件源,用于接收通道服务数据
    • 通过 Borker/Trigger 事件驱动模型,订阅天气信息
    • 订阅接收到的天气信息发送给天气提醒服务,进行钉钉消息通知

    下面我们来详细介绍一下。

    自定义 TableStore 事件源

    在 Knative 中自定义事件源其实很容易,可以参考官方提供的自定义事件源的实例:https://github.com/knative/docs/tree/master/docs/eventing/samples/writing-a-source。

    我们这里定义数据源为 AliTablestoreSource。代码实现主要分为两部分:

    1. 资源控制器-Controller:接收 AliTablestoreSource 资源,在通道服务中创建 Tunnel
    2. 事件接收器-Receiver:通过 Tunnel Client 监听事件,并将接收到的事件发送到目标服务( Broker)

    关于自定义 TableStore 事件源实现参见 GitHub 源代码:https://github.com/knative-sample/tablestore-source

    部署自定义事件源服务如下:

    https://github.com/knative-sample/tablestore-source/tree/master/config 中可以获取事件源部署文件,执行下面的操作:

    kubectl apply -f 200-serviceaccount.yaml -f 201-clusterrole.yaml -f 202-clusterrolebinding.yaml -f 300-alitablestoresource.yaml -f 400-controller-service.yaml -f 500-controller.yaml -f 600-istioegress.yaml
    

    部署完成之后,我们可以看到资源控制器已经开始运行:

    [root@iZ8vb5wa3qv1gwrgb3lxqpZ config]# kubectl -n knative-sources get pods
    NAME                                 READY   STATUS    RESTARTS   AGE
    alitablestore-controller-manager-0   1/1     Running   0          4h12m
    

    创建事件源

    由于我们是通过 Knative Eventing 中 Broker/Trigger 事件驱动模型对天气事件进行处理。首先我们创建用于数据接收的 Broker 服务。

    创建 Broker

    apiVersion: eventing.knative.dev/v1alpha1
    kind: Broker
    metadata:
      name: weather
    spec:
      channelTemplateSpec:
        apiVersion: messaging.knative.dev/v1alpha1
        kind: InMemoryChannel
    

    创建事件源实例

    这里需要说明一下,创建事件源实例其实就是在表格存储中创建通道服务,那么就需要配置访问通道服务的地址、accessKeyId 和 accessKeySecret,这里参照格式:{ "url":"https://xxx.cn-beijing.ots.aliyuncs.com/", "accessKeyId":"xxxx","accessKeySecret":"xxxx" } 设置并进行 base64 编码。将结果设置到如下 Secret 配置文件 alitablestore 属性中:

    apiVersion: v1
    kind: Secret
    metadata:
      name: alitablestore-secret
    type: Opaque
    data:
      # { "url":"https://xxx.cn-beijing.ots.aliyuncs.com/", "accessKeyId":"xxxx","accessKeySecret":"xxxx" }
      alitablestore: "<base64>"
    

    创建 RBAC 权限:

    apiVersion: rbac.authorization.k8s.io/v1
    kind: ClusterRoleBinding
    metadata:
      name: eventing-sources-alitablestore
    subjects:
    - kind: ServiceAccount
      name: alitablestore-sa
      namespace: default
    roleRef:
      apiGroup: rbac.authorization.k8s.io
      kind: ClusterRole
      name: eventing-sources-alitablestore-controller
    
    ---
    apiVersion: v1
    kind: ServiceAccount
    metadata:
      name: alitablestore-sa
    secrets:
    - name: alitablestore-secret
    

    创建 AliTablestoreSource 实例,这里我们设置接收事件的 sink 为上面创建的 Broker 服务。

    ---
    apiVersion: sources.eventing.knative.dev/v1alpha1
    kind: AliTablestoreSource
    metadata:
      labels:
        controller-tools.k8s.io: "1.0"
      name: alitablestoresource
    spec:
      # Add fields here
      serviceAccountName: alitablestore-sa
      accessToken:
        secretKeyRef:
          name: alitablestore-secret
          key: alitablestore
      tableName: weather
      instance: knative-weather
      sink:
        apiVersion: eventing.knative.dev/v1alpha1
        kind: Broker
        name: weather
    

    创建完成之后,我们可以看到运行中的事件源:

    [root@iZ8vb5wa3qv1gwrgb3lxqpZ config]# kubectl get pods
    NAME                                                              READY   STATUS      RESTARTS   AGE
    tablestore-alitablestoresource-9sjqx-656c5bf84b-pbhvw             1/1     Running     0          4h9m
    

    订阅事件和通知提醒

    创建天气提醒服务

    如何进行钉钉通知呢,我们可以创建一个钉钉的群组(可以把家里人组成一个钉钉群,天气异常时,给家人一个提醒),添加群机器人:

    o3

    获取 webhook :

    o4

    这里我们假设北京 (110000),日期:2019-10-13, 如果天气有雨,就通过钉钉发送通知提醒,则服务配置如下:

    apiVersion: serving.knative.dev/v1beta1
    kind: Service
    metadata:
      name: day-weather
    spec:
      template:
        spec:
          containers:
          - args:
            - --dingtalkurl=https://oapi.dingtalk.com/robot/send?access_token=xxxxxx
            - --adcode=110000
            - --date=2019-10-13
            - --dayweather=雨
            image: registry.cn-hangzhou.aliyuncs.com/knative-sample/dingtalk-weather-service:1.2
    

    关于钉钉提醒服务具体实现参见 GitHub 源代码:https://github.com/knative-sample/dingtalk-weather-service

    创建订阅

    最后我们创建 Trigger订阅天气事件,并且触发天气提醒服务:

    apiVersion: eventing.knative.dev/v1alpha1
    kind: Trigger
    metadata:
      name: weather-trigger
    spec:
      broker: weather
      subscriber:
        ref:
          apiVersion: serving.knative.dev/v1alpha1
          kind: Service
          name: day-weather
    

    订阅之后,如果北京 (110000),日期:2019-10-13, 天气有雨,会收到如下的钉钉提醒:

    o5

    这里其实还有待完善的地方:

    • 是否可以基于城市进行订阅(只订阅目标城市)?
    • 是否可以指定时间发送消息提醒(当天晚上 8 点准时推送第 2 天的天气提醒信息)?

    有兴趣的可以继续完善当前的天气服务功能。

    小结

    本文介绍了如何在 Knative 中自定义事件源,并通过事件驱动接收天气变化信息,订阅并通过钉钉推送通知提醒。这样基于 Knative Serverless 技术实现天气服务整体实现就介绍完了。有兴趣的同学可以针对上面提到的不足继续研究。还是那句话,做好天气服务不容易,但还好我有 Knative。

    欢迎加入 Knative 交流群

    o6

    “ 阿里巴巴云原生微信公众号(ID:Alicloudnative)关注微服务、Serverless、容器、Service Mesh等技术领域、聚焦云原生流行技术趋势、云原生大规模的落地实践,做最懂云原生开发者的技术公众号。”

  • 相关阅读:
    Azkaban的使用
    Azkaban安装
    Kafka 启动失败,报错Corrupt index found以及org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'version': java.nio.BufferUnderflowException
    Kafka 消费者设置分区策略及原理
    Kafka利用Java API自定义生产者,消费者,拦截器,分区器等组件
    zookeeper群起总是有那么几个节点起不来的问题解决
    flume 启动agent报No appenders could be found for logger的解决
    Flume 的监控方式
    Flume 自定义 组件
    Source r1 has been removed due to an error during configuration java.lang.IllegalArgumentException: Required parameter bind must exist and may not be null & 端口无法连接
  • 原文地址:https://www.cnblogs.com/alisystemsoftware/p/11661185.html
Copyright © 2011-2022 走看看