zoukankan      html  css  js  c++  java
  • Faust——python分布式流式处理框架

    摘要

    Faust是用python开发的一个分布式流式处理框架。在一个机器学习应用中,机器学习算法可能被用于数据流实时处理的各个环节,而不是仅仅在推理阶段,算法也不仅仅局限于常见的分类回归算法,而是会根据业务需要执行一个十分差异化的任务, 例如:在我们的时序异常检测应用中, 前处理阶段的变点检测算法。这就要求流处理框架除了具备进行常规的转换聚合操作之外,可以支持更加强大的任意自定义逻辑和更加复杂的自定义状态,能够更好地与原生的python算法代码紧密结合在一起。在主流的flink, spark streaming不能满足我们的个性化需求时, Faust为我们提供了一个选择.

    本文将对faust框架的主要功能进行概要描述。

    参考连接

    https://faust.readthedocs.io/en/latest/

    https://github.com/robinhood/faust

    基本使用

    app

    faust库的一个实例,提供了Faust的核心API,通过app可定义kafka topic、流处理器等。

    >>> app = faust.App(
    ... 'myid',
    ... broker='kafka://kafka.example.com',
    ... store='rocksdb://',
    ... )

    创建topic

    faust以kafka作为数据传输和自组织管理的媒介,可以直接在faust应用中定义kafka主题。

    topic = app.topic('name_of_topic')
    @app.agent(topic)
    async def process(stream):
      async for event in stream:
        ...

    创建table

    table是Faust中的分布式键值对数据表,可用于保存流处理过程中的中间状态。

    transfer_counts = app.Table(
    'transfer_counts',
    default=int,
    key_type=str,
    value_type=int,
    )

    创建agent

    agent是数据处理流中的一个基本处理单元,通过从kafka中摄取指定topic中的数据,并进行相应的处理。

    import faust
    app = faust.App('stream-example')
    @app.agent()
    async def myagent(stream):
      """Example agent."""
      async for value in stream:
        print(f'MYAGENT RECEIVED -- {value!r}')
        yield value
    if __name__ == '__main__':
      app.main()

    agent——分布式自组织流处理器

    import faust

    class Add(faust.Record):
      a: int
      b: int

    app = faust.App('agent-example')

    topic = app.topic('adding', value_type=Add)
    @app.agent(topic)
    async def adding(stream):
      async for value in stream:
        yield value.a + value.b

    命令行中执行:faust -A examples.agent worker -l info,即可运行这个app。

    sinks

    可定义事件处理后的额外操作,比如推送告警等。一个sink可以是一个callable、异步callable、另外一个主题、另外一个agent等等。

    回调函数

    def mysink(value):
      print(f'AGENT YIELD: {value!r}')
    @app.agent(sink=[mysink])
    async def myagent(stream):
      async for value in stream:
        yield process_value(value)

    异步回调

    async def mysink(value):
      print(f'AGENT YIELD: {value!r}')
      await asyncio.sleep(1)
    @app.agent(sink=[mysink])
    async def myagent(stream):
      ...

    另外一个topic

    agent_log_topic = app.topic('agent_log')
    @app.agent(sink=[agent_log_topic])
    async def myagent(stream):
      ...

    另外一个agent

    @app.agent()
    async def agent_b(stream):
      async for event in stream:
        print(f'AGENT B RECEIVED: {event!r}')
    @app.agent(sink=[agent_b])
    async def agent_a(stream):
      async for event in stream:
        print(f'AGENT A RECEIVED: {event!r}')

    streams

    basics

    stream是一个无限的异步可迭代对象,从topic中消费数据。

    stream的常规使用为:

    @app.agent(my_topic)
    async def process(stream):
       async for value in stream:
        ...

    也可以自己创建一个stream:

    stream = app.stream(my_topic) # or: my_topic.stream()
    async for value in stream:
      ...

    但是要处理自定义数据流,需要首先定义一个任务,在app启动时执行这个任务。一个完整的例子:

    import faust
    class Withdrawal(faust.Record):
      account: str
      amount: float
    app = faust.App('example-app')
    withdrawals_topic = app.topic('withdrawals', value_type=Withdrawal)
    @app.task
    async def mytask():
      async for w in withdrawals_topic.stream():
        print(w.amount)
    if __name__ == '__main__':
      app.main()

    processors

    一个stream可以有任意多个处理器回调。

    def add_default_language(value: MyModel) -> MyModel:
      if not value.language:
        value.language = 'US'
      return value
    async def add_client_info(value: MyModel) -> MyModel:
      value.client = await get_http_client_info(value.account_id)
      return value
    s = app.stream(my_topic,
      processors=[add_default_language, add_client_info])

    kafka主题

    每个faustworker会启动一个kafka consumer消费数据。如果两个agent消费了相同的主题,那么两个agent会分别受到相同的消息,每次消息被回执,那么引用级数-1,当引用计数为0时,consumer就可以提交偏移量了。

    操作

    groupby

    对流进行重新分区。新的流会使用一个新的中间主题,并以相应的字段作为键,这个新的流是agent最终迭代的流。

    import faust
    class Order(faust.Record):
      account_id: str
      product_id: str
      amount: float
      price: float
    app = faust.App('group-by-example')
    orders_topic = app.topic('orders', value_type=Order)
    @app.agent(orders_topic)
    async def process(orders):
      async for order in orders.group_by(Order.account_id):
        ...

    流分区的关键字不仅可以是数据中的字段,也可以是一个callable。

    def get_order_account_id(order):
      return json.loads(order)['account_id']
    @app.agent(app.topic('order'))
    async def process(orders):
      async for order in orders.group_by(get_order_account_id):
        ...

    take

    缓存数据。

    @app.agent()
    async def process(stream):
    async for values in stream.take(100):
      assert len(values) == 100
      print(f'RECEIVED 100 VALUES: {values}')

    through

    将流推送到一个新的topic,并迭代新的topic里的数据.

    source_topic = app.topic('source-topic')
    destination_topic = app.topic('destination-topic')
    @app.agent()
    async def process(stream):
      async for value in stream.through(destination_topic):
        # we are now iterating over stream(destination_topic)
        print(value)

    filter

    过滤操作.

    @app.agent()
    async def process(stream):
      async for value in stream.filter(lambda: v > 1000).group_by(...):
        ...

    Channels & Topics--数据源

    basics

    agents迭代streams, streams迭代channels.

    Models, Serialization, and Codecs

    model

    model用来描述数据结构, 例如:

    class Point(Record, serializer='json'):
      x: int
      y: int

    匿名agent

    匿名agent不显示地使用一个topic,而是自己创建topic,在定义好消息类型后,只需直接向该agent发送相应地消息即可.

    @app.agent(key_type=Point, value_type=Point)
    async def my_agent(events):
      async for event in events:
        print(event)

    await my_agent.send(key=Point(x=10, y=20), value=Point(x=30, y=10))

    schema

    定义键值的类型和序列化反序列化器

    schema = faust.Schema(
    key_type=Point,
    value_type=Point,
    key_serializer='json',
    value_serializer='json',
    )
    topic = app.topic('mytopic', schema=schema)

    collections

    model中的一个field可以是一个其他类型数据的列表.

    from typing import List
    import faust
    class User(faust.Record):
      accounts: List[Account]

    支持的其他类型为: set, mapping, tuple.

    tables和windowing

    tables

    table是Faust中的分布式内存数据表,使用kafka的changelog topic作为后端进行持久化和容错.

    table = app.Table('totals', default=int)

    table的修改只能在流操作只能进行, 否则会报错.

    Co-partitioning Tables and Streams

    table的任何键的数据只能存在于一台主机上.有状态的流处理要求table和stream协同分区,即同一台主机处理的流和table必须共享相同的分区.因此在操作table的流迭代中需要对流重新分区.

    withdrawals_topic = app.topic('withdrawals', value_type=Withdrawal)
    country_to_total = app.Table(
    'country_to_total', default=int).tumbling(10.0, expires=10.0)
    withdrawals_stream = app.topic('withdrawals', value_type=Withdrawal).stream()
    withdrawals_by_country = withdrawals_stream.group_by(Withdrawal.country)
    @app.agent
    async def process_withdrawal(withdrawals):
      async for withdrawal in withdrawals.group_by(Withdrawal.country):
        country_to_total[withdrawal.country] += withdrawal.amount

    如果要进行的计算分别以两个不太的字段分组,则应使用两个不同的agent, 分别groupby.

    withdrawals_topic = app.topic('withdrawals', value_type=Withdrawal)
    user_to_total = app.Table('user_to_total', default=int)
    country_to_total = app.Table(
    'country_to_total', default=int).tumbling(10.0, expires=10.0)
    @app.agent(withdrawals_topic)
    async def find_large_user_withdrawals(withdrawals):
      async for withdrawal in withdrawals:
        user_to_total[withdrawal.user] += withdrawal.amount
    @app.agent(withdrawals_topic)
    async def find_large_country_withdrawals(withdrawals):
      async for withdrawal in withdrawals.group_by(Withdrawal.country):
        country_to_total[withdrawal.country] += withdrawal.amount

    windowing

    window的定义

    from datetime import timedelta
    views = app.Table('views', default=int).tumbling(
    timedelta(minutes=1),
    expires=timedelta(hours=1),
    )

    可以定义window使用的时间,包括系统时间relativ_to_now(), 当前流的处理时间relative_to_current(),相对数据中的时间字段relative_to_field().

    views = app.Table('views', default=int).tumbling(...).relative_to_stream()

    事件乱序

    windowed table可以正确处理乱序, 只要迟到的数据在table的过期时间内.

  • 相关阅读:
    js记录
    快速最好响应式布局(CSS3)
    CSS3终极动画制作属性animation
    虚拟机中试用windows 8(视频)
    平板电脑上完美体验Windows 8 (视频)
    面对电磁泄漏您的电脑还安全吗?--计算机设备信息泄漏揭秘
    基于Linux平台Softimage XSI 演示
    为您的Office文档加把锁-ADRMS的安装
    VNC Server模拟攻击实战
    暴力破解FTP服务器技术探讨与防范措施
  • 原文地址:https://www.cnblogs.com/zcsh/p/13841561.html
Copyright © 2011-2022 走看看