// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package grpcproxy

import (
	"sync"
)

type watchBroadcasts struct {
	wp *watchProxy

	// mu protects bcasts and watchers from the coalesce loop.
	mu       sync.Mutex
	bcasts   map[*watchBroadcast]struct{}
	watchers map[*watcher]*watchBroadcast

	updatec chan *watchBroadcast
	donec   chan struct{}
}

// maxCoalesceRecievers prevents a popular watchBroadcast from being coalseced.
const maxCoalesceReceivers = 5

func newWatchBroadcasts(wp *watchProxy) *watchBroadcasts {
	wbs := &watchBroadcasts{
		wp:       wp,
		bcasts:   make(map[*watchBroadcast]struct{}),
		watchers: make(map[*watcher]*watchBroadcast),
		updatec:  make(chan *watchBroadcast, 1),
		donec:    make(chan struct{}),
	}
	go func() {
		defer close(wbs.donec)
		for wb := range wbs.updatec {
			wbs.coalesce(wb)
		}
	}()
	return wbs
}

func (wbs *watchBroadcasts) coalesce(wb *watchBroadcast) {
	if wb.size() >= maxCoalesceReceivers {
		return
	}
	wbs.mu.Lock()
	for wbswb := range wbs.bcasts {
		if wbswb == wb {
			continue
		}
		wb.mu.Lock()
		wbswb.mu.Lock()
		// 1. check if wbswb is behind wb so it won't skip any events in wb
		// 2. ensure wbswb started; nextrev == 0 may mean wbswb is waiting
		// for a current watcher and expects a create event from the server.
		if wb.nextrev >= wbswb.nextrev && wbswb.responses > 0 {
			for w := range wb.receivers {
				wbswb.receivers[w] = struct{}{}
				wbs.watchers[w] = wbswb
			}
			wb.receivers = nil
		}
		wbswb.mu.Unlock()
		wb.mu.Unlock()
		if wb.empty() {
			delete(wbs.bcasts, wb)
			wb.stop()
			break
		}
	}
	wbs.mu.Unlock()
}

func (wbs *watchBroadcasts) add(w *watcher) {
	wbs.mu.Lock()
	defer wbs.mu.Unlock()
	// find fitting bcast
	for wb := range wbs.bcasts {
		if wb.add(w) {
			wbs.watchers[w] = wb
			return
		}
	}
	// no fit; create a bcast
	wb := newWatchBroadcast(wbs.wp.lg, wbs.wp, w, wbs.update)
	wbs.watchers[w] = wb
	wbs.bcasts[wb] = struct{}{}
}

// delete removes a watcher and returns the number of remaining watchers.
func (wbs *watchBroadcasts) delete(w *watcher) int {
	wbs.mu.Lock()
	defer wbs.mu.Unlock()

	wb, ok := wbs.watchers[w]
	if !ok {
		panic("deleting missing watcher from broadcasts")
	}
	delete(wbs.watchers, w)
	wb.delete(w)
	if wb.empty() {
		delete(wbs.bcasts, wb)
		wb.stop()
	}
	return len(wbs.bcasts)
}

func (wbs *watchBroadcasts) stop() {
	wbs.mu.Lock()
	for wb := range wbs.bcasts {
		wb.stop()
	}
	wbs.bcasts = nil
	close(wbs.updatec)
	wbs.mu.Unlock()
	<-wbs.donec
}

func (wbs *watchBroadcasts) update(wb *watchBroadcast) {
	select {
	case wbs.updatec <- wb:
	default:
	}
}