在 golang 分布式系统中,协程可用于实现限流和熔断。限流通过令牌桶算法限制并发访问量,熔断则采用断路器模式,当故障频繁时暂时停止访问。限流和熔断机制可防止系统过载或崩溃,保证系统的稳定和响应能力。

利用 Go 协程实现限流和熔断
在分布式系统中,限制对资源的并发访问量和处理故障非常重要。Golang 中的协程提供了一个轻量级的并行机制,可用于轻松实现限流和熔断。
限流
立即学习“go语言免费学习笔记(深入)”;
限制对资源的并发访问量可以防止系统因过载而崩溃。可以使用令牌桶算法来实现限流:
package main
import (
"context"
"fmt"
"runtime"
"sync"
"sync/atomic"
"time"
)
func main() {
// 令牌生成速率(令牌/秒)
rate := 100
// 令牌桶容量
capacity := 1000
// 创建令牌桶
bucket := NewTokenBucket(rate, capacity)
// 模拟并发请求
wg := sync.WaitGroup{}
for i := 0; i < 1000; i++ {
wg.Add(1)
go func(requestID int) {
defer wg.Done()
if !bucket.TryAcquire() {
// 如果无法获取令牌,则丢弃请求
fmt.Println("Request", requestID, "dropped due to rate limiting")
} else {
// 处理请求
fmt.Println("Request", requestID, "processed")
}
}(i)
}
wg.Wait()
}
// 令牌桶
type TokenBucket struct {
rate int
capacity int
tokens int64
lastUpdated time.Time
lock sync.RWMutex
}
// NewTokenBucket 创建一个新的令牌桶
func NewTokenBucket(rate int, capacity int) *TokenBucket {
bucket := &TokenBucket{
rate: rate,
capacity: capacity,
tokens: capacity,
lastUpdated: time.Now(),
}
// 启动定时任务更新令牌
go bucket.Tick()
return bucket
}
// TryAcquire 尝试获取一个令牌
func (b *TokenBucket) TryAcquire() bool {
for {
b.lock.Lock()
tokens := b.tokens
// 计算自上次更新以来经过的时间
elapsed := time.Since(b.lastUpdated)
// 根据时间更新令牌
newTokens := b.rate * int(elapsed.Seconds())
// 更新令牌量
tokens += newTokens
// 确保令牌量不超过容量
tokens = min(tokens, b.capacity)
// 更新最后更新时间
b.lastUpdated = time.Now()
if tokens > 0 {
tokens--
atomic.StoreInt64(&b.tokens, tokens)
b.lock.Unlock()
return true
}
b.lock.Unlock()
return false
}
}
// Tick 定时任务更新令牌
func (b *TokenBucket) Tick() {
ticker := time.NewTicker(100 * time.Millisecond)
for {
select {
case <-ticker.C:
b.TryAcquire()
}
}
}
func min(a, b int) int {
if a < b {
return a
}
return b
}熔断
熔断是指当资源不可用或响应速度过慢时,临时停止对该资源的访问。这可以防止不必要的请求堆积,从而导致系统崩溃。可以使用断路器模式来实现熔断:
package main
import (
"context"
"fmt"
"sync/atomic"
"time"
)
func main() {
// 连续失败的请求次数阈值
failureThreshold := 5
// 熔断持续时间(秒)
timeout := 30
// 创建熔断器
breaker := NewCircuitBreaker(failureThreshold, timeout)
// 模拟并发请求
wg := sync.WaitGroup{}
for i := 0; i < 1000; i++ {
wg.Add(1)
go func(requestID int) {
defer wg.Done()
// 尝试执行请求
if breaker.Call(func() error {
// 实际的请求处理
return nil
}) {
// 请求成功
fmt.Println("Request", requestID, "processed")
} else {
// 请求被熔断
fmt.Println("Request", requestID, "dropped due to circuit breaker")
}
}(i)
}
wg.Wait()
}
// 熔断器
type CircuitBreaker struct {
failureThreshold int
timeout time.Duration
state atomic.Value
lastFailureAt atomic.Value
failureCount int32
resetTimerStarted bool
}
// NewCircuitBreaker 创建一个新的熔断器
func NewCircuitBreaker(failureThreshold int, timeout time.Duration) *CircuitBreaker {
breaker := &CircuitBreaker{
failureThreshold: failureThreshold,
timeout: timeout,
}
// 初始化熔断器状态
breaker.SetState(Closed)
return breaker
}
// SetState 设置熔断器状态
func (b *CircuitBreaker) SetState(state State) {
b.state.Store(state)
}
// State 获取熔断器状态
func (b *CircuitBreaker) State() State {
return b.state.Load()
}
// Call 执行受熔断器保护的函数
func (b *CircuitBreaker) Call(f func() error) error {
state := b.State()
switch state {
case Closed:
// 熔断器已关闭,尝试执行函数
return b.execute(f)
case Open:
// 熔断器已打开,直接返回错误
return ErrCircuitOpen
case HalfOpen:
// 熔断器处于半开状态,尝试执行函数并更新熔断器状态
if err := b.execute(f); err != nil {
b.SetState(Open)
return err
} else {
b.SetState(Closed)
return nil
}
default:
return ErrUnknownState
}
}
// execute 执行函数并更新熔断器状态
func (b *CircuitBreaker) execute(f func() error) error {
// 记录函数调用时间以上就是golang框架如何利用协程实现限流和熔断?的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号