
在处理大量文本数据时,统计其中不重复词汇的总数是一个常见需求。当文本量巨大时,单线程处理效率低下,难以满足性能要求。因此,引入并行编程思想成为必然选择。核心挑战在于如何有效地将任务分解、分配给多个处理单元并行执行,并最终将它们的结果正确、高效地合并,以得到最终的去重词汇总数。
Map/Reduce是一种处理大规模数据集的编程模型,它天然适合解决词汇统计这类问题。其基本思想是将复杂的计算任务分解为两个主要阶段:映射(Map)和归约(Reduce)。在并行去重词汇统计中,我们可以将这一范式具体化为以下三个核心组件:
拆分器负责从输入源(如标准输入)读取原始文本数据,并将其分解成更小的、可管理的文本块或单词流。这些数据块随后被分发给多个工作者进行并行处理。为了实现高效的数据传输和协调,拆分器通常会将数据通过通道发送给工作者。当所有输入数据都被处理完毕后,拆分器会发出一个信号(例如关闭通道),通知工作者不再有新的数据到来。
工作者是并行处理的核心。每个工作者从拆分器接收分配给它的文本块。它们的主要任务是独立地处理这些文本块,识别其中的词汇,并维护一个本地的不重复词汇集合。这个集合通常是一个哈希表(如Go中的map[string]struct{}),用于快速地进行词汇去重。一旦一个工作者处理完其所有分配的数据,它会将自己本地的不重复词汇集合发送给聚合器。
聚合器是Map/Reduce范式中的归约阶段。它负责收集所有工作者发送过来的本地不重复词汇集合。聚合器的任务是将这些分散的集合合并成一个全局的、最终的不重复词汇集合。在合并过程中,聚合器需要确保并发安全,例如使用互斥锁(sync.Mutex)保护共享的全局集合,或者利用Go语言提供的并发安全数据结构(如sync.Map)来避免数据竞争。最终,聚合器会统计全局集合中词汇的数量,并输出结果。
Go语言的并发原语,如Goroutines和Channels,为实现上述Map/Reduce范式提供了天然的优势。
以下是一个概念性的Go语言代码结构,展示了各组件的交互:
package main
import (
"bufio"
"fmt"
"io"
"os"
"strings"
"sync"
"unicode"
)
// 定义工作者数量
const numWorkers = 4
// WordSet 是一个并发安全的字符串集合
type WordSet struct {
mu sync.Mutex
data map[string]struct{}
}
func NewWordSet() *WordSet {
return &WordSet{
data: make(map[string]struct{}),
}
}
func (ws *WordSet) Add(word string) {
ws.mu.Lock()
defer ws.mu.Unlock()
ws.data[word] = struct{}{}
}
func (ws *WordSet) Merge(other map[string]struct{}) {
ws.mu.Lock()
defer ws.mu.Unlock()
for word := range other {
ws.data[word] = struct{}{}
}
}
func (ws *WordSet) Count() int {
ws.mu.Lock()
defer ws.mu.Unlock()
return len(ws.data)
}
// Splitter 负责读取输入并分发单词
func splitter(input io.Reader, wordChan chan<- string, wg *sync.WaitGroup) {
defer wg.Done()
defer close(wordChan) // 所有单词读取完毕后关闭通道
scanner := bufio.NewScanner(input)
scanner.Split(bufio.ScanWords) // 按单词分割
for scanner.Scan() {
word := strings.ToLower(scanner.Text()) // 转换为小写
// 移除标点符号
word = strings.Map(func(r rune) rune {
if unicode.IsLetter(r) || unicode.IsNumber(r) {
return r
}
return -1 // 移除非字母数字字符
}, word)
if word != "" {
wordChan <- word // 发送单词给工作者
}
}
if err := scanner.Err(); err != nil {
fmt.Fprintf(os.Stderr, "Error reading input: %v\n", err)
}
}
// Worker 负责处理单词并生成本地去重集合
func worker(wordChan <-chan string, resultChan chan<- map[string]struct{}, wg *sync.WaitGroup) {
defer wg.Done()
localDistinctWords := make(map[string]struct{})
for word := range wordChan { // 从通道接收单词,直到通道关闭
localDistinctWords[word] = struct{}{}
}
resultChan <- localDistinctWords // 将本地结果发送给聚合器
}
// Aggregator 负责收集并合并所有工作者的结果
func aggregator(resultChan <-chan map[string]struct{}, finalSet *WordSet, wg *sync.WaitGroup) {
defer wg.Done()
for workerResult := range resultChan { // 从通道接收工作者结果,直到通道关闭
finalSet.Merge(workerResult) // 合并到全局集合
}
}
func main() {
var wg sync.WaitGroup
// 创建通道
wordChan := make(chan string, 100) // 缓冲通道,提高效率
resultChan := make(chan map[string]struct{}, numWorkers) // 每个工作者一个结果
finalDistinctWords := NewWordSet() // 全局去重集合
// 启动聚合器
wg.Add(1)
go aggregator(resultChan, finalDistinctWords, &wg)
// 启动工作者
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go worker(wordChan, resultChan, &wg)
}
// 启动拆分器
wg.Add(1)
go splitter(os.Stdin, wordChan, &wg)
// 等待拆分器和所有工作者完成
// 注意:这里需要一个更精细的WaitGroup管理,以确保resultChan在所有worker发送完后才关闭
// 简单的做法是:等待所有worker完成,然后关闭resultChan,再等待aggregator完成。
// 或者通过一个单独的goroutine来管理resultChan的关闭。
// 更安全的等待机制:
// 1. 等待 splitter 完成,关闭 wordChan
// 2. 等待所有 worker 完成
// 3. 关闭 resultChan
// 4. 等待 aggregator 完成
// 步骤1: 等待 splitter 完成并关闭 wordChan
splitterDone := make(chan struct{})
go func() {
wg.Wait() // 等待所有 goroutine 完成
close(splitterDone)
}()
// 确保所有 worker 启动后,再等待 splitter 完成
// 这里的 wg.Wait() 会等待所有 Add(1) 的 Goroutine
// 我们需要一个更细粒度的控制,例如两个 WaitGroup
// 改进的 WaitGroup 管理
var splitterWg, workerWg, aggregatorWg sync.WaitGroup
// 启动聚合器
aggregatorWg.Add(1)
go aggregator(resultChan, finalDistinctWords, &aggregatorWg)
// 启动工作者
for i := 0; i < numWorkers; i++ {
workerWg.Add(1)
go worker(wordChan, resultChan, &workerWg)
}
// 启动拆分器
splitterWg.Add(1)
go splitter(os.Stdin, wordChan, &splitterWg)
// 等待拆分器完成并关闭 wordChan
splitterWg.Wait()
//fmt.Println("Splitter finished, closing wordChan...") // Debug
// wordChan 已经在 splitter 内部 defer close 了
// 等待所有工作者完成
workerWg.Wait()
//fmt.Println("All workers finished, closing resultChan...") // Debug
close(resultChan) // 所有工作者都已发送结果,关闭结果通道
// 等待聚合器完成
aggregatorWg.Wait()
//fmt.Println("Aggregator finished.") // Debug
fmt.Printf("Total distinct words: %d\n", finalDistinctWords.Count())
}通过借鉴Map/Reduce范式,并结合Go语言强大的并发特性(Goroutines和Channels),我们可以高效地实现并行去重词汇统计。这种架构不仅能够充分利用多核CPU的计算能力,提高处理大规模文本数据的效率,也提供了一种清晰、模块化的设计思路,便于维护和扩展。理解和掌握这种并发模式,对于开发高性能、可伸缩的Go应用程序至关重要。
以上就是Go 语言并行去重词汇统计教程的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号