Wireguard Go 源码阅读之网络包处理流程

November 09, 2024 | 4 Minute Read

wireguard-go 的代码结构很好。但一个网络包在处理流程中要经过多个 channel,在多个 goroutine 之间交换数据并产生锁竞争,感觉开销不小。

粗略看了一下 网络包的处理流程。

main函数:

 device := device.NewDevice(tun, conn.NewDefaultBind(), logger)
          conn.NewDefaultBind() 返回一个 conn.Bind,应该是封装了网络连接管理和底层的网络操作


 device.NewDevice 函数里面会启动 几个 事件循环:

	// create queues

	device.queue.handshake = newHandshakeQueue()
	device.queue.encryption = newOutboundQueue()
	device.queue.decryption = newInboundQueue()

	// start workers

	cpus := runtime.NumCPU()
	device.state.stopping.Wait()
	device.queue.encryption.wg.Add(cpus) // One for each RoutineHandshake
	for i := 0; i < cpus; i++ {
		go device.RoutineEncryption(i + 1)
		go device.RoutineDecryption(i + 1)
		go device.RoutineHandshake(i + 1)
	}

	device.state.stopping.Add(1)      // RoutineReadFromTUN
	device.queue.encryption.wg.Add(1) // RoutineReadFromTUN
	go device.RoutineReadFromTUN()
	go device.RoutineTUNEventReader()


device.RoutineEncryption RoutineDecryption 这些函数看上去,加解密网络包是多cpu的queue队列机制
RoutineReadFromTUN  应该是从tun接收网络包了。



tun device 启动时,启动接收事件循环
func (device *Device) RoutineTUNEventReader() {
      device.Up()

func (device *Device) Up() error {
	return device.changeState(deviceStateUp)

func (device *Device) changeState(want deviceState) (err error) {
	case deviceStateUp:
		err = device.upLocked()

func (device *Device) upLocked() error {
	if err := device.BindUpdate(); err != nil {


func (device *Device) BindUpdate() error {
	for _, fn := range recvFns {
		go device.RoutineReceiveIncoming(fn)   // 启动网络接收循环
	}





func (peer *Peer) Start()  函数 启动了网络数据包收发循环:
    	go peer.RoutineSequentialSender()
	go peer.RoutineSequentialReceiver()






/* Outbound flow
 *
 * 1. TUN queue
 * 2. Routing (sequential)
 * 3. Nonce assignment (sequential)
 * 4. Encryption (parallel)
 * 5. Transmission (sequential)
 *
 * The functions in this file occur (roughly) in the order in
 * which the packets are processed.
 *
 * Locking, Producers and Consumers
 *
 * The order of packets (per peer) must be maintained,
 * but encryption of packets happen out-of-order:
 *
 * The sequential consumers will attempt to take the lock,
 * workers release lock when they have completed work (encryption) on the packet.
 *
 * If the element is inserted into the "encryption queue",
 * the content is preceded by enough "junk" to contain the transport header
 * (to allow the construction of transport messages in-place)
 */



/* Sequentially reads packets from queue and sends to endpoint
 *
 * Obs. Single instance per peer.
 * The routine terminates when the outbound queue is closed.
 */
func (peer *Peer) RoutineSequentialSender() {
     for elem := range peer.queue.outbound.c {        // 发送 channel
                err := peer.SendBuffer(elem.packet)        // 真正的网络发送
                 device.PutMessageBuffer(elem.buffer)   // 网络包使用的内存池
	 device.PutOutboundElement(elem)



func (peer *Peer) SendBuffer(buffer []byte) error {
        err := peer.device.net.bind.Send(buffer, peer.endpoint)       // 网络发送



func (bind *StdNetBind) Send(buff []byte, endpoint Endpoint) error {
       _, err = conn.WriteToUDPAddrPort(buff, addrPort)              // 网络发送操作






// 这个函数才是从网络接收数据,然后添加到解密队列和  RoutineSequentialReceiver的队列
/* Receives incoming datagrams for the device
 *
 * Every time the bind is updated a new routine is started for
 * IPv4 and IPv6 (separately)
 */
func (device *Device) RoutineReceiveIncoming(recv conn.ReceiveFunc) {
       buffer := device.GetMessageBuffer()     // 内存池
       for {
          size, endpoint, err = recv(buffer[:])    // 接收网络包
          		           elem.Lock()       //   保证 RoutineDecryption  和 RoutineSequentialReceiver的处理顺序
			 peer.queue.inbound.c <- elem 
			 device.queue.decryption.c <- elem     // 解密channel
			 buffer = device.GetMessageBuffer()




// 这个接收函数只是从  channel中循环 处理解密后的数据包
func (peer *Peer) RoutineSequentialReceiver() {
    for elem := range peer.queue.inbound.c {   // 接收channel
             elem.Lock()      // 等待解密完成
              //  这里会做些  allowedips 检查
             _, err = device.tun.device.Write(elem.buffer)    // 转发到 tun 设备
		



func (device *Device) RoutineDecryption(id int) {
    for elem := range device.queue.decryption.c {
               elem.Unlock()   // 解密完成  RoutineSequentialReceiver 会拿到锁继续处理








/* Reads packets from the TUN and inserts
 * into staged queue for peer
 *
 * Obs. Single instance per TUN device
 */
func (device *Device) RoutineReadFromTUN() {
      for {
            size, err := device.tun.device.Read(elem.buffer[:], offset)   // 从tun 设备接收到数据后
          		// 从网络包的目的ip找到对应peer
		dst := elem.packet[IPv4offsetDst : IPv4offsetDst+net.IPv4len]
		peer = device.allowedips.Lookup(dst)
                                peer.StagePacket(elem)  // 进入 peer 的处理流程
		peer.SendStagedPackets()
 

func (peer *Peer) StagePacket(elem *QueueOutboundElement) {
          case peer.queue.staged <- elem:


func (peer *Peer) SendStagedPackets() {
	for {
		select {
		case elem := <-peer.queue.staged:
			elem.Lock()  // 加入加密和 发送channel处理,下面就转入 RoutineSequentialSender 函数了
			// add to parallel and sequential queue
				peer.queue.outbound.c <- elem
				peer.device.queue.encryption.c <- elem









来看一下tun设备的读取 和发送网络包的操作


// Read reads one packet from the TUN file into buf and returns the number of
// packet bytes stored at buf[offset:].
//
// An error already pending on tun.errors takes priority: it is returned with
// n == 0 and the file is not touched.
//
// When tun.nopi is set the packet is read directly at buf[offset:]. Otherwise
// the read starts 4 bytes earlier so that the 4-byte packet information header
// lands in front of the packet; offset must be at least 4 in that mode, and
// the header length is subtracted from the returned count (clamped at zero).
//
// syscall.EBADFD from the file is reported as os.ErrClosed in both modes,
// matching what Write does.
func (tun *NativeTun) Read(buf []byte, offset int) (n int, err error) {
	select {
	case err = <-tun.errors:
		return 0, err
	default:
	}

	if tun.nopi {
		// Per the author's note, this is the usual path.
		n, err = tun.tunFile.Read(buf[offset:])
	} else {
		n, err = tun.tunFile.Read(buf[offset-4:])
		// Do not count the 4 header bytes as packet data.
		if n < 4 {
			n = 0
		} else {
			n -= 4
		}
	}

	// Previously only the header branch translated EBADFD; apply it to both
	// so callers can rely on errors.Is(err, os.ErrClosed) in either mode.
	if errors.Is(err, syscall.EBADFD) {
		err = os.ErrClosed
	}
	return n, err
}

// Write sends the packet stored at buf[offset:] to the TUN file and returns
// the byte count reported by the underlying file write.
//
// When tun.nopi is set the packet is written as-is. Otherwise the 4 bytes in
// front of the packet are overwritten with a packet information header: two
// zero bytes followed by 0x86dd when the packet's version nibble equals
// ipv6.Version, or 0x0800 for anything else. offset must be at least 4 in
// that mode, and the returned count then includes the header bytes.
//
// syscall.EBADFD from the file is reported as os.ErrClosed.
func (tun *NativeTun) Write(buf []byte, offset int) (int, error) {
	var frame []byte
	if !tun.nopi {
		// Step back over the space reserved for the header.
		frame = buf[offset-4:]

		// Fill in the packet information header.
		frame[0] = 0x00
		frame[1] = 0x00
		isV6 := frame[4]>>4 == ipv6.Version
		if isV6 {
			frame[2] = 0x86
			frame[3] = 0xdd
		} else {
			frame[2] = 0x08
			frame[3] = 0x00
		}
	} else {
		frame = buf[offset:]
	}

	written, err := tun.tunFile.Write(frame)
	if errors.Is(err, syscall.EBADFD) {
		return written, os.ErrClosed
	}
	return written, err
}




 但最新的代码,有一个 IFF_VNET_HDR 标记相关的优化:GRO offload、virtio

// Write sends a batch of packets to the TUN file. Each packet is expected at
// bufs[i][offset:]. It returns the sum of the byte counts of the writes that
// succeeded, together with any write errors combined via errors.Join.
//
// The whole call runs under tun.writeOpMu; both GRO tables are reset just
// before the mutex is released.
//
// When tun.vnetHdr is set, handleGRO decides which buffers go out (it is
// handed &tun.toWrite, presumably to fill with indices into bufs; confirm
// against handleGRO) and offset is moved back by virtioNetHdrLen so the header
// in front of each packet is written too. Otherwise every buffer is written
// in order.
//
// A syscall.EBADFD from the file aborts the batch immediately: the total so
// far is returned with os.ErrClosed, and errors collected earlier are dropped.
func (tun *NativeTun) Write(bufs [][]byte, offset int) (int, error) {
	tun.writeOpMu.Lock()
	defer func() {
		tun.tcpGROTable.reset()
		tun.udpGROTable.reset()
		tun.writeOpMu.Unlock()
	}()

	// Reuse the index slice's backing storage across calls.
	tun.toWrite = tun.toWrite[:0]
	if !tun.vnetHdr {
		for idx := range bufs {
			tun.toWrite = append(tun.toWrite, idx)
		}
	} else {
		if err := handleGRO(bufs, offset, tun.tcpGROTable, tun.udpGROTable, tun.udpGSO, &tun.toWrite); err != nil {
			return 0, err
		}
		offset -= virtioNetHdrLen
	}

	var (
		written int
		joined  error
	)
	for _, idx := range tun.toWrite {
		n, err := tun.tunFile.Write(bufs[idx][offset:])
		switch {
		case errors.Is(err, syscall.EBADFD):
			return written, os.ErrClosed
		case err != nil:
			joined = errors.Join(joined, err)
		default:
			written += n
		}
	}
	return written, joined
}


// handleVirtioRead splits in into bufs, leaving offset bytes at the front of
// each buffer. It mutates sizes to reflect the size of each element of bufs,
// and returns the number of packets read.
func handleVirtioRead(in []byte, bufs [][]byte, sizes []int, offset int) (int, error) {