当前位置:
Golang 实现熔断机制
时间:2025-11-04 23:35:01 出处:数据库阅读(143)
一些场景下,实现为了保障服务稳定性会引入熔断机制。熔断本文介绍了用 Go 语言自己实现熔断需要什么操作。机制

什么是实现熔断?
熔断是指在下游发生错误时上游主动关闭或限制对下游的请求。

原理
通常熔断器分为三个时期:CLOSED,熔断OPEN,机制HALFOPEN RPC 正常时,实现为 CLOSED; 当 RPC 错误增多时,熔断熔断器会被触发,机制进入 OPEN; OPEN 后经过一定的实现冷却时间,熔断器变为 HALFOPEN; HALFOPEN 时会对下游进行一些有策略的熔断访问,源码下载然后根据结果决定是机制变为 CLOSED,还是实现 OPEN;总得来说三个状态的转换大致如下图:

Go 实现
https://github.com/rubyist/circuitbreaker
IsAllowed 是否允许请求,根据当前状态判断
CLOSE 允许
OPEN
在 CoolingTimeout 冷却时间内,熔断不允许 过了冷却时间,机制状态变为 HALFOPEN,允许访问HALFOPEN
在 DetectTimeout 检测时间内,允许访问 否则不允许atomic.StoreInt32((*int32)(&b.state), int32(HALFOPEN))
trip 判断是否达到熔断限额(可以自定义)
type TripFunc func(Metricser) bool ThresholdTripFunc 错误阈值 ConsecutiveTripFunc 连续错误超过阈值 RateTripFunc 根据最少访问数和错误率判断Metricser 访问统计,包括成功数、失败数、超时数、香港云服务器错误率、采样数、连续错误数
type Metricser interface { Fail() // records a failure Succeed() // records a success Timeout() // records a timeout Failures() int64 // return the number of failures Successes() int64 // return the number of successes Timeouts() int64 // return the number of timeouts ConseErrors() int64 // return the consecutive errors recently ErrorRate() float64 // rate = (timeouts + failures) / (timeouts + failures + successes) Samples() int64 // (timeouts + failures + successes) Counts() (successes, failures, timeouts int64) Reset() }window 实现类
type window struct { sync.RWMutex oldest int32 // oldest bucket index latest int32 // latest bucket index buckets []bucket // buckets this window holds bucketTime time.Duration // time each bucket holds bucketNums int32 // the numbe of buckets inWindow int32 // the number of buckets in the window allSuccess int64 allFailure int64 allTimeout int64 conseErr int64 } type bucket struct { failure int64 success int64 timeout int64 }用环形队列实现动态统计。把一个连续的时间切成多个小份,每一个 bucket 保存 BucketTime 的统计数据,BucketTime * BucketNums 是统计的时间区间。
每 BucketTime,会有一个 bucket 过期
if w.inWindow == w.bucketNums { // the lastest covered the oldest(latest == oldest) oldBucket := &w.buckets[w.oldest] atomic.AddInt64(&w.allSuccess, -oldBucket.Successes()) atomic.AddInt64(&w.allFailure, -oldBucket.Failures()) atomic.AddInt64(&w.allTimeout, -oldBucket.Timeouts()) w.oldest++ if w.oldest >= w.bucketNums { w.oldest = 0 } } else { w.inWindow++ } w.latest++ if w.latest >= w.bucketNums { w.latest = 0 } (&w.buckets[w.latest]).Reset()Panel Metricser 的容器
PanelStateChangeHandler 熔断事件
type PanelStateChangeHandler func(key string, oldState, newState State, m Metricser)缺陷
所有 breaker 公用同一个 BucketTime,统计周期不支持更新 冷却时间不支持动态更新
                分享到:
                
温馨提示:以上内容和图片整理于网络,仅供参考,希望对您有帮助!如有侵权行为请联系删除!