diff --git a/.github/workflows/automated-end-to-end-api.yaml b/.github/workflows/automated-end-to-end-api.yaml
index daa100e28..b65eab80a 100644
--- a/.github/workflows/automated-end-to-end-api.yaml
+++ b/.github/workflows/automated-end-to-end-api.yaml
@@ -6,6 +6,11 @@ on:
- 'webroot/**'
- 'web/**'
+ pull_request:
+ paths-ignore:
+ - 'webroot/**'
+ - 'web/**'
+
jobs:
test:
runs-on: ubuntu-latest
diff --git a/.gitpod.yml b/.gitpod.yml
new file mode 100644
index 000000000..6de4ee2b3
--- /dev/null
+++ b/.gitpod.yml
@@ -0,0 +1,5 @@
+# Automatic workspace preparation for gitpod instances
+
+tasks:
+ - init: sudo apt-get install ffmpeg -y && go get && go build ./... && go test ./...
+ command: go run .
diff --git a/README.md b/README.md
index 4e4e4f508..8ddbac7b5 100644
--- a/README.md
+++ b/README.md
@@ -44,7 +44,7 @@
-Owncast is an open source, self-hosted, decentralized, single user live video streaming and chat server for running your own live streams similar in style to the large mainstream options. It offers complete ownership over your content, interface, moderation and audience. Visit the demo for an example.
+Owncast is an open source, self-hosted, decentralized, single user live video streaming and chat server for running your own live streams similar in style to the large mainstream options. It offers complete ownership over your content, interface, moderation and audience. Visit the demo for an example.

@@ -59,7 +59,6 @@ Owncast is an open source, self-hosted, decentralized, single user live video st
-
---
@@ -87,7 +86,9 @@ Owncast consists of two projects.
The Owncast backend is a service written in Go.
-1. Ensure you have a c compiler installed.
+1. Ensure you have pre-requisites installed.
+ - C compiler, such as [GCC compiler](https://gcc.gnu.org/install/download.html) or a [Musl-compatible compiler](https://musl.libc.org/)
+ - [ffmpeg](https://ffmpeg.org/download.html)
1. Install the [Go toolchain](https://golang.org/dl/) (1.16 or above).
1. Clone the repo. `git clone https://github.com/owncast/owncast`
1. `go run main.go` will run from source.
@@ -100,8 +101,7 @@ The frontend is the web interface that includes the player, chat, embed componen
1. This project lives in the `web` directory.
1. Run `npm install` to install the Javascript dependencies.
-1. Run `npm run dev`
-
+1. Run `npm run dev`
## Contributing
@@ -113,9 +113,6 @@ We’ve been very lucky to have this so far, so maybe you can help us with your
There is a larger, more detailed, and more up-to-date [guide for helping contribute to Owncast on our website](https://owncast.online/help/).
-
-
-
## License
diff --git a/build/javascript/package-lock.json b/build/javascript/package-lock.json
index 17ce5d227..d59cc673e 100644
--- a/build/javascript/package-lock.json
+++ b/build/javascript/package-lock.json
@@ -350,9 +350,9 @@
}
},
"caniuse-lite": {
- "version": "1.0.30001412",
- "resolved": "https://registry.npmjs.org/caniuse-lite/-/caniuse-lite-1.0.30001412.tgz",
- "integrity": "sha512-+TeEIee1gS5bYOiuf+PS/kp2mrXic37Hl66VY6EAfxasIk5fELTktK2oOezYed12H8w7jt3s512PpulQidPjwA=="
+ "version": "1.0.30001418",
+ "resolved": "https://registry.npmjs.org/caniuse-lite/-/caniuse-lite-1.0.30001418.tgz",
+ "integrity": "sha512-oIs7+JL3K9JRQ3jPZjlH6qyYDp+nBTCais7hjh0s+fuBwufc7uZ7hPYMXrDOJhV360KGMTcczMRObk0/iMqZRg=="
},
"chalk": {
"version": "4.1.2",
@@ -416,13 +416,13 @@
}
},
"cliui": {
- "version": "7.0.4",
- "resolved": "https://registry.npmjs.org/cliui/-/cliui-7.0.4.tgz",
- "integrity": "sha512-OcRE68cOsVMXp1Yvonl/fzkQOyjLSu/8bhPDfQt0e0/Eb283TKP20Fs2MqoPsr9SwA595rRCA+QMzYc9nBP+JQ==",
+ "version": "8.0.1",
+ "resolved": "https://registry.npmjs.org/cliui/-/cliui-8.0.1.tgz",
+ "integrity": "sha512-BSeNnyus75C4//NQ9gQt1/csTXyo/8Sb+afLAkzAptFuMsod9HFokGNudZpi/oQV73hnVK+sR+5PVRMd+Dr7YQ==",
"dev": true,
"requires": {
"string-width": "^4.2.0",
- "strip-ansi": "^6.0.0",
+ "strip-ansi": "^6.0.1",
"wrap-ansi": "^7.0.0"
}
},
@@ -645,9 +645,9 @@
}
},
"electron-to-chromium": {
- "version": "1.4.262",
- "resolved": "https://registry.npmjs.org/electron-to-chromium/-/electron-to-chromium-1.4.262.tgz",
- "integrity": "sha512-Ckn5haqmGh/xS8IbcgK3dnwAVnhDyo/WQnklWn6yaMucYTq7NNxwlGE8ElzEOnonzRLzUCo2Ot3vUb2GYUF2Hw=="
+ "version": "1.4.276",
+ "resolved": "https://registry.npmjs.org/electron-to-chromium/-/electron-to-chromium-1.4.276.tgz",
+ "integrity": "sha512-EpuHPqu8YhonqLBXHoU6hDJCD98FCe6KDoet3/gY1qsQ6usjJoHqBH2YIVs8FXaAtHwVL8Uqa/fsYao/vq9VWQ=="
},
"emoji-regex": {
"version": "8.0.0",
@@ -2076,9 +2076,9 @@
"integrity": "sha512-rBJeI5CXAlmy1pV+617WB9J63U6XcazHHF2f2dbJix4XzpUF0RS3Zbj0FGIOCAva5P/d/GBOYaACQ1w+0azUkg=="
},
"update-browserslist-db": {
- "version": "1.0.9",
- "resolved": "https://registry.npmjs.org/update-browserslist-db/-/update-browserslist-db-1.0.9.tgz",
- "integrity": "sha512-/xsqn21EGVdXI3EXSum1Yckj3ZVZugqyOZQ/CxYPBD/R+ko9NSUScf8tFF4dOKY+2pvSSJA/S+5B8s4Zr4kyvg==",
+ "version": "1.0.10",
+ "resolved": "https://registry.npmjs.org/update-browserslist-db/-/update-browserslist-db-1.0.10.tgz",
+ "integrity": "sha512-OztqDenkfFkbSG+tRxBeAnCVPckDBcvibKd35yDONx6OU8N7sqgwc7rCbkJ/WcYtVRZ4ba68d6byhC21GFh7sQ==",
"requires": {
"escalade": "^3.1.1",
"picocolors": "^1.0.0"
@@ -2194,12 +2194,12 @@
"dev": true
},
"yargs": {
- "version": "17.5.1",
- "resolved": "https://registry.npmjs.org/yargs/-/yargs-17.5.1.tgz",
- "integrity": "sha512-t6YAJcxDkNX7NFYiVtKvWUz8l+PaKTLiL63mJYWR2GnHq2gjEWISzsLp9wg3aY36dY1j+gfIEL3pIF+XlJJfbA==",
+ "version": "17.6.0",
+ "resolved": "https://registry.npmjs.org/yargs/-/yargs-17.6.0.tgz",
+ "integrity": "sha512-8H/wTDqlSwoSnScvV2N/JHfLWOKuh5MVla9hqLjK3nsfyy6Y4kDSYSvkU5YCUEPOSnRXfIyx3Sq+B/IWudTo4g==",
"dev": true,
"requires": {
- "cliui": "^7.0.2",
+ "cliui": "^8.0.1",
"escalade": "^3.1.1",
"get-caller-file": "^2.0.5",
"require-directory": "^2.1.1",
diff --git a/controllers/admin/viewers.go b/controllers/admin/viewers.go
index e2f48b1c1..ed542a31a 100644
--- a/controllers/admin/viewers.go
+++ b/controllers/admin/viewers.go
@@ -37,7 +37,7 @@ func GetViewersOverTime(w http.ResponseWriter, r *http.Request) {
// GetActiveViewers returns currently connected clients.
func GetActiveViewers(w http.ResponseWriter, r *http.Request) {
c := core.GetActiveViewers()
- viewers := []models.Viewer{}
+ viewers := make([]models.Viewer, 0, len(c))
for _, v := range c {
viewers = append(viewers, *v)
}
diff --git a/core/chat/messages.go b/core/chat/messages.go
index c5a4b2655..5e05b5796 100644
--- a/core/chat/messages.go
+++ b/core/chat/messages.go
@@ -29,13 +29,7 @@ func SetMessagesVisibility(messageIDs []string, visibility bool) error {
return errors.New("error broadcasting message visibility payload " + err.Error())
}
- // Send webhook
- wh := webhooks.WebhookEvent{
- EventData: payload,
- Type: event.GetMessageType(),
- }
-
- webhooks.SendEventToWebhooks(wh)
+ webhooks.SendChatEventSetMessageVisibility(event)
return nil
}
diff --git a/core/webhooks/chat.go b/core/webhooks/chat.go
index 3647956b6..1fb5287f3 100644
--- a/core/webhooks/chat.go
+++ b/core/webhooks/chat.go
@@ -42,3 +42,14 @@ func SendChatEventUserJoined(event events.UserJoinedEvent) {
SendEventToWebhooks(webhookEvent)
}
+
+// SendChatEventSetMessageVisibility sends a webhook notifying that the visibility of one or more
+// messages has changed.
+func SendChatEventSetMessageVisibility(event events.SetMessageVisibilityEvent) {
+ webhookEvent := WebhookEvent{
+ Type: models.VisibiltyToggled,
+ EventData: event,
+ }
+
+ SendEventToWebhooks(webhookEvent)
+}
diff --git a/core/webhooks/chat_test.go b/core/webhooks/chat_test.go
new file mode 100644
index 000000000..eb8689a70
--- /dev/null
+++ b/core/webhooks/chat_test.go
@@ -0,0 +1,185 @@
+package webhooks
+
+import (
+ "testing"
+ "time"
+
+ "github.com/owncast/owncast/core/chat/events"
+ "github.com/owncast/owncast/core/user"
+ "github.com/owncast/owncast/models"
+)
+
+func TestSendChatEvent(t *testing.T) {
+ timestamp := time.Unix(72, 6).UTC()
+ user := user.User{
+ ID: "user id",
+ DisplayName: "display name",
+ DisplayColor: 4,
+ CreatedAt: time.Unix(3, 26).UTC(),
+ DisabledAt: nil,
+ PreviousNames: []string{"somebody"},
+ NameChangedAt: nil,
+ Scopes: []string{},
+ IsBot: false,
+ AuthenticatedAt: nil,
+ Authenticated: false,
+ }
+
+ checkPayload(t, models.MessageSent, func() {
+ SendChatEvent(&events.UserMessageEvent{
+ Event: events.Event{
+ Type: events.MessageSent,
+ ID: "id",
+ Timestamp: timestamp,
+ },
+ UserEvent: events.UserEvent{
+ User: &user,
+ ClientID: 51,
+ HiddenAt: nil,
+ },
+ MessageEvent: events.MessageEvent{
+ OutboundEvent: nil,
+ Body: "body",
+ RawBody: "raw body",
+ },
+ })
+ }, `{
+ "body": "body",
+ "clientId": 51,
+ "id": "id",
+ "rawBody": "raw body",
+ "timestamp": "1970-01-01T00:01:12.000000006Z",
+ "user": {
+ "authenticated": false,
+ "createdAt": "1970-01-01T00:00:03.000000026Z",
+ "displayColor": 4,
+ "displayName": "display name",
+ "id": "user id",
+ "isBot": false,
+ "previousNames": ["somebody"]
+ },
+ "visible": true
+ }`)
+}
+
+func TestSendChatEventUsernameChanged(t *testing.T) {
+ timestamp := time.Unix(72, 6).UTC()
+ user := user.User{
+ ID: "user id",
+ DisplayName: "display name",
+ DisplayColor: 4,
+ CreatedAt: time.Unix(3, 26).UTC(),
+ DisabledAt: nil,
+ PreviousNames: []string{"somebody"},
+ NameChangedAt: nil,
+ Scopes: []string{},
+ IsBot: false,
+ AuthenticatedAt: nil,
+ Authenticated: false,
+ }
+
+ checkPayload(t, models.UserNameChanged, func() {
+ SendChatEventUsernameChanged(events.NameChangeEvent{
+ Event: events.Event{
+ Type: events.UserNameChanged,
+ ID: "id",
+ Timestamp: timestamp,
+ },
+ UserEvent: events.UserEvent{
+ User: &user,
+ ClientID: 51,
+ HiddenAt: nil,
+ },
+ NewName: "new name",
+ })
+ }, `{
+ "clientId": 51,
+ "id": "id",
+ "newName": "new name",
+ "timestamp": "1970-01-01T00:01:12.000000006Z",
+ "type": "NAME_CHANGE",
+ "user": {
+ "authenticated": false,
+ "createdAt": "1970-01-01T00:00:03.000000026Z",
+ "displayColor": 4,
+ "displayName": "display name",
+ "id": "user id",
+ "isBot": false,
+ "previousNames": ["somebody"]
+ }
+ }`)
+}
+
+func TestSendChatEventUserJoined(t *testing.T) {
+ timestamp := time.Unix(72, 6).UTC()
+ user := user.User{
+ ID: "user id",
+ DisplayName: "display name",
+ DisplayColor: 4,
+ CreatedAt: time.Unix(3, 26).UTC(),
+ DisabledAt: nil,
+ PreviousNames: []string{"somebody"},
+ NameChangedAt: nil,
+ Scopes: []string{},
+ IsBot: false,
+ AuthenticatedAt: nil,
+ Authenticated: false,
+ }
+
+ checkPayload(t, models.UserJoined, func() {
+ SendChatEventUserJoined(events.UserJoinedEvent{
+ Event: events.Event{
+ Type: events.UserJoined,
+ ID: "id",
+ Timestamp: timestamp,
+ },
+ UserEvent: events.UserEvent{
+ User: &user,
+ ClientID: 51,
+ HiddenAt: nil,
+ },
+ })
+ }, `{
+ "clientId": 51,
+ "id": "id",
+ "type": "USER_JOINED",
+ "timestamp": "1970-01-01T00:01:12.000000006Z",
+ "user": {
+ "authenticated": false,
+ "createdAt": "1970-01-01T00:00:03.000000026Z",
+ "displayColor": 4,
+ "displayName": "display name",
+ "id": "user id",
+ "isBot": false,
+ "previousNames": ["somebody"]
+ }
+ }`)
+}
+
+func TestSendChatEventSetMessageVisibility(t *testing.T) {
+ timestamp := time.Unix(72, 6).UTC()
+
+ checkPayload(t, models.VisibiltyToggled, func() {
+ SendChatEventSetMessageVisibility(events.SetMessageVisibilityEvent{
+ Event: events.Event{
+ Type: events.VisibiltyUpdate,
+ ID: "id",
+ Timestamp: timestamp,
+ },
+ UserMessageEvent: events.UserMessageEvent{},
+ MessageIDs: []string{"message1", "message2"},
+ Visible: false,
+ })
+ }, `{
+ "MessageIDs": [
+ "message1",
+ "message2"
+ ],
+ "Visible": false,
+ "body": "",
+ "id": "id",
+ "timestamp": "1970-01-01T00:01:12.000000006Z",
+ "type": "VISIBILITY-UPDATE",
+ "user": null
+ }`)
+}
diff --git a/core/webhooks/stream.go b/core/webhooks/stream.go
index c2b282445..a0184ec59 100644
--- a/core/webhooks/stream.go
+++ b/core/webhooks/stream.go
@@ -10,14 +10,18 @@ import (
// SendStreamStatusEvent will send all webhook destinations the current stream status.
func SendStreamStatusEvent(eventType models.EventType) {
+ sendStreamStatusEvent(eventType, shortid.MustGenerate(), time.Now())
+}
+
+func sendStreamStatusEvent(eventType models.EventType, id string, timestamp time.Time) {
SendEventToWebhooks(WebhookEvent{
Type: eventType,
EventData: map[string]interface{}{
- "id": shortid.MustGenerate(),
+ "id": id,
"name": data.GetServerName(),
"summary": data.GetServerSummary(),
"streamTitle": data.GetStreamTitle(),
- "timestamp": time.Now(),
+ "timestamp": timestamp,
},
})
}
diff --git a/core/webhooks/stream_test.go b/core/webhooks/stream_test.go
new file mode 100644
index 000000000..a9802623d
--- /dev/null
+++ b/core/webhooks/stream_test.go
@@ -0,0 +1,26 @@
+package webhooks
+
+import (
+ "testing"
+ "time"
+
+ "github.com/owncast/owncast/core/chat/events"
+ "github.com/owncast/owncast/core/data"
+ "github.com/owncast/owncast/models"
+)
+
+func TestSendStreamStatusEvent(t *testing.T) {
+ data.SetServerName("my server")
+ data.SetServerSummary("my server where I stream")
+ data.SetStreamTitle("my stream")
+
+ checkPayload(t, models.StreamStarted, func() {
+ sendStreamStatusEvent(events.StreamStarted, "id", time.Unix(72, 6).UTC())
+ }, `{
+ "id": "id",
+ "name": "my server",
+ "streamTitle": "my stream",
+ "summary": "my server where I stream",
+ "timestamp": "1970-01-01T00:01:12.000000006Z"
+ }`)
+}
diff --git a/core/webhooks/webhooks.go b/core/webhooks/webhooks.go
index 3fa6ee67b..9e7be6e9f 100644
--- a/core/webhooks/webhooks.go
+++ b/core/webhooks/webhooks.go
@@ -1,6 +1,7 @@
package webhooks
import (
+ "sync"
"time"
"github.com/owncast/owncast/core/data"
@@ -27,9 +28,17 @@ type WebhookChatMessage struct {
// SendEventToWebhooks will send a single webhook event to all webhook destinations.
func SendEventToWebhooks(payload WebhookEvent) {
+ sendEventToWebhooks(payload, nil)
+}
+
+func sendEventToWebhooks(payload WebhookEvent, wg *sync.WaitGroup) {
webhooks := data.GetWebhooksForEvent(payload.Type)
for _, webhook := range webhooks {
- go addToQueue(webhook, payload)
+ // Use wg to track the number of notifications to be sent.
+ if wg != nil {
+ wg.Add(1)
+ }
+ addToQueue(webhook, payload, wg)
}
}
diff --git a/core/webhooks/webhooks_test.go b/core/webhooks/webhooks_test.go
new file mode 100644
index 000000000..c46254066
--- /dev/null
+++ b/core/webhooks/webhooks_test.go
@@ -0,0 +1,347 @@
+package webhooks
+
+import (
+ "bytes"
+ "encoding/json"
+ "fmt"
+ "net/http"
+ "net/http/httptest"
+ "os"
+ "sync"
+ "sync/atomic"
+ "testing"
+ "time"
+
+ "github.com/owncast/owncast/core/data"
+ "github.com/owncast/owncast/models"
+ jsonpatch "gopkg.in/evanphx/json-patch.v5"
+)
+
+func TestMain(m *testing.M) {
+ dbFile, err := os.CreateTemp(os.TempDir(), "owncast-test-db.db")
+ if err != nil {
+ panic(err)
+ }
+ dbFile.Close()
+ defer os.Remove(dbFile.Name())
+
+ if err := data.SetupPersistence(dbFile.Name()); err != nil {
+ panic(err)
+ }
+
+ InitWorkerPool()
+ defer close(queue)
+
+ m.Run()
+}
+
+// Because the other tests use `sendEventToWebhooks` with a `WaitGroup` to know when the test completes,
+// this test ensures that `SendToWebhooks` without a `WaitGroup` doesn't panic.
+func TestPublicSend(t *testing.T) {
+ // Send enough events to be sure at least one worker delivers a second event.
+ const eventsCount = webhookWorkerPoolSize + 1
+
+ var wg sync.WaitGroup
+ wg.Add(eventsCount)
+
+ svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ wg.Done()
+ }))
+ defer svr.Close()
+
+ hook, err := data.InsertWebhook(svr.URL, []models.EventType{models.MessageSent})
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer func() {
+ if err := data.DeleteWebhook(hook); err != nil {
+ t.Error(err)
+ }
+ }()
+
+ for i := 0; i < eventsCount; i++ {
+ wh := WebhookEvent{
+ EventData: struct{}{},
+ Type: models.MessageSent,
+ }
+ SendEventToWebhooks(wh)
+ }
+
+ wg.Wait()
+}
+
+// Make sure that events are only sent to interested endpoints.
+func TestRouting(t *testing.T) {
+ eventTypes := []models.EventType{models.PING, models.PONG}
+
+ calls := map[models.EventType]int{}
+ var lock sync.Mutex
+ svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ if len(r.URL.Path) < 1 || r.URL.Path[0] != '/' {
+ t.Fatalf("Got unexpected path %v", r.URL.Path)
+ }
+ pathType := r.URL.Path[1:]
+ var body WebhookEvent
+ if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
+ t.Fatal(err)
+ }
+ if body.Type != pathType {
+ t.Fatalf("Got %v payload on %v endpoint", body.Type, pathType)
+ }
+ lock.Lock()
+ defer lock.Unlock()
+ calls[pathType] += 1
+ }))
+ defer svr.Close()
+
+ for _, eventType := range eventTypes {
+ hook, err := data.InsertWebhook(svr.URL+"/"+eventType, []models.EventType{eventType})
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer func() {
+ if err := data.DeleteWebhook(hook); err != nil {
+ t.Error(err)
+ }
+ }()
+ }
+
+ var wg sync.WaitGroup
+
+ for _, eventType := range eventTypes {
+ wh := WebhookEvent{
+ EventData: struct{}{},
+ Type: eventType,
+ }
+ sendEventToWebhooks(wh, &wg)
+ }
+
+ wg.Wait()
+
+ for _, eventType := range eventTypes {
+ if calls[eventType] != 1 {
+ t.Errorf("Expected %v to be called exactly once but it was called %v times", eventType, calls[eventType])
+ }
+ }
+}
+
+// Make sure that events are sent to all interested endpoints.
+func TestMultiple(t *testing.T) {
+ const times = 2
+
+ var calls uint32
+ svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ atomic.AddUint32(&calls, 1)
+ }))
+ defer svr.Close()
+
+ for i := 0; i < times; i++ {
+ hook, err := data.InsertWebhook(fmt.Sprintf("%v/%v", svr.URL, i), []models.EventType{models.MessageSent})
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer func() {
+ if err := data.DeleteWebhook(hook); err != nil {
+ t.Error(err)
+ }
+ }()
+ }
+
+ var wg sync.WaitGroup
+
+ wh := WebhookEvent{
+ EventData: struct{}{},
+ Type: models.MessageSent,
+ }
+ sendEventToWebhooks(wh, &wg)
+
+ wg.Wait()
+
+ if atomic.LoadUint32(&calls) != times {
+ t.Errorf("Expected event to be sent exactly %v times but it was sent %v times", times, atomic.LoadUint32(&calls))
+ }
+}
+
+// Make sure when a webhook is used its last used timestamp is updated.
+func TestTimestamps(t *testing.T) {
+ const tolerance = time.Second
+ start := time.Now()
+ eventTypes := []models.EventType{models.PING, models.PONG}
+ handlerIds := []int{0, 0}
+ handlers := []*models.Webhook{nil, nil}
+ svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ }))
+ defer svr.Close()
+
+ for i, eventType := range eventTypes {
+ hook, err := data.InsertWebhook(svr.URL+"/"+eventType, []models.EventType{eventType})
+ if err != nil {
+ t.Fatal(err)
+ }
+ handlerIds[i] = hook
+ defer func() {
+ if err := data.DeleteWebhook(hook); err != nil {
+ t.Error(err)
+ }
+ }()
+ }
+
+ var wg sync.WaitGroup
+
+ wh := WebhookEvent{
+ EventData: struct{}{},
+ Type: eventTypes[0],
+ }
+ sendEventToWebhooks(wh, &wg)
+
+ wg.Wait()
+
+ hooks, err := data.GetWebhooks()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ for h, hook := range hooks {
+ for i, handlerId := range handlerIds {
+ if hook.ID == handlerId {
+ handlers[i] = &hooks[h]
+ }
+ }
+ }
+
+ if handlers[0] == nil {
+ t.Fatal("First handler was not found in registered handlers")
+ }
+ if handlers[1] == nil {
+ t.Fatal("Second handler was not found in registered handlers")
+ }
+
+ end := time.Now()
+
+ if handlers[0].Timestamp.Add(tolerance).Before(start) {
+ t.Errorf("First handler timestamp %v should not be before start of test %v", handlers[0].Timestamp, start)
+ }
+
+ if handlers[0].Timestamp.Add(tolerance).Before(handlers[1].Timestamp) {
+ t.Errorf("Second handler timestamp %v should not be before first handler timestamp %v", handlers[1].Timestamp, handlers[0].Timestamp)
+ }
+
+ if end.Add(tolerance).Before(handlers[1].Timestamp) {
+ t.Errorf("End of test %v should not be before second handler timestamp %v", end, handlers[1].Timestamp)
+ }
+
+ if handlers[0].LastUsed == nil {
+ t.Error("First handler last used should have been set")
+ } else if handlers[0].LastUsed.Add(tolerance).Before(handlers[1].Timestamp) {
+ t.Errorf("First handler last used %v should not be before second handler timestamp %v", handlers[0].LastUsed, handlers[1].Timestamp)
+ } else if end.Add(tolerance).Before(*handlers[0].LastUsed) {
+ t.Errorf("End of test %v should not be before first handler last used %v", end, handlers[0].LastUsed)
+ }
+
+ if handlers[1].LastUsed != nil {
+ t.Error("Second handler last used should not have been set")
+ }
+}
+
+// Make sure up to the expected number of events can be fired in parallel.
+func TestParallel(t *testing.T) {
+ var calls uint32
+
+ var wgStart sync.WaitGroup
+ finished := make(chan int)
+ wgStart.Add(webhookWorkerPoolSize)
+
+ svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ myId := atomic.AddUint32(&calls, 1)
+
+ // We made it to the pool size + 1 event, so we're done with the test.
+ if myId == webhookWorkerPoolSize+1 {
+ close(finished)
+ return
+ }
+
+ // Wait until all the handlers are started.
+ wgStart.Done()
+ wgStart.Wait()
+
+ // The first handler just returns so the pool size + 1 event can be handled.
+ if myId != 1 {
+ // The other handlers will wait for pool size + 1.
+ _ = <-finished
+ }
+ }))
+ defer svr.Close()
+
+ hook, err := data.InsertWebhook(svr.URL, []models.EventType{models.MessageSent})
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer func() {
+ if err := data.DeleteWebhook(hook); err != nil {
+ t.Error(err)
+ }
+ }()
+
+ var wgMessages sync.WaitGroup
+
+ for i := 0; i < webhookWorkerPoolSize+1; i++ {
+ wh := WebhookEvent{
+ EventData: struct{}{},
+ Type: models.MessageSent,
+ }
+ sendEventToWebhooks(wh, &wgMessages)
+ }
+
+ wgMessages.Wait()
+}
+
+// Send an event, capture it, and verify that it has the expected payload.
+func checkPayload(t *testing.T, eventType models.EventType, send func(), expectedJson string) {
+ eventChannel := make(chan WebhookEvent)
+
+ // Set up a server.
+ svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ data := WebhookEvent{}
+ json.NewDecoder(r.Body).Decode(&data)
+ eventChannel <- data
+ }))
+ defer svr.Close()
+
+ // Subscribe to the webhook.
+ hook, err := data.InsertWebhook(svr.URL, []models.EventType{eventType})
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer func() {
+ if err := data.DeleteWebhook(hook); err != nil {
+ t.Error(err)
+ }
+ }()
+
+ // Send and capture the event.
+ send()
+ event := <-eventChannel
+
+ if event.Type != eventType {
+ t.Errorf("Got event type %v but expected %v", event.Type, eventType)
+ }
+
+ // Compare.
+ payloadJson, err := json.MarshalIndent(event.EventData, "", " ")
+ if err != nil {
+ t.Fatal(err)
+ }
+ t.Logf("Actual payload:\n%s", payloadJson)
+
+ if !jsonpatch.Equal(payloadJson, []byte(expectedJson)) {
+ diff, err := jsonpatch.CreateMergePatch(payloadJson, []byte(expectedJson))
+ if err != nil {
+ t.Fatal(err)
+ }
+ var out bytes.Buffer
+ if err := json.Indent(&out, diff, "", " "); err != nil {
+ t.Fatal(err)
+ }
+ t.Errorf("Expected difference from actual payload:\n%s", out.Bytes())
+ }
+}
diff --git a/core/webhooks/workerpool.go b/core/webhooks/workerpool.go
index cfca428c4..838688340 100644
--- a/core/webhooks/workerpool.go
+++ b/core/webhooks/workerpool.go
@@ -4,6 +4,7 @@ import (
"bytes"
"encoding/json"
"net/http"
+ "sync"
log "github.com/sirupsen/logrus"
@@ -20,6 +21,7 @@ const (
type Job struct {
webhook models.Webhook
payload WebhookEvent
+ wg *sync.WaitGroup
}
var queue chan Job
@@ -34,9 +36,9 @@ func InitWorkerPool() {
}
}
-func addToQueue(webhook models.Webhook, payload WebhookEvent) {
+func addToQueue(webhook models.Webhook, payload WebhookEvent, wg *sync.WaitGroup) {
log.Tracef("Queued Event %s for Webhook %s", payload.Type, webhook.URL)
- queue <- Job{webhook, payload}
+ queue <- Job{webhook, payload, wg}
}
func worker(workerID int, queue <-chan Job) {
@@ -49,6 +51,9 @@ func worker(workerID int, queue <-chan Job) {
log.Errorf("Event: %s failed to send to webhook: %s Error: %s", job.payload.Type, job.webhook.URL, err)
}
log.Tracef("Done with Event %s to Webhook %s using worker %d", job.payload.Type, job.webhook.URL, workerID)
+ if job.wg != nil {
+ job.wg.Done()
+ }
}
}
diff --git a/go.mod b/go.mod
index c44548bd7..5519cd071 100644
--- a/go.mod
+++ b/go.mod
@@ -4,7 +4,7 @@ go 1.17
require (
github.com/amalfra/etag v1.0.0
- github.com/aws/aws-sdk-go v1.44.109
+ github.com/aws/aws-sdk-go v1.44.110
github.com/go-fed/activity v1.0.1-0.20210803212804-d866ba75dd0f
github.com/go-fed/httpsig v1.1.0
github.com/go-ole/go-ole v1.2.6 // indirect
@@ -12,7 +12,7 @@ require (
github.com/grafov/m3u8 v0.11.1
github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible
github.com/mattn/go-sqlite3 v1.14.15
- github.com/microcosm-cc/bluemonday v1.0.20
+ github.com/microcosm-cc/bluemonday v1.0.21
github.com/mssola/user_agent v0.5.3
github.com/nareix/joy5 v0.0.0-20210317075623-2c912ca30590
github.com/oschwald/geoip2-golang v1.8.0
@@ -35,7 +35,7 @@ require (
github.com/tklauser/numcpus v0.4.0 // indirect
github.com/yusufpapurcu/wmi v1.2.2 // indirect
golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3 // indirect
- golang.org/x/net v0.0.0-20220927171203-f486391704dc
+ golang.org/x/net v0.0.0-20221002022538-bcab6841153b
golang.org/x/sys v0.0.0-20220804214406-8e32c043e418 // indirect
)
@@ -53,11 +53,12 @@ require (
github.com/prometheus/procfs v0.8.0 // indirect
golang.org/x/text v0.3.7 // indirect
google.golang.org/protobuf v1.28.1 // indirect
+ gopkg.in/evanphx/json-patch.v5 v5.6.0 // indirect
)
require (
github.com/nakabonne/tstorage v0.3.5
- github.com/shirou/gopsutil/v3 v3.22.8
+ github.com/shirou/gopsutil/v3 v3.22.9
)
require github.com/SherClockHolmes/webpush-go v1.2.0
diff --git a/go.sum b/go.sum
index 2d96bbfe0..eefb8e473 100644
--- a/go.sum
+++ b/go.sum
@@ -74,6 +74,8 @@ github.com/aws/aws-sdk-go v1.44.107 h1:VP7Rq3wzsOV7wrfHqjAAKRksD4We58PaoVSDPKhm8
github.com/aws/aws-sdk-go v1.44.107/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo=
github.com/aws/aws-sdk-go v1.44.109 h1:+Na5JPeS0kiEHoBp5Umcuuf+IDqXqD0lXnM920E31YI=
github.com/aws/aws-sdk-go v1.44.109/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo=
+github.com/aws/aws-sdk-go v1.44.110 h1:unno3l2FYQo6p0wYCp9gUk8YNzhOxqSktM0Y1vukl9k=
+github.com/aws/aws-sdk-go v1.44.110/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo=
github.com/aymerick/douceur v0.2.0 h1:Mv+mAeH1Q+n9Fr+oyamOlAkUNPWPlA8PPGR0QAaYuPk=
github.com/aymerick/douceur v0.2.0/go.mod h1:wlT5vV2O3h55X9m7iVYN0TBM0NH/MmbLnd30/FjWUq4=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
@@ -167,6 +169,7 @@ github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.7 h1:81/ik6ipDQS2aGcBfIN5dHDB36BwrStyeAQquSYCV4o=
github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE=
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
+github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0=
@@ -236,6 +239,8 @@ github.com/microcosm-cc/bluemonday v1.0.19 h1:OI7hoF5FY4pFz2VA//RN8TfM0YJ2dJcl4P
github.com/microcosm-cc/bluemonday v1.0.19/go.mod h1:QNzV2UbLK2/53oIIwTOyLUSABMkjZ4tqiyC1g/DyqxE=
github.com/microcosm-cc/bluemonday v1.0.20 h1:flpzsq4KU3QIYAYGV/szUat7H+GPOXR0B2JU5A1Wp8Y=
github.com/microcosm-cc/bluemonday v1.0.20/go.mod h1:yfBmMi8mxvaZut3Yytv+jTXRY8mxyjJ0/kQBTElld50=
+github.com/microcosm-cc/bluemonday v1.0.21 h1:dNH3e4PSyE4vNX+KlRGHT5KrSvjeUkoNPwEORjffHJg=
+github.com/microcosm-cc/bluemonday v1.0.21/go.mod h1:ytNkv4RrDrLJ2pqlsSI46O6IVXmZOBBD4SaJyDwwTkM=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
@@ -312,6 +317,8 @@ github.com/shirou/gopsutil/v3 v3.22.7 h1:flKnuCMfUUrO+oAvwAd6GKZgnPzr098VA/UJ14n
github.com/shirou/gopsutil/v3 v3.22.7/go.mod h1:s648gW4IywYzUfE/KjXxUsqrqx/T2xO5VqOXxONeRfI=
github.com/shirou/gopsutil/v3 v3.22.8 h1:a4s3hXogo5mE2PfdfJIonDbstO/P+9JszdfhAHSzD9Y=
github.com/shirou/gopsutil/v3 v3.22.8/go.mod h1:s648gW4IywYzUfE/KjXxUsqrqx/T2xO5VqOXxONeRfI=
+github.com/shirou/gopsutil/v3 v3.22.9 h1:yibtJhIVEMcdw+tCTbOPiF1VcsuDeTE4utJ8Dm4c5eA=
+github.com/shirou/gopsutil/v3 v3.22.9/go.mod h1:bBYl1kjgEJpWpxeHmLI+dVHWtyAwfcmSBLDsp2TNT8A=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
@@ -461,6 +468,8 @@ golang.org/x/net v0.0.0-20220909164309-bea034e7d591 h1:D0B/7al0LLrVC8aWF4+oxpv/m
golang.org/x/net v0.0.0-20220909164309-bea034e7d591/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk=
golang.org/x/net v0.0.0-20220927171203-f486391704dc h1:FxpXZdoBqT8RjqTy6i1E8nXHhW21wK7ptQ/EPIGxzPQ=
golang.org/x/net v0.0.0-20220927171203-f486391704dc/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk=
+golang.org/x/net v0.0.0-20221002022538-bcab6841153b h1:6e93nYa3hNqAvLr0pD4PN1fFS+gKzp2zAXqrnTCstqU=
+golang.org/x/net v0.0.0-20221002022538-bcab6841153b/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@@ -680,6 +689,8 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
+gopkg.in/evanphx/json-patch.v5 v5.6.0 h1:BMT6KIwBD9CaU91PJCZIe46bDmBWa9ynTQgJIOpfQBk=
+gopkg.in/evanphx/json-patch.v5 v5.6.0/go.mod h1:/kvTRh1TVm5wuM6OkHxqXtE/1nUZZpihg29RtuIyfvk=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=