zoukankan      html  css  js  c++  java
  • 数据处理如同流水——介绍下偶的数据流引擎Samsara

    前言

    代码、源码不重要,重要的是思想,希望大家多给建议。

    正文

    微软有个叫SSIS,引用了数据流概念,不过更加强大的是,他基于了sql server,能够进行数据分析,构造数据仓库。

    数据挖掘的目标的确远了,不过数据引擎我导开发了一个。

    先看个demo。

    需求:

    我有个订单表POS_SALESORDER,

    需要生成一张订单的消费凭证:POS_SALESORDERRECEIPT,

    其中凭证的一些数据来源于我的顾客表:USR_PROFILE

    传统的c#代码:

    (取得POS_SALESORDERRECEIPT表,查询客户数据USR_PROFILE,然后再结合POS_SALESORDER生成凭证)

    DataTable receipt = NoebeManager.Instance.GetEntity("POS_SALESORDERRECEIPT");

    INoebeCommand command 
    = NoebeManager.Instance.NoebeCommand;
    command.SQL 
    = "SELECT * FROM USR_PROFILE WHERE USERCODE = :USERCODE";
    command.Parameters.Add(
    "USERCODE", row["USERCODE"].ToString());
    DataTable usrtb 
    = command.ExecuteReader();
    DataRow userrow 
    = null;
    if (usrtb.Rows.Count == 0)
        userrow 
    = null;
    else
        userrow 
    = usrtb.Rows[0];

    double staffcommission = 0;

    double commission = 0;

    if (userrow == null)
    {
        staffcommission 
    = CitiboxGlobalStringHelper.default_staffcommission;
    }
    else
    {
        staffcommission 
    = double.Parse(userrow["STAFFCOMMISSION"].ToString());

        commission 
    = double.Parse(userrow["COMMISSION"].ToString());
    }

    DataRow receiptrow 
    = receipt.NewRow();
    //receiptrow["ID"] =
    receiptrow["ORDERBILLCODE"= row["BILLCODE"];
    receiptrow[
    "RECEIPTCODE"= CitiboxGlobalPkHelper.Instance.GetBillPosOrderReceiptPk();
    receiptrow[
    "SHOPCODE"= row["SHOPCODE"];
    receiptrow[
    "SHOPNAME"= row["SHOPNAME"];
    receiptrow[
    "MERCHANTCODE"= row["USERCODE"];
    receiptrow[
    "MERCHANTNAME"= row["USERNAME"];
    receiptrow[
    "CREATEDATE"= Pixysoft.Tools.GlobalTimer.Instance.GetGlobalTime();
    receiptrow[
    "MODIDATE"= Pixysoft.Tools.GlobalTimer.Instance.GetGlobalTime();
    receiptrow[
    "ORDERTEMPLATECODE"= row["TEMPLATECODE"];
    receiptrow[
    "ORDERTEMPLATENAME"= row["TEMPLATENAME"];
    receiptrow[
    "DEPOSITPRICE"= row["DEPOSITPRICE"];
    receiptrow[
    "ITEMPRICE"= row["ITEMPRICE"];
    receiptrow[
    "REALPRICE"= row["ITEMPRICE"];
    receiptrow[
    "STATUS"= (int)BillIntStatus.New;
    receiptrow[
    "REMARK"= "订单成功";
    receiptrow[
    "COMMISSION"= commission;
    receiptrow[
    "STAFFCOMMISSION"= staffcommission;
    receipt.Rows.Add(receiptrow);
    CstNoebeManager.Instance.ClientManager.Session.AutoInsert(receipt);

    如果用数据流引擎:

    IDataflow dataflow = SamsaraManager.Instance.Dataflow;

    IInput input 
    = dataflow.GetInput();
    input.Add(row);
    input.Add(
    "@DEFAULTSTAFFCOMMISSION", CitiboxGlobalStringHelper.default_staffcommission);
    input.Add(
    "@STATUS", (int)BillIntStatus.New);
    input.Add(
    "@RECEIPTCODE", CitiboxGlobalPkHelper.Instance.GetBillPosOrderReceiptPk());
    dataflow.Initialize(input);

    IExchanger exchanger 
    = dataflow.GetExchanger("POS_SALESORDERRECEIPT");
    exchanger.AddScript(
    "ORDERBILLCODE = POS_SALESORDER.BILLCODE");
    exchanger.AddScript(
    "RECEIPTCODE = @RECEIPTCODE");
    exchanger.AddScript(
    "SHOPCODE = POS_SALESORDER.SHOPCODE");
    exchanger.AddScript(
    "SHOPNAME = POS_SALESORDER.SHOPNAME");
    exchanger.AddScript(
    "MERCHANTCODE = POS_SALESORDER.USERCODE");
    exchanger.AddScript(
    "MERCHANTNAME = POS_SALESORDER.USERNAME");
    exchanger.AddScript(
    "CREATEDATE = SYS.DATETIME");
    exchanger.AddScript(
    "MODIDATE = SYS.DATETIME");
    exchanger.AddScript(
    "ORDERTEMPLATECODE = POS_SALESORDER.TEMPLATECODE");
    exchanger.AddScript(
    "ORDERTEMPLATENAME = POS_SALESORDER.TEMPLATENAME");
    exchanger.AddScript(
    "DEPOSITPRICE = POS_SALESORDER.DEPOSITPRICE");
    exchanger.AddScript(
    "ITEMPRICE = POS_SALESORDER.ITEMPRICE");
    exchanger.AddScript(
    "REALPRICE = POS_SALESORDER.ITEMPRICE");
    exchanger.AddScript(
    "STATUS = @STATUS");
    exchanger.AddScript(
    "REMARK = '订单成功'");
    dataflow.Runflow(exchanger);

    ILoader loader 
    = dataflow.GetLoader("USR_PROFILE");
    loader.Sql 
    = "SELECT STAFFCOMMISSION,COMMISSION  FROM USR_PROFILE WHERE USERCODE = :USERCODE";
    loader.AddScript(
    "USERCODE = POS_SALESORDER.USERCODE");
    dataflow.Runflow(loader);

    if (loader.Succeed.IsAlive)
    {
        IExchanger subexchanger 
    = dataflow.GetExchanger("POS_SALESORDERRECEIPT");
        subexchanger.AddScript(
    "COMMISSION = USR_PROFILE.COMMISSION");
        subexchanger.AddScript(
    "STAFFCOMMISSION = USR_PROFILE.STAFFCOMMISSION");
        dataflow.Runflow(subexchanger);
    }
    else
    {
        IExchanger subexchanger 
    = dataflow.GetExchanger("POS_SALESORDERRECEIPT");
        subexchanger.AddScript(
    "COMMISSION = 0");
        subexchanger.AddScript(ScriptType.Number, 
    "STAFFCOMMISSION = @DEFAULTSTAFFCOMMISSION");
        dataflow.Runflow(subexchanger);
    }

    IOutput output 
    = dataflow.GetOutput();
    DataTable receipttb 
    = output.GetInsertTable("POS_SALESORDERRECEIPT");
    CstNoebeManager.Instance.ClientManager.Session.AutoInsert(receipttb);

    似乎代码没有什么节省。不过,如果我的生成的表数据非常复杂,比如:多个表的四则运算、函数运算,那么传统就需要写一大堆的小方法,算好了,再传递给字段。

    这个时候,数据流引擎就发挥作用了,所有的函数运算仅需要写好表达式,自动计算。

    数据流模块

    IExchanger 就是上文的数据交换

    ILoader 读取数据库装载数据

    Ifer 条件判断,例如当订单价格ITEMPRICE>30的时候,xxx

    ISwitcher 值判断,例如根据订单客户类型MERCHANTTYPECODE,进行不同的处理

    IMapper 字段值映射,例如把某个占位符映射成一个具体的值,@STATUS = 1

    Injector 数据中途注入,除了数据库装载,可以在中途注入新的数据,再次运算。

    Isorter 流排序,如果装载了新的数据,和旧的对不上,那么通过排序能够重新接上(例如先后装载表A,表B,但是大家对不上好,那么我根据条件表A.Merchantcode = 表B.merchantcode排序之后,就对上了)

    最后还有个Foreach功能,和MergeForeach,把数据流分开处理后,合并。

    一个复杂的数据流处理案例(samsara可以做的更多!):

    SalesClosingReceipt closingreceipt = new SalesClosingReceipt();

    closingreceipt.Merchantcode 
    = webClosingRow["MERCHANTCODE"].ToString();

    closingreceipt.Merchantname 
    = webClosingRow["MERCHANTNAME"].ToString();


    //取得本地结算表

    string pk = CitiboxGlobalPkHelper.Instance.GetBillSaleClosingPk();

    Info(
    "get primary key for balance bill. pk = " + pk);

    IDataflow dataflow 
    = SamsaraManager.Instance.Dataflow;

    IInput input 
    = dataflow.GetInput();
    input.Add(webClosingRow);
    input.Add(
    "@BILL_PRIMARYKEY", pk);
    input.Add(
    "@DEFAULT_USRBOXCODE", CitiboxGlobalStringHelper.default_usrboxcode);
    dataflow.Initialize(input);


    //取得网站结算单

    Info(
    "get web_salesclosing detail.");

    ILoader loader 
    = dataflow.GetLoader("WEB_SALESCLOSINGDETAIL");
    loader.Sql 
    = "SELECT * FROM WEB_SALESCLOSINGDETAIL WHERE BILLCODE = :BILLCODE";
    loader.AddScript(
    "BILLCODE = WEB_SALESCLOSING.BILLCODE");
    dataflow.Runflow(loader);


    //生成本地结算单

    foreach (IDataflow subflow in dataflow.Foreach("WEB_SALESCLOSINGDETAIL"))
    {
        Ifer ifflow 
    = subflow.If("WEB_SALESCLOSINGDETAIL.USRBOXCODE == @DEFAULT_USRBOXCODE");
        IDataflow iftrueflow 
    = ifflow.True;

        
    bool hasreceipt = false;

        
    if (iftrueflow.IsAlive)
        {
            hasreceipt 
    = GetCurrentNonReceiptTable(iftrueflow).Succeed.IsAlive;
        }
        IDataflow iffalseflow 
    = ifflow.False;
        
    if (iffalseflow.IsAlive)
        {
            ILoader usrboxloader 
    = UsrboxIsUnavailable(subflow);
            
    if (!usrboxloader.Succeed.IsAlive)
            {
    continue;
            }
            
    else
            {
    hasreceipt 
    = GetCurrentReceiptTable(iffalseflow).Succeed.IsAlive;
            }
        }

        Info(
    "create BIL_SALESCLOSINGDETAIL");

        
    if (hasreceipt)
        {
            IExchanger exchangerflow 
    = subflow.GetExchanger("BIL_SALESCLOSINGDETAIL");
            exchangerflow.AddScript(
    "BILLCODE = @BILL_PRIMARYKEY");
            exchangerflow.AddScript(ScriptType.Number, 
    "CLOSINGPRICE = SUM( POS_ITEMRECEIPT.SALEPRICE * POS_ITEMRECEIPT.SALEQTY )");
            exchangerflow.AddScript(ScriptType.Number, 
    "CLOSINGCOMMISSION = SUM( POS_ITEMRECEIPT.SALEPRICE * POS_ITEMRECEIPT.SALEQTY * POS_ITEMRECEIPT.COMMISSION )");
            exchangerflow.AddScript(ScriptType.Number, 
    "CLOSINGSTAFFCOMMISSION = SUM( POS_ITEMRECEIPT.SALEPRICE * POS_ITEMRECEIPT.SALEQTY * POS_ITEMRECEIPT.STAFFCOMMISSION )");
            exchangerflow.AddScript(
    "CLOSINGDATEFROM = WEB_SALESCLOSINGDETAIL.CLOSINGDATEFROM");
            exchangerflow.AddScript(
    "CLOSINGDATETO = WEB_SALESCLOSINGDETAIL.CLOSINGDATETO");
            exchangerflow.AddScript(
    "CLOSINGDATE = WEB_SALESCLOSINGDETAIL.CLOSINGDATE");
            exchangerflow.AddScript(
    "USRBOXCODE = WEB_SALESCLOSINGDETAIL.USRBOXCODE");
            subflow.Runflow(exchangerflow);
        }
        
    else
        {
            IExchanger exchangerflow 
    = subflow.GetExchanger("BIL_SALESCLOSINGDETAIL");
            exchangerflow.AddScript(
    "BILLCODE = @BILL_PRIMARYKEY");
            exchangerflow.AddScript(ScriptType.Number, 
    "CLOSINGPRICE = 0");
            exchangerflow.AddScript(ScriptType.Number, 
    "CLOSINGCOMMISSION = 0");
            exchangerflow.AddScript(ScriptType.Number, 
    "CLOSINGSTAFFCOMMISSION = 0");
            exchangerflow.AddScript(
    "CLOSINGDATEFROM = WEB_SALESCLOSINGDETAIL.CLOSINGDATEFROM");
            exchangerflow.AddScript(
    "CLOSINGDATETO = WEB_SALESCLOSINGDETAIL.CLOSINGDATETO");
            exchangerflow.AddScript(
    "CLOSINGDATE = WEB_SALESCLOSINGDETAIL.CLOSINGDATE");
            exchangerflow.AddScript(
    "USRBOXCODE = WEB_SALESCLOSINGDETAIL.USRBOXCODE");
            subflow.Runflow(exchangerflow);
        }

        ILoader blsloader 
    = GetBalanceControlTable(subflow);
        IDataflow blstrueflow 
    = blsloader.Succeed;
        
    if (blstrueflow.IsAlive)
        {
            IExchanger blsexchanger 
    = blstrueflow.GetExchanger("BLS_COMMODITYACCOUNTCONTROL");
            blsexchanger.AddScript(
    "CONTROLDATE = WEB_SALESCLOSINGDETAIL.CLOSINGDATE");
            blsexchanger.AddScript(
    "MODIDATE = SYS.DATETIME");
            blsexchanger.AddScript(
    "LASTCLOSINGPRICE = BIL_SALESCLOSINGDETAIL.CLOSINGPRICE");
            blsexchanger.AddScript(
    "LASTCLOSINGCOMMISSION = BIL_SALESCLOSINGDETAIL.CLOSINGCOMMISSION");
            blsexchanger.AddScript(
    "LASTCLOSINGSTAFFCOMMISSION = BIL_SALESCLOSINGDETAIL.CLOSINGSTAFFCOMMISSION");
            blstrueflow.Runflow(blsexchanger);

            IExchanger webexchanger 
    = blstrueflow.GetExchanger("WEB_SALESCLOSINGDETAIL");
            webexchanger.AddScript(
    "REALCLOSINGPRICE = BIL_SALESCLOSINGDETAIL.CLOSINGPRICE");
            webexchanger.AddScript(
    "REALCLOSINGCOMMISSION = BIL_SALESCLOSINGDETAIL.CLOSINGCOMMISSION");
            webexchanger.AddScript(
    "REALCLOSINGSTAFFCOMMISSION = BIL_SALESCLOSINGDETAIL.CLOSINGSTAFFCOMMISSION");
            webexchanger.AddScript(
    "REALCLOSINGPRICE = BIL_SALESCLOSINGDETAIL.CLOSINGPRICE");
            blstrueflow.Runflow(webexchanger);
        }
        IDataflow blsfalseflow 
    = blsloader.Failed;
        
    if (blsfalseflow.IsAlive)
        {
            Error(
    string.Format("missing bls_commodityaccountcontrol. user validation fail. merchantcode = {0}",
    webClosingRow[
    "MERCHANTCODE"].ToString()));

            
    return null;
        }

        IDataflow absreceiptflow 
    = subflow.If("POS_ITEMRECEIPT.STATUS == " + BillStringStatus.Abnomity).True;
        {
            
    if (absreceiptflow.IsAlive)
            {
    IExchanger absexchanger 
    = absreceiptflow.GetExchanger("POS_ITEMRECEIPT");
    absexchanger.AddScript(
    "STATUS = " + BillStringStatus.New);
    absexchanger.AddScript(
    "CREATEDATE = SYS.DATETIME");
    absreceiptflow.Runflow(absexchanger);
            }
        }

        DataTable closingdetailtb 
    = subflow.Peekflow("WEB_SALESCLOSINGDETAIL");
        DataTable receipttb 
    = subflow.Peekflow("POS_ITEMRECEIPT");

        
    if (closingdetailtb.Rows.Count == 0)
            
    continue;

        DataRow closingdetailrow 
    = closingdetailtb.Rows[0];

        SalesClosingItem closingitem 
    = new SalesClosingItem();
        closingitem.Boxlocationcode 
    = closingdetailrow["BOXLOCATIONCODE"].ToString();
        closingitem.Datefrom 
    = closingdetailrow["CLOSINGDATEFROM"].ToString();
        closingitem.Dateto 
    = closingdetailrow["CLOSINGDATETO"].ToString();
        closingitem.Price 
    = (double)closingdetailrow["REALCLOSINGPRICE"];
        closingitem.Commission 
    = (double)closingdetailrow["REALCLOSINGCOMMISSION"];
        closingitem.Receipttb 
    = receipttb;
        closingreceipt.Items.Add(closingitem);

    }

    dataflow.MergeForeach();



    Info(
    "begin change BIL_SALESCLOSING");

    IExchanger mainbillexchanger 
    = dataflow.GetExchanger("BIL_SALESCLOSING");
    mainbillexchanger.AddScript(
    "BILLCODE = @BILL_PRIMARYKEY");
    mainbillexchanger.AddScript(
    "MERCHANTCODE = WEB_SALESCLOSING.MERCHANTCODE");
    mainbillexchanger.AddScript(
    "CREATEDATE = SYS.DATETIME");
    mainbillexchanger.AddScript(
    "MODIDATE = SYS.DATETIME");
    mainbillexchanger.AddScript(ScriptType.Number, 
    "CLOSINTTOTALPRICE = SUM (BIL_SALESCLOSINGDETAIL.CLOSINGPRICE)");
    mainbillexchanger.AddScript(ScriptType.Number, 
    "CLOSINTTOTALCOMMISSION = SUM (BIL_SALESCLOSINGDETAIL.CLOSINGCOMMISSION)");
    mainbillexchanger.AddScript(ScriptType.Number, 
    "CLOSINGTOTALSTAFFCOMMISSION = SUM (BIL_SALESCLOSINGDETAIL.CLOSINGSTAFFCOMMISSION)");
    dataflow.Runflow(mainbillexchanger);



    Info(
    "begin change web_salesclosing status to pass.");

    IExchanger mainwebexchanger 
    = dataflow.GetExchanger("WEB_SALESCLOSING");
    mainwebexchanger.AddScript(ScriptType.Number, 
    "STATUS = " + (int)BillIntStatus.Pass);
    dataflow.Runflow(mainwebexchanger);

    IOutput output 
    = dataflow.GetOutput();

    后记

    代码乱了。

    说下samsara,是佛教中轮回的意思。

    第一阶段:

    当时是5年前,开发一个信息系统,被客户搞烦了,整天要修改表结构,因此我想出了一个用脚本去运算表的思路。成为第一代samsara。
    当时刚刚接触c#,xml之类的,因此所有的配置用xml,samsara读取xml之后直接运算。

    事实上发现了,xml根本不是给人看的,维护起来太麻烦了。而且把企业的业务逻辑绑在xml,debug的时候不知道为什么会有异常。


    第二阶段:

    因此毕业的时候,开发了samsara 第二代。把脚本简化,使用人读的语言,而不是xml。

    能够减少一些开发难度。但是企业业务逻辑还是绑定在xml,维护非常不方便。


    第三阶段:

    之后工作了,一直没有时间用samsara,自己也没有信心,所以在后来系统里面简单调用了一下之后就荒废了。

    现在正好工作没了,有一段空闲的时间,让我好好根据这几年的积累重新修改。

    于是提出了脚本与代码结合的方式,成为了现在的samsara第三代。

    他的特点是,业务的逻辑由代码完成,我的samsara尽量的接近c#的一些逻辑处理。然后一些复杂的数据运算交给脚本完成。

    我个人认为,第三代samsara可以商业化了。接下来,第四代samsara完全可以开发数据仓库了。

    或者可以考虑把对象运算加入,成为对象流引擎。我称为

    samsara 第四代!

  • 相关阅读:
    yzm10铺瓷砖 yzm10原创系列
    如何统计博客园的个人博客访问量
    Hybrid设计--账号体系的建设
    Hybrid设计--核心交互
    Hybrid设计--H5和Native,收口
    MySQL数据类型--与MySQL零距离接触 3-2 外键约束的要求解析
    MySQL数据类型--与MySQL零距离接触2-14MySQL默认约束
    css3径向渐变
    MySQL数据类型--与MySQL零距离接触2-13MySQL唯一约束
    MySQL数据类型--与MySQL零距离接触2-12主键约束
  • 原文地址:https://www.cnblogs.com/zc22/p/1587198.html
Copyright © 2011-2022 走看看