zoukankan      html  css  js  c++  java
  • ballerina 学习十 streams

    ballerina 的streams 使用的是siddhi complex event processing 引擎处理,可以包含的语法有
    projection filtering windows join pattern

    简单例子

    • 参考代码
    import ballerina/io;
    import ballerina/runtime;
    type StatusCount {
    string status;
    int totalCount;
    };
    
    type Teacher {
    string name;
    int age;
    string status;
    string batch;
    string school;
    };
    function testAggregationQuery(
    stream<StatusCount> filteredStatusCountStream,
    stream<Teacher> teacherStream) {
    forever {
    
    //  cep  处理并发布结果
    from teacherStream where age > 18 window lengthBatch(3)
    select status, count(status) as totalCount
    group by status
    having totalCount > 1
    => (StatusCount[] status) {
    filteredStatusCountStream.publish(status);
    }
    }
    }
    
    function main(string… args) {
    stream<StatusCount> filteredStatusCountStream;
    
    stream<Teacher> teacherStream;
    
    testAggregationQuery(filteredStatusCountStream, teacherStream);
    
    Teacher t1 = {name: "Sam", age: 25, status: "single",
    batch: "LK2014", school: "Hampden High School"};
    Teacher t2 = {name: "Jordan", age: 33, status: "single",
    batch: "LK1998", school: "Columbia High School"};
    Teacher t3 = {name: "Morgan", age: 45, status: "married",
    batch: "LK1988", school: "Central High School"};
    
    filteredStatusCountStream.subscribe(printStatusCount);
    
    // 生产数据
    teacherStream.publish(t1);
    teacherStream.publish(t2);
    teacherStream.publish(t3);
    
    runtime:sleep(1000);
    }
    function printStatusCount(StatusCount s) {
    io:println("Event received; status: " + s.status +
    ", total occurrences: " + s.totalCount);
    }
    
    • 输出结果
    Event received; status: single, total occurrences: 2

    stream join

    • 参考代码

    代码就是进行通过http 获取的到数据进行流化,同时进行join 并对于符合业务的数据进行报警

    import ballerina/http;
    import ballerina/mime;
    import ballerina/io;
    type ProductMaterial {
    string name;
    float amount;
    };
    type MaterialUsage {
    string name;
    float totalRawMaterial;
    float totalConsumption;
    };
    stream<ProductMaterial> rawMaterialStream;
    stream<ProductMaterial> productionInputStream;
    stream<MaterialUsage> materialUsageStream;
    function initRealtimeProductionAlert() {
    materialUsageStream.subscribe(printMaterialUsageAlert);
    forever {
    from productionInputStream window time(10000) as p
    join rawMaterialStream window time(10000) as r
    on r.name == p.name
    select r.name, sum(r.amount) as totalRawMaterial,
    sum(p.amount) as totalConsumption
    group by r.name
    having ((totalRawMaterial - totalConsumption) * 100.0 /
    totalRawMaterial) > 5
    => (MaterialUsage[] materialUsages) {
    materialUsageStream.publish(materialUsages);
    }
    }
    }
    function printMaterialUsageAlert(MaterialUsage materialUsage) {
    float materialUsageDifference = (materialUsage.totalRawMaterial -
    materialUsage.totalConsumption) * 100.0 /
    (materialUsage.totalRawMaterial); io:println("ALERT!! : Material usage is higher than the expected"
    + " limit for material : " + materialUsage.name +
    " , usage difference (%) : " + materialUsageDifference);
    }
    endpoint http:Listener productMaterialListener {
    port: 9090
    };
    @http:ServiceConfig {
    basePath: "/"
    }
    service productMaterialService bind productMaterialListener {
    future ftr = start initRealtimeProductionAlert();
    @http:ResourceConfig {
    methods: ["POST"],
    path: "/rawmaterial"
    }
    rawmaterialrequests(endpoint outboundEP, http:Request req) {
    var jsonMsg = req.getJsonPayload();
    io:println(jsonMsg);
    match jsonMsg {
    json msg => {
    var productMaterial = check <ProductMaterial>msg;
    rawMaterialStream.publish(productMaterial);
    http:Response res = new;
    res.setJsonPayload({"message": "Raw material request"
    + " successfully received"});
    _ = outboundEP->respond(res);
    }
    error err => {
    http:Response res = new;
    res.statusCode = 500;
    res.setPayload(err.message);
    _ = outboundEP->respond(res);
    }
    }
    }
    @http:ResourceConfig {
    methods: ["POST"],
    path: "/productionmaterial"
    }
    productionmaterialrequests(endpoint outboundEP,http:Request req) {
    var jsonMsg = req.getJsonPayload();
    match jsonMsg {
    json msg => {
    var productMaterial = check <ProductMaterial>msg;
    productionInputStream.publish(productMaterial);
    http:Response res = new;
    res.setJsonPayload({"message": "Production input " +
    "request successfully received"});
    _ = outboundEP->respond(res);
    }
    error err => {
    http:Response res = new;
    res.statusCode = 500;
    res.setPayload(err.message);
    _ = outboundEP->respond(res);
    }
    }
    }
    }
    

    用途

    对于事件处理的应用特别方便,比如日志处理,以及响应式系统开发,其中的siddhi 也是一个很不错的工具

    参考资料

    https://ballerina.io/learn/by-example/hello-world-streams.html

  • 相关阅读:
    .NET深入解析LINQ框架(四:IQueryable、IQueryProvider接口详解)
    .NET简谈组件程序设计之(初识NetRemoting)
    .NET简谈组件程序设计之(delegate与event关系)
    .NET简谈组件程序设计之(上下文与同步域)
    .NET简谈特性(代码属性)
    .NET可逆框架设计
    使用KTM(内核事务管理器)进行文件事务处理
    .NET面向上下文、AOP架构模式(实现)
    .NET简谈设计模式之(装饰者模式)
    .NET对存储过程的调用抽象封装
  • 原文地址:https://www.cnblogs.com/rongfengliang/p/9061462.html
Copyright © 2011-2022 走看看