Getting the fastest result from multiple sources
In some cases, for example, while integrating information retrieval from multiple sources, you only need the first result, the fastest one, and the other results are irrelevant after that. An example from the real world could be extracting the currency rate to count the price. You have multiple third-party services and because you need to show the prices as fast as possible, you need only the first rate received from any service. This recipe will show the pattern for how to achieve such behavior.
从多个来源获得最快结果
例如,在某些情况下,在集成来自多个源的信息检索时,只需要第一个结果,即最快的结果,而其他结果则与此无关。现实世界中的一个例子是提取货币汇率来计算价格。您有多个第三方服务,因为您需要尽可能快地显示价格,您只需要从任何服务接收第一个速率。这个食谱将展示如何实现这种行为的模式。
package main
import (
"context"
"fmt"
"sync"
"time"
)
type SearchSrc struct {
ID string
Delay int
}
func (s *SearchSrc) Search(ctx context.Context) <-chan string {
out := make(chan string)
go func() {
time.Sleep(time.Duration(s.Delay) * time.Second)
select {
case out <- "Result " + s.ID:
case <-ctx.Done():
fmt.Println("Search received Done()")
}
close(out)
fmt.Println("Search finished for ID: " + s.ID)
}()
return out
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
src1 := &SearchSrc{"1", 2}
src2 := &SearchSrc{"2", 6}
r1 := src1.Search(ctx)
r2 := src2.Search(ctx)
out := merge(ctx, r1, r2)
for firstResult := range out {
cancel()
fmt.Println("First result is: " + firstResult)
}
}
func merge(ctx context.Context, results ...<-chan string) <-chan string {
wg := sync.WaitGroup{}
out := make(chan string)
output := func(c <-chan string) {
defer wg.Done()
select {
case <-ctx.Done():
fmt.Println("Received ctx.Done()")
case res := <-c:
out <- res
}
}
wg.Add(len(results))
for _, c := range results {
go output(c)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
/*
Search finished for ID: 1
Received ctx.Done()
First result is: Result 1
Search finished for ID: 1
First result is: Result 1
Received ctx.Done()
The preceding code proposes the solution on executing multiple tasks that output some results, and we need only the first fastest one. The solution uses the Context with the cancel function to call cancel once the first result is obtained. The SearchSrc structure provides the Search method that results in a channel where the result is written. Note that the Search method simulates the delay with the time.Sleep function. The merge function, for each channel from the Search method, triggers the goroutine that writes to the final output channel that is read in the main method. While the first result is received from the output channel produced from the merge function, the CancelFunc stored in the variable cancel is called to cancel the rest of the processing.
Be aware that the Search method still needs to end, even if its result would not be processed; so this needs to be handled to avoid the goroutine and channel leak.
前面的代码提出了执行多个输出一些结果的任务的解决方案,我们只需要第一个最快的结果。该解决方案使用具有取消功能的上下文在第一个结果获得后调用取消。的searchsrc结构提供了搜索方法,结果在一个通道,结果是写的。请注意,搜索方法用时间、睡眠功能模拟延迟。合并功能,每个通道的搜索方法,触发goroutine里写入最终输出通道主要是读取方法。而第一个结果是从合并功能产生的输出信道接收的cancelfunc变量中存储的是“取消取消其余的处理。
要知道,搜索方法仍需要结束,即使其结果将不被处理;因此这需要小心避免goroutine里和泄漏通道。
*/