前言
上一篇介绍使用nginx-rtmp-module搭建直播流媒体服务,本篇会参照RTMP协议规范使用Go开发一个流媒体服务。
本系列文章分为三个部分:
- 直播基础知识
- 搭建一个流媒体直播平台
- 开发一个简单的流媒体server(本篇)
选用Go是因为它拥有Channel、协程等特性,而且语法简单,能够轻易的编写高并发应用。我们把要开发的流媒体服务命名为easylive,它支持的能力包括
- 【已实现】RTMP推流和拉流
- 【已实现】支持GOP Cache
- 协议转换,包括输出HLS/HTTP-FLV
- 协议转换,包括输出HLS/HTTP-FLV
- 直播录制
- 转推
- 鉴权与加密
既然是要实现RTMP,最权威的就是参考官方规范了,几乎是对着协议参考往下翻译,注意的是虽然RTMP已经使用多年,但是Adobe官方在12年才首次公开RTMP协议,并且实际使用上和规范会略有不同,RTMP里面使用了AMF作为编码格式,现在有两个版本AMF0和AMF3,AMF3是对AMF0做了一些拓展。以下是RTMP和AMF的规范PDF地址:
RTMP:https://wwwimages2.adobe.com/content/dam/acom/en/devnet/rtmp/pdf/rtmp_specification_1.0.pdf
AMF0:https://www.adobe.com/content/dam/acom/en/devnet/pdf/amf0-file-format-specification.pdf
AMF3: https://www.adobe.com/content/dam/acom/en/devnet/pdf/amf-file-format-spec.pdf
easylive已开源,开源地址:https://github.com/jonas2099/easylive
本文会宏观的介绍RTMP的解析过程,涉及到的细节可参考项目代码。
一、架构简介
开始之前我们先定义用例,参与者很明显包含三个,分别是主播、观众和直播间,主播向直播间推流,观众向直播间拉流,如下图所示,这是个经典的发布订阅模式。
我们抛去协议解析,整个直播流媒体Pull和Push的过程可以抽象为一个简单的循环,可以看到server充当proxy的作用,把流媒体数据分发到拉流端。
while(1){
data := conn.read()
for audience:= range audiences{
audience.send(data)
}
}
按照上面的流程再稍微封装下对象,就是easylive的处理过程,流程如下图:
客户端经过握手、建立连接、建立数据流后,就进入推流或者播放的阶段。
二、代码详解
(一)主入口
入口和其他golang tcp应用类似,监听tcp端口后开协程处理连接。
for {
var netConn net.Conn
if netConn, err = rtmpListener.Accept(); err != nil {
return err
}
go func() {
if err := processor.New(conn.NewConn(netConn)).HandleConn(); err != nil {
netConn.Close()
}
}()
}
(二)RTMP协议解析
TCP连接建立后,就开始按照RTMP协议解析包了,RTMP协议的内容比较繁杂,接下来会分小点介绍核心部分。
上图是客户端和服务端使用RTMP交互的全过程流程图,而easylive的主框架也类似,分为了handshake/handleConnect/processStream三个阶段。
func (p *ConnProcessor) HandleConn() error {
// 处理握手
if err := p.handshake(); err != nil {
return err
}
// 处理连接时的PCM(Protocol Control Messages)以及command
if err := p.handleConnect(); err != nil {
return err
}
// 读数据
if err := p.processStream(); err != nil {
return err
}
return nil
}
1.RTMP数据包格式
如上图所示,RTMP是一个多路复用协议,一条信道里同时传输多个message消息,这是因为它把多个message按chunk进行分包混合传输,这样设计也可以使得链路不会被大的数据包阻塞,比如视频关键帧阻塞了音频包产生卡顿。
如下则是RTMP的数据包格式,chunk header携带传输所需要的元数据,而chunk data则是我们真正需要传输的数据。
chunk header里面有三部分,basic header存放的是csid以及fmt,csid全称为Chunk Stream ID,用来标志具体数据类型。这个在规范里没有约定,各个客户端都可能不一样,比如ffmpeg里的定义如下
csid 2 SetChunkSize
csid 3 connect | releaseStream | FCPublish | createStream | FCUnpublish | deleteStream
csid 4 metaData + 视频数据
csid 6 音频数据
csid 8 publish
message header存放的字段有长度、message type和时间戳、message唯一ID。这里需要注意的是,RTMP协议里为了充分利用空间减轻传输负担,对于变长字段会想办法缩短其长度。
比如我们传输音视频消息时,如果消息连贯的话其实不总是需要所有的元数据,因为此时可以从前一个推断得出。所以message header根据不同情况长度也不一致(由basic header里的fmt决定),message header最长为11字节最短为0字节。
basic Header的长度可能是1、2、或3个字节,其中fmt的长度是固定的,Basic Header的长度取决于CSID的大小。
当Basic Header为1个字节时,CSID占6位,6位最多可以表示64个数,因此这种情况下CSID在[0,63]之间,其中用户可自定义的范围为[3,63]。
当Basic Header为2个字节时,CSID占14位,此时协议将与chunk type所在字节的其他位都置为0,剩下的一个字节来表示CSID-64,这样共有8个二进制位来存储CSID,8位可以表示[0,255]共256个数,因此这种情况下CSID在[64,319],其中319=255+64。
当Basic Header为3个字节时,CSID占22位,此时协议将[2,8]字节置为1,余下的16个字节表示CSID-64,这样共有16个位来存储CSID,16位可以表示[0,65535]共65536个数,因此这种情况下CSID在[64,65599],其中65599=65535+64
Extended Timestamp也是同理,一般情况下使用message header里3字节的绝对时间戳就够了,对于特殊情况再启用Extended Timestamp。
2.握手
在TCP建连后,RTMP也需要通过握手建连,客户端向服务器发送C0,C1,C2三个chunk,服务器向客户端回复S0,S1,S2三个chunk,协议本身并没有规定这6个chunk的具体传输顺序,但RTMP协议的实现者需要保证这几点如下:
- 握手开始时,客户端将发送c0,c1 chunk,此时客户端必须等待,直到收到s1 chunk,才能发送c2 chunk
- 此时服务端必须等待,直到已收到c0后才能发送s0和s1,当然也可能会等到接收c1后才发送
- 当服务器收到c2后才能再发送的其他数据,同理,当客户端收到s2后才能发送其它数据
在官方规范文档里,只介绍了简单握手,导致了按照这样实现只能支持vp6编码的流,不支持h264。实际上还有复杂握手,这是更常见的方式,例如nginx-rtmp-module/ffmpeg/fms都支持复杂握手。
服务端最好两种握手方式都要支持,两种握手的详细交互图如下:
可以看到,简单握手其实只要透传回客户端的包就可以了,而复杂握手增加了HmacSHA256算法,校验更复杂。简单握手中C1的第二段都是0填充,而复杂模式中是一个模式串,所以一般我们可以通过这4个字节来区分是简单还是复杂握手。
clientVersion := pio.U32BE(C1[4:8])
// 判断是简单握手还是复杂握手
if clientVersion != 0 {}
复杂模式中的模式串没有标准规范,主要是client和server自行协商,比如nginx-rtmp-module使用的是0x0C, 0x00, 0x0D, 0x0E。
3.解析数据包
握手之后就可以逐个chunk读取message了。多个csid相同的chunk根据message header里的长度还原成chunk Stream,即一个message,下面的GetMessage方法就是这样的过程。
func GetMessage(newConn *conn.Conn) *conn.ChunkStream {
// read chunk
var cs *conn.ChunkStream
for {
var err error
cs, err = newConn.ReadChunk()
if err != nil {
log.Errorf("getChunk.ReadChunk fail.err:%v", err)
return nil
}
if chunk.Full() {
break
}
}
// 这里根据回复包的大小判断要不要动态调整最大字节数
newConn.Ack(cs)
return cs
}
GetMessage里的conn.ReadChunk方法主要是逐个字节解析出chunk数据包,另外我们还会用conn.WriteChunk方法向客户端写chunk,详细过程可以参考项目里的代码这里就不贴出了。
目前为止,我们解析到了chunkStream,接下来就要根据chunkStream里的typeID进行具体的事件处理了,下图是常用的一些typeID字段概况。
4.处理命令消息
在正式接收媒体数据前,还要处理Connect、CreateStream、Publish等命令消息(command message),并且需要解析RTMP BODY,前面我们提到RTMP使用AMF来作为编码协议,在这里主要是解析20和17两个type。
MsgTypeIDCommandMsgAMF3 = 17 //AMF3编码,RTMP命令消息
MsgTypeIDCommandMsgAMF0 = 20 //AMF0编码,RTMP命令消息
其中MsgTypeIDCommandMsgAMF0里也可以携带AMF3编码,通过数据类型0x11表示要解析的是AMF3data类型,easylive直接引用了第三方库livego解析amf。MsgTypeIDCommandMsgAMF3我们则直接把第一个字节截去。
AMF3类型的Invoke也是使用AMF0进行序列化,只是其中的object使用AFM0类型AMF0_AMF3_OBJECT(0x11),也就是body部分是一个 采用AFM3格式的object,AMF3的command消息的第一个字节是个无用的0字节,第二个字节才是command name(以0x2开头)
完整代码如下:
func (c *Conn) HandleChunk(cs *ChunkStream) (err error) {
var cmd *Command
switch cs.TypeID {
case consts.MsgTypeIDSetChunkSize:
c.readMaxChunkSize = int(pio.U32BE(cs.Data))
log.Debugf("HandleChunk.type MsgTypeIDSetChunkSize,size:%d", c.readMaxChunkSize)
return nil
case consts.MsgTypeIDWindowAckSize:
c.remoteWindowAckSize = pio.U32BE(cs.Data)
log.Debugf("HandleChunk.type MsgTypeIDWindowAckSize,size:%d", c.remoteWindowAckSize)
return nil
case consts.MsgTypeIDCommandMsgAMF3:
log.Debugf("HandleChunk.type MsgTypeIDCommandMsgAMF3")
if len(cs.Data) < 1 {
err = fmt.Errorf("rtmp: short packet of CommandMsgAMF3")
return
}
// skip first byte
if cmd, err = c.handleCommandMsgAMF0(cs.Data[1:]); err != nil {
return
}
case consts.MsgTypeIDCommandMsgAMF0:
log.Debugf("HandleChunk.type MsgTypeIDCommandMsgAMF0")
if cmd, err = c.handleCommandMsgAMF0(cs.Data); err != nil {
return err
}
case consts.MsgTypeIDUserControl:
eventType := pio.U16BE(cs.Data)
log.Debugf("HandleChunk.type MsgTypeIDUserControl.eventType:%d", eventType)
return nil
default:
log.Warnf("HandleChunk.ignore type.id:%d", cs.TypeID)
}
if cmd == nil {
return fmt.Errorf("no cmd handler,typeID:%v", cs.TypeID)
}
log.Infof("HandleChunk.get cmd.%s", util.JSON(cmd))
return c.processCMD(cs, cmd)
}
最后能够解析到command消息体,使用command字段路由到不同的事件处理,这里列出必须处理的三种command。
(1)connect消息
握手完成之后,客户端就会向服务端发送connect消息,在connet消息中我们可以解析推流版本、连接URL等属性以及做一些校验。
字段 | 类型 | 说明 |
---|---|---|
Command Name(命令名字) | String | 命令的名字,如”connect” |
Transaction ID(事务ID) | Number | 恒为1 |
Command Object(命令包含的参数对象) | Object | 键值对集合表示的命令参数 |
Optional User Arguments(额外的用户参数) | Object | 用户自定义的额外信息 |
该命令属于NetConnection类型,处理完成后服务端回复_result消息,以及可以在这时候向client设置chunk size等参数。
func (c *Conn) connectResp(cur *ChunkStream, cmd *Command) error {
cs := c.NewWindowAckSize(2500000)
c.Write(&cs)
cs = c.NewSetPeerBandwidth(2500000)
c.Write(&cs)
cs = c.NewSetChunkSize(uint32(1024))
c.Write(&cs)
resp := make(newamf.Object)
resp["fmsVer"] = "FMS/3,0,1,123"
resp["capabilities"] = 31
event := make(newamf.Object)
event["level"] = "status"
event["code"] = "NetConnection.Connect.Success"
event["description"] = "Connection succeeded."
event["objectEncoding"] = c.ConnInfo.ObjectEncoding
return c.writeCommandMsg(cur.CSID, cur.StreamID, "_result", cmd.CommandTransId, resp, event)
}
(2)createStream消息
连接回包完成后,就可以使用createStream命令创建流了,这个命令主要用于创建一条逻辑的连接通道,如果没有特别的处理,比如在easylive中回包就可以了。
该命令属于NetConnection类型,处理完成后服务端回复_result消息。
(3)publish消息或者play消息
如果是推流处理publish消息,播放则处理play消息。该命令属于NetStream类型,处理完成后服务端回复onStatus消息。
字段名(Field Name ) |
类型(Type ) |
描述(Description ) |
---|---|---|
Command Name | String | 命令名,取值为onStatus |
Transaction ID | Number | 事务ID,取值为 0 |
Command Object | Null | onStatus 消息没有命令对象 |
Info Object | Object | 一个 AMF 对象至少要有“level”、“code”和“description”三个属性 |
5.媒体流处理
接下来就是正式处理音视频消息,上面处理命令消息的时候我们区分了本次请求是拉流还是推流。如果是拉流消息(对应代码ConnectionTypePull),则注册一个拉流事件,实际上是一个channel;如果是推流消息(对应代码ConnectionTypePublish),则在大循环里不断轮询chunk,然后发送到拉流客户端注册是channel事件。
func (p *ConnProcessor) processStream() error {
app := p.conn.ConnInfo.App
if p.conn.ConnType == conn.ConnectionTypePublish {
appStream, _ := p.getStream(app, false)
go appStream.ReadingData(p.conn)
} else if p.conn.ConnType == conn.ConnectionTypePull {
appStream, err := p.getStream(app, true)
if err != nil {
return err
}
if err := appStream.AddAudienceWriteEvent(p.conn); err != nil {
return nil
}
}
return nil
}
我们把一个直播流封装在AppStream对象里
type AppStream struct {
anchorStream *Stream // 主播流
audienceStreams map[string]*Stream // 观众流
cache *cache.Cache // 流的cache
}
(1)推流
服务端会持续接收流媒体数据,由于我们这里是直接输出RTMP流,所以直接转发这些包就可以了。
func (as *AppStream) ReadingAndTransferData(conn *conn.Conn) {
for {
p := container.GetPacketByChunk(cs)
as.cache.Write(p)
for _, audienceStream := range as.audienceStreams {
if !audienceStream.init {
if err := as.cache.Send(audienceStream.packetQueue); err != nil {
log.Errorf("ReadingData.Send.%v", err)
}
audienceStream.init = true
}
audienceStream.packetQueue <- p
}
}
}
但观察上面代码,发现首次转发包还会调用cache.Send方法,这个方法有两个作用:
-
发送AVC sequence header和AAC sequence header
编码器需要知道音视频的元数据才能解析,所以客户端拉流的时候第一个包必须是元数据包,比如音频tag里的ADTS header以及视频tag里的sps和pps等信息,如果不发送这些关键元数据解码器则不能解码。这里我们就不手动组装这些元数据了,在cache.Write方法里,我们判断如果现在收到的chunk是元数据就保存起来,下次直接发送即可。因为要判断现在接收的chunk是否是元数据,在GetPacketByChunk这个方法里还需要根据FLV协议解出header来确定音视频数据的类型和格式。
-
实现GOP保证发送关键帧,这是实现秒开的重要操作
在前面的基础知识提到,GOP是一个I帧到下一个I帧的单元组合,GOP时长越长就越节约带宽,但是也会带来延迟。假设GOP是10秒,每隔10秒才会有关键帧,如果用户在第5秒时开始播放,就会出现黑屏,如果我们缓存一个GOP,拉流请求来了的时候首先发送这个缓存的GOP播放器就可以直接播放了。
(2)拉流
推流端把流媒体数据写到channel,拉流端接收到数据后直接写出,就完成转发了。
func (s *Stream) writeToAudience() {
for {
var cs conn.ChunkStream
p, ok := <-s.packetQueue
if ok {
cs.Data = p.OriginChunk.Data
cs.Length = uint32(len(p.OriginChunk.Data))
cs.StreamID = p.OriginChunk.StreamID
cs.Timestamp = p.OriginChunk.Timestamp
cs.TypeID = p.OriginChunk.TypeID
if err := s.sendStreamChunk(&cs); err != nil {
return
}
}
}
}
到这里,一个简单的RTMP Server就介绍完毕了,整个服务的核心就是转发数据包,没有对音视频包进行额外处理。
如果还要支持HTTP-FLV和HLS协议,主要是修改AddAudienceWriteEvent方法,指定监听的数据,以及在ReadingAndTransferData和writeToAudience方法里增加特定协议的实现。
三、效果测试
我们照旧使用ffmpeg进行推流,vlc进行拉流,能看到成功的拉取到直播流。
推流命令:ffmpeg -re -i 1.flv -c copy -f flv rtmp://localhost:1936/movie
拉流地址: rtmp://localhost:1936/movie
四、后记
easylive实现了一个简单的流媒体服务,这里方便理解和演示尽可能减少实现细节,实际生产环境使用要求会苛刻很多。
如何尽可能的减少消耗是必须考虑的问题,例如协议的封包解包、channel的分发都消耗很可观的资源。可以采取池化等技术减少资源的使用;协议层面,chunk和gop的动态调节,利用流媒体的重组适配更通用的场景都是可以做的事情。
如果还有兴趣,可以看看其它开源项目的实现,如果局限于Golang实现的话,可以参考下以下开源project。
https://github.com/nareix/joy5
https://github.com/gwuhaolin/livego
https://github.com/Monibuca/engine