
在处理大量文本并统计其中不重复词汇时,传统的单线程方法效率低下。并行编程是解决此问题的关键。借鉴Map/Reduce(映射/归约)范式,我们可以将整个过程分解为三个主要阶段:分割(Split)、映射(Map)和归约(Reduce)。
这种架构的优势在于,当一个工作协程处理速度变慢时,由于共享队列的存在,其他协程可以继续从队列中获取任务,从而避免整个流程因单个协程的瓶颈而停滞。同时,需要一种机制来通知工作协程输入已处理完毕,以便它们可以开始将结果发送给聚合器。
以下是这种并行处理模式的逻辑结构示意图:
_ 工作协程(Worker)_
/ \
/ \
分割器(Splitter)--- 工作协程(Worker) --- 聚合器(Aggregator)
\ /
\_ 工作协程(Worker) _/Go语言的协程(Goroutines)和通道(Channels)是实现上述并行架构的理想工具。
立即学习“go语言免费学习笔记(深入)”;
以下是一个Go语言实现此并行词汇统计的简化概念性代码结构,旨在展示各组件如何协同工作:
package main
import (
"bufio"
"fmt"
"io"
"os"
"strings"
"sync"
)
// 定义文本块类型
type textChunk string
// 定义局部词汇集合类型
type wordSet map[string]struct{}
// splitter 协程:读取输入并分割成块,发送到 inputCh
func splitter(reader io.Reader, inputCh chan<- textChunk, wg *sync.WaitGroup) {
defer close(inputCh) // 确保在所有文本读取完毕后关闭通道
defer wg.Done()
scanner := bufio.NewScanner(reader)
// 可以根据需要调整分割策略,例如按行、按固定字节数等
// 这里简化为按行读取
for scanner.Scan() {
line := scanner.Text()
if len(line) > 0 {
inputCh <- textChunk(line)
}
}
if err := scanner.Err(); err != nil {
fmt.Fprintf(os.Stderr, "Error reading input: %v\n", err)
}
}
// worker 协程:处理文本块,统计局部不重复词汇
func worker(id int, inputCh <-chan textChunk, outputCh chan<- wordSet, wg *sync.WaitGroup) {
defer wg.Done()
localWords := make(wordSet)
for chunk := range inputCh { // 循环直到 inputCh 关闭
words := strings.Fields(string(chunk)) // 简单分割单词
for _, word := range words {
cleanedWord := strings.ToLower(strings.Trim(word, `.,!?;:"'`)) // 简单清洗
if len(cleanedWord) > 0 {
localWords[cleanedWord] = struct{}{}
}
}
}
// 将局部结果发送给聚合器
outputCh <- localWords
fmt.Printf("Worker %d finished, processed %d distinct words locally.\n", id, len(localWords))
}
// aggregator 协程:收集所有worker的结果并合并
func aggregator(outputCh <-chan wordSet, finalWords map[string]struct{}, wg *sync.WaitGroup) {
defer wg.Done()
for localSet := range outputCh { // 循环直到 outputCh 关闭
for word := range localSet {
finalWords[word] = struct{}{}
}
}
}
func main() {
const numWorkers = 4 // 根据CPU核心数调整工作协程数量
inputCh := make(chan textChunk, 100) // 缓冲通道,防止阻塞
outputCh := make(chan wordSet, numWorkers) // 缓冲通道,接收worker结果
var splitterWg sync.WaitGroup
var workerWg sync.WaitGroup
var aggregatorWg sync.WaitGroup
finalWords := make(map[string]struct{})
// 启动 splitter 协程
splitterWg.Add(1)
go splitter(os.Stdin, inputCh, &splitterWg)
// 启动 worker 协程池
workerWg.Add(numWorkers)
for i := 0; i < numWorkers; i++ {
go worker(i+1, inputCh, outputCh, &workerWg)
}
// 启动 aggregator 协程
aggregatorWg.Add(1)
go aggregator(outputCh, finalWords, &aggregatorWg)
// 等待 splitter 完成所有输入
splitterWg.Wait()
fmt.Println("Splitter finished sending chunks.")
// 等待所有 worker 完成处理
workerWg.Wait()
fmt.Println("All workers finished processing.")
close(outputCh) // 必须在所有 worker 完成后关闭 outputCh,否则 aggregator 会死锁
// 等待 aggregator 完成合并
aggregatorWg.Wait()
fmt.Println("Aggregator finished merging results.")
fmt.Printf("\nTotal distinct words found: %d\n", len(finalWords))
// 打印部分结果(可选)
// i := 0
// for word := range finalWords {
// fmt.Printf("%s ", word)
// i++
// if i > 20 { // 只打印前20个
// break
// }
// }
// fmt.Println()
}通过采用Map/Reduce模式并利用Go语言强大的并发特性,我们可以高效地并行统计文本中的不重复词汇。这种方法将复杂的任务分解为可管理的并发子任务,极大地提升了处理大规模文本数据的能力。理解并熟练运用Go协程、通道和同步原语,是构建高性能并发应用的基石。
以上就是Go语言高效并行统计文本中不重复词汇的方法的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号