Foreword
The code and the source aren't what matters; the ideas are. Suggestions are very welcome.
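Main text
Microsoft has a product called SSIS that is built around the data flow concept. What makes it more powerful is that it sits on top of SQL Server, so it can also do data analysis and build data warehouses.
Data mining is admittedly still a distant goal, but I did develop a dataflow engine of my own.
Let's look at a demo first.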
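Requirement:
I have an order table, POS_SALESORDER,
and need to generate a sales receipt for each order: POS_SALESORDERRECEIPT.
Some of the receipt data comes from my customer table: USR_PROFILE
The traditional C# code:
(Get the POS_SALESORDERRECEIPT table, query the customer data in USR_PROFILE, then combine it with POS_SALESORDER to generate the receipt.)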
INoebeCommand command = NoebeManager.Instance.NoebeCommand;
command.SQL = "SELECT * FROM USR_PROFILE WHERE USERCODE = :USERCODE";
command.Parameters.Add("USERCODE", row["USERCODE"].ToString());
DataTable usrtb = command.ExecuteReader();
DataRow userrow = null;
if (usrtb.Rows.Count == 0)
userrow = null;
else
userrow = usrtb.Rows[0];
double staffcommission = 0;
double commission = 0;
if (userrow == null)
{
staffcommission = CitiboxGlobalStringHelper.default_staffcommission;
}
else
{
staffcommission = double.Parse(userrow["STAFFCOMMISSION"].ToString());
commission = double.Parse(userrow["COMMISSION"].ToString());
}
DataRow receiptrow = receipt.NewRow();
//receiptrow["ID"] =
receiptrow["ORDERBILLCODE"] = row["BILLCODE"];
receiptrow["RECEIPTCODE"] = CitiboxGlobalPkHelper.Instance.GetBillPosOrderReceiptPk();
receiptrow["SHOPCODE"] = row["SHOPCODE"];
receiptrow["SHOPNAME"] = row["SHOPNAME"];
receiptrow["MERCHANTCODE"] = row["USERCODE"];
receiptrow["MERCHANTNAME"] = row["USERNAME"];
receiptrow["CREATEDATE"] = Pixysoft.Tools.GlobalTimer.Instance.GetGlobalTime();
receiptrow["MODIDATE"] = Pixysoft.Tools.GlobalTimer.Instance.GetGlobalTime();
receiptrow["ORDERTEMPLATECODE"] = row["TEMPLATECODE"];
receiptrow["ORDERTEMPLATENAME"] = row["TEMPLATENAME"];
receiptrow["DEPOSITPRICE"] = row["DEPOSITPRICE"];
receiptrow["ITEMPRICE"] = row["ITEMPRICE"];
receiptrow["REALPRICE"] = row["ITEMPRICE"];
receiptrow["STATUS"] = (int)BillIntStatus.New;
receiptrow["REMARK"] = "订单成功";
receiptrow["COMMISSION"] = commission;
receiptrow["STAFFCOMMISSION"] = staffcommission;
receipt.Rows.Add(receiptrow);
CstNoebeManager.Instance.ClientManager.Session.AutoInsert(receipt);
With the dataflow engine instead:
IInput input = dataflow.GetInput();
input.Add(row);
input.Add("@DEFAULTSTAFFCOMMISSION", CitiboxGlobalStringHelper.default_staffcommission);
input.Add("@STATUS", (int)BillIntStatus.New);
input.Add("@RECEIPTCODE", CitiboxGlobalPkHelper.Instance.GetBillPosOrderReceiptPk());
dataflow.Initialize(input);
IExchanger exchanger = dataflow.GetExchanger("POS_SALESORDERRECEIPT");
exchanger.AddScript("ORDERBILLCODE = POS_SALESORDER.BILLCODE");
exchanger.AddScript("RECEIPTCODE = @RECEIPTCODE");
exchanger.AddScript("SHOPCODE = POS_SALESORDER.SHOPCODE");
exchanger.AddScript("SHOPNAME = POS_SALESORDER.SHOPNAME");
exchanger.AddScript("MERCHANTCODE = POS_SALESORDER.USERCODE");
exchanger.AddScript("MERCHANTNAME = POS_SALESORDER.USERNAME");
exchanger.AddScript("CREATEDATE = SYS.DATETIME");
exchanger.AddScript("MODIDATE = SYS.DATETIME");
exchanger.AddScript("ORDERTEMPLATECODE = POS_SALESORDER.TEMPLATECODE");
exchanger.AddScript("ORDERTEMPLATENAME = POS_SALESORDER.TEMPLATENAME");
exchanger.AddScript("DEPOSITPRICE = POS_SALESORDER.DEPOSITPRICE");
exchanger.AddScript("ITEMPRICE = POS_SALESORDER.ITEMPRICE");
exchanger.AddScript("REALPRICE = POS_SALESORDER.ITEMPRICE");
exchanger.AddScript("STATUS = @STATUS");
exchanger.AddScript("REMARK = '订单成功'");
dataflow.Runflow(exchanger);
ILoader loader = dataflow.GetLoader("USR_PROFILE");
loader.Sql = "SELECT STAFFCOMMISSION,COMMISSION FROM USR_PROFILE WHERE USERCODE = :USERCODE";
loader.AddScript("USERCODE = POS_SALESORDER.USERCODE");
dataflow.Runflow(loader);
if (loader.Succeed.IsAlive)
{
IExchanger subexchanger = dataflow.GetExchanger("POS_SALESORDERRECEIPT");
subexchanger.AddScript("COMMISSION = USR_PROFILE.COMMISSION");
subexchanger.AddScript("STAFFCOMMISSION = USR_PROFILE.STAFFCOMMISSION");
dataflow.Runflow(subexchanger);
}
else
{
IExchanger subexchanger = dataflow.GetExchanger("POS_SALESORDERRECEIPT");
subexchanger.AddScript("COMMISSION = 0");
subexchanger.AddScript(ScriptType.Number, "STAFFCOMMISSION = @DEFAULTSTAFFCOMMISSION");
dataflow.Runflow(subexchanger);
}
IOutput output = dataflow.GetOutput();
DataTable receipttb = output.GetInsertTable("POS_SALESORDERRECEIPT");
CstNoebeManager.Instance.ClientManager.Session.AutoInsert(receipttb);
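The code doesn't seem to have gotten any shorter. But if the table I'm generating is much more complex, for example arithmetic and function calls across several tables, the traditional approach needs a pile of small helper methods that compute each value and then pass it to a field.
That's where the dataflow engine pays off: every calculation only needs to be written as an expression, and it is evaluated automatically.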
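To make "write the expression, let the engine evaluate it" concrete, here is a minimal sketch of how a script such as `FIELD = TABLE.COLUMN * @PLACEHOLDER` could be resolved. It is not samsara's implementation: the `ScriptSketch` class, its `Evaluate` method, and the restriction to a chain of `*` operators are all assumptions made for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;

// Hypothetical sketch, not samsara's real code.
public static class ScriptSketch
{
    // Evaluates "TARGET = a * b * ..." where each operand is a number,
    // an @placeholder, or a TABLE.COLUMN reference into the loaded rows.
    public static KeyValuePair<string, double> Evaluate(
        string script,
        IDictionary<string, IDictionary<string, object>> tables,
        IDictionary<string, object> placeholders)
    {
        int eq = script.IndexOf('=');
        if (eq < 0)
            throw new FormatException("script must look like FIELD = EXPRESSION");

        string target = script.Substring(0, eq).Trim();
        double product = 1;
        foreach (string raw in script.Substring(eq + 1).Split('*'))
        {
            product *= ResolveOperand(raw.Trim(), tables, placeholders);
        }
        return new KeyValuePair<string, double>(target, product);
    }

    private static double ResolveOperand(
        string token,
        IDictionary<string, IDictionary<string, object>> tables,
        IDictionary<string, object> placeholders)
    {
        // Placeholders are values handed to the flow as input.
        if (token.StartsWith("@"))
            return Convert.ToDouble(placeholders[token], CultureInfo.InvariantCulture);

        // TABLE.COLUMN looks up a column of the row currently in the flow.
        int dot = token.IndexOf('.');
        if (dot > 0
            && tables.TryGetValue(token.Substring(0, dot), out var row)
            && row.TryGetValue(token.Substring(dot + 1), out var value))
            return Convert.ToDouble(value, CultureInfo.InvariantCulture);

        // Anything else must be a numeric literal.
        return double.Parse(token, CultureInfo.InvariantCulture);
    }
}
```

With a POS_SALESORDER row whose ITEMPRICE is 40 and a made-up placeholder `@RATE` of 0.25, evaluating `"COMMISSIONAMOUNT = POS_SALESORDER.ITEMPRICE * @RATE"` gives the pair (COMMISSIONAMOUNT, 10). The calling code never touches the intermediate values, which is the saving described above.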
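Dataflow modules
IExchanger is the data exchange shown above
ILoader loads data from the database
Ifer is a conditional test, e.g. when the order price ITEMPRICE > 30, do xxx
ISwitcher is a value test, e.g. handle the order differently depending on the customer type MERCHANTTYPECODE
IMapper maps field values, e.g. mapping a placeholder to a concrete value, @STATUS = 1
Injector injects data mid-flow: besides loading from the database, new data can be injected partway through and computed on again.
Isorter sorts the flow: if newly loaded data doesn't line up with the existing data, sorting reconnects them (for example, tables A and B are loaded one after the other but their rows don't match up; after sorting on the condition A.Merchantcode = B.merchantcode, they do)
Finally there are Foreach and MergeForeach, which split the dataflow for separate processing and then merge it back.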
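The branching modules are easier to picture with a toy model. The sketch below is my own illustration, not samsara code: `FlowSketch`, its row-level routing, and the `Merge` helper are assumptions. It shows the idea of a condition splitting a flow into a true branch and a false branch, each reporting `IsAlive` only when it actually received data, and of branches being merged back afterwards.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch of the If / IsAlive / merge idea; names are illustrative only.
public sealed class FlowSketch
{
    private readonly List<IDictionary<string, object>> rows;

    public FlowSketch(IEnumerable<IDictionary<string, object>> rows)
    {
        this.rows = new List<IDictionary<string, object>>(rows);
    }

    // A branch is "alive" when at least one row was routed into it.
    public bool IsAlive { get { return rows.Count > 0; } }

    public IReadOnlyList<IDictionary<string, object>> Rows { get { return rows; } }

    // Routes every row into exactly one of the two branches.
    public (FlowSketch WhenTrue, FlowSketch WhenFalse) If(
        Func<IDictionary<string, object>, bool> condition)
    {
        var yes = new List<IDictionary<string, object>>();
        var no = new List<IDictionary<string, object>>();
        foreach (var row in rows)
        {
            (condition(row) ? yes : no).Add(row);
        }
        return (new FlowSketch(yes), new FlowSketch(no));
    }

    // Recombines branches in the order given, like the merge step after a split.
    public static FlowSketch Merge(params FlowSketch[] branches)
    {
        var all = new List<IDictionary<string, object>>();
        foreach (var branch in branches)
        {
            all.AddRange(branch.rows);
        }
        return new FlowSketch(all);
    }
}
```

Calling code then reads like ordinary C#: check `IsAlive` on each branch, run the branch-specific work, and merge when done. Whether samsara routes row by row or treats the whole flow as one unit is not stated here, so the row-level split is a simplification.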
A complex dataflow processing example (samsara can do much more!):
closingreceipt.Merchantcode = webClosingRow["MERCHANTCODE"].ToString();
closingreceipt.Merchantname = webClosingRow["MERCHANTNAME"].ToString();
//Get the local closing table
string pk = CitiboxGlobalPkHelper.Instance.GetBillSaleClosingPk();
Info("get primary key for balance bill. pk = " + pk);
IDataflow dataflow = SamsaraManager.Instance.Dataflow;
IInput input = dataflow.GetInput();
input.Add(webClosingRow);
input.Add("@BILL_PRIMARYKEY", pk);
input.Add("@DEFAULT_USRBOXCODE", CitiboxGlobalStringHelper.default_usrboxcode);
dataflow.Initialize(input);
//Get the website closing bill
Info("get web_salesclosing detail.");
ILoader loader = dataflow.GetLoader("WEB_SALESCLOSINGDETAIL");
loader.Sql = "SELECT * FROM WEB_SALESCLOSINGDETAIL WHERE BILLCODE = :BILLCODE";
loader.AddScript("BILLCODE = WEB_SALESCLOSING.BILLCODE");
dataflow.Runflow(loader);
//Generate the local closing bill
foreach (IDataflow subflow in dataflow.Foreach("WEB_SALESCLOSINGDETAIL"))
{
Ifer ifflow = subflow.If("WEB_SALESCLOSINGDETAIL.USRBOXCODE == @DEFAULT_USRBOXCODE");
IDataflow iftrueflow = ifflow.True;
bool hasreceipt = false;
if (iftrueflow.IsAlive)
{
hasreceipt = GetCurrentNonReceiptTable(iftrueflow).Succeed.IsAlive;
}
IDataflow iffalseflow = ifflow.False;
if (iffalseflow.IsAlive)
{
ILoader usrboxloader = UsrboxIsUnavailable(subflow);
if (!usrboxloader.Succeed.IsAlive)
{
continue;
}
else
{
hasreceipt = GetCurrentReceiptTable(iffalseflow).Succeed.IsAlive;
}
}
Info("create BIL_SALESCLOSINGDETAIL");
if (hasreceipt)
{
IExchanger exchangerflow = subflow.GetExchanger("BIL_SALESCLOSINGDETAIL");
exchangerflow.AddScript("BILLCODE = @BILL_PRIMARYKEY");
exchangerflow.AddScript(ScriptType.Number, "CLOSINGPRICE = SUM( POS_ITEMRECEIPT.SALEPRICE * POS_ITEMRECEIPT.SALEQTY )");
exchangerflow.AddScript(ScriptType.Number, "CLOSINGCOMMISSION = SUM( POS_ITEMRECEIPT.SALEPRICE * POS_ITEMRECEIPT.SALEQTY * POS_ITEMRECEIPT.COMMISSION )");
exchangerflow.AddScript(ScriptType.Number, "CLOSINGSTAFFCOMMISSION = SUM( POS_ITEMRECEIPT.SALEPRICE * POS_ITEMRECEIPT.SALEQTY * POS_ITEMRECEIPT.STAFFCOMMISSION )");
exchangerflow.AddScript("CLOSINGDATEFROM = WEB_SALESCLOSINGDETAIL.CLOSINGDATEFROM");
exchangerflow.AddScript("CLOSINGDATETO = WEB_SALESCLOSINGDETAIL.CLOSINGDATETO");
exchangerflow.AddScript("CLOSINGDATE = WEB_SALESCLOSINGDETAIL.CLOSINGDATE");
exchangerflow.AddScript("USRBOXCODE = WEB_SALESCLOSINGDETAIL.USRBOXCODE");
subflow.Runflow(exchangerflow);
}
else
{
IExchanger exchangerflow = subflow.GetExchanger("BIL_SALESCLOSINGDETAIL");
exchangerflow.AddScript("BILLCODE = @BILL_PRIMARYKEY");
exchangerflow.AddScript(ScriptType.Number, "CLOSINGPRICE = 0");
exchangerflow.AddScript(ScriptType.Number, "CLOSINGCOMMISSION = 0");
exchangerflow.AddScript(ScriptType.Number, "CLOSINGSTAFFCOMMISSION = 0");
exchangerflow.AddScript("CLOSINGDATEFROM = WEB_SALESCLOSINGDETAIL.CLOSINGDATEFROM");
exchangerflow.AddScript("CLOSINGDATETO = WEB_SALESCLOSINGDETAIL.CLOSINGDATETO");
exchangerflow.AddScript("CLOSINGDATE = WEB_SALESCLOSINGDETAIL.CLOSINGDATE");
exchangerflow.AddScript("USRBOXCODE = WEB_SALESCLOSINGDETAIL.USRBOXCODE");
subflow.Runflow(exchangerflow);
}
ILoader blsloader = GetBalanceControlTable(subflow);
IDataflow blstrueflow = blsloader.Succeed;
if (blstrueflow.IsAlive)
{
IExchanger blsexchanger = blstrueflow.GetExchanger("BLS_COMMODITYACCOUNTCONTROL");
blsexchanger.AddScript("CONTROLDATE = WEB_SALESCLOSINGDETAIL.CLOSINGDATE");
blsexchanger.AddScript("MODIDATE = SYS.DATETIME");
blsexchanger.AddScript("LASTCLOSINGPRICE = BIL_SALESCLOSINGDETAIL.CLOSINGPRICE");
blsexchanger.AddScript("LASTCLOSINGCOMMISSION = BIL_SALESCLOSINGDETAIL.CLOSINGCOMMISSION");
blsexchanger.AddScript("LASTCLOSINGSTAFFCOMMISSION = BIL_SALESCLOSINGDETAIL.CLOSINGSTAFFCOMMISSION");
blstrueflow.Runflow(blsexchanger);
IExchanger webexchanger = blstrueflow.GetExchanger("WEB_SALESCLOSINGDETAIL");
webexchanger.AddScript("REALCLOSINGPRICE = BIL_SALESCLOSINGDETAIL.CLOSINGPRICE");
webexchanger.AddScript("REALCLOSINGCOMMISSION = BIL_SALESCLOSINGDETAIL.CLOSINGCOMMISSION");
webexchanger.AddScript("REALCLOSINGSTAFFCOMMISSION = BIL_SALESCLOSINGDETAIL.CLOSINGSTAFFCOMMISSION");
blstrueflow.Runflow(webexchanger);
}
IDataflow blsfalseflow = blsloader.Failed;
if (blsfalseflow.IsAlive)
{
Error(string.Format("missing bls_commodityaccountcontrol. user validation fail. merchantcode = {0}",
webClosingRow["MERCHANTCODE"].ToString()));
return null;
}
IDataflow absreceiptflow = subflow.If("POS_ITEMRECEIPT.STATUS == " + BillStringStatus.Abnomity).True;
{
if (absreceiptflow.IsAlive)
{
IExchanger absexchanger = absreceiptflow.GetExchanger("POS_ITEMRECEIPT");
absexchanger.AddScript("STATUS = " + BillStringStatus.New);
absexchanger.AddScript("CREATEDATE = SYS.DATETIME");
absreceiptflow.Runflow(absexchanger);
}
}
DataTable closingdetailtb = subflow.Peekflow("WEB_SALESCLOSINGDETAIL");
DataTable receipttb = subflow.Peekflow("POS_ITEMRECEIPT");
if (closingdetailtb.Rows.Count == 0)
continue;
DataRow closingdetailrow = closingdetailtb.Rows[0];
SalesClosingItem closingitem = new SalesClosingItem();
closingitem.Boxlocationcode = closingdetailrow["BOXLOCATIONCODE"].ToString();
closingitem.Datefrom = closingdetailrow["CLOSINGDATEFROM"].ToString();
closingitem.Dateto = closingdetailrow["CLOSINGDATETO"].ToString();
closingitem.Price = (double)closingdetailrow["REALCLOSINGPRICE"];
closingitem.Commission = (double)closingdetailrow["REALCLOSINGCOMMISSION"];
closingitem.Receipttb = receipttb;
closingreceipt.Items.Add(closingitem);
}
dataflow.MergeForeach();
Info("begin change BIL_SALESCLOSING");
IExchanger mainbillexchanger = dataflow.GetExchanger("BIL_SALESCLOSING");
mainbillexchanger.AddScript("BILLCODE = @BILL_PRIMARYKEY");
mainbillexchanger.AddScript("MERCHANTCODE = WEB_SALESCLOSING.MERCHANTCODE");
mainbillexchanger.AddScript("CREATEDATE = SYS.DATETIME");
mainbillexchanger.AddScript("MODIDATE = SYS.DATETIME");
mainbillexchanger.AddScript(ScriptType.Number, "CLOSINTTOTALPRICE = SUM (BIL_SALESCLOSINGDETAIL.CLOSINGPRICE)");
mainbillexchanger.AddScript(ScriptType.Number, "CLOSINTTOTALCOMMISSION = SUM (BIL_SALESCLOSINGDETAIL.CLOSINGCOMMISSION)");
mainbillexchanger.AddScript(ScriptType.Number, "CLOSINGTOTALSTAFFCOMMISSION = SUM (BIL_SALESCLOSINGDETAIL.CLOSINGSTAFFCOMMISSION)");
dataflow.Runflow(mainbillexchanger);
Info("begin change web_salesclosing status to pass.");
IExchanger mainwebexchanger = dataflow.GetExchanger("WEB_SALESCLOSING");
mainwebexchanger.AddScript(ScriptType.Number, "STATUS = " + (int)BillIntStatus.Pass);
dataflow.Runflow(mainwebexchanger);
IOutput output = dataflow.GetOutput();
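For comparison, this is roughly what the three `SUM(...)` scripts on BIL_SALESCLOSINGDETAIL would look like if written by hand. It is only a sketch under assumptions: the `ItemReceipt` class and `ClosingTotals.Sum` are made up for illustration, and I am assuming that `SUM` aggregates over all POS_ITEMRECEIPT rows currently in the subflow.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical hand-written equivalent of the SUM scripts; not part of samsara.
public static class ClosingTotals
{
    // One POS_ITEMRECEIPT row, reduced to the columns the scripts use.
    public sealed class ItemReceipt
    {
        public double SalePrice { get; set; }
        public double SaleQty { get; set; }
        public double Commission { get; set; }
        public double StaffCommission { get; set; }
    }

    public static (double Price, double Commission, double StaffCommission) Sum(
        IEnumerable<ItemReceipt> receipts)
    {
        var list = receipts.ToList();
        return (
            // CLOSINGPRICE = SUM(SALEPRICE * SALEQTY)
            list.Sum(r => r.SalePrice * r.SaleQty),
            // CLOSINGCOMMISSION = SUM(SALEPRICE * SALEQTY * COMMISSION)
            list.Sum(r => r.SalePrice * r.SaleQty * r.Commission),
            // CLOSINGSTAFFCOMMISSION = SUM(SALEPRICE * SALEQTY * STAFFCOMMISSION)
            list.Sum(r => r.SalePrice * r.SaleQty * r.StaffCommission));
    }
}
```

For two receipts (price 10, quantity 2) and (price 4, quantity 1), both with commission 0.5 and staff commission 0.25, the totals come to 24, 12 and 6. An empty receipt list yields zeros, which matches the values the no-receipt branch writes explicitly.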
Afterword
The code is a mess.
About the name: samsara is the Buddhist term for the cycle of rebirth.
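Stage one:
Five years ago I was developing an information system, and the customer wore me out by demanding changes to the table structure all the time. So I came up with the idea of using scripts to do the computation over tables. That became the first-generation samsara.
I had only just started with C#, XML and the like, so all the configuration was in XML; samsara read the XML and ran the computation directly.
In practice I found that XML simply isn't meant for people to read, and it was too much trouble to maintain. With the company's business logic tied up in XML, you also couldn't tell why an exception occurred when debugging.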
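Stage two:
So when I graduated I developed the second-generation samsara. It simplified the scripts, using a human-readable language instead of XML.
That reduced some of the development difficulty. But the business logic was still bound to XML, and maintenance was very inconvenient.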
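Stage three:
After I started working I never had time for samsara, and I had lost confidence in it too, so after a few simple calls to it in a later system I let it lapse.
Now I happen to be out of work, with some free time to rework it properly based on what I've learned over the years.
So I came up with combining scripts with code, which became the current third-generation samsara.
Its defining trait: business logic is done in code, and samsara stays as close as it can to C#'s own logic constructs, while the complex data computation is handed to scripts.
Personally, I think the third-generation samsara is ready to be commercialized. After that, a fourth-generation samsara could well be used to build data warehouses.
Or I could consider adding object computation, making it an object-flow engine. I'd call that
the fourth-generation samsara!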