手把手教你入门直播开发之(三)开发流媒体server

手把手教你入门直播开发之(三)开发流媒体server

Scroll Down

前言

上一篇介绍使用nginx-rtmp-module搭建直播流媒体服务,本篇会参照RTMP协议规范使用Go开发一个流媒体服务。

本系列文章分为三个部分:

  1. 直播基础知识
  2. 搭建一个流媒体直播平台
  3. 开发一个简单的流媒体server(本篇)

选用Go是因为它拥有Channel、协程等特性,而且语法简单,能够轻易的编写高并发应用。我们把要开发的流媒体服务命名为easylive,它支持的能力包括

  • 【已实现】RTMP推流和拉流
  • 【已实现】支持GOP Cache
  • 协议转换,包括输出HLS/HTTP-FLV
  • 协议转换,包括输出HLS/HTTP-FLV
  • 直播录制
  • 转推
  • 鉴权与加密

既然是要实现RTMP,最权威的就是参考官方规范了,几乎是对着协议参考往下翻译,注意的是虽然RTMP已经使用多年,但是Adobe官方在12年才首次公开RTMP协议,并且实际使用上和规范会略有不同,RTMP里面使用了AMF作为编码格式,现在有两个版本AMF0和AMF3,AMF3是对AMF0做了一些拓展。以下是RTMP和AMF的规范PDF地址:

RTMP:https://wwwimages2.adobe.com/content/dam/acom/en/devnet/rtmp/pdf/rtmp_specification_1.0.pdf

AMF0:https://www.adobe.com/content/dam/acom/en/devnet/pdf/amf0-file-format-specification.pdf

AMF3: https://www.adobe.com/content/dam/acom/en/devnet/pdf/amf-file-format-spec.pdf

easylive已开源,开源地址:https://github.com/jonas2099/easylive
本文会宏观的介绍RTMP的解析过程,涉及到的细节可参考项目代码。

一、架构简介

开始之前我们先定义用例,参与者很明显包含三个,分别是主播、观众和直播间,主播向直播间推流,观众向直播间拉流,如下图所示,这是个经典的发布订阅模式。

发布订阅

我们抛去协议解析,整个直播流媒体Pull和Push的过程可以抽象为一个简单的循环,可以看到server充当proxy的作用,把流媒体数据分发到拉流端。

while(1){
 data := conn.read()
 for audience:= range audiences{
    audience.send(data)
 }
}

按照上面的流程再稍微封装下对象,就是easylive的处理过程,流程如下图:

easylive

客户端经过握手、建立连接、建立数据流后,就进入推流或者播放的阶段。

二、代码详解

(一)主入口

入口和其他golang tcp应用类似,监听tcp端口后开协程处理连接。

for {
		var netConn net.Conn
		if netConn, err = rtmpListener.Accept(); err != nil {
			return err
		}
		go func() {
			if err := processor.New(conn.NewConn(netConn)).HandleConn(); err != nil {
				netConn.Close()
			}
		}()
}

(二)RTMP协议解析

TCP连接建立后,就开始按照RTMP协议解析包了,RTMP协议的内容比较繁杂,接下来会分小点介绍核心部分。

rtmp_process

上图是客户端和服务端使用RTMP交互的全过程流程图,而easylive的主框架也类似,分为了handshake/handleConnect/processStream三个阶段。

func (p *ConnProcessor) HandleConn() error {
	// 处理握手
	if err := p.handshake(); err != nil {
		return err
	}
	// 处理连接时的PCM(Protocol Control Messages)以及command
	if err := p.handleConnect(); err != nil {
		return err
	}
	// 读数据
	if err := p.processStream(); err != nil {
		return err
	}
	return nil
}
1.RTMP数据包格式

rtmp_chunk

如上图所示,RTMP是一个多路复用协议,一条信道里同时传输多个message消息,这是因为它把多个message按chunk进行分包混合传输,这样设计也可以使得链路不会被大的数据包阻塞,比如视频关键帧阻塞了音频包产生卡顿。

如下则是RTMP的数据包格式,chunk header携带传输所需要的元数据,而chunk data则是我们真正需要传输的数据。

img

image-20210718001744330

chunk header里面有三部分,basic header存放的是csid以及fmt,csid全称为Chunk Stream ID,用来标志具体数据类型。这个在规范里没有约定,各个客户端都可能不一样,比如ffmpeg里的定义如下

  • csid 2 SetChunkSize

  • csid 3 connect | releaseStream | FCPublish | createStream | FCUnpublish | deleteStream

  • csid 4 metaData + 视频数据

  • csid 6 音频数据

  • csid 8 publish

message header存放的字段有长度、message type和时间戳、message唯一ID。这里需要注意的是,RTMP协议里为了充分利用空间减轻传输负担,对于变长字段会想办法缩短其长度。

比如我们传输音视频消息时,如果消息连贯的话其实不总是需要所有的元数据,因为此时可以从前一个推断得出。所以message header根据不同情况长度也不一致(由basic header里的fmt决定),message header最长为11字节最短为0字节。

rtmp message header

basic Header的长度可能是1、2、或3个字节,其中fmt的长度是固定的,Basic Header的长度取决于CSID的大小。

当Basic Header为1个字节时,CSID占6位,6位最多可以表示64个数,因此这种情况下CSID在[0,63]之间,其中用户可自定义的范围为[3,63]。

当Basic Header为2个字节时,CSID占14位,此时协议将与chunk type所在字节的其他位都置为0,剩下的一个字节来表示CSID-64,这样共有8个二进制位来存储CSID,8位可以表示[0,255]共256个数,因此这种情况下CSID在[64,319],其中319=255+64。

当Basic Header为3个字节时,CSID占22位,此时协议将[2,8]字节置为1,余下的16个字节表示CSID-64,这样共有16个位来存储CSID,16位可以表示[0,65535]共65536个数,因此这种情况下CSID在[64,65599],其中65599=65535+64

Extended Timestamp也是同理,一般情况下使用message header里3字节的绝对时间戳就够了,对于特殊情况再启用Extended Timestamp。

2.握手

在TCP建连后,RTMP也需要通过握手建连,客户端向服务器发送C0,C1,C2三个chunk,服务器向客户端回复S0,S1,S2三个chunk,协议本身并没有规定这6个chunk的具体传输顺序,但RTMP协议的实现者需要保证这几点如下:

  1. 握手开始时,客户端将发送c0,c1 chunk,此时客户端必须等待,直到收到s1 chunk,才能发送c2 chunk
  2. 此时服务端必须等待,直到已收到c0后才能发送s0和s1,当然也可能会等到接收c1后才发送
  3. 当服务器收到c2后才能再发送的其他数据,同理,当客户端收到s2后才能发送其它数据

在官方规范文档里,只介绍了简单握手,导致了按照这样实现只能支持vp6编码的流,不支持h264。实际上还有复杂握手,这是更常见的方式,例如nginx-rtmp-module/ffmpeg/fms都支持复杂握手。

服务端最好两种握手方式都要支持,两种握手的详细交互图如下:

rtmp简单握手和复杂握手

可以看到,简单握手其实只要透传回客户端的包就可以了,而复杂握手增加了HmacSHA256算法,校验更复杂。简单握手中C1的第二段都是0填充,而复杂模式中是一个模式串,所以一般我们可以通过这4个字节来区分是简单还是复杂握手。

clientVersion := pio.U32BE(C1[4:8])
// 判断是简单握手还是复杂握手
if clientVersion != 0 {}

复杂模式中的模式串没有标准规范,主要是client和server自行协商,比如nginx-rtmp-module使用的是0x0C, 0x00, 0x0D, 0x0E。

3.解析数据包

握手之后就可以逐个chunk读取message了。多个csid相同的chunk根据message header里的长度还原成chunk Stream,即一个message,下面的GetMessage方法就是这样的过程。

func GetMessage(newConn *conn.Conn) *conn.ChunkStream {
	// read chunk
	var cs *conn.ChunkStream
	for {
		var err error
		cs, err = newConn.ReadChunk()
		if err != nil {
			log.Errorf("getChunk.ReadChunk fail.err:%v", err)
			return nil
		}
		if chunk.Full() {
			break
		}
	}
    // 这里根据回复包的大小判断要不要动态调整最大字节数
	newConn.Ack(cs)
	return cs
}

GetMessage里的conn.ReadChunk方法主要是逐个字节解析出chunk数据包,另外我们还会用conn.WriteChunk方法向客户端写chunk,详细过程可以参考项目里的代码这里就不贴出了。

目前为止,我们解析到了chunkStream,接下来就要根据chunkStream里的typeID进行具体的事件处理了,下图是常用的一些typeID字段概况。

message typeID枚举

4.处理命令消息

在正式接收媒体数据前,还要处理Connect、CreateStream、Publish等命令消息(command message),并且需要解析RTMP BODY,前面我们提到RTMP使用AMF来作为编码协议,在这里主要是解析20和17两个type。

	MsgTypeIDCommandMsgAMF3 = 17 //AMF3编码,RTMP命令消息
	MsgTypeIDCommandMsgAMF0 = 20 //AMF0编码,RTMP命令消息

其中MsgTypeIDCommandMsgAMF0里也可以携带AMF3编码,通过数据类型0x11表示要解析的是AMF3data类型,easylive直接引用了第三方库livego解析amf。MsgTypeIDCommandMsgAMF3我们则直接把第一个字节截去。

AMF3类型的Invoke也是使用AMF0进行序列化,只是其中的object使用AFM0类型AMF0_AMF3_OBJECT(0x11),也就是body部分是一个 采用AFM3格式的object,AMF3的command消息的第一个字节是个无用的0字节,第二个字节才是command name(以0x2开头)

完整代码如下:

func (c *Conn) HandleChunk(cs *ChunkStream) (err error) {
	var cmd *Command
	switch cs.TypeID {
	case consts.MsgTypeIDSetChunkSize:
		c.readMaxChunkSize = int(pio.U32BE(cs.Data))
		log.Debugf("HandleChunk.type MsgTypeIDSetChunkSize,size:%d", c.readMaxChunkSize)
		return nil
	case consts.MsgTypeIDWindowAckSize:
		c.remoteWindowAckSize = pio.U32BE(cs.Data)
		log.Debugf("HandleChunk.type MsgTypeIDWindowAckSize,size:%d", c.remoteWindowAckSize)
		return nil
	case consts.MsgTypeIDCommandMsgAMF3:
		log.Debugf("HandleChunk.type MsgTypeIDCommandMsgAMF3")
		if len(cs.Data) < 1 {
			err = fmt.Errorf("rtmp: short packet of CommandMsgAMF3")
			return
		}
		// skip first byte
		if cmd, err = c.handleCommandMsgAMF0(cs.Data[1:]); err != nil {
			return
		}
	case consts.MsgTypeIDCommandMsgAMF0:
		log.Debugf("HandleChunk.type MsgTypeIDCommandMsgAMF0")
		if cmd, err = c.handleCommandMsgAMF0(cs.Data); err != nil {
			return err
		}
	case consts.MsgTypeIDUserControl:
		eventType := pio.U16BE(cs.Data)
		log.Debugf("HandleChunk.type MsgTypeIDUserControl.eventType:%d", eventType)
		return nil
	default:
		log.Warnf("HandleChunk.ignore type.id:%d", cs.TypeID)
	}
	if cmd == nil {
		return fmt.Errorf("no cmd handler,typeID:%v", cs.TypeID)
	}
	log.Infof("HandleChunk.get cmd.%s", util.JSON(cmd))
	return c.processCMD(cs, cmd)
}

最后能够解析到command消息体,使用command字段路由到不同的事件处理,这里列出必须处理的三种command。

(1)connect消息

rtmp_connect

握手完成之后,客户端就会向服务端发送connect消息,在connet消息中我们可以解析推流版本、连接URL等属性以及做一些校验。

字段 类型 说明
Command Name(命令名字) String 命令的名字,如”connect”
Transaction ID(事务ID) Number 恒为1
Command Object(命令包含的参数对象) Object 键值对集合表示的命令参数
Optional User Arguments(额外的用户参数) Object 用户自定义的额外信息

该命令属于NetConnection类型,处理完成后服务端回复_result消息,以及可以在这时候向client设置chunk size等参数。

func (c *Conn) connectResp(cur *ChunkStream, cmd *Command) error {
	cs := c.NewWindowAckSize(2500000)
	c.Write(&cs)
	cs = c.NewSetPeerBandwidth(2500000)
	c.Write(&cs)
	cs = c.NewSetChunkSize(uint32(1024))
	c.Write(&cs)

	resp := make(newamf.Object)
	resp["fmsVer"] = "FMS/3,0,1,123"
	resp["capabilities"] = 31

	event := make(newamf.Object)
	event["level"] = "status"
	event["code"] = "NetConnection.Connect.Success"
	event["description"] = "Connection succeeded."
	event["objectEncoding"] = c.ConnInfo.ObjectEncoding
	return c.writeCommandMsg(cur.CSID, cur.StreamID, "_result", cmd.CommandTransId, resp, event)
}
(2)createStream消息

连接回包完成后,就可以使用createStream命令创建流了,这个命令主要用于创建一条逻辑的连接通道,如果没有特别的处理,比如在easylive中回包就可以了。

该命令属于NetConnection类型,处理完成后服务端回复_result消息。

(3)publish消息或者play消息

如果是推流处理publish消息,播放则处理play消息。该命令属于NetStream类型,处理完成后服务端回复onStatus消息。

字段名(Field Name 类型(Type 描述(Description
Command Name String 命令名,取值为onStatus
Transaction ID Number 事务ID,取值为 0
Command Object Null onStatus消息没有命令对象
Info Object Object 一个 AMF 对象至少要有“level”、“code”和“description”三个属性
5.媒体流处理

接下来就是正式处理音视频消息,上面处理命令消息的时候我们区分了本次请求是拉流还是推流。如果是拉流消息(对应代码ConnectionTypePull),则注册一个拉流事件,实际上是一个channel;如果是推流消息(对应代码ConnectionTypePublish),则在大循环里不断轮询chunk,然后发送到拉流客户端注册是channel事件。

func (p *ConnProcessor) processStream() error {
	app := p.conn.ConnInfo.App
	if p.conn.ConnType == conn.ConnectionTypePublish {
		appStream, _ := p.getStream(app, false)
		go appStream.ReadingData(p.conn)
	} else if p.conn.ConnType == conn.ConnectionTypePull {
		appStream, err := p.getStream(app, true)
		if err != nil {
			return err
		}
		if err := appStream.AddAudienceWriteEvent(p.conn); err != nil {
			return nil
		}
	}
	return nil
}

我们把一个直播流封装在AppStream对象里

type AppStream struct {
	anchorStream    *Stream            // 主播流
	audienceStreams map[string]*Stream // 观众流 
	cache           *cache.Cache       // 流的cache
}
(1)推流

服务端会持续接收流媒体数据,由于我们这里是直接输出RTMP流,所以直接转发这些包就可以了。

func (as *AppStream) ReadingAndTransferData(conn *conn.Conn) {
    for {
		p := container.GetPacketByChunk(cs)
		as.cache.Write(p)
		for _, audienceStream := range as.audienceStreams {
			if !audienceStream.init {
				if err := as.cache.Send(audienceStream.packetQueue); err != nil {
					log.Errorf("ReadingData.Send.%v", err)
				}
				audienceStream.init = true
			}
             audienceStream.packetQueue <- p
		}
	}
}

但观察上面代码,发现首次转发包还会调用cache.Send方法,这个方法有两个作用:

  • 发送AVC sequence header和AAC sequence header

    编码器需要知道音视频的元数据才能解析,所以客户端拉流的时候第一个包必须是元数据包,比如音频tag里的ADTS header以及视频tag里的sps和pps等信息,如果不发送这些关键元数据解码器则不能解码。这里我们就不手动组装这些元数据了,在cache.Write方法里,我们判断如果现在收到的chunk是元数据就保存起来,下次直接发送即可。因为要判断现在接收的chunk是否是元数据,在GetPacketByChunk这个方法里还需要根据FLV协议解出header来确定音视频数据的类型和格式。

  • 实现GOP保证发送关键帧,这是实现秒开的重要操作

    在前面的基础知识提到,GOP是一个I帧到下一个I帧的单元组合,GOP时长越长就越节约带宽,但是也会带来延迟。假设GOP是10秒,每隔10秒才会有关键帧,如果用户在第5秒时开始播放,就会出现黑屏,如果我们缓存一个GOP,拉流请求来了的时候首先发送这个缓存的GOP播放器就可以直接播放了。

(2)拉流

推流端把流媒体数据写到channel,拉流端接收到数据后直接写出,就完成转发了。

func (s *Stream) writeToAudience() {
	for {
		var cs conn.ChunkStream
		p, ok := <-s.packetQueue
		if ok {
			cs.Data = p.OriginChunk.Data
			cs.Length = uint32(len(p.OriginChunk.Data))
			cs.StreamID = p.OriginChunk.StreamID
			cs.Timestamp = p.OriginChunk.Timestamp
			cs.TypeID = p.OriginChunk.TypeID
			if err := s.sendStreamChunk(&cs); err != nil {
				return
			}
		}
	}
}

到这里,一个简单的RTMP Server就介绍完毕了,整个服务的核心就是转发数据包,没有对音视频包进行额外处理。

如果还要支持HTTP-FLV和HLS协议,主要是修改AddAudienceWriteEvent方法,指定监听的数据,以及在ReadingAndTransferData和writeToAudience方法里增加特定协议的实现。

三、效果测试

我们照旧使用ffmpeg进行推流,vlc进行拉流,能看到成功的拉取到直播流。

推流命令:ffmpeg -re -i 1.flv -c copy -f flv rtmp://localhost:1936/movie

拉流地址: rtmp://localhost:1936/movie

image-20210714233348022

四、后记

easylive实现了一个简单的流媒体服务,这里方便理解和演示尽可能减少实现细节,实际生产环境使用要求会苛刻很多。

如何尽可能的减少消耗是必须考虑的问题,例如协议的封包解包、channel的分发都消耗很可观的资源。可以采取池化等技术减少资源的使用;协议层面,chunk和gop的动态调节,利用流媒体的重组适配更通用的场景都是可以做的事情。

如果还有兴趣,可以看看其它开源项目的实现,如果局限于Golang实现的话,可以参考下以下开源project。

https://github.com/nareix/joy5

https://github.com/gwuhaolin/livego

https://github.com/Monibuca/engine

参考

RTMP协议规范中文翻译

GOP、IDR、SPS、PPS、SS等视频编码术语

Flv封装格式

RTMP协议详解

RTMP握手详解