Go 并发模式进阶:fan-out/fan-in 与 pipeline 的工程实践
一、并发模式的"实战断层":从 goroutine 到生产级编排
Go 语言的 goroutine 和 channel 为并发编程提供了优雅的原语,但在生产环境中,"启动 N 个 goroutine 并发执行"远远不够。某数据处理团队在实现一个 ETL 管线时,为 1000 个数据分片各启动一个 goroutine 并发处理,结果在内存仅 4GB 的容器中触发了 OOM——每个 goroutine 的数据处理缓冲区占 8MB,1000 个 goroutine 共需 8GB 内存。更严重的是,所有 goroutine 共享同一个输出 channel,没有背压控制,导致下游消费者被压垮。
生产级并发需要解决三个核心问题:并行度控制(限制同时运行的 goroutine 数量)、背压传播(当下游处理不过来时,上游自动减速)、以及错误聚合(部分任务失败不影响整体流程)。fan-out/fan-in 和 pipeline 模式是解决这些问题的工程级方案。
二、fan-out/fan-in 与 pipeline 的架构对比
flowchart LR subgraph FanOut["fan-out/fan-in 模式"] IN1[输入 Channel] --> W1[Worker 1] IN1 --> W2[Worker 2] IN1 --> W3[Worker N] W1 --> OUT1[输出 Channel] W2 --> OUT1 W3 --> OUT1 end subgraph Pipeline["pipeline 模式"] SRC[数据源] --> S1[阶段1: 解析] S1 --> S2[阶段2: 校验] S2 --> S3[阶段3: 转换] S3 --> S4[阶段4: 输出] end style FanOut fill:#eef,stroke:#333 style Pipeline fill:#efe,stroke:#333两种模式的适用场景:
fan-out/fan-in:同一操作对多个数据项并行执行,结果汇总。适用于 CPU 密集型任务的并行化,如批量数据转换、并发 HTTP 请求。
pipeline:多个不同操作按顺序串联,每个阶段独立并发。适用于数据流转型任务,如 ETL 管线、日志处理链路。
三、生产级并发模式的代码实现
package concurrency import ( "context" "fmt" "sync" "time" ) // ============ fan-out/fan-in 模式 ============ // FanOut 将输入分发到 N 个 worker 并行处理 func FanOut[T any, R any]( ctx context.Context, input <-chan T, workerCount int, processor func(T) (R, error), ) (<-chan Result[R], context.CancelFunc) { // 带缓冲的结果 channel,缓冲大小 = worker 数,避免 worker 阻塞 resultCh := make(chan Result[R], workerCount) ctx, cancel := context.WithCancel(ctx) var wg sync.WaitGroup // 启动 N 个 worker for i := 0; i < workerCount; i++ { wg.Add(1) go func(workerID int) { defer wg.Done() for { select { case item, ok := <-input: if !ok { return // 输入关闭,worker 退出 } result, err := processor(item) select { case resultCh <- Result[R]{Value: result, Err: err}: case <-ctx.Done(): return } case <-ctx.Done(): return } } }(i) } // 等待所有 worker 完成后关闭结果 channel go func() { wg.Wait() close(resultCh) }() return resultCh, cancel } // FanIn 合并多个 channel 为一个 func FanIn[T any](channels ...<-chan T) <-chan T { merged := make(chan T) var wg sync.WaitGroup for _, ch := range channels { wg.Add(1) go func(c <-chan T) { defer wg.Done() for item := range c { merged <- item } }(ch) } go func() { wg.Wait() close(merged) }() return merged } // Result 带错误信息的结果包装 type Result[T any] struct { Value T Err error } // ============ pipeline 模式 ============ // PipelineStage 管线阶段定义 type PipelineStage[T any, R any] struct { Name string WorkerCount int Process func(T) (R, error) // 背压控制:当输出 channel 满时,自动阻塞 OutputBuffer int } // Pipeline 管线编排器 type Pipeline[T any, R any] struct { stages []interface{} // 每个阶段的类型不同,用 interface{} 存储 } // NewPipeline 创建管线 func NewPipeline[T any, R any]() *Pipeline[T, R] { return &Pipeline[T, R]{} } // RunPipeline 执行完整的管线(简化版:3 阶段) func RunPipeline[A, B, C, D any]( ctx context.Context, input <-chan A, stage1 PipelineStage[A, B], stage2 PipelineStage[B, C], stage3 PipelineStage[C, D], ) (<-chan Result[D], context.CancelFunc) { ctx, cancel := context.WithCancel(ctx) // 阶段1 stage1Out := make(chan Result[B], stage1.OutputBuffer) go runStage(ctx, stage1, input, stage1Out) // 阶段2:从阶段1的输出读取 stage2In := make(chan B, stage2.OutputBuffer) go func() { for r := range stage1Out { if r.Err == nil { stage2In <- r.Value } } close(stage2In) }() stage2Out := make(chan Result[C], stage2.OutputBuffer) go runStage(ctx, stage2, stage2In, stage2Out) // 阶段3 stage3In := make(chan C, stage3.OutputBuffer) go func() { for r := range stage2Out { if r.Err == nil { stage3In <- r.Value } } close(stage3In) }() stage3Out := make(chan Result[D], stage3.OutputBuffer) go runStage(ctx, stage3, stage3In, stage3Out) return stage3Out, cancel } // runStage 运行单个管线阶段(带并行度控制) func runStage[I any, O any]( ctx context.Context, stage PipelineStage[I, O], input <-chan I, output chan<- Result[O], ) { defer close(output) workerCount := stage.WorkerCount if workerCount <= 0 { workerCount = 1 } var wg sync.WaitGroup for i := 0; i < workerCount; i++ { wg.Add(1) go func() { defer wg.Done() for { select { case item, ok := <-input: if !ok { return } result, err := stage.Process(item) select { case output <- Result[O]{Value: result, Err: err}: case <-ctx.Done(): return } case <-ctx.Done(): return } } }() } wg.Wait() } // ============ 带背压的限流 fan-out ============ // BoundedFanOut 带背压控制的 fan-out // 当输出 channel 缓冲满时,worker 自动阻塞,实现背压传播 func BoundedFanOut[T any, R any]( ctx context.Context, input <-chan T, workerCount int, outputBuffer int, processor func(T) (R, error), ) <-chan Result[R] { // 有限的输出缓冲 = 背压控制 output := make(chan Result[R], outputBuffer) ctx, cancel := context.WithCancel(ctx) var wg sync.WaitGroup for i := 0; i < workerCount; i++ { wg.Add(1) go func() { defer wg.Done() for { select { case item, ok := <-input: if !ok { return } result, err := processor(item) // 如果 output 缓冲已满,此处阻塞 = 背压 select { case output <- Result[R]{Value: result, Err: err}: case <-ctx.Done(): return } case <-ctx.Done(): return } } }() } go func() { wg.Wait() close(output) cancel() // 清理 context }() return output } // ============ 错误聚合器 ============ // ErrorCollector 收集并发任务中的错误,不中断流程 type ErrorCollector struct { mu sync.Mutex errors []error } func (ec *ErrorCollector) Add(err error) { ec.mu.Lock() defer ec.mu.Unlock() ec.errors = append(ec.errors, err) } func (ec *ErrorCollector) Errors() []error { ec.mu.Lock() defer ec.mu.Unlock() cp := make([]error, len(ec.errors)) copy(cp, ec.errors) return cp } func (ec *ErrorCollector) HasErrors() bool { ec.mu.Lock() defer ec.mu.Unlock() return len(ec.errors) > 0 } // ============ 使用示例 ============ // ExampleFanOutPipeline 演示 fan-out + pipeline 组合 func ExampleFanOutPipeline() { ctx := context.Background() // 模拟输入数据 input := make(chan int, 100) go func() { for i := 0; i < 100; i++ { input <- i } close(input) }() // 带背压的 fan-out:5 个 worker,输出缓冲 10 results := BoundedFanOut(ctx, input, 5, 10, func(n int) (string, error) { time.Sleep(10 * time.Millisecond) // 模拟处理耗时 if n%20 == 0 { return "", fmt.Errorf("模拟错误: %d", n) } return fmt.Sprintf("processed-%d", n), nil }) // 收集结果 var success, failures int for r := range results { if r.Err != nil { failures++ continue } success++ } fmt.Printf("成功: %d, 失败: %d\n", success, failures) }四、并发模式的 Trade-offs
fan-out 的并行度选择。worker 数量不是越多越好。当 worker 数超过 CPU 核心数时,上下文切换开销开始显著;当 worker 数超过下游消费能力时,输出缓冲区积压导致内存膨胀。生产环境建议 worker 数 = CPU 核心数 × 1.5(留出 I/O 等待余量),输出缓冲区大小 = worker 数 × 2。
pipeline 阶段间的缓冲区大小。缓冲区太小,上游阶段频繁阻塞等待下游消费;缓冲区太大,内存占用高且错误传播延迟。经验值是缓冲区大小 = 下游 worker 数 × 2,确保下游每个 worker 都有数据可处理。
错误处理策略的分歧。pipeline 模式下,中间阶段的错误处理有两种策略:跳过错误项继续处理(吞吐优先),或遇到错误立即终止(一致性优先)。数据管线通常选择跳过 + 错误收集,金融交易管线通常选择立即终止。
goroutine 泄漏风险。fan-out 模式下,如果输入 channel 未正确关闭,worker goroutine 会永远阻塞。必须确保输入 channel 在所有场景下(包括错误场景)都能被关闭,或使用 context 取消机制作为兜底。
五、总结
fan-out/fan-in 和 pipeline 是 Go 并发编程中最实用的两种编排模式。fan-out 解决"同一操作并行化"问题,pipeline 解决"多阶段串行流水线"问题。生产级实现的关键在于:并行度控制(worker 数量与 CPU 核心数匹配)、背压传播(有限缓冲区 + 阻塞写入)、错误聚合(部分失败不中断整体流程)和资源安全(context 取消 + channel 关闭保障 goroutine 不泄漏)。两种模式可以组合使用——pipeline 的某个阶段内部使用 fan-out 并行化,实现"阶段间串行、阶段内并行"的混合编排。