From 4ba36c17a38afb6efcbbe8356cd7767c7ec1960c Mon Sep 17 00:00:00 2001 From: Gabe Kangas Date: Wed, 9 Aug 2023 16:19:09 -0700 Subject: [PATCH] feat(video): first pass at replay functionality - Persist segments - Record configurations - Rebuild entire stream playlists - First steps to working towards https://github.com/owncast/owncast/issues/102 --- config/config.go | 3 + controllers/replays.go | 78 +++++ core/core.go | 9 +- core/data/data.go | 1 + core/data/recording.go | 44 +++ core/offlineState.go | 4 +- core/storageproviders/local.go | 59 +++- core/storageproviders/rewriteLocalPlaylist.go | 37 ++- core/storageproviders/s3Storage.go | 81 +++-- core/streamState.go | 27 +- core/transcoder/fileWriterReceiverService.go | 7 +- core/transcoder/hlsHandler.go | 30 +- core/transcoder/transcoder.go | 26 +- core/transcoder/transcoder_nvenc_test.go | 4 +- core/transcoder/transcoder_omx_test.go | 4 +- core/transcoder/transcoder_vaapi_test.go | 4 +- .../transcoder_videotoolbox_test.go | 4 +- core/transcoder/transcoder_x264_test.go | 4 +- core/transcoder/utils.go | 14 +- core/video.go | 26 ++ db/db.go | 2 +- db/models.go | 30 +- db/query.sql | 32 ++ db/query.sql.go | 294 +++++++++++++++++- db/schema.sql | 42 +++ models/currentBroadcast.go | 1 + models/storageProvider.go | 7 +- replays/hlsRecorder.go | 130 ++++++++ replays/hlsSegment.go | 12 + replays/mediaPlaylistAllowCacheTag.go | 49 +++ replays/outputConfiguration.go | 13 + replays/playlistGenerator.go | 283 +++++++++++++++++ replays/playlistGenerator_test.go | 136 ++++++++ replays/storageProvider.go | 6 + replays/stream.go | 41 +++ router/router.go | 5 + utils/utils.go | 10 +- 37 files changed, 1449 insertions(+), 110 deletions(-) create mode 100644 controllers/replays.go create mode 100644 core/data/recording.go create mode 100644 core/video.go create mode 100644 replays/hlsRecorder.go create mode 100644 replays/hlsSegment.go create mode 100644 replays/mediaPlaylistAllowCacheTag.go create mode 100644 replays/outputConfiguration.go create mode 100644 replays/playlistGenerator.go create mode 100644 replays/playlistGenerator_test.go create mode 100644 replays/storageProvider.go create mode 100644 replays/stream.go diff --git a/config/config.go b/config/config.go index d4f590382..d5e707b93 100644 --- a/config/config.go +++ b/config/config.go @@ -43,6 +43,9 @@ var EnableAutoUpdate = false // A temporary stream key that can be set via the command line. var TemporaryStreamKey = "" +// EnableRecordingFeatures will enable recording features. +var EnableRecordingFeatures = true + // GetCommit will return an identifier used for identifying the point in time this build took place. func GetCommit() string { if GitCommit == "" { diff --git a/controllers/replays.go b/controllers/replays.go new file mode 100644 index 000000000..9bd94cd02 --- /dev/null +++ b/controllers/replays.go @@ -0,0 +1,78 @@ +package controllers + +import ( + "net/http" + "strings" + + "github.com/owncast/owncast/replays" + log "github.com/sirupsen/logrus" +) + +// GetReplays will return a list of all available replays. +func GetReplays(w http.ResponseWriter, r *http.Request) { + streams, err := replays.GetStreams() + if err != nil { + log.Errorln(err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + WriteResponse(w, streams) +} + +// GetReplay will return a playable content for a given stream Id. +func GetReplay(w http.ResponseWriter, r *http.Request) { + pathComponents := strings.Split(r.URL.Path, "/") + if len(pathComponents) == 3 { + // Return the master playlist for the requested stream + streamId := pathComponents[2] + getReplayMasterPlaylist(streamId, w) + return + } else if len(pathComponents) == 4 { + // Return the media playlist for the requested stream and output config + streamId := pathComponents[2] + outputConfigId := pathComponents[3] + getReplayMediaPlaylist(streamId, outputConfigId, w) + return + } + + BadRequestHandler(w, nil) +} + +// getReplayMasterPlaylist will return a complete replay of a stream as a HLS playlist. +// /api/replay/{streamId}. +func getReplayMasterPlaylist(streamId string, w http.ResponseWriter) { + playlistGenerator := replays.NewPlaylistGenerator() + playlist, err := playlistGenerator.GenerateMasterPlaylistForStream(streamId) + if err != nil { + log.Println(err) + } + + if playlist == nil { + w.WriteHeader(http.StatusNotFound) + return + } + + w.Header().Add("Content-Type", "application/x-mpegURL") + if _, err := w.Write(playlist.Encode().Bytes()); err != nil { + log.Errorln(err) + return + } +} + +// getReplayMediaPlaylist will return a media playlist for a given stream. +// /api/replay/{streamId}/{outputConfigId}. +func getReplayMediaPlaylist(streamId, outputConfigId string, w http.ResponseWriter) { + playlistGenerator := replays.NewPlaylistGenerator() + playlist, err := playlistGenerator.GenerateMediaPlaylistForStreamAndConfiguration(streamId, outputConfigId) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + + w.Header().Add("Content-Type", "application/x-mpegURL") + if _, err := w.Write(playlist.Encode().Bytes()); err != nil { + log.Errorln(err) + return + } +} diff --git a/core/core.go b/core/core.go index 5b6e28262..ef356feac 100644 --- a/core/core.go +++ b/core/core.go @@ -99,8 +99,11 @@ func createInitialOfflineState() error { func transitionToOfflineVideoStreamContent() { log.Traceln("Firing transcoder with offline stream state") - _transcoder := transcoder.NewTranscoder() - _transcoder.SetIdentifier("offline") + streamId := "offline" + _storage.SetStreamId(streamId) + handler.SetStreamId(streamId) + + _transcoder := transcoder.NewTranscoder(streamId) _transcoder.SetLatencyLevel(models.GetLatencyLevel(4)) _transcoder.SetIsEvent(true) @@ -127,7 +130,7 @@ func resetDirectories() { log.Trace("Resetting file directories to a clean slate.") // Wipe hls data directory - utils.CleanupDirectory(config.HLSStoragePath) + utils.CleanupDirectory(config.HLSStoragePath, config.EnableRecordingFeatures) // Remove the previous thumbnail logo := data.GetLogoPath() diff --git a/core/data/data.go b/core/data/data.go index 9fc1dc74c..40e039088 100644 --- a/core/data/data.go +++ b/core/data/data.go @@ -76,6 +76,7 @@ func SetupPersistence(file string) error { createWebhooksTable() createUsersTable(db) createAccessTokenTable(db) + createRecordingTables(db) if _, err := db.Exec(`CREATE TABLE IF NOT EXISTS config ( "key" string NOT NULL PRIMARY KEY, diff --git a/core/data/recording.go b/core/data/recording.go new file mode 100644 index 000000000..4eb61f5e5 --- /dev/null +++ b/core/data/recording.go @@ -0,0 +1,44 @@ +package data + +import "database/sql" + +func createRecordingTables(db *sql.DB) { + createSegmentsTableSQL := `CREATE TABLE IF NOT EXISTS video_segments ( + "id" string NOT NULL, + "stream_id" string NOT NULL, + "output_configuration_id" string NOT NULL, + "path" TEXT NOT NULL, + "timestamp" DATE DEFAULT CURRENT_TIMESTAMP NOT NULL, + PRIMARY KEY (id) + );CREATE INDEX video_segments_stream_id ON video_segments (stream_id);CREATE INDEX video_segments_stream_id_timestamp ON video_segments (stream_id,timestamp);` + + createVideoOutputConfigsTableSQL := `CREATE TABLE IF NOT EXISTS video_segment_output_configuration ( + "id" string NOT NULL, + "variant_id" string NOT NULL, + "name" string NOT NULL, + "stream_id" string NOT NULL, + "segment_duration" INTEGER NOT NULL, + "bitrate" INTEGER NOT NULL, + "framerate" INTEGER NOT NULL, + "resolution_width" INTEGER, + "resolution_height" INTEGER, + "timestamp" DATE DEFAULT CURRENT_TIMESTAMP NOT NULL, + PRIMARY KEY (id) + );CREATE INDEX video_segment_output_configuration_stream_id ON video_segment_output_configuration (stream_id);` + + createVideoStreamsTableSQL := `CREATE TABLE IF NOT EXISTS streams ( + "id" string NOT NULL, + "stream_title" TEXT, + "start_time" DATE NOT NULL, + "end_time" DATE, + PRIMARY KEY (id) + ); + CREATE INDEX streams_id ON streams (id); + CREATE INDEX streams_start_time ON streams (start_time); + CREATE INDEX streams_start_end_time ON streams (start_time,end_time); + ` + + MustExec(createSegmentsTableSQL, db) + MustExec(createVideoOutputConfigsTableSQL, db) + MustExec(createVideoStreamsTableSQL, db) +} diff --git a/core/offlineState.go b/core/offlineState.go index e1257c37f..0048a9b79 100644 --- a/core/offlineState.go +++ b/core/offlineState.go @@ -56,7 +56,7 @@ func makeVariantIndexOffline(index int, offlineFilePath string, offlineFilename log.Warnln(err) } - if _, err := _storage.Save(segmentFilePath, 0); err != nil { + if _, err := _storage.Save(segmentFilePath, segmentFilePath, 0); err != nil { log.Warnln(err) } @@ -65,7 +65,7 @@ func makeVariantIndexOffline(index int, offlineFilePath string, offlineFilename } else { createEmptyOfflinePlaylist(playlistFilePath, offlineFilename) } - if _, err := _storage.Save(playlistFilePath, 0); err != nil { + if _, err := _storage.Save(playlistFilePath, playlistFilePath, 0); err != nil { log.Warnln(err) } } diff --git a/core/storageproviders/local.go b/core/storageproviders/local.go index f71306ca6..0cb437208 100644 --- a/core/storageproviders/local.go +++ b/core/storageproviders/local.go @@ -10,31 +10,51 @@ import ( "github.com/owncast/owncast/config" "github.com/owncast/owncast/core/data" + "github.com/owncast/owncast/utils" ) // LocalStorage represents an instance of the local storage provider for HLS video. -type LocalStorage struct{} +type LocalStorage struct { + streamID string +} // NewLocalStorage returns a new LocalStorage instance. func NewLocalStorage() *LocalStorage { return &LocalStorage{} } +// SetStreamId sets the stream id for this storage provider. +func (s *LocalStorage) SetStreamId(streamID string) { + s.streamID = streamID +} + // Setup configures this storage provider. func (s *LocalStorage) Setup() error { return nil } // SegmentWritten is called when a single segment of video is written. -func (s *LocalStorage) SegmentWritten(localFilePath string) { - if _, err := s.Save(localFilePath, 0); err != nil { - log.Warnln(err) +func (s *LocalStorage) SegmentWritten(localFilePath string) (string, int, error) { + if s.streamID == "" { + log.Fatalln("stream id must be set when handling video segments") } + + destinationPath, err := s.Save(localFilePath, localFilePath, 0) + if err != nil { + log.Warnln(err) + return "", 0, err + } + + return destinationPath, 0, nil } // VariantPlaylistWritten is called when a variant hls playlist is written. func (s *LocalStorage) VariantPlaylistWritten(localFilePath string) { - if _, err := s.Save(localFilePath, 0); err != nil { + if s.streamID == "" { + log.Fatalln("stream id must be set when handling video playlists") + } + + if _, err := s.Save(localFilePath, localFilePath, 0); err != nil { log.Errorln(err) return } @@ -42,17 +62,34 @@ func (s *LocalStorage) VariantPlaylistWritten(localFilePath string) { // MasterPlaylistWritten is called when the master hls playlist is written. func (s *LocalStorage) MasterPlaylistWritten(localFilePath string) { - if _, err := s.Save(localFilePath, 0); err != nil { - log.Warnln(err) + if s.streamID == "" { + log.Fatalln("stream id must be set when handling video playlists") + } + + masterPlaylistDestinationLocation := filepath.Join(config.HLSStoragePath, "/stream.m3u8") + if err := rewriteLocalPlaylist(localFilePath, s.streamID, masterPlaylistDestinationLocation); err != nil { + log.Errorln(err) + return } } // Save will save a local filepath using the storage provider. -func (s *LocalStorage) Save(filePath string, retryCount int) (string, error) { - return filePath, nil +func (s *LocalStorage) Save(filePath, destinationPath string, retryCount int) (string, error) { + if filePath != destinationPath { + if err := utils.Move(filePath, destinationPath); err != nil { + return "", errors.Wrap(err, "unable to move file") + } + } + + return destinationPath, nil } func (s *LocalStorage) Cleanup() error { + // If we're recording, don't perform the cleanup. + if config.EnableRecordingFeatures { + return nil + } + // Determine how many files we should keep on disk maxNumber := data.GetStreamLatencyLevel().SegmentCount buffer := 10 @@ -84,6 +121,10 @@ func (s *LocalStorage) Cleanup() error { return nil } +func (s *LocalStorage) GetRemoteDestinationPathFromLocalFilePath(localFilePath string) string { + return localFilePath +} + func getAllFilesRecursive(baseDirectory string) (map[string][]os.FileInfo, error) { files := make(map[string][]os.FileInfo) diff --git a/core/storageproviders/rewriteLocalPlaylist.go b/core/storageproviders/rewriteLocalPlaylist.go index 5ba433ee2..82bb33a13 100644 --- a/core/storageproviders/rewriteLocalPlaylist.go +++ b/core/storageproviders/rewriteLocalPlaylist.go @@ -13,7 +13,7 @@ import ( ) // rewriteRemotePlaylist will take a local playlist and rewrite it to have absolute URLs to remote locations. -func rewriteRemotePlaylist(localFilePath, remoteServingEndpoint, pathPrefix string) error { +func rewriteRemotePlaylist(localFilePath, remoteServingEndpoint string) error { f, err := os.Open(localFilePath) // nolint if err != nil { log.Fatalln(err) @@ -25,14 +25,7 @@ func rewriteRemotePlaylist(localFilePath, remoteServingEndpoint, pathPrefix stri } for _, item := range p.Variants { - // Determine the final path to this playlist. - var finalPath string - if pathPrefix != "" { - finalPath = filepath.Join(pathPrefix, "/hls") - } else { - finalPath = "/hls" - } - item.URI = remoteServingEndpoint + filepath.Join(finalPath, item.URI) + item.URI = remoteServingEndpoint + filepath.Join("/hls", item.URI) } publicPath := filepath.Join(config.HLSStoragePath, filepath.Base(localFilePath)) @@ -41,3 +34,29 @@ func rewriteRemotePlaylist(localFilePath, remoteServingEndpoint, pathPrefix stri return playlist.WritePlaylist(newPlaylist, publicPath) } + +// rewriteLocalPlaylist will take a local master playlist and rewrite it to +// refer to the path that includes the stream ID. +func rewriteLocalPlaylist(localFilePath, streamID, destinationPath string) error { + f, err := os.Open(localFilePath) // nolint + if err != nil { + log.Fatalln(err) + } + + p := m3u8.NewMasterPlaylist() + if err := p.DecodeFrom(bufio.NewReader(f), false); err != nil { + log.Warnln(err) + } + + if streamID == "" { + log.Fatalln("stream id must be set when rewriting playlist contents") + } + + for _, item := range p.Variants { + item.URI = filepath.Join("/hls", streamID, item.URI) + } + + newPlaylist := p.String() + + return playlist.WritePlaylist(newPlaylist, destinationPath) +} diff --git a/core/storageproviders/s3Storage.go b/core/storageproviders/s3Storage.go index e73233305..682ac1708 100644 --- a/core/storageproviders/s3Storage.go +++ b/core/storageproviders/s3Storage.go @@ -10,6 +10,7 @@ import ( "strings" "time" + "github.com/owncast/owncast/config" "github.com/owncast/owncast/core/data" "github.com/owncast/owncast/utils" "github.com/pkg/errors" @@ -20,12 +21,11 @@ import ( "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3/s3manager" - - "github.com/owncast/owncast/config" ) // S3Storage is the s3 implementation of a storage provider. type S3Storage struct { + streamId string sess *session.Session s3Client *s3.S3 host string @@ -37,7 +37,6 @@ type S3Storage struct { s3AccessKey string s3Secret string s3ACL string - s3PathPrefix string s3ForcePathStyle bool // If we try to upload a playlist but it is not yet on disk @@ -54,6 +53,11 @@ func NewS3Storage() *S3Storage { } } +// SetStreamID sets the stream id for this storage provider. +func (s *S3Storage) SetStreamId(streamId string) { + s.streamId = streamId +} + // Setup sets up the s3 storage for saving the video to s3. func (s *S3Storage) Setup() error { log.Trace("Setting up S3 for external storage of video...") @@ -74,7 +78,6 @@ func (s *S3Storage) Setup() error { s.s3AccessKey = s3Config.AccessKey s.s3Secret = s3Config.Secret s.s3ACL = s3Config.ACL - s.s3PathPrefix = s3Config.PathPrefix s.s3ForcePathStyle = s3Config.ForcePathStyle s.sess = s.connectAWS() @@ -86,16 +89,19 @@ func (s *S3Storage) Setup() error { } // SegmentWritten is called when a single segment of video is written. -func (s *S3Storage) SegmentWritten(localFilePath string) { +func (s *S3Storage) SegmentWritten(localFilePath string) (string, int, error) { index := utils.GetIndexFromFilePath(localFilePath) performanceMonitorKey := "s3upload-" + index utils.StartPerformanceMonitor(performanceMonitorKey) // Upload the segment - if _, err := s.Save(localFilePath, 0); err != nil { + remoteDestinationPath := s.GetRemoteDestinationPathFromLocalFilePath(localFilePath) + remotePath, err := s.Save(localFilePath, remoteDestinationPath, 0) + if err != nil { log.Errorln(err) - return + return "", 0, err } + averagePerformance := utils.GetAveragePerformance(performanceMonitorKey) // Warn the user about long-running save operations @@ -109,14 +115,16 @@ func (s *S3Storage) SegmentWritten(localFilePath string) { // so the segments and the HLS playlist referencing // them are in sync. playlistPath := filepath.Join(filepath.Dir(localFilePath), "stream.m3u8") - - if _, err := s.Save(playlistPath, 0); err != nil { + playlistRemoteDestinationPath := s.GetRemoteDestinationPathFromLocalFilePath(playlistPath) + if _, err := s.Save(playlistPath, playlistRemoteDestinationPath, 0); err != nil { s.queuedPlaylistUpdates[playlistPath] = playlistPath if pErr, ok := err.(*os.PathError); ok { log.Debugln(pErr.Path, "does not yet exist locally when trying to upload to S3 storage.") - return + return remotePath, 0, pErr.Err } } + + return remotePath, 0, nil } // VariantPlaylistWritten is called when a variant hls playlist is written. @@ -125,7 +133,8 @@ func (s *S3Storage) VariantPlaylistWritten(localFilePath string) { // to make sure we're not referring to files in a playlist that don't // yet exist. See SegmentWritten. if _, ok := s.queuedPlaylistUpdates[localFilePath]; ok { - if _, err := s.Save(localFilePath, 0); err != nil { + remoteDestinationPath := s.GetRemoteDestinationPathFromLocalFilePath(localFilePath) + if _, err := s.Save(localFilePath, remoteDestinationPath, 0); err != nil { log.Errorln(err) s.queuedPlaylistUpdates[localFilePath] = localFilePath } @@ -136,41 +145,30 @@ func (s *S3Storage) VariantPlaylistWritten(localFilePath string) { // MasterPlaylistWritten is called when the master hls playlist is written. func (s *S3Storage) MasterPlaylistWritten(localFilePath string) { // Rewrite the playlist to use absolute remote S3 URLs - if err := rewriteRemotePlaylist(localFilePath, s.host, s.s3PathPrefix); err != nil { + if err := rewriteRemotePlaylist(localFilePath, s.host); err != nil { log.Warnln(err) } } // Save saves the file to the s3 bucket. -func (s *S3Storage) Save(filePath string, retryCount int) (string, error) { - file, err := os.Open(filePath) // nolint +func (s *S3Storage) Save(localFilePath, remoteDestinationPath string, retryCount int) (string, error) { + file, err := os.Open(localFilePath) // nolint if err != nil { return "", err } defer file.Close() - // Convert the local path to the variant/file path by stripping the local storage location. - normalizedPath := strings.TrimPrefix(filePath, config.HLSStoragePath) - // Build the remote path by adding the "hls" path prefix. - remotePath := strings.Join([]string{"hls", normalizedPath}, "") - - // If a custom path prefix is set prepend it. - if s.s3PathPrefix != "" { - prefix := strings.TrimPrefix(s.s3PathPrefix, "/") - remotePath = strings.Join([]string{prefix, remotePath}, "/") - } - - maxAgeSeconds := utils.GetCacheDurationSecondsForPath(filePath) + maxAgeSeconds := utils.GetCacheDurationSecondsForPath(localFilePath) cacheControlHeader := fmt.Sprintf("max-age=%d", maxAgeSeconds) uploadInput := &s3manager.UploadInput{ - Bucket: aws.String(s.s3Bucket), // Bucket to be used - Key: aws.String(remotePath), // Name of the file to be saved - Body: file, // File + Bucket: aws.String(s.s3Bucket), // Bucket to be used + Key: aws.String(remoteDestinationPath), // Name of the file to be saved + Body: file, // File CacheControl: &cacheControlHeader, } - if path.Ext(filePath) == ".m3u8" { + if path.Ext(localFilePath) == ".m3u8" { noCacheHeader := "no-cache, no-store, must-revalidate" contentType := "application/x-mpegURL" @@ -190,22 +188,27 @@ func (s *S3Storage) Save(filePath string, retryCount int) (string, error) { log.Traceln("error uploading segment", err.Error()) if retryCount < 4 { log.Traceln("Retrying...") - return s.Save(filePath, retryCount+1) + return s.Save(localFilePath, remoteDestinationPath, retryCount+1) } // Upload failure. Remove the local file. - s.removeLocalFile(filePath) + s.removeLocalFile(localFilePath) - return "", fmt.Errorf("Giving up uploading %s to object storage %s", filePath, s.s3Endpoint) + return "", fmt.Errorf("Giving up uploading %s to object storage %s", localFilePath, s.s3Endpoint) } // Upload success. Remove the local file. - s.removeLocalFile(filePath) + s.removeLocalFile(localFilePath) return response.Location, nil } func (s *S3Storage) Cleanup() error { + // If we're recording, don't perform the cleanup. + if config.EnableRecordingFeatures { + return nil + } + // Determine how many files we should keep on S3 storage maxNumber := data.GetStreamLatencyLevel().SegmentCount buffer := 20 @@ -329,6 +332,16 @@ func (s *S3Storage) retrieveAllVideoSegments() ([]s3object, error) { return allObjects, nil } +func (s *S3Storage) GetRemoteDestinationPathFromLocalFilePath(localFilePath string) string { + // Convert the local path to the variant/file path by stripping the local storage location. + normalizedPath := strings.TrimPrefix(localFilePath, config.HLSStoragePath) + + // Build the remote path by adding the "hls" path prefix. + remoteDestionationPath := strings.Join([]string{"hls", normalizedPath}, "") + + return remoteDestionationPath +} + type s3object struct { key string lastModified time.Time diff --git a/core/streamState.go b/core/streamState.go index 05eca220b..f2ba9493a 100644 --- a/core/streamState.go +++ b/core/streamState.go @@ -3,9 +3,11 @@ package core import ( "context" "io" + "path/filepath" "time" log "github.com/sirupsen/logrus" + "github.com/teris-io/shortid" "github.com/owncast/owncast/activitypub" "github.com/owncast/owncast/config" @@ -39,36 +41,34 @@ func setStreamAsConnected(rtmpOut *io.PipeReader) { _stats.LastConnectTime = &now _stats.SessionMaxViewerCount = 0 + streamId := shortid.MustGenerate() + fileWriter.SetStreamID(streamId) + _currentBroadcast = &models.CurrentBroadcast{ + StreamID: streamId, LatencyLevel: data.GetStreamLatencyLevel(), OutputSettings: data.GetStreamOutputVariants(), } StopOfflineCleanupTimer() - startOnlineCleanupTimer() + + if !config.EnableRecordingFeatures { + startOnlineCleanupTimer() + } if _yp != nil { go _yp.Start() } - segmentPath := config.HLSStoragePath - if err := setupStorage(); err != nil { log.Fatalln("failed to setup the storage", err) } - go func() { - _transcoder = transcoder.NewTranscoder() - _transcoder.TranscoderCompleted = func(error) { - SetStreamAsDisconnected() - _transcoder = nil - _currentBroadcast = nil - } - _transcoder.SetStdin(rtmpOut) - _transcoder.Start(true) - }() + setupVideoComponentsForId(streamId) + setupLiveTranscoderForId(streamId, rtmpOut) go webhooks.SendStreamStatusEvent(models.StreamStarted) + segmentPath := filepath.Join(config.HLSStoragePath, streamId) transcoder.StartThumbnailGenerator(segmentPath, data.FindHighestVideoQualityIndex(_currentBroadcast.OutputSettings)) _ = chat.SendSystemAction("Stay tuned, the stream is **starting**!", true) @@ -100,6 +100,7 @@ func SetStreamAsDisconnected() { return } + handler.StreamEnded() transcoder.StopThumbnailGenerator() rtmp.Disconnect() diff --git a/core/transcoder/fileWriterReceiverService.go b/core/transcoder/fileWriterReceiverService.go index 08c1427cf..c764992a6 100644 --- a/core/transcoder/fileWriterReceiverService.go +++ b/core/transcoder/fileWriterReceiverService.go @@ -26,6 +26,7 @@ type FileWriterReceiverServiceCallback interface { // as it can send HTTP requests to this service with the results. type FileWriterReceiverService struct { callbacks FileWriterReceiverServiceCallback + streamId string } // SetupFileWriterReceiverService will start listening for transcoder responses. @@ -53,6 +54,10 @@ func (s *FileWriterReceiverService) SetupFileWriterReceiverService(callbacks Fil }() } +func (s *FileWriterReceiverService) SetStreamID(streamID string) { + s.streamId = streamID +} + func (s *FileWriterReceiverService) uploadHandler(w http.ResponseWriter, r *http.Request) { if r.Method != "PUT" { http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest) @@ -79,7 +84,7 @@ func (s *FileWriterReceiverService) uploadHandler(w http.ResponseWriter, r *http } func (s *FileWriterReceiverService) fileWritten(path string) { - if utils.GetRelativePathFromAbsolutePath(path) == "hls/stream.m3u8" { + if utils.GetRelativePathFromAbsolutePath(path) == s.streamId+"/stream.m3u8" { s.callbacks.MasterPlaylistWritten(path) } else if strings.HasSuffix(path, ".ts") { s.callbacks.SegmentWritten(path) diff --git a/core/transcoder/hlsHandler.go b/core/transcoder/hlsHandler.go index 0a819ff95..3b931700c 100644 --- a/core/transcoder/hlsHandler.go +++ b/core/transcoder/hlsHandler.go @@ -1,17 +1,43 @@ package transcoder import ( + "github.com/owncast/owncast/config" "github.com/owncast/owncast/models" + "github.com/owncast/owncast/replays" + log "github.com/sirupsen/logrus" ) // HLSHandler gets told about available HLS playlists and segments. type HLSHandler struct { - Storage models.StorageProvider + Storage models.StorageProvider + Recorder *replays.HLSRecorder +} + +// StreamEnded is called when a stream is ended so the end time can be noted +// in the stream's metadata. +func (h *HLSHandler) StreamEnded() { + if config.EnableRecordingFeatures { + h.Recorder.StreamEnded() + } +} + +func (h *HLSHandler) SetStreamId(streamId string) { + h.Storage.SetStreamId(streamId) + h.Recorder = replays.NewRecording(streamId) } // SegmentWritten is fired when a HLS segment is written to disk. func (h *HLSHandler) SegmentWritten(localFilePath string) { - h.Storage.SegmentWritten(localFilePath) + remotePath, _, err := h.Storage.SegmentWritten(localFilePath) + if err != nil { + log.Errorln(err) + } + + if h.Recorder != nil { + h.Recorder.SegmentWritten(remotePath) + } else { + log.Debugln("No HLS recorder available to notify of segment written.") + } } // VariantPlaylistWritten is fired when a HLS variant playlist is written to disk. diff --git a/core/transcoder/transcoder.go b/core/transcoder/transcoder.go index ff1bbaa72..f39eac92b 100644 --- a/core/transcoder/transcoder.go +++ b/core/transcoder/transcoder.go @@ -27,9 +27,9 @@ type Transcoder struct { stdin *io.PipeReader TranscoderCompleted func(error) + StreamID string playlistOutputPath string ffmpegPath string - segmentIdentifier string internalListenerPort string input string segmentOutputPath string @@ -118,7 +118,9 @@ func (t *Transcoder) Start(shouldLog bool) { if shouldLog { log.Infof("Processing video using codec %s with %d output qualities configured.", t.codec.DisplayName(), len(t.variants)) } - createVariantDirectories() + + // Make directory for this stream. + createVariantDirectories(t.StreamID) if config.EnableDebugFeatures { log.Println(command) @@ -181,8 +183,8 @@ func (t *Transcoder) getString() string { hlsOptionFlags = append(hlsOptionFlags, "append_list") } - if t.segmentIdentifier == "" { - t.segmentIdentifier = shortid.MustGenerate() + if t.StreamID == "" { + t.StreamID = shortid.MustGenerate() } hlsEventString := "" @@ -197,6 +199,7 @@ func (t *Transcoder) getString() string { if len(hlsOptionFlags) > 0 { hlsOptionsString = "-hls_flags " + strings.Join(hlsOptionFlags, "+") } + ffmpegFlags := []string{ fmt.Sprintf(`FFREPORT=file="%s":level=32`, logging.GetTranscoderLogFilePath()), t.ffmpegPath, @@ -226,11 +229,11 @@ func (t *Transcoder) getString() string { // Filenames "-master_pl_name", "stream.m3u8", - "-hls_segment_filename", localListenerAddress + "/%v/stream-" + t.segmentIdentifier + "-%d.ts", // Send HLS segments back to us over HTTP + "-hls_segment_filename", localListenerAddress + "/" + t.StreamID + "/%v/stream-" + t.StreamID + "-%d.ts", // Send HLS segments back to us over HTTP "-max_muxing_queue_size", "400", // Workaround for Too many packets error: https://trac.ffmpeg.org/ticket/6375?cversion=0 - "-method PUT", // HLS results sent back to us will be over PUTs - localListenerAddress + "/%v/stream.m3u8", // Send HLS playlists back to us over HTTP + "-method PUT", // HLS results sent back to us will be over PUTs + localListenerAddress + "/" + t.StreamID + "/%v/stream.m3u8", // Send HLS playlists back to us over HTTP } return strings.Join(ffmpegFlags, " ") @@ -272,10 +275,11 @@ func getVariantFromConfigQuality(quality models.StreamOutputVariant, index int) } // NewTranscoder will return a new Transcoder, populated by the config. -func NewTranscoder() *Transcoder { +func NewTranscoder(streamID string) *Transcoder { ffmpegPath := utils.ValidatedFfmpegPath(data.GetFfMpegPath()) transcoder := new(Transcoder) + transcoder.StreamID = streamID transcoder.ffmpegPath = ffmpegPath transcoder.internalListenerPort = config.InternalHLSListenerPort @@ -438,9 +442,9 @@ func (t *Transcoder) SetOutputPath(output string) { t.segmentOutputPath = output } -// SetIdentifier enables appending a unique identifier to segment file name. -func (t *Transcoder) SetIdentifier(output string) { - t.segmentIdentifier = output +// SetStreamID sets a unique identifier for the currently transcoding stream. +func (t *Transcoder) SetStreamID(id string) { + t.StreamID = id } // SetInternalHTTPPort will set the port to be used for internal communication. diff --git a/core/transcoder/transcoder_nvenc_test.go b/core/transcoder/transcoder_nvenc_test.go index 32b975794..e5dc55e9a 100644 --- a/core/transcoder/transcoder_nvenc_test.go +++ b/core/transcoder/transcoder_nvenc_test.go @@ -15,7 +15,7 @@ func TestFFmpegNvencCommand(t *testing.T) { transcoder.ffmpegPath = filepath.Join("fake", "path", "ffmpeg") transcoder.SetInput("fakecontent.flv") transcoder.SetOutputPath("fakeOutput") - transcoder.SetIdentifier("jdoieGg") + transcoder.SetStreamID("jdFsdfzGg") transcoder.SetInternalHTTPPort("8123") transcoder.SetCodec(codec.Name()) transcoder.currentLatencyLevel = latencyLevel @@ -42,7 +42,7 @@ func TestFFmpegNvencCommand(t *testing.T) { cmd := transcoder.getString() expectedLogPath := filepath.Join("data", "logs", "transcoder.log") - expected := `FFREPORT=file="` + expectedLogPath + `":level=32 ` + transcoder.ffmpegPath + ` -hide_banner -loglevel warning -hwaccel cuda -fflags +genpts -flags +cgop -i fakecontent.flv -map v:0 -c:v:0 h264_nvenc -b:v:0 1008k -maxrate:v:0 1088k -g:v:0 90 -keyint_min:v:0 90 -r:v:0 30 -tune:v:0 ll -map a:0? -c:a:0 copy -preset p3 -map v:0 -c:v:1 h264_nvenc -b:v:1 3308k -maxrate:v:1 3572k -g:v:1 72 -keyint_min:v:1 72 -r:v:1 24 -tune:v:1 ll -map a:0? -c:a:1 copy -preset p5 -map v:0 -c:v:2 copy -map a:0? -c:a:2 copy -preset p1 -var_stream_map "v:0,a:0 v:1,a:1 v:2,a:2 " -f hls -hls_time 3 -hls_list_size 10 -hls_flags program_date_time+independent_segments+omit_endlist -segment_format_options mpegts_flags=mpegts_copyts=1 -pix_fmt yuv420p -sc_threshold 0 -master_pl_name stream.m3u8 -hls_segment_filename http://127.0.0.1:8123/%v/stream-jdoieGg-%d.ts -max_muxing_queue_size 400 -method PUT http://127.0.0.1:8123/%v/stream.m3u8` + expected := `FFREPORT=file="` + expectedLogPath + `":level=32 ` + transcoder.ffmpegPath + ` -hide_banner -loglevel warning -hwaccel cuda -fflags +genpts -flags +cgop -i fakecontent.flv -map v:0 -c:v:0 h264_nvenc -b:v:0 1008k -maxrate:v:0 1088k -g:v:0 90 -keyint_min:v:0 90 -r:v:0 30 -tune:v:0 ll -map a:0? -c:a:0 copy -preset p3 -map v:0 -c:v:1 h264_nvenc -b:v:1 3308k -maxrate:v:1 3572k -g:v:1 72 -keyint_min:v:1 72 -r:v:1 24 -tune:v:1 ll -map a:0? -c:a:1 copy -preset p5 -map v:0 -c:v:2 copy -map a:0? -c:a:2 copy -preset p1 -var_stream_map "v:0,a:0 v:1,a:1 v:2,a:2 " -f hls -hls_time 3 -hls_list_size 10 -hls_flags program_date_time+independent_segments+omit_endlist -segment_format_options mpegts_flags=mpegts_copyts=1 -pix_fmt yuv420p -sc_threshold 0 -master_pl_name stream.m3u8 -hls_segment_filename http://127.0.0.1:8123/jdFsdfzGg/%v/stream-jdFsdfzGg-%d.ts -max_muxing_queue_size 400 -method PUT http://127.0.0.1:8123/jdFsdfzGg/%v/stream.m3u8` if cmd != expected { t.Errorf("ffmpeg command does not match expected.\nGot %s\n, want: %s", cmd, expected) diff --git a/core/transcoder/transcoder_omx_test.go b/core/transcoder/transcoder_omx_test.go index 93082e356..bbf0ff09d 100644 --- a/core/transcoder/transcoder_omx_test.go +++ b/core/transcoder/transcoder_omx_test.go @@ -15,7 +15,7 @@ func TestFFmpegOmxCommand(t *testing.T) { transcoder.ffmpegPath = filepath.Join("fake", "path", "ffmpeg") transcoder.SetInput("fakecontent.flv") transcoder.SetOutputPath("fakeOutput") - transcoder.SetIdentifier("jdFsdfzGg") + transcoder.SetStreamID("jdFsdfzGg") transcoder.SetInternalHTTPPort("8123") transcoder.SetCodec(codec.Name()) transcoder.currentLatencyLevel = latencyLevel @@ -42,7 +42,7 @@ func TestFFmpegOmxCommand(t *testing.T) { cmd := transcoder.getString() expectedLogPath := filepath.Join("data", "logs", "transcoder.log") - expected := `FFREPORT=file="` + expectedLogPath + `":level=32 ` + transcoder.ffmpegPath + ` -hide_banner -loglevel warning -fflags +genpts -flags +cgop -i fakecontent.flv -map v:0 -c:v:0 h264_omx -b:v:0 1008k -maxrate:v:0 1088k -g:v:0 90 -keyint_min:v:0 90 -r:v:0 30 -map a:0? -c:a:0 copy -preset veryfast -map v:0 -c:v:1 h264_omx -b:v:1 3308k -maxrate:v:1 3572k -g:v:1 72 -keyint_min:v:1 72 -r:v:1 24 -map a:0? -c:a:1 copy -preset fast -map v:0 -c:v:2 copy -map a:0? -c:a:2 copy -preset ultrafast -var_stream_map "v:0,a:0 v:1,a:1 v:2,a:2 " -f hls -hls_time 3 -hls_list_size 10 -hls_flags program_date_time+independent_segments+omit_endlist -segment_format_options mpegts_flags=mpegts_copyts=1 -tune zerolatency -pix_fmt yuv420p -sc_threshold 0 -master_pl_name stream.m3u8 -hls_segment_filename http://127.0.0.1:8123/%v/stream-jdFsdfzGg-%d.ts -max_muxing_queue_size 400 -method PUT http://127.0.0.1:8123/%v/stream.m3u8` + expected := `FFREPORT=file="` + expectedLogPath + `":level=32 ` + transcoder.ffmpegPath + ` -hide_banner -loglevel warning -fflags +genpts -flags +cgop -i fakecontent.flv -map v:0 -c:v:0 h264_omx -b:v:0 1008k -maxrate:v:0 1088k -g:v:0 90 -keyint_min:v:0 90 -r:v:0 30 -map a:0? -c:a:0 copy -preset veryfast -map v:0 -c:v:1 h264_omx -b:v:1 3308k -maxrate:v:1 3572k -g:v:1 72 -keyint_min:v:1 72 -r:v:1 24 -map a:0? -c:a:1 copy -preset fast -map v:0 -c:v:2 copy -map a:0? -c:a:2 copy -preset ultrafast -var_stream_map "v:0,a:0 v:1,a:1 v:2,a:2 " -f hls -hls_time 3 -hls_list_size 10 -hls_flags program_date_time+independent_segments+omit_endlist -segment_format_options mpegts_flags=mpegts_copyts=1 -tune zerolatency -pix_fmt yuv420p -sc_threshold 0 -master_pl_name stream.m3u8 -hls_segment_filename http://127.0.0.1:8123/jdFsdfzGg/%v/stream-jdFsdfzGg-%d.ts -max_muxing_queue_size 400 -method PUT http://127.0.0.1:8123/jdFsdfzGg/%v/stream.m3u8` if cmd != expected { t.Errorf("ffmpeg command does not match expected.\nGot %s\n, want: %s", cmd, expected) diff --git a/core/transcoder/transcoder_vaapi_test.go b/core/transcoder/transcoder_vaapi_test.go index affa7fcac..f1dc0739e 100644 --- a/core/transcoder/transcoder_vaapi_test.go +++ b/core/transcoder/transcoder_vaapi_test.go @@ -15,7 +15,7 @@ func TestFFmpegVaapiCommand(t *testing.T) { transcoder.ffmpegPath = filepath.Join("fake", "path", "ffmpeg") transcoder.SetInput("fakecontent.flv") transcoder.SetOutputPath("fakeOutput") - transcoder.SetIdentifier("jdofFGg") + transcoder.SetStreamID("jdFsdfzGg") transcoder.SetInternalHTTPPort("8123") transcoder.SetCodec(codec.Name()) transcoder.currentLatencyLevel = latencyLevel @@ -42,7 +42,7 @@ func TestFFmpegVaapiCommand(t *testing.T) { cmd := transcoder.getString() expectedLogPath := filepath.Join("data", "logs", "transcoder.log") - expected := `FFREPORT=file="` + expectedLogPath + `":level=32 ` + transcoder.ffmpegPath + ` -hide_banner -loglevel warning -hwaccel vaapi -hwaccel_output_format vaapi -vaapi_device /dev/dri/renderD128 -fflags +genpts -flags +cgop -i fakecontent.flv -map v:0 -c:v:0 h264_vaapi -b:v:0 1008k -maxrate:v:0 1088k -g:v:0 90 -keyint_min:v:0 90 -r:v:0 30 -map a:0? -c:a:0 copy -preset veryfast -map v:0 -c:v:1 h264_vaapi -b:v:1 3308k -maxrate:v:1 3572k -g:v:1 72 -keyint_min:v:1 72 -r:v:1 24 -map a:0? -c:a:1 copy -preset fast -map v:0 -c:v:2 copy -map a:0? -c:a:2 copy -preset ultrafast -var_stream_map "v:0,a:0 v:1,a:1 v:2,a:2 " -f hls -hls_time 3 -hls_list_size 10 -hls_flags program_date_time+independent_segments+omit_endlist -segment_format_options mpegts_flags=mpegts_copyts=1 -pix_fmt vaapi_vld -sc_threshold 0 -master_pl_name stream.m3u8 -hls_segment_filename http://127.0.0.1:8123/%v/stream-jdofFGg-%d.ts -max_muxing_queue_size 400 -method PUT http://127.0.0.1:8123/%v/stream.m3u8` + expected := `FFREPORT=file="` + expectedLogPath + `":level=32 ` + transcoder.ffmpegPath + ` -hide_banner -loglevel warning -hwaccel vaapi -hwaccel_output_format vaapi -vaapi_device /dev/dri/renderD128 -fflags +genpts -flags +cgop -i fakecontent.flv -map v:0 -c:v:0 h264_vaapi -b:v:0 1008k -maxrate:v:0 1088k -g:v:0 90 -keyint_min:v:0 90 -r:v:0 30 -map a:0? -c:a:0 copy -preset veryfast -map v:0 -c:v:1 h264_vaapi -b:v:1 3308k -maxrate:v:1 3572k -g:v:1 72 -keyint_min:v:1 72 -r:v:1 24 -map a:0? -c:a:1 copy -preset fast -map v:0 -c:v:2 copy -map a:0? -c:a:2 copy -preset ultrafast -var_stream_map "v:0,a:0 v:1,a:1 v:2,a:2 " -f hls -hls_time 3 -hls_list_size 10 -hls_flags program_date_time+independent_segments+omit_endlist -segment_format_options mpegts_flags=mpegts_copyts=1 -pix_fmt vaapi_vld -sc_threshold 0 -master_pl_name stream.m3u8 -hls_segment_filename http://127.0.0.1:8123/jdFsdfzGg/%v/stream-jdFsdfzGg-%d.ts -max_muxing_queue_size 400 -method PUT http://127.0.0.1:8123/jdFsdfzGg/%v/stream.m3u8` if cmd != expected { t.Errorf("ffmpeg command does not match expected.\nGot %s\n, want: %s", cmd, expected) diff --git a/core/transcoder/transcoder_videotoolbox_test.go b/core/transcoder/transcoder_videotoolbox_test.go index caa67faf4..f12e82406 100644 --- a/core/transcoder/transcoder_videotoolbox_test.go +++ b/core/transcoder/transcoder_videotoolbox_test.go @@ -15,7 +15,7 @@ func TestFFmpegVideoToolboxCommand(t *testing.T) { transcoder.ffmpegPath = filepath.Join("fake", "path", "ffmpeg") transcoder.SetInput("fakecontent.flv") transcoder.SetOutputPath("fakeOutput") - transcoder.SetIdentifier("jdFsdfzGg") + transcoder.SetStreamID("jdFsdfzGg") transcoder.SetInternalHTTPPort("8123") transcoder.SetCodec(codec.Name()) transcoder.currentLatencyLevel = latencyLevel @@ -42,7 +42,7 @@ func TestFFmpegVideoToolboxCommand(t *testing.T) { cmd := transcoder.getString() expectedLogPath := filepath.Join("data", "logs", "transcoder.log") - expected := `FFREPORT=file="` + expectedLogPath + `":level=32 ` + transcoder.ffmpegPath + ` -hide_banner -loglevel warning -fflags +genpts -flags +cgop -i fakecontent.flv -map v:0 -c:v:0 h264_videotoolbox -b:v:0 1008k -maxrate:v:0 1088k -g:v:0 90 -keyint_min:v:0 90 -r:v:0 30 -realtime true -map a:0? -c:a:0 copy -preset veryfast -map v:0 -c:v:1 h264_videotoolbox -b:v:1 3308k -maxrate:v:1 3572k -g:v:1 72 -keyint_min:v:1 72 -r:v:1 24 -map a:0? -c:a:1 copy -preset fast -map v:0 -c:v:2 copy -map a:0? -c:a:2 copy -preset ultrafast -var_stream_map "v:0,a:0 v:1,a:1 v:2,a:2 " -f hls -hls_time 3 -hls_list_size 10 -hls_flags program_date_time+independent_segments+omit_endlist -segment_format_options mpegts_flags=mpegts_copyts=1 -pix_fmt nv12 -sc_threshold 0 -master_pl_name stream.m3u8 -hls_segment_filename http://127.0.0.1:8123/%v/stream-jdFsdfzGg-%d.ts -max_muxing_queue_size 400 -method PUT http://127.0.0.1:8123/%v/stream.m3u8` + expected := `FFREPORT=file="` + expectedLogPath + `":level=32 ` + transcoder.ffmpegPath + ` -hide_banner -loglevel warning -fflags +genpts -flags +cgop -i fakecontent.flv -map v:0 -c:v:0 h264_videotoolbox -b:v:0 1008k -maxrate:v:0 1088k -g:v:0 90 -keyint_min:v:0 90 -r:v:0 30 -realtime true -map a:0? -c:a:0 copy -preset veryfast -map v:0 -c:v:1 h264_videotoolbox -b:v:1 3308k -maxrate:v:1 3572k -g:v:1 72 -keyint_min:v:1 72 -r:v:1 24 -map a:0? -c:a:1 copy -preset fast -map v:0 -c:v:2 copy -map a:0? -c:a:2 copy -preset ultrafast -var_stream_map "v:0,a:0 v:1,a:1 v:2,a:2 " -f hls -hls_time 3 -hls_list_size 10 -hls_flags program_date_time+independent_segments+omit_endlist -segment_format_options mpegts_flags=mpegts_copyts=1 -pix_fmt nv12 -sc_threshold 0 -master_pl_name stream.m3u8 -hls_segment_filename http://127.0.0.1:8123/jdFsdfzGg/%v/stream-jdFsdfzGg-%d.ts -max_muxing_queue_size 400 -method PUT http://127.0.0.1:8123/jdFsdfzGg/%v/stream.m3u8` if cmd != expected { t.Errorf("ffmpeg command does not match expected.\nGot %s\n, want: %s", cmd, expected) diff --git a/core/transcoder/transcoder_x264_test.go b/core/transcoder/transcoder_x264_test.go index e7ddf6a59..aed18fa20 100644 --- a/core/transcoder/transcoder_x264_test.go +++ b/core/transcoder/transcoder_x264_test.go @@ -15,7 +15,7 @@ func TestFFmpegx264Command(t *testing.T) { transcoder.ffmpegPath = filepath.Join("fake", "path", "ffmpeg") transcoder.SetInput("fakecontent.flv") transcoder.SetOutputPath("fakeOutput") - transcoder.SetIdentifier("jdofFGg") + transcoder.SetStreamID("jdFsdfzGg") transcoder.SetInternalHTTPPort("8123") transcoder.SetCodec(codec.Name()) transcoder.currentLatencyLevel = latencyLevel @@ -42,7 +42,7 @@ func TestFFmpegx264Command(t *testing.T) { cmd := transcoder.getString() expectedLogPath := filepath.Join("data", "logs", "transcoder.log") - expected := `FFREPORT=file="` + expectedLogPath + `":level=32 ` + transcoder.ffmpegPath + ` -hide_banner -loglevel warning -fflags +genpts -flags +cgop -i fakecontent.flv -map v:0 -c:v:0 libx264 -b:v:0 1008k -maxrate:v:0 1088k -g:v:0 90 -keyint_min:v:0 90 -r:v:0 30 -x264-params:v:0 "scenecut=0:open_gop=0" -bufsize:v:0 1088k -profile:v:0 high -map a:0? -c:a:0 copy -preset veryfast -map v:0 -c:v:1 libx264 -b:v:1 3308k -maxrate:v:1 3572k -g:v:1 72 -keyint_min:v:1 72 -r:v:1 24 -x264-params:v:1 "scenecut=0:open_gop=0" -bufsize:v:1 3572k -profile:v:1 high -map a:0? -c:a:1 copy -preset fast -map v:0 -c:v:2 copy -map a:0? -c:a:2 copy -preset ultrafast -var_stream_map "v:0,a:0 v:1,a:1 v:2,a:2 " -f hls -hls_time 3 -hls_list_size 10 -hls_flags program_date_time+independent_segments+omit_endlist -segment_format_options mpegts_flags=mpegts_copyts=1 -tune zerolatency -pix_fmt yuv420p -sc_threshold 0 -master_pl_name stream.m3u8 -hls_segment_filename http://127.0.0.1:8123/%v/stream-jdofFGg-%d.ts -max_muxing_queue_size 400 -method PUT http://127.0.0.1:8123/%v/stream.m3u8` + expected := `FFREPORT=file="` + expectedLogPath + `":level=32 ` + transcoder.ffmpegPath + ` -hide_banner -loglevel warning -fflags +genpts -flags +cgop -i fakecontent.flv -map v:0 -c:v:0 libx264 -b:v:0 1008k -maxrate:v:0 1088k -g:v:0 90 -keyint_min:v:0 90 -r:v:0 30 -x264-params:v:0 "scenecut=0:open_gop=0" -bufsize:v:0 1088k -profile:v:0 high -map a:0? -c:a:0 copy -preset veryfast -map v:0 -c:v:1 libx264 -b:v:1 3308k -maxrate:v:1 3572k -g:v:1 72 -keyint_min:v:1 72 -r:v:1 24 -x264-params:v:1 "scenecut=0:open_gop=0" -bufsize:v:1 3572k -profile:v:1 high -map a:0? -c:a:1 copy -preset fast -map v:0 -c:v:2 copy -map a:0? -c:a:2 copy -preset ultrafast -var_stream_map "v:0,a:0 v:1,a:1 v:2,a:2 " -f hls -hls_time 3 -hls_list_size 10 -hls_flags program_date_time+independent_segments+omit_endlist -segment_format_options mpegts_flags=mpegts_copyts=1 -tune zerolatency -pix_fmt yuv420p -sc_threshold 0 -master_pl_name stream.m3u8 -hls_segment_filename http://127.0.0.1:8123/jdFsdfzGg/%v/stream-jdFsdfzGg-%d.ts -max_muxing_queue_size 400 -method PUT http://127.0.0.1:8123/jdFsdfzGg/%v/stream.m3u8` if cmd != expected { t.Errorf("ffmpeg command does not match expected.\nGot %s\n, want: %s", cmd, expected) diff --git a/core/transcoder/utils.go b/core/transcoder/utils.go index d89cbfe9d..a9e4ff631 100644 --- a/core/transcoder/utils.go +++ b/core/transcoder/utils.go @@ -13,8 +13,10 @@ import ( log "github.com/sirupsen/logrus" ) -var _lastTranscoderLogMessage = "" -var l = &sync.RWMutex{} +var ( + _lastTranscoderLogMessage = "" + l = &sync.RWMutex{} +) var errorMap = map[string]string{ "Unrecognized option 'vaapi_device'": "you are likely trying to utilize a vaapi codec, but your version of ffmpeg or your hardware doesn't support it. change your codec to libx264 and restart your stream", @@ -94,20 +96,20 @@ func handleTranscoderMessage(message string) { _lastTranscoderLogMessage = message } -func createVariantDirectories() { +func createVariantDirectories(streamID string) { // Create private hls data dirs - utils.CleanupDirectory(config.HLSStoragePath) + utils.CleanupDirectory(config.HLSStoragePath, config.EnableRecordingFeatures) if len(data.GetStreamOutputVariants()) != 0 { for index := range data.GetStreamOutputVariants() { - if err := os.MkdirAll(path.Join(config.HLSStoragePath, strconv.Itoa(index)), 0750); err != nil { + if err := os.MkdirAll(path.Join(config.HLSStoragePath, streamID, strconv.Itoa(index)), 0o750); err != nil { log.Fatalln(err) } } } else { dir := path.Join(config.HLSStoragePath, strconv.Itoa(0)) log.Traceln("Creating", dir) - if err := os.MkdirAll(dir, 0750); err != nil { + if err := os.MkdirAll(dir, 0o750); err != nil { log.Fatalln(err) } } diff --git a/core/video.go b/core/video.go new file mode 100644 index 000000000..fbf2582a9 --- /dev/null +++ b/core/video.go @@ -0,0 +1,26 @@ +package core + +import ( + "io" + + "github.com/owncast/owncast/core/transcoder" +) + +func setupVideoComponentsForId(streamId string) { +} + +func setupLiveTranscoderForId(streamId string, rtmpOut *io.PipeReader) { + _storage.SetStreamId(streamId) + handler.SetStreamId(streamId) + + go func() { + _transcoder = transcoder.NewTranscoder(streamId) + _transcoder.TranscoderCompleted = func(error) { + SetStreamAsDisconnected() + _transcoder = nil + _currentBroadcast = nil + } + _transcoder.SetStdin(rtmpOut) + _transcoder.Start(true) + }() +} diff --git a/db/db.go b/db/db.go index 21fc4346d..ff9718337 100644 --- a/db/db.go +++ b/db/db.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.15.0 +// sqlc v1.19.1 package db diff --git a/db/models.go b/db/models.go index f5c13b7b2..fdb3fdae9 100644 --- a/db/models.go +++ b/db/models.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.15.0 +// sqlc v1.19.1 package db @@ -72,6 +72,13 @@ type Notification struct { CreatedAt sql.NullTime } +type Stream struct { + ID string + StreamTitle sql.NullString + StartTime time.Time + EndTime sql.NullTime +} + type User struct { ID string DisplayName string @@ -91,3 +98,24 @@ type UserAccessToken struct { UserID string Timestamp time.Time } + +type VideoSegment struct { + ID string + StreamID string + OutputConfigurationID string + Path string + Timestamp time.Time +} + +type VideoSegmentOutputConfiguration struct { + ID string + VariantID string + Name string + StreamID string + SegmentDuration int32 + Bitrate int32 + Framerate int32 + ResolutionWidth sql.NullInt32 + ResolutionHeight sql.NullInt32 + Timestamp time.Time +} diff --git a/db/query.sql b/db/query.sql index 03e9ed446..5a2528026 100644 --- a/db/query.sql +++ b/db/query.sql @@ -108,3 +108,35 @@ UPDATE users SET display_name = $1, previous_names = previous_names || $2, namec -- name: ChangeDisplayColor :exec UPDATE users SET display_color = $1 WHERE id = $2; + +-- Recording and clip related queries. + +-- name: GetStreams :many +SELECT id, stream_title, start_time, end_time FROM streams ORDER BY start_time DESC; + +-- name: GetStreamById :one +SELECT id, stream_title, start_time, end_time FROM streams WHERE id = $1 LIMIT 1; + +-- name: GetOutputConfigurationsForStreamId :many +SELECT id, stream_id, variant_id, name, segment_duration, bitrate, framerate, resolution_width, resolution_height FROM video_segment_output_configuration WHERE stream_id = $1; + +-- name: GetOutputConfigurationForId :one +SELECT id, stream_id, variant_id, name, segment_duration, bitrate, framerate, resolution_width, resolution_height FROM video_segment_output_configuration WHERE id = $1; + +-- name: GetSegmentsForOutputId :many +SELECT id, stream_id, output_configuration_id, path, timestamp FROM video_segments WHERE output_configuration_id = $1 ORDER BY timestamp ASC; + +-- name: GetSegmentsForOutputIdAndWindow :many +SELECT id, stream_id, output_configuration_id, timestamp FROM video_segments WHERE output_configuration_id = $1 AND timestamp >= $2 AND timestamp <= $3 ORDER BY timestamp ASC; + +-- name: InsertStream :exec +INSERT INTO streams (id, stream_title, start_time, end_time) VALUES($1, $2, $3, $4); + +-- name: InsertOutputConfiguration :exec +INSERT INTO video_segment_output_configuration (id, variant_id, stream_id, name, segment_duration, bitrate, framerate, resolution_width, resolution_height) VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9); + +-- name: InsertSegment :exec +INSERT INTO video_segments (id, stream_id, output_configuration_id, path) VALUES($1, $2, $3, $4); + +-- name: SetStreamEnded :exec +UPDATE streams SET end_time = CURRENT_TIMESTAMP WHERE id = $1; diff --git a/db/query.sql.go b/db/query.sql.go index 97f73a665..6fade6bfd 100644 --- a/db/query.sql.go +++ b/db/query.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.15.0 +// sqlc v1.19.1 // source: query.sql package db @@ -541,6 +541,88 @@ func (q *Queries) GetOutboxWithOffset(ctx context.Context, arg GetOutboxWithOffs return items, nil } +const getOutputConfigurationForId = `-- name: GetOutputConfigurationForId :one +SELECT id, stream_id, variant_id, name, segment_duration, bitrate, framerate, resolution_width, resolution_height FROM video_segment_output_configuration WHERE id = $1 +` + +type GetOutputConfigurationForIdRow struct { + ID string + StreamID string + VariantID string + Name string + SegmentDuration int32 + Bitrate int32 + Framerate int32 + ResolutionWidth sql.NullInt32 + ResolutionHeight sql.NullInt32 +} + +func (q *Queries) GetOutputConfigurationForId(ctx context.Context, id string) (GetOutputConfigurationForIdRow, error) { + row := q.db.QueryRowContext(ctx, getOutputConfigurationForId, id) + var i GetOutputConfigurationForIdRow + err := row.Scan( + &i.ID, + &i.StreamID, + &i.VariantID, + &i.Name, + &i.SegmentDuration, + &i.Bitrate, + &i.Framerate, + &i.ResolutionWidth, + &i.ResolutionHeight, + ) + return i, err +} + +const getOutputConfigurationsForStreamId = `-- name: GetOutputConfigurationsForStreamId :many +SELECT id, stream_id, variant_id, name, segment_duration, bitrate, framerate, resolution_width, resolution_height FROM video_segment_output_configuration WHERE stream_id = $1 +` + +type GetOutputConfigurationsForStreamIdRow struct { + ID string + StreamID string + VariantID string + Name string + SegmentDuration int32 + Bitrate int32 + Framerate int32 + ResolutionWidth sql.NullInt32 + ResolutionHeight sql.NullInt32 +} + +func (q *Queries) GetOutputConfigurationsForStreamId(ctx context.Context, streamID string) ([]GetOutputConfigurationsForStreamIdRow, error) { + rows, err := q.db.QueryContext(ctx, getOutputConfigurationsForStreamId, streamID) + if err != nil { + return nil, err + } + defer rows.Close() + var items []GetOutputConfigurationsForStreamIdRow + for rows.Next() { + var i GetOutputConfigurationsForStreamIdRow + if err := rows.Scan( + &i.ID, + &i.StreamID, + &i.VariantID, + &i.Name, + &i.SegmentDuration, + &i.Bitrate, + &i.Framerate, + &i.ResolutionWidth, + &i.ResolutionHeight, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const getRejectedAndBlockedFollowers = `-- name: GetRejectedAndBlockedFollowers :many SELECT iri, name, username, image, created_at, disabled_at FROM ap_followers WHERE disabled_at is not null ` @@ -584,6 +666,134 @@ func (q *Queries) GetRejectedAndBlockedFollowers(ctx context.Context) ([]GetReje return items, nil } +const getSegmentsForOutputId = `-- name: GetSegmentsForOutputId :many +SELECT id, stream_id, output_configuration_id, path, timestamp FROM video_segments WHERE output_configuration_id = $1 ORDER BY timestamp ASC +` + +func (q *Queries) GetSegmentsForOutputId(ctx context.Context, outputConfigurationID string) ([]VideoSegment, error) { + rows, err := q.db.QueryContext(ctx, getSegmentsForOutputId, outputConfigurationID) + if err != nil { + return nil, err + } + defer rows.Close() + var items []VideoSegment + for rows.Next() { + var i VideoSegment + if err := rows.Scan( + &i.ID, + &i.StreamID, + &i.OutputConfigurationID, + &i.Path, + &i.Timestamp, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const getSegmentsForOutputIdAndWindow = `-- name: GetSegmentsForOutputIdAndWindow :many +SELECT id, stream_id, output_configuration_id, timestamp FROM video_segments WHERE output_configuration_id = $1 AND timestamp >= $2 AND timestamp <= $3 ORDER BY timestamp ASC +` + +type GetSegmentsForOutputIdAndWindowParams struct { + OutputConfigurationID string + Timestamp time.Time + Timestamp_2 time.Time +} + +type GetSegmentsForOutputIdAndWindowRow struct { + ID string + StreamID string + OutputConfigurationID string + Timestamp time.Time +} + +func (q *Queries) GetSegmentsForOutputIdAndWindow(ctx context.Context, arg GetSegmentsForOutputIdAndWindowParams) ([]GetSegmentsForOutputIdAndWindowRow, error) { + rows, err := q.db.QueryContext(ctx, getSegmentsForOutputIdAndWindow, arg.OutputConfigurationID, arg.Timestamp, arg.Timestamp_2) + if err != nil { + return nil, err + } + defer rows.Close() + var items []GetSegmentsForOutputIdAndWindowRow + for rows.Next() { + var i GetSegmentsForOutputIdAndWindowRow + if err := rows.Scan( + &i.ID, + &i.StreamID, + &i.OutputConfigurationID, + &i.Timestamp, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const getStreamById = `-- name: GetStreamById :one +SELECT id, stream_title, start_time, end_time FROM streams WHERE id = $1 LIMIT 1 +` + +func (q *Queries) GetStreamById(ctx context.Context, id string) (Stream, error) { + row := q.db.QueryRowContext(ctx, getStreamById, id) + var i Stream + err := row.Scan( + &i.ID, + &i.StreamTitle, + &i.StartTime, + &i.EndTime, + ) + return i, err +} + +const getStreams = `-- name: GetStreams :many + +SELECT id, stream_title, start_time, end_time FROM streams ORDER BY start_time DESC +` + +// Recording and clip related queries. +func (q *Queries) GetStreams(ctx context.Context) ([]Stream, error) { + rows, err := q.db.QueryContext(ctx, getStreams) + if err != nil { + return nil, err + } + defer rows.Close() + var items []Stream + for rows.Next() { + var i Stream + if err := rows.Scan( + &i.ID, + &i.StreamTitle, + &i.StartTime, + &i.EndTime, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const getUserByAccessToken = `-- name: GetUserByAccessToken :one SELECT users.id, display_name, display_color, users.created_at, disabled_at, previous_names, namechanged_at, authenticated_at, scopes FROM users, user_access_tokens WHERE token = $1 AND users.id = user_id ` @@ -666,6 +876,79 @@ func (q *Queries) GetUserDisplayNameByToken(ctx context.Context, token string) ( return display_name, err } +const insertOutputConfiguration = `-- name: InsertOutputConfiguration :exec +INSERT INTO video_segment_output_configuration (id, variant_id, stream_id, name, segment_duration, bitrate, framerate, resolution_width, resolution_height) VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9) +` + +type InsertOutputConfigurationParams struct { + ID string + VariantID string + StreamID string + Name string + SegmentDuration int32 + Bitrate int32 + Framerate int32 + ResolutionWidth sql.NullInt32 + ResolutionHeight sql.NullInt32 +} + +func (q *Queries) InsertOutputConfiguration(ctx context.Context, arg InsertOutputConfigurationParams) error { + _, err := q.db.ExecContext(ctx, insertOutputConfiguration, + arg.ID, + arg.VariantID, + arg.StreamID, + arg.Name, + arg.SegmentDuration, + arg.Bitrate, + arg.Framerate, + arg.ResolutionWidth, + arg.ResolutionHeight, + ) + return err +} + +const insertSegment = `-- name: InsertSegment :exec +INSERT INTO video_segments (id, stream_id, output_configuration_id, path) VALUES($1, $2, $3, $4) +` + +type InsertSegmentParams struct { + ID string + StreamID string + OutputConfigurationID string + Path string +} + +func (q *Queries) InsertSegment(ctx context.Context, arg InsertSegmentParams) error { + _, err := q.db.ExecContext(ctx, insertSegment, + arg.ID, + arg.StreamID, + arg.OutputConfigurationID, + arg.Path, + ) + return err +} + +const insertStream = `-- name: InsertStream :exec +INSERT INTO streams (id, stream_title, start_time, end_time) VALUES($1, $2, $3, $4) +` + +type InsertStreamParams struct { + ID string + StreamTitle sql.NullString + StartTime time.Time + EndTime sql.NullTime +} + +func (q *Queries) InsertStream(ctx context.Context, arg InsertStreamParams) error { + _, err := q.db.ExecContext(ctx, insertStream, + arg.ID, + arg.StreamTitle, + arg.StartTime, + arg.EndTime, + ) + return err +} + const isDisplayNameAvailable = `-- name: IsDisplayNameAvailable :one SELECT count(*) FROM users WHERE display_name = $1 AND authenticated_at is not null AND disabled_at is NULL ` @@ -748,6 +1031,15 @@ func (q *Queries) SetAccessTokenToOwner(ctx context.Context, arg SetAccessTokenT return err } +const setStreamEnded = `-- name: SetStreamEnded :exec +UPDATE streams SET end_time = CURRENT_TIMESTAMP WHERE id = $1 +` + +func (q *Queries) SetStreamEnded(ctx context.Context, id string) error { + _, err := q.db.ExecContext(ctx, setStreamEnded, id) + return err +} + const setUserAsAuthenticated = `-- name: SetUserAsAuthenticated :exec UPDATE users SET authenticated_at = CURRENT_TIMESTAMP WHERE id = $1 ` diff --git a/db/schema.sql b/db/schema.sql index 644813648..ad7cc58f4 100644 --- a/db/schema.sql +++ b/db/schema.sql @@ -97,3 +97,45 @@ CREATE TABLE IF NOT EXISTS messages ( CREATE INDEX user_id ON messages (user_id); CREATE INDEX hidden_at ON messages (hidden_at); CREATE INDEX timestamp ON messages (timestamp); + +-- Record the high level details of each stream. +CREATE TABLE IF NOT EXISTS streams ( + "id" string NOT NULL PRIMARY KEY, + "stream_title" TEXT, + "start_time" DATE NOT NULL, + "end_time" DATE, + PRIMARY KEY (id) +); +CREATE INDEX streams_id ON streams (id); +CREATE INDEX streams_start_time ON streams (start_time); +CREATE INDEX streams_start_end_time ON streams (start_time,end_time); + +-- Record the output configuration of a stream. +CREATE TABLE IF NOT EXISTS video_segment_output_configuration ( + "id" string NOT NULL PRIMARY KEY, + "variant_id" string NOT NULL, + "name" string NOT NULL, + "stream_id" string NOT NULL, + "segment_duration" INTEGER NOT NULL, + "bitrate" INTEGER NOT NULL, + "framerate" INTEGER NOT NULL, + "resolution_width" INTEGER, + "resolution_height" INTEGER, + "timestamp" DATE DEFAULT CURRENT_TIMESTAMP NOT NULL, + PRIMARY KEY (id) +); + CREATE INDEX video_segment_output_configuration_stream_id ON video_segment_output_configuration (stream_id); + +-- Support querying all segments for a single stream as well +-- as segments for a time window. +CREATE TABLE IF NOT EXISTS video_segments ( + "id" string NOT NULL PRIMARY KEY, + "stream_id" string NOT NULL, + "output_configuration_id" string NOT NULL, + "path" TEXT NOT NULL, + "timestamp" DATE DEFAULT CURRENT_TIMESTAMP NOT NULL, + PRIMARY KEY (id) +); + CREATE INDEX video_segments_stream_id ON video_segments (stream_id); + CREATE INDEX video_segments_stream_id_timestamp ON video_segments (stream_id,timestamp); + diff --git a/models/currentBroadcast.go b/models/currentBroadcast.go index 131400943..6a1ae7d5b 100644 --- a/models/currentBroadcast.go +++ b/models/currentBroadcast.go @@ -2,6 +2,7 @@ package models // CurrentBroadcast represents the configuration associated with the currently active stream. type CurrentBroadcast struct { + StreamID string `json:"streamId"` OutputSettings []StreamOutputVariant `json:"outputSettings"` LatencyLevel LatencyLevel `json:"latencyLevel"` } diff --git a/models/storageProvider.go b/models/storageProvider.go index 44961d5c2..2880da345 100644 --- a/models/storageProvider.go +++ b/models/storageProvider.go @@ -3,11 +3,12 @@ package models // StorageProvider is how a chunk storage provider should be implemented. type StorageProvider interface { Setup() error - Save(filePath string, retryCount int) (string, error) - - SegmentWritten(localFilePath string) + Save(localFilePath, destinationPath string, retryCount int) (string, error) + SetStreamId(streamID string) + SegmentWritten(localFilePath string) (string, int, error) VariantPlaylistWritten(localFilePath string) MasterPlaylistWritten(localFilePath string) + GetRemoteDestinationPathFromLocalFilePath(localFilePath string) string Cleanup() error } diff --git a/replays/hlsRecorder.go b/replays/hlsRecorder.go new file mode 100644 index 000000000..4915ebe59 --- /dev/null +++ b/replays/hlsRecorder.go @@ -0,0 +1,130 @@ +package replays + +import ( + "context" + "database/sql" + "strconv" + "strings" + "time" + + "github.com/owncast/owncast/core/data" + "github.com/owncast/owncast/db" + "github.com/owncast/owncast/utils" + "github.com/teris-io/shortid" + + log "github.com/sirupsen/logrus" +) + +type HLSRecorder struct { + streamID string + startTime time.Time + + // The video variant configurations that were used for this stream. + outputConfigurations []HLSOutputConfiguration + + datastore *data.Datastore +} + +// NewRecording returns a new instance of the HLS recorder. +func NewRecording(streamID string) *HLSRecorder { + // We don't support replaying offline clips. + if streamID == "offline" { + return nil + } + + h := HLSRecorder{ + streamID: streamID, + startTime: time.Now(), + datastore: data.GetDatastore(), + } + + outputs := data.GetStreamOutputVariants() + latency := data.GetStreamLatencyLevel() + + streamTitle := data.GetStreamTitle() + validTitle := streamTitle != "" + + if err := h.datastore.GetQueries().InsertStream(context.Background(), db.InsertStreamParams{ + ID: streamID, + StartTime: h.startTime, + StreamTitle: sql.NullString{String: streamTitle, Valid: validTitle}, + }); err != nil { + log.Panicln(err) + } + + // Create a reference of the output configurations that were used for this stream. + for variantId, o := range outputs { + configId := shortid.MustGenerate() + + if err := h.datastore.GetQueries().InsertOutputConfiguration(context.Background(), db.InsertOutputConfigurationParams{ + ID: configId, + Name: o.Name, + StreamID: streamID, + VariantID: strconv.Itoa(variantId), + SegmentDuration: int32(latency.SecondsPerSegment), + Bitrate: int32(o.VideoBitrate), + Framerate: int32(o.Framerate), + ResolutionWidth: sql.NullInt32{Int32: int32(o.ScaledWidth), Valid: true}, + ResolutionHeight: sql.NullInt32{Int32: int32(o.ScaledHeight), Valid: true}, + }); err != nil { + log.Panicln(err) + } + + h.outputConfigurations = append(h.outputConfigurations, HLSOutputConfiguration{ + ID: configId, + Name: o.Name, + VideoBitrate: o.VideoBitrate, + ScaledWidth: o.ScaledWidth, + ScaledHeight: o.ScaledHeight, + Framerate: o.Framerate, + SegmentDuration: float64(latency.SegmentCount), + }) + } + return &h +} + +// SetOutputConfigurations sets the output configurations for this stream. +func (h *HLSRecorder) SetOutputConfigurations(configs []HLSOutputConfiguration) { + h.outputConfigurations = configs +} + +// StreamBegan is called when a stream is started. +func (h *HLSRecorder) StreamBegan(id string) { + h.streamID = id + h.startTime = time.Now() +} + +// SegmentWritten is called when a segment is written to disk. +func (h *HLSRecorder) SegmentWritten(path string) { + outputConfigurationIndexString := utils.GetIndexFromFilePath(path) + outputConfigurationIndex, err := strconv.Atoi(outputConfigurationIndexString) + if err != nil { + log.Errorln("HLSRecorder segmentWritten error:", err) + return + } + + p := strings.ReplaceAll(path, "data/", "") + + segment := HLSSegment{ + ID: shortid.MustGenerate(), + StreamID: h.streamID, + Path: p, + } + + if err := h.datastore.GetQueries().InsertSegment(context.Background(), db.InsertSegmentParams{ + ID: segment.ID, + StreamID: segment.StreamID, + OutputConfigurationID: h.outputConfigurations[outputConfigurationIndex].ID, + Path: segment.Path, + }); err != nil { + log.Errorln(err) + } +} + +// StreamEnded is called when a stream is ended so the end time can be noted +// in the stream's metadata. +func (h *HLSRecorder) StreamEnded() { + if err := h.datastore.GetQueries().SetStreamEnded(context.Background(), h.streamID); err != nil { + log.Errorln(err) + } +} diff --git a/replays/hlsSegment.go b/replays/hlsSegment.go new file mode 100644 index 000000000..ff7de0e46 --- /dev/null +++ b/replays/hlsSegment.go @@ -0,0 +1,12 @@ +package replays + +import "time" + +// HLSSegment represents a single HLS segment. +type HLSSegment struct { + ID string + StreamID string + Timestamp time.Time + OutputConfigurationID string + Path string +} diff --git a/replays/mediaPlaylistAllowCacheTag.go b/replays/mediaPlaylistAllowCacheTag.go new file mode 100644 index 000000000..1f0e97ad3 --- /dev/null +++ b/replays/mediaPlaylistAllowCacheTag.go @@ -0,0 +1,49 @@ +package replays + +import ( + "bytes" + "fmt" + + "github.com/grafov/m3u8" +) + +// MediaPlaylistAllowCacheTag is a custom tag to explicitly state that this +// playlist is allowed to be cached. +type MediaPlaylistAllowCacheTag struct { + Type string +} + +// TagName should return the full tag identifier including the leading +// '#' and trailing ':' if the tag also contains a value or attribute +// list. +func (tag *MediaPlaylistAllowCacheTag) TagName() string { + return "#EXT-X-ALLOW-CACHE" +} + +// Decode decodes the input line. The line will be the entire matched +// line, including the identifier. +func (tag *MediaPlaylistAllowCacheTag) Decode(line string) (m3u8.CustomTag, error) { + _, err := fmt.Sscanf(line, "#EXT-X-ALLOW-CACHE") + + return tag, err +} + +// SegmentTag specifies that this tag is not for segments. +func (tag *MediaPlaylistAllowCacheTag) SegmentTag() bool { + return false +} + +// Encode formats the structure to the text result. +func (tag *MediaPlaylistAllowCacheTag) Encode() *bytes.Buffer { + buf := new(bytes.Buffer) + + buf.WriteString(tag.TagName()) + buf.WriteString(tag.Type) + + return buf +} + +// String implements Stringer interface. +func (tag *MediaPlaylistAllowCacheTag) String() string { + return tag.Encode().String() +} diff --git a/replays/outputConfiguration.go b/replays/outputConfiguration.go new file mode 100644 index 000000000..b954e35de --- /dev/null +++ b/replays/outputConfiguration.go @@ -0,0 +1,13 @@ +package replays + +type HLSOutputConfiguration struct { + ID string + StreamId string + VariantId string + Name string + VideoBitrate int + ScaledWidth int + ScaledHeight int + Framerate int + SegmentDuration float64 +} diff --git a/replays/playlistGenerator.go b/replays/playlistGenerator.go new file mode 100644 index 000000000..d0774a508 --- /dev/null +++ b/replays/playlistGenerator.go @@ -0,0 +1,283 @@ +package replays + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/grafov/m3u8" + "github.com/owncast/owncast/core/data" + "github.com/owncast/owncast/db" + "github.com/pkg/errors" +) + +/* +The PlaylistGenerator is responsible for creating the master and media +playlists, in order to replay a stream in whole, or part. It requires detailed +metadata about how the initial live stream was configured, as well as a +access to every segment that was created during the live stream. +*/ + +type PlaylistGenerator struct { + datastore *data.Datastore +} + +func NewPlaylistGenerator() *PlaylistGenerator { + return &PlaylistGenerator{ + datastore: data.GetDatastore(), + } +} + +func (p *PlaylistGenerator) GenerateMasterPlaylistForStream(streamId string) (*m3u8.MasterPlaylist, error) { + // stream, err := p.GetStream(streamId) + // if err != nil { + // return nil, errors.Wrap(err, "failed to get stream") + // } + + // Determine the different output configurations for this stream. + configs, err := p.GetConfigurationsForStream(streamId) + if err != nil { + return nil, errors.Wrap(err, "failed to get configurations for stream") + } + + // Create the master playlist that will hold the different media playlists. + masterPlaylist := p.createNewMasterPlaylist() + + // Create the media playlists for each output configuration. + for _, config := range configs { + // Verify the validity of the configuration. + if config.VideoBitrate == 0 { + return nil, errors.New("video bitrate is unavailable") + } + + if config.Framerate == 0 { + return nil, errors.New("video framerate is unavailable") + } + + mediaPlaylist, err := p.GenerateMediaPlaylistForStreamAndConfiguration(streamId, config.ID) + if err != nil { + return nil, errors.Wrap(err, "failed to create media playlist") + } + + // Append the media playlist to the master playlist. + params := m3u8.VariantParams{ + ProgramId: 1, + Name: config.Name, + FrameRate: float64(config.Framerate), + Bandwidth: uint32(config.VideoBitrate * 1000), + // Match what is generated in our live playlists. + Codecs: "avc1.64001f,mp4a.40.2", + } + + // If both the width and height are set then we can set that as + // the resolution in the media playlist. + if config.ScaledHeight > 0 && config.ScaledWidth > 0 { + params.Resolution = fmt.Sprintf("%dx%d", config.ScaledWidth, config.ScaledHeight) + } + + // Add the media playlist to the master playlist. + publicPlaylistPath := strings.Join([]string{"/replay", streamId, config.ID}, "/") + masterPlaylist.Append(publicPlaylistPath, mediaPlaylist, params) + } + + // Return the final master playlist that contains all the media playlists. + return masterPlaylist, nil +} + +func (p *PlaylistGenerator) GenerateMediaPlaylistForStreamAndConfiguration(streamId, outputConfigurationId string) (*m3u8.MediaPlaylist, error) { + stream, err := p.GetStream(streamId) + if err != nil { + return nil, errors.Wrap(err, "failed to get stream") + } + + config, err := p.GetOutputConfig(outputConfigurationId) + if err != nil { + return nil, errors.Wrap(err, "failed to get output configuration") + } + + // Fetch all the segments for this configuration. + segments, err := p.GetAllSegmentsForOutputConfiguration(outputConfigurationId) + if err != nil { + return nil, errors.Wrap(err, "failed to get all segments for output configuration") + } + + // Create the media playlist for this configuration and add the segments. + mediaPlaylist, err := p.createMediaPlaylistForConfigurationAndSegments(config, stream.StartTime, stream.InProgress, segments) + if err != nil { + return nil, errors.Wrap(err, "failed to create media playlist") + } + + return mediaPlaylist, nil +} + +func (p *PlaylistGenerator) GetStream(streamId string) (*Stream, error) { + stream, err := p.datastore.GetQueries().GetStreamById(context.Background(), streamId) + if stream.ID == "" { + return nil, errors.Wrap(err, "failed to get stream") + } + + s := Stream{ + ID: stream.ID, + Title: stream.StreamTitle.String, + StartTime: stream.StartTime, + EndTime: stream.EndTime.Time, + InProgress: !stream.EndTime.Valid, + } + + return &s, nil +} + +func (p *PlaylistGenerator) GetOutputConfig(outputConfigId string) (*HLSOutputConfiguration, error) { + config, err := p.datastore.GetQueries().GetOutputConfigurationForId(context.Background(), outputConfigId) + if err != nil { + return nil, errors.Wrap(err, "failed to get output configuration") + } + + return createConfigFromConfigRow(config), nil +} + +// GetConfigurationsForStream returns the output configurations for a given stream. +func (p *PlaylistGenerator) GetConfigurationsForStream(streamId string) ([]*HLSOutputConfiguration, error) { + outputConfigRows, err := p.datastore.GetQueries().GetOutputConfigurationsForStreamId(context.Background(), streamId) + if err != nil { + return nil, errors.Wrap(err, "failed to get output configurations for stream") + } + + outputConfigs := []*HLSOutputConfiguration{} + for _, row := range outputConfigRows { + config := &HLSOutputConfiguration{ + ID: row.ID, + StreamId: streamId, + VariantId: row.VariantID, + Name: row.Name, + VideoBitrate: int(row.Bitrate), + Framerate: int(row.Framerate), + ScaledHeight: int(row.ResolutionWidth.Int32), + ScaledWidth: int(row.ResolutionHeight.Int32), + SegmentDuration: float64(row.SegmentDuration), + } + outputConfigs = append(outputConfigs, config) + } + + return outputConfigs, nil +} + +// GetAllSegmentsForOutputConfiguration returns all the segments for a given output config. +func (p *PlaylistGenerator) GetAllSegmentsForOutputConfiguration(outputId string) ([]HLSSegment, error) { + segmentRows, err := p.datastore.GetQueries().GetSegmentsForOutputId(context.Background(), outputId) + if err != nil { + return nil, errors.Wrap(err, "failed to get segments for output config") + } + + segments := []HLSSegment{} + for _, row := range segmentRows { + segment := HLSSegment{ + ID: row.ID, + StreamID: row.StreamID, + OutputConfigurationID: row.OutputConfigurationID, + Timestamp: row.Timestamp, + Path: row.Path, + } + segments = append(segments, segment) + } + + return segments, nil +} + +func (p *PlaylistGenerator) createMediaPlaylistForConfigurationAndSegments(configuration *HLSOutputConfiguration, startTime time.Time, inProgress bool, segments []HLSSegment) (*m3u8.MediaPlaylist, error) { + playlistSize := len(segments) + segmentDuration := configuration.SegmentDuration + playlist, err := m3u8.NewMediaPlaylist(0, uint(playlistSize)) + + playlist.TargetDuration = configuration.SegmentDuration + + if !inProgress { + playlist.MediaType = m3u8.VOD + } else { + playlist.MediaType = m3u8.EVENT + } + + // Add the segments to the playlist. + for index, segment := range segments { + mediaSegment := m3u8.MediaSegment{ + URI: "/" + segment.Path, + Duration: segmentDuration, + SeqId: uint64(index), + ProgramDateTime: segment.Timestamp, + } + if err := playlist.AppendSegment(&mediaSegment); err != nil { + return nil, errors.Wrap(err, "failed to append segment to recording playlist") + } + } + + if err != nil { + return nil, err + } + + // Configure the properties of this media playlist. + if err := playlist.SetProgramDateTime(startTime); err != nil { + return nil, errors.Wrap(err, "failed to set media playlist program date time") + } + + // Our live output is specified as v6, so let's match it to be as close as + // possible to what we're doing for live streams. + playlist.SetVersion(6) + + if !inProgress { + // Specify explicitly that the playlist content is allowed to be cached. + // However, if in-progress recordings are supported this should not be enabled + // in order for the playlist to be updated with new segments. inProgress is + // determined by seeing if the stream has an endTime or not. + playlist.SetCustomTag(&MediaPlaylistAllowCacheTag{}) + + // Set the ENDLIST tag and close the playlist for writing if the stream is + // not still in progress. + playlist.Close() + } + + return playlist, nil +} + +func (p *PlaylistGenerator) createNewMasterPlaylist() *m3u8.MasterPlaylist { + playlist := m3u8.NewMasterPlaylist() + playlist.SetIndependentSegments(true) + playlist.SetVersion(6) + + return playlist +} + +func createConfigFromConfigRow(row db.GetOutputConfigurationForIdRow) *HLSOutputConfiguration { + config := HLSOutputConfiguration{ + ID: row.ID, + StreamId: row.StreamID, + VariantId: row.VariantID, + Name: row.Name, + VideoBitrate: int(row.Bitrate), + Framerate: int(row.Framerate), + ScaledHeight: int(row.ResolutionWidth.Int32), + ScaledWidth: int(row.ResolutionHeight.Int32), + SegmentDuration: float64(row.SegmentDuration), + } + return &config +} + +// func createOutputConfigsFromConfigRows(rows []db.GetOutputConfigurationsForStreamIdRow) []HLSOutputConfiguration { +// outputConfigs := []HLSOutputConfiguration{} +// for _, row := range rows { +// config := HLSOutputConfiguration{ +// ID: row.ID, +// StreamId: row.StreamID, +// VariantId: row.VariantID, +// Name: row.Name, +// VideoBitrate: int(row.Bitrate), +// Framerate: int(row.Framerate), +// ScaledHeight: int(row.ResolutionWidth.Int32), +// ScaledWidth: int(row.ResolutionHeight.Int32), +// SegmentDuration: float64(row.SegmentDuration), +// } +// outputConfigs = append(outputConfigs, config) +// } + +// return outputConfigs +// } diff --git a/replays/playlistGenerator_test.go b/replays/playlistGenerator_test.go new file mode 100644 index 000000000..3dbc668a4 --- /dev/null +++ b/replays/playlistGenerator_test.go @@ -0,0 +1,136 @@ +package replays + +import ( + "testing" + "time" + + "github.com/grafov/m3u8" +) + +var ( + generator = NewPlaylistGenerator() + config = []HLSOutputConfiguration{ + { + ID: "1", + VideoBitrate: 1000, + Framerate: 30, + }, + { + ID: "2", + VideoBitrate: 2000, + Framerate: 30, + }, + } +) + +var segments = []HLSSegment{ + { + ID: "testSegmentId", + StreamID: "testStreamId", + Timestamp: time.Now(), + OutputConfigurationID: "testOutputConfigId", + Path: "hls/testStreamId/testOutputConfigId/testSegmentId.ts", + }, +} + +func TestMasterPlaylist(t *testing.T) { + playlist := generator.createNewMasterPlaylist() + + mediaPlaylists, err := generator.createMediaPlaylistForConfigurationAndSegments(&config[0], time.Now(), false, segments) + playlist.Append("test", mediaPlaylists, m3u8.VariantParams{ + Bandwidth: uint32(config[0].VideoBitrate), + FrameRate: float64(config[0].Framerate), + }) + mediaPlaylists.Close() + + if err != nil { + t.Error(err) + } + + if playlist.Version() != 6 { + t.Error("expected version 6, got", playlist.Version()) + } + + if !playlist.IndependentSegments() { + t.Error("expected independent segments") + } + + if playlist.Variants[0].Bandwidth != uint32(config[0].VideoBitrate) { + t.Error("expected bandwidth", config[0].VideoBitrate, "got", playlist.Variants[0].Bandwidth) + } + + if playlist.Variants[0].FrameRate != float64(config[0].Framerate) { + t.Error("expected framerate", config[0].Framerate, "got", playlist.Variants[0].FrameRate) + } +} + +func TestCompletedMediaPlaylist(t *testing.T) { + startTime := segments[0].Timestamp + conf := config[0] + + // Create a completed media playlist. + playlist, err := generator.createMediaPlaylistForConfigurationAndSegments(&conf, startTime, false, segments) + if err != nil { + t.Error(err) + } + + if playlist.TargetDuration != conf.SegmentDuration { + t.Error("expected target duration", conf.SegmentDuration, "got", playlist.TargetDuration) + } + + // Verify it's marked as cachable. + if playlist.Custom["#EXT-X-ALLOW-CACHE"].String() != "#EXT-X-ALLOW-CACHE" { + t.Error("expected cachable playlist, tag not set") + } + + // Verify it has the correct number of segments in the media playlist. + if int(playlist.Count()) != len(segments) { + t.Error("expected", len(segments), "segments, got", playlist.Count()) + } + + // Test the playlist version. + if playlist.Version() != 6 { + t.Error("expected version 6, got", playlist.Version()) + } + + // Verify the playlist type + if playlist.MediaType != m3u8.VOD { + t.Error("expected VOD playlist type, got type", playlist.MediaType) + } + + // Verify the first segment URI. + if playlist.Segments[0].URI != "/"+segments[0].Path { + t.Error("expected segment URI", segments[0].Path, "got", playlist.Segments[0].URI) + } +} + +func TestInProgressMediaPlaylist(t *testing.T) { + startTime := segments[0].Timestamp + conf := config[0] + + // Create a completed media playlist. + playlist, err := generator.createMediaPlaylistForConfigurationAndSegments(&conf, startTime, true, segments) + if err != nil { + t.Error(err) + } + + // Verify it's marked as cachable. + if playlist.Custom != nil && playlist.Custom["#EXT-X-ALLOW-CACHE"].String() == "#EXT-X-ALLOW-CACHE" { + t.Error("expected non-achable playlist when stream is still in progress") + } + + // Verify it has the correct number of segments in the media playlist. + if int(playlist.Count()) != len(segments) { + t.Error("expected", len(segments), "segments, got", playlist.Count()) + } + + // Test the playlist version. + if playlist.Version() != 6 { + t.Error("expected version 6, got", playlist.Version()) + } + + // Verify the playlist type + if playlist.MediaType != m3u8.EVENT { + t.Error("expected EVENT playlist type, got type", playlist.MediaType) + } +} diff --git a/replays/storageProvider.go b/replays/storageProvider.go new file mode 100644 index 000000000..636ac3d1b --- /dev/null +++ b/replays/storageProvider.go @@ -0,0 +1,6 @@ +package replays + +type StorageProvider interface { + Setup() error + Save(localFilePath, destinationPath string, retryCount int) (string, error) +} diff --git a/replays/stream.go b/replays/stream.go new file mode 100644 index 000000000..a98c7466b --- /dev/null +++ b/replays/stream.go @@ -0,0 +1,41 @@ +package replays + +import ( + "context" + "fmt" + "time" + + "github.com/owncast/owncast/core/data" + "github.com/pkg/errors" +) + +type Stream struct { + ID string `json:"id"` + Title string `json:"title,omitempty"` + StartTime time.Time `json:"startTime"` + EndTime time.Time `json:"endTime,omitempty"` + InProgress bool `json:"inProgress,omitempty"` + Manifest string `json:"manifest,omitempty"` +} + +// GetStreams will return all streams that have been recorded. +func GetStreams() ([]*Stream, error) { + streams, err := data.GetDatastore().GetQueries().GetStreams(context.Background()) + if err != nil { + return nil, errors.WithMessage(err, "failure to get streams") + } + + response := []*Stream{} + for _, stream := range streams { + s := Stream{ + ID: stream.ID, + Title: stream.StreamTitle.String, + StartTime: stream.StartTime, + EndTime: stream.EndTime.Time, + InProgress: !stream.EndTime.Valid, + Manifest: fmt.Sprintf("/replay/%s", stream.ID), + } + response = append(response, &s) + } + return response, nil +} diff --git a/router/router.go b/router/router.go index c1866eb5f..88345b637 100644 --- a/router/router.go +++ b/router/router.go @@ -392,6 +392,11 @@ func Start() error { http.HandleFunc("/api/auth/fediverse", middleware.RequireUserAccessToken(fediverseauth.RegisterFediverseOTPRequest)) http.HandleFunc("/api/auth/fediverse/verify", fediverseauth.VerifyFediverseOTPRequest) + // Replay functionality. This route handles both /replay/{streamId} (master) + // and /replay/{streamId}/{outputConfigId} (media) routes. + http.HandleFunc("/api/replays", controllers.GetReplays) + http.HandleFunc("/replay/", controllers.GetReplay) + // ActivityPub has its own router activitypub.Start(data.GetDatastore()) diff --git a/utils/utils.go b/utils/utils.go index df37e6a95..cc8b4890d 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -274,10 +274,12 @@ func VerifyFFMpegPath(path string) error { } // CleanupDirectory removes the directory and makes it fresh again. Throws fatal error on failure. -func CleanupDirectory(path string) { - log.Traceln("Cleaning", path) - if err := os.RemoveAll(path); err != nil { - log.Fatalln("Unable to remove directory. Please check the ownership and permissions", err) +func CleanupDirectory(path string, keepOldFiles bool) { + if !keepOldFiles { + log.Traceln("Cleaning", path) + if err := os.RemoveAll(path); err != nil { + log.Fatalln("Unable to remove directory. Please check the ownership and permissions", err) + } } if err := os.MkdirAll(path, 0o750); err != nil { log.Fatalln("Unable to create directory. Please check the ownership and permissions", err)