diff --git a/core/chat/events.go b/core/chat/events.go index 648c8fec4..d7482a639 100644 --- a/core/chat/events.go +++ b/core/chat/events.go @@ -9,7 +9,7 @@ import ( "github.com/owncast/owncast/config" "github.com/owncast/owncast/core/chat/events" "github.com/owncast/owncast/core/data" - "github.com/owncast/owncast/core/webhooks" + "github.com/owncast/owncast/services/webhooks" "github.com/owncast/owncast/storage" "github.com/owncast/owncast/utils" log "github.com/sirupsen/logrus" @@ -97,7 +97,8 @@ func (s *Server) userNameChanged(eventData chatClientEvent) { // Send chat user name changed webhook receivedEvent.User = savedUser receivedEvent.ClientID = eventData.client.Id - webhooks.SendChatEventUsernameChanged(receivedEvent) + webhookManager := webhooks.GetWebhooks() + webhookManager.SendChatEventUsernameChanged(receivedEvent) // Resend the client's user so their username is in sync. eventData.client.sendConnectedClientInfo() @@ -164,7 +165,8 @@ func (s *Server) userMessageSent(eventData chatClientEvent) { } // Send chat message sent webhook - webhooks.SendChatEvent(&event) + webhookManager := webhooks.GetWebhooks() + webhookManager.SendChatEvent(&event) chatMessagesSentCounter.Inc() SaveUserMessage(event) diff --git a/core/chat/messages.go b/core/chat/messages.go index 5e05b5796..601a322c5 100644 --- a/core/chat/messages.go +++ b/core/chat/messages.go @@ -4,7 +4,7 @@ import ( "errors" "github.com/owncast/owncast/core/chat/events" - "github.com/owncast/owncast/core/webhooks" + "github.com/owncast/owncast/services/webhooks" log "github.com/sirupsen/logrus" ) @@ -29,7 +29,8 @@ func SetMessagesVisibility(messageIDs []string, visibility bool) error { return errors.New("error broadcasting message visibility payload " + err.Error()) } - webhooks.SendChatEventSetMessageVisibility(event) + webhookManager := webhooks.GetWebhooks() + webhookManager.SendChatEventSetMessageVisibility(event) return nil } diff --git a/core/chat/server.go b/core/chat/server.go index 9f311addd..626c5ca79 100644 --- a/core/chat/server.go +++ b/core/chat/server.go @@ -14,9 +14,9 @@ import ( "github.com/owncast/owncast/config" "github.com/owncast/owncast/core/chat/events" "github.com/owncast/owncast/core/data" - "github.com/owncast/owncast/core/webhooks" "github.com/owncast/owncast/models" "github.com/owncast/owncast/services/geoip" + "github.com/owncast/owncast/services/webhooks" "github.com/owncast/owncast/storage" "github.com/owncast/owncast/utils" ) @@ -147,7 +147,8 @@ func (s *Server) sendUserJoinedMessage(c *Client) { } // Send chat user joined webhook - webhooks.SendChatEventUserJoined(userJoinedEvent) + webhookManager := webhooks.GetWebhooks() + webhookManager.SendChatEventUserJoined(userJoinedEvent) } func (s *Server) handleClientDisconnected(c *Client) { diff --git a/core/core.go b/core/core.go index cf9d5e0e6..6c2a7a95e 100644 --- a/core/core.go +++ b/core/core.go @@ -11,9 +11,9 @@ import ( "github.com/owncast/owncast/config" "github.com/owncast/owncast/core/chat" "github.com/owncast/owncast/core/data" - "github.com/owncast/owncast/core/webhooks" "github.com/owncast/owncast/models" "github.com/owncast/owncast/services/notifications" + "github.com/owncast/owncast/services/webhooks" "github.com/owncast/owncast/services/yp" "github.com/owncast/owncast/utils" "github.com/owncast/owncast/video/rtmp" @@ -79,7 +79,7 @@ func Start() error { log.Infof("RTMP is accepting inbound streams on port %d.", rtmpPort) } - webhooks.SetupWebhooks(GetStatus) + webhooks.InitTemporarySingleton(GetStatus) notifications.Setup(data.GetStore()) diff --git a/core/streamState.go b/core/streamState.go index 1c0768a7e..2b39109bb 100644 --- a/core/streamState.go +++ b/core/streamState.go @@ -11,9 +11,9 @@ import ( "github.com/owncast/owncast/config" "github.com/owncast/owncast/core/chat" "github.com/owncast/owncast/core/data" - "github.com/owncast/owncast/core/webhooks" "github.com/owncast/owncast/models" "github.com/owncast/owncast/services/notifications" + "github.com/owncast/owncast/services/webhooks" "github.com/owncast/owncast/utils" "github.com/owncast/owncast/video/rtmp" "github.com/owncast/owncast/video/transcoder" @@ -68,9 +68,9 @@ func setStreamAsConnected(rtmpOut *io.PipeReader) { _transcoder.Start(true) }() - go webhooks.SendStreamStatusEvent(models.StreamStarted) - selectedThumbnailVideoQualityIndex, isVideoPassthrough := data.FindHighestVideoQualityIndex(_currentBroadcast.OutputSettings) - transcoder.StartThumbnailGenerator(segmentPath, selectedThumbnailVideoQualityIndex, isVideoPassthrough) + webhookManager := webhooks.GetWebhooks() + go webhookManager.SendStreamStatusEvent(models.StreamStarted) + transcoder.StartThumbnailGenerator(segmentPath, data.FindHighestVideoQualityIndex(_currentBroadcast.OutputSettings)) _ = chat.SendSystemAction("Stay tuned, the stream is **starting**!", true) chat.SendAllWelcomeMessage() @@ -126,7 +126,8 @@ func SetStreamAsDisconnected() { stopOnlineCleanupTimer() saveStats() - go webhooks.SendStreamStatusEvent(models.StreamStopped) + webhookManager := webhooks.GetWebhooks() + go webhookManager.SendStreamStatusEvent(models.StreamStopped) } // StartOfflineCleanupTimer will fire a cleanup after n minutes being disconnected. diff --git a/core/webhooks/chat.go b/services/webhooks/chat.go similarity index 72% rename from core/webhooks/chat.go rename to services/webhooks/chat.go index a7635371d..1cc96e3a2 100644 --- a/core/webhooks/chat.go +++ b/services/webhooks/chat.go @@ -6,7 +6,7 @@ import ( ) // SendChatEvent will send a chat event to webhook destinations. -func SendChatEvent(chatEvent *events.UserMessageEvent) { +func (w *LiveWebhookManager) SendChatEvent(chatEvent *events.UserMessageEvent) { webhookEvent := WebhookEvent{ Type: chatEvent.GetMessageType(), EventData: &WebhookChatMessage{ @@ -20,27 +20,27 @@ func SendChatEvent(chatEvent *events.UserMessageEvent) { }, } - SendEventToWebhooks(webhookEvent) + w.SendEventToWebhooks(webhookEvent) } // SendChatEventUsernameChanged will send a username changed event to webhook destinations. -func SendChatEventUsernameChanged(event events.NameChangeEvent) { +func (w *LiveWebhookManager) SendChatEventUsernameChanged(event events.NameChangeEvent) { webhookEvent := WebhookEvent{ Type: models.UserNameChanged, EventData: event, } - SendEventToWebhooks(webhookEvent) + w.SendEventToWebhooks(webhookEvent) } // SendChatEventUserJoined sends a webhook notifying that a user has joined. -func SendChatEventUserJoined(event events.UserJoinedEvent) { +func (w *LiveWebhookManager) SendChatEventUserJoined(event events.UserJoinedEvent) { webhookEvent := WebhookEvent{ Type: models.UserJoined, EventData: event, } - SendEventToWebhooks(webhookEvent) + w.SendEventToWebhooks(webhookEvent) } // SendChatEventUserParted sends a webhook notifying that a user has parted. @@ -55,11 +55,11 @@ func SendChatEventUserParted(event events.UserPartEvent) { // SendChatEventSetMessageVisibility sends a webhook notifying that the visibility of one or more // messages has changed. -func SendChatEventSetMessageVisibility(event events.SetMessageVisibilityEvent) { +func (w *LiveWebhookManager) SendChatEventSetMessageVisibility(event events.SetMessageVisibilityEvent) { webhookEvent := WebhookEvent{ Type: models.VisibiltyToggled, EventData: event, } - SendEventToWebhooks(webhookEvent) + w.SendEventToWebhooks(webhookEvent) } diff --git a/core/webhooks/chat_test.go b/services/webhooks/chat_test.go similarity index 94% rename from core/webhooks/chat_test.go rename to services/webhooks/chat_test.go index eb8689a70..9ebc09ca2 100644 --- a/core/webhooks/chat_test.go +++ b/services/webhooks/chat_test.go @@ -26,7 +26,7 @@ func TestSendChatEvent(t *testing.T) { } checkPayload(t, models.MessageSent, func() { - SendChatEvent(&events.UserMessageEvent{ + manager.SendChatEvent(&events.UserMessageEvent{ Event: events.Event{ Type: events.MessageSent, ID: "id", @@ -79,7 +79,7 @@ func TestSendChatEventUsernameChanged(t *testing.T) { } checkPayload(t, models.UserNameChanged, func() { - SendChatEventUsernameChanged(events.NameChangeEvent{ + manager.SendChatEventUsernameChanged(events.NameChangeEvent{ Event: events.Event{ Type: events.UserNameChanged, ID: "id", @@ -127,7 +127,7 @@ func TestSendChatEventUserJoined(t *testing.T) { } checkPayload(t, models.UserJoined, func() { - SendChatEventUserJoined(events.UserJoinedEvent{ + manager.SendChatEventUserJoined(events.UserJoinedEvent{ Event: events.Event{ Type: events.UserJoined, ID: "id", @@ -160,7 +160,7 @@ func TestSendChatEventSetMessageVisibility(t *testing.T) { timestamp := time.Unix(72, 6).UTC() checkPayload(t, models.VisibiltyToggled, func() { - SendChatEventSetMessageVisibility(events.SetMessageVisibilityEvent{ + manager.SendChatEventSetMessageVisibility(events.SetMessageVisibilityEvent{ Event: events.Event{ Type: events.VisibiltyUpdate, ID: "id", diff --git a/services/webhooks/manager.go b/services/webhooks/manager.go new file mode 100644 index 000000000..b7ebc7510 --- /dev/null +++ b/services/webhooks/manager.go @@ -0,0 +1,37 @@ +package webhooks + +import "github.com/owncast/owncast/models" + +type Manager interface { + SendEventToWebhooks(payload WebhookEvent) +} + +// LiveWebhookManager represents a central place for requesting webhooks +// to be sent out to all registered webhook destinations. +type LiveWebhookManager struct { + queue chan Job + getStatus func() models.Status +} + +// NewWebhookManager creates a new webhook manager. +func NewWebhookManager(getStatusFunc func() models.Status) *LiveWebhookManager { + m := &LiveWebhookManager{ + getStatus: getStatusFunc, + } + m.initWorkerPool() + return m +} + +// InitTemporarySingleton initializes the the temporary global instance of the webhook manager +// to be deleted once dependency injection is implemented. +func InitTemporarySingleton(getStatusFunc func() models.Status) { + temporaryGlobalInstance = NewWebhookManager(getStatusFunc) +} + +var temporaryGlobalInstance *LiveWebhookManager + +// GetWebhooks returns the temporary global instance of the webhook manager. +// Remove this after dependency injection is implemented. +func GetWebhooks() *LiveWebhookManager { + return temporaryGlobalInstance +} diff --git a/core/webhooks/stream.go b/services/webhooks/stream.go similarity index 58% rename from core/webhooks/stream.go rename to services/webhooks/stream.go index b44dcb07c..9805c7e73 100644 --- a/core/webhooks/stream.go +++ b/services/webhooks/stream.go @@ -9,19 +9,19 @@ import ( ) // SendStreamStatusEvent will send all webhook destinations the current stream status. -func SendStreamStatusEvent(eventType models.EventType) { - sendStreamStatusEvent(eventType, shortid.MustGenerate(), time.Now()) +func (w *LiveWebhookManager) SendStreamStatusEvent(eventType models.EventType) { + w.sendStreamStatusEvent(eventType, shortid.MustGenerate(), time.Now()) } -func sendStreamStatusEvent(eventType models.EventType, id string, timestamp time.Time) { - SendEventToWebhooks(WebhookEvent{ +func (w *LiveWebhookManager) sendStreamStatusEvent(eventType models.EventType, id string, timestamp time.Time) { + w.SendEventToWebhooks(WebhookEvent{ Type: eventType, EventData: map[string]interface{}{ "id": id, "name": data.GetServerName(), "summary": data.GetServerSummary(), "streamTitle": data.GetStreamTitle(), - "status": getStatus(), + "status": w.getStatus(), "timestamp": timestamp, }, }) diff --git a/core/webhooks/stream_test.go b/services/webhooks/stream_test.go similarity index 90% rename from core/webhooks/stream_test.go rename to services/webhooks/stream_test.go index 3da067812..a1974bc8d 100644 --- a/core/webhooks/stream_test.go +++ b/services/webhooks/stream_test.go @@ -15,7 +15,7 @@ func TestSendStreamStatusEvent(t *testing.T) { data.SetStreamTitle("my stream") checkPayload(t, models.StreamStarted, func() { - sendStreamStatusEvent(events.StreamStarted, "id", time.Unix(72, 6).UTC()) + manager.sendStreamStatusEvent(events.StreamStarted, "id", time.Unix(72, 6).UTC()) }, `{ "id": "id", "name": "my server", diff --git a/core/webhooks/webhooks.go b/services/webhooks/webhooks.go similarity index 81% rename from core/webhooks/webhooks.go rename to services/webhooks/webhooks.go index c3bcb39e2..4004fdaf3 100644 --- a/core/webhooks/webhooks.go +++ b/services/webhooks/webhooks.go @@ -26,11 +26,11 @@ type WebhookChatMessage struct { } // SendEventToWebhooks will send a single webhook event to all webhook destinations. -func SendEventToWebhooks(payload WebhookEvent) { - sendEventToWebhooks(payload, nil) +func (w *LiveWebhookManager) SendEventToWebhooks(payload WebhookEvent) { + w.sendEventToWebhooks(payload, nil) } -func sendEventToWebhooks(payload WebhookEvent, wg *sync.WaitGroup) { +func (w *LiveWebhookManager) sendEventToWebhooks(payload WebhookEvent, wg *sync.WaitGroup) { webhooks := data.GetWebhooksForEvent(payload.Type) for _, webhook := range webhooks { @@ -38,6 +38,6 @@ func sendEventToWebhooks(payload WebhookEvent, wg *sync.WaitGroup) { if wg != nil { wg.Add(1) } - addToQueue(webhook, payload, wg) + w.addToQueue(webhook, payload, wg) } } diff --git a/core/webhooks/webhooks_test.go b/services/webhooks/webhooks_test.go similarity index 96% rename from core/webhooks/webhooks_test.go rename to services/webhooks/webhooks_test.go index e10b02ee8..1c8992f0b 100644 --- a/core/webhooks/webhooks_test.go +++ b/services/webhooks/webhooks_test.go @@ -18,6 +18,8 @@ import ( jsonpatch "gopkg.in/evanphx/json-patch.v5" ) +var manager *LiveWebhookManager + func fakeGetStatus() models.Status { return models.Status{ Online: true, @@ -41,9 +43,9 @@ func TestMain(m *testing.M) { panic(err) } - SetupWebhooks(fakeGetStatus) - - defer close(queue) + InitTemporarySingleton(fakeGetStatus) + manager = GetWebhooks() + defer close(manager.queue) m.Run() } @@ -77,7 +79,7 @@ func TestPublicSend(t *testing.T) { EventData: struct{}{}, Type: models.MessageSent, } - SendEventToWebhooks(wh) + manager.SendEventToWebhooks(wh) } wg.Wait() @@ -126,7 +128,7 @@ func TestRouting(t *testing.T) { EventData: struct{}{}, Type: eventType, } - sendEventToWebhooks(wh, &wg) + manager.sendEventToWebhooks(wh, &wg) } wg.Wait() @@ -166,7 +168,7 @@ func TestMultiple(t *testing.T) { EventData: struct{}{}, Type: models.MessageSent, } - sendEventToWebhooks(wh, &wg) + manager.sendEventToWebhooks(wh, &wg) wg.Wait() @@ -205,7 +207,7 @@ func TestTimestamps(t *testing.T) { EventData: struct{}{}, Type: eventTypes[0], } - sendEventToWebhooks(wh, &wg) + manager.sendEventToWebhooks(wh, &wg) wg.Wait() @@ -302,7 +304,7 @@ func TestParallel(t *testing.T) { EventData: struct{}{}, Type: models.MessageSent, } - sendEventToWebhooks(wh, &wgMessages) + manager.sendEventToWebhooks(wh, &wgMessages) } wgMessages.Wait() diff --git a/core/webhooks/workerpool.go b/services/webhooks/workerpool.go similarity index 73% rename from core/webhooks/workerpool.go rename to services/webhooks/workerpool.go index 2134ef1d2..f0ba7e875 100644 --- a/core/webhooks/workerpool.go +++ b/services/webhooks/workerpool.go @@ -23,39 +23,28 @@ type Job struct { webhook models.Webhook } -var ( - queue chan Job - getStatus func() models.Status -) - -// SetupWebhooks initializes the webhook worker pool and sets the function to get the current status. -func SetupWebhooks(getStatusFunc func() models.Status) { - getStatus = getStatusFunc - initWorkerPool() -} - // initWorkerPool starts n go routines that await webhook jobs. -func initWorkerPool() { - queue = make(chan Job) +func (w *LiveWebhookManager) initWorkerPool() { + w.queue = make(chan Job) // start workers for i := 1; i <= webhookWorkerPoolSize; i++ { - go worker(i, queue) + go w.worker(i, w.queue) } } -func addToQueue(webhook models.Webhook, payload WebhookEvent, wg *sync.WaitGroup) { +func (w *LiveWebhookManager) addToQueue(webhook models.Webhook, payload WebhookEvent, wg *sync.WaitGroup) { log.Tracef("Queued Event %s for Webhook %s", payload.Type, webhook.URL) - queue <- Job{wg, payload, webhook} + w.queue <- Job{webhook, payload, wg} } -func worker(workerID int, queue <-chan Job) { +func (w *LiveWebhookManager) worker(workerID int, queue <-chan Job) { log.Debugf("Started Webhook worker %d", workerID) for job := range queue { log.Debugf("Event %s sent to Webhook %s using worker %d", job.payload.Type, job.webhook.URL, workerID) - if err := sendWebhook(job); err != nil { + if err := w.sendWebhook(job); err != nil { log.Errorf("Event: %s failed to send to webhook: %s Error: %s", job.payload.Type, job.webhook.URL, err) } log.Tracef("Done with Event %s to Webhook %s using worker %d", job.payload.Type, job.webhook.URL, workerID) @@ -65,7 +54,7 @@ func worker(workerID int, queue <-chan Job) { } } -func sendWebhook(job Job) error { +func (w *LiveWebhookManager) sendWebhook(job Job) error { jsonText, err := json.Marshal(job.payload) if err != nil { return err diff --git a/webserver/handlers/adminApiConfigHandlers.go b/webserver/handlers/adminApiConfigHandlers.go index fea84ac14..6f002a398 100644 --- a/webserver/handlers/adminApiConfigHandlers.go +++ b/webserver/handlers/adminApiConfigHandlers.go @@ -12,9 +12,8 @@ import ( "github.com/owncast/owncast/activitypub/outbox" "github.com/owncast/owncast/core/chat" "github.com/owncast/owncast/core/data" - "github.com/owncast/owncast/core/user" - "github.com/owncast/owncast/core/webhooks" "github.com/owncast/owncast/models" + "github.com/owncast/owncast/services/webhooks" "github.com/owncast/owncast/utils" "github.com/owncast/owncast/webserver/requests" "github.com/owncast/owncast/webserver/responses" @@ -71,7 +70,8 @@ func (h *Handlers) SetStreamTitle(w http.ResponseWriter, r *http.Request) { } if value != "" { sendSystemChatAction(fmt.Sprintf("Stream title changed to **%s**", value), true) - go webhooks.SendStreamStatusEvent(models.StreamTitleUpdated) + webhookManager := webhooks.GetWebhooks() + go webhookManager.SendStreamStatusEvent(models.StreamTitleUpdated) } responses.WriteSimpleResponse(w, true, "changed") }