Support multiple adaptive bitrates

This commit is contained in:
Gabe Kangas 2020-06-09 01:52:15 -07:00
parent 29f51f6ccc
commit 0b5452de89
9 changed files with 213 additions and 54 deletions

View File

@ -3,5 +3,5 @@ package main
type ChunkStorage interface {
Setup(config Config)
Save(filePath string) string
GenerateRemotePlaylist(playlist string, segments map[string]string) string
GenerateRemotePlaylist(playlist string, variant Variant) string
}

View File

@ -21,9 +21,13 @@ type Config struct {
}
type VideoSettings struct {
ResolutionWidth int `yaml:"resolutionWidth"`
ChunkLengthInSeconds int `yaml:"chunkLengthInSeconds"`
StreamingKey string `yaml:"streamingKey"`
ChunkLengthInSeconds int `yaml:"chunkLengthInSeconds"`
StreamingKey string `yaml:"streamingKey"`
StreamQualities []StreamQuality `yaml:"streamQualities"`
}
type StreamQuality struct {
Bitrate string `yaml:"bitrate"`
}
// MaxNumberOnDisk must be at least as large as MaxNumberInPlaylist
@ -60,8 +64,6 @@ func getConfig() Config {
panic(err)
}
checkConfig(config)
// fmt.Printf("%+v\n", config)
return config

View File

@ -4,10 +4,13 @@ ffmpegPath: /usr/local/bin/ffmpeg
webServerPort: 8080
videoSettings:
resolutionWidth: 900
chunkLengthInSeconds: 4
streamingKey: abc123
streamQualities:
- bitrate: 2000k
- bitrate: 6000k
files:
maxNumberInPlaylist: 30
maxNumberOnDisk: 60

View File

@ -6,27 +6,72 @@ import (
"os"
"os/exec"
"path"
"strconv"
"strings"
)
func startFfmpeg(configuration Config) {
var outputDir = configuration.PublicHLSPath
var hlsPlaylistName = path.Join(configuration.PublicHLSPath, "stream.m3u8")
var variantPlaylistPath = configuration.PublicHLSPath
if configuration.IPFS.Enabled || configuration.S3.Enabled {
outputDir = configuration.PrivateHLSPath
hlsPlaylistName = path.Join(outputDir, "temp.m3u8")
variantPlaylistPath = configuration.PrivateHLSPath
}
log.Printf("Starting transcoder saving to /%s.", outputDir)
outputDir = path.Join(outputDir, "%v")
// var masterPlaylistName = path.Join(configuration.PublicHLSPath, "%v", "stream.m3u8")
var variantPlaylistName = path.Join(variantPlaylistPath, "%v", "stream.m3u8")
// var variantRootPath = configuration.PublicHLSPath
// variantRootPath = path.Join(variantRootPath, "%v")
// variantPlaylistName := path.Join("%v", "stream.m3u8")
log.Printf("Starting transcoder saving to /%s.", variantPlaylistName)
pipePath := getTempPipePath()
ffmpegCmd := "cat " + pipePath + " | " + configuration.FFMpegPath +
" -hide_banner -i pipe: -vf scale=" + strconv.Itoa(configuration.VideoSettings.ResolutionWidth) + ":-2 -g 48 -keyint_min 48 -preset ultrafast -f hls -hls_list_size 30 -hls_time " +
strconv.Itoa(configuration.VideoSettings.ChunkLengthInSeconds) + " -strftime 1 -use_localtime 1 -hls_segment_filename '" +
outputDir + "/stream-%Y%m%d-%s.ts' -hls_flags delete_segments -segment_wrap 100 " + hlsPlaylistName
var videoMaps = make([]string, 0)
var streamMaps = make([]string, 0)
var audioMaps = make([]string, 0)
for index, quality := range configuration.VideoSettings.StreamQualities {
videoMaps = append(videoMaps, fmt.Sprintf("-map v:0 -c:v:%d libx264 -b:v:%d %s", index, index, quality.Bitrate))
streamMaps = append(streamMaps, fmt.Sprintf("v:%d,a:%d", index, index))
audioMaps = append(audioMaps, "-map a:0")
}
ffmpegFlags := []string{
"-hide_banner",
"-i pipe:",
strings.Join(videoMaps, " "), // All the different video variants
strings.Join(audioMaps, " ") + " -c:a aac -b:a 192k -ac 2", // Audio for all the variants
"-master_pl_name stream.m3u8",
"-g 48",
"-keyint_min 48",
"-preset veryfast",
"-sc_threshold 0",
"-profile:v high",
"-f hls",
"-hls_list_size 30",
"-hls_time 10",
"-strftime 1",
"-use_localtime 1",
"-hls_playlist_type event",
"-hls_segment_filename " + path.Join(outputDir, "stream-%Y%m%d-%s.ts"),
"-hls_flags delete_segments+program_date_time+temp_file",
"-segment_wrap 100",
"-master_m3u8_publish_rate 5",
"-var_stream_map \"" + strings.Join(streamMaps, " ") + "\"",
variantPlaylistName,
}
ffmpegFlagsString := strings.Join(ffmpegFlags, " ")
ffmpegCmd := "cat " + pipePath + " | " + configuration.FFMpegPath + " " + ffmpegFlagsString
// fmt.Println(ffmpegCmd)
_, err := exec.Command("bash", "-c", ffmpegCmd).Output()
fmt.Println(err)
verifyError(err)
}

View File

@ -1,6 +1,7 @@
package main
import (
"bufio"
"context"
"fmt"
"io/ioutil"
@ -38,7 +39,7 @@ type IPFSStorage struct {
}
func (s *IPFSStorage) Setup(config Config) {
log.Println("Setting up IPFS for external storage of video...")
log.Println("Setting up IPFS for external storage of video. Please wait..")
s.gateway = config.IPFS.Gateway
@ -76,14 +77,28 @@ func (s *IPFSStorage) Save(filePath string) string {
newHash := s.addFileToDirectory(cidFile, filepath.Base(filePath))
return newHash
return s.gateway + newHash
}
func (s *IPFSStorage) GenerateRemotePlaylist(playlist string, segments map[string]string) string {
for local, remote := range segments {
playlist = strings.ReplaceAll(playlist, local, s.gateway+remote)
func (s *IPFSStorage) GenerateRemotePlaylist(playlist string, variant Variant) string {
var newPlaylist = ""
scanner := bufio.NewScanner(strings.NewReader(playlist))
for scanner.Scan() {
line := scanner.Text()
if line[0:1] != "#" {
fullRemotePath := variant.getSegmentForFilename(line)
if fullRemotePath != nil {
line = fullRemotePath.RemoteID
} else {
line = ""
}
}
newPlaylist = newPlaylist + line + "\n"
}
return playlist
return newPlaylist
}
func setupPlugins(externalPluginsPath string) error {

View File

@ -15,9 +15,9 @@ var server *Server
var online = false
func main() {
var hlsDirectoryPath = configuration.PublicHLSPath
log.Println("Starting up. Please wait...")
resetDirectories(configuration)
checkConfig(configuration)
var usingExternalStorage = false
@ -31,8 +31,8 @@ func main() {
if usingExternalStorage {
storage.Setup(configuration)
hlsDirectoryPath = configuration.PrivateHLSPath
go monitorVideoContent(hlsDirectoryPath, configuration, storage)
// hlsDirectoryPath = configuration.PrivateHLSPath
go monitorVideoContent(configuration.PrivateHLSPath, configuration, storage)
}
go startChatServer()

View File

@ -4,6 +4,7 @@ import (
"io/ioutil"
"path"
"path/filepath"
"strconv"
"time"
log "github.com/sirupsen/logrus"
@ -11,10 +12,54 @@ import (
"github.com/radovskyb/watcher"
)
var filesToUpload = make(map[string]string)
type Segment struct {
VariantIndex int // The bitrate variant
FullDiskPath string // Where it lives on disk
RelativeUploadPath string // Path it should have remotely
RemoteID string // Used for IPFS
}
type Variant struct {
VariantIndex int
Segments []Segment
}
func (v *Variant) getSegmentForFilename(filename string) *Segment {
for _, segment := range v.Segments {
if path.Base(segment.FullDiskPath) == filename {
return &segment
}
}
return nil
}
func getSegmentFromPath(fullDiskPath string) Segment {
segment := Segment{}
segment.FullDiskPath = fullDiskPath
segment.RelativeUploadPath = getRelativePathFromAbsolutePath(fullDiskPath)
index, error := strconv.Atoi(segment.RelativeUploadPath[0:1])
verifyError(error)
segment.VariantIndex = index
return segment
}
func getVariantIndexFromPath(fullDiskPath string) int {
index, error := strconv.Atoi(fullDiskPath[0:1])
verifyError(error)
return index
}
var variants []Variant
func monitorVideoContent(pathToMonitor string, configuration Config, storage ChunkStorage) {
log.Printf("Using %s files...\n", pathToMonitor)
// Create structures to store the segments for the different stream variants
variants = make([]Variant, len(configuration.VideoSettings.StreamQualities))
for index := range variants {
variants[index] = Variant{index, make([]Segment, 0)}
}
log.Printf("Using %s for storing files with %d variants...\n", pathToMonitor, len(variants))
w := watcher.New()
@ -22,29 +67,40 @@ func monitorVideoContent(pathToMonitor string, configuration Config, storage Chu
for {
select {
case event := <-w.Event:
if event.Op != watcher.Write {
relativePath := getRelativePathFromAbsolutePath(event.Path)
// Ignore removals
if event.Op == watcher.Remove {
continue
}
if filepath.Base(event.Path) == "temp.m3u8" {
for filePath, objectID := range filesToUpload {
if objectID != "" {
continue
}
// fmt.Println(event.Op, relativePath)
newObjectPath := storage.Save(path.Join(configuration.PrivateHLSPath, filePath))
filesToUpload[filePath] = newObjectPath
}
// Handle updates to the master playlist by copying it to webroot
if relativePath == path.Join(configuration.PrivateHLSPath, "stream.m3u8") {
copy(event.Path, path.Join(configuration.PublicHLSPath, "stream.m3u8"))
// Handle updates to playlists, but not the master playlist
} else if filepath.Ext(event.Path) == ".m3u8" {
variantIndex := getVariantIndexFromPath(relativePath)
variant := variants[variantIndex]
playlistBytes, err := ioutil.ReadFile(event.Path)
verifyError(err)
playlistString := string(playlistBytes)
// fmt.Println("Rewriting playlist", relativePath, "to", path.Join(configuration.PublicHLSPath, relativePath))
playlistString = storage.GenerateRemotePlaylist(playlistString, filesToUpload)
writePlaylist(playlistString, path.Join(configuration.PublicHLSPath, "/stream.m3u8"))
playlistString = storage.GenerateRemotePlaylist(playlistString, variant)
writePlaylist(playlistString, path.Join(configuration.PublicHLSPath, relativePath))
} else if filepath.Ext(event.Path) == ".ts" {
filesToUpload[filepath.Base(event.Path)] = ""
segment := getSegmentFromPath(event.Path)
newObjectPath := storage.Save(path.Join(configuration.PrivateHLSPath, segment.RelativeUploadPath))
segment.RemoteID = newObjectPath
// fmt.Println("Uploaded", segment.RelativeUploadPath, "as", newObjectPath)
variants[segment.VariantIndex].Segments = append(variants[segment.VariantIndex].Segments, segment)
}
case err := <-w.Error:
log.Fatalln(err)
@ -54,8 +110,8 @@ func monitorVideoContent(pathToMonitor string, configuration Config, storage Chu
}
}()
// Watch this folder for changes.
if err := w.Add(pathToMonitor); err != nil {
// Watch the hls segment storage folder recursively for changes.
if err := w.AddRecursive(pathToMonitor); err != nil {
log.Fatalln(err)
}

View File

@ -74,26 +74,22 @@ func (s *S3Storage) Save(filePath string) string {
// fmt.Println("Uploaded", filePath, "to", response.Location)
return filePath
return response.Location
}
func (s *S3Storage) GenerateRemotePlaylist(playlist string, segments map[string]string) string {
baseHost, err := url.Parse(s.host)
baseHostComponents := []string{baseHost.Scheme + "://", baseHost.Host, baseHost.Path}
verifyError(err)
// baseHostString := fmt.Sprintf("%s://%s/%s", baseHost.Scheme, baseHost.Hostname, baseHost.Path)
func (s *S3Storage) GenerateRemotePlaylist(playlist string, variant Variant) string {
var newPlaylist = ""
scanner := bufio.NewScanner(strings.NewReader(playlist))
for scanner.Scan() {
line := scanner.Text()
if line[0:1] != "#" {
urlComponents := baseHostComponents
urlComponents = append(urlComponents, line)
line = strings.Join(urlComponents, "") //path.Join(s.host, line)
fullRemotePath := variant.getSegmentForFilename(line)
if fullRemotePath != nil {
line = fullRemotePath.RemoteID
} else {
line = ""
}
}
newPlaylist = newPlaylist + line + "\n"

View File

@ -1,8 +1,15 @@
package main
import (
"fmt"
"io/ioutil"
"os"
"path"
"path/filepath"
"strconv"
"strings"
log "github.com/sirupsen/logrus"
)
func getTempPipePath() string {
@ -18,8 +25,43 @@ func fileExists(name string) bool {
return true
}
func getRelativePathFromAbsolutePath(path string) string {
pathComponents := strings.Split(path, "/")
variant := pathComponents[len(pathComponents)-2]
file := pathComponents[len(pathComponents)-1]
return filepath.Join(variant, file)
}
func verifyError(e error) {
if e != nil {
panic(e)
log.Panic(e)
}
}
func copy(src, dst string) {
input, err := ioutil.ReadFile(src)
if err != nil {
fmt.Println(err)
return
}
err = ioutil.WriteFile(dst, input, 0644)
if err != nil {
fmt.Println("Error creating", dst)
fmt.Println(err)
return
}
}
func resetDirectories(configuration Config) {
// Wipe the public, web-accessible hls data directory
os.RemoveAll(configuration.PublicHLSPath)
os.MkdirAll(configuration.PublicHLSPath, 0777)
// Create private hls data dirs
os.RemoveAll(configuration.PrivateHLSPath)
for index := range configuration.VideoSettings.StreamQualities {
os.MkdirAll(path.Join(configuration.PrivateHLSPath, strconv.Itoa(index)), 0777)
os.MkdirAll(path.Join(configuration.PublicHLSPath, strconv.Itoa(index)), 0777)
}
}