owncast/video/state/online.go
Gabe Kangas b80ccc4966
WIP
2024-03-25 09:04:05 -07:00

125 lines
3.3 KiB
Go

package state
import (
"context"
"io"
"time"
"github.com/owncast/owncast/models"
"github.com/owncast/owncast/services/apfederation/outbox"
"github.com/owncast/owncast/services/config"
"github.com/owncast/owncast/services/notifications"
"github.com/owncast/owncast/services/webhooks"
"github.com/owncast/owncast/storage/configrepository"
"github.com/owncast/owncast/storage/data"
"github.com/owncast/owncast/utils"
"github.com/owncast/owncast/video/transcoder"
log "github.com/sirupsen/logrus"
)
// setStreamAsConnected moves the video state into its "connected" mode for a
// newly arrived inbound stream.
//
// It resets the per-session status fields, records the current broadcast
// settings taken from the config repository, stops the offline cleanup timer
// and starts the online one, prepares storage, launches the transcoder with
// rtmpOut as its stdin, and finally announces the stream start through
// webhooks, chat and the delayed notification timer.
//
// rtmpOut is the read end of the pipe that is handed to the transcoder.
// A storage setup failure is fatal and terminates the process.
func (vs *VideoState) setStreamAsConnected(rtmpOut *io.PipeReader) {
	connectedAt := utils.NullTime{Time: time.Now(), Valid: true}

	// Reset the status for the new session.
	vs.status.StreamConnected = true
	vs.status.Status.LastDisconnectTime = nil
	vs.status.Status.LastConnectTime = &connectedAt
	vs.status.SessionMaxViewerCount = 0

	// Snapshot the stream settings in effect for this broadcast.
	configRepository := configrepository.Get()
	broadcast := &models.CurrentBroadcast{
		LatencyLevel:   configRepository.GetStreamLatencyLevel(),
		OutputSettings: configRepository.GetStreamOutputVariants(),
	}
	vs.status.SetCurrentBroadcast(broadcast)

	// Switch from the offline cleanup timer to the online one.
	StopOfflineCleanupTimer()
	startOnlineCleanupTimer()

	if _yp != nil {
		go _yp.Start()
	}

	// Read the segment path before storage is set up, as the original did.
	cfg := config.Get()
	segmentPath := cfg.HLSStoragePath

	if err := setupStorage(); err != nil {
		log.Fatalln("failed to setup the storage", err)
	}

	// The transcoder runs in its own goroutine; when it completes, the stream
	// is marked as disconnected and the transcoder/broadcast state is cleared.
	runTranscoder := func() {
		vs.transcoder = transcoder.NewTranscoder()
		vs.transcoder.TranscoderCompleted = func(error) {
			SetStreamAsDisconnected()
			vs.transcoder = nil
			vs.status.SetCurrentBroadcast(nil)
		}
		vs.transcoder.SetStdin(rtmpOut)
		vs.transcoder.Start(true)
	}
	go runTranscoder()

	go webhooks.Get().SendStreamStatusEvent(models.StreamStarted)

	// Generate thumbnails from the highest quality output variant.
	outputVariants := vs.status.GetCurrentBroadcast().OutputSettings
	highestQualityIndex := configRepository.FindHighestVideoQualityIndex(outputVariants)
	transcoder.StartThumbnailGenerator(segmentPath, highestQualityIndex)

	// Chat announcements are best effort; a failed system action is ignored.
	_ = vs.chatService.SendSystemAction("Stay tuned, the stream is **starting**!", true)
	vs.chatService.SendAllWelcomeMessage()

	// Schedule the delayed "go live" notifications and keep their cancel func.
	_onlineTimerCancelFunc = startLiveStreamNotificationsTimer()
}
// _onlineCleanupDone signals the goroutine started by startOnlineCleanupTimer
// to exit. It is nil whenever no online cleanup loop is running.
var _onlineCleanupDone chan struct{}

// startOnlineCleanupTimer starts a background loop that calls
// _storage.Cleanup once a minute and logs any error it returns.
//
// Any previously started loop is stopped first, so at most one cleanup loop
// is active at a time.
func startOnlineCleanupTimer() {
	stopOnlineCleanupTimer()

	ticker := time.NewTicker(1 * time.Minute)
	done := make(chan struct{})
	_onlineCleanupTicker = ticker
	_onlineCleanupDone = done

	// The goroutine works on its own ticker/done pair rather than the
	// package-level variables, so a later restart cannot redirect it.
	go func() {
		for {
			select {
			case <-ticker.C:
				if err := _storage.Cleanup(); err != nil {
					log.Errorln(err)
				}
			case <-done:
				return
			}
		}
	}()
}

// stopOnlineCleanupTimer stops the online cleanup ticker and tells its
// goroutine to exit. It is safe to call when no timer is running.
func stopOnlineCleanupTimer() {
	if _onlineCleanupTicker != nil {
		_onlineCleanupTicker.Stop()
	}

	// Ticker.Stop does not close the ticker channel, so the loop has to be
	// released explicitly or it would stay blocked forever.
	if _onlineCleanupDone != nil {
		close(_onlineCleanupDone)
		_onlineCleanupDone = nil
	}
}
// startLiveStreamNotificationsTimer schedules the delayed "go live"
// notifications and returns the function that cancels them. The same cancel
// function is also stored in _onlineTimerCancelFunc.
//
// A background goroutine waits two minutes. If it is cancelled first, nothing
// is sent. Otherwise, unless notifications were already sent within the last
// ten minutes, it sends the federated go-live message (when federation is
// enabled), notifies registered subscribers, and records the current time in
// _lastNotified.
func startLiveStreamNotificationsTimer() context.CancelFunc {
	ctx, cancelFunc := context.WithCancel(context.Background())
	_onlineTimerCancelFunc = cancelFunc

	go func(ctx context.Context) {
		// An explicit timer (instead of time.After) is released as soon as
		// the wait is cancelled rather than lingering until it fires.
		timer := time.NewTimer(2 * time.Minute)
		defer timer.Stop()

		select {
		case <-ctx.Done():
			return
		case <-timer.C:
		}

		// Skip if notifications went out recently.
		if _lastNotified != nil && time.Since(*_lastNotified) < 10*time.Minute {
			return
		}

		configRepository := configrepository.Get()

		// Send Fediverse message.
		if configRepository.GetFederationEnabled() {
			ob := outbox.Get()
			log.Traceln("Sending Federated Go Live message.")
			if err := ob.SendLive(); err != nil {
				log.Errorln(err)
			}
		}

		// Send notification to those who have registered for them.
		if notifier, err := notifications.New(data.GetDatastore()); err != nil {
			log.Errorln(err)
		} else {
			notifier.Notify()
		}

		// The attempt is recorded even if a send above failed.
		now := time.Now()
		_lastNotified = &now
	}(ctx)

	return cancelFunc
}