zoukankan      html  css  js  c++  java
  • PO Box简介

    使用Erlang写程序的时候,经常会碰到一种情况:因为Erlang进程的mailbox是没有大小限制的,所以它会一直接受消息,直到Erlang节点内存溢出。在大多数情况下,我们可以通过限制消息生产者的频率来解决这个问题,而且也应该实现这一点。但是,有时候不太可能完全限制发给一个进程的所有消息,这时候,开发人员就需要通过丢弃消息来减轻负载。

    PO Box就是一个可以减轻负载的工具。

    设计原则

    PO Box是一个实现了消息缓冲区的功能的库。因为Erlang进程需要同时接收消息并做自身的工作,所以很有可能因为消息过多,而使内存消耗急剧上升。

    这是Erlang现在实现的消息处理逻辑,所有的消息都是放在mailbox中的,当一条消息处理卡顿时,会导致后面的消息一直在排队,消耗内存。

            	     messages  
               	       |  
                       V  
            +-----[Pid or Name]-----+  
            |      |         |      |  
            |      | mailbox |      |  
            |      +---------+      |  
            |       |               |  
            |    receive            |  
            +-----------------------+  
    

    PO Box实现了一个消息代理:

                                                             messages  
                                                                |  
                                                                V  
        	+---------[Pid]---------+                +--------[POBox]--------+  
        	|                       |<-- got mail ---|      |         |      |  
        	|                       |                |      | mailbox |      |  
        	|   <important stuff>   |--- send it! -->|      +---------+      |  
        	|                       |                |       |               |  
        	|                       |<---<messages>--|<---buffer             |  
        	+-----------------------+                +-----------------------+ 
    

    可以将一个PO Box进程看成是一个消息代理,它通过实现一个缓冲区,对消息进行缓存,并在缓冲区满了的时候丢弃消息。PO Box可以通知你有新的数据,这样你可以请求这些数据;或者你可以通知它直接把数据发给你,不用通知

    更具体一些,就是PO Box是一个有owner进程的状态机,共有3种状态:

    • Active
    • Notify
    • Passive

    Passive状态基本上除了接收消息保存在缓冲区和必要时丢弃消息,不做什么事。

    用户可以通过调用PO Box的api来进入Notify状态。该状态的唯一任务是检查缓冲区内有没有消息。如果有,它会发一个{mail, new_data}给owner进程。如果没有,PO Box会一直在notify状态等待,直到有新消息。在发送通知后,PO Box会返回passive状态。

    只有active状态可以将真实数据发给owner进程。用户同样可以通过调用PO Box的api告知它进入active状态。如果缓冲区内有消息,所有的消息会以一个list的状态发给owner进程。如果没有消息,PO Box会等待直到有消息。转发完消息后,PO Box会回到passive状态。

       					,---->[passive]------(user makes active)----->[active]
      					|         | ^                                  |  ^  |
      					|         | '---(sends message to user)--<-----'  |  |
      					|  (user makes notify)                            |  |
      					|         |                                       |  |
    		   (user is notified) |                                       |  |
       					|         V                                       |  |
             			'-----[notify]---------(user makes active)--------'  |
                         	      ^----------(user makes notify)<----------'
    

    缓存类型

    PO Box实现了消息的缓存机制,当前支持的缓存方式包括三种:queuestackkeep_old queue

    queue按消息到达的顺序保存消息,当缓存满时,会丢弃最老的消息。例如,有6条消息,a,b,c,d,e,消息缓存的大小是3,最后会保留的消息是[c, d, e]。

    keep_old queue也是一种queue,不过当缓存满时,会阻止新的消息的到达。例如,有6条消息,a,b,c,d,e,缓存大小是3,最后会保留的消息是[a, b, c]。

    stack并不能保证消息的顺序呢。当缓存满时,会丢弃栈顶的消息。对于前面两个例子,stack最后保存的消息是[e, b, a]。

    当考虑采取哪一种缓存类型时,要关注的地方是:

    • 是否需要消息保持有序
    • 是保留最新到达的消息,还是最先到达的消息
    • 对时间上有没有要求?如果要求最低的时间延迟,选择stack。

    当然,也可以自己开发自己想要的缓存类型。


    使用用例

    PO Box进程启动函数:

        start_link(OwnerPid, MaxSize, BufferType)      
        start_link(OwnerPid, MaxSize, BufferType, InitialState)    
        start_link(Name, OwnerPid, MaxSize, BufferType)  
        start_link(Name, OwnerPid, MaxSize, BufferType, InitialState) 
    
    • Name 就是Po Box进程注册的名字
    • OwnerPid 就是PO Box的owner进程的Pid。只有owner进程可以读取该PO Box进程的消息,也只有这个owner进程可以设置PO Box进程的state。OwnerPid也可以是原子。两个进程之间会建立link关系,PO Box 不会trap exits。所以如果想要PO Box进程独立存活,应该手动取消link。
    • MaxSize 就是缓冲区的大小
    • BufferType 就是上面所说缓存类型
    • InitialState 可以是passive或者notify。缺省是notify

    将PO Box转入active状态

    	pobox:active(BoxPid, FilterFun, FilterState)
    

    FilterFun就是消息的读取过滤函数,返回值如下:

    • ok, Message, NewState 这条消息会被发送到owner进程
    • {drop, NewState} 这条消息会被丢弃
    • skip 这条消息会被留在缓冲区,之前被遍历过的会被发送

    消息发送的格式是:

    {mail, BoxPid, Messages, MessageCount, MessageDropCount}
    

    转换成notify状态:

    pobox:notify(BoxPid)
    

    发送消息:

    pobox:post(BoxPid, Msg)
    

    或者

    BoxPid ! {post, Msg}    
    

    注意

    • FilterFun必须是轻量级,尤其是在处理消息到达速度非常快的时候。因为发送到PO Box进程的消息还是会先保存在Po Box自己的Mailbox中.
    • 一个进程可以有多个PO Box进程
    • 可以看到,如果使用keep_old queue类型,一次处理一条消息,等价于拥有一个受限制的mailbox。

    FilterFun/2

    FilterFun/2的两个参数是message和state。

    会把所有消息都发送到owner进程的函数可以这样写

    fun(Msg, _ ) -> {{ok, Msg}, nostate} 
        end.
    

    限制二进制消息大小的函数可以这样写:

    fun(Msg, Allowed) ->  
    	case Allowed - byte_size(Msg) of  
    		N when N < 0 -> skip;  
    		N -> {{ok, Msg}, N}  
    	end  
    end
    

    丢掉空消息:

    fun(<<>>, State) -> {drop, State};  
    	(Msg, State) -> {{ok, Msg}, State}  
    end.
    

    只读取一条消息:

    fun(Msg, 0) -> {{ok, Msg}, 1};
        (_, _) -> skip  
    end.
    
  • 相关阅读:
    EFCore.BulkExtensions Demo
    查询处理器用尽了内部资源,无法生成查询计划。这种情况很少出现,只有在查询极其复杂或引用了大量表或分区时才会出现。请简化查询。如果您认为该消息的出现纯属错误,请与客户支持服务部门联系,了解详细信息
    .net core 删除主表,同时删除子表
    java 数据类型优先级
    string.Join 的用法
    JDK-13下载安装及环境变量配置
    Java 前加加和后加加 总结
    变量类型查看-type
    路径:获取 & 更改
    用sql获取数据库中所有的表名、字段名
  • 原文地址:https://www.cnblogs.com/xianzhedeyu/p/5580647.html
Copyright © 2011-2022 走看看