在go服务器中,通常 每个传入的请求都会在自己的goroutine
中进行处理. 请求处理程序通常会启动额外的goroutine
来访问数据库 或者第三方服务.
处理请求的一组goroutine
通常需要访问特定于请求的值: 例如userid
, request_id
, token
,timeout
…,当请求被取消或者超时时,所有处理该请求的goroutine都应该快速退出. 以便系统可以快速回收他们正在使用的任何资源.
context 是 go 1.14 引入 的一个概念. 用于解决上述问题.
context 源码解析
context.Context
// Context 是协程安全的,context 的方法可以被多个协程同时调用
type Context interface {
//
Deadline() (deadline time.Time, ok bool)
// 返回一个 channel,当 这个 context 被取消时,这个channel 会写入一个 struct
Done() <-chan struct{}
// Done 还没有被关闭, 返回 nil
// 如果 Done 已经被关闭, 返回 error:
// - Canceled
// - DeadlineExceeded
// 一旦返回 error,这个方法之后就是幂等的
Err() error
//
Value(key any) any
}
Deadline
如果 该 Context 被设置了超时,该函数返回true & 对应的超时时间
, 否则 返回false & nil
.Done
返回一个只读 Channel, 但context 主动取消或者 自动超时取消时,该 context 以及 该context 的所有 子context 的 Done channel 都会被 close.
级联取消所有的子过程.
Err()
Done()
返回的 channel被 close 之前 会返回nil.
在 close 之后会返回error(表示比关闭的原因).
// Canceled is the error returned by Context.Err when the context is canceled.
var Canceled = errors.New("context canceled")
// DeadlineExceeded is the error returned by Context.Err when the context's
// deadline passes.
var DeadlineExceeded error = deadlineExceededError{}
Value(key any) any
返回绑定在 该 context 上 给定 key 对应的 value. 如果没有 返回 nil.
为了防止Key冲突,最好将 key设置为 非导出类型,然后为其定义访问器.(这在许多开源项目中都有影子)
key可是任意可比较的类型.
下面是一个 context 共享 User 信息的 一个例子.
package user
import "context"
// User 是 要存储在Context中的value 的类型.
type User struct {...}
// key 被定义为非导出类型 为了防止与其他包冲突
// key is an unexported type for keys defined in this package.
// This prevents collisions with keys defined in other packages.
type key int
// userKey 用来关联 请求中的user信息, 绑定在 context上.他是非导出的.
// 通过 NewContext & FromContext 来设置和读取 user.User 信息, 而不是直接使用 userKey.
// userKey is the key for user.User values in Contexts. It is
// unexported; clients use user.NewContext and user.FromContext
// instead of using this key directly.
var userKey key
// 携带 value
// NewContext returns a new Context that carries value u.
func NewContext(ctx context.Context, u *User) context.Context {
return context.WithValue(ctx, userKey, u)
}
//读取 context 中的 存储的 user信息
// FromContext returns the User value stored in ctx, if any.
func FromContext(ctx context.Context) (*User, bool) {
u, ok := ctx.Value(userKey).(*User)
return u, ok
}
context.Context 的实现.
1. emptyContext
emtptyContext 不是一个结构体,而是一个int类型,这样每一个值 都有独立的地址. 是 Context 的一个空的实现
var (
background = new(emptyCtx)
todo = new(emptyCtx)
)
context.Background() 通常被用作根节点,不能被取消,也不会超时.
通过 WithCancel
从 context.Background()
派生出的 Context 要注意在对应过程完结后及时 cancel, 否则会造成 Context 泄露.
// An emptyCtx is never canceled, has no values, and has no deadline. It is not
// struct{}, since vars of this type must have distinct addresses.
type emptyCtx int
func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
return
}
func (*emptyCtx) Done() <-chan struct{} {
return nil
}
func (*emptyCtx) Err() error {
return nil
}
func (*emptyCtx) Value(key any) any {
return nil
}
func (e *emptyCtx) String() string {
switch e {
case background:
return "context.Background"
case todo:
return "context.TODO"
}
return "unknown empty Context"
}
2. context.cancelCtx
context核心实现在这里,包括树形结构的构建 以及 进行级联取消.
// A cancelCtx can be canceled. When canceled, it also cancels any children
// that implement canceler.
type cancelCtx struct {
Context
mu sync.Mutex // protects following fields 保护下面的字段
done atomic.Value // of chan struct{}, created lazily, closed by first cancel call
children map[canceler]struct{} // set to nil by the first cancel call
err error // set to non-nil by the first cancel call
}
// 递归查找 parentCtx 中第一个 cancelCtx
func (c *cancelCtx) Value(key any) any {
if key == &cancelCtxKey { //
return c
}
return value(c.Context, key)
}
func (c *cancelCtx) Done() <-chan struct{} {
d := c.done.Load()
if d != nil {
return d.(chan struct{})
}
c.mu.Lock()
defer c.mu.Unlock()
d = c.done.Load()
if d == nil {
d = make(chan struct{})
c.done.Store(d)
}
return d.(chan struct{})
}
func (c *cancelCtx) Err() error {
c.mu.Lock()
err := c.err
c.mu.Unlock()
return err
}
// cancel closes c.done, cancels each of c's children, and, if
// removeFromParent is true, removes c from its parent's children.
// cancel 关闭 c.done, 关闭 context 及其所有子节点. 如果 removeFromParent = true, 那么将会把 context 从起 父节点移除.
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
if err == nil {
panic("context: internal error: missing cancel error")
}
// process
c.mu.Lock()
if c.err != nil {
c.mu.Unlock()
return // already canceled
}
c.err = err
d, _ := c.done.Load().(chan struct{})
if d == nil {
c.done.Store(closedchan)
} else {
close(d)
}
// 遍历子节点, 递归调用 子节点的 cancel()
for child := range c.children {
// NOTE: acquiring the child's lock while holding parent's lock.
child.cancel(false, err)
}
c.children = nil
c.mu.Unlock()
// c.Context 保存的是其 父节点
if removeFromParent {
removeChild(c.Context, c)
}
}
cancelContext.Value() 方法 遇到特殊keycancelCtxKey
时会返回自身, 其实这是 复用了 Value()
的 回溯逻辑(后面回溯树构建
中会用到).
从而在 context树种回溯遍历时, 可以找到给定context中的第一个祖先 cancelContext
实例.
WithCancel
:WithCancel()
通过 调用propagateCancel()
实现回溯树的构建.
// WithCancel returns a copy of parent with a new Done channel. The returned
// context's Done channel is closed when the returned cancel function is called
// or when the parent context's Done channel is closed, whichever happens first.
//
// Canceling this context releases resources associated with it, so code should
// call cancel as soon as the operations running in this Context complete.
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
if parent == nil {
panic("cannot create context from nil parent")
}
c := newCancelCtx(parent)
propagateCancel(parent, &c)
return &c, func() { c.cancel(true, Canceled) }
}
// propagateCancel arranges for child to be canceled when parent is.
func propagateCancel(parent Context, child canceler) {
done := parent.Done()
if done == nil {
return // parent is never canceled
}
// 如果父节点已经被取消, 级联取消当前子节点.
select {
case <-done:
// parent is already canceled
child.cancel(false, parent.Err())
return
default:
}
// 否则:
if p, ok := parentCancelCtx(parent); ok { // 如果是 cancelCtx 类型,那就简单了, 将 curCtx 放到 parent.child 里面.
p.mu.Lock()
if p.err != nil {
// 父节点已经被取消
child.cancel(false, p.err)
} else {
if p.children == nil {
p.children = make(map[canceler]struct{}) // lazy create
}
p.children[child] = struct{}{}
}
p.mu.Unlock()
} else { // 如果 不是 cancelCtx类型, 就开启一个协程监听 parent 与 curCtx 是否有取消操作
atomic.AddInt32(&goroutines, +1)
go func() {
select {
case <-parent.Done():
child.cancel(false, parent.Err())
case <-child.Done():
}
}()
}
}
// 当 parent 时cancelCtx时才会调用此方法.
// 通过 调用 `parent.Value(&cancelCtxKey)` 找到最里面的 cancelCtx,
// 然后检查 parent.Done == cancelCtx.Done().
// 如果 不匹配, 说明 cancelCtx 被包裹在了一个 自定义实现中,提供不同的 done通道.
//
//
// parentCancelCtx returns the underlying *cancelCtx for parent.
// It does this by looking up parent.Value(&cancelCtxKey) to find
// the innermost enclosing *cancelCtx and then checking whether
// parent.Done() matches that *cancelCtx. (If not, the *cancelCtx
// has been wrapped in a custom implementation providing a
// different done channel, in which case we should not bypass it.)
func parentCancelCtx(parent Context) (*cancelCtx, bool) {
done := parent.Done()
if done == closedchan || done == nil {
return nil, false
}
// 传入
p, ok := parent.Value(&cancelCtxKey).(*cancelCtx)
if !ok {
return nil, false
}
pdone, _ := p.done.Load().(chan struct{})
if pdone != done {
return nil, false
}
return p, true
}
3. context.timerCtx
超时取消, 顾名思义, 内部应该会有一个定时执行的函数, 调用 cancel().
这里包含两种取消模式: 1. 主动调用 cancel() 2. 时间到了, 执行 cancel()
// timerCtx 携带了一个 timer 和一个 deadline.
// timerCtx 内嵌了一个 cancelCtx 复用其 `Done()` and `Err()`.
// 当时间到了的时候,会调用 cacnelCtx 的 cancel() 执行取消
// A timerCtx carries a timer and a deadline. It embeds a cancelCtx to
// implement Done and Err. It implements cancel by stopping its timer then
// delegating to cancelCtx.cancel.
type timerCtx struct {
cancelCtx
timer *time.Timer // Under cancelCtx.mu.
deadline time.Time
}
func (c *timerCtx) Deadline() (deadline time.Time, ok bool) {
return c.deadline, true
}
func (c *timerCtx) String() string {
return contextName(c.cancelCtx.Context) + ".WithDeadline(" +
c.deadline.String() + " [" +
time.Until(c.deadline).String() + "])"
}
func (c *timerCtx) cancel(removeFromParent bool, err error) {
c.cancelCtx.cancel(false, err)
if removeFromParent {
// Remove this timerCtx from its parent cancelCtx's children.
removeChild(c.cancelCtx.Context, c)
}
c.mu.Lock()
if c.timer != nil {
c.timer.Stop()
c.timer = nil
}
c.mu.Unlock()
}
创建一个 timerCtx:
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
return WithDeadline(parent, time.Now().Add(timeout))
}
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
if parent == nil {
panic("cannot create context from nil parent")
}
if cur, ok := parent.Deadline(); ok && cur.Before(d) {
// The current deadline is already sooner than the new one.
return WithCancel(parent)
}
c := &timerCtx{
cancelCtx: newCancelCtx(parent),
deadline: d,
}
// 下卖弄会分析, 回溯树的构建
propagateCancel(parent, c)
dur := time.Until(d)
// 如果已经超时了, 直接取消
if dur <= 0 {
c.cancel(true, DeadlineExceeded) // deadline has already passed
return c, func() { c.cancel(false, Canceled) }
}
c.mu.Lock()
defer c.mu.Unlock()
if c.err == nil {
// 添加一个延时任务
c.timer = time.AfterFunc(dur, func() {
c.cancel(true, DeadlineExceeded)
})
}
return c, func() { c.cancel(true, Canceled) }
}
4. valueCtx
创建:
// WithValue returns a copy of parent in which the value associated with key is
// val.
//
// Use context Values only for request-scoped data that transits processes and
// APIs, not for passing optional parameters to functions.
//
// The provided key must be comparable and should not be of type
// string or any other built-in type to avoid collisions between
// packages using context. Users of WithValue should define their own
// types for keys. To avoid allocating when assigning to an
// interface{}, context keys often have concrete type
// struct{}. Alternatively, exported context key variables' static
// type should be a pointer or interface.
func WithValue(parent Context, key, val any) Context {
if parent == nil {
panic("cannot create context from nil parent")
}
if key == nil {
panic("nil key")
}
if !reflectlite.TypeOf(key).Comparable() {
panic("key is not comparable")
}
return &valueCtx{parent, key, val}
}
// A valueCtx carries a key-value pair. It implements Value for that key and
// delegates all other calls to the embedded Context.
type valueCtx struct {
Context
key, val any
}
5.1. 构建 context 树
树的构建 级联取消的不分 基础代码在 cancelCtx
中, timerCtx和 valuectx
都会引用 cancelCtx 这部分代码.
WithCancel()
创建了 一个 Donechannel 和 拷贝了parentCtx.
当 parentCtx 被取消
或者 当前context 的 cancel() 被调用
, 当前 context.Done()
的channel将会被 close.
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
if parent == nil {
panic("cannot create context from nil parent")
}
c := newCancelCtx(parent)
propagateCancel(parent, &c)
return &c, func() { c.cancel(true, Canceled) }
}
- 子 context 持有 父 context 的引用.
- context 持有 其 所有子节点(子context) 的引用.
propagateCancel()
: 保证当parentCtx 被取消时,遍历取消 childCtx, 构建一个回溯树.
- 监听 parentContext.Done(), 收到取消信号时,级联取消子节点
// propagateCancel arranges for child to be canceled when parent is.
func propagateCancel(parent Context, child canceler) {
// parent.Done() 返回nil, 说明 parentContext 根本无法取消
// 1. 比如 context.Background() || context.TODO()
done := parent.Done()
if done == nil {
return // parent is never canceled
}
// 2. 判断 parentContext 已经取消
select {
case <-done:
// parent is already canceled
child.cancel(false, parent.Err())
return
default:
}
// parentCancelCtx 用于获取底层可取消的 Context(cancelContext)
// 如果 parent Context 本身就是 *cancelCtx 或者是标准库中基于 cancelCtx 衍生的 Context 会返回 true
// 如果 parent Context 已经取消/或者根本无法取消 会返回 false
// 如果 parent Context 无法转换为一个 *cancelCtx 也会返回 false
// 如果 parent Context 是一个自定义深度包装的 cancelCtx(自己定义了 done channel) 则也会返回 false
//
// ok 为 true 说明 parent Context 为 标准库的 cancelCtx 或者至少可以完全转换为 *cancelCtx
if p, ok := parentCancelCtx(parent); ok {
p.mu.Lock() // 加锁 double check
// 所以并发加锁情况下如果 parent Context 的 err 不为空说明已经被取消了
if p.err != nil {
// parent has already been canceled
child.cancel(false, p.err)
} else {
// 如果 parent Context没有被取消, 就将 自身 context 加入到 parentContext.children 中,建立关联
// 当 parentContext被取消时, 会遍历 parent.children, 一次调用 child.cancel()
// 所以 -> p.children 中所有的 context 都可以转化为 cancelCtx 节点.
if p.children == nil {
p.children = make(map[canceler]struct{})
}
p.children[child] = struct{}{}
}
p.mu.Unlock()
} else {
// ok 为false 说明 parent Context 已经取消 || 根本无法取消 || 无法转换为一个 `cancelContext` || 是一个自定义的深度包装的 `cancelCtx`
atomic.AddInt32(&goroutines, +1)
// 由于代码一开始已经判断了 parentContext 已经取消 和 根本无法取消 的两种情况.
// 所以这里 <- parent.Done() 并不会产生 panic
//
// 后两种情况下 无法通过 parentContext.child(map) 建立关联, 只能通过 一个 goroutine 完成 级联取消操作.
go func() {
select {
case <-parent.Done():
child.cancel(false, parent.Err())
case <-child.Done():
}
}()
}
}
parentCancelCtx()
: parentCancelCtx 用于获取底层可取消的 Context(cancelContext)
- 如果 parent Context 本身就是 *cancelCtx 或者是标准库中基于 cancelCtx 衍生的 Context 会返回 true
- 如果 parent Context 已经取消/或者根本无法取消 会返回 false
- 如果 parent Context 无法转换为一个 *cancelCtx 也会返回 false
- 如果 parent Context 是一个自定义深度包装的 cancelCtx(自己定义了 done channel) 则也会返回 false
// parentCancelCtx returns the underlying *cancelCtx for parent.
// It does this by looking up parent.Value(&cancelCtxKey) to find
// the innermost enclosing *cancelCtx and then checking whether
// parent.Done() matches that *cancelCtx. (If not, the *cancelCtx
// has been wrapped in a custom implementation providing a
// different done channel, in which case we should not bypass it.)
func parentCancelCtx(parent Context) (*cancelCtx, bool) {
done := parent.Done()
// 1.
// parentContext.Done() == nil 说明不是一个 cancelContext
// parentContext.Done() == closedchan(一个可复用的表示关闭), 说明 parentContext 已经被关闭了.
if done == closedchan || done == nil {
return nil, false
}
// 2.
// parentContex 无法转换为一个 cancelCtx
p, ok := parent.Value(&cancelCtxKey).(*cancelCtx)
if !ok {
return nil, false
}
// 3.
// 经过上面的判断后,说明 parent context 可以被转换为 *cancelCtx,这时存在多种情况:
// - parent context 就是 *cancelCtx
// - parent context 是标准库中的 timerCtx
// - parent context 是个自己自定义包装的 cancelCtx
//
// 针对这 3 种情况需要进行判断,判断方法就是:
// 判断 parent context 通过 Done() 方法获取的 done channel 与 Value 查找到的 context 的 done channel 是否一致
//
// 一致情况说明 parent context 为 cancelCtx 或 timerCtx 或 自定义的 cancelCtx 且未重写 Done(),
// 这种情况下可以认为拿到了底层的 *cancelCtx
//
// 不一致情况说明 parent context 是一个自定义的 cancelCtx 且重写了 Done(), 并且并未返回标准 `*cancelCtx` 的
// 的 done channel,这种情况需要单独处理,故返回 nil, false
pdone, _ := p.done.Load().(chan struct{})
if pdone != done {
return nil, false
}
return p, true
}
5.2 级联取消 cancel
级联取消的关键函数是 cancelCtx.cancel()
, 在 当前 cancelCtx 取消时,需要级联取消 以该context为根节点的 context树中所有context,
并将 cancelCtx 从 其父节点摘除,以便让 GC 回收该 cancelCtx 子树的所有资源.
// cancel closes c.done, cancels each of c's children, and, if
// removeFromParent is true, removes c from its parent's children.
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
// err == nil,那么 直接 panic
// 这么做的目的是因为 `cancel()`是个私有方法,标准库内任何调用 `cancel()`
// 的方法保证了一定会传入 err, 如果没传那就是非正常调用,所以可以直接 panic
if err == nil {
panic("context: internal error: missing cancel error")
}
c.mu.Lock()
// 已经被取消了, 防止重复调用 cancel()
if c.err != nil {
c.mu.Unlock()
return // already canceled
}
c.err = err
d, _ := c.done.Load().(chan struct{})
// 判断 c.done 是否为 nil,因为 context.WithCancel 创建 cancelContext 时, c.done是延迟初始化的, 所以可能为nil
// 如果 c.done == nil, 将其赋值为 可 复用的 closechan.
if d == nil {
c.done.Store(closedchan)
} else {
close(d)
}
// 如果当前 Context 下面还有关联的 child Context,且这些 child Context 都是
// 可以转换成 *cancelCtx 的 Context(见上面的 propagateCancel()分析).
// 那么直接遍历 childre map,并且调用 child Context 的 cancel 即可
// 如果关联的 child Context 不能转换成 *cancelCtx,那么由 propagateCancel 方法
// 中已经创建了单独的 Goroutine 来关闭这些 child Context
for child := range c.children {
// NOTE: acquiring the child's lock while holding parent's lock.
child.cancel(false, err) // 递归调用 子context cancel
}
c.children = nil
c.mu.Unlock()
if removeFromParent {
removeChild(c.Context, c) //从 parent 中找出 curContext
}
}
// removeChild removes a context from its parent.
func removeChild(parent Context, child canceler) {
p, ok := parentCancelCtx(parent)
if !ok {
return
}
p.mu.Lock()
if p.children != nil {
delete(p.children, child)
}
p.mu.Unlock()
}
func parentCancelCtx(parent Context) (*cancelCtx, bool) {
done := parent.Done()
if done == closedchan || done == nil {
return nil, false
}
p, ok := parent.Value(&cancelCtxKey).(*cancelCtx)
if !ok {
return nil, false
}
pdone, _ := p.done.Load().(chan struct{})
if pdone != done {
return nil, false
}
return p, true
}