Circuit Breaker Explained

Circuit Breaker Principle

Modern microservice architectures are distributed: a system is made up of a large number of microservices that call one another and form complex call chains. If one service in such a chain becomes unstable, the instability can cascade until the whole chain hangs. We therefore apply circuit breaking and degradation to unstable service dependencies, temporarily cutting off calls to them so that a local failure does not snowball into an avalanche across the whole distributed system.

A circuit breaker is a state machine with three states:

  • Closed state: the initial state. The breaker keeps a failure counter that is incremented every time a call fails. If the number of recent failures exceeds the allowed threshold within a given period, the breaker switches to the Open state and starts a timeout clock; when the timeout expires, it switches to the Half-Open state. The timeout gives the system a chance to fix whatever caused the failures and return to normal. In the Closed state the failure count is time-based (only failures within the recent period count), which prevents the breaker from opening because of an occasional error; alternatively, it can be based on the number of consecutive failures.
  • Open state: requests from the client fail immediately with an error, without calling the server.
  • Half-Open state: a limited number of client calls are let through to the server. If these calls succeed, the error that caused the earlier failures is assumed to be fixed, so the breaker switches to the Closed state and resets the failure counter; if a trial call fails, the breaker returns to the Open state. The Half-Open state prevents a recovering service from being flooded again by a sudden burst of requests.

Hystrix's circuit breaker applies this model as follows:

  1. When a request arrives, allowRequest() first checks whether the circuit is open. If it is not, the request is let through. If it is, it checks whether the sleep window (the cooling period) has elapsed; if so, the request is let through as a trial, otherwise an error is returned immediately.
  2. Each call reports its outcome through markSuccess(duration) or markFailure(duration), which count how many calls succeeded or failed within a given period.
  3. isOpen() decides whether to trip the breaker by computing the current error rate, failure / (success + failure). If the rate is above a threshold, the breaker opens; otherwise it stays closed.
  4. Hystrix keeps these statistics in memory as a rolling window divided into time buckets; buckets older than the window are discarded.

Circuit breaker implementation

Now that we understand how circuit breaking works, let's implement a Hystrix-style circuit breaker ourselves, using go-zero's collection.RollingWindow for the statistics.

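// newHystrixBreaker creates a breaker in the Closed state. window, buckets and
// the default* values are constants defined elsewhere in the author's code.
func newHystrixBreaker() *hystrixBreaker {
	bucketDuration := time.Duration(int64(window) / int64(buckets))
	stat := collection.NewRollingWindow(buckets, bucketDuration)
	return &hystrixBreaker{
		state:          Closed,
		coolingTimeout: defaultCoolingTimeout,
		detectTimeout:  defaultDetectTimeout,
		tripFunc:       rateTripFunc(defaultErrRate, defaultMinSample),
		stat:           stat,
		now:            time.Now, // clock function, replaceable in tests
	}
}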
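// rateTripFunc returns a TripFunc that opens the breaker once the window holds
// at least minSamples requests and the error rate exceeds rate.
func rateTripFunc(rate float64, minSamples int64) TripFunc {
	return func(rollingWindow *collection.RollingWindow) bool {
		var total, errs int64
		rollingWindow.Reduce(func(b *collection.Bucket) {
			total += b.Count
			// markSuccess adds 1 and markFailure adds 0, so Sum counts successes.
			errs += b.Count - int64(b.Sum)
		})
		if total < minSamples {
			return false
		}
		errRate := float64(errs) / float64(total)
		return errRate > rate
	}
}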
// doReq runs req through the breaker: rejected requests go to fallback (if any),
// panics count as failures, and acceptable decides whether err is a failure.
func (b *hystrixBreaker) doReq(req func() error, fallback func(error) error, acceptable Acceptable) error {
	if err := b.accept(); err != nil {
		if fallback != nil {
			return fallback(err)
		}
		return err
	}

	defer func() {
		if e := recover(); e != nil {
			b.markFailure()
			panic(e)
		}
	}()

	err := req()
	if acceptable(err) {
		b.markSuccess()
	} else {
		b.markFailure()
	}

	return err
}
// accept decides whether a request may pass in the current state.
func (b *hystrixBreaker) accept() error {
	b.mux.Lock()
	defer b.mux.Unlock()

	switch b.getState() {
	case Open:
		now := b.now()
		// Still cooling down: reject immediately.
		if b.openTime.Add(b.coolingTimeout).After(now) {
			return ErrServiceUnavailable
		}
		// Cooling timeout elapsed: move to HalfOpen and let this request through
		// as a trial. No re-check of the state is needed while holding the lock.
		atomic.StoreInt32((*int32)(&b.state), int32(HalfOpen))
		atomic.StoreInt32(&b.halfopenSuccess, 0)
		b.lastRetryTime = now
	case HalfOpen:
		now := b.now()
		// Let at most one trial request through per detectTimeout.
		if b.lastRetryTime.Add(b.detectTimeout).After(now) {
			return ErrServiceUnavailable
		}
		b.lastRetryTime = now
	}

	return nil
}
// markSuccess records a successful call.
func (b *hystrixBreaker) markSuccess() {
	b.mux.Lock()
	defer b.mux.Unlock()

	switch b.getState() {
	case HalfOpen:
		// Enough successful trial calls: close the breaker and reset the statistics.
		if atomic.AddInt32(&b.halfopenSuccess, 1) > defaultHalfOpenSuccesss {
			atomic.StoreInt32((*int32)(&b.state), int32(Closed))
			b.stat.Reduce(func(bucket *collection.Bucket) {
				bucket.Count = 0
				bucket.Sum = 0
			})
		}
	case Closed:
		b.stat.Add(1)
	}
}
// markFailure records a failed call: Count grows while Sum stays unchanged.
func (b *hystrixBreaker) markFailure() {
	b.mux.Lock()
	defer b.mux.Unlock()

	b.stat.Add(0)
	switch b.getState() {
	case HalfOpen:
		// A failed trial call reopens the breaker and restarts the cooling timeout.
		b.openTime = b.now()
		atomic.StoreInt32((*int32)(&b.state), int32(Open))
	case Closed:
		if b.tripFunc != nil && b.tripFunc(b.stat) {
			b.openTime = b.now()
			atomic.StoreInt32((*int32)(&b.state), int32(Open))
		}
	}
}

Comparing hystrixBreaker and googleBreaker

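Next, let's compare how the two circuit breakers behave under the same load. The test RPC server below returns an Internal error for user 1 whenever the current Unix time in milliseconds satisfies ts%5 == 1, so roughly one request in five fails. The client, an API handler, calls this RPC in an endless loop: requests rejected by the breaker (breaker.ErrServiceUnavailable) are printed and skipped, and every request that got past the breaker increments a Prometheus counter.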

// Server side: fail user 1's requests whenever ts%5 == 1.
func (l *UserInfoLogic) UserInfo(in *user.UserInfoRequest) (*user.UserInfoResponse, error) {
	ts := time.Now().UnixMilli()
	if in.UserId == int64(1) {
		if ts%5 == 1 {
			return nil, status.Error(codes.Internal, "internal error")
		}
		return &user.UserInfoResponse{
			UserId: 1,
			Name:   "jack",
		}, nil
	}
	return &user.UserInfoResponse{}, nil
}
var metricSuccessReqTotal = metric.NewCounterVec(&metric.CounterVecOpts{
	Namespace: "circuit_breaker",
	Subsystem: "requests",
	Name:      "req_total",
	Help:      "test for circuit breaker",
	Labels:    []string{"method"},
})

// Client side: call the RPC forever; breaker rejections are printed,
// every other request is counted.
func (l *UserInfoLogic) UserInfo() (resp *types.UserInfoResponse, err error) {
	for {
		_, err := l.svcCtx.UserRPC.UserInfo(l.ctx, &user.UserInfoRequest{UserId: int64(1)})
		if err == breaker.ErrServiceUnavailable {
			fmt.Println(err)
			continue
		}
		metricSuccessReqTotal.Inc("UserInfo")
	}
}
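As a rough back-of-the-envelope check, assume the client sends exactly one request per millisecond, so that the ts%5 == 1 condition fails exactly one request in five. The sketch below feeds that failure pattern into the two decision rules. hystrixTrips restates rateTripFunc's condition and googleDropRatio restates the drop-ratio formula from googleBreaker.accept(); both helper names, the thresholds (0.5 and 0.1 with 20 minimum samples) and K = 1.5 with protection = 5 are assumptions for illustration, not the values used in the test above.

```go
package main

import (
	"fmt"
	"math"
)

// failsAt mirrors the test server: a request fails when ms%5 == 1.
func failsAt(ms int64) bool { return ms%5 == 1 }

// hystrixTrips restates rateTripFunc: enough samples and an error rate above rate.
func hystrixTrips(total, errs, minSamples int64, rate float64) bool {
	return total >= minSamples && float64(errs)/float64(total) > rate
}

// googleDropRatio restates googleBreaker's formula with its protection offset.
func googleDropRatio(total, accepts int64, k float64, protection int64) float64 {
	return math.Max(0, (float64(total-protection)-k*float64(accepts))/float64(total+1))
}

func main() {
	var total, errs int64
	for ms := int64(0); ms < 1000; ms++ { // assumed: one request per millisecond
		total++
		if failsAt(ms) {
			errs++
		}
	}
	fmt.Printf("error rate: %.2f\n", float64(errs)/float64(total))
	fmt.Println("hystrix trips (threshold 0.5):", hystrixTrips(total, errs, 20, 0.5))
	fmt.Println("hystrix trips (threshold 0.1):", hystrixTrips(total, errs, 20, 0.1))
	fmt.Printf("google drop ratio (K=1.5): %.3f\n", googleDropRatio(total, total-errs, 1.5, 5))
}
```

Under these assumptions the Hystrix-style rule is all or nothing: a 20% error rate either trips the breaker or not at all, depending on the threshold. The Google formula drops nothing while K × accepts ≥ requests − protection, which an 80% success rate satisfies for K = 1.5.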

Source code walkthrough

The googleBreaker source lives in go-zero/core/breaker/googlebreaker.go.

func (b *googleBreaker) doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error {
	if err := b.accept(); err != nil {
		if fallback != nil {
			return fallback(err)
		}

		return err
	}

	defer func() {
		if e := recover(); e != nil {
			b.markFailure()
			panic(e)
		}
	}()

	err := req()
	if acceptable(err) {
		b.markSuccess()
	} else {
		b.markFailure()
	}

	return err
}
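googleBreaker implements the client-side adaptive throttling algorithm from the "Handling Overload" chapter of the Google SRE book (linked in a comment in accept() below). Each client tracks two values over a recent time window:

  • Total number of requests (requests): the number of requests the caller has attempted.
  • Number of accepted requests (accepts): the number of requests the server processed successfully.

While the service is healthy the two are equal. Once the backend starts failing, accepts falls behind requests, and the client rejects new requests locally, without sending them, with probability

max(0, (requests - K * accepts) / (requests + 1))

K is a multiplier: the client starts rejecting only when requests exceeds K times accepts, so a smaller K throttles more aggressively and a larger K more leniently. In go-zero's accept() below, requests and accepts come from history(), K is b.k, and a small protection constant is subtracted from total so that nothing is dropped until the window has seen more than protection requests.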
func (b *googleBreaker) accept() error {
	accepts, total := b.history()
	weightedAccepts := b.k * float64(accepts)
	// https://landing.google.com/sre/sre-book/chapters/handling-overload/#eq2101
	dropRatio := math.Max(0, (float64(total-protection)-weightedAccepts)/float64(total+1))
	if dropRatio <= 0 {
		return nil
	}

	if b.proba.TrueOnProba(dropRatio) {
		return ErrServiceUnavailable
	}

	return nil
}
func (b *googleBreaker) history() (accepts, total int64) {
	b.stat.Reduce(func(b *collection.Bucket) {
		accepts += int64(b.Sum)
		total += b.Count
	})

	return
}

Conclusion

This article introduced a client-side throttling mechanism in service governance: the circuit breaker. The Hystrix-style breaker has three states, Open, HalfOpen and Closed, and the conditions for switching between them are described in detail above; reread that part until it is clear, or better yet, implement the breaker yourself. go-zero's built-in breaker, by contrast, keeps no explicit state; if you must describe it in terms of states, it has only two, open and closed. It adaptively drops requests based on the recent success rate, which makes it a more flexible strategy: the more requests the server processes successfully, the lower the probability of dropping a request, and the fewer it processes successfully, the higher that probability.

Reference

https://martinfowler.com/bliki/CircuitBreaker.html

Project address

https://github.com/zeromicro/go-zero
