您的位置:首页 > 教程 > 其他脚本 > Go语言异步API设计的扇入扇出模式详解

Go语言异步API设计的扇入扇出模式详解

2022-08-05 18:00:47 来源:易采站长站 作者:

Go语言异步API设计的扇入扇出模式详解

目录
前言扇入/扇出服务Go 语言实现扇入/扇出模式

前言

扇出/扇入模式是更高级>

扇出是从电子工程中借用的一个术语,它描述了输入的逻辑门连接到另一个输出门的数量。输出需要提供足够的电流来驱动所有连接的输入。在事务处理系统中,用来描述为了服务一个输入请求而需要做的请求总数。

扇入是指为逻辑单元的输入方程提供输入信号的最大数量。扇入是定义单个逻辑门可以接受的最大数字输入数量的术语。大多数晶体管-晶体管逻辑 (TTL) 门有一个或两个输入,尽管有些有两个以上。典型的逻辑门具有 1 或 2 的扇入。

扇入/扇出服务

我们举一个现实世界的例子,一个电子商务网站将自己与一个第三方支付网关整合在一起。>

另一个例子是 MapReduce。Map 是一个扇入的操作,而 Reduce 是一个扇出的 操作。一个服务器可以将一个信息扇出到下一组服务(API),并忽略结果。或者可以等到这些服务器的所有响应都返回。如 如下图所示,一个传入的请求被服务器复用为转换成两个传出的请求:

扇入 fan-in 是一种操作,即两个或更多传入的请求会聚成一个请求。这种情况下,API如何聚合来自多个后端服务的结果,并将结果即时返回给客户。

例如,想想一个酒店价格聚合器或航班票务聚合器,它从不同的数据提供者那里获取关于多个酒店或航班的请求信息并显示出来。

下图显示了扇出操作是如何结合多个请求并准备一个最终的响应,由客户端消费的。

客户端也可以是一个服务器,为更多的客户提供服务。如上图所示,左侧的服务器正在收集来自酒店 A、酒店 B 和 航空公司供应商 A,并为不同的客户准备另一个响应。

因此,扇入和扇出操作并不总是完全相互独立的。大多数情况下,它将是一个混合场景,扇入和扇出操作都是相互配合的。

请记住,对下一组服务器的扇出操作可以是异步的。也是如此。对于扇入请求来说,这可能不是真的。扇入操作有时被称为 API 调用。

Go>

Fan-out:多个 goroutine 从同一个通道读取数据,直到该通道关闭。OUT 是一种张开的模式,所以又被称为扇出,可以用来分发任务。

Fan-in:1 个 goroutine 从多个通道读取数据,直到这些通道关闭。IN 是一种收敛的模式,所以又被称为扇入,用来收集处理的结果。

package main
import (
	"context"
	"log"
	"sync"
	"time"
)
// Task 包含任务编号及任务所需时长
type Task struct {
	Number int
	Cost   time.Duration
}
// task channel 生成器
func taskChannelGerenator(ctx context.Context, taskList []Task) <-chan Task {
	taskCh := make(chan Task)
	go func() {
		defer close(taskCh)
		for _, task := range taskList {
			select {
			case <-ctx.Done():
				return
			case taskCh <- task:
			}
		}
	}()
	return taskCh
}
// doTask 处理并返回已处理的任务编号作为通道的函数
func doTask(ctx context.Context, taskCh <-chan Task) <-chan int {
	doneTaskCh := make(chan int)
	go func() {
		defer close(doneTaskCh)
		for task := range taskCh {
			select {
			case <-ctx.Done():
				return
			default:
				log.Printf("do task number: %d\n", task.Number)
				// task 任务处理
				// 根据任务耗时休眠
				time.Sleep(task.Cost)
				doneTaskCh <- task.Number // 已处理任务的编号放入通道
			}
		}
	}()
	return doneTaskCh
}
// `fan-in` 意味着将多个数据流复用或合并成一个流。
// merge 函数接收参数传递的多个通道 “taskChs”,并返回单个通道 “<-chan int”
func merge(ctx context.Context, taskChs []<-chan int) <-chan int {
	var wg sync.WaitGroup
	mergedTaskCh := make(chan int)
	mergeTask := func(taskCh <-chan int) {
		defer wg.Done()
		for t := range taskCh {
			select {
			case <-ctx.Done():
				return
			case mergedTaskCh <- t:
			}
		}
	}
	wg.Add(len(taskChs))
	for _, taskCh := range taskChs {
		go mergeTask(taskCh)
	}
	// 等待所有任务处理完毕
	go func() {
		wg.Wait()
		close(mergedTaskCh)
	}()
	return mergedTaskCh
}
func main() {
	start := time.Now()
	// 使用 context 来防止 goroutine 泄漏,即使在处理过程中被中断
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	// taskList 定义每个任务及其成本
	taskList := []Task{
		Task{1, 1 * time.Second},
		Task{2, 7 * time.Second},
		Task{3, 2 * time.Second},
		Task{4, 3 * time.Second},
		Task{5, 5 * time.Second},
		Task{6, 3 * time.Second},
	}
	// taskChannelGerenator 是一个函数,它接收一个 taskList 并将其转换为 Task 类型的通道
	// 执行结果(int slice channel)存储在 worker 中
	// 由于 doTask 的结果是一个通道,被分给了多个 worker,这就对应了 fan-out 处理
	taskCh := taskChannelGerenator(ctx, taskList)
	numWorkers := 4
	workers := make([]<-chan int, numWorkers)
	for i := 0; i < numWorkers; i++ {
		workers[i] = doTask(ctx, taskCh)  // doTask 处理并返回已处理的任务编号作为通道的函数
	}
	count := 0
	for d := range merge(ctx, workers) { // merge 从中读取已处理的任务编号
		count++
		log.Printf("done task number: %d\n", d)
	}
	log.Printf("Finished. Done %d tasks. Total time: %fs", count, time.Since(start).Seconds())
}

参考链接:

Fan-in/fan-out of services

Understanding the Fan-Out/Fan-In API Integration Pattern

以上就是Go语言异步API设计的扇入扇出模式详解的详细内容,更多关于Go异步API扇入扇出模式的资料请关注易采站长站其它相关文章!

如有侵权,请发邮件到 [email protected]

相关文章

  • 使用Go基于WebSocket构建千万级视频直播弹幕系统的代码详解

    使用Go基于WebSocket构建千万级视频直播弹幕系统的代码详解

    (1)业务复杂度介绍 开门见山,假设一个直播间同时500W人在线,那么1秒钟1000条弹幕,那么弹幕系统的推送频率就是: 500W * 1000条/秒=50亿条/秒 ,想想B站2019跨年晚会那次弹幕系统得是
    2020-07-08
  • golang中import cycle not allowed解决的一种思路

    golang中import cycle not allowed解决的一种思路

    发现问题 项目中碰到了一些问题,使用了指针函数的思路来解决相应问题 在实际项目中,因为两个项目互相引了对方的一些方法,导致了循环引用的错误,原本可以使用http的请求来解
    2019-11-10
  • 从go语言中找&和*区别详解

    从go语言中找&和*区别详解

    *和的区别 : 是取地址符号 , 即取得某个变量的地址 , 如 ; a*是指针运算符 , 可以表示一个变量是指针类型 , 也可以表示一个指针变量所指向的存储单元 , 也就是这个地址所存储的值 . 从
    2020-06-23
  • Go语言中利用http发起Get和Post请求的方法示例

    Go语言中利用http发起Get和Post请求的方法示例

    关于 HTTP 协议 HTTP(即超文本传输协议)是现代网络中最常见和常用的协议之一,设计它的目的是保证客户机和服务器之间的通信。 HTTP 的工作方式是客户机与服务器之间的 “请求-应答
    2019-11-10
  • golang如何实现mapreduce单进程版本详解

    golang如何实现mapreduce单进程版本详解

    前言 MapReduce作为hadoop的编程框架,是工程师最常接触的部分,也是除去了网络环境和集群配 置之外对整个Job执行效率影响很大的部分,所以很有必要深入了解整个过程。元旦放假的第一天
    2019-11-10
  • Go打包二进制文件的实现

    Go打包二进制文件的实现

    背景 众所周知,go语言可打包成目标平台二进制文件是其一大优势,如此go项目在服务器不需要配置go环境和依赖就可跑起来。 操作 需求:打包部署到centos7 笔者打包环境:mac os 方法:
    2020-03-11
  • GO语言实现简单的目录复制功能

    GO语言实现简单的目录复制功能

    本文实例讲述了GO语言实现简单的目录复制功能。分享给大家供大家参考。具体实现方法如下: 创建一个独立的 goroutine 遍历文件,主进程负责写入数据。程序会复制空目录,也可以设
    2019-11-10
  • golang中定时器cpu使用率高的现象详析

    golang中定时器cpu使用率高的现象详析

    前言: 废话少说,上线一个用golang写的高频的任务派发系统,上线跑着很稳定,但有个缺点就是当没有任务的时候,cpu的消耗也在几个百分点。 平均值在3%左右的cpu使用率。你没有任务
    2019-11-10