mirror of
https://github.com/owncast/owncast.git
synced 2024-10-10 19:16:02 +00:00
426 lines
13 KiB
Go
426 lines
13 KiB
Go
package federationrepository
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"fmt"
|
|
"net/url"
|
|
"time"
|
|
|
|
"github.com/go-fed/activity/streams"
|
|
"github.com/go-fed/activity/streams/vocab"
|
|
"github.com/owncast/owncast/models"
|
|
"github.com/owncast/owncast/services/apfederation/apmodels"
|
|
"github.com/owncast/owncast/services/apfederation/resolvers"
|
|
"github.com/owncast/owncast/storage/data"
|
|
"github.com/owncast/owncast/storage/sqlstorage"
|
|
"github.com/owncast/owncast/utils"
|
|
"github.com/pkg/errors"
|
|
|
|
log "github.com/sirupsen/logrus"
|
|
)
|
|
|
|
type FederationRepository struct {
|
|
datastore *data.Store
|
|
}
|
|
|
|
func New(datastore *data.Store) *FederationRepository {
|
|
r := &FederationRepository{
|
|
datastore: datastore,
|
|
}
|
|
|
|
return r
|
|
}
|
|
|
|
// NOTE: This is temporary during the transition period.
|
|
var temporaryGlobalInstance *FederationRepository
|
|
|
|
// GetUserRepository will return the user repository.
|
|
func Get() *FederationRepository {
|
|
if temporaryGlobalInstance == nil {
|
|
i := New(data.GetDatastore())
|
|
temporaryGlobalInstance = i
|
|
}
|
|
return temporaryGlobalInstance
|
|
}
|
|
|
|
// GetFollowerCount will return the number of followers we're keeping track of.
|
|
func (f *FederationRepository) GetFollowerCount() (int64, error) {
|
|
ctx := context.Background()
|
|
return f.datastore.GetQueries().GetFollowerCount(ctx)
|
|
}
|
|
|
|
// GetFederationFollowers will return a slice of the followers we keep track of locally.
|
|
func (f *FederationRepository) GetFederationFollowers(limit int, offset int) ([]models.Follower, int, error) {
|
|
ctx := context.Background()
|
|
total, err := f.datastore.GetQueries().GetFollowerCount(ctx)
|
|
if err != nil {
|
|
return nil, 0, errors.Wrap(err, "unable to fetch total number of followers")
|
|
}
|
|
|
|
followersResult, err := f.datastore.GetQueries().GetFederationFollowersWithOffset(ctx, sqlstorage.GetFederationFollowersWithOffsetParams{
|
|
Limit: int32(limit),
|
|
Offset: int32(offset),
|
|
})
|
|
if err != nil {
|
|
return nil, 0, err
|
|
}
|
|
|
|
followers := make([]models.Follower, 0)
|
|
|
|
for _, row := range followersResult {
|
|
singleFollower := models.Follower{
|
|
Name: row.Name.String,
|
|
Username: row.Username,
|
|
Image: row.Image.String,
|
|
ActorIRI: row.Iri,
|
|
Inbox: row.Inbox,
|
|
Timestamp: utils.NullTime(row.CreatedAt),
|
|
}
|
|
|
|
followers = append(followers, singleFollower)
|
|
}
|
|
|
|
return followers, int(total), nil
|
|
}
|
|
|
|
// GetPendingFollowRequests will return pending follow requests.
|
|
func (f *FederationRepository) GetPendingFollowRequests() ([]models.Follower, error) {
|
|
pendingFollowersResult, err := f.datastore.GetQueries().GetFederationFollowerApprovalRequests(context.Background())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
followers := make([]models.Follower, 0)
|
|
|
|
for _, row := range pendingFollowersResult {
|
|
singleFollower := models.Follower{
|
|
Name: row.Name.String,
|
|
Username: row.Username,
|
|
Image: row.Image.String,
|
|
ActorIRI: row.Iri,
|
|
Inbox: row.Inbox,
|
|
Timestamp: utils.NullTime{Time: row.CreatedAt.Time, Valid: true},
|
|
}
|
|
followers = append(followers, singleFollower)
|
|
}
|
|
|
|
return followers, nil
|
|
}
|
|
|
|
// GetBlockedAndRejectedFollowers will return blocked and rejected followers.
|
|
func (f *FederationRepository) GetBlockedAndRejectedFollowers() ([]models.Follower, error) {
|
|
pendingFollowersResult, err := f.datastore.GetQueries().GetRejectedAndBlockedFollowers(context.Background())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
followers := make([]models.Follower, 0)
|
|
|
|
for _, row := range pendingFollowersResult {
|
|
singleFollower := models.Follower{
|
|
Name: row.Name.String,
|
|
Username: row.Username,
|
|
Image: row.Image.String,
|
|
ActorIRI: row.Iri,
|
|
DisabledAt: utils.NullTime{Time: row.DisabledAt.Time, Valid: true},
|
|
Timestamp: utils.NullTime{Time: row.CreatedAt.Time, Valid: true},
|
|
}
|
|
followers = append(followers, singleFollower)
|
|
}
|
|
|
|
return followers, nil
|
|
}
|
|
|
|
// AddFollow will save a follow to the datastore.
|
|
func (f *FederationRepository) AddFollow(follow apmodels.ActivityPubActor, approved bool) error {
|
|
log.Traceln("Saving", follow.ActorIri, "as a follower.")
|
|
var image string
|
|
if follow.Image != nil {
|
|
image = follow.Image.String()
|
|
}
|
|
|
|
followRequestObject, err := apmodels.Serialize(follow.RequestObject)
|
|
if err != nil {
|
|
return errors.Wrap(err, "error serializing follow request object")
|
|
}
|
|
|
|
return f.createFollow(follow.ActorIri.String(), follow.Inbox.String(), follow.FollowRequestIri.String(), follow.Name, follow.Username, image, followRequestObject, approved)
|
|
}
|
|
|
|
// RemoveFollow will remove a follow from the datastore.
|
|
func (f *FederationRepository) RemoveFollow(unfollow apmodels.ActivityPubActor) error {
|
|
log.Traceln("Removing", unfollow.ActorIri, "as a follower.")
|
|
return f.removeFollow(unfollow.ActorIri)
|
|
}
|
|
|
|
// GetFollower will return a single follower/request given an IRI.
|
|
func (f *FederationRepository) GetFollower(iri string) (*apmodels.ActivityPubActor, error) {
|
|
result, err := f.datastore.GetQueries().GetFollowerByIRI(context.Background(), iri)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
followIRI, err := url.Parse(result.Request)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "error parsing follow request IRI")
|
|
}
|
|
|
|
iriURL, err := url.Parse(result.Iri)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "error parsing actor IRI")
|
|
}
|
|
|
|
inbox, err := url.Parse(result.Inbox)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "error parsing acting inbox")
|
|
}
|
|
|
|
image, _ := url.Parse(result.Image.String)
|
|
|
|
var disabledAt *time.Time
|
|
if result.DisabledAt.Valid {
|
|
disabledAt = &result.DisabledAt.Time
|
|
}
|
|
|
|
follower := apmodels.ActivityPubActor{
|
|
ActorIri: iriURL,
|
|
Inbox: inbox,
|
|
Name: result.Name.String,
|
|
Username: result.Username,
|
|
Image: image,
|
|
FollowRequestIri: followIRI,
|
|
DisabledAt: disabledAt,
|
|
}
|
|
|
|
return &follower, nil
|
|
}
|
|
|
|
// ApprovePreviousFollowRequest will approve a follow request.
|
|
func (f *FederationRepository) ApprovePreviousFollowRequest(iri string) error {
|
|
return f.datastore.GetQueries().ApproveFederationFollower(context.Background(), sqlstorage.ApproveFederationFollowerParams{
|
|
Iri: iri,
|
|
ApprovedAt: sql.NullTime{
|
|
Time: time.Now(),
|
|
Valid: true,
|
|
},
|
|
})
|
|
}
|
|
|
|
// BlockOrRejectFollower will block an existing follower or reject a follow request.
|
|
func (f *FederationRepository) BlockOrRejectFollower(iri string) error {
|
|
return f.datastore.GetQueries().RejectFederationFollower(context.Background(), sqlstorage.RejectFederationFollowerParams{
|
|
Iri: iri,
|
|
DisabledAt: sql.NullTime{
|
|
Time: time.Now(),
|
|
Valid: true,
|
|
},
|
|
})
|
|
}
|
|
|
|
func (f *FederationRepository) createFollow(actor, inbox, request, name, username, image string, requestObject []byte, approved bool) error {
|
|
tx, err := f.datastore.DB.Begin()
|
|
if err != nil {
|
|
log.Debugln(err)
|
|
}
|
|
defer func() {
|
|
_ = tx.Rollback()
|
|
}()
|
|
|
|
var approvedAt sql.NullTime
|
|
if approved {
|
|
approvedAt = sql.NullTime{
|
|
Time: time.Now(),
|
|
Valid: true,
|
|
}
|
|
}
|
|
|
|
if err = f.datastore.GetQueries().WithTx(tx).AddFollower(context.Background(), sqlstorage.AddFollowerParams{
|
|
Iri: actor,
|
|
Inbox: inbox,
|
|
Name: sql.NullString{String: name, Valid: true},
|
|
Username: username,
|
|
Image: sql.NullString{String: image, Valid: true},
|
|
ApprovedAt: approvedAt,
|
|
Request: request,
|
|
RequestObject: requestObject,
|
|
}); err != nil {
|
|
log.Errorln("error creating new federation follow: ", err)
|
|
}
|
|
|
|
return tx.Commit()
|
|
}
|
|
|
|
// UpdateFollower will update the details of a stored follower given an IRI.
|
|
func (f *FederationRepository) UpdateFollower(actorIRI string, inbox string, name string, username string, image string) error {
|
|
f.datastore.DbLock.Lock()
|
|
defer f.datastore.DbLock.Unlock()
|
|
|
|
tx, err := f.datastore.DB.Begin()
|
|
if err != nil {
|
|
log.Debugln(err)
|
|
}
|
|
defer func() {
|
|
_ = tx.Rollback()
|
|
}()
|
|
|
|
if err = f.datastore.GetQueries().WithTx(tx).UpdateFollowerByIRI(context.Background(), sqlstorage.UpdateFollowerByIRIParams{
|
|
Inbox: inbox,
|
|
Name: sql.NullString{String: name, Valid: true},
|
|
Username: username,
|
|
Image: sql.NullString{String: image, Valid: true},
|
|
Iri: actorIRI,
|
|
}); err != nil {
|
|
return fmt.Errorf("error updating follower %s %s", actorIRI, err)
|
|
}
|
|
|
|
return tx.Commit()
|
|
}
|
|
|
|
func (f *FederationRepository) removeFollow(actor *url.URL) error {
|
|
f.datastore.DbLock.Lock()
|
|
defer f.datastore.DbLock.Unlock()
|
|
|
|
tx, err := f.datastore.DB.Begin()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer func() {
|
|
_ = tx.Rollback()
|
|
}()
|
|
|
|
if err := f.datastore.GetQueries().WithTx(tx).RemoveFollowerByIRI(context.Background(), actor.String()); err != nil {
|
|
return err
|
|
}
|
|
|
|
return tx.Commit()
|
|
}
|
|
|
|
// GetOutboxPostCount will return the number of posts in the outbox.
|
|
func (f *FederationRepository) GetOutboxPostCount() (int64, error) {
|
|
ctx := context.Background()
|
|
return f.datastore.GetQueries().GetLocalPostCount(ctx)
|
|
}
|
|
|
|
// GetOutbox will return an instance of the outbox populated by stored items.
|
|
func (f *FederationRepository) GetOutbox(limit int, offset int) (vocab.ActivityStreamsOrderedCollection, error) {
|
|
collection := streams.NewActivityStreamsOrderedCollection()
|
|
orderedItems := streams.NewActivityStreamsOrderedItemsProperty()
|
|
r := resolvers.Get()
|
|
|
|
rows, err := f.datastore.GetQueries().GetOutboxWithOffset(
|
|
context.Background(),
|
|
sqlstorage.GetOutboxWithOffsetParams{Limit: int32(limit), Offset: int32(offset)},
|
|
)
|
|
if err != nil {
|
|
return collection, err
|
|
}
|
|
|
|
for _, value := range rows {
|
|
createCallback := func(c context.Context, activity vocab.ActivityStreamsCreate) error {
|
|
orderedItems.AppendActivityStreamsCreate(activity)
|
|
return nil
|
|
}
|
|
if err := r.Resolve(context.Background(), value, createCallback); err != nil {
|
|
return collection, err
|
|
}
|
|
}
|
|
|
|
return collection, nil
|
|
}
|
|
|
|
// AddToOutbox will store a single payload to the persistence layer.
|
|
func (f *FederationRepository) AddToOutbox(iri string, itemData []byte, typeString string, isLiveNotification bool) error {
|
|
tx, err := f.datastore.DB.Begin()
|
|
if err != nil {
|
|
log.Debugln(err)
|
|
}
|
|
defer func() {
|
|
_ = tx.Rollback()
|
|
}()
|
|
|
|
if err = f.datastore.GetQueries().WithTx(tx).AddToOutbox(context.Background(), sqlstorage.AddToOutboxParams{
|
|
Iri: iri,
|
|
Value: itemData,
|
|
Type: typeString,
|
|
LiveNotification: sql.NullBool{Bool: isLiveNotification, Valid: true},
|
|
}); err != nil {
|
|
return fmt.Errorf("error creating new item in federation outbox %s", err)
|
|
}
|
|
|
|
return tx.Commit()
|
|
}
|
|
|
|
// GetObjectByIRI will return a string representation of a single object by the IRI.
|
|
func (f *FederationRepository) GetObjectByIRI(iri string) (string, bool, time.Time, error) {
|
|
row, err := f.datastore.GetQueries().GetObjectFromOutboxByIRI(context.Background(), iri)
|
|
return string(row.Value), row.LiveNotification.Bool, row.CreatedAt.Time, err
|
|
}
|
|
|
|
// GetLocalPostCount will return the number of posts existing locally.
|
|
func (f *FederationRepository) GetLocalPostCount() (int64, error) {
|
|
ctx := context.Background()
|
|
return f.datastore.GetQueries().GetLocalPostCount(ctx)
|
|
}
|
|
|
|
// SaveInboundFediverseActivity will save an event to the ap_inbound_activities table.
|
|
func (f *FederationRepository) SaveInboundFediverseActivity(objectIRI string, actorIRI string, eventType string, timestamp time.Time) error {
|
|
if err := f.datastore.GetQueries().AddToAcceptedActivities(context.Background(), sqlstorage.AddToAcceptedActivitiesParams{
|
|
Iri: objectIRI,
|
|
Actor: actorIRI,
|
|
Type: eventType,
|
|
Timestamp: timestamp,
|
|
}); err != nil {
|
|
return errors.Wrap(err, "error saving event "+objectIRI)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// GetInboundActivities will return a collection of saved, federated activities
|
|
// limited and offset by the values provided to support pagination.
|
|
func (f *FederationRepository) GetInboundActivities(limit int, offset int) ([]models.FederatedActivity, int, error) {
|
|
ctx := context.Background()
|
|
rows, err := f.datastore.GetQueries().GetInboundActivitiesWithOffset(ctx, sqlstorage.GetInboundActivitiesWithOffsetParams{
|
|
Limit: int32(limit),
|
|
Offset: int32(offset),
|
|
})
|
|
if err != nil {
|
|
return nil, 0, err
|
|
}
|
|
|
|
activities := make([]models.FederatedActivity, 0)
|
|
|
|
total, err := f.datastore.GetQueries().GetInboundActivityCount(context.Background())
|
|
if err != nil {
|
|
return nil, 0, errors.Wrap(err, "unable to fetch total activity count")
|
|
}
|
|
|
|
for _, row := range rows {
|
|
singleActivity := models.FederatedActivity{
|
|
IRI: row.Iri,
|
|
ActorIRI: row.Actor,
|
|
Type: row.Type,
|
|
Timestamp: row.Timestamp,
|
|
}
|
|
activities = append(activities, singleActivity)
|
|
}
|
|
|
|
return activities, int(total), nil
|
|
}
|
|
|
|
// HasPreviouslyHandledInboundActivity will return if we have previously handled
|
|
// an inbound federated activity.
|
|
func (f *FederationRepository) HasPreviouslyHandledInboundActivity(iri string, actorIRI string, eventType string) (bool, error) {
|
|
exists, err := f.datastore.GetQueries().DoesInboundActivityExist(context.Background(), sqlstorage.DoesInboundActivityExistParams{
|
|
Iri: iri,
|
|
Actor: actorIRI,
|
|
Type: eventType,
|
|
})
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
return exists > 0, nil
|
|
}
|