diff --git a/chunkStorage.go b/chunkStorage.go index 705705a1d..ddb86ce9c 100644 --- a/chunkStorage.go +++ b/chunkStorage.go @@ -3,5 +3,5 @@ package main type ChunkStorage interface { Setup(config Config) Save(filePath string) string - GenerateRemotePlaylist(playlist string, segments map[string]string) string + GenerateRemotePlaylist(playlist string, variant Variant) string } diff --git a/config.go b/config.go index 0e4d301bd..abcc9bfc2 100644 --- a/config.go +++ b/config.go @@ -21,9 +21,13 @@ type Config struct { } type VideoSettings struct { - ResolutionWidth int `yaml:"resolutionWidth"` - ChunkLengthInSeconds int `yaml:"chunkLengthInSeconds"` - StreamingKey string `yaml:"streamingKey"` + ChunkLengthInSeconds int `yaml:"chunkLengthInSeconds"` + StreamingKey string `yaml:"streamingKey"` + StreamQualities []StreamQuality `yaml:"streamQualities"` +} + +type StreamQuality struct { + Bitrate string `yaml:"bitrate"` } // MaxNumberOnDisk must be at least as large as MaxNumberInPlaylist @@ -60,8 +64,6 @@ func getConfig() Config { panic(err) } - checkConfig(config) - // fmt.Printf("%+v\n", config) return config diff --git a/config/config-example.yaml b/config/config-example.yaml index 2048ffd2b..bc8191a01 100644 --- a/config/config-example.yaml +++ b/config/config-example.yaml @@ -4,10 +4,13 @@ ffmpegPath: /usr/local/bin/ffmpeg webServerPort: 8080 videoSettings: - resolutionWidth: 900 chunkLengthInSeconds: 4 streamingKey: abc123 + streamQualities: + - bitrate: 2000k + - bitrate: 6000k + files: maxNumberInPlaylist: 30 maxNumberOnDisk: 60 diff --git a/ffmpeg.go b/ffmpeg.go index 63e7b063e..8cb277ecb 100644 --- a/ffmpeg.go +++ b/ffmpeg.go @@ -6,27 +6,72 @@ import ( "os" "os/exec" "path" - "strconv" + "strings" ) func startFfmpeg(configuration Config) { var outputDir = configuration.PublicHLSPath - var hlsPlaylistName = path.Join(configuration.PublicHLSPath, "stream.m3u8") + var variantPlaylistPath = configuration.PublicHLSPath if configuration.IPFS.Enabled || configuration.S3.Enabled { outputDir = configuration.PrivateHLSPath - hlsPlaylistName = path.Join(outputDir, "temp.m3u8") + variantPlaylistPath = configuration.PrivateHLSPath } - log.Printf("Starting transcoder saving to /%s.", outputDir) + outputDir = path.Join(outputDir, "%v") + + // var masterPlaylistName = path.Join(configuration.PublicHLSPath, "%v", "stream.m3u8") + var variantPlaylistName = path.Join(variantPlaylistPath, "%v", "stream.m3u8") + // var variantRootPath = configuration.PublicHLSPath + + // variantRootPath = path.Join(variantRootPath, "%v") + // variantPlaylistName := path.Join("%v", "stream.m3u8") + + log.Printf("Starting transcoder saving to /%s.", variantPlaylistName) pipePath := getTempPipePath() - ffmpegCmd := "cat " + pipePath + " | " + configuration.FFMpegPath + - " -hide_banner -i pipe: -vf scale=" + strconv.Itoa(configuration.VideoSettings.ResolutionWidth) + ":-2 -g 48 -keyint_min 48 -preset ultrafast -f hls -hls_list_size 30 -hls_time " + - strconv.Itoa(configuration.VideoSettings.ChunkLengthInSeconds) + " -strftime 1 -use_localtime 1 -hls_segment_filename '" + - outputDir + "/stream-%Y%m%d-%s.ts' -hls_flags delete_segments -segment_wrap 100 " + hlsPlaylistName + var videoMaps = make([]string, 0) + var streamMaps = make([]string, 0) + var audioMaps = make([]string, 0) + for index, quality := range configuration.VideoSettings.StreamQualities { + videoMaps = append(videoMaps, fmt.Sprintf("-map v:0 -c:v:%d libx264 -b:v:%d %s", index, index, quality.Bitrate)) + streamMaps = append(streamMaps, fmt.Sprintf("v:%d,a:%d", index, index)) + audioMaps = append(audioMaps, "-map a:0") + } + + ffmpegFlags := []string{ + "-hide_banner", + "-i pipe:", + strings.Join(videoMaps, " "), // All the different video variants + strings.Join(audioMaps, " ") + " -c:a aac -b:a 192k -ac 2", // Audio for all the variants + "-master_pl_name stream.m3u8", + "-g 48", + "-keyint_min 48", + "-preset veryfast", + "-sc_threshold 0", + "-profile:v high", + "-f hls", + "-hls_list_size 30", + "-hls_time 10", + "-strftime 1", + "-use_localtime 1", + "-hls_playlist_type event", + "-hls_segment_filename " + path.Join(outputDir, "stream-%Y%m%d-%s.ts"), + "-hls_flags delete_segments+program_date_time+temp_file", + "-segment_wrap 100", + "-master_m3u8_publish_rate 5", + "-var_stream_map \"" + strings.Join(streamMaps, " ") + "\"", + variantPlaylistName, + } + + ffmpegFlagsString := strings.Join(ffmpegFlags, " ") + + ffmpegCmd := "cat " + pipePath + " | " + configuration.FFMpegPath + " " + ffmpegFlagsString + + // fmt.Println(ffmpegCmd) _, err := exec.Command("bash", "-c", ffmpegCmd).Output() + fmt.Println(err) verifyError(err) } diff --git a/ipfsStorage.go b/ipfsStorage.go index 5199f54dc..6208dc26e 100644 --- a/ipfsStorage.go +++ b/ipfsStorage.go @@ -1,6 +1,7 @@ package main import ( + "bufio" "context" "fmt" "io/ioutil" @@ -38,7 +39,7 @@ type IPFSStorage struct { } func (s *IPFSStorage) Setup(config Config) { - log.Println("Setting up IPFS for external storage of video...") + log.Println("Setting up IPFS for external storage of video. Please wait..") s.gateway = config.IPFS.Gateway @@ -76,14 +77,28 @@ func (s *IPFSStorage) Save(filePath string) string { newHash := s.addFileToDirectory(cidFile, filepath.Base(filePath)) - return newHash + return s.gateway + newHash } -func (s *IPFSStorage) GenerateRemotePlaylist(playlist string, segments map[string]string) string { - for local, remote := range segments { - playlist = strings.ReplaceAll(playlist, local, s.gateway+remote) +func (s *IPFSStorage) GenerateRemotePlaylist(playlist string, variant Variant) string { + var newPlaylist = "" + + scanner := bufio.NewScanner(strings.NewReader(playlist)) + for scanner.Scan() { + line := scanner.Text() + if line[0:1] != "#" { + fullRemotePath := variant.getSegmentForFilename(line) + if fullRemotePath != nil { + line = fullRemotePath.RemoteID + } else { + line = "" + } + } + + newPlaylist = newPlaylist + line + "\n" } - return playlist + + return newPlaylist } func setupPlugins(externalPluginsPath string) error { diff --git a/main.go b/main.go index 18fd9855d..53fd219ce 100644 --- a/main.go +++ b/main.go @@ -15,9 +15,9 @@ var server *Server var online = false func main() { - var hlsDirectoryPath = configuration.PublicHLSPath - log.Println("Starting up. Please wait...") + resetDirectories(configuration) + checkConfig(configuration) var usingExternalStorage = false @@ -31,8 +31,8 @@ func main() { if usingExternalStorage { storage.Setup(configuration) - hlsDirectoryPath = configuration.PrivateHLSPath - go monitorVideoContent(hlsDirectoryPath, configuration, storage) + // hlsDirectoryPath = configuration.PrivateHLSPath + go monitorVideoContent(configuration.PrivateHLSPath, configuration, storage) } go startChatServer() diff --git a/playlistMonitor.go b/playlistMonitor.go index f883531ec..c9b5fdb77 100644 --- a/playlistMonitor.go +++ b/playlistMonitor.go @@ -4,6 +4,7 @@ import ( "io/ioutil" "path" "path/filepath" + "strconv" "time" log "github.com/sirupsen/logrus" @@ -11,10 +12,54 @@ import ( "github.com/radovskyb/watcher" ) -var filesToUpload = make(map[string]string) +type Segment struct { + VariantIndex int // The bitrate variant + FullDiskPath string // Where it lives on disk + RelativeUploadPath string // Path it should have remotely + RemoteID string // Used for IPFS +} + +type Variant struct { + VariantIndex int + Segments []Segment +} + +func (v *Variant) getSegmentForFilename(filename string) *Segment { + for _, segment := range v.Segments { + if path.Base(segment.FullDiskPath) == filename { + return &segment + } + } + return nil +} + +func getSegmentFromPath(fullDiskPath string) Segment { + segment := Segment{} + segment.FullDiskPath = fullDiskPath + segment.RelativeUploadPath = getRelativePathFromAbsolutePath(fullDiskPath) + index, error := strconv.Atoi(segment.RelativeUploadPath[0:1]) + verifyError(error) + segment.VariantIndex = index + + return segment +} + +func getVariantIndexFromPath(fullDiskPath string) int { + index, error := strconv.Atoi(fullDiskPath[0:1]) + verifyError(error) + return index +} + +var variants []Variant func monitorVideoContent(pathToMonitor string, configuration Config, storage ChunkStorage) { - log.Printf("Using %s files...\n", pathToMonitor) + // Create structures to store the segments for the different stream variants + variants = make([]Variant, len(configuration.VideoSettings.StreamQualities)) + for index := range variants { + variants[index] = Variant{index, make([]Segment, 0)} + } + + log.Printf("Using %s for storing files with %d variants...\n", pathToMonitor, len(variants)) w := watcher.New() @@ -22,29 +67,40 @@ func monitorVideoContent(pathToMonitor string, configuration Config, storage Chu for { select { case event := <-w.Event: - if event.Op != watcher.Write { + + relativePath := getRelativePathFromAbsolutePath(event.Path) + + // Ignore removals + if event.Op == watcher.Remove { continue } - if filepath.Base(event.Path) == "temp.m3u8" { - for filePath, objectID := range filesToUpload { - if objectID != "" { - continue - } + // fmt.Println(event.Op, relativePath) - newObjectPath := storage.Save(path.Join(configuration.PrivateHLSPath, filePath)) - filesToUpload[filePath] = newObjectPath - } + // Handle updates to the master playlist by copying it to webroot + if relativePath == path.Join(configuration.PrivateHLSPath, "stream.m3u8") { + + copy(event.Path, path.Join(configuration.PublicHLSPath, "stream.m3u8")) + // Handle updates to playlists, but not the master playlist + } else if filepath.Ext(event.Path) == ".m3u8" { + variantIndex := getVariantIndexFromPath(relativePath) + variant := variants[variantIndex] playlistBytes, err := ioutil.ReadFile(event.Path) verifyError(err) playlistString := string(playlistBytes) + // fmt.Println("Rewriting playlist", relativePath, "to", path.Join(configuration.PublicHLSPath, relativePath)) - playlistString = storage.GenerateRemotePlaylist(playlistString, filesToUpload) - writePlaylist(playlistString, path.Join(configuration.PublicHLSPath, "/stream.m3u8")) + playlistString = storage.GenerateRemotePlaylist(playlistString, variant) + writePlaylist(playlistString, path.Join(configuration.PublicHLSPath, relativePath)) } else if filepath.Ext(event.Path) == ".ts" { - filesToUpload[filepath.Base(event.Path)] = "" + segment := getSegmentFromPath(event.Path) + newObjectPath := storage.Save(path.Join(configuration.PrivateHLSPath, segment.RelativeUploadPath)) + segment.RemoteID = newObjectPath + // fmt.Println("Uploaded", segment.RelativeUploadPath, "as", newObjectPath) + + variants[segment.VariantIndex].Segments = append(variants[segment.VariantIndex].Segments, segment) } case err := <-w.Error: log.Fatalln(err) @@ -54,8 +110,8 @@ func monitorVideoContent(pathToMonitor string, configuration Config, storage Chu } }() - // Watch this folder for changes. - if err := w.Add(pathToMonitor); err != nil { + // Watch the hls segment storage folder recursively for changes. + if err := w.AddRecursive(pathToMonitor); err != nil { log.Fatalln(err) } diff --git a/s3Storage.go b/s3Storage.go index be21a24d0..5fac389f6 100644 --- a/s3Storage.go +++ b/s3Storage.go @@ -74,26 +74,22 @@ func (s *S3Storage) Save(filePath string) string { // fmt.Println("Uploaded", filePath, "to", response.Location) - return filePath + return response.Location } -func (s *S3Storage) GenerateRemotePlaylist(playlist string, segments map[string]string) string { - baseHost, err := url.Parse(s.host) - baseHostComponents := []string{baseHost.Scheme + "://", baseHost.Host, baseHost.Path} - - verifyError(err) - - // baseHostString := fmt.Sprintf("%s://%s/%s", baseHost.Scheme, baseHost.Hostname, baseHost.Path) - +func (s *S3Storage) GenerateRemotePlaylist(playlist string, variant Variant) string { var newPlaylist = "" scanner := bufio.NewScanner(strings.NewReader(playlist)) for scanner.Scan() { line := scanner.Text() if line[0:1] != "#" { - urlComponents := baseHostComponents - urlComponents = append(urlComponents, line) - line = strings.Join(urlComponents, "") //path.Join(s.host, line) + fullRemotePath := variant.getSegmentForFilename(line) + if fullRemotePath != nil { + line = fullRemotePath.RemoteID + } else { + line = "" + } } newPlaylist = newPlaylist + line + "\n" diff --git a/utils.go b/utils.go index ce0a58656..210ec8f9f 100644 --- a/utils.go +++ b/utils.go @@ -1,8 +1,15 @@ package main import ( + "fmt" + "io/ioutil" "os" + "path" "path/filepath" + "strconv" + "strings" + + log "github.com/sirupsen/logrus" ) func getTempPipePath() string { @@ -18,8 +25,43 @@ func fileExists(name string) bool { return true } +func getRelativePathFromAbsolutePath(path string) string { + pathComponents := strings.Split(path, "/") + variant := pathComponents[len(pathComponents)-2] + file := pathComponents[len(pathComponents)-1] + return filepath.Join(variant, file) +} + func verifyError(e error) { if e != nil { - panic(e) + log.Panic(e) + } +} + +func copy(src, dst string) { + input, err := ioutil.ReadFile(src) + if err != nil { + fmt.Println(err) + return + } + + err = ioutil.WriteFile(dst, input, 0644) + if err != nil { + fmt.Println("Error creating", dst) + fmt.Println(err) + return + } +} + +func resetDirectories(configuration Config) { + // Wipe the public, web-accessible hls data directory + os.RemoveAll(configuration.PublicHLSPath) + os.MkdirAll(configuration.PublicHLSPath, 0777) + + // Create private hls data dirs + os.RemoveAll(configuration.PrivateHLSPath) + for index := range configuration.VideoSettings.StreamQualities { + os.MkdirAll(path.Join(configuration.PrivateHLSPath, strconv.Itoa(index)), 0777) + os.MkdirAll(path.Join(configuration.PublicHLSPath, strconv.Itoa(index)), 0777) } }