feat: move webhooks under services

This commit is contained in:
Gabe Kangas 2023-06-14 17:48:48 -07:00
parent cb5cf7cecb
commit 429ad7efeb
No known key found for this signature in database
GPG Key ID: 4345B2060657F330
14 changed files with 99 additions and 66 deletions

View File

@ -9,7 +9,7 @@ import (
"github.com/owncast/owncast/config" "github.com/owncast/owncast/config"
"github.com/owncast/owncast/core/chat/events" "github.com/owncast/owncast/core/chat/events"
"github.com/owncast/owncast/core/data" "github.com/owncast/owncast/core/data"
"github.com/owncast/owncast/core/webhooks" "github.com/owncast/owncast/services/webhooks"
"github.com/owncast/owncast/storage" "github.com/owncast/owncast/storage"
"github.com/owncast/owncast/utils" "github.com/owncast/owncast/utils"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@ -97,7 +97,8 @@ func (s *Server) userNameChanged(eventData chatClientEvent) {
// Send chat user name changed webhook // Send chat user name changed webhook
receivedEvent.User = savedUser receivedEvent.User = savedUser
receivedEvent.ClientID = eventData.client.Id receivedEvent.ClientID = eventData.client.Id
webhooks.SendChatEventUsernameChanged(receivedEvent) webhookManager := webhooks.GetWebhooks()
webhookManager.SendChatEventUsernameChanged(receivedEvent)
// Resend the client's user so their username is in sync. // Resend the client's user so their username is in sync.
eventData.client.sendConnectedClientInfo() eventData.client.sendConnectedClientInfo()
@ -164,7 +165,8 @@ func (s *Server) userMessageSent(eventData chatClientEvent) {
} }
// Send chat message sent webhook // Send chat message sent webhook
webhooks.SendChatEvent(&event) webhookManager := webhooks.GetWebhooks()
webhookManager.SendChatEvent(&event)
chatMessagesSentCounter.Inc() chatMessagesSentCounter.Inc()
SaveUserMessage(event) SaveUserMessage(event)

View File

@ -4,7 +4,7 @@ import (
"errors" "errors"
"github.com/owncast/owncast/core/chat/events" "github.com/owncast/owncast/core/chat/events"
"github.com/owncast/owncast/core/webhooks" "github.com/owncast/owncast/services/webhooks"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
@ -29,7 +29,8 @@ func SetMessagesVisibility(messageIDs []string, visibility bool) error {
return errors.New("error broadcasting message visibility payload " + err.Error()) return errors.New("error broadcasting message visibility payload " + err.Error())
} }
webhooks.SendChatEventSetMessageVisibility(event) webhookManager := webhooks.GetWebhooks()
webhookManager.SendChatEventSetMessageVisibility(event)
return nil return nil
} }

View File

@ -14,9 +14,9 @@ import (
"github.com/owncast/owncast/config" "github.com/owncast/owncast/config"
"github.com/owncast/owncast/core/chat/events" "github.com/owncast/owncast/core/chat/events"
"github.com/owncast/owncast/core/data" "github.com/owncast/owncast/core/data"
"github.com/owncast/owncast/core/webhooks"
"github.com/owncast/owncast/models" "github.com/owncast/owncast/models"
"github.com/owncast/owncast/services/geoip" "github.com/owncast/owncast/services/geoip"
"github.com/owncast/owncast/services/webhooks"
"github.com/owncast/owncast/storage" "github.com/owncast/owncast/storage"
"github.com/owncast/owncast/utils" "github.com/owncast/owncast/utils"
) )
@ -147,7 +147,8 @@ func (s *Server) sendUserJoinedMessage(c *Client) {
} }
// Send chat user joined webhook // Send chat user joined webhook
webhooks.SendChatEventUserJoined(userJoinedEvent) webhookManager := webhooks.GetWebhooks()
webhookManager.SendChatEventUserJoined(userJoinedEvent)
} }
func (s *Server) handleClientDisconnected(c *Client) { func (s *Server) handleClientDisconnected(c *Client) {

View File

@ -11,9 +11,9 @@ import (
"github.com/owncast/owncast/config" "github.com/owncast/owncast/config"
"github.com/owncast/owncast/core/chat" "github.com/owncast/owncast/core/chat"
"github.com/owncast/owncast/core/data" "github.com/owncast/owncast/core/data"
"github.com/owncast/owncast/core/webhooks"
"github.com/owncast/owncast/models" "github.com/owncast/owncast/models"
"github.com/owncast/owncast/services/notifications" "github.com/owncast/owncast/services/notifications"
"github.com/owncast/owncast/services/webhooks"
"github.com/owncast/owncast/services/yp" "github.com/owncast/owncast/services/yp"
"github.com/owncast/owncast/utils" "github.com/owncast/owncast/utils"
"github.com/owncast/owncast/video/rtmp" "github.com/owncast/owncast/video/rtmp"
@ -79,7 +79,7 @@ func Start() error {
log.Infof("RTMP is accepting inbound streams on port %d.", rtmpPort) log.Infof("RTMP is accepting inbound streams on port %d.", rtmpPort)
} }
webhooks.SetupWebhooks(GetStatus) webhooks.InitTemporarySingleton(GetStatus)
notifications.Setup(data.GetStore()) notifications.Setup(data.GetStore())

View File

@ -11,9 +11,9 @@ import (
"github.com/owncast/owncast/config" "github.com/owncast/owncast/config"
"github.com/owncast/owncast/core/chat" "github.com/owncast/owncast/core/chat"
"github.com/owncast/owncast/core/data" "github.com/owncast/owncast/core/data"
"github.com/owncast/owncast/core/webhooks"
"github.com/owncast/owncast/models" "github.com/owncast/owncast/models"
"github.com/owncast/owncast/services/notifications" "github.com/owncast/owncast/services/notifications"
"github.com/owncast/owncast/services/webhooks"
"github.com/owncast/owncast/utils" "github.com/owncast/owncast/utils"
"github.com/owncast/owncast/video/rtmp" "github.com/owncast/owncast/video/rtmp"
"github.com/owncast/owncast/video/transcoder" "github.com/owncast/owncast/video/transcoder"
@ -68,9 +68,9 @@ func setStreamAsConnected(rtmpOut *io.PipeReader) {
_transcoder.Start(true) _transcoder.Start(true)
}() }()
go webhooks.SendStreamStatusEvent(models.StreamStarted) webhookManager := webhooks.GetWebhooks()
selectedThumbnailVideoQualityIndex, isVideoPassthrough := data.FindHighestVideoQualityIndex(_currentBroadcast.OutputSettings) go webhookManager.SendStreamStatusEvent(models.StreamStarted)
transcoder.StartThumbnailGenerator(segmentPath, selectedThumbnailVideoQualityIndex, isVideoPassthrough) transcoder.StartThumbnailGenerator(segmentPath, data.FindHighestVideoQualityIndex(_currentBroadcast.OutputSettings))
_ = chat.SendSystemAction("Stay tuned, the stream is **starting**!", true) _ = chat.SendSystemAction("Stay tuned, the stream is **starting**!", true)
chat.SendAllWelcomeMessage() chat.SendAllWelcomeMessage()
@ -126,7 +126,8 @@ func SetStreamAsDisconnected() {
stopOnlineCleanupTimer() stopOnlineCleanupTimer()
saveStats() saveStats()
go webhooks.SendStreamStatusEvent(models.StreamStopped) webhookManager := webhooks.GetWebhooks()
go webhookManager.SendStreamStatusEvent(models.StreamStopped)
} }
// StartOfflineCleanupTimer will fire a cleanup after n minutes being disconnected. // StartOfflineCleanupTimer will fire a cleanup after n minutes being disconnected.

View File

@ -6,7 +6,7 @@ import (
) )
// SendChatEvent will send a chat event to webhook destinations. // SendChatEvent will send a chat event to webhook destinations.
func SendChatEvent(chatEvent *events.UserMessageEvent) { func (w *LiveWebhookManager) SendChatEvent(chatEvent *events.UserMessageEvent) {
webhookEvent := WebhookEvent{ webhookEvent := WebhookEvent{
Type: chatEvent.GetMessageType(), Type: chatEvent.GetMessageType(),
EventData: &WebhookChatMessage{ EventData: &WebhookChatMessage{
@ -20,27 +20,27 @@ func SendChatEvent(chatEvent *events.UserMessageEvent) {
}, },
} }
SendEventToWebhooks(webhookEvent) w.SendEventToWebhooks(webhookEvent)
} }
// SendChatEventUsernameChanged will send a username changed event to webhook destinations. // SendChatEventUsernameChanged will send a username changed event to webhook destinations.
func SendChatEventUsernameChanged(event events.NameChangeEvent) { func (w *LiveWebhookManager) SendChatEventUsernameChanged(event events.NameChangeEvent) {
webhookEvent := WebhookEvent{ webhookEvent := WebhookEvent{
Type: models.UserNameChanged, Type: models.UserNameChanged,
EventData: event, EventData: event,
} }
SendEventToWebhooks(webhookEvent) w.SendEventToWebhooks(webhookEvent)
} }
// SendChatEventUserJoined sends a webhook notifying that a user has joined. // SendChatEventUserJoined sends a webhook notifying that a user has joined.
func SendChatEventUserJoined(event events.UserJoinedEvent) { func (w *LiveWebhookManager) SendChatEventUserJoined(event events.UserJoinedEvent) {
webhookEvent := WebhookEvent{ webhookEvent := WebhookEvent{
Type: models.UserJoined, Type: models.UserJoined,
EventData: event, EventData: event,
} }
SendEventToWebhooks(webhookEvent) w.SendEventToWebhooks(webhookEvent)
} }
// SendChatEventUserParted sends a webhook notifying that a user has parted. // SendChatEventUserParted sends a webhook notifying that a user has parted.
@ -55,11 +55,11 @@ func SendChatEventUserParted(event events.UserPartEvent) {
// SendChatEventSetMessageVisibility sends a webhook notifying that the visibility of one or more // SendChatEventSetMessageVisibility sends a webhook notifying that the visibility of one or more
// messages has changed. // messages has changed.
func SendChatEventSetMessageVisibility(event events.SetMessageVisibilityEvent) { func (w *LiveWebhookManager) SendChatEventSetMessageVisibility(event events.SetMessageVisibilityEvent) {
webhookEvent := WebhookEvent{ webhookEvent := WebhookEvent{
Type: models.VisibiltyToggled, Type: models.VisibiltyToggled,
EventData: event, EventData: event,
} }
SendEventToWebhooks(webhookEvent) w.SendEventToWebhooks(webhookEvent)
} }

View File

@ -26,7 +26,7 @@ func TestSendChatEvent(t *testing.T) {
} }
checkPayload(t, models.MessageSent, func() { checkPayload(t, models.MessageSent, func() {
SendChatEvent(&events.UserMessageEvent{ manager.SendChatEvent(&events.UserMessageEvent{
Event: events.Event{ Event: events.Event{
Type: events.MessageSent, Type: events.MessageSent,
ID: "id", ID: "id",
@ -79,7 +79,7 @@ func TestSendChatEventUsernameChanged(t *testing.T) {
} }
checkPayload(t, models.UserNameChanged, func() { checkPayload(t, models.UserNameChanged, func() {
SendChatEventUsernameChanged(events.NameChangeEvent{ manager.SendChatEventUsernameChanged(events.NameChangeEvent{
Event: events.Event{ Event: events.Event{
Type: events.UserNameChanged, Type: events.UserNameChanged,
ID: "id", ID: "id",
@ -127,7 +127,7 @@ func TestSendChatEventUserJoined(t *testing.T) {
} }
checkPayload(t, models.UserJoined, func() { checkPayload(t, models.UserJoined, func() {
SendChatEventUserJoined(events.UserJoinedEvent{ manager.SendChatEventUserJoined(events.UserJoinedEvent{
Event: events.Event{ Event: events.Event{
Type: events.UserJoined, Type: events.UserJoined,
ID: "id", ID: "id",
@ -160,7 +160,7 @@ func TestSendChatEventSetMessageVisibility(t *testing.T) {
timestamp := time.Unix(72, 6).UTC() timestamp := time.Unix(72, 6).UTC()
checkPayload(t, models.VisibiltyToggled, func() { checkPayload(t, models.VisibiltyToggled, func() {
SendChatEventSetMessageVisibility(events.SetMessageVisibilityEvent{ manager.SendChatEventSetMessageVisibility(events.SetMessageVisibilityEvent{
Event: events.Event{ Event: events.Event{
Type: events.VisibiltyUpdate, Type: events.VisibiltyUpdate,
ID: "id", ID: "id",

View File

@ -0,0 +1,37 @@
package webhooks
import "github.com/owncast/owncast/models"
type Manager interface {
SendEventToWebhooks(payload WebhookEvent)
}
// LiveWebhookManager represents a central place for requesting webhooks
// to be sent out to all registered webhook destinations.
type LiveWebhookManager struct {
queue chan Job
getStatus func() models.Status
}
// NewWebhookManager creates a new webhook manager.
func NewWebhookManager(getStatusFunc func() models.Status) *LiveWebhookManager {
m := &LiveWebhookManager{
getStatus: getStatusFunc,
}
m.initWorkerPool()
return m
}
// InitTemporarySingleton initializes the the temporary global instance of the webhook manager
// to be deleted once dependency injection is implemented.
func InitTemporarySingleton(getStatusFunc func() models.Status) {
temporaryGlobalInstance = NewWebhookManager(getStatusFunc)
}
var temporaryGlobalInstance *LiveWebhookManager
// GetWebhooks returns the temporary global instance of the webhook manager.
// Remove this after dependency injection is implemented.
func GetWebhooks() *LiveWebhookManager {
return temporaryGlobalInstance
}

View File

@ -9,19 +9,19 @@ import (
) )
// SendStreamStatusEvent will send all webhook destinations the current stream status. // SendStreamStatusEvent will send all webhook destinations the current stream status.
func SendStreamStatusEvent(eventType models.EventType) { func (w *LiveWebhookManager) SendStreamStatusEvent(eventType models.EventType) {
sendStreamStatusEvent(eventType, shortid.MustGenerate(), time.Now()) w.sendStreamStatusEvent(eventType, shortid.MustGenerate(), time.Now())
} }
func sendStreamStatusEvent(eventType models.EventType, id string, timestamp time.Time) { func (w *LiveWebhookManager) sendStreamStatusEvent(eventType models.EventType, id string, timestamp time.Time) {
SendEventToWebhooks(WebhookEvent{ w.SendEventToWebhooks(WebhookEvent{
Type: eventType, Type: eventType,
EventData: map[string]interface{}{ EventData: map[string]interface{}{
"id": id, "id": id,
"name": data.GetServerName(), "name": data.GetServerName(),
"summary": data.GetServerSummary(), "summary": data.GetServerSummary(),
"streamTitle": data.GetStreamTitle(), "streamTitle": data.GetStreamTitle(),
"status": getStatus(), "status": w.getStatus(),
"timestamp": timestamp, "timestamp": timestamp,
}, },
}) })

View File

@ -15,7 +15,7 @@ func TestSendStreamStatusEvent(t *testing.T) {
data.SetStreamTitle("my stream") data.SetStreamTitle("my stream")
checkPayload(t, models.StreamStarted, func() { checkPayload(t, models.StreamStarted, func() {
sendStreamStatusEvent(events.StreamStarted, "id", time.Unix(72, 6).UTC()) manager.sendStreamStatusEvent(events.StreamStarted, "id", time.Unix(72, 6).UTC())
}, `{ }, `{
"id": "id", "id": "id",
"name": "my server", "name": "my server",

View File

@ -26,11 +26,11 @@ type WebhookChatMessage struct {
} }
// SendEventToWebhooks will send a single webhook event to all webhook destinations. // SendEventToWebhooks will send a single webhook event to all webhook destinations.
func SendEventToWebhooks(payload WebhookEvent) { func (w *LiveWebhookManager) SendEventToWebhooks(payload WebhookEvent) {
sendEventToWebhooks(payload, nil) w.sendEventToWebhooks(payload, nil)
} }
func sendEventToWebhooks(payload WebhookEvent, wg *sync.WaitGroup) { func (w *LiveWebhookManager) sendEventToWebhooks(payload WebhookEvent, wg *sync.WaitGroup) {
webhooks := data.GetWebhooksForEvent(payload.Type) webhooks := data.GetWebhooksForEvent(payload.Type)
for _, webhook := range webhooks { for _, webhook := range webhooks {
@ -38,6 +38,6 @@ func sendEventToWebhooks(payload WebhookEvent, wg *sync.WaitGroup) {
if wg != nil { if wg != nil {
wg.Add(1) wg.Add(1)
} }
addToQueue(webhook, payload, wg) w.addToQueue(webhook, payload, wg)
} }
} }

View File

@ -18,6 +18,8 @@ import (
jsonpatch "gopkg.in/evanphx/json-patch.v5" jsonpatch "gopkg.in/evanphx/json-patch.v5"
) )
var manager *LiveWebhookManager
func fakeGetStatus() models.Status { func fakeGetStatus() models.Status {
return models.Status{ return models.Status{
Online: true, Online: true,
@ -41,9 +43,9 @@ func TestMain(m *testing.M) {
panic(err) panic(err)
} }
SetupWebhooks(fakeGetStatus) InitTemporarySingleton(fakeGetStatus)
manager = GetWebhooks()
defer close(queue) defer close(manager.queue)
m.Run() m.Run()
} }
@ -77,7 +79,7 @@ func TestPublicSend(t *testing.T) {
EventData: struct{}{}, EventData: struct{}{},
Type: models.MessageSent, Type: models.MessageSent,
} }
SendEventToWebhooks(wh) manager.SendEventToWebhooks(wh)
} }
wg.Wait() wg.Wait()
@ -126,7 +128,7 @@ func TestRouting(t *testing.T) {
EventData: struct{}{}, EventData: struct{}{},
Type: eventType, Type: eventType,
} }
sendEventToWebhooks(wh, &wg) manager.sendEventToWebhooks(wh, &wg)
} }
wg.Wait() wg.Wait()
@ -166,7 +168,7 @@ func TestMultiple(t *testing.T) {
EventData: struct{}{}, EventData: struct{}{},
Type: models.MessageSent, Type: models.MessageSent,
} }
sendEventToWebhooks(wh, &wg) manager.sendEventToWebhooks(wh, &wg)
wg.Wait() wg.Wait()
@ -205,7 +207,7 @@ func TestTimestamps(t *testing.T) {
EventData: struct{}{}, EventData: struct{}{},
Type: eventTypes[0], Type: eventTypes[0],
} }
sendEventToWebhooks(wh, &wg) manager.sendEventToWebhooks(wh, &wg)
wg.Wait() wg.Wait()
@ -302,7 +304,7 @@ func TestParallel(t *testing.T) {
EventData: struct{}{}, EventData: struct{}{},
Type: models.MessageSent, Type: models.MessageSent,
} }
sendEventToWebhooks(wh, &wgMessages) manager.sendEventToWebhooks(wh, &wgMessages)
} }
wgMessages.Wait() wgMessages.Wait()

View File

@ -23,39 +23,28 @@ type Job struct {
webhook models.Webhook webhook models.Webhook
} }
var (
queue chan Job
getStatus func() models.Status
)
// SetupWebhooks initializes the webhook worker pool and sets the function to get the current status.
func SetupWebhooks(getStatusFunc func() models.Status) {
getStatus = getStatusFunc
initWorkerPool()
}
// initWorkerPool starts n go routines that await webhook jobs. // initWorkerPool starts n go routines that await webhook jobs.
func initWorkerPool() { func (w *LiveWebhookManager) initWorkerPool() {
queue = make(chan Job) w.queue = make(chan Job)
// start workers // start workers
for i := 1; i <= webhookWorkerPoolSize; i++ { for i := 1; i <= webhookWorkerPoolSize; i++ {
go worker(i, queue) go w.worker(i, w.queue)
} }
} }
func addToQueue(webhook models.Webhook, payload WebhookEvent, wg *sync.WaitGroup) { func (w *LiveWebhookManager) addToQueue(webhook models.Webhook, payload WebhookEvent, wg *sync.WaitGroup) {
log.Tracef("Queued Event %s for Webhook %s", payload.Type, webhook.URL) log.Tracef("Queued Event %s for Webhook %s", payload.Type, webhook.URL)
queue <- Job{wg, payload, webhook} w.queue <- Job{webhook, payload, wg}
} }
func worker(workerID int, queue <-chan Job) { func (w *LiveWebhookManager) worker(workerID int, queue <-chan Job) {
log.Debugf("Started Webhook worker %d", workerID) log.Debugf("Started Webhook worker %d", workerID)
for job := range queue { for job := range queue {
log.Debugf("Event %s sent to Webhook %s using worker %d", job.payload.Type, job.webhook.URL, workerID) log.Debugf("Event %s sent to Webhook %s using worker %d", job.payload.Type, job.webhook.URL, workerID)
if err := sendWebhook(job); err != nil { if err := w.sendWebhook(job); err != nil {
log.Errorf("Event: %s failed to send to webhook: %s Error: %s", job.payload.Type, job.webhook.URL, err) log.Errorf("Event: %s failed to send to webhook: %s Error: %s", job.payload.Type, job.webhook.URL, err)
} }
log.Tracef("Done with Event %s to Webhook %s using worker %d", job.payload.Type, job.webhook.URL, workerID) log.Tracef("Done with Event %s to Webhook %s using worker %d", job.payload.Type, job.webhook.URL, workerID)
@ -65,7 +54,7 @@ func worker(workerID int, queue <-chan Job) {
} }
} }
func sendWebhook(job Job) error { func (w *LiveWebhookManager) sendWebhook(job Job) error {
jsonText, err := json.Marshal(job.payload) jsonText, err := json.Marshal(job.payload)
if err != nil { if err != nil {
return err return err

View File

@ -12,9 +12,8 @@ import (
"github.com/owncast/owncast/activitypub/outbox" "github.com/owncast/owncast/activitypub/outbox"
"github.com/owncast/owncast/core/chat" "github.com/owncast/owncast/core/chat"
"github.com/owncast/owncast/core/data" "github.com/owncast/owncast/core/data"
"github.com/owncast/owncast/core/user"
"github.com/owncast/owncast/core/webhooks"
"github.com/owncast/owncast/models" "github.com/owncast/owncast/models"
"github.com/owncast/owncast/services/webhooks"
"github.com/owncast/owncast/utils" "github.com/owncast/owncast/utils"
"github.com/owncast/owncast/webserver/requests" "github.com/owncast/owncast/webserver/requests"
"github.com/owncast/owncast/webserver/responses" "github.com/owncast/owncast/webserver/responses"
@ -71,7 +70,8 @@ func (h *Handlers) SetStreamTitle(w http.ResponseWriter, r *http.Request) {
} }
if value != "" { if value != "" {
sendSystemChatAction(fmt.Sprintf("Stream title changed to **%s**", value), true) sendSystemChatAction(fmt.Sprintf("Stream title changed to **%s**", value), true)
go webhooks.SendStreamStatusEvent(models.StreamTitleUpdated) webhookManager := webhooks.GetWebhooks()
go webhookManager.SendStreamStatusEvent(models.StreamTitleUpdated)
} }
responses.WriteSimpleResponse(w, true, "changed") responses.WriteSimpleResponse(w, true, "changed")
} }