0%

Go源码-context

在go服务器中,通常 每个传入的请求都会在自己的goroutine中进行处理. 请求处理程序通常会启动额外的goroutine来访问数据库 或者第三方服务.
处理请求的一组goroutine通常需要访问特定于请求的值: 例如userid, request_id, token,timeout…,当请求被取消或者超时时,所有处理该请求的goroutine都应该快速退出. 以便系统可以快速回收他们正在使用的任何资源.

context 是 go 1.14 引入 的一个概念. 用于解决上述问题.

context 源码解析

context

context.Context


// Context 是协程安全的,context 的方法可以被多个协程同时调用
type Context interface {

    // 
	Deadline() (deadline time.Time, ok bool)

	// 返回一个 channel,当 这个 context 被取消时,这个channel 会写入一个 struct
	Done() <-chan struct{}

	// Done 还没有被关闭, 返回  nil
    // 如果  Done  已经被关闭, 返回 error:
    // - Canceled
    // - DeadlineExceeded
    // 一旦返回 error,这个方法之后就是幂等的
	Err() error

	// 
	Value(key any) any
}
  1. Deadline
    如果 该 Context 被设置了超时,该函数返回 true & 对应的超时时间, 否则 返回 false & nil.

  2. Done

返回一个只读 Channel, 但context 主动取消或者 自动超时取消时,该 context 以及 该context 的所有 子context 的 Done channel 都会被 close.
级联取消所有的子过程.

  1. Err()

Done() 返回的 channel被 close 之前 会返回nil.
在 close 之后会返回error(表示比关闭的原因).

// Canceled is the error returned by Context.Err when the context is canceled.
var Canceled = errors.New("context canceled")
// DeadlineExceeded is the error returned by Context.Err when the context's
// deadline passes.
var DeadlineExceeded error = deadlineExceededError{}
  1. Value(key any) any

返回绑定在 该 context 上 给定 key 对应的 value. 如果没有 返回 nil.

为了防止Key冲突,最好将 key设置为 非导出类型,然后为其定义访问器.(这在许多开源项目中都有影子)

key可是任意可比较的类型.

下面是一个 context 共享 User 信息的 一个例子.

package user
import "context"

// User 是 要存储在Context中的value 的类型.
type User struct {...}

// key 被定义为非导出类型 为了防止与其他包冲突
// key is an unexported type for keys defined in this package.
// This prevents collisions with keys defined in other packages.
type key int

// userKey 用来关联 请求中的user信息, 绑定在 context上.他是非导出的.
// 通过  NewContext & FromContext 来设置和读取  user.User 信息, 而不是直接使用 userKey.
// userKey is the key for user.User values in Contexts. It is
// unexported; clients use user.NewContext and user.FromContext
// instead of using this key directly.
var userKey key

// 携带 value
// NewContext returns a new Context that carries value u.
func NewContext(ctx context.Context, u *User) context.Context {
    return context.WithValue(ctx, userKey, u)
}

//读取 context 中的 存储的 user信息
// FromContext returns the User value stored in ctx, if any.
func FromContext(ctx context.Context) (*User, bool) {
    u, ok := ctx.Value(userKey).(*User)
    return u, ok
}

context.Context 的实现.

1. emptyContext

emtptyContext 不是一个结构体,而是一个int类型,这样每一个值 都有独立的地址. 是 Context 的一个空的实现

var (
	background = new(emptyCtx)
	todo       = new(emptyCtx)
)

context.Background() 通常被用作根节点,不能被取消,也不会超时.
通过 WithCancelcontext.Background() 派生出的 Context 要注意在对应过程完结后及时 cancel, 否则会造成 Context 泄露.

// An emptyCtx is never canceled, has no values, and has no deadline. It is not
// struct{}, since vars of this type must have distinct addresses.
type emptyCtx int

func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
	return
}

func (*emptyCtx) Done() <-chan struct{} {
	return nil
}

func (*emptyCtx) Err() error {
	return nil
}

func (*emptyCtx) Value(key any) any {
	return nil
}

func (e *emptyCtx) String() string {
	switch e {
	case background:
		return "context.Background"
	case todo:
		return "context.TODO"
	}
	return "unknown empty Context"
}

2. context.cancelCtx

context核心实现在这里,包括树形结构的构建 以及 进行级联取消.

// A cancelCtx can be canceled. When canceled, it also cancels any children
// that implement canceler.
type cancelCtx struct {
	Context

	mu       sync.Mutex            // protects following fields 保护下面的字段
	done     atomic.Value          // of chan struct{}, created lazily, closed by first cancel call
	children map[canceler]struct{} // set to nil by the first cancel call
	err      error                 // set to non-nil by the first cancel call
}

// 递归查找 parentCtx 中第一个  cancelCtx
func (c *cancelCtx) Value(key any) any {
	if key == &cancelCtxKey { // 
		return c
	}
	return value(c.Context, key)
}

func (c *cancelCtx) Done() <-chan struct{} {
	d := c.done.Load()
	if d != nil {
		return d.(chan struct{})
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	d = c.done.Load()
	if d == nil {
		d = make(chan struct{})
		c.done.Store(d)
	}
	return d.(chan struct{})
}

func (c *cancelCtx) Err() error {
	c.mu.Lock()
	err := c.err
	c.mu.Unlock()
	return err
}

// cancel closes c.done, cancels each of c's children, and, if
// removeFromParent is true, removes c from its parent's children.
// cancel  关闭 c.done, 关闭 context 及其所有子节点. 如果  removeFromParent = true, 那么将会把 context 从起 父节点移除.
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
	if err == nil {
		panic("context: internal error: missing cancel error")
	}
	// process
	c.mu.Lock()
	if c.err != nil {
		c.mu.Unlock()
		return // already canceled
	}
	c.err = err
	d, _ := c.done.Load().(chan struct{})
	if d == nil {
		c.done.Store(closedchan)
	} else {
		close(d)
	}
	// 遍历子节点,  递归调用 子节点的 cancel()
	for child := range c.children {
		// NOTE: acquiring the child's lock while holding parent's lock.
		child.cancel(false, err)
	}
	c.children = nil
	c.mu.Unlock()

	// c.Context 保存的是其 父节点
	if removeFromParent {
		removeChild(c.Context, c)
	}
}

cancelContext.Value() 方法 遇到特殊keycancelCtxKey 时会返回自身, 其实这是 复用了 Value()的 回溯逻辑(后面回溯树构建中会用到).
从而在 context树种回溯遍历时, 可以找到给定context中的第一个祖先 cancelContext实例.

WithCancel:
WithCancel() 通过 调用propagateCancel() 实现回溯树的构建.


// WithCancel returns a copy of parent with a new Done channel. The returned
// context's Done channel is closed when the returned cancel function is called
// or when the parent context's Done channel is closed, whichever happens first.
//
// Canceling this context releases resources associated with it, so code should
// call cancel as soon as the operations running in this Context complete.
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
	if parent == nil {
		panic("cannot create context from nil parent")
	}
	c := newCancelCtx(parent)
	propagateCancel(parent, &c)
	return &c, func() { c.cancel(true, Canceled) }
}
// propagateCancel arranges for child to be canceled when parent is.
func propagateCancel(parent Context, child canceler) {
	done := parent.Done()
	if done == nil {
		return // parent is never canceled
	}

	// 如果父节点已经被取消, 级联取消当前子节点.
	select {
	case <-done:
		// parent is already canceled
		child.cancel(false, parent.Err())
		return
	default:
	}

	// 否则: 
	if p, ok := parentCancelCtx(parent); ok { //  如果是 cancelCtx 类型,那就简单了, 将 curCtx 放到  parent.child 里面.
		p.mu.Lock()
		if p.err != nil {
			// 父节点已经被取消
			child.cancel(false, p.err)
		} else {
			if p.children == nil {
				p.children = make(map[canceler]struct{}) // lazy create
			}
			p.children[child] = struct{}{}
		}
		p.mu.Unlock()
	} else {                                    //  如果 不是 cancelCtx类型, 就开启一个协程监听  parent 与 curCtx 是否有取消操作
		atomic.AddInt32(&goroutines, +1)
		go func() {
			select {
			case <-parent.Done():
				child.cancel(false, parent.Err())
			case <-child.Done():
			}
		}()
	}
}



// 当 parent 时cancelCtx时才会调用此方法.
// 通过 调用 `parent.Value(&cancelCtxKey)` 找到最里面的  cancelCtx, 
// 然后检查  parent.Done == cancelCtx.Done(). 
// 如果 不匹配, 说明 cancelCtx 被包裹在了一个 自定义实现中,提供不同的 done通道. 
// 
// 
// parentCancelCtx returns the underlying *cancelCtx for parent.
// It does this by looking up parent.Value(&cancelCtxKey) to find
// the innermost enclosing *cancelCtx and then checking whether
// parent.Done() matches that *cancelCtx. (If not, the *cancelCtx
// has been wrapped in a custom implementation providing a
// different done channel, in which case we should not bypass it.)
func parentCancelCtx(parent Context) (*cancelCtx, bool) {
	done := parent.Done()
	if done == closedchan || done == nil {
		return nil, false
	}
	// 传入
	p, ok := parent.Value(&cancelCtxKey).(*cancelCtx)
	if !ok {
		return nil, false
	}
	pdone, _ := p.done.Load().(chan struct{})
	if pdone != done {
		return nil, false
	}
	return p, true
}

3. context.timerCtx

超时取消, 顾名思义, 内部应该会有一个定时执行的函数, 调用 cancel().
这里包含两种取消模式: 1. 主动调用 cancel() 2. 时间到了, 执行 cancel()

// timerCtx  携带了一个 timer 和一个 deadline.
// timerCtx 内嵌了一个  cancelCtx  复用其  `Done()`  and `Err()`.
// 当时间到了的时候,会调用  cacnelCtx 的 cancel() 执行取消
// A timerCtx carries a timer and a deadline. It embeds a cancelCtx to
// implement Done and Err. It implements cancel by stopping its timer then
// delegating to cancelCtx.cancel.
type timerCtx struct {
	cancelCtx
	timer *time.Timer // Under cancelCtx.mu.

	deadline time.Time
}

func (c *timerCtx) Deadline() (deadline time.Time, ok bool) {
	return c.deadline, true
}

func (c *timerCtx) String() string {
	return contextName(c.cancelCtx.Context) + ".WithDeadline(" +
		c.deadline.String() + " [" +
		time.Until(c.deadline).String() + "])"
}

func (c *timerCtx) cancel(removeFromParent bool, err error) {
	c.cancelCtx.cancel(false, err)
	if removeFromParent {
		// Remove this timerCtx from its parent cancelCtx's children.
		removeChild(c.cancelCtx.Context, c)
	}
	c.mu.Lock()
	if c.timer != nil {
		c.timer.Stop()
		c.timer = nil
	}
	c.mu.Unlock()
}

创建一个 timerCtx:

func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
	return WithDeadline(parent, time.Now().Add(timeout))
}

func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
	if parent == nil {
		panic("cannot create context from nil parent")
	}
	if cur, ok := parent.Deadline(); ok && cur.Before(d) {
		// The current deadline is already sooner than the new one.
		return WithCancel(parent)
	}
	c := &timerCtx{
		cancelCtx: newCancelCtx(parent),
		deadline:  d,
	}
	// 下卖弄会分析, 回溯树的构建
	propagateCancel(parent, c)
	dur := time.Until(d)

	// 如果已经超时了, 直接取消
	if dur <= 0 {
		c.cancel(true, DeadlineExceeded) // deadline has already passed
		return c, func() { c.cancel(false, Canceled) }
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.err == nil {
		// 添加一个延时任务
		c.timer = time.AfterFunc(dur, func() {
			c.cancel(true, DeadlineExceeded)
		})
	}
	return c, func() { c.cancel(true, Canceled) }
}

4. valueCtx

创建:


// WithValue returns a copy of parent in which the value associated with key is
// val.
//
// Use context Values only for request-scoped data that transits processes and
// APIs, not for passing optional parameters to functions.
//
// The provided key must be comparable and should not be of type
// string or any other built-in type to avoid collisions between
// packages using context. Users of WithValue should define their own
// types for keys. To avoid allocating when assigning to an
// interface{}, context keys often have concrete type
// struct{}. Alternatively, exported context key variables' static
// type should be a pointer or interface.
func WithValue(parent Context, key, val any) Context {
	if parent == nil {
		panic("cannot create context from nil parent")
	}
	if key == nil {
		panic("nil key")
	}
	if !reflectlite.TypeOf(key).Comparable() {
		panic("key is not comparable")
	}
	return &valueCtx{parent, key, val}
}

// A valueCtx carries a key-value pair. It implements Value for that key and
// delegates all other calls to the embedded Context.
type valueCtx struct {
	Context
	key, val any
}

5.1. 构建 context 树

树的构建 级联取消的不分 基础代码在 cancelCtx 中, timerCtx和 valuectx 都会引用 cancelCtx 这部分代码.

WithCancel() 创建了 一个 Donechannel 和 拷贝了parentCtx.
parentCtx 被取消 或者 当前context 的 cancel() 被调用, 当前 context.Done() 的channel将会被 close.


func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
	if parent == nil {
		panic("cannot create context from nil parent")
	}
	c := newCancelCtx(parent)
	propagateCancel(parent, &c)
	return &c, func() { c.cancel(true, Canceled) }
}
  1. 子 context 持有 父 context 的引用.
  2. context 持有 其 所有子节点(子context) 的引用.

propagateCancel(): 保证当parentCtx 被取消时,遍历取消 childCtx, 构建一个回溯树.

  • 监听 parentContext.Done(), 收到取消信号时,级联取消子节点
// propagateCancel arranges for child to be canceled when parent is.
func propagateCancel(parent Context, child canceler) {
	// parent.Done() 返回nil, 说明 parentContext 根本无法取消
	// 1. 比如 context.Background() || context.TODO()
	done := parent.Done()
	if done == nil {
		return // parent is never canceled
	}

	// 2. 判断 parentContext 已经取消 
	select {
	case <-done:
		// parent is already canceled
		child.cancel(false, parent.Err())
		return
	default:
	}

	// parentCancelCtx  用于获取底层可取消的 Context(cancelContext)
	// 如果 parent Context 本身就是 *cancelCtx 或者是标准库中基于 cancelCtx 衍生的 Context 会返回 true
	// 如果 parent Context 已经取消/或者根本无法取消 会返回 false
	// 如果 parent Context 无法转换为一个 *cancelCtx 也会返回 false
	// 如果 parent Context 是一个自定义深度包装的 cancelCtx(自己定义了 done channel) 则也会返回 false
	// 
	// ok 为 true 说明 parent Context 为 标准库的 cancelCtx 或者至少可以完全转换为 *cancelCtx
	if p, ok := parentCancelCtx(parent); ok {
		p.mu.Lock() // 加锁  double check
		// 所以并发加锁情况下如果 parent Context 的 err 不为空说明已经被取消了
		if p.err != nil {
			// parent has already been canceled
			child.cancel(false, p.err)
		} else {
			// 如果 parent Context没有被取消, 就将 自身 context 加入到 parentContext.children 中,建立关联
			// 当 parentContext被取消时, 会遍历  parent.children, 一次调用  child.cancel() 
			// 所以 -> p.children 中所有的 context 都可以转化为  cancelCtx 节点.
			if p.children == nil {
				p.children = make(map[canceler]struct{})
			}
			p.children[child] = struct{}{}
		}
		p.mu.Unlock()
	} else {
		// ok 为false 说明 parent Context 已经取消 || 根本无法取消 || 无法转换为一个 	`cancelContext` || 是一个自定义的深度包装的 `cancelCtx`
		atomic.AddInt32(&goroutines, +1)
		// 由于代码一开始已经判断了 parentContext 已经取消  和  根本无法取消 的两种情况.
		// 所以这里  <- parent.Done() 并不会产生 panic
		//
		// 后两种情况下 无法通过  parentContext.child(map) 建立关联, 只能通过 一个 goroutine 完成 级联取消操作.
		go func() {
			select {
			case <-parent.Done():
				child.cancel(false, parent.Err())
			case <-child.Done():
			}
		}()
	}
}

parentCancelCtx(): parentCancelCtx 用于获取底层可取消的 Context(cancelContext)

  • 如果 parent Context 本身就是 *cancelCtx 或者是标准库中基于 cancelCtx 衍生的 Context 会返回 true
  • 如果 parent Context 已经取消/或者根本无法取消 会返回 false
  • 如果 parent Context 无法转换为一个 *cancelCtx 也会返回 false
  • 如果 parent Context 是一个自定义深度包装的 cancelCtx(自己定义了 done channel) 则也会返回 false
// parentCancelCtx returns the underlying *cancelCtx for parent.
// It does this by looking up parent.Value(&cancelCtxKey) to find
// the innermost enclosing *cancelCtx and then checking whether
// parent.Done() matches that *cancelCtx. (If not, the *cancelCtx
// has been wrapped in a custom implementation providing a
// different done channel, in which case we should not bypass it.)
func parentCancelCtx(parent Context) (*cancelCtx, bool) {
	done := parent.Done()
	// 1.
	// parentContext.Done()   == nil 说明不是一个 cancelContext
	// parentContext.Done()   == closedchan(一个可复用的表示关闭), 说明 parentContext  已经被关闭了.
	if done == closedchan || done == nil {
		return nil, false
	}

	// 2.
	// parentContex 无法转换为一个   cancelCtx
	p, ok := parent.Value(&cancelCtxKey).(*cancelCtx)
	if !ok {
		return nil, false
	}

	// 3. 
	// 经过上面的判断后,说明 parent context 可以被转换为 *cancelCtx,这时存在多种情况:
	//   - parent context 就是 *cancelCtx
	//   - parent context 是标准库中的 timerCtx
	//   - parent context 是个自己自定义包装的 cancelCtx
	//
	// 针对这 3 种情况需要进行判断,判断方法就是: 
	//   判断 parent context 通过 Done() 方法获取的 done channel 与 Value 查找到的 context 的 done channel 是否一致
	// 
	// 一致情况说明 parent context 为 cancelCtx 或 timerCtx 或 自定义的 cancelCtx 且未重写 Done(),
	// 这种情况下可以认为拿到了底层的 *cancelCtx
	// 
	// 不一致情况说明 parent context 是一个自定义的 cancelCtx 且重写了 Done(), 并且并未返回标准 `*cancelCtx` 的
	// 的 done channel,这种情况需要单独处理,故返回 nil, false
	pdone, _ := p.done.Load().(chan struct{})
	if pdone != done {
		return nil, false
	}
	return p, true
}

5.2 级联取消 cancel

级联取消的关键函数是 cancelCtx.cancel(), 在 当前 cancelCtx 取消时,需要级联取消 以该context为根节点的 context树中所有context,
并将 cancelCtx 从 其父节点摘除,以便让 GC 回收该 cancelCtx 子树的所有资源.

// cancel closes c.done, cancels each of c's children, and, if
// removeFromParent is true, removes c from its parent's children.
func (c *cancelCtx) cancel(removeFromParent bool, err error) {

	// err == nil,那么 直接 panic
	// 这么做的目的是因为 `cancel()`是个私有方法,标准库内任何调用 `cancel()`
    // 的方法保证了一定会传入 err, 如果没传那就是非正常调用,所以可以直接 panic
	if err == nil {
		panic("context: internal error: missing cancel error")
	}
	c.mu.Lock()
	// 已经被取消了, 防止重复调用 cancel()
	if c.err != nil {
		c.mu.Unlock()
		return // already canceled
	}
	c.err = err
	d, _ := c.done.Load().(chan struct{})
	// 判断 c.done 是否为 nil,因为 context.WithCancel 创建 cancelContext 时, c.done是延迟初始化的, 所以可能为nil
	// 如果 c.done == nil, 将其赋值为 可 复用的  closechan.
	if d == nil {
		c.done.Store(closedchan)
	} else {
		close(d)
	}

	// 如果当前 Context 下面还有关联的 child Context,且这些 child Context 都是
	// 可以转换成 *cancelCtx 的 Context(见上面的 propagateCancel()分析).
	// 那么直接遍历 childre map,并且调用 child Context 的 cancel 即可
	// 如果关联的 child Context 不能转换成 *cancelCtx,那么由 propagateCancel 方法
	// 中已经创建了单独的 Goroutine 来关闭这些 child Context
	for child := range c.children {
		// NOTE: acquiring the child's lock while holding parent's lock.
		child.cancel(false, err) // 递归调用 子context cancel
	}
	c.children = nil
	c.mu.Unlock()

	if removeFromParent {
		removeChild(c.Context, c) //从 parent 中找出 curContext
	}
}

// removeChild removes a context from its parent.
func removeChild(parent Context, child canceler) {
	p, ok := parentCancelCtx(parent)
	if !ok {
		return
	}
	p.mu.Lock()
	if p.children != nil {
		delete(p.children, child)
	}
	p.mu.Unlock()
}

func parentCancelCtx(parent Context) (*cancelCtx, bool) {
	done := parent.Done()
	if done == closedchan || done == nil {
		return nil, false
	}
	p, ok := parent.Value(&cancelCtxKey).(*cancelCtx)
	if !ok {
		return nil, false
	}
	pdone, _ := p.done.Load().(chan struct{})
	if pdone != done {
		return nil, false
	}
	return p, true
}

Golang Context 源码分析

欢迎关注我的其它发布渠道