feat(video): first pass at replay functionality

- Persist segments
- Record configurations
- Rebuild entire stream playlists
- First steps to working towards https://github.com/owncast/owncast/issues/102
This commit is contained in:
Gabe Kangas 2023-08-09 16:19:09 -07:00
parent 08f8149b63
commit 4ba36c17a3
No known key found for this signature in database
GPG Key ID: 4345B2060657F330
37 changed files with 1449 additions and 110 deletions

View File

@ -43,6 +43,9 @@ var EnableAutoUpdate = false
// A temporary stream key that can be set via the command line.
var TemporaryStreamKey = ""
// EnableRecordingFeatures will enable recording features.
var EnableRecordingFeatures = true
// GetCommit will return an identifier used for identifying the point in time this build took place.
func GetCommit() string {
if GitCommit == "" {

78
controllers/replays.go Normal file
View File

@ -0,0 +1,78 @@
package controllers
import (
"net/http"
"strings"
"github.com/owncast/owncast/replays"
log "github.com/sirupsen/logrus"
)
// GetReplays will return a list of all available replays.
func GetReplays(w http.ResponseWriter, r *http.Request) {
streams, err := replays.GetStreams()
if err != nil {
log.Errorln(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
WriteResponse(w, streams)
}
// GetReplay will return a playable content for a given stream Id.
func GetReplay(w http.ResponseWriter, r *http.Request) {
pathComponents := strings.Split(r.URL.Path, "/")
if len(pathComponents) == 3 {
// Return the master playlist for the requested stream
streamId := pathComponents[2]
getReplayMasterPlaylist(streamId, w)
return
} else if len(pathComponents) == 4 {
// Return the media playlist for the requested stream and output config
streamId := pathComponents[2]
outputConfigId := pathComponents[3]
getReplayMediaPlaylist(streamId, outputConfigId, w)
return
}
BadRequestHandler(w, nil)
}
// getReplayMasterPlaylist will return a complete replay of a stream as a HLS playlist.
// /api/replay/{streamId}.
func getReplayMasterPlaylist(streamId string, w http.ResponseWriter) {
playlistGenerator := replays.NewPlaylistGenerator()
playlist, err := playlistGenerator.GenerateMasterPlaylistForStream(streamId)
if err != nil {
log.Println(err)
}
if playlist == nil {
w.WriteHeader(http.StatusNotFound)
return
}
w.Header().Add("Content-Type", "application/x-mpegURL")
if _, err := w.Write(playlist.Encode().Bytes()); err != nil {
log.Errorln(err)
return
}
}
// getReplayMediaPlaylist will return a media playlist for a given stream.
// /api/replay/{streamId}/{outputConfigId}.
func getReplayMediaPlaylist(streamId, outputConfigId string, w http.ResponseWriter) {
playlistGenerator := replays.NewPlaylistGenerator()
playlist, err := playlistGenerator.GenerateMediaPlaylistForStreamAndConfiguration(streamId, outputConfigId)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
w.Header().Add("Content-Type", "application/x-mpegURL")
if _, err := w.Write(playlist.Encode().Bytes()); err != nil {
log.Errorln(err)
return
}
}

View File

@ -99,8 +99,11 @@ func createInitialOfflineState() error {
func transitionToOfflineVideoStreamContent() {
log.Traceln("Firing transcoder with offline stream state")
_transcoder := transcoder.NewTranscoder()
_transcoder.SetIdentifier("offline")
streamId := "offline"
_storage.SetStreamId(streamId)
handler.SetStreamId(streamId)
_transcoder := transcoder.NewTranscoder(streamId)
_transcoder.SetLatencyLevel(models.GetLatencyLevel(4))
_transcoder.SetIsEvent(true)
@ -127,7 +130,7 @@ func resetDirectories() {
log.Trace("Resetting file directories to a clean slate.")
// Wipe hls data directory
utils.CleanupDirectory(config.HLSStoragePath)
utils.CleanupDirectory(config.HLSStoragePath, config.EnableRecordingFeatures)
// Remove the previous thumbnail
logo := data.GetLogoPath()

View File

@ -76,6 +76,7 @@ func SetupPersistence(file string) error {
createWebhooksTable()
createUsersTable(db)
createAccessTokenTable(db)
createRecordingTables(db)
if _, err := db.Exec(`CREATE TABLE IF NOT EXISTS config (
"key" string NOT NULL PRIMARY KEY,

44
core/data/recording.go Normal file
View File

@ -0,0 +1,44 @@
package data
import "database/sql"
func createRecordingTables(db *sql.DB) {
createSegmentsTableSQL := `CREATE TABLE IF NOT EXISTS video_segments (
"id" string NOT NULL,
"stream_id" string NOT NULL,
"output_configuration_id" string NOT NULL,
"path" TEXT NOT NULL,
"timestamp" DATE DEFAULT CURRENT_TIMESTAMP NOT NULL,
PRIMARY KEY (id)
);CREATE INDEX video_segments_stream_id ON video_segments (stream_id);CREATE INDEX video_segments_stream_id_timestamp ON video_segments (stream_id,timestamp);`
createVideoOutputConfigsTableSQL := `CREATE TABLE IF NOT EXISTS video_segment_output_configuration (
"id" string NOT NULL,
"variant_id" string NOT NULL,
"name" string NOT NULL,
"stream_id" string NOT NULL,
"segment_duration" INTEGER NOT NULL,
"bitrate" INTEGER NOT NULL,
"framerate" INTEGER NOT NULL,
"resolution_width" INTEGER,
"resolution_height" INTEGER,
"timestamp" DATE DEFAULT CURRENT_TIMESTAMP NOT NULL,
PRIMARY KEY (id)
);CREATE INDEX video_segment_output_configuration_stream_id ON video_segment_output_configuration (stream_id);`
createVideoStreamsTableSQL := `CREATE TABLE IF NOT EXISTS streams (
"id" string NOT NULL,
"stream_title" TEXT,
"start_time" DATE NOT NULL,
"end_time" DATE,
PRIMARY KEY (id)
);
CREATE INDEX streams_id ON streams (id);
CREATE INDEX streams_start_time ON streams (start_time);
CREATE INDEX streams_start_end_time ON streams (start_time,end_time);
`
MustExec(createSegmentsTableSQL, db)
MustExec(createVideoOutputConfigsTableSQL, db)
MustExec(createVideoStreamsTableSQL, db)
}

View File

@ -56,7 +56,7 @@ func makeVariantIndexOffline(index int, offlineFilePath string, offlineFilename
log.Warnln(err)
}
if _, err := _storage.Save(segmentFilePath, 0); err != nil {
if _, err := _storage.Save(segmentFilePath, segmentFilePath, 0); err != nil {
log.Warnln(err)
}
@ -65,7 +65,7 @@ func makeVariantIndexOffline(index int, offlineFilePath string, offlineFilename
} else {
createEmptyOfflinePlaylist(playlistFilePath, offlineFilename)
}
if _, err := _storage.Save(playlistFilePath, 0); err != nil {
if _, err := _storage.Save(playlistFilePath, playlistFilePath, 0); err != nil {
log.Warnln(err)
}
}

View File

@ -10,31 +10,51 @@ import (
"github.com/owncast/owncast/config"
"github.com/owncast/owncast/core/data"
"github.com/owncast/owncast/utils"
)
// LocalStorage represents an instance of the local storage provider for HLS video.
type LocalStorage struct{}
type LocalStorage struct {
streamID string
}
// NewLocalStorage returns a new LocalStorage instance.
func NewLocalStorage() *LocalStorage {
return &LocalStorage{}
}
// SetStreamId sets the stream id for this storage provider.
func (s *LocalStorage) SetStreamId(streamID string) {
s.streamID = streamID
}
// Setup configures this storage provider.
func (s *LocalStorage) Setup() error {
return nil
}
// SegmentWritten is called when a single segment of video is written.
func (s *LocalStorage) SegmentWritten(localFilePath string) {
if _, err := s.Save(localFilePath, 0); err != nil {
log.Warnln(err)
func (s *LocalStorage) SegmentWritten(localFilePath string) (string, int, error) {
if s.streamID == "" {
log.Fatalln("stream id must be set when handling video segments")
}
destinationPath, err := s.Save(localFilePath, localFilePath, 0)
if err != nil {
log.Warnln(err)
return "", 0, err
}
return destinationPath, 0, nil
}
// VariantPlaylistWritten is called when a variant hls playlist is written.
func (s *LocalStorage) VariantPlaylistWritten(localFilePath string) {
if _, err := s.Save(localFilePath, 0); err != nil {
if s.streamID == "" {
log.Fatalln("stream id must be set when handling video playlists")
}
if _, err := s.Save(localFilePath, localFilePath, 0); err != nil {
log.Errorln(err)
return
}
@ -42,17 +62,34 @@ func (s *LocalStorage) VariantPlaylistWritten(localFilePath string) {
// MasterPlaylistWritten is called when the master hls playlist is written.
func (s *LocalStorage) MasterPlaylistWritten(localFilePath string) {
if _, err := s.Save(localFilePath, 0); err != nil {
log.Warnln(err)
if s.streamID == "" {
log.Fatalln("stream id must be set when handling video playlists")
}
masterPlaylistDestinationLocation := filepath.Join(config.HLSStoragePath, "/stream.m3u8")
if err := rewriteLocalPlaylist(localFilePath, s.streamID, masterPlaylistDestinationLocation); err != nil {
log.Errorln(err)
return
}
}
// Save will save a local filepath using the storage provider.
func (s *LocalStorage) Save(filePath string, retryCount int) (string, error) {
return filePath, nil
func (s *LocalStorage) Save(filePath, destinationPath string, retryCount int) (string, error) {
if filePath != destinationPath {
if err := utils.Move(filePath, destinationPath); err != nil {
return "", errors.Wrap(err, "unable to move file")
}
}
return destinationPath, nil
}
func (s *LocalStorage) Cleanup() error {
// If we're recording, don't perform the cleanup.
if config.EnableRecordingFeatures {
return nil
}
// Determine how many files we should keep on disk
maxNumber := data.GetStreamLatencyLevel().SegmentCount
buffer := 10
@ -84,6 +121,10 @@ func (s *LocalStorage) Cleanup() error {
return nil
}
func (s *LocalStorage) GetRemoteDestinationPathFromLocalFilePath(localFilePath string) string {
return localFilePath
}
func getAllFilesRecursive(baseDirectory string) (map[string][]os.FileInfo, error) {
files := make(map[string][]os.FileInfo)

View File

@ -13,7 +13,7 @@ import (
)
// rewriteRemotePlaylist will take a local playlist and rewrite it to have absolute URLs to remote locations.
func rewriteRemotePlaylist(localFilePath, remoteServingEndpoint, pathPrefix string) error {
func rewriteRemotePlaylist(localFilePath, remoteServingEndpoint string) error {
f, err := os.Open(localFilePath) // nolint
if err != nil {
log.Fatalln(err)
@ -25,14 +25,7 @@ func rewriteRemotePlaylist(localFilePath, remoteServingEndpoint, pathPrefix stri
}
for _, item := range p.Variants {
// Determine the final path to this playlist.
var finalPath string
if pathPrefix != "" {
finalPath = filepath.Join(pathPrefix, "/hls")
} else {
finalPath = "/hls"
}
item.URI = remoteServingEndpoint + filepath.Join(finalPath, item.URI)
item.URI = remoteServingEndpoint + filepath.Join("/hls", item.URI)
}
publicPath := filepath.Join(config.HLSStoragePath, filepath.Base(localFilePath))
@ -41,3 +34,29 @@ func rewriteRemotePlaylist(localFilePath, remoteServingEndpoint, pathPrefix stri
return playlist.WritePlaylist(newPlaylist, publicPath)
}
// rewriteLocalPlaylist will take a local master playlist and rewrite it to
// refer to the path that includes the stream ID.
func rewriteLocalPlaylist(localFilePath, streamID, destinationPath string) error {
f, err := os.Open(localFilePath) // nolint
if err != nil {
log.Fatalln(err)
}
p := m3u8.NewMasterPlaylist()
if err := p.DecodeFrom(bufio.NewReader(f), false); err != nil {
log.Warnln(err)
}
if streamID == "" {
log.Fatalln("stream id must be set when rewriting playlist contents")
}
for _, item := range p.Variants {
item.URI = filepath.Join("/hls", streamID, item.URI)
}
newPlaylist := p.String()
return playlist.WritePlaylist(newPlaylist, destinationPath)
}

View File

@ -10,6 +10,7 @@ import (
"strings"
"time"
"github.com/owncast/owncast/config"
"github.com/owncast/owncast/core/data"
"github.com/owncast/owncast/utils"
"github.com/pkg/errors"
@ -20,12 +21,11 @@ import (
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/owncast/owncast/config"
)
// S3Storage is the s3 implementation of a storage provider.
type S3Storage struct {
streamId string
sess *session.Session
s3Client *s3.S3
host string
@ -37,7 +37,6 @@ type S3Storage struct {
s3AccessKey string
s3Secret string
s3ACL string
s3PathPrefix string
s3ForcePathStyle bool
// If we try to upload a playlist but it is not yet on disk
@ -54,6 +53,11 @@ func NewS3Storage() *S3Storage {
}
}
// SetStreamID sets the stream id for this storage provider.
func (s *S3Storage) SetStreamId(streamId string) {
s.streamId = streamId
}
// Setup sets up the s3 storage for saving the video to s3.
func (s *S3Storage) Setup() error {
log.Trace("Setting up S3 for external storage of video...")
@ -74,7 +78,6 @@ func (s *S3Storage) Setup() error {
s.s3AccessKey = s3Config.AccessKey
s.s3Secret = s3Config.Secret
s.s3ACL = s3Config.ACL
s.s3PathPrefix = s3Config.PathPrefix
s.s3ForcePathStyle = s3Config.ForcePathStyle
s.sess = s.connectAWS()
@ -86,16 +89,19 @@ func (s *S3Storage) Setup() error {
}
// SegmentWritten is called when a single segment of video is written.
func (s *S3Storage) SegmentWritten(localFilePath string) {
func (s *S3Storage) SegmentWritten(localFilePath string) (string, int, error) {
index := utils.GetIndexFromFilePath(localFilePath)
performanceMonitorKey := "s3upload-" + index
utils.StartPerformanceMonitor(performanceMonitorKey)
// Upload the segment
if _, err := s.Save(localFilePath, 0); err != nil {
remoteDestinationPath := s.GetRemoteDestinationPathFromLocalFilePath(localFilePath)
remotePath, err := s.Save(localFilePath, remoteDestinationPath, 0)
if err != nil {
log.Errorln(err)
return
return "", 0, err
}
averagePerformance := utils.GetAveragePerformance(performanceMonitorKey)
// Warn the user about long-running save operations
@ -109,14 +115,16 @@ func (s *S3Storage) SegmentWritten(localFilePath string) {
// so the segments and the HLS playlist referencing
// them are in sync.
playlistPath := filepath.Join(filepath.Dir(localFilePath), "stream.m3u8")
if _, err := s.Save(playlistPath, 0); err != nil {
playlistRemoteDestinationPath := s.GetRemoteDestinationPathFromLocalFilePath(playlistPath)
if _, err := s.Save(playlistPath, playlistRemoteDestinationPath, 0); err != nil {
s.queuedPlaylistUpdates[playlistPath] = playlistPath
if pErr, ok := err.(*os.PathError); ok {
log.Debugln(pErr.Path, "does not yet exist locally when trying to upload to S3 storage.")
return
return remotePath, 0, pErr.Err
}
}
return remotePath, 0, nil
}
// VariantPlaylistWritten is called when a variant hls playlist is written.
@ -125,7 +133,8 @@ func (s *S3Storage) VariantPlaylistWritten(localFilePath string) {
// to make sure we're not referring to files in a playlist that don't
// yet exist. See SegmentWritten.
if _, ok := s.queuedPlaylistUpdates[localFilePath]; ok {
if _, err := s.Save(localFilePath, 0); err != nil {
remoteDestinationPath := s.GetRemoteDestinationPathFromLocalFilePath(localFilePath)
if _, err := s.Save(localFilePath, remoteDestinationPath, 0); err != nil {
log.Errorln(err)
s.queuedPlaylistUpdates[localFilePath] = localFilePath
}
@ -136,41 +145,30 @@ func (s *S3Storage) VariantPlaylistWritten(localFilePath string) {
// MasterPlaylistWritten is called when the master hls playlist is written.
func (s *S3Storage) MasterPlaylistWritten(localFilePath string) {
// Rewrite the playlist to use absolute remote S3 URLs
if err := rewriteRemotePlaylist(localFilePath, s.host, s.s3PathPrefix); err != nil {
if err := rewriteRemotePlaylist(localFilePath, s.host); err != nil {
log.Warnln(err)
}
}
// Save saves the file to the s3 bucket.
func (s *S3Storage) Save(filePath string, retryCount int) (string, error) {
file, err := os.Open(filePath) // nolint
func (s *S3Storage) Save(localFilePath, remoteDestinationPath string, retryCount int) (string, error) {
file, err := os.Open(localFilePath) // nolint
if err != nil {
return "", err
}
defer file.Close()
// Convert the local path to the variant/file path by stripping the local storage location.
normalizedPath := strings.TrimPrefix(filePath, config.HLSStoragePath)
// Build the remote path by adding the "hls" path prefix.
remotePath := strings.Join([]string{"hls", normalizedPath}, "")
// If a custom path prefix is set prepend it.
if s.s3PathPrefix != "" {
prefix := strings.TrimPrefix(s.s3PathPrefix, "/")
remotePath = strings.Join([]string{prefix, remotePath}, "/")
}
maxAgeSeconds := utils.GetCacheDurationSecondsForPath(filePath)
maxAgeSeconds := utils.GetCacheDurationSecondsForPath(localFilePath)
cacheControlHeader := fmt.Sprintf("max-age=%d", maxAgeSeconds)
uploadInput := &s3manager.UploadInput{
Bucket: aws.String(s.s3Bucket), // Bucket to be used
Key: aws.String(remotePath), // Name of the file to be saved
Key: aws.String(remoteDestinationPath), // Name of the file to be saved
Body: file, // File
CacheControl: &cacheControlHeader,
}
if path.Ext(filePath) == ".m3u8" {
if path.Ext(localFilePath) == ".m3u8" {
noCacheHeader := "no-cache, no-store, must-revalidate"
contentType := "application/x-mpegURL"
@ -190,22 +188,27 @@ func (s *S3Storage) Save(filePath string, retryCount int) (string, error) {
log.Traceln("error uploading segment", err.Error())
if retryCount < 4 {
log.Traceln("Retrying...")
return s.Save(filePath, retryCount+1)
return s.Save(localFilePath, remoteDestinationPath, retryCount+1)
}
// Upload failure. Remove the local file.
s.removeLocalFile(filePath)
s.removeLocalFile(localFilePath)
return "", fmt.Errorf("Giving up uploading %s to object storage %s", filePath, s.s3Endpoint)
return "", fmt.Errorf("Giving up uploading %s to object storage %s", localFilePath, s.s3Endpoint)
}
// Upload success. Remove the local file.
s.removeLocalFile(filePath)
s.removeLocalFile(localFilePath)
return response.Location, nil
}
func (s *S3Storage) Cleanup() error {
// If we're recording, don't perform the cleanup.
if config.EnableRecordingFeatures {
return nil
}
// Determine how many files we should keep on S3 storage
maxNumber := data.GetStreamLatencyLevel().SegmentCount
buffer := 20
@ -329,6 +332,16 @@ func (s *S3Storage) retrieveAllVideoSegments() ([]s3object, error) {
return allObjects, nil
}
func (s *S3Storage) GetRemoteDestinationPathFromLocalFilePath(localFilePath string) string {
// Convert the local path to the variant/file path by stripping the local storage location.
normalizedPath := strings.TrimPrefix(localFilePath, config.HLSStoragePath)
// Build the remote path by adding the "hls" path prefix.
remoteDestionationPath := strings.Join([]string{"hls", normalizedPath}, "")
return remoteDestionationPath
}
type s3object struct {
key string
lastModified time.Time

View File

@ -3,9 +3,11 @@ package core
import (
"context"
"io"
"path/filepath"
"time"
log "github.com/sirupsen/logrus"
"github.com/teris-io/shortid"
"github.com/owncast/owncast/activitypub"
"github.com/owncast/owncast/config"
@ -39,36 +41,34 @@ func setStreamAsConnected(rtmpOut *io.PipeReader) {
_stats.LastConnectTime = &now
_stats.SessionMaxViewerCount = 0
streamId := shortid.MustGenerate()
fileWriter.SetStreamID(streamId)
_currentBroadcast = &models.CurrentBroadcast{
StreamID: streamId,
LatencyLevel: data.GetStreamLatencyLevel(),
OutputSettings: data.GetStreamOutputVariants(),
}
StopOfflineCleanupTimer()
if !config.EnableRecordingFeatures {
startOnlineCleanupTimer()
}
if _yp != nil {
go _yp.Start()
}
segmentPath := config.HLSStoragePath
if err := setupStorage(); err != nil {
log.Fatalln("failed to setup the storage", err)
}
go func() {
_transcoder = transcoder.NewTranscoder()
_transcoder.TranscoderCompleted = func(error) {
SetStreamAsDisconnected()
_transcoder = nil
_currentBroadcast = nil
}
_transcoder.SetStdin(rtmpOut)
_transcoder.Start(true)
}()
setupVideoComponentsForId(streamId)
setupLiveTranscoderForId(streamId, rtmpOut)
go webhooks.SendStreamStatusEvent(models.StreamStarted)
segmentPath := filepath.Join(config.HLSStoragePath, streamId)
transcoder.StartThumbnailGenerator(segmentPath, data.FindHighestVideoQualityIndex(_currentBroadcast.OutputSettings))
_ = chat.SendSystemAction("Stay tuned, the stream is **starting**!", true)
@ -100,6 +100,7 @@ func SetStreamAsDisconnected() {
return
}
handler.StreamEnded()
transcoder.StopThumbnailGenerator()
rtmp.Disconnect()

View File

@ -26,6 +26,7 @@ type FileWriterReceiverServiceCallback interface {
// as it can send HTTP requests to this service with the results.
type FileWriterReceiverService struct {
callbacks FileWriterReceiverServiceCallback
streamId string
}
// SetupFileWriterReceiverService will start listening for transcoder responses.
@ -53,6 +54,10 @@ func (s *FileWriterReceiverService) SetupFileWriterReceiverService(callbacks Fil
}()
}
func (s *FileWriterReceiverService) SetStreamID(streamID string) {
s.streamId = streamID
}
func (s *FileWriterReceiverService) uploadHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != "PUT" {
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
@ -79,7 +84,7 @@ func (s *FileWriterReceiverService) uploadHandler(w http.ResponseWriter, r *http
}
func (s *FileWriterReceiverService) fileWritten(path string) {
if utils.GetRelativePathFromAbsolutePath(path) == "hls/stream.m3u8" {
if utils.GetRelativePathFromAbsolutePath(path) == s.streamId+"/stream.m3u8" {
s.callbacks.MasterPlaylistWritten(path)
} else if strings.HasSuffix(path, ".ts") {
s.callbacks.SegmentWritten(path)

View File

@ -1,17 +1,43 @@
package transcoder
import (
"github.com/owncast/owncast/config"
"github.com/owncast/owncast/models"
"github.com/owncast/owncast/replays"
log "github.com/sirupsen/logrus"
)
// HLSHandler gets told about available HLS playlists and segments.
type HLSHandler struct {
Storage models.StorageProvider
Recorder *replays.HLSRecorder
}
// StreamEnded is called when a stream is ended so the end time can be noted
// in the stream's metadata.
func (h *HLSHandler) StreamEnded() {
if config.EnableRecordingFeatures {
h.Recorder.StreamEnded()
}
}
func (h *HLSHandler) SetStreamId(streamId string) {
h.Storage.SetStreamId(streamId)
h.Recorder = replays.NewRecording(streamId)
}
// SegmentWritten is fired when a HLS segment is written to disk.
func (h *HLSHandler) SegmentWritten(localFilePath string) {
h.Storage.SegmentWritten(localFilePath)
remotePath, _, err := h.Storage.SegmentWritten(localFilePath)
if err != nil {
log.Errorln(err)
}
if h.Recorder != nil {
h.Recorder.SegmentWritten(remotePath)
} else {
log.Debugln("No HLS recorder available to notify of segment written.")
}
}
// VariantPlaylistWritten is fired when a HLS variant playlist is written to disk.

View File

@ -27,9 +27,9 @@ type Transcoder struct {
stdin *io.PipeReader
TranscoderCompleted func(error)
StreamID string
playlistOutputPath string
ffmpegPath string
segmentIdentifier string
internalListenerPort string
input string
segmentOutputPath string
@ -118,7 +118,9 @@ func (t *Transcoder) Start(shouldLog bool) {
if shouldLog {
log.Infof("Processing video using codec %s with %d output qualities configured.", t.codec.DisplayName(), len(t.variants))
}
createVariantDirectories()
// Make directory for this stream.
createVariantDirectories(t.StreamID)
if config.EnableDebugFeatures {
log.Println(command)
@ -181,8 +183,8 @@ func (t *Transcoder) getString() string {
hlsOptionFlags = append(hlsOptionFlags, "append_list")
}
if t.segmentIdentifier == "" {
t.segmentIdentifier = shortid.MustGenerate()
if t.StreamID == "" {
t.StreamID = shortid.MustGenerate()
}
hlsEventString := ""
@ -197,6 +199,7 @@ func (t *Transcoder) getString() string {
if len(hlsOptionFlags) > 0 {
hlsOptionsString = "-hls_flags " + strings.Join(hlsOptionFlags, "+")
}
ffmpegFlags := []string{
fmt.Sprintf(`FFREPORT=file="%s":level=32`, logging.GetTranscoderLogFilePath()),
t.ffmpegPath,
@ -226,11 +229,11 @@ func (t *Transcoder) getString() string {
// Filenames
"-master_pl_name", "stream.m3u8",
"-hls_segment_filename", localListenerAddress + "/%v/stream-" + t.segmentIdentifier + "-%d.ts", // Send HLS segments back to us over HTTP
"-hls_segment_filename", localListenerAddress + "/" + t.StreamID + "/%v/stream-" + t.StreamID + "-%d.ts", // Send HLS segments back to us over HTTP
"-max_muxing_queue_size", "400", // Workaround for Too many packets error: https://trac.ffmpeg.org/ticket/6375?cversion=0
"-method PUT", // HLS results sent back to us will be over PUTs
localListenerAddress + "/%v/stream.m3u8", // Send HLS playlists back to us over HTTP
localListenerAddress + "/" + t.StreamID + "/%v/stream.m3u8", // Send HLS playlists back to us over HTTP
}
return strings.Join(ffmpegFlags, " ")
@ -272,10 +275,11 @@ func getVariantFromConfigQuality(quality models.StreamOutputVariant, index int)
}
// NewTranscoder will return a new Transcoder, populated by the config.
func NewTranscoder() *Transcoder {
func NewTranscoder(streamID string) *Transcoder {
ffmpegPath := utils.ValidatedFfmpegPath(data.GetFfMpegPath())
transcoder := new(Transcoder)
transcoder.StreamID = streamID
transcoder.ffmpegPath = ffmpegPath
transcoder.internalListenerPort = config.InternalHLSListenerPort
@ -438,9 +442,9 @@ func (t *Transcoder) SetOutputPath(output string) {
t.segmentOutputPath = output
}
// SetIdentifier enables appending a unique identifier to segment file name.
func (t *Transcoder) SetIdentifier(output string) {
t.segmentIdentifier = output
// SetStreamID sets a unique identifier for the currently transcoding stream.
func (t *Transcoder) SetStreamID(id string) {
t.StreamID = id
}
// SetInternalHTTPPort will set the port to be used for internal communication.

View File

@ -15,7 +15,7 @@ func TestFFmpegNvencCommand(t *testing.T) {
transcoder.ffmpegPath = filepath.Join("fake", "path", "ffmpeg")
transcoder.SetInput("fakecontent.flv")
transcoder.SetOutputPath("fakeOutput")
transcoder.SetIdentifier("jdoieGg")
transcoder.SetStreamID("jdFsdfzGg")
transcoder.SetInternalHTTPPort("8123")
transcoder.SetCodec(codec.Name())
transcoder.currentLatencyLevel = latencyLevel
@ -42,7 +42,7 @@ func TestFFmpegNvencCommand(t *testing.T) {
cmd := transcoder.getString()
expectedLogPath := filepath.Join("data", "logs", "transcoder.log")
expected := `FFREPORT=file="` + expectedLogPath + `":level=32 ` + transcoder.ffmpegPath + ` -hide_banner -loglevel warning -hwaccel cuda -fflags +genpts -flags +cgop -i fakecontent.flv -map v:0 -c:v:0 h264_nvenc -b:v:0 1008k -maxrate:v:0 1088k -g:v:0 90 -keyint_min:v:0 90 -r:v:0 30 -tune:v:0 ll -map a:0? -c:a:0 copy -preset p3 -map v:0 -c:v:1 h264_nvenc -b:v:1 3308k -maxrate:v:1 3572k -g:v:1 72 -keyint_min:v:1 72 -r:v:1 24 -tune:v:1 ll -map a:0? -c:a:1 copy -preset p5 -map v:0 -c:v:2 copy -map a:0? -c:a:2 copy -preset p1 -var_stream_map "v:0,a:0 v:1,a:1 v:2,a:2 " -f hls -hls_time 3 -hls_list_size 10 -hls_flags program_date_time+independent_segments+omit_endlist -segment_format_options mpegts_flags=mpegts_copyts=1 -pix_fmt yuv420p -sc_threshold 0 -master_pl_name stream.m3u8 -hls_segment_filename http://127.0.0.1:8123/%v/stream-jdoieGg-%d.ts -max_muxing_queue_size 400 -method PUT http://127.0.0.1:8123/%v/stream.m3u8`
expected := `FFREPORT=file="` + expectedLogPath + `":level=32 ` + transcoder.ffmpegPath + ` -hide_banner -loglevel warning -hwaccel cuda -fflags +genpts -flags +cgop -i fakecontent.flv -map v:0 -c:v:0 h264_nvenc -b:v:0 1008k -maxrate:v:0 1088k -g:v:0 90 -keyint_min:v:0 90 -r:v:0 30 -tune:v:0 ll -map a:0? -c:a:0 copy -preset p3 -map v:0 -c:v:1 h264_nvenc -b:v:1 3308k -maxrate:v:1 3572k -g:v:1 72 -keyint_min:v:1 72 -r:v:1 24 -tune:v:1 ll -map a:0? -c:a:1 copy -preset p5 -map v:0 -c:v:2 copy -map a:0? -c:a:2 copy -preset p1 -var_stream_map "v:0,a:0 v:1,a:1 v:2,a:2 " -f hls -hls_time 3 -hls_list_size 10 -hls_flags program_date_time+independent_segments+omit_endlist -segment_format_options mpegts_flags=mpegts_copyts=1 -pix_fmt yuv420p -sc_threshold 0 -master_pl_name stream.m3u8 -hls_segment_filename http://127.0.0.1:8123/jdFsdfzGg/%v/stream-jdFsdfzGg-%d.ts -max_muxing_queue_size 400 -method PUT http://127.0.0.1:8123/jdFsdfzGg/%v/stream.m3u8`
if cmd != expected {
t.Errorf("ffmpeg command does not match expected.\nGot %s\n, want: %s", cmd, expected)

View File

@ -15,7 +15,7 @@ func TestFFmpegOmxCommand(t *testing.T) {
transcoder.ffmpegPath = filepath.Join("fake", "path", "ffmpeg")
transcoder.SetInput("fakecontent.flv")
transcoder.SetOutputPath("fakeOutput")
transcoder.SetIdentifier("jdFsdfzGg")
transcoder.SetStreamID("jdFsdfzGg")
transcoder.SetInternalHTTPPort("8123")
transcoder.SetCodec(codec.Name())
transcoder.currentLatencyLevel = latencyLevel
@ -42,7 +42,7 @@ func TestFFmpegOmxCommand(t *testing.T) {
cmd := transcoder.getString()
expectedLogPath := filepath.Join("data", "logs", "transcoder.log")
expected := `FFREPORT=file="` + expectedLogPath + `":level=32 ` + transcoder.ffmpegPath + ` -hide_banner -loglevel warning -fflags +genpts -flags +cgop -i fakecontent.flv -map v:0 -c:v:0 h264_omx -b:v:0 1008k -maxrate:v:0 1088k -g:v:0 90 -keyint_min:v:0 90 -r:v:0 30 -map a:0? -c:a:0 copy -preset veryfast -map v:0 -c:v:1 h264_omx -b:v:1 3308k -maxrate:v:1 3572k -g:v:1 72 -keyint_min:v:1 72 -r:v:1 24 -map a:0? -c:a:1 copy -preset fast -map v:0 -c:v:2 copy -map a:0? -c:a:2 copy -preset ultrafast -var_stream_map "v:0,a:0 v:1,a:1 v:2,a:2 " -f hls -hls_time 3 -hls_list_size 10 -hls_flags program_date_time+independent_segments+omit_endlist -segment_format_options mpegts_flags=mpegts_copyts=1 -tune zerolatency -pix_fmt yuv420p -sc_threshold 0 -master_pl_name stream.m3u8 -hls_segment_filename http://127.0.0.1:8123/%v/stream-jdFsdfzGg-%d.ts -max_muxing_queue_size 400 -method PUT http://127.0.0.1:8123/%v/stream.m3u8`
expected := `FFREPORT=file="` + expectedLogPath + `":level=32 ` + transcoder.ffmpegPath + ` -hide_banner -loglevel warning -fflags +genpts -flags +cgop -i fakecontent.flv -map v:0 -c:v:0 h264_omx -b:v:0 1008k -maxrate:v:0 1088k -g:v:0 90 -keyint_min:v:0 90 -r:v:0 30 -map a:0? -c:a:0 copy -preset veryfast -map v:0 -c:v:1 h264_omx -b:v:1 3308k -maxrate:v:1 3572k -g:v:1 72 -keyint_min:v:1 72 -r:v:1 24 -map a:0? -c:a:1 copy -preset fast -map v:0 -c:v:2 copy -map a:0? -c:a:2 copy -preset ultrafast -var_stream_map "v:0,a:0 v:1,a:1 v:2,a:2 " -f hls -hls_time 3 -hls_list_size 10 -hls_flags program_date_time+independent_segments+omit_endlist -segment_format_options mpegts_flags=mpegts_copyts=1 -tune zerolatency -pix_fmt yuv420p -sc_threshold 0 -master_pl_name stream.m3u8 -hls_segment_filename http://127.0.0.1:8123/jdFsdfzGg/%v/stream-jdFsdfzGg-%d.ts -max_muxing_queue_size 400 -method PUT http://127.0.0.1:8123/jdFsdfzGg/%v/stream.m3u8`
if cmd != expected {
t.Errorf("ffmpeg command does not match expected.\nGot %s\n, want: %s", cmd, expected)

View File

@ -15,7 +15,7 @@ func TestFFmpegVaapiCommand(t *testing.T) {
transcoder.ffmpegPath = filepath.Join("fake", "path", "ffmpeg")
transcoder.SetInput("fakecontent.flv")
transcoder.SetOutputPath("fakeOutput")
transcoder.SetIdentifier("jdofFGg")
transcoder.SetStreamID("jdFsdfzGg")
transcoder.SetInternalHTTPPort("8123")
transcoder.SetCodec(codec.Name())
transcoder.currentLatencyLevel = latencyLevel
@ -42,7 +42,7 @@ func TestFFmpegVaapiCommand(t *testing.T) {
cmd := transcoder.getString()
expectedLogPath := filepath.Join("data", "logs", "transcoder.log")
expected := `FFREPORT=file="` + expectedLogPath + `":level=32 ` + transcoder.ffmpegPath + ` -hide_banner -loglevel warning -hwaccel vaapi -hwaccel_output_format vaapi -vaapi_device /dev/dri/renderD128 -fflags +genpts -flags +cgop -i fakecontent.flv -map v:0 -c:v:0 h264_vaapi -b:v:0 1008k -maxrate:v:0 1088k -g:v:0 90 -keyint_min:v:0 90 -r:v:0 30 -map a:0? -c:a:0 copy -preset veryfast -map v:0 -c:v:1 h264_vaapi -b:v:1 3308k -maxrate:v:1 3572k -g:v:1 72 -keyint_min:v:1 72 -r:v:1 24 -map a:0? -c:a:1 copy -preset fast -map v:0 -c:v:2 copy -map a:0? -c:a:2 copy -preset ultrafast -var_stream_map "v:0,a:0 v:1,a:1 v:2,a:2 " -f hls -hls_time 3 -hls_list_size 10 -hls_flags program_date_time+independent_segments+omit_endlist -segment_format_options mpegts_flags=mpegts_copyts=1 -pix_fmt vaapi_vld -sc_threshold 0 -master_pl_name stream.m3u8 -hls_segment_filename http://127.0.0.1:8123/%v/stream-jdofFGg-%d.ts -max_muxing_queue_size 400 -method PUT http://127.0.0.1:8123/%v/stream.m3u8`
expected := `FFREPORT=file="` + expectedLogPath + `":level=32 ` + transcoder.ffmpegPath + ` -hide_banner -loglevel warning -hwaccel vaapi -hwaccel_output_format vaapi -vaapi_device /dev/dri/renderD128 -fflags +genpts -flags +cgop -i fakecontent.flv -map v:0 -c:v:0 h264_vaapi -b:v:0 1008k -maxrate:v:0 1088k -g:v:0 90 -keyint_min:v:0 90 -r:v:0 30 -map a:0? -c:a:0 copy -preset veryfast -map v:0 -c:v:1 h264_vaapi -b:v:1 3308k -maxrate:v:1 3572k -g:v:1 72 -keyint_min:v:1 72 -r:v:1 24 -map a:0? -c:a:1 copy -preset fast -map v:0 -c:v:2 copy -map a:0? -c:a:2 copy -preset ultrafast -var_stream_map "v:0,a:0 v:1,a:1 v:2,a:2 " -f hls -hls_time 3 -hls_list_size 10 -hls_flags program_date_time+independent_segments+omit_endlist -segment_format_options mpegts_flags=mpegts_copyts=1 -pix_fmt vaapi_vld -sc_threshold 0 -master_pl_name stream.m3u8 -hls_segment_filename http://127.0.0.1:8123/jdFsdfzGg/%v/stream-jdFsdfzGg-%d.ts -max_muxing_queue_size 400 -method PUT http://127.0.0.1:8123/jdFsdfzGg/%v/stream.m3u8`
if cmd != expected {
t.Errorf("ffmpeg command does not match expected.\nGot %s\n, want: %s", cmd, expected)

View File

@ -15,7 +15,7 @@ func TestFFmpegVideoToolboxCommand(t *testing.T) {
transcoder.ffmpegPath = filepath.Join("fake", "path", "ffmpeg")
transcoder.SetInput("fakecontent.flv")
transcoder.SetOutputPath("fakeOutput")
transcoder.SetIdentifier("jdFsdfzGg")
transcoder.SetStreamID("jdFsdfzGg")
transcoder.SetInternalHTTPPort("8123")
transcoder.SetCodec(codec.Name())
transcoder.currentLatencyLevel = latencyLevel
@ -42,7 +42,7 @@ func TestFFmpegVideoToolboxCommand(t *testing.T) {
cmd := transcoder.getString()
expectedLogPath := filepath.Join("data", "logs", "transcoder.log")
expected := `FFREPORT=file="` + expectedLogPath + `":level=32 ` + transcoder.ffmpegPath + ` -hide_banner -loglevel warning -fflags +genpts -flags +cgop -i fakecontent.flv -map v:0 -c:v:0 h264_videotoolbox -b:v:0 1008k -maxrate:v:0 1088k -g:v:0 90 -keyint_min:v:0 90 -r:v:0 30 -realtime true -map a:0? -c:a:0 copy -preset veryfast -map v:0 -c:v:1 h264_videotoolbox -b:v:1 3308k -maxrate:v:1 3572k -g:v:1 72 -keyint_min:v:1 72 -r:v:1 24 -map a:0? -c:a:1 copy -preset fast -map v:0 -c:v:2 copy -map a:0? -c:a:2 copy -preset ultrafast -var_stream_map "v:0,a:0 v:1,a:1 v:2,a:2 " -f hls -hls_time 3 -hls_list_size 10 -hls_flags program_date_time+independent_segments+omit_endlist -segment_format_options mpegts_flags=mpegts_copyts=1 -pix_fmt nv12 -sc_threshold 0 -master_pl_name stream.m3u8 -hls_segment_filename http://127.0.0.1:8123/%v/stream-jdFsdfzGg-%d.ts -max_muxing_queue_size 400 -method PUT http://127.0.0.1:8123/%v/stream.m3u8`
expected := `FFREPORT=file="` + expectedLogPath + `":level=32 ` + transcoder.ffmpegPath + ` -hide_banner -loglevel warning -fflags +genpts -flags +cgop -i fakecontent.flv -map v:0 -c:v:0 h264_videotoolbox -b:v:0 1008k -maxrate:v:0 1088k -g:v:0 90 -keyint_min:v:0 90 -r:v:0 30 -realtime true -map a:0? -c:a:0 copy -preset veryfast -map v:0 -c:v:1 h264_videotoolbox -b:v:1 3308k -maxrate:v:1 3572k -g:v:1 72 -keyint_min:v:1 72 -r:v:1 24 -map a:0? -c:a:1 copy -preset fast -map v:0 -c:v:2 copy -map a:0? -c:a:2 copy -preset ultrafast -var_stream_map "v:0,a:0 v:1,a:1 v:2,a:2 " -f hls -hls_time 3 -hls_list_size 10 -hls_flags program_date_time+independent_segments+omit_endlist -segment_format_options mpegts_flags=mpegts_copyts=1 -pix_fmt nv12 -sc_threshold 0 -master_pl_name stream.m3u8 -hls_segment_filename http://127.0.0.1:8123/jdFsdfzGg/%v/stream-jdFsdfzGg-%d.ts -max_muxing_queue_size 400 -method PUT http://127.0.0.1:8123/jdFsdfzGg/%v/stream.m3u8`
if cmd != expected {
t.Errorf("ffmpeg command does not match expected.\nGot %s\n, want: %s", cmd, expected)

View File

@ -15,7 +15,7 @@ func TestFFmpegx264Command(t *testing.T) {
transcoder.ffmpegPath = filepath.Join("fake", "path", "ffmpeg")
transcoder.SetInput("fakecontent.flv")
transcoder.SetOutputPath("fakeOutput")
transcoder.SetIdentifier("jdofFGg")
transcoder.SetStreamID("jdFsdfzGg")
transcoder.SetInternalHTTPPort("8123")
transcoder.SetCodec(codec.Name())
transcoder.currentLatencyLevel = latencyLevel
@ -42,7 +42,7 @@ func TestFFmpegx264Command(t *testing.T) {
cmd := transcoder.getString()
expectedLogPath := filepath.Join("data", "logs", "transcoder.log")
expected := `FFREPORT=file="` + expectedLogPath + `":level=32 ` + transcoder.ffmpegPath + ` -hide_banner -loglevel warning -fflags +genpts -flags +cgop -i fakecontent.flv -map v:0 -c:v:0 libx264 -b:v:0 1008k -maxrate:v:0 1088k -g:v:0 90 -keyint_min:v:0 90 -r:v:0 30 -x264-params:v:0 "scenecut=0:open_gop=0" -bufsize:v:0 1088k -profile:v:0 high -map a:0? -c:a:0 copy -preset veryfast -map v:0 -c:v:1 libx264 -b:v:1 3308k -maxrate:v:1 3572k -g:v:1 72 -keyint_min:v:1 72 -r:v:1 24 -x264-params:v:1 "scenecut=0:open_gop=0" -bufsize:v:1 3572k -profile:v:1 high -map a:0? -c:a:1 copy -preset fast -map v:0 -c:v:2 copy -map a:0? -c:a:2 copy -preset ultrafast -var_stream_map "v:0,a:0 v:1,a:1 v:2,a:2 " -f hls -hls_time 3 -hls_list_size 10 -hls_flags program_date_time+independent_segments+omit_endlist -segment_format_options mpegts_flags=mpegts_copyts=1 -tune zerolatency -pix_fmt yuv420p -sc_threshold 0 -master_pl_name stream.m3u8 -hls_segment_filename http://127.0.0.1:8123/%v/stream-jdofFGg-%d.ts -max_muxing_queue_size 400 -method PUT http://127.0.0.1:8123/%v/stream.m3u8`
expected := `FFREPORT=file="` + expectedLogPath + `":level=32 ` + transcoder.ffmpegPath + ` -hide_banner -loglevel warning -fflags +genpts -flags +cgop -i fakecontent.flv -map v:0 -c:v:0 libx264 -b:v:0 1008k -maxrate:v:0 1088k -g:v:0 90 -keyint_min:v:0 90 -r:v:0 30 -x264-params:v:0 "scenecut=0:open_gop=0" -bufsize:v:0 1088k -profile:v:0 high -map a:0? -c:a:0 copy -preset veryfast -map v:0 -c:v:1 libx264 -b:v:1 3308k -maxrate:v:1 3572k -g:v:1 72 -keyint_min:v:1 72 -r:v:1 24 -x264-params:v:1 "scenecut=0:open_gop=0" -bufsize:v:1 3572k -profile:v:1 high -map a:0? -c:a:1 copy -preset fast -map v:0 -c:v:2 copy -map a:0? -c:a:2 copy -preset ultrafast -var_stream_map "v:0,a:0 v:1,a:1 v:2,a:2 " -f hls -hls_time 3 -hls_list_size 10 -hls_flags program_date_time+independent_segments+omit_endlist -segment_format_options mpegts_flags=mpegts_copyts=1 -tune zerolatency -pix_fmt yuv420p -sc_threshold 0 -master_pl_name stream.m3u8 -hls_segment_filename http://127.0.0.1:8123/jdFsdfzGg/%v/stream-jdFsdfzGg-%d.ts -max_muxing_queue_size 400 -method PUT http://127.0.0.1:8123/jdFsdfzGg/%v/stream.m3u8`
if cmd != expected {
t.Errorf("ffmpeg command does not match expected.\nGot %s\n, want: %s", cmd, expected)

View File

@ -13,8 +13,10 @@ import (
log "github.com/sirupsen/logrus"
)
var _lastTranscoderLogMessage = ""
var l = &sync.RWMutex{}
var (
_lastTranscoderLogMessage = ""
l = &sync.RWMutex{}
)
var errorMap = map[string]string{
"Unrecognized option 'vaapi_device'": "you are likely trying to utilize a vaapi codec, but your version of ffmpeg or your hardware doesn't support it. change your codec to libx264 and restart your stream",
@ -94,20 +96,20 @@ func handleTranscoderMessage(message string) {
_lastTranscoderLogMessage = message
}
func createVariantDirectories() {
func createVariantDirectories(streamID string) {
// Create private hls data dirs
utils.CleanupDirectory(config.HLSStoragePath)
utils.CleanupDirectory(config.HLSStoragePath, config.EnableRecordingFeatures)
if len(data.GetStreamOutputVariants()) != 0 {
for index := range data.GetStreamOutputVariants() {
if err := os.MkdirAll(path.Join(config.HLSStoragePath, strconv.Itoa(index)), 0750); err != nil {
if err := os.MkdirAll(path.Join(config.HLSStoragePath, streamID, strconv.Itoa(index)), 0o750); err != nil {
log.Fatalln(err)
}
}
} else {
dir := path.Join(config.HLSStoragePath, strconv.Itoa(0))
log.Traceln("Creating", dir)
if err := os.MkdirAll(dir, 0750); err != nil {
if err := os.MkdirAll(dir, 0o750); err != nil {
log.Fatalln(err)
}
}

26
core/video.go Normal file
View File

@ -0,0 +1,26 @@
package core
import (
"io"
"github.com/owncast/owncast/core/transcoder"
)
func setupVideoComponentsForId(streamId string) {
}
func setupLiveTranscoderForId(streamId string, rtmpOut *io.PipeReader) {
_storage.SetStreamId(streamId)
handler.SetStreamId(streamId)
go func() {
_transcoder = transcoder.NewTranscoder(streamId)
_transcoder.TranscoderCompleted = func(error) {
SetStreamAsDisconnected()
_transcoder = nil
_currentBroadcast = nil
}
_transcoder.SetStdin(rtmpOut)
_transcoder.Start(true)
}()
}

View File

@ -1,6 +1,6 @@
// Code generated by sqlc. DO NOT EDIT.
// versions:
// sqlc v1.15.0
// sqlc v1.19.1
package db

View File

@ -1,6 +1,6 @@
// Code generated by sqlc. DO NOT EDIT.
// versions:
// sqlc v1.15.0
// sqlc v1.19.1
package db
@ -72,6 +72,13 @@ type Notification struct {
CreatedAt sql.NullTime
}
type Stream struct {
ID string
StreamTitle sql.NullString
StartTime time.Time
EndTime sql.NullTime
}
type User struct {
ID string
DisplayName string
@ -91,3 +98,24 @@ type UserAccessToken struct {
UserID string
Timestamp time.Time
}
type VideoSegment struct {
ID string
StreamID string
OutputConfigurationID string
Path string
Timestamp time.Time
}
type VideoSegmentOutputConfiguration struct {
ID string
VariantID string
Name string
StreamID string
SegmentDuration int32
Bitrate int32
Framerate int32
ResolutionWidth sql.NullInt32
ResolutionHeight sql.NullInt32
Timestamp time.Time
}

View File

@ -108,3 +108,35 @@ UPDATE users SET display_name = $1, previous_names = previous_names || $2, namec
-- name: ChangeDisplayColor :exec
UPDATE users SET display_color = $1 WHERE id = $2;
-- Recording and clip related queries.
-- name: GetStreams :many
SELECT id, stream_title, start_time, end_time FROM streams ORDER BY start_time DESC;
-- name: GetStreamById :one
SELECT id, stream_title, start_time, end_time FROM streams WHERE id = $1 LIMIT 1;
-- name: GetOutputConfigurationsForStreamId :many
SELECT id, stream_id, variant_id, name, segment_duration, bitrate, framerate, resolution_width, resolution_height FROM video_segment_output_configuration WHERE stream_id = $1;
-- name: GetOutputConfigurationForId :one
SELECT id, stream_id, variant_id, name, segment_duration, bitrate, framerate, resolution_width, resolution_height FROM video_segment_output_configuration WHERE id = $1;
-- name: GetSegmentsForOutputId :many
SELECT id, stream_id, output_configuration_id, path, timestamp FROM video_segments WHERE output_configuration_id = $1 ORDER BY timestamp ASC;
-- name: GetSegmentsForOutputIdAndWindow :many
SELECT id, stream_id, output_configuration_id, timestamp FROM video_segments WHERE output_configuration_id = $1 AND timestamp >= $2 AND timestamp <= $3 ORDER BY timestamp ASC;
-- name: InsertStream :exec
INSERT INTO streams (id, stream_title, start_time, end_time) VALUES($1, $2, $3, $4);
-- name: InsertOutputConfiguration :exec
INSERT INTO video_segment_output_configuration (id, variant_id, stream_id, name, segment_duration, bitrate, framerate, resolution_width, resolution_height) VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9);
-- name: InsertSegment :exec
INSERT INTO video_segments (id, stream_id, output_configuration_id, path) VALUES($1, $2, $3, $4);
-- name: SetStreamEnded :exec
UPDATE streams SET end_time = CURRENT_TIMESTAMP WHERE id = $1;

View File

@ -1,6 +1,6 @@
// Code generated by sqlc. DO NOT EDIT.
// versions:
// sqlc v1.15.0
// sqlc v1.19.1
// source: query.sql
package db
@ -541,6 +541,88 @@ func (q *Queries) GetOutboxWithOffset(ctx context.Context, arg GetOutboxWithOffs
return items, nil
}
const getOutputConfigurationForId = `-- name: GetOutputConfigurationForId :one
SELECT id, stream_id, variant_id, name, segment_duration, bitrate, framerate, resolution_width, resolution_height FROM video_segment_output_configuration WHERE id = $1
`
type GetOutputConfigurationForIdRow struct {
ID string
StreamID string
VariantID string
Name string
SegmentDuration int32
Bitrate int32
Framerate int32
ResolutionWidth sql.NullInt32
ResolutionHeight sql.NullInt32
}
func (q *Queries) GetOutputConfigurationForId(ctx context.Context, id string) (GetOutputConfigurationForIdRow, error) {
row := q.db.QueryRowContext(ctx, getOutputConfigurationForId, id)
var i GetOutputConfigurationForIdRow
err := row.Scan(
&i.ID,
&i.StreamID,
&i.VariantID,
&i.Name,
&i.SegmentDuration,
&i.Bitrate,
&i.Framerate,
&i.ResolutionWidth,
&i.ResolutionHeight,
)
return i, err
}
const getOutputConfigurationsForStreamId = `-- name: GetOutputConfigurationsForStreamId :many
SELECT id, stream_id, variant_id, name, segment_duration, bitrate, framerate, resolution_width, resolution_height FROM video_segment_output_configuration WHERE stream_id = $1
`
type GetOutputConfigurationsForStreamIdRow struct {
ID string
StreamID string
VariantID string
Name string
SegmentDuration int32
Bitrate int32
Framerate int32
ResolutionWidth sql.NullInt32
ResolutionHeight sql.NullInt32
}
func (q *Queries) GetOutputConfigurationsForStreamId(ctx context.Context, streamID string) ([]GetOutputConfigurationsForStreamIdRow, error) {
rows, err := q.db.QueryContext(ctx, getOutputConfigurationsForStreamId, streamID)
if err != nil {
return nil, err
}
defer rows.Close()
var items []GetOutputConfigurationsForStreamIdRow
for rows.Next() {
var i GetOutputConfigurationsForStreamIdRow
if err := rows.Scan(
&i.ID,
&i.StreamID,
&i.VariantID,
&i.Name,
&i.SegmentDuration,
&i.Bitrate,
&i.Framerate,
&i.ResolutionWidth,
&i.ResolutionHeight,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Close(); err != nil {
return nil, err
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const getRejectedAndBlockedFollowers = `-- name: GetRejectedAndBlockedFollowers :many
SELECT iri, name, username, image, created_at, disabled_at FROM ap_followers WHERE disabled_at is not null
`
@ -584,6 +666,134 @@ func (q *Queries) GetRejectedAndBlockedFollowers(ctx context.Context) ([]GetReje
return items, nil
}
const getSegmentsForOutputId = `-- name: GetSegmentsForOutputId :many
SELECT id, stream_id, output_configuration_id, path, timestamp FROM video_segments WHERE output_configuration_id = $1 ORDER BY timestamp ASC
`
func (q *Queries) GetSegmentsForOutputId(ctx context.Context, outputConfigurationID string) ([]VideoSegment, error) {
rows, err := q.db.QueryContext(ctx, getSegmentsForOutputId, outputConfigurationID)
if err != nil {
return nil, err
}
defer rows.Close()
var items []VideoSegment
for rows.Next() {
var i VideoSegment
if err := rows.Scan(
&i.ID,
&i.StreamID,
&i.OutputConfigurationID,
&i.Path,
&i.Timestamp,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Close(); err != nil {
return nil, err
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const getSegmentsForOutputIdAndWindow = `-- name: GetSegmentsForOutputIdAndWindow :many
SELECT id, stream_id, output_configuration_id, timestamp FROM video_segments WHERE output_configuration_id = $1 AND timestamp >= $2 AND timestamp <= $3 ORDER BY timestamp ASC
`
type GetSegmentsForOutputIdAndWindowParams struct {
OutputConfigurationID string
Timestamp time.Time
Timestamp_2 time.Time
}
type GetSegmentsForOutputIdAndWindowRow struct {
ID string
StreamID string
OutputConfigurationID string
Timestamp time.Time
}
func (q *Queries) GetSegmentsForOutputIdAndWindow(ctx context.Context, arg GetSegmentsForOutputIdAndWindowParams) ([]GetSegmentsForOutputIdAndWindowRow, error) {
rows, err := q.db.QueryContext(ctx, getSegmentsForOutputIdAndWindow, arg.OutputConfigurationID, arg.Timestamp, arg.Timestamp_2)
if err != nil {
return nil, err
}
defer rows.Close()
var items []GetSegmentsForOutputIdAndWindowRow
for rows.Next() {
var i GetSegmentsForOutputIdAndWindowRow
if err := rows.Scan(
&i.ID,
&i.StreamID,
&i.OutputConfigurationID,
&i.Timestamp,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Close(); err != nil {
return nil, err
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const getStreamById = `-- name: GetStreamById :one
SELECT id, stream_title, start_time, end_time FROM streams WHERE id = $1 LIMIT 1
`
func (q *Queries) GetStreamById(ctx context.Context, id string) (Stream, error) {
row := q.db.QueryRowContext(ctx, getStreamById, id)
var i Stream
err := row.Scan(
&i.ID,
&i.StreamTitle,
&i.StartTime,
&i.EndTime,
)
return i, err
}
const getStreams = `-- name: GetStreams :many
SELECT id, stream_title, start_time, end_time FROM streams ORDER BY start_time DESC
`
// Recording and clip related queries.
func (q *Queries) GetStreams(ctx context.Context) ([]Stream, error) {
rows, err := q.db.QueryContext(ctx, getStreams)
if err != nil {
return nil, err
}
defer rows.Close()
var items []Stream
for rows.Next() {
var i Stream
if err := rows.Scan(
&i.ID,
&i.StreamTitle,
&i.StartTime,
&i.EndTime,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Close(); err != nil {
return nil, err
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const getUserByAccessToken = `-- name: GetUserByAccessToken :one
SELECT users.id, display_name, display_color, users.created_at, disabled_at, previous_names, namechanged_at, authenticated_at, scopes FROM users, user_access_tokens WHERE token = $1 AND users.id = user_id
`
@ -666,6 +876,79 @@ func (q *Queries) GetUserDisplayNameByToken(ctx context.Context, token string) (
return display_name, err
}
const insertOutputConfiguration = `-- name: InsertOutputConfiguration :exec
INSERT INTO video_segment_output_configuration (id, variant_id, stream_id, name, segment_duration, bitrate, framerate, resolution_width, resolution_height) VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9)
`
type InsertOutputConfigurationParams struct {
ID string
VariantID string
StreamID string
Name string
SegmentDuration int32
Bitrate int32
Framerate int32
ResolutionWidth sql.NullInt32
ResolutionHeight sql.NullInt32
}
func (q *Queries) InsertOutputConfiguration(ctx context.Context, arg InsertOutputConfigurationParams) error {
_, err := q.db.ExecContext(ctx, insertOutputConfiguration,
arg.ID,
arg.VariantID,
arg.StreamID,
arg.Name,
arg.SegmentDuration,
arg.Bitrate,
arg.Framerate,
arg.ResolutionWidth,
arg.ResolutionHeight,
)
return err
}
const insertSegment = `-- name: InsertSegment :exec
INSERT INTO video_segments (id, stream_id, output_configuration_id, path) VALUES($1, $2, $3, $4)
`
type InsertSegmentParams struct {
ID string
StreamID string
OutputConfigurationID string
Path string
}
func (q *Queries) InsertSegment(ctx context.Context, arg InsertSegmentParams) error {
_, err := q.db.ExecContext(ctx, insertSegment,
arg.ID,
arg.StreamID,
arg.OutputConfigurationID,
arg.Path,
)
return err
}
const insertStream = `-- name: InsertStream :exec
INSERT INTO streams (id, stream_title, start_time, end_time) VALUES($1, $2, $3, $4)
`
type InsertStreamParams struct {
ID string
StreamTitle sql.NullString
StartTime time.Time
EndTime sql.NullTime
}
func (q *Queries) InsertStream(ctx context.Context, arg InsertStreamParams) error {
_, err := q.db.ExecContext(ctx, insertStream,
arg.ID,
arg.StreamTitle,
arg.StartTime,
arg.EndTime,
)
return err
}
const isDisplayNameAvailable = `-- name: IsDisplayNameAvailable :one
SELECT count(*) FROM users WHERE display_name = $1 AND authenticated_at is not null AND disabled_at is NULL
`
@ -748,6 +1031,15 @@ func (q *Queries) SetAccessTokenToOwner(ctx context.Context, arg SetAccessTokenT
return err
}
const setStreamEnded = `-- name: SetStreamEnded :exec
UPDATE streams SET end_time = CURRENT_TIMESTAMP WHERE id = $1
`
func (q *Queries) SetStreamEnded(ctx context.Context, id string) error {
_, err := q.db.ExecContext(ctx, setStreamEnded, id)
return err
}
const setUserAsAuthenticated = `-- name: SetUserAsAuthenticated :exec
UPDATE users SET authenticated_at = CURRENT_TIMESTAMP WHERE id = $1
`

View File

@ -97,3 +97,45 @@ CREATE TABLE IF NOT EXISTS messages (
CREATE INDEX user_id ON messages (user_id);
CREATE INDEX hidden_at ON messages (hidden_at);
CREATE INDEX timestamp ON messages (timestamp);
-- Record the high level details of each stream.
CREATE TABLE IF NOT EXISTS streams (
"id" string NOT NULL PRIMARY KEY,
"stream_title" TEXT,
"start_time" DATE NOT NULL,
"end_time" DATE,
PRIMARY KEY (id)
);
CREATE INDEX streams_id ON streams (id);
CREATE INDEX streams_start_time ON streams (start_time);
CREATE INDEX streams_start_end_time ON streams (start_time,end_time);
-- Record the output configuration of a stream.
CREATE TABLE IF NOT EXISTS video_segment_output_configuration (
"id" string NOT NULL PRIMARY KEY,
"variant_id" string NOT NULL,
"name" string NOT NULL,
"stream_id" string NOT NULL,
"segment_duration" INTEGER NOT NULL,
"bitrate" INTEGER NOT NULL,
"framerate" INTEGER NOT NULL,
"resolution_width" INTEGER,
"resolution_height" INTEGER,
"timestamp" DATE DEFAULT CURRENT_TIMESTAMP NOT NULL,
PRIMARY KEY (id)
);
CREATE INDEX video_segment_output_configuration_stream_id ON video_segment_output_configuration (stream_id);
-- Support querying all segments for a single stream as well
-- as segments for a time window.
CREATE TABLE IF NOT EXISTS video_segments (
"id" string NOT NULL PRIMARY KEY,
"stream_id" string NOT NULL,
"output_configuration_id" string NOT NULL,
"path" TEXT NOT NULL,
"timestamp" DATE DEFAULT CURRENT_TIMESTAMP NOT NULL,
PRIMARY KEY (id)
);
CREATE INDEX video_segments_stream_id ON video_segments (stream_id);
CREATE INDEX video_segments_stream_id_timestamp ON video_segments (stream_id,timestamp);

View File

@ -2,6 +2,7 @@ package models
// CurrentBroadcast represents the configuration associated with the currently active stream.
type CurrentBroadcast struct {
StreamID string `json:"streamId"`
OutputSettings []StreamOutputVariant `json:"outputSettings"`
LatencyLevel LatencyLevel `json:"latencyLevel"`
}

View File

@ -3,11 +3,12 @@ package models
// StorageProvider is how a chunk storage provider should be implemented.
type StorageProvider interface {
Setup() error
Save(filePath string, retryCount int) (string, error)
SegmentWritten(localFilePath string)
Save(localFilePath, destinationPath string, retryCount int) (string, error)
SetStreamId(streamID string)
SegmentWritten(localFilePath string) (string, int, error)
VariantPlaylistWritten(localFilePath string)
MasterPlaylistWritten(localFilePath string)
GetRemoteDestinationPathFromLocalFilePath(localFilePath string) string
Cleanup() error
}

130
replays/hlsRecorder.go Normal file
View File

@ -0,0 +1,130 @@
package replays
import (
"context"
"database/sql"
"strconv"
"strings"
"time"
"github.com/owncast/owncast/core/data"
"github.com/owncast/owncast/db"
"github.com/owncast/owncast/utils"
"github.com/teris-io/shortid"
log "github.com/sirupsen/logrus"
)
type HLSRecorder struct {
streamID string
startTime time.Time
// The video variant configurations that were used for this stream.
outputConfigurations []HLSOutputConfiguration
datastore *data.Datastore
}
// NewRecording returns a new instance of the HLS recorder.
func NewRecording(streamID string) *HLSRecorder {
// We don't support replaying offline clips.
if streamID == "offline" {
return nil
}
h := HLSRecorder{
streamID: streamID,
startTime: time.Now(),
datastore: data.GetDatastore(),
}
outputs := data.GetStreamOutputVariants()
latency := data.GetStreamLatencyLevel()
streamTitle := data.GetStreamTitle()
validTitle := streamTitle != ""
if err := h.datastore.GetQueries().InsertStream(context.Background(), db.InsertStreamParams{
ID: streamID,
StartTime: h.startTime,
StreamTitle: sql.NullString{String: streamTitle, Valid: validTitle},
}); err != nil {
log.Panicln(err)
}
// Create a reference of the output configurations that were used for this stream.
for variantId, o := range outputs {
configId := shortid.MustGenerate()
if err := h.datastore.GetQueries().InsertOutputConfiguration(context.Background(), db.InsertOutputConfigurationParams{
ID: configId,
Name: o.Name,
StreamID: streamID,
VariantID: strconv.Itoa(variantId),
SegmentDuration: int32(latency.SecondsPerSegment),
Bitrate: int32(o.VideoBitrate),
Framerate: int32(o.Framerate),
ResolutionWidth: sql.NullInt32{Int32: int32(o.ScaledWidth), Valid: true},
ResolutionHeight: sql.NullInt32{Int32: int32(o.ScaledHeight), Valid: true},
}); err != nil {
log.Panicln(err)
}
h.outputConfigurations = append(h.outputConfigurations, HLSOutputConfiguration{
ID: configId,
Name: o.Name,
VideoBitrate: o.VideoBitrate,
ScaledWidth: o.ScaledWidth,
ScaledHeight: o.ScaledHeight,
Framerate: o.Framerate,
SegmentDuration: float64(latency.SegmentCount),
})
}
return &h
}
// SetOutputConfigurations sets the output configurations for this stream.
func (h *HLSRecorder) SetOutputConfigurations(configs []HLSOutputConfiguration) {
h.outputConfigurations = configs
}
// StreamBegan is called when a stream is started.
func (h *HLSRecorder) StreamBegan(id string) {
h.streamID = id
h.startTime = time.Now()
}
// SegmentWritten is called when a segment is written to disk.
func (h *HLSRecorder) SegmentWritten(path string) {
outputConfigurationIndexString := utils.GetIndexFromFilePath(path)
outputConfigurationIndex, err := strconv.Atoi(outputConfigurationIndexString)
if err != nil {
log.Errorln("HLSRecorder segmentWritten error:", err)
return
}
p := strings.ReplaceAll(path, "data/", "")
segment := HLSSegment{
ID: shortid.MustGenerate(),
StreamID: h.streamID,
Path: p,
}
if err := h.datastore.GetQueries().InsertSegment(context.Background(), db.InsertSegmentParams{
ID: segment.ID,
StreamID: segment.StreamID,
OutputConfigurationID: h.outputConfigurations[outputConfigurationIndex].ID,
Path: segment.Path,
}); err != nil {
log.Errorln(err)
}
}
// StreamEnded is called when a stream is ended so the end time can be noted
// in the stream's metadata.
func (h *HLSRecorder) StreamEnded() {
if err := h.datastore.GetQueries().SetStreamEnded(context.Background(), h.streamID); err != nil {
log.Errorln(err)
}
}

12
replays/hlsSegment.go Normal file
View File

@ -0,0 +1,12 @@
package replays
import "time"
// HLSSegment represents a single HLS segment.
type HLSSegment struct {
ID string
StreamID string
Timestamp time.Time
OutputConfigurationID string
Path string
}

View File

@ -0,0 +1,49 @@
package replays
import (
"bytes"
"fmt"
"github.com/grafov/m3u8"
)
// MediaPlaylistAllowCacheTag is a custom tag to explicitly state that this
// playlist is allowed to be cached.
type MediaPlaylistAllowCacheTag struct {
Type string
}
// TagName should return the full tag identifier including the leading
// '#' and trailing ':' if the tag also contains a value or attribute
// list.
func (tag *MediaPlaylistAllowCacheTag) TagName() string {
return "#EXT-X-ALLOW-CACHE"
}
// Decode decodes the input line. The line will be the entire matched
// line, including the identifier.
func (tag *MediaPlaylistAllowCacheTag) Decode(line string) (m3u8.CustomTag, error) {
_, err := fmt.Sscanf(line, "#EXT-X-ALLOW-CACHE")
return tag, err
}
// SegmentTag specifies that this tag is not for segments.
func (tag *MediaPlaylistAllowCacheTag) SegmentTag() bool {
return false
}
// Encode formats the structure to the text result.
func (tag *MediaPlaylistAllowCacheTag) Encode() *bytes.Buffer {
buf := new(bytes.Buffer)
buf.WriteString(tag.TagName())
buf.WriteString(tag.Type)
return buf
}
// String implements Stringer interface.
func (tag *MediaPlaylistAllowCacheTag) String() string {
return tag.Encode().String()
}

View File

@ -0,0 +1,13 @@
package replays
type HLSOutputConfiguration struct {
ID string
StreamId string
VariantId string
Name string
VideoBitrate int
ScaledWidth int
ScaledHeight int
Framerate int
SegmentDuration float64
}

View File

@ -0,0 +1,283 @@
package replays
import (
"context"
"fmt"
"strings"
"time"
"github.com/grafov/m3u8"
"github.com/owncast/owncast/core/data"
"github.com/owncast/owncast/db"
"github.com/pkg/errors"
)
/*
The PlaylistGenerator is responsible for creating the master and media
playlists, in order to replay a stream in whole, or part. It requires detailed
metadata about how the initial live stream was configured, as well as a
access to every segment that was created during the live stream.
*/
type PlaylistGenerator struct {
datastore *data.Datastore
}
func NewPlaylistGenerator() *PlaylistGenerator {
return &PlaylistGenerator{
datastore: data.GetDatastore(),
}
}
func (p *PlaylistGenerator) GenerateMasterPlaylistForStream(streamId string) (*m3u8.MasterPlaylist, error) {
// stream, err := p.GetStream(streamId)
// if err != nil {
// return nil, errors.Wrap(err, "failed to get stream")
// }
// Determine the different output configurations for this stream.
configs, err := p.GetConfigurationsForStream(streamId)
if err != nil {
return nil, errors.Wrap(err, "failed to get configurations for stream")
}
// Create the master playlist that will hold the different media playlists.
masterPlaylist := p.createNewMasterPlaylist()
// Create the media playlists for each output configuration.
for _, config := range configs {
// Verify the validity of the configuration.
if config.VideoBitrate == 0 {
return nil, errors.New("video bitrate is unavailable")
}
if config.Framerate == 0 {
return nil, errors.New("video framerate is unavailable")
}
mediaPlaylist, err := p.GenerateMediaPlaylistForStreamAndConfiguration(streamId, config.ID)
if err != nil {
return nil, errors.Wrap(err, "failed to create media playlist")
}
// Append the media playlist to the master playlist.
params := m3u8.VariantParams{
ProgramId: 1,
Name: config.Name,
FrameRate: float64(config.Framerate),
Bandwidth: uint32(config.VideoBitrate * 1000),
// Match what is generated in our live playlists.
Codecs: "avc1.64001f,mp4a.40.2",
}
// If both the width and height are set then we can set that as
// the resolution in the media playlist.
if config.ScaledHeight > 0 && config.ScaledWidth > 0 {
params.Resolution = fmt.Sprintf("%dx%d", config.ScaledWidth, config.ScaledHeight)
}
// Add the media playlist to the master playlist.
publicPlaylistPath := strings.Join([]string{"/replay", streamId, config.ID}, "/")
masterPlaylist.Append(publicPlaylistPath, mediaPlaylist, params)
}
// Return the final master playlist that contains all the media playlists.
return masterPlaylist, nil
}
func (p *PlaylistGenerator) GenerateMediaPlaylistForStreamAndConfiguration(streamId, outputConfigurationId string) (*m3u8.MediaPlaylist, error) {
stream, err := p.GetStream(streamId)
if err != nil {
return nil, errors.Wrap(err, "failed to get stream")
}
config, err := p.GetOutputConfig(outputConfigurationId)
if err != nil {
return nil, errors.Wrap(err, "failed to get output configuration")
}
// Fetch all the segments for this configuration.
segments, err := p.GetAllSegmentsForOutputConfiguration(outputConfigurationId)
if err != nil {
return nil, errors.Wrap(err, "failed to get all segments for output configuration")
}
// Create the media playlist for this configuration and add the segments.
mediaPlaylist, err := p.createMediaPlaylistForConfigurationAndSegments(config, stream.StartTime, stream.InProgress, segments)
if err != nil {
return nil, errors.Wrap(err, "failed to create media playlist")
}
return mediaPlaylist, nil
}
func (p *PlaylistGenerator) GetStream(streamId string) (*Stream, error) {
stream, err := p.datastore.GetQueries().GetStreamById(context.Background(), streamId)
if stream.ID == "" {
return nil, errors.Wrap(err, "failed to get stream")
}
s := Stream{
ID: stream.ID,
Title: stream.StreamTitle.String,
StartTime: stream.StartTime,
EndTime: stream.EndTime.Time,
InProgress: !stream.EndTime.Valid,
}
return &s, nil
}
func (p *PlaylistGenerator) GetOutputConfig(outputConfigId string) (*HLSOutputConfiguration, error) {
config, err := p.datastore.GetQueries().GetOutputConfigurationForId(context.Background(), outputConfigId)
if err != nil {
return nil, errors.Wrap(err, "failed to get output configuration")
}
return createConfigFromConfigRow(config), nil
}
// GetConfigurationsForStream returns the output configurations for a given stream.
func (p *PlaylistGenerator) GetConfigurationsForStream(streamId string) ([]*HLSOutputConfiguration, error) {
outputConfigRows, err := p.datastore.GetQueries().GetOutputConfigurationsForStreamId(context.Background(), streamId)
if err != nil {
return nil, errors.Wrap(err, "failed to get output configurations for stream")
}
outputConfigs := []*HLSOutputConfiguration{}
for _, row := range outputConfigRows {
config := &HLSOutputConfiguration{
ID: row.ID,
StreamId: streamId,
VariantId: row.VariantID,
Name: row.Name,
VideoBitrate: int(row.Bitrate),
Framerate: int(row.Framerate),
ScaledHeight: int(row.ResolutionWidth.Int32),
ScaledWidth: int(row.ResolutionHeight.Int32),
SegmentDuration: float64(row.SegmentDuration),
}
outputConfigs = append(outputConfigs, config)
}
return outputConfigs, nil
}
// GetAllSegmentsForOutputConfiguration returns all the segments for a given output config.
func (p *PlaylistGenerator) GetAllSegmentsForOutputConfiguration(outputId string) ([]HLSSegment, error) {
segmentRows, err := p.datastore.GetQueries().GetSegmentsForOutputId(context.Background(), outputId)
if err != nil {
return nil, errors.Wrap(err, "failed to get segments for output config")
}
segments := []HLSSegment{}
for _, row := range segmentRows {
segment := HLSSegment{
ID: row.ID,
StreamID: row.StreamID,
OutputConfigurationID: row.OutputConfigurationID,
Timestamp: row.Timestamp,
Path: row.Path,
}
segments = append(segments, segment)
}
return segments, nil
}
func (p *PlaylistGenerator) createMediaPlaylistForConfigurationAndSegments(configuration *HLSOutputConfiguration, startTime time.Time, inProgress bool, segments []HLSSegment) (*m3u8.MediaPlaylist, error) {
playlistSize := len(segments)
segmentDuration := configuration.SegmentDuration
playlist, err := m3u8.NewMediaPlaylist(0, uint(playlistSize))
playlist.TargetDuration = configuration.SegmentDuration
if !inProgress {
playlist.MediaType = m3u8.VOD
} else {
playlist.MediaType = m3u8.EVENT
}
// Add the segments to the playlist.
for index, segment := range segments {
mediaSegment := m3u8.MediaSegment{
URI: "/" + segment.Path,
Duration: segmentDuration,
SeqId: uint64(index),
ProgramDateTime: segment.Timestamp,
}
if err := playlist.AppendSegment(&mediaSegment); err != nil {
return nil, errors.Wrap(err, "failed to append segment to recording playlist")
}
}
if err != nil {
return nil, err
}
// Configure the properties of this media playlist.
if err := playlist.SetProgramDateTime(startTime); err != nil {
return nil, errors.Wrap(err, "failed to set media playlist program date time")
}
// Our live output is specified as v6, so let's match it to be as close as
// possible to what we're doing for live streams.
playlist.SetVersion(6)
if !inProgress {
// Specify explicitly that the playlist content is allowed to be cached.
// However, if in-progress recordings are supported this should not be enabled
// in order for the playlist to be updated with new segments. inProgress is
// determined by seeing if the stream has an endTime or not.
playlist.SetCustomTag(&MediaPlaylistAllowCacheTag{})
// Set the ENDLIST tag and close the playlist for writing if the stream is
// not still in progress.
playlist.Close()
}
return playlist, nil
}
func (p *PlaylistGenerator) createNewMasterPlaylist() *m3u8.MasterPlaylist {
playlist := m3u8.NewMasterPlaylist()
playlist.SetIndependentSegments(true)
playlist.SetVersion(6)
return playlist
}
func createConfigFromConfigRow(row db.GetOutputConfigurationForIdRow) *HLSOutputConfiguration {
config := HLSOutputConfiguration{
ID: row.ID,
StreamId: row.StreamID,
VariantId: row.VariantID,
Name: row.Name,
VideoBitrate: int(row.Bitrate),
Framerate: int(row.Framerate),
ScaledHeight: int(row.ResolutionWidth.Int32),
ScaledWidth: int(row.ResolutionHeight.Int32),
SegmentDuration: float64(row.SegmentDuration),
}
return &config
}
// func createOutputConfigsFromConfigRows(rows []db.GetOutputConfigurationsForStreamIdRow) []HLSOutputConfiguration {
// outputConfigs := []HLSOutputConfiguration{}
// for _, row := range rows {
// config := HLSOutputConfiguration{
// ID: row.ID,
// StreamId: row.StreamID,
// VariantId: row.VariantID,
// Name: row.Name,
// VideoBitrate: int(row.Bitrate),
// Framerate: int(row.Framerate),
// ScaledHeight: int(row.ResolutionWidth.Int32),
// ScaledWidth: int(row.ResolutionHeight.Int32),
// SegmentDuration: float64(row.SegmentDuration),
// }
// outputConfigs = append(outputConfigs, config)
// }
// return outputConfigs
// }

View File

@ -0,0 +1,136 @@
package replays
import (
"testing"
"time"
"github.com/grafov/m3u8"
)
var (
generator = NewPlaylistGenerator()
config = []HLSOutputConfiguration{
{
ID: "1",
VideoBitrate: 1000,
Framerate: 30,
},
{
ID: "2",
VideoBitrate: 2000,
Framerate: 30,
},
}
)
var segments = []HLSSegment{
{
ID: "testSegmentId",
StreamID: "testStreamId",
Timestamp: time.Now(),
OutputConfigurationID: "testOutputConfigId",
Path: "hls/testStreamId/testOutputConfigId/testSegmentId.ts",
},
}
func TestMasterPlaylist(t *testing.T) {
playlist := generator.createNewMasterPlaylist()
mediaPlaylists, err := generator.createMediaPlaylistForConfigurationAndSegments(&config[0], time.Now(), false, segments)
playlist.Append("test", mediaPlaylists, m3u8.VariantParams{
Bandwidth: uint32(config[0].VideoBitrate),
FrameRate: float64(config[0].Framerate),
})
mediaPlaylists.Close()
if err != nil {
t.Error(err)
}
if playlist.Version() != 6 {
t.Error("expected version 6, got", playlist.Version())
}
if !playlist.IndependentSegments() {
t.Error("expected independent segments")
}
if playlist.Variants[0].Bandwidth != uint32(config[0].VideoBitrate) {
t.Error("expected bandwidth", config[0].VideoBitrate, "got", playlist.Variants[0].Bandwidth)
}
if playlist.Variants[0].FrameRate != float64(config[0].Framerate) {
t.Error("expected framerate", config[0].Framerate, "got", playlist.Variants[0].FrameRate)
}
}
func TestCompletedMediaPlaylist(t *testing.T) {
startTime := segments[0].Timestamp
conf := config[0]
// Create a completed media playlist.
playlist, err := generator.createMediaPlaylistForConfigurationAndSegments(&conf, startTime, false, segments)
if err != nil {
t.Error(err)
}
if playlist.TargetDuration != conf.SegmentDuration {
t.Error("expected target duration", conf.SegmentDuration, "got", playlist.TargetDuration)
}
// Verify it's marked as cachable.
if playlist.Custom["#EXT-X-ALLOW-CACHE"].String() != "#EXT-X-ALLOW-CACHE" {
t.Error("expected cachable playlist, tag not set")
}
// Verify it has the correct number of segments in the media playlist.
if int(playlist.Count()) != len(segments) {
t.Error("expected", len(segments), "segments, got", playlist.Count())
}
// Test the playlist version.
if playlist.Version() != 6 {
t.Error("expected version 6, got", playlist.Version())
}
// Verify the playlist type
if playlist.MediaType != m3u8.VOD {
t.Error("expected VOD playlist type, got type", playlist.MediaType)
}
// Verify the first segment URI.
if playlist.Segments[0].URI != "/"+segments[0].Path {
t.Error("expected segment URI", segments[0].Path, "got", playlist.Segments[0].URI)
}
}
func TestInProgressMediaPlaylist(t *testing.T) {
startTime := segments[0].Timestamp
conf := config[0]
// Create a completed media playlist.
playlist, err := generator.createMediaPlaylistForConfigurationAndSegments(&conf, startTime, true, segments)
if err != nil {
t.Error(err)
}
// Verify it's marked as cachable.
if playlist.Custom != nil && playlist.Custom["#EXT-X-ALLOW-CACHE"].String() == "#EXT-X-ALLOW-CACHE" {
t.Error("expected non-achable playlist when stream is still in progress")
}
// Verify it has the correct number of segments in the media playlist.
if int(playlist.Count()) != len(segments) {
t.Error("expected", len(segments), "segments, got", playlist.Count())
}
// Test the playlist version.
if playlist.Version() != 6 {
t.Error("expected version 6, got", playlist.Version())
}
// Verify the playlist type
if playlist.MediaType != m3u8.EVENT {
t.Error("expected EVENT playlist type, got type", playlist.MediaType)
}
}

View File

@ -0,0 +1,6 @@
package replays
type StorageProvider interface {
Setup() error
Save(localFilePath, destinationPath string, retryCount int) (string, error)
}

41
replays/stream.go Normal file
View File

@ -0,0 +1,41 @@
package replays
import (
"context"
"fmt"
"time"
"github.com/owncast/owncast/core/data"
"github.com/pkg/errors"
)
type Stream struct {
ID string `json:"id"`
Title string `json:"title,omitempty"`
StartTime time.Time `json:"startTime"`
EndTime time.Time `json:"endTime,omitempty"`
InProgress bool `json:"inProgress,omitempty"`
Manifest string `json:"manifest,omitempty"`
}
// GetStreams will return all streams that have been recorded.
func GetStreams() ([]*Stream, error) {
streams, err := data.GetDatastore().GetQueries().GetStreams(context.Background())
if err != nil {
return nil, errors.WithMessage(err, "failure to get streams")
}
response := []*Stream{}
for _, stream := range streams {
s := Stream{
ID: stream.ID,
Title: stream.StreamTitle.String,
StartTime: stream.StartTime,
EndTime: stream.EndTime.Time,
InProgress: !stream.EndTime.Valid,
Manifest: fmt.Sprintf("/replay/%s", stream.ID),
}
response = append(response, &s)
}
return response, nil
}

View File

@ -392,6 +392,11 @@ func Start() error {
http.HandleFunc("/api/auth/fediverse", middleware.RequireUserAccessToken(fediverseauth.RegisterFediverseOTPRequest))
http.HandleFunc("/api/auth/fediverse/verify", fediverseauth.VerifyFediverseOTPRequest)
// Replay functionality. This route handles both /replay/{streamId} (master)
// and /replay/{streamId}/{outputConfigId} (media) routes.
http.HandleFunc("/api/replays", controllers.GetReplays)
http.HandleFunc("/replay/", controllers.GetReplay)
// ActivityPub has its own router
activitypub.Start(data.GetDatastore())

View File

@ -274,11 +274,13 @@ func VerifyFFMpegPath(path string) error {
}
// CleanupDirectory removes the directory and makes it fresh again. Throws fatal error on failure.
func CleanupDirectory(path string) {
func CleanupDirectory(path string, keepOldFiles bool) {
if !keepOldFiles {
log.Traceln("Cleaning", path)
if err := os.RemoveAll(path); err != nil {
log.Fatalln("Unable to remove directory. Please check the ownership and permissions", err)
}
}
if err := os.MkdirAll(path, 0o750); err != nil {
log.Fatalln("Unable to create directory. Please check the ownership and permissions", err)
}