HUAHUA

Daily-Note 09-18

OVERVIEW

  1. Gin,go web 框架。
  2. Go MemModel,go 语言内存模型。
  3. Go Sync,Go 并发包,提供 Atomic、cond、once、pool、mutex、rwmutex、waitgroup、map 等线程并发工具。
  4. Go container,提供了 List、Ring、Heap 等容器,此外 go builtin 包括 slice、map 等多种数据结构。
  5. IPC sig,进程通信中 sig 是通过信号的发送,回调处理函数来实现进程间的通信。

Gin

Route

gin框架路由详解

LeetCode 208.实现 Trie(前缀树)

// -- main.go -- //
engine := gin.Default()
engine.GET("/test/abc")

// -- routergroup.go -- //
// GET is a shortcut for router.Handle("GET", path, handle).
func (group *RouterGroup) GET(relativePath string, handlers ...HandlerFunc) IRoutes {
	return group.handle(http.MethodGet, relativePath, handlers)
}

func (group *RouterGroup) handle(httpMethod, relativePath string, handlers HandlersChain) IRoutes {
 absolutePath := group.calculateAbsolutePath(relativePath)
 handlers = group.combineHandlers(handlers)
 group.engine.addRoute(httpMethod, absolutePath, handlers)
 return group.returnObj()
}

gin 通过 GET 等方法进行路由的注册,在 routergroup.go 中通过 handle 方法统一处理,首先将 group 下的相对路径转换为绝对路径,将传进来的 handlers 参数与 group 本身的 handlers 合并,最后调用 addRoute 这个方法来注册路由,该方法的核心代码如下。

// -- gin.go -- //
func (engine *Engine) addRoute(method, path string, handlers HandlersChain) {
	// ... 
	root := engine.trees.get(method)
	if root == nil {
		root = new(node)
		root.fullPath = "/"
		engine.trees = append(engine.trees, methodTree{method: method, root: root})
	}
	root.addRoute(path, handlers)
	// ...
}

其中 engine.trees 属性类型是 methodTrees,Gin 将 http 中 GET / POST / … 每一种方法对应一颗路由树,而 methodTrees 就是对应 http 方法和路由树的数组结构。

首先 addRoute 方法首先找到对应方法的路由树,然后调用 root.addRoute 将 path 和 handlers 进行注册。

addRoute 方法实现了一个字典树实现了路由查找,其代码实现如下,代码由于涉及 wildcards 匹配因此较为复杂。

// -- tree.go -- //
// addRoute adds a node with the given handle to the path.
// Not concurrency-safe!
func (n *node) addRoute(path string, handlers HandlersChain) {
 fullPath := path
 n.priority++

 // Empty tree
 if len(n.path) == 0 && len(n.children) == 0 {
  n.insertChild(path, fullPath, handlers)
  n.nType = root
  return
 }

 parentFullPathIndex := 0

walk:
 for {
  // Find the longest common prefix.
  // This also implies that the common prefix contains no ':' or '*'
  // since the existing key can't contain those chars.
  i := longestCommonPrefix(path, n.path)

  // Split edge
  if i < len(n.path) {
   child := node{
    path:      n.path[i:],
    wildChild: n.wildChild,
    indices:   n.indices,
    children:  n.children,
    handlers:  n.handlers,
    priority:  n.priority - 1,
    fullPath:  n.fullPath,
   }

   n.children = []*node{&child}
   // []byte for proper unicode char conversion, see #65
   n.indices = bytesconv.BytesToString([]byte{n.path[i]})
   n.path = path[:i]
   n.handlers = nil
   n.wildChild = false
   n.fullPath = fullPath[:parentFullPathIndex+i]
  }

  // Make new node a child of this node
  if i < len(path) {
   path = path[i:]
   c := path[0]

   // '/' after param
   if n.nType == param && c == '/' && len(n.children) == 1 {
    parentFullPathIndex += len(n.path)
    n = n.children[0]
    n.priority++
    continue walk
   }

   // Check if a child with the next path byte exists
   for i, max := 0, len(n.indices); i < max; i++ {
    if c == n.indices[i] {
     parentFullPathIndex += len(n.path)
     i = n.incrementChildPrio(i)
     n = n.children[i]
     continue walk
    }
   }

   // Otherwise insert it
   if c != ':' && c != '*' && n.nType != catchAll {
    // []byte for proper unicode char conversion, see #65
    n.indices += bytesconv.BytesToString([]byte{c})
    child := &node{
     fullPath: fullPath,
    }
    n.addChild(child)
    n.incrementChildPrio(len(n.indices) - 1)
    n = child
   } else if n.wildChild {
    // inserting a wildcard node, need to check if it conflicts with the existing wildcard
    n = n.children[len(n.children)-1]
    n.priority++

    // Check if the wildcard matches
    if len(path) >= len(n.path) && n.path == path[:len(n.path)] &&
     // Adding a child to a catchAll is not possible
     n.nType != catchAll &&
     // Check for longer wildcard, e.g. :name and :names
     (len(n.path) >= len(path) || path[len(n.path)] == '/') {
     continue walk
    }

    // Wildcard conflict
    pathSeg := path
    if n.nType != catchAll {
     pathSeg = strings.SplitN(pathSeg, "/", 2)[0]
    }
    prefix := fullPath[:strings.Index(fullPath, pathSeg)] + n.path
    panic("'" + pathSeg +
     "' in new path '" + fullPath +
     "' conflicts with existing wildcard '" + n.path +
     "' in existing prefix '" + prefix +
     "'")
   }

   n.insertChild(path, fullPath, handlers)
   return
  }

  // Otherwise add handle to current node
  if n.handlers != nil {
   panic("handlers are already registered for path '" + fullPath + "'")
  }
  n.handlers = handlers
  n.fullPath = fullPath
  return
 }
}

核心数据结构 node 组成如下:

type node struct {
	path      string // 当前自身路径
	indices   string // 子节点路由首字母 list
	wildChild bool // 
	nType     nodeType
	priority  uint32 // 优先级
	children  []*node // child nodes, at most 1 :param style node at the end of the array
	handlers  HandlersChain // 处理链
	fullPath  string // 完整路径
}

首先在不考虑 wildcards 的情况下:

1)当该树为空时,第一个路由将通过下面方法直接注册。

// addRoute adds a node with the given handle to the path.
// Not concurrency-safe!
func (n *node) addRoute(path string, // 绝对路径
                        handlers HandlersChain //需要注册到目标路径的处理器链
                       ) {
  fullPath := path // 存储绝对路径,path 在后续流程中改变
  n.priority++ // 提升当前节点的优先级,每当有一个路由注册到目标节点下,提升优先级
  // Empty tree 空树的情况
  if len(n.path) == 0 && len(n.children) == 0 {
   // 该方法在不考虑 wildcards 的情况下,直接将对应参数赋值给节点 n
   n.insertChild(path, fullPath, handlers) 
   n.nType = root
   return // 完成注册
  }
}

2)当有相同前缀的两个路由进行注册时,在有路由 “/test/abc” 的前提下,注册 “/test/def” 路由,这里两个路由具有相同的前缀 “/test/",因此会产生分裂。

// -- main.go -- //
func main() {
	engine := gin.Default()
	engine.GET("/test/abc")
	engine.GET("/test/def")
}

// -- tree.go -- //
	// ...

  // Find the longest common prefix.
  // This also implies that the common prefix contains no ':' or '*'
  // since the existing key can't contain those chars.
  i := longestCommonPrefix(path, n.path) // 得到最长公共前缀的结束位置

  // Split edge
  if i < len(n.path) { // 这里判断公共前缀是否比自身路径短,如果更短则需要分裂
    child := node{ // 这里将自身节点分裂
      path:      n.path[i:],  // "abc"
      wildChild: n.wildChild,
      indices:   n.indices,
      children:  n.children,
      handlers:  n.handlers,
      priority:  n.priority - 1,
      fullPath:  n.fullPath,
    }

    n.children = []*node{&child}
    // []byte for proper unicode char conversion, see #65
    n.indices = bytesconv.BytesToString([]byte{n.path[i]})
    n.path = path[:i] // "/test/"
    n.handlers = nil // 释放引用
    n.wildChild = false
    n.fullPath = fullPath[:parentFullPathIndex+i]
  }

	if i < len(path) { // 在迭代判断剩余的路径是否需要插入新的节点
    path = path[i:]
    c := path[0]
    
    // ... 这里判断 wildcards 情况
    // ... 这里判断当前节点的子节点与目标路径有公共前缀
    
    // Otherwise insert it
			if c != ':' && c != '*' && n.nType != catchAll {
				// []byte for proper unicode char conversion, see #65
				n.indices += bytesconv.BytesToString([]byte{c})
				child := &node{
					fullPath: fullPath,
				}
				n.addChild(child) // 在这里插入目标路径
				n.incrementChildPrio(len(n.indices) - 1)
				n = child
  // ... 这里判断 wildcards 情况
  n.insertChild(path, fullPath, handlers)
  return

在有相同前缀的情况下,当前节点首先会分裂出子节点,接着将目标路径加入子节点中。

3)当子节点中存在与目标路径有公共前缀时,在 “/test/abc” 和 “/test/def” 已经注册的情况下,初始的路由树由 “/test/” 根节点分别指向 “abc” 和 “def” 两个子节点,此时注册 “/test/deg” 时,“deg” 子节点与 “def” 有公共前缀,此时会触发优先级提升,节点 “def” 的优先级提升,并且将当前节点切换到 “def” 子节点,继续匹配。

incrementChildPrio() 方法首先将子节点的 priority 优先级属性自增,接着移动 node

n.children 中对应 pos 的子节点在 children slice 中的位置,最后修改 indices string 的顺序,完成子节点优先级的提升。通过提升路由节点更多的子节点的优先级,使得后续匹配时,平衡长路由和短路由之间的权重。

// -- tree.go -- //
// ...
// ... 这里判断当前节点的子节点与目标路径有公共前缀
// Check if a child with the next path byte exists
for i, max := 0, len(n.indices); i < max; i++ {
 if c == n.indices[i] {
  parentFullPathIndex += len(n.path)  // 公共前缀 "/test/"
  i = n.incrementChildPrio(i) // 提升子节点的优先级
  n = n.children[i] // 切换到子节点
  continue walk // 继续迭代
 }
}
// ...

// incrementChildPrio 提升子节点优先级
func (n *node) incrementChildPrio(pos int) int {
	cs := n.children
	cs[pos].priority++
	prio := cs[pos].priority

	// Adjust position (move to front)
	newPos := pos
	for ; newPos > 0 && cs[newPos-1].priority < prio; newPos-- {
		// Swap node positions
		cs[newPos-1], cs[newPos] = cs[newPos], cs[newPos-1]
	}

	// Build new index char string
	if newPos != pos {
		n.indices = n.indices[:newPos] + // Unchanged prefix, might be empty
			n.indices[pos:pos+1] + // The index char we move
			n.indices[newPos:pos] + n.indices[pos+1:] // Rest without char at 'pos'
	}

	return newPos
}

下面考虑包括 wildcards 的情况,通配符包括 :a*b 两类通配符,其中第一类仅匹配一项,而第二类匹配剩余所有路径项,因此第二类仅允许出现在路径末。并且必须在通配符后添加名字,否则将 panic。

engine.GET("/test/:a/ok")
engine.GET("/test/*b")
engine.GET("/test/*b/err") // err

insertChild() 方法在 for 循环中,首先判断当前 path 是否存在 wildcrads,如果没有 wildcrads 情况下仅对当前节点进行赋值,当存在 wildcards 时,考虑 wildcards 时 param 匹配还是 catchAll 匹配,分别对应第一类和第二类通配符。

如果是存在第一类通配符,则插入 param 子节点,param 子节点的 path 为 wildcards,同时判断改路径是否已经匹配完成,如果还有剩余路径,则取剩余路径并切换到子节点进行迭代。

如果存在第二类通配符,则首先插入第一个节点,该节点 path 为空,类型为 catchAll 指示该节点为第二类通配符节点,接着插入第二个节点,该节点存储对应的 path 和 handles,需要注意的是该节点的首字符为 “/"。

func (n *node) insertChild(path string, fullPath string, handlers HandlersChain) {
	for {
		// Find prefix until first wildcard
		wildcard, i, valid := findWildcard(path)
		if i < 0 { // No wildcard found
			break
		}

    // ... 判断通配符是否符合要求,具有名字并且只有一个 : 或 *

		if wildcard[0] == ':' { // param
			if i > 0 {
				// Insert prefix before the current wildcard
				n.path = path[:i]
				path = path[i:]
			}

			child := &node{
				nType:    param,
				path:     wildcard,
				fullPath: fullPath,
			}
			n.addChild(child)
			n.wildChild = true
			n = child
			n.priority++

			// if the path doesn't end with the wildcard, then there
			// will be another subpath starting with '/'
			if len(wildcard) < len(path) {
				path = path[len(wildcard):]

				child := &node{
					priority: 1,
					fullPath: fullPath,
				}
				n.addChild(child)
				n = child
				continue
			}

			// Otherwise we're done. Insert the handle in the new leaf
			n.handlers = handlers
			return
		}

		// 判断 * 通配符是否为最后位置

		// currently fixed width 1 for '/'
		i--
		if path[i] != '/' {
			panic("no / before catch-all in path '" + fullPath + "'")
		}

		n.path = path[:i]

		// First node: catchAll node with empty path
		child := &node{
			wildChild: true,
			nType:     catchAll,
			fullPath:  fullPath,
		}

		n.addChild(child)
		n.indices = string('/')
		n = child
		n.priority++

		// second node: node holding the variable
		child = &node{
			path:     path[i:],
			nType:    catchAll,
			handlers: handlers,
			priority: 1,
			fullPath: fullPath,
		}
		n.children = []*node{child}

		return
	}

	// If no wildcard was found, simply insert the path and handle
	n.path = path
	n.handlers = handlers
	n.fullPath = fullPath
}

insertChild() 方法和 addRoute() 方法的职责不同,前者主要负责插入新的节点、分析通配符,后者主要负责加入路由时分析迭代路由树找到指定的插入节点、检查通配符冲突、相同前缀的分裂父节点。

4)当注册路由包括通配符时,在 “/test” 已经注册的条件下,注册 “/test/:ok2”,此时不需要分裂 “test” 节点 path 等其他情况,直接调用插入 insertChild() 节点。

// -- main.go -- //
package main

import "github.com/gin-gonic/gin"

func main() {
	engine := gin.Default()
	engine.GET("/test")
	engine.GET("/test/:ok2")
}

// -- tree.go -- //
// Otherwise insert it
// ...
if c != ':' && c != '*' && n.nType != catchAll {
  // []byte for proper unicode char conversion, see #65
  n.indices += bytesconv.BytesToString([]byte{c})
  child := &node{
    fullPath: fullPath,
  }
  n.addChild(child)
  n.incrementChildPrio(len(n.indices) - 1)
  n = child

  // ...
}
n.insertChild(path, fullPath, handlers)
return
// ...

5)在存在通配符情况下,注册路由,在 “/test” 和 “/test/:ok” 已经注册的情况下,注册子路由 “/test/:ok/ok2”。

package main

import "github.com/gin-gonic/gin"

func main() {
	engine := gin.Default()
	engine.GET("/test")
	engine.GET("/test/:ok")
	engine.GET("/test/:ok/ok2")
}

首先路由树根节点 “/test” 下由 insertChild() 插入 “/test/:ok” 时生成一个 wildcard 节点,"/test/:ok/ok2” 匹配到 “/test” 后根据 indices 匹配到 wildcard 节点,对应以下代码。

// Check if a child with the next path byte exists
for i, max := 0, len(n.indices); i < max; i++ {
  if c == n.indices[i] {
    parentFullPathIndex += len(n.path)
    i = n.incrementChildPrio(i)
    n = n.children[i]
    continue walk
  }
}

接着 “:ok/ok2” path 在插入时,发现 “:” 满足通配符路径,首先检查通配符是否存在冲突,要求 “:” 只能有一个相同的参数,”*" 后不能匹配其他路径。

	// Otherwise insert it
if c != ':' && c != '*' && n.nType != catchAll {
 // ...
} else if n.wildChild {
  // inserting a wildcard node, need to check if it conflicts with the existing wildcard
  n = n.children[len(n.children)-1]
  n.priority++

  // Check if the wildcard matches
  if len(path) >= len(n.path) && n.path == path[:len(n.path)] &&
  // Adding a child to a catchAll is not possible
  n.nType != catchAll &&
  // Check for longer wildcard, e.g. :name and :names
  (len(n.path) >= len(path) || path[len(n.path)] == '/') {
    continue walk
  }

  // Wildcard conflict
  pathSeg := path
  if n.nType != catchAll {
    pathSeg = strings.SplitN(pathSeg, "/", 2)[0]
  }
  prefix := fullPath[:strings.Index(fullPath, pathSeg)] + n.path
  panic("'" + pathSeg +
        "' in new path '" + fullPath +
        "' conflicts with existing wildcard '" + n.path +
        "' in existing prefix '" + prefix +
        "'")
}

接着进行迭代,最后匹配 “/ok2”,调用 insertChild() 完成普通 path 的插入。

6)在已经注册了一个子路径的通配符路径下再注册一个子路径,在**情况 5)**下再注册路由 “/test/:ok/ko2”。

package main

import "github.com/gin-gonic/gin"

func main() {
	engine := gin.Default()
	engine.GET("/test")
	engine.GET("/test/:ok")
	engine.GET("/test/:ok/ok2")
	engine.GET("/test/:ok/ko2")
}

“/test/:ok/ko2” 注册路由与 “/test/:ok/ok2” 相似,但是在最后插入前,需要指定插入节点到 “/ok2”,并进行迭代,此时会分裂 “/ok2” 节点,生成 “/” 节点且子节点包括 “ok2” 和 “ko2”。对应的寻找插入节点逻辑如下。

// ...
// '/' after param
// 发现当前节点为 param 且已经存在子路径,移动到子路径对应节点上进行插入。
if n.nType == param && c == '/' && len(n.children) == 1 {
  parentFullPathIndex += len(n.path)
  n = n.children[0]
  n.priority++
  continue walk
}
// ...

至此,Gin addRoute() 注册路由的逻辑完成。

Gin Context

http2 & http3

Go MemModel

Happen Before,规定了各种行为之间的发生顺序。

introduction

  • 同步问题

    1. init() 会在 package 的其他函数被调用之前调用。main() 函数会在所有需要调用的 init() 完成后执行。
    2. goroutine 的创建会先于指定函数的执行,退出时不能保证指定函数执行完成。
    3. channel 的发送操作先于对应的接受操作,channel 的关闭操作先于接受操作。
    4. ……
  • 可能问题

    在下面这个示例中,f() 初始化不保证在 g() 之前发生,因此可能会出现 a, b 尚未初始化已经打印的问题。

    package main
    
    var a, b int
    
    func f() {
    	a = 1
    	b = 2
    }
    
    func g() {
    	print(b)
    	print(a)
    }
    
    func main() {
    	for i := 0; i < 100; i++ {
    		go f()
    		g()
    		// init
    		a, b = 0, 0
    		println()
    	}
    }
    
    // 随机输出 00 01 21
    

    在下面的这个示例中,在 setup() 中赋值可能是乱序的,因此不能保证 doprint() 函数中 done 为 true 时 a 已经初始化,因此可能打印空字符串。

    package main
    
    import (
    	"sync"
    )
    
    var a string
    var done bool
    var once sync.Once
    
    func main() {
    	twoprint()
      time.Sleep(1 * time.Second)
    }
    
    func setup() {
    	a = "hello, world"
    	done = true
    }
    
    func doprint() {
    	if !done {
    		once.Do(setup)
    	}
    	print(a)
    }
    
    func twoprint() {
    	go doprint()
    	go doprint()
    }
    

Go Sync

深度分析 Golang sync.Pool 底层原理

Pool

通过 Put() / Get() 管理缓存对象,使用 New() 生成默认缓存对象,从而减少 GC 压力,数据结构方面涉及 Dequeue、RingBuffer 等方面,难点涉及 P 的 LocalPool、Pin 等关于调度器内存结构及调度器抢占等内容。

下面是 Gin 中的实践:

// ServeHTTP conforms to the http.Handler interface.
func (engine *Engine) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	c := engine.pool.Get().(*Context) // get
	c.writermem.reset(w)
	c.Request = req
	c.reset() // clean !!!

	engine.handleHTTPRequest(c)

	engine.pool.Put(c) // put
}

Golang sync.Mutex分析

Go并发原语-mutex源码解析

Mutex & RWMutex

Mutex 通过 Lock() 和 UnLock() 两个方法实现并发原语,Mutex 的结构为包括 state 和 sema 两个变量。

type Mutex struct {
 state int32
 sema  uint32
}

在加锁和解锁的主要对 state 变量进行操作,state 变量结构如下。

mutexLocked = 1 << iota // mutex is locked 1 表示锁已经上锁
mutexWoken // 1 表示当前的锁已被唤醒
mutexStarving // 1 表示当前的锁处于饥饿状态
mutexWaiterShift = iota // 3 代表等待该锁的 goroutine 数量,占3 至 31 位

Golang sync.Cond 条件变量源码分析

Cond

通过调用 Wait() / Signal() / Broadcast() 等方法来实现 goroutine 等待某一事件发生的并发同步。

Once

提供了只运行一次指定函数的保证。下面是实现逻辑:

func (o *Once) Do(f func()) {
 // Note: Here is an incorrect implementation of Do:
 //
 // if atomic.CompareAndSwapUint32(&o.done, 0, 1) {
 //  f()
 // }
 //
 // Do guarantees that when it returns, f has finished.
 // This implementation would not implement that guarantee:
 // given two simultaneous calls, the winner of the cas would
 // call f, and the second would return immediately, without
 // waiting for the first's call to f to complete.
 // This is why the slow path falls back to a mutex, and why
 // the atomic.StoreUint32 must be delayed until after f returns.

 if atomic.LoadUint32(&o.done) == 0 {
  // Outlined slow-path to allow inlining of the fast-path.
  o.doSlow(f)
 }
}

func (o *Once) doSlow(f func()) {
 o.m.Lock()
 defer o.m.Unlock()
 if o.done == 0 {
  defer atomic.StoreUint32(&o.done, 1)
  f()
 }
}

其中 comment 也指出,不能简单的通过 CAS 操作保证 f() 执行一次,因为 CAS 操作不能保证 f() 返回,两个线程同时进行 CAS 第二个线程可能在第一个线程 f() 调用未返回之前返回。而通过 mutex 能够保证所有线程都在 f() 完成一次且仅完成一次的情况下返回,在未返回之前阻塞。

Golang WaitGroup 原理深度剖析

Golang | 详解sync.WaitGroup

WaitGroup

通过 Add() / Done() / Wait() 接口实现某线程等待其他多线程完成任务的并发同步。

主要结构体如下所示:

type WaitGroup struct {
 noCopy noCopy

 // 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
 // 64-bit atomic operations require 64-bit alignment, but 32-bit
 // compilers only guarantee that 64-bit fields are 32-bit aligned.
 // For this reason on 32 bit architectures we need to check in state()
 // if state1 is aligned or not, and dynamically "swap" the field order if
 // needed.
 state1 uint64 // 高 32 位存储 counter 数目,低 32 位存储 waiter 数目。
 state2 uint32
}

其中 Done() 通过调用 Add(1) 实现

// Done decrements the WaitGroup counter by one.
func (wg *WaitGroup) Done() {
 wg.Add(-1)
}

Add() 主要逻辑如下:

func (wg *WaitGroup) Add(delta int) {
 statep, semap := wg.state()
 state := atomic.AddUint64(statep, uint64(delta)<<32)
 v := int32(state >> 32)
 w := uint32(state)
 // 如果 v(counter) 大于零并且 w(waiter) 等于零
 // 说明阻塞的 goroutine,此时直接返回
 if v > 0 || w == 0 { 
   return  
 }
 // counter 等于零此时,需要唤醒所有阻塞的 goroutine
 // Reset waiters count to 0.
 *statep = 0
  // 唤醒所有阻塞的 goroutine
 for ; w != 0; w-- {
  runtime_Semrelease(semap, false, 0)
 }
}

Wait() 逻辑如下:

func (wg *WaitGroup) Wait() {
 statep, semap := wg.state()
 for {
  state := atomic.LoadUint64(statep)
  v := int32(state >> 32)
  w := uint32(state)
  if v == 0 {
   // Counter is 0, no need to wait.
   return
  }
  // Increment waiters count.
  if atomic.CompareAndSwapUint64(statep, state, state+1) {
   runtime_Semacquire(semap)
   return
 	}
}

Map

【Go】sync.Map 源码分析

看过这篇剖析,你还不懂 Go sync.Map 吗

sync.Map 利用 atomic.Value 字段来实现只读部分并发,通过 dirty 字段实现加锁写部分并发,从而提高 Map 的并发性能。Map 的数据结构如下所示:

type Map struct {
    mu sync.Mutex
    read atomic.Value
    dirty map[interface{}]*entry
    misses int
}

Map 的 Load() 实现逻辑如下:

func (m *Map) Load(key any) (value any, ok bool) {
 read, _ := m.read.Load().(readOnly)
 e, ok := read.m[key] // 从只读部分读取数据
 if !ok && read.amended { // 如果没有命中,从 dirty 部分读取数据
  m.mu.Lock()  // 此时读取 dirty 数据需要进行加锁
  // Avoid reporting a spurious miss if m.dirty got promoted while we were
  // blocked on m.mu. (If further loads of the same key will not miss, it's
  // not worth copying the dirty map for this key.)
  read, _ = m.read.Load().(readOnly)
  e, ok = read.m[key]
  // 进行双重检查,可能在加锁之前 dirty 提升为 reed,而被初始化为 nil
  if !ok && read.amended { 
   e, ok = m.dirty[key]
   // Regardless of whether the entry was present, record a miss: this key
   // will take the slow path until the dirty map is promoted to the read
   // map.
   // 检查是否需要进行提升,每次没有在 read 中命中,都会提升 missed 值
    // 当 missed > len(dirty) 时,会发生提升
   m.missLocked() 
  }
  m.mu.Unlock()
 }
 if !ok {
  return nil, false
 }
 return e.load()
}

其中 Store() 的实现逻辑如下:

func (m *Map) Store(key, value any) {
 read, _ := m.read.Load().(readOnly)
 // 如果 read 中存在值,尝试通过 CAS 更新值
 if e, ok := read.m[key]; ok && e.tryStore(&value) {
  return
 }

 m.mu.Lock()
 read, _ = m.read.Load().(readOnly)
 if e, ok := read.m[key]; ok { 
  if e.unexpungeLocked() {
   // The entry was previously expunged, which implies that there is a
   // non-nil dirty map and this entry is not in it.
   m.dirty[key] = e
  }
  e.storeLocked(&value)
  // 插入值并在 read 但在 dirty 中,更新 dirty
 } else if e, ok := m.dirty[key]; ok { 
  e.storeLocked(&value)
 } else { // 插入新值
  if !read.amended {  // 如果 dirty 中没有数据
   // We're adding the first new key to the dirty map.
   // Make sure it is allocated and mark the read-only map as incomplete.
   m.dirtyLocked()
   m.read.Store(readOnly{m: read.m, amended: true})
  }
  m.dirty[key] = newEntry(value)
 }
 m.mu.Unlock()
}

Other……

Atomic

Go container

List

是一个双向链表。使用 List 结构体存储头节点和链表长度。

// List represents a doubly linked list.
// The zero value for List is an empty list ready to use.
type List struct {
 root Element // sentinel list element, only &root, root.prev, and root.next are used
 len  int     // current list length excluding (this) sentinel element
}

节点为 Element 元素。包括前后指针,和指向所属的 list 的指针,使用 any 存储任意值。

// Element is an element of a linked list.
type Element struct {
 // Next and previous pointers in the doubly-linked list of elements.
 // To simplify the implementation, internally a list l is implemented
 // as a ring, such that &l.root is both the next element of the last
 // list element (l.Back()) and the previous element of the first list
 // element (l.Front()).
 next, prev *Element

 // The list to which this element belongs.
 list *List

 // The value stored with this element.
 Value any
}

内部实现 insert() / remove() / move() 等基本 API 操作,衍生出暴露的诸如 InsertBefore() 等 API。

insert() 实现如下,首先确定插入节点 e 的前后指针,接着更新 e 的前后节点的 next 和 prev 指针。此外赋值所属 list 指针和增加 list 表示长度。

// insert inserts e after at, increments l.len, and returns e.
func (l *List) insert(e, at *Element) *Element {
 e.prev = at
 e.next = at.next
 e.prev.next = e
 e.next.prev = e
 e.list = l
 l.len++
 return e
}

remove() 实现如下,更新前后节点的 next 和 prev 指针,然后释放当前节点的引用。

// remove removes e from its list, decrements l.len
func (l *List) remove(e *Element) {
 e.prev.next = e.next
 e.next.prev = e.prev
 e.next = nil // avoid memory leaks
 e.prev = nil // avoid memory leaks
 e.list = nil
 l.len--
}

move() 的实现如下,首先判断是否需要移动,接着更新当前节点的前后节点指针,进行删除,最后更新插入位置的指针和插入位置前后节点的指针。

// move moves e to next to at.
func (l *List) move(e, at *Element) {
 if e == at {
  return
 }
 e.prev.next = e.next
 e.next.prev = e.prev

 e.prev = at
 e.next = at.next
 e.prev.next = e
 e.next.prev = e
}

Ring

是一个环形链表。环中的每一个元素由 Ring 构成。Ring 的组成如下。

// A Ring is an element of a circular list, or ring.
// Rings do not have a beginning or end; a pointer to any ring element
// serves as reference to the entire ring. Empty rings are represented
// as nil Ring pointers. The zero value for a Ring is a one-element
// ring with a nil Value.
type Ring struct {
 next, prev *Ring
 Value      any // for use by client; untouched by this library
}

Ring 主要包括三类操作,1)、New() / Link() / Unlink() 等初始化及链接和断开环的操作。2)、Next() / Prev() / Move() 等移动节点的操作。3)、Do() 遍历环中节点的操作。

New() 用来初始化环,New() 主要逻辑如下,首先判断长度是否大于零,然后初始化一个 Ring 指针,然后根据长度增加节点,最后链接头尾节点。

// New creates a ring of n elements.
func New(n int) *Ring {
 if n <= 0 {
  return nil
 }
 r := new(Ring)
 p := r
 for i := 1; i < n; i++ {
  p.next = &Ring{prev: p}
  p = p.next
 }
 p.next = r
 r.prev = p
 return r
}

Link() 用于链接两个环,Link() 的实现如下,源码中强调了不能使用多重赋值,是因为多重赋值是不能保证赋值顺序的,而这里我们需要强调赋值的顺序。

首先获取当前节点 r 的下一个节点及目标节点 s 的最后一个节点,在节点 s 的头部进行链接,即 r.next = s 和 s.prev = r,接着在节点 s 的尾部进行链接,即 p.next() = n 和 n.prev = p。

如果 r 和 s 是同一个环中的节点,那么 r 和 s 中间的节点将会移除,r 和 s 链接在一起,剩下的部分链接在一起。

func (r *Ring) Link(s *Ring) *Ring {
 n := r.Next()
 if s != nil {
  p := s.Prev()
  // Note: Cannot use multiple assignment because
  // evaluation order of LHS is not specified.
  r.next = s
  s.prev = r
  n.prev = p
  p.next = n
 }
 return n
}

Next() / Move() / Prev() 逻辑相近,其实现如下。

其中 if r.next == nil 的判断可能是用户自己初始化的 Ring 结构体没有成环的检查 🤔️。

// Next returns the next ring element. r must not be empty.
func (r *Ring) Next() *Ring {
 if r.next == nil {
  return r.init()
 }
 return r.next
}

// Prev returns the previous ring element. r must not be empty.
func (r *Ring) Prev() *Ring {
 if r.next == nil {
  return r.init()
 }
 return r.prev
}

// Move moves n % r.Len() elements backward (n < 0) or forward (n >= 0)
// in the ring and returns that ring element. r must not be empty.
func (r *Ring) Move(n int) *Ring {
 if r.next == nil {
  return r.init()
 }
 switch {
 case n < 0:
  for ; n < 0; n++ {
   r = r.prev
  }
 case n > 0:
  for ; n > 0; n-- {
   r = r.next
  }
 }
 return r
}

Do() 函数逻辑即遍历环,并对每个节点应用 f()。这种设计模式有一定的局限,如果 Do 能够传入可变参数列表,并将可变参数列表传入 f 中,这样可以将外部变量代入处理函数 f 中,这样在某些统计函数时带来便利性,如 Do(f(len), len) 这种模式。

// Do calls function f on each element of the ring, in forward order.
// The behavior of Do is undefined if f changes *r.
func (r *Ring) Do(f func(any)) {
 if r != nil {
  f(r.Value)
  for p := r.Next(); p != r; p = p.next {
   f(p.Value)
  }
 }
}

相比于 List 的双向链表,Ring 是有限制的双向链表,Ring 有以下特点。

  1. 无法在 O(1) 时间复杂度内确定环的长度。
  2. 不能直接调用 Insert、Remove 等传统链表操作。

Heap

container/heap

实现了堆的数据结构,这里采用 interface 的模式描述堆的特性,并不直接实现堆的容器。

type Interface interface {
 sort.Interface
 Push(x any) // add x as element Len()
 Pop() any   // remove and return element Len() - 1.
}

type Interface interface {
	Len() int
	Less(i, j int) bool
	Swap(i, j int)
}

Heap 的核心方法为 up() / down(),在《算法》一书中为 swim() / sink(),意为上浮和下沉。

up() 的实现如下,首先 i := (j - 1) / 2 找到对应节点的父节点,这里是从 0 开始计算节点,接着判断节点是否为堆顶,判断是否需要进行交换,不满足条件退出循环,否则,交换父节点于子节点,并对父节点迭代操作。

func up(h Interface, j int) {
 for {
  i := (j - 1) / 2 // parent
  if i == j || !h.Less(j, i) {
   break
  }
  h.Swap(i, j)
  j = i
 }
}

down() 的实现如下,首先用 j1 := 2*i + 1计算对应子节点的索引,这里从 0 开始计算节点,并判断该索引是否超出长度 n,此外源码这里考虑到了可能溢出的问题,判断 j1 < 0。

接着判断子节点中左节点和右节点的大小,选取更小者,最后判断更小者与父节点的大小关系,如果不满足交换则退出,否则交换子节点和父节点,并对子节点进行迭代。

最后返回结果时,根据初始索引和最后的索引值判断是否能够下沉。

func down(h Interface, i0, n int) bool {
 i := i0
 for {
  j1 := 2*i + 1
  if j1 >= n || j1 < 0 { // j1 < 0 after int overflow
   break
  }
  j := j1 // left child
  if j2 := j1 + 1; j2 < n && h.Less(j2, j1) {
   j = j2 // = 2*i + 2  // right child
  }
  if !h.Less(j, i) {
   break
  }
  h.Swap(i, j)
  i = j
 }
 return i > i0
}

Heap 根据 up() / down() 方法实现了其他暴露出来的 API。

Init() 方法对堆进行初始化,Push() / Pop() 方法分别实现向堆中有序加入元素和弹出堆顶的操作,这两个操作依赖于 Interface 中实现的 Push 和 Pop。Remove() 方法将目标索引替换至队尾并重新有序化堆,最后调用 interface 中的 Pop 实现删除目标索引的元素。Fix() 方法用于堆指定元素重新有序化,这可以运用于直接修改堆中指定索引的元素后,调用 Fix 方法保证堆的有序化。

func Init(h Interface) {
 // heapify
 n := h.Len()
 for i := n/2 - 1; i >= 0; i-- {
  down(h, i, n)
 }
}

func Push(h Interface, x any) {
	h.Push(x)
	up(h, h.Len()-1)
}

func Pop(h Interface) any {
	n := h.Len() - 1
	h.Swap(0, n)
	down(h, 0, n)
	return h.Pop()
}

func Remove(h Interface, i int) any {
	n := h.Len() - 1
	if n != i {
		h.Swap(i, n)
		if !down(h, i, n) {
			up(h, i)
		}
	}
	return h.Pop()
}

func Fix(h Interface, i int) {
	if !down(h, i, h.Len()) {
		up(h, i)
	}
}

IPC sig

Linux 信号(signal)

Linux/UNIX系统编程手册

#include <signal.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>

void handler(int sig) {
    printf("hello SIGIO, exit ...");
    exit(0);
} 

int main(int argc, char const *argv[])
{
    signal(SIGIO, handler);
    sleep(20 * 1000);
    return 0;
}