欢迎来到思维库

思维库

Golang 实现熔断机制

时间:2025-11-04 23:35:01 出处:数据库阅读(143)

 一些场景下,实现为了保障服务稳定性会引入熔断机制。熔断本文介绍了用 Go 语言自己实现熔断需要什么操作。机制

什么是实现熔断?

熔断是指在下游发生错误时上游主动关闭或限制对下游的请求。

原理

 通常熔断器分为三个时期:CLOSED,熔断OPEN,机制HALFOPEN  RPC 正常时,实现为 CLOSED;  当 RPC 错误增多时,熔断熔断器会被触发,机制进入 OPEN;  OPEN 后经过一定的实现冷却时间,熔断器变为 HALFOPEN;  HALFOPEN 时会对下游进行一些有策略的熔断访问,源码下载然后根据结果决定是机制变为 CLOSED,还是实现 OPEN;

总得来说三个状态的转换大致如下图:

Go 实现

https://github.com/rubyist/circuitbreaker

IsAllowed 是否允许请求,根据当前状态判断

CLOSE 允许

OPEN

 在 CoolingTimeout 冷却时间内,熔断不允许  过了冷却时间,机制状态变为 HALFOPEN,允许访问

HALFOPEN

 在 DetectTimeout 检测时间内,允许访问  否则不允许

atomic.StoreInt32((*int32)(&b.state), int32(HALFOPEN))

trip 判断是否达到熔断限额(可以自定义)

type TripFunc func(Metricser) bool   ThresholdTripFunc 错误阈值  ConsecutiveTripFunc 连续错误超过阈值  RateTripFunc 根据最少访问数和错误率判断

Metricser 访问统计,包括成功数、失败数、超时数、香港云服务器错误率、采样数、连续错误数 

type Metricser interface {     Fail()    // records a failure     Succeed() // records a success     Timeout() // records a timeout     Failures() int64    // return the number of failures     Successes() int64   // return the number of successes     Timeouts() int64    // return the number of timeouts     ConseErrors() int64 // return the consecutive errors recently     ErrorRate() float64 // rate = (timeouts + failures) / (timeouts + failures + successes)     Samples() int64     // (timeouts + failures + successes)     Counts() (successes, failures, timeouts int64)     Reset()  } 

window 实现类 

type window struct {     sync.RWMutex     oldest  int32     // oldest bucket index     latest  int32     // latest bucket index     buckets []bucket // buckets this window holds     bucketTime time.Duration // time each bucket holds     bucketNums int32         // the numbe of buckets     inWindow   int32         // the number of buckets in the window     allSuccess int64     allFailure int64     allTimeout int64     conseErr int64  }  type bucket struct {     failure int64     success int64     timeout int64  } 

用环形队列实现动态统计。把一个连续的时间切成多个小份,每一个 bucket 保存 BucketTime 的统计数据,BucketTime * BucketNums 是统计的时间区间。

每 BucketTime,会有一个 bucket 过期 

if w.inWindow == w.bucketNums {     // the lastest covered the oldest(latest == oldest)     oldBucket := &w.buckets[w.oldest]     atomic.AddInt64(&w.allSuccess, -oldBucket.Successes())     atomic.AddInt64(&w.allFailure, -oldBucket.Failures())     atomic.AddInt64(&w.allTimeout, -oldBucket.Timeouts())     w.oldest++     if w.oldest >= w.bucketNums {        w.oldest = 0     }  } else {     w.inWindow++  }  w.latest++ if w.latest >= w.bucketNums {     w.latest = 0  }  (&w.buckets[w.latest]).Reset() 

Panel Metricser 的容器

PanelStateChangeHandler 熔断事件 

type PanelStateChangeHandler func(key string, oldState, newState State, m Metricser) 

缺陷

 所有 breaker 公用同一个 BucketTime,统计周期不支持更新  冷却时间不支持动态更新 

分享到:

温馨提示:以上内容和图片整理于网络,仅供参考,希望对您有帮助!如有侵权行为请联系删除!

友情链接: