stream源码阅读 流计算 vs 装饰设计模式
地址: github.com/ghemawat/stream
装饰设计模式
https://github.com/senghoo/golang-design-pattern/tree/master/20_decorator
类似于gin的中间件。如果用普通的函数嵌套来写装饰器,调用形式是这样的: IN(INA(INB(INC(arg))))
stream的实现
很巧妙地实现了:上一个处理函数的输出,能够被下一个处理函数接收到
/*
AOP设计模式: IN(INA(INB(INC(arg))))
this: IN(f1, f2, f3)
1. 先生成一个初始字符串(InitString)
2. Step1函数在尾部加上step1
3. 依次类推
*/
package main
import (
"errors"
"fmt"
)
// Arg bundles the channels handed to a Filter when it is run.
type Arg struct {
// In is the receive-only channel the filter reads its input strings from.
In <-chan string
// Out is the send-only channel the filter writes its output strings to.
Out chan<- string
dummy bool // To allow later expansion
}
// Filter is one stage of a string pipeline.
type Filter interface {
// RunFilter runs the stage against the channels in the given Arg and
// reports an error if the stage failed.
RunFilter(Arg) error
}
// FilterFunc adapts a plain func(Arg) error so that it satisfies Filter.
type FilterFunc func(Arg) error
// RunFilter calls f(arg) and returns its result.
func (f FilterFunc) RunFilter(arg Arg) error { return f(arg) }
// Sequence composes filters into a single Filter that runs them as a
// pipeline: the output channel of each filter becomes the input channel of
// the next one.
//
// The first filter reads from the composite's arg.In, and every string the
// last filter emits is forwarded to the composite's arg.Out. Each filter is
// started in its own goroutine through runFilter. When exactly one filter is
// supplied it is returned as-is.
//
// The returned Filter yields an error reported by one of its member filters
// (the first one recorded by the time the last filter's output is closed),
// or nil when none of them has failed.
func Sequence(filters ...Filter) Filter {
	if len(filters) == 1 {
		return filters[0]
	}
	return FilterFunc(func(arg Arg) error {
		// One slot per filter, so a failing stage never blocks on reporting.
		errs := make(chan error, len(filters))
		in := arg.In
		for _, f := range filters {
			f := f // per-iteration copy for the closure (required before Go 1.22)
			c := make(chan string, 1000)
			// Wrap the stage so its error is captured while runFilter still
			// owns closing the output and draining the input.
			stage := FilterFunc(func(a Arg) error {
				err := f.RunFilter(a)
				if err != nil {
					errs <- err
				}
				return err
			})
			go runFilter(stage, Arg{In: in, Out: c})
			in = c
		}
		// Forward whatever the last stage produces to the caller's output.
		for s := range in {
			arg.Out <- s
		}
		// Non-blocking: earlier stages may still be winding down, and waiting
		// for them here could stall the caller.
		select {
		case err := <-errs:
			return err
		default:
			return nil
		}
	})
}
// ForEach runs filter with an already-closed (empty) input channel and calls
// fn, on the caller's goroutine, once for every string the filter emits.
//
// It returns the error produced by filter.RunFilter, or nil on success.
// A nil filter is rejected with an error instead of panicking in the
// background goroutine.
func ForEach(filter Filter, fn func(s string)) error {
	if filter == nil {
		return errors.New("nil filter")
	}
	in := make(chan string)
	close(in) // the top-level filter has no upstream input
	out := make(chan string, 1000)

	// Buffered so the filter goroutine can record its result without waiting.
	errc := make(chan error, 1)
	capture := FilterFunc(func(arg Arg) error {
		err := filter.RunFilter(arg)
		errc <- err
		return err
	})
	go runFilter(capture, Arg{In: in, Out: out})

	for s := range out {
		fn(s)
	}
	// out is closed only after capture has returned, so the result is ready.
	return <-errc
}
// runFilter executes f against arg, closes arg.Out once f has returned, and
// then consumes whatever is left on arg.In until that channel is closed.
// The error returned by f is discarded.
func runFilter(f Filter, arg Arg) {
	_ = f.RunFilter(arg)
	close(arg.Out)
	// Discard all unhandled input.
	for {
		if _, more := <-arg.In; !more {
			return
		}
	}
}
// Run chains filters with Sequence and executes the result through ForEach,
// throwing away every emitted string as well as the returned error.
func Run(filters ...Filter) {
	pipeline := Sequence(filters...)
	discard := func(string) {}
	_ = ForEach(pipeline, discard)
}
// ---------------- 自定义部分-------------------
// InitString is a source filter: it ignores its input and emits one fixed
// string.
type InitString struct{}

// RunFilter sends "init string" on arg.Out and returns nil.
func (InitString) RunFilter(arg Arg) error {
	const seed = "init string"
	arg.Out <- seed
	return nil
}
// Step1 returns a Filter that appends "step1" to every string it receives
// and passes the result downstream. It finishes with a nil error once its
// input channel is closed.
func Step1() Filter {
	appendTag := func(arg Arg) error {
		for {
			line, ok := <-arg.In
			if !ok {
				return nil
			}
			arg.Out <- line + "step1"
		}
	}
	return FilterFunc(appendTag)
}
// Step2 returns a Filter that appends "step2" to every string it receives
// and passes the result downstream. It finishes with a nil error once its
// input channel is closed.
func Step2() Filter {
	var stage FilterFunc = func(arg Arg) error {
		for {
			item, more := <-arg.In
			if !more {
				break
			}
			arg.Out <- item + "step2"
		}
		return nil
	}
	return stage
}
// Console returns a sink Filter: it prints every received string on its own
// line to standard output and writes nothing to its output channel.
func Console() Filter {
	return FilterFunc(func(arg Arg) error {
		for {
			line, open := <-arg.In
			if !open {
				break
			}
			fmt.Println(line)
		}
		return nil
	})
}
func main() {
Run(InitString{}, Step1(), Step2(), Console())
}
精简函数
/*
AOP设计模式: IN(INA(INB(INC(arg))))
this: IN(f1, f2, f3)
1. 先生成一个初始字符串(InitString)
2. Step1函数在尾部加上step1
3. 依次类推
*/
package main
import (
"errors"
"fmt"
"time"
)
// Arg bundles the channels handed to a Filter when it is run.
type Arg struct {
// In is the receive-only channel the filter reads its input strings from.
In <-chan string
// Out is the send-only channel the filter writes its output strings to.
Out chan<- string
dummy bool // To allow later expansion
}
// Filter is one stage of a string pipeline.
type Filter interface {
// RunFilter runs the stage against the channels in the given Arg and
// reports an error if the stage failed.
RunFilter(Arg) error
}
// FilterFunc adapts a plain func(Arg) error so that it satisfies Filter.
type FilterFunc func(Arg) error
// RunFilter calls f(arg) and returns its result.
func (f FilterFunc) RunFilter(arg Arg) error { return f(arg) }
// Sequence connects the Out channel of each filter to the In channel of the
// next one. It prints "seq" when called; with exactly one filter it returns
// that filter unchanged.
//
// The returned Filter only starts the stages (one goroutine each, via
// runFilter) and then returns immediately with a fixed "seq" error.
// NOTE(review): in this stripped-down variant the last stage's output
// channel is never read or forwarded to arg.Out, so the final stage is
// expected to be a sink. The unconditional error looks like a tracing
// placeholder; confirm before relying on the return value.
func Sequence(filters ...Filter) Filter {
	fmt.Println("seq")
	if len(filters) == 1 {
		return filters[0]
	}
	chain := func(arg Arg) error {
		upstream := arg.In
		for i := range filters {
			link := make(chan string, 1000)
			go runFilter(filters[i], Arg{In: upstream, Out: link})
			upstream = link
		}
		return errors.New("seq")
	}
	return FilterFunc(chain)
}
// ForEach prints "for each", starts filter in a background goroutine with a
// nil input channel and a buffered output channel, and returns right away
// with a fixed "for each" error.
// NOTE(review): in this stripped-down variant fn is never called and the
// output channel is never read; the unconditional error looks like a tracing
// placeholder. Confirm before relying on either.
func ForEach(filter Filter, fn func(s string)) error {
	fmt.Println("for each")
	sink := make(chan string, 1000)
	go runFilter(filter, Arg{Out: sink})
	return errors.New("for each")
}
// runFilter executes f against arg, closes arg.Out once f has returned, and
// then discards any input f left unread so upstream senders are not blocked.
// The error returned by f is dropped.
//
// A nil arg.In is treated as "no input": ranging over a nil channel blocks
// forever, which would leak this goroutine when the caller (such as ForEach
// in this file) supplies no input channel.
func runFilter(f Filter, arg Arg) {
	_ = f.RunFilter(arg)
	close(arg.Out)
	if arg.In == nil {
		return
	}
	for range arg.In { // Discard all unhandled input
	}
}
// Run chains filters with Sequence and hands the result to ForEach with a
// no-op callback; the error returned by ForEach is thrown away.
func Run(filters ...Filter) {
	pipeline := Sequence(filters...)
	discard := func(string) {}
	_ = ForEach(pipeline, discard)
}
// ---------------- 自定义部分-------------------
// InitString is a source filter: it ignores its input and emits one fixed
// string.
type InitString struct{}

// RunFilter sends "init string" on arg.Out and returns nil.
func (InitString) RunFilter(arg Arg) error {
	const seed = "init string"
	arg.Out <- seed
	return nil
}
// Step1 returns a Filter that appends "step1" to every string it receives
// and passes the result downstream. It finishes with a nil error once its
// input channel is closed.
func Step1() Filter {
	appendTag := func(arg Arg) error {
		for {
			line, ok := <-arg.In
			if !ok {
				return nil
			}
			arg.Out <- line + "step1"
		}
	}
	return FilterFunc(appendTag)
}
// Step2 returns a Filter that appends "step2" to every string it receives
// and passes the result downstream. It finishes with a nil error once its
// input channel is closed.
func Step2() Filter {
	var stage FilterFunc = func(arg Arg) error {
		for {
			item, more := <-arg.In
			if !more {
				break
			}
			arg.Out <- item + "step2"
		}
		return nil
	}
	return stage
}
// Console returns a sink Filter: it prints every received string on its own
// line to standard output and writes nothing to its output channel.
func Console() Filter {
	return FilterFunc(func(arg Arg) error {
		for {
			line, open := <-arg.In
			if !open {
				break
			}
			fmt.Println(line)
		}
		return nil
	})
}
func main() {
Run(InitString{}, Step1(), Step2(), Console())
time.Sleep(time.Second)
}