merge from master

Xiang Li 2013-09-28 16:58:57 -07:00
parent da83ee223b
commit 784d286f37
79 changed files with 1383 additions and 2056 deletions

.gitignore vendored (3 changes)
View File

@ -1,4 +1,5 @@
src/
pkg/
./etcd
/etcd
release_version.go
/machine*

View File

@ -474,6 +474,7 @@ If you are using SSL for server to server communication, you must use it on all
**Python libraries**
- [transitorykris/etcd-py](https://github.com/transitorykris/etcd-py)
- [jplana/python-etcd](https://github.com/jplana/python-etcd)
**Node libraries**
@ -485,13 +486,23 @@ If you are using SSL for server to server communication, you must use it on all
- [jpfuentes2/etcd-ruby](https://github.com/jpfuentes2/etcd-ruby)
- [ranjib/etcd-ruby](https://github.com/ranjib/etcd-ruby)
**C libraries**
- [jdarcy/etcd-api](https://github.com/jdarcy/etcd-api)
**Chef Integration**
- [coderanger/etcd-chef](https://github.com/coderanger/etcd-chef)
**Chef Cookbook**
- [spheromak/etcd-cookbook](https://github.com/spheromak/etcd-cookbook)
**Projects using etcd**
- [binocarlos/yoda](https://github.com/binocarlos/yoda) - etcd + ZeroMQ
- [calavera/active-proxy](https://github.com/calavera/active-proxy) - HTTP Proxy configured with etcd
- [derekchiang/etcdplus](https://github.com/derekchiang/etcdplus) - A set of distributed synchronization primitives built upon etcd
- [gleicon/goreman](https://github.com/gleicon/goreman/tree/etcd) - Branch of the Go Foreman clone with etcd support
- [garethr/hiera-etcd](https://github.com/garethr/hiera-etcd) - Puppet hiera backend using etcd
- [mattn/etcd-vim](https://github.com/mattn/etcd-vim) - SET and GET keys from inside vim

build (2 changes)
View File

@ -1,4 +1,4 @@
#!/bin/bash
#!/bin/sh
ETCD_PACKAGE=github.com/coreos/etcd
export GOPATH="${PWD}"

build.ps1 (Normal file, 24 changes)
View File

@ -0,0 +1,24 @@
$ETCD_PACKAGE="github.com/coreos/etcd"
$env:GOPATH=$pwd.Path
$SRC_DIR="$env:GOPATH/src"
$ETCD_DIR="$SRC_DIR/$ETCD_PACKAGE"
$env:ETCD_DIR="$SRC_DIR/$ETCD_PACKAGE"
$ETCD_BASE=(Split-Path $ETCD_DIR -Parent)
if(-not(test-path $ETCD_DIR)){
mkdir -force "$ETCD_BASE" > $null
}
if(-not(test-path $ETCD_DIR )){
cmd /c 'mklink /D "%ETCD_DIR%" ..\..\..\'
}
foreach($i in (ls third_party/*)){
if("$i" -eq "third_party/src") {continue}
cp -Recurse -force "$i" src/
}
./scripts/release-version.ps1 | Out-File -Encoding UTF8 release_version.go
go build -v "${ETCD_PACKAGE}"

View File

@ -223,8 +223,10 @@ func (c *JoinCommand) Apply(raftServer *raft.Server) (interface{}, error) {
value := fmt.Sprintf("raft=%s&etcd=%s&raftVersion=%s", c.RaftURL, c.EtcdURL, c.RaftVersion)
etcdStore.Create(key, value, store.Permanent, raftServer.CommitIndex(), raftServer.Term())
if c.Name != r.Name() { // do not add self to the peer list
r.peersStats[c.Name] = &raftPeerStats{MinLatency: 1 << 63}
// add peer stats
if c.Name != r.Name() {
r.followersStats.Followers[c.Name] = &raftFollowerStats{}
r.followersStats.Followers[c.Name].Latency.Minimum = 1 << 63
}
return b, err
@ -251,7 +253,8 @@ func (c *RemoveCommand) Apply(raftServer *raft.Server) (interface{}, error) {
key := path.Join("_etcd/machines", c.Name)
_, err := etcdStore.Delete(key, false, raftServer.CommitIndex(), raftServer.Term())
delete(r.peersStats, c.Name)
// delete from stats
delete(r.followersStats.Followers, c.Name)
if err != nil {
return []byte{0}, err

View File

@ -90,12 +90,7 @@ func init() {
const (
ElectionTimeout = 200 * time.Millisecond
HeartbeatTimeout = 50 * time.Millisecond
// Timeout for internal raft http connection
// The original timeout for http is 45 seconds
// which is too long for our usage.
HTTPTimeout = 10 * time.Second
RetryInterval = 10
RetryInterval = 10
)
//------------------------------------------------------------------------------

View File

@ -21,7 +21,7 @@ func NewEtcdMuxer() *http.ServeMux {
etcdMux.Handle("/"+version+"/keys/", errorHandler(Multiplexer))
etcdMux.Handle("/"+version+"/leader", errorHandler(LeaderHttpHandler))
etcdMux.Handle("/"+version+"/machines", errorHandler(MachinesHttpHandler))
etcdMux.Handle("/"+version+"/stats", errorHandler(StatsHttpHandler))
etcdMux.Handle("/"+version+"/stats/", errorHandler(StatsHttpHandler))
etcdMux.Handle("/version", errorHandler(VersionHttpHandler))
etcdMux.HandleFunc("/test/", TestHttpHandler)
return etcdMux
@ -223,8 +223,28 @@ func VersionHttpHandler(w http.ResponseWriter, req *http.Request) error {
// Handler to return the basic stats of etcd
func StatsHttpHandler(w http.ResponseWriter, req *http.Request) error {
w.WriteHeader(http.StatusOK)
w.Write(etcdStore.JsonStats())
w.Write(r.Stats())
option := req.URL.Path[len("/v1/stats/"):]
switch option {
case "self":
w.Write(r.Stats())
case "leader":
if r.State() == raft.Leader {
w.Write(r.PeerStats())
} else {
leader := r.Leader()
// current no leader
if leader == "" {
return etcdErr.NewError(300, "")
}
hostname, _ := nameToEtcdURL(leader)
redirect(hostname, w, req)
}
case "store":
w.Write(etcdStore.JsonStats())
}
return nil
}
@ -236,8 +256,8 @@ func GetHttpHandler(w http.ResponseWriter, req *http.Request) error {
if req.FormValue("consistent") == "true" && r.State() != raft.Leader {
// help client to redirect the request to the current leader
leader := r.Leader()
url, _ := nameToEtcdURL(leader)
redirect(url, w, req)
hostname, _ := nameToEtcdURL(leader)
redirect(hostname, w, req)
return nil
}
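
The handler above splits /v1/stats/ into three views: self, leader (redirected to the current leader when this node is a follower), and store. A minimal client sketch for exercising the new endpoints, assuming an etcd instance is listening on 127.0.0.1:4001 as in the test-cluster script later in this diff:

```go
package main

import (
	"fmt"
	"io/ioutil"
	"net/http"
)

func main() {
	// The three options handled by StatsHttpHandler above.
	for _, option := range []string{"self", "leader", "store"} {
		resp, err := http.Get("http://127.0.0.1:4001/v1/stats/" + option)
		if err != nil {
			fmt.Println(option, "request failed:", err)
			continue
		}
		body, _ := ioutil.ReadAll(resp.Body)
		resp.Body.Close()
		fmt.Printf("/v1/stats/%s -> %s\n", option, body)
	}
}
```

Since http.Get follows redirects by default, the leader case works transparently even when the request first lands on a follower.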

View File

@ -17,15 +17,15 @@ import (
type raftServer struct {
*raft.Server
version string
joinIndex uint64
name string
url string
listenHost string
tlsConf *TLSConfig
tlsInfo *TLSInfo
peersStats map[string]*raftPeerStats
serverStats *raftServerStats
version string
joinIndex uint64
name string
url string
listenHost string
tlsConf *TLSConfig
tlsInfo *TLSInfo
followersStats *raftFollowersStats
serverStats *raftServerStats
}
var r *raftServer
@ -33,10 +33,10 @@ var r *raftServer
func newRaftServer(name string, url string, listenHost string, tlsConf *TLSConfig, tlsInfo *TLSInfo) *raftServer {
// Create transporter for raft
raftTransporter := newTransporter(tlsConf.Scheme, tlsConf.Client, ElectionTimeout)
raftTransporter := newTransporter(tlsConf.Scheme, tlsConf.Client)
// Create raft server
server, err := raft.NewServer(name, dirPath, raftTransporter, etcdStore, nil)
server, err := raft.NewServer(name, dirPath, raftTransporter, etcdStore, nil, "")
check(err)
@ -48,7 +48,10 @@ func newRaftServer(name string, url string, listenHost string, tlsConf *TLSConfi
listenHost: listenHost,
tlsConf: tlsConf,
tlsInfo: tlsInfo,
peersStats: make(map[string]*raftPeerStats),
followersStats: &raftFollowersStats{
Leader: name,
Followers: make(map[string]*raftFollowerStats),
},
serverStats: &raftServerStats{
StartTime: time.Now(),
sendRateQueue: &statsQueue{
@ -63,7 +66,6 @@ func newRaftServer(name string, url string, listenHost string, tlsConf *TLSConfi
// Start the raft server
func (r *raftServer) ListenAndServe() {
// Setup commands.
registerCommands()
@ -183,13 +185,16 @@ func (r *raftServer) startTransport(scheme string, tlsConf tls.Config) {
// will need to do something more sophisticated later when we allow mixed
// version clusters.
func getVersion(t *transporter, versionURL url.URL) (string, error) {
resp, err := t.Get(versionURL.String())
resp, req, err := t.Get(versionURL.String())
if err != nil {
return "", err
}
defer resp.Body.Close()
t.CancelWhenTimeout(req)
body, err := ioutil.ReadAll(resp.Body)
return string(body), nil
@ -244,7 +249,7 @@ func joinByMachine(s *raft.Server, machine string, scheme string) error {
debugf("Send Join Request to %s", joinURL.String())
resp, err := t.Post(joinURL.String(), &b)
resp, req, err := t.Post(joinURL.String(), &b)
for {
if err != nil {
@ -252,6 +257,9 @@ func joinByMachine(s *raft.Server, machine string, scheme string) error {
}
if resp != nil {
defer resp.Body.Close()
t.CancelWhenTimeout(req)
if resp.StatusCode == http.StatusOK {
b, _ := ioutil.ReadAll(resp.Body)
r.joinIndex, _ = binary.Uvarint(b)
@ -264,7 +272,7 @@ func joinByMachine(s *raft.Server, machine string, scheme string) error {
json.NewEncoder(&b).Encode(newJoinCommand())
resp, err = t.Post(address, &b)
resp, req, err = t.Post(address, &b)
} else if resp.StatusCode == http.StatusBadRequest {
debug("Reach max number machines in the cluster")
@ -282,7 +290,7 @@ func joinByMachine(s *raft.Server, machine string, scheme string) error {
}
func (r *raftServer) Stats() []byte {
r.serverStats.LeaderUptime = time.Now().Sub(r.serverStats.leaderStartTime).String()
r.serverStats.LeaderInfo.Uptime = time.Now().Sub(r.serverStats.LeaderInfo.startTime).String()
queue := r.serverStats.sendRateQueue
@ -292,20 +300,17 @@ func (r *raftServer) Stats() []byte {
r.serverStats.RecvingPkgRate, r.serverStats.RecvingBandwidthRate = queue.Rate()
sBytes, err := json.Marshal(r.serverStats)
b, _ := json.Marshal(r.serverStats)
if err != nil {
warn(err)
}
return b
}
func (r *raftServer) PeerStats() []byte {
if r.State() == raft.Leader {
pBytes, _ := json.Marshal(r.peersStats)
b := append(sBytes, pBytes...)
b, _ := json.Marshal(r.followersStats)
return b
}
return sBytes
return nil
}
// Register commands to raft server
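
The transporter's Get and Post now return the *http.Request alongside the response so the caller can arm CancelWhenTimeout once headers arrive. The sketch below shows one way such a helper can be built on net/http's Transport.CancelRequest; it illustrates the pattern only and is not the actual etcd transporter, whose implementation may differ.

```go
package transporter

import (
	"net/http"
	"time"
)

// timeoutTransport pairs an http.Client with its Transport so in-flight
// requests (including slow body reads) can be aborted after a deadline.
// All names here are illustrative.
type timeoutTransport struct {
	client    *http.Client
	transport *http.Transport
	timeout   time.Duration
}

func newTimeoutTransport(timeout time.Duration) *timeoutTransport {
	tr := &http.Transport{}
	return &timeoutTransport{
		client:    &http.Client{Transport: tr},
		transport: tr,
		timeout:   timeout,
	}
}

// Get hands back the request along with the response, mirroring the
// (resp, req, err) shape used in the diff above.
func (t *timeoutTransport) Get(url string) (*http.Response, *http.Request, error) {
	req, err := http.NewRequest("GET", url, nil)
	if err != nil {
		return nil, nil, err
	}
	resp, err := t.client.Do(req)
	return resp, req, err
}

// CancelWhenTimeout aborts req if it is still outstanding once the
// configured timeout elapses.
func (t *timeoutTransport) CancelWhenTimeout(req *http.Request) {
	time.AfterFunc(t.timeout, func() { t.transport.CancelRequest(req) })
}
```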

View File

@ -33,10 +33,14 @@ func (ps *packageStats) Time() time.Time {
}
type raftServerStats struct {
State string `json:"state"`
StartTime time.Time `json:"startTime"`
Leader string `json:"leader"`
LeaderUptime string `json:"leaderUptime"`
State string `json:"state"`
StartTime time.Time `json:"startTime"`
LeaderInfo struct {
Name string `json:"leader"`
Uptime string `json:"uptime"`
startTime time.Time
} `json:"leaderInfo"`
RecvAppendRequestCnt uint64 `json:"recvAppendRequestCnt,"`
RecvingPkgRate float64 `json:"recvPkgRate,omitempty"`
@ -46,16 +50,15 @@ type raftServerStats struct {
SendingPkgRate float64 `json:"sendPkgRate,omitempty"`
SendingBandwidthRate float64 `json:"sendBandwidthRate,omitempty"`
leaderStartTime time.Time
sendRateQueue *statsQueue
recvRateQueue *statsQueue
sendRateQueue *statsQueue
recvRateQueue *statsQueue
}
func (ss *raftServerStats) RecvAppendReq(leaderName string, pkgSize int) {
ss.State = raft.Follower
if leaderName != ss.Leader {
ss.Leader = leaderName
ss.leaderStartTime = time.Now()
if leaderName != ss.LeaderInfo.Name {
ss.LeaderInfo.Name = leaderName
ss.LeaderInfo.startTime = time.Now()
}
ss.recvRateQueue.Insert(NewPackageStats(time.Now(), pkgSize))
@ -64,55 +67,66 @@ func (ss *raftServerStats) RecvAppendReq(leaderName string, pkgSize int) {
func (ss *raftServerStats) SendAppendReq(pkgSize int) {
now := time.Now()
if ss.State != raft.Leader {
ss.State = raft.Leader
ss.Leader = r.Name()
ss.leaderStartTime = now
ss.LeaderInfo.Name = r.Name()
ss.LeaderInfo.startTime = now
}
ss.sendRateQueue.Insert(NewPackageStats(time.Now(), pkgSize))
ss.sendRateQueue.Insert(NewPackageStats(now, pkgSize))
ss.SendAppendRequestCnt++
}
type raftPeerStats struct {
Latency float64 `json:"latency"`
AvgLatency float64 `json:"averageLatency"`
avgLatencySquare float64
SdvLatency float64 `json:"sdvLatency"`
MinLatency float64 `json:"minLatency"`
MaxLatency float64 `json:"maxLatency"`
FailCnt uint64 `json:"failsCount"`
SuccCnt uint64 `json:"successCount"`
type raftFollowersStats struct {
Leader string `json:"leader"`
Followers map[string]*raftFollowerStats `json:"followers"`
}
// Succ function update the raftPeerStats with a successful send
func (ps *raftPeerStats) Succ(d time.Duration) {
total := float64(ps.SuccCnt) * ps.AvgLatency
totalSquare := float64(ps.SuccCnt) * ps.avgLatencySquare
type raftFollowerStats struct {
Latency struct {
Current float64 `json:"current"`
Average float64 `json:"average"`
averageSquare float64
StandardDeviation float64 `json:"standardDeviation"`
Minimum float64 `json:"minimum"`
Maximum float64 `json:"maximum"`
} `json:"latency"`
ps.SuccCnt++
Counts struct {
Fail uint64 `json:"fail"`
Success uint64 `json:"success"`
} `json:"counts"`
}
ps.Latency = float64(d) / (1000000.0)
// Succ function update the raftFollowerStats with a successful send
func (ps *raftFollowerStats) Succ(d time.Duration) {
total := float64(ps.Counts.Success) * ps.Latency.Average
totalSquare := float64(ps.Counts.Success) * ps.Latency.averageSquare
if ps.Latency > ps.MaxLatency {
ps.MaxLatency = ps.Latency
ps.Counts.Success++
ps.Latency.Current = float64(d) / (1000000.0)
if ps.Latency.Current > ps.Latency.Maximum {
ps.Latency.Maximum = ps.Latency.Current
}
if ps.Latency < ps.MinLatency {
ps.MinLatency = ps.Latency
if ps.Latency.Current < ps.Latency.Minimum {
ps.Latency.Minimum = ps.Latency.Current
}
ps.AvgLatency = (total + ps.Latency) / float64(ps.SuccCnt)
ps.avgLatencySquare = (totalSquare + ps.Latency*ps.Latency) / float64(ps.SuccCnt)
ps.Latency.Average = (total + ps.Latency.Current) / float64(ps.Counts.Success)
ps.Latency.averageSquare = (totalSquare + ps.Latency.Current*ps.Latency.Current) / float64(ps.Counts.Success)
// sdv = sqrt(avg(x^2) - avg(x)^2)
ps.SdvLatency = math.Sqrt(ps.avgLatencySquare - ps.AvgLatency*ps.AvgLatency)
ps.Latency.StandardDeviation = math.Sqrt(ps.Latency.averageSquare - ps.Latency.Average*ps.Latency.Average)
}
// Fail function update the raftPeerStats with a unsuccessful send
func (ps *raftPeerStats) Fail() {
ps.FailCnt++
// Fail function update the raftFollowerStats with a unsuccessful send
func (ps *raftFollowerStats) Fail() {
ps.Counts.Fail++
}
type statsQueue struct {
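
The follower latency statistics keep a running average and a running average of squares so the standard deviation can be derived as sqrt(avg(x^2) - avg(x)^2) without storing every sample. A self-contained sketch of the same bookkeeping (minimum/maximum tracking omitted):

```go
package main

import (
	"fmt"
	"math"
)

// runningStats mirrors the update in raftFollowerStats.Succ above.
type runningStats struct {
	count         uint64
	average       float64
	averageSquare float64
}

func (r *runningStats) add(x float64) {
	total := float64(r.count) * r.average
	totalSquare := float64(r.count) * r.averageSquare
	r.count++
	r.average = (total + x) / float64(r.count)
	r.averageSquare = (totalSquare + x*x) / float64(r.count)
}

// stddev is sqrt(avg(x^2) - avg(x)^2), exactly as in the diff.
func (r *runningStats) stddev() float64 {
	return math.Sqrt(r.averageSquare - r.average*r.average)
}

func main() {
	s := &runningStats{}
	for _, latencyMs := range []float64{1.2, 0.8, 1.5, 2.0} {
		s.add(latencyMs)
	}
	fmt.Printf("avg=%.3f sdv=%.3f\n", s.average, s.stddev())
}
```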

View File

@ -0,0 +1,7 @@
$VER=(git describe --tags HEAD)
@"
package main
const releaseVersion = "$VER"
"@

scripts/test-cluster (Executable file, 19 changes)
View File

@ -0,0 +1,19 @@
#!/bin/bash
SESSION=etcd-cluster
tmux new-session -d -s $SESSION
# Setup a window for tailing log files
tmux new-window -t $SESSION:1 -n 'machines'
tmux split-window -h
tmux select-pane -t 0
tmux send-keys "./etcd -s 127.0.0.1:7001 -c 127.0.0.1:4001 -d machine1 -n machine1" C-m
for i in 2 3; do
tmux select-pane -t 0
tmux split-window -v
tmux send-keys "./etcd -cors='*' -s 127.0.0.1:700${i} -c 127.0.0.1:400${i} -C 127.0.0.1:7001 -d machine${i} -n machine${i}" C-m
done
# Attach to session
tmux attach-session -t $SESSION

View File

@ -48,6 +48,12 @@ func newStats() *Stats {
return s
}
func (s *Stats) clone() *Stats {
return &Stats{s.GetSuccess, s.GetFail, s.SetSuccess, s.SetFail,
s.DeleteSuccess, s.DeleteFail, s.UpdateSuccess, s.UpdateFail,
s.TestAndSetSuccess, s.TestAndSetFail, s.Watchers}
}
// Status() return the statistics info of etcd storage its recent start
func (s *Stats) toJson() []byte {
b, _ := json.Marshal(s)

View File

@ -6,6 +6,7 @@ import (
"path"
"sort"
"strings"
"sync"
"time"
etcdErr "github.com/coreos/etcd/error"
@ -17,6 +18,7 @@ type Store struct {
Index uint64
Term uint64
Stats *Stats
worldLock sync.RWMutex // stop the world lock. Used to do snapshot
}
func New() *Store {
@ -26,13 +28,15 @@ func New() *Store {
s.WatcherHub = newWatchHub(1000)
return s
}
func (s *Store) Get(nodePath string, recursive, sorted bool, index uint64, term uint64) (*Event, error) {
s.worldLock.RLock()
defer s.worldLock.RUnlock()
nodePath = path.Clean(path.Join("/", nodePath))
n, err := s.InternalGet(nodePath, index, term)
n, err := s.internalGet(nodePath, index, term)
if err != nil {
s.Stats.Inc(GetFail)
@ -91,10 +95,13 @@ func (s *Store) Get(nodePath string, recursive, sorted bool, index uint64, term
// If the node has already existed, create will fail.
// If any node on the path is a file, create will fail.
func (s *Store) Create(nodePath string, value string, expireTime time.Time, index uint64, term uint64) (*Event, error) {
s.worldLock.RLock()
defer s.worldLock.RUnlock()
nodePath = path.Clean(path.Join("/", nodePath))
// make sure we can create the node
_, err := s.InternalGet(nodePath, index, term)
_, err := s.internalGet(nodePath, index, term)
if err == nil { // key already exists
s.Stats.Inc(SetFail)
@ -157,7 +164,10 @@ func (s *Store) Create(nodePath string, value string, expireTime time.Time, inde
// If the node is a file, the value and the ttl can be updated.
// If the node is a directory, only the ttl can be updated.
func (s *Store) Update(nodePath string, value string, expireTime time.Time, index uint64, term uint64) (*Event, error) {
n, err := s.InternalGet(nodePath, index, term)
s.worldLock.RLock()
defer s.worldLock.RUnlock()
n, err := s.internalGet(nodePath, index, term)
if err != nil { // if the node does not exist, return error
s.Stats.Inc(UpdateFail)
@ -203,7 +213,10 @@ func (s *Store) Update(nodePath string, value string, expireTime time.Time, inde
func (s *Store) TestAndSet(nodePath string, prevValue string, prevIndex uint64,
value string, expireTime time.Time, index uint64, term uint64) (*Event, error) {
f, err := s.InternalGet(nodePath, index, term)
s.worldLock.RLock()
defer s.worldLock.RUnlock()
f, err := s.internalGet(nodePath, index, term)
if err != nil {
s.Stats.Inc(TestAndSetFail)
@ -235,7 +248,10 @@ func (s *Store) TestAndSet(nodePath string, prevValue string, prevIndex uint64,
// Delete function deletes the node at the given path.
// If the node is a directory, recursive must be true to delete it.
func (s *Store) Delete(nodePath string, recursive bool, index uint64, term uint64) (*Event, error) {
n, err := s.InternalGet(nodePath, index, term)
s.worldLock.RLock()
defer s.worldLock.RUnlock()
n, err := s.internalGet(nodePath, index, term)
if err != nil { // if the node does not exist, return error
s.Stats.Inc(DeleteFail)
@ -268,6 +284,9 @@ func (s *Store) Delete(nodePath string, recursive bool, index uint64, term uint6
}
func (s *Store) Watch(prefix string, recursive bool, sinceIndex uint64, index uint64, term uint64) (<-chan *Event, error) {
s.worldLock.RLock()
defer s.worldLock.RUnlock()
s.Index, s.Term = index, term
if sinceIndex == 0 {
@ -300,7 +319,7 @@ func (s *Store) walk(nodePath string, walkFunc func(prev *Node, component string
}
// InternalGet function get the node of the given nodePath.
func (s *Store) InternalGet(nodePath string, index uint64, term uint64) (*Node, error) {
func (s *Store) internalGet(nodePath string, index uint64, term uint64) (*Node, error) {
nodePath = path.Clean(path.Join("/", nodePath))
// update file system known index and term
@ -352,12 +371,16 @@ func (s *Store) checkDir(parent *Node, dirName string) (*Node, error) {
// Save function will not save the parent field of the node. Or there will
// be cyclic dependencies issue for the json package.
func (s *Store) Save() ([]byte, error) {
s.worldLock.Lock()
clonedStore := New()
clonedStore.Root = s.Root.Clone()
clonedStore.WatcherHub = s.WatcherHub
clonedStore.WatcherHub = s.WatcherHub.clone()
clonedStore.Index = s.Index
clonedStore.Term = s.Term
clonedStore.Stats = s.Stats
clonedStore.Stats = s.Stats.clone()
s.worldLock.Unlock()
b, err := json.Marshal(clonedStore)
@ -373,6 +396,8 @@ func (s *Store) Save() ([]byte, error) {
// It needs to delete the expired nodes since the saved time and also
// need to create monitor go routines.
func (s *Store) Recovery(state []byte) error {
s.worldLock.Lock()
defer s.worldLock.Unlock()
err := json.Unmarshal(state, s)
if err != nil {
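
The store now guards every operation with worldLock: Get, Create, Update, TestAndSet, Delete, and Watch share the read side, while Save and Recovery take the write side so a snapshot (or recovery) sees a frozen, consistent tree. A stripped-down sketch of that pattern, with illustrative names rather than the real etcd store:

```go
package main

import (
	"encoding/json"
	"fmt"
	"sync"
)

// miniStore illustrates the "stop the world" lock added above.
type miniStore struct {
	worldLock sync.RWMutex
	data      map[string]string
}

// Set assumes mutations arrive one at a time (in etcd they are applied
// serially from the raft log); the read lock only fences them against Save.
func (s *miniStore) Set(key, value string) {
	s.worldLock.RLock()
	defer s.worldLock.RUnlock()
	s.data[key] = value
}

// Save excludes all in-flight operations while it copies the state.
func (s *miniStore) Save() ([]byte, error) {
	s.worldLock.Lock()
	defer s.worldLock.Unlock()
	return json.Marshal(s.data)
}

func main() {
	s := &miniStore{data: make(map[string]string)}
	s.Set("/message", "hello")
	b, _ := s.Save()
	fmt.Println(string(b))
}
```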

View File

@ -116,3 +116,7 @@ func (wh *watcherHub) notify(e *Event) {
wh.EventHistory.addEvent(e)
}
func (wh *watcherHub) clone() *watcherHub {
return &watcherHub{}
}

View File

@ -431,7 +431,7 @@ func TestRequiredBit(t *testing.T) {
err := o.Marshal(pb)
if err == nil {
t.Error("did not catch missing required fields")
} else if strings.Index(err.Error(), "GoTest") < 0 {
} else if strings.Index(err.Error(), "Kind") < 0 {
t.Error("wrong error type:", err)
}
}
@ -1205,7 +1205,7 @@ func TestRequiredFieldEnforcement(t *testing.T) {
_, err := Marshal(pb)
if err == nil {
t.Error("marshal: expected error, got nil")
} else if strings.Index(err.Error(), "GoTestField") < 0 {
} else if strings.Index(err.Error(), "Label") < 0 {
t.Errorf("marshal: bad error type: %v", err)
}
@ -1216,7 +1216,7 @@ func TestRequiredFieldEnforcement(t *testing.T) {
err = Unmarshal(buf, pb)
if err == nil {
t.Error("unmarshal: expected error, got nil")
} else if strings.Index(err.Error(), "GoTestField") < 0 {
} else if strings.Index(err.Error(), "{Unknown}") < 0 {
t.Errorf("unmarshal: bad error type: %v", err)
}
}
@ -1670,6 +1670,70 @@ func TestEncodingSizes(t *testing.T) {
}
}
func TestErrRequiredNotSet(t *testing.T) {
pb := initGoTest(false)
pb.RequiredField.Label = nil
pb.F_Int32Required = nil
pb.F_Int64Required = nil
expected := "0807" + // field 1, encoding 0, value 7
"2206" + "120474797065" + // field 4, encoding 2 (GoTestField)
"5001" + // field 10, encoding 0, value 1
"6d20000000" + // field 13, encoding 5, value 0x20
"714000000000000000" + // field 14, encoding 1, value 0x40
"78a019" + // field 15, encoding 0, value 0xca0 = 3232
"8001c032" + // field 16, encoding 0, value 0x1940 = 6464
"8d0100004a45" + // field 17, encoding 5, value 3232.0
"9101000000000040b940" + // field 18, encoding 1, value 6464.0
"9a0106" + "737472696e67" + // field 19, encoding 2, string "string"
"b304" + // field 70, encoding 3, start group
"ba0408" + "7265717569726564" + // field 71, encoding 2, string "required"
"b404" + // field 70, encoding 4, end group
"aa0605" + "6279746573" + // field 101, encoding 2, string "bytes"
"b0063f" + // field 102, encoding 0, 0x3f zigzag32
"b8067f" // field 103, encoding 0, 0x7f zigzag64
o := old()
bytes, err := Marshal(pb)
if _, ok := err.(*ErrRequiredNotSet); !ok {
fmt.Printf("marshal-1 err = %v, want *ErrRequiredNotSet", err)
o.DebugPrint("", bytes)
t.Fatalf("expected = %s", expected)
}
if strings.Index(err.Error(), "RequiredField.Label") < 0 {
t.Errorf("marshal-1 wrong err msg: %v", err)
}
if !equal(bytes, expected, t) {
o.DebugPrint("neq 1", bytes)
t.Fatalf("expected = %s", expected)
}
// Now test Unmarshal by recreating the original buffer.
pbd := new(GoTest)
err = Unmarshal(bytes, pbd)
if _, ok := err.(*ErrRequiredNotSet); !ok {
t.Fatalf("unmarshal err = %v, want *ErrRequiredNotSet", err)
o.DebugPrint("", bytes)
t.Fatalf("string = %s", expected)
}
if strings.Index(err.Error(), "RequiredField.{Unknown}") < 0 {
t.Errorf("unmarshal wrong err msg: %v", err)
}
bytes, err = Marshal(pbd)
if _, ok := err.(*ErrRequiredNotSet); !ok {
t.Errorf("marshal-2 err = %v, want *ErrRequiredNotSet", err)
o.DebugPrint("", bytes)
t.Fatalf("string = %s", expected)
}
if strings.Index(err.Error(), "RequiredField.Label") < 0 {
t.Errorf("marshal-2 wrong err msg: %v", err)
}
if !equal(bytes, expected, t) {
o.DebugPrint("neq 2", bytes)
t.Fatalf("string = %s", expected)
}
}
func fuzzUnmarshal(t *testing.T, data []byte) {
defer func() {
if e := recover(); e != nil {

View File

@ -46,7 +46,7 @@ import (
// ErrWrongType occurs when the wire encoding for the field disagrees with
// that specified in the type being decoded. This is usually caused by attempting
// to convert an encoded protocol buffer into a struct of the wrong type.
var ErrWrongType = errors.New("field/encoding mismatch: wrong type for field")
var ErrWrongType = errors.New("proto: field/encoding mismatch: wrong type for field")
// errOverflow is returned when an integer is too large to be represented.
var errOverflow = errors.New("proto: integer overflow")
@ -353,6 +353,7 @@ func (p *Buffer) Unmarshal(pb Message) error {
// unmarshalType does the work of unmarshaling a structure.
func (o *Buffer) unmarshalType(st reflect.Type, prop *StructProperties, is_group bool, base structPointer) error {
var state errorState
required, reqFields := prop.reqCount, uint64(0)
var err error
@ -406,7 +407,10 @@ func (o *Buffer) unmarshalType(st reflect.Type, prop *StructProperties, is_group
continue
}
}
err = dec(o, p, base)
decErr := dec(o, p, base)
if decErr != nil && !state.shouldContinue(decErr, p) {
err = decErr
}
if err == nil && p.Required {
// Successfully decoded a required field.
if tag <= 64 {
@ -430,8 +434,14 @@ func (o *Buffer) unmarshalType(st reflect.Type, prop *StructProperties, is_group
if is_group {
return io.ErrUnexpectedEOF
}
if state.err != nil {
return state.err
}
if required > 0 {
return &ErrRequiredNotSet{st}
// Not enough information to determine the exact field. If we use extra
// CPU, we could determine the field only if the missing required field
// has a tag <= 64 and we check reqFields.
return &ErrRequiredNotSet{"{Unknown}"}
}
}
return err

View File

@ -37,6 +37,7 @@ package proto
import (
"errors"
"fmt"
"reflect"
"sort"
)
@ -46,12 +47,16 @@ import (
// all been initialized. It is also the error returned if Unmarshal is
// called with an encoded protocol buffer that does not include all the
// required fields.
//
// When printed, ErrRequiredNotSet reports the first unset required field in a
// message. If the field cannot be precisely determined, it is reported as
// "{Unknown}".
type ErrRequiredNotSet struct {
t reflect.Type
field string
}
func (e *ErrRequiredNotSet) Error() string {
return "proto: required fields not set in " + e.t.String()
return fmt.Sprintf("proto: required field %q not set", e.field)
}
var (
@ -175,7 +180,8 @@ func Marshal(pb Message) ([]byte, error) {
}
p := NewBuffer(nil)
err := p.Marshal(pb)
if err != nil {
var state errorState
if err != nil && !state.shouldContinue(err, nil) {
return nil, err
}
return p.buf, err
@ -274,6 +280,7 @@ func isNil(v reflect.Value) bool {
// Encode a message struct.
func (o *Buffer) enc_struct_message(p *Properties, base structPointer) error {
var state errorState
structp := structPointer_GetStructPointer(base, p.field)
if structPointer_IsNil(structp) {
return ErrNil
@ -283,7 +290,7 @@ func (o *Buffer) enc_struct_message(p *Properties, base structPointer) error {
if p.isMarshaler {
m := structPointer_Interface(structp, p.stype).(Marshaler)
data, err := m.Marshal()
if err != nil {
if err != nil && !state.shouldContinue(err, nil) {
return err
}
o.buf = append(o.buf, p.tagcode...)
@ -300,18 +307,19 @@ func (o *Buffer) enc_struct_message(p *Properties, base structPointer) error {
nbuf := o.buf
o.buf = obuf
if err != nil {
if err != nil && !state.shouldContinue(err, nil) {
o.buffree(nbuf)
return err
}
o.buf = append(o.buf, p.tagcode...)
o.EncodeRawBytes(nbuf)
o.buffree(nbuf)
return nil
return state.err
}
// Encode a group struct.
func (o *Buffer) enc_struct_group(p *Properties, base structPointer) error {
var state errorState
b := structPointer_GetStructPointer(base, p.field)
if structPointer_IsNil(b) {
return ErrNil
@ -319,11 +327,11 @@ func (o *Buffer) enc_struct_group(p *Properties, base structPointer) error {
o.EncodeVarint(uint64((p.Tag << 3) | WireStartGroup))
err := o.enc_struct(p.stype, p.sprop, b)
if err != nil {
if err != nil && !state.shouldContinue(err, nil) {
return err
}
o.EncodeVarint(uint64((p.Tag << 3) | WireEndGroup))
return nil
return state.err
}
// Encode a slice of bools ([]bool).
@ -470,6 +478,7 @@ func (o *Buffer) enc_slice_string(p *Properties, base structPointer) error {
// Encode a slice of message structs ([]*struct).
func (o *Buffer) enc_slice_struct_message(p *Properties, base structPointer) error {
var state errorState
s := structPointer_StructPointerSlice(base, p.field)
l := s.Len()
@ -483,7 +492,7 @@ func (o *Buffer) enc_slice_struct_message(p *Properties, base structPointer) err
if p.isMarshaler {
m := structPointer_Interface(structp, p.stype).(Marshaler)
data, err := m.Marshal()
if err != nil {
if err != nil && !state.shouldContinue(err, nil) {
return err
}
o.buf = append(o.buf, p.tagcode...)
@ -498,7 +507,7 @@ func (o *Buffer) enc_slice_struct_message(p *Properties, base structPointer) err
nbuf := o.buf
o.buf = obuf
if err != nil {
if err != nil && !state.shouldContinue(err, nil) {
o.buffree(nbuf)
if err == ErrNil {
return ErrRepeatedHasNil
@ -510,11 +519,12 @@ func (o *Buffer) enc_slice_struct_message(p *Properties, base structPointer) err
o.buffree(nbuf)
}
return nil
return state.err
}
// Encode a slice of group structs ([]*struct).
func (o *Buffer) enc_slice_struct_group(p *Properties, base structPointer) error {
var state errorState
s := structPointer_StructPointerSlice(base, p.field)
l := s.Len()
@ -528,7 +538,7 @@ func (o *Buffer) enc_slice_struct_group(p *Properties, base structPointer) error
err := o.enc_struct(p.stype, p.sprop, b)
if err != nil {
if err != nil && !state.shouldContinue(err, nil) {
if err == ErrNil {
return ErrRepeatedHasNil
}
@ -537,7 +547,7 @@ func (o *Buffer) enc_slice_struct_group(p *Properties, base structPointer) error
o.EncodeVarint(uint64((p.Tag << 3) | WireEndGroup))
}
return nil
return state.err
}
// Encode an extension map.
@ -569,7 +579,7 @@ func (o *Buffer) enc_map(p *Properties, base structPointer) error {
// Encode a struct.
func (o *Buffer) enc_struct(t reflect.Type, prop *StructProperties, base structPointer) error {
required := prop.reqCount
var state errorState
// Encode fields in tag order so that decoders may use optimizations
// that depend on the ordering.
// http://code.google.com/apis/protocolbuffers/docs/encoding.html#order
@ -577,19 +587,15 @@ func (o *Buffer) enc_struct(t reflect.Type, prop *StructProperties, base structP
p := prop.Prop[i]
if p.enc != nil {
err := p.enc(o, p, base)
if err != nil {
if err != nil && !state.shouldContinue(err, p) {
if err != ErrNil {
return err
} else if p.Required && state.err == nil {
state.err = &ErrRequiredNotSet{p.Name}
}
} else if p.Required {
required--
}
}
}
// See if we encoded all required fields.
if required > 0 {
return &ErrRequiredNotSet{t}
}
// Add unrecognized fields at the end.
if prop.unrecField.IsValid() {
@ -599,5 +605,33 @@ func (o *Buffer) enc_struct(t reflect.Type, prop *StructProperties, base structP
}
}
return nil
return state.err
}
// errorState maintains the first error that occurs and updates that error
// with additional context.
type errorState struct {
err error
}
// shouldContinue reports whether encoding should continue upon encountering the
// given error. If the error is ErrRequiredNotSet, shouldContinue returns true
// and, if this is the first appearance of that error, remembers it for future
// reporting.
//
// If prop is not nil, it may update any error with additional context about the
// field with the error.
func (s *errorState) shouldContinue(err error, prop *Properties) bool {
// Ignore unset required fields.
reqNotSet, ok := err.(*ErrRequiredNotSet)
if !ok {
return false
}
if s.err == nil {
if prop != nil {
err = &ErrRequiredNotSet{prop.Name + "." + reqNotSet.field}
}
s.err = err
}
return true
}

View File

@ -109,11 +109,11 @@ func isExtensionField(pb extendableProto, field int32) bool {
func checkExtensionTypes(pb extendableProto, extension *ExtensionDesc) error {
// Check the extended type.
if a, b := reflect.TypeOf(pb), reflect.TypeOf(extension.ExtendedType); a != b {
return errors.New("bad extended type; " + b.String() + " does not extend " + a.String())
return errors.New("proto: bad extended type; " + b.String() + " does not extend " + a.String())
}
// Check the range.
if !isExtensionField(pb, extension.Field) {
return errors.New("bad extension number; not in declared ranges")
return errors.New("proto: bad extension number; not in declared ranges")
}
return nil
}
@ -272,7 +272,7 @@ func decodeExtension(b []byte, extension *ExtensionDesc) (interface{}, error) {
func GetExtensions(pb Message, es []*ExtensionDesc) (extensions []interface{}, err error) {
epb, ok := pb.(extendableProto)
if !ok {
err = errors.New("not an extendable proto")
err = errors.New("proto: not an extendable proto")
return
}
extensions = make([]interface{}, len(es))
@ -292,7 +292,7 @@ func SetExtension(pb extendableProto, extension *ExtensionDesc, value interface{
}
typ := reflect.TypeOf(extension.ExtensionType)
if typ != reflect.TypeOf(value) {
return errors.New("bad extension value type")
return errors.New("proto: bad extension value type")
}
pb.ExtensionMap()[extension.Field] = Extension{desc: extension, value: value}

View File

@ -244,7 +244,7 @@ func (m *GoEnum) GetFoo() FOO {
if m != nil && m.Foo != nil {
return *m.Foo
}
return 0
return FOO_FOO1
}
type GoTestField struct {
@ -378,7 +378,7 @@ func (m *GoTest) GetKind() GoTest_KIND {
if m != nil && m.Kind != nil {
return *m.Kind
}
return 0
return GoTest_VOID
}
func (m *GoTest) GetTable() string {
@ -1289,7 +1289,7 @@ func (m *MyMessage) GetBikeshed() MyMessage_Color {
if m != nil && m.Bikeshed != nil {
return *m.Bikeshed
}
return 0
return MyMessage_RED
}
func (m *MyMessage) GetSomegroup() *MyMessage_SomeGroup {

View File

@ -193,8 +193,8 @@ func (p *textParser) advance() {
}
var (
errBadUTF8 = errors.New("bad UTF-8")
errBadHex = errors.New("bad hexadecimal")
errBadUTF8 = errors.New("proto: bad UTF-8")
errBadHex = errors.New("proto: bad hexadecimal")
)
func unquoteC(s string, quote rune) (string, error) {

View File

@ -303,7 +303,7 @@ type limitedWriter struct {
limit int
}
var outOfSpace = errors.New("insufficient space")
var outOfSpace = errors.New("proto: insufficient space")
func (w *limitedWriter) Write(p []byte) (n int, err error) {
var avail = w.limit - w.b.Len()

View File

@ -487,14 +487,14 @@ func (m *FieldDescriptorProto) GetLabel() FieldDescriptorProto_Label {
if m != nil && m.Label != nil {
return *m.Label
}
return 0
return FieldDescriptorProto_LABEL_OPTIONAL
}
func (m *FieldDescriptorProto) GetType() FieldDescriptorProto_Type {
if m != nil && m.Type != nil {
return *m.Type
}
return 0
return FieldDescriptorProto_TYPE_DOUBLE
}
func (m *FieldDescriptorProto) GetTypeName() string {

View File

@ -1664,6 +1664,27 @@ func (g *Generator) generateMessage(message *Descriptor) {
g.P("return false")
case descriptor.FieldDescriptorProto_TYPE_STRING:
g.P(`return ""`)
case descriptor.FieldDescriptorProto_TYPE_ENUM:
// The default default for an enum is the first value in the enum,
// not zero.
obj := g.ObjectNamed(field.GetTypeName())
var enum *EnumDescriptor
if id, ok := obj.(*ImportedDescriptor); ok {
// The enum type has been publicly imported.
enum, _ = id.o.(*EnumDescriptor)
} else {
enum, _ = obj.(*EnumDescriptor)
}
if enum == nil {
log.Printf("don't know how to generate getter for %s", field.GetName())
continue
}
if len(enum.Value) == 0 {
g.P("return 0 // empty enum")
} else {
first := enum.Value[0].GetName()
g.P("return ", g.DefaultPackageName(obj)+enum.prefix()+first)
}
default:
g.P("return 0")
}

View File

@ -199,7 +199,7 @@ func (m *Request) GetHue() Request_Color {
if m != nil && m.Hue != nil {
return *m.Hue
}
return 0
return Request_RED
}
func (m *Request) GetHat() HatType {

View File

@ -199,7 +199,7 @@ func (m *Request) GetHue() Request_Color {
if m != nil && m.Hue != nil {
return *m.Hue
}
return 0
return Request_RED
}
func (m *Request) GetHat() HatType {

View File

@ -58,7 +58,7 @@ message Request {
}
repeated int64 key = 1;
// optional imp.ImportedMessage imported_message = 2;
optional Color hue = 3;
optional Color hue = 3; // no default
optional HatType hat = 4 [default=FEDORA];
// optional imp.ImportedMessage.Owner owner = 6;
optional float deadline = 7 [default=inf];

third_party/deps vendored (4 changes)
View File

@ -1,8 +1,8 @@
packages="
github.com/coreos/go-raft
github.com/coreos/go-etcd
github.com/ccding/go-logging
github.com/ccding/go-config-reader
github.com/coreos/go-log/log
github.com/coreos/go-systemd
bitbucket.org/kardianos/osext
code.google.com/p/go.net
code.google.com/p/goprotobuf

View File

@ -1,22 +0,0 @@
# Compiled Object files, Static and Dynamic libs (Shared Objects)
*.o
*.a
*.so
# Folders
_obj
_test
# Architecture specific extensions/prefixes
*.[568vq]
[568vq].out
*.cgo1.go
*.cgo2.c
_cgo_defun.c
_cgo_gotypes.go
_cgo_export.*
_testmain.go
*.exe

View File

@ -1,2 +0,0 @@
go-config-reader
================

View File

@ -1,199 +0,0 @@
// Copyright 2013, Cong Ding. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// author: Cong Ding <dinggnu@gmail.com>
package config
import (
"bufio"
"errors"
"fmt"
"io/ioutil"
"os"
"strings"
)
var commentPrefix = []string{"//", "#", ";"}
// Config struct constructs a new configuration handler.
type Config struct {
filename string
config map[string]map[string]string
}
// NewConfig function cnstructs a new Config struct with filename. You have to
// call Read() function to let it read from the file. Otherwise you will get
// empty string (i.e., "") when you are calling Get() function. Another usage
// is that you call NewConfig() function and then call Add()/Set() function to
// add new key-values to the configuration. Finally you can call Write()
// function to write the new configuration to the file.
func NewConfig(filename string) *Config {
c := new(Config)
c.filename = filename
c.config = make(map[string]map[string]string)
return c
}
// Filename function returns the filename of the configuration.
func (c *Config) Filename() string {
return c.filename
}
// SetFilename function sets the filename of the configuration.
func (c *Config) SetFilename(filename string) {
c.filename = filename
}
// Reset function reset the map in the configuration.
func (c *Config) Reset() {
c.config = make(map[string]map[string]string)
}
// Read function reads configurations from the file defined in
// Config.filename.
func (c *Config) Read() error {
in, err := os.Open(c.filename)
if err != nil {
return err
}
defer in.Close()
scanner := bufio.NewScanner(in)
line := ""
section := ""
for scanner.Scan() {
if scanner.Text() == "" {
continue
}
if line == "" {
sec, ok := checkSection(scanner.Text())
if ok {
section = sec
continue
}
}
if checkComment(scanner.Text()) {
continue
}
line += scanner.Text()
if strings.HasSuffix(line, "\\") {
line = line[:len(line)-1]
continue
}
key, value, ok := checkLine(line)
if !ok {
return errors.New("WRONG: " + line)
}
c.Set(section, key, value)
line = ""
}
return nil
}
// Get function returns the value of a key in the configuration. If the key
// does not exist, it returns empty string (i.e., "").
func (c *Config) Get(section string, key string) string {
value, ok := c.config[section][key]
if !ok {
return ""
}
return value
}
// Set function updates the value of a key in the configuration. Function
// Set() is exactly the same as function Add().
func (c *Config) Set(section string, key string, value string) {
_, ok := c.config[section]
if !ok {
c.config[section] = make(map[string]string)
}
c.config[section][key] = value
}
// Add function adds a new key to the configuration. Function Add() is exactly
// the same as function Set().
func (c *Config) Add(section string, key string, value string) {
c.Set(section, key, value)
}
// Del function deletes a key from the configuration.
func (c *Config) Del(section string, key string) {
_, ok := c.config[section]
if ok {
delete(c.config[section], key)
if len(c.config[section]) == 0 {
delete(c.config, section)
}
}
}
// Write function writes the updated configuration back.
func (c *Config) Write() error {
return nil
}
// WriteTo function writes the configuration to a new file. This function
// re-organizes the configuration and deletes all the comments.
func (c *Config) WriteTo(filename string) error {
content := ""
for k, v := range c.config {
format := "%v = %v\n"
if k != "" {
content += fmt.Sprintf("[%v]\n", k)
format = "\t" + format
}
for key, value := range v {
content += fmt.Sprintf(format, key, value)
}
}
return ioutil.WriteFile(filename, []byte(content), 0644)
}
// To check this line is a section or not. If it is not a section, it returns
// "".
func checkSection(line string) (string, bool) {
line = strings.TrimSpace(line)
lineLen := len(line)
if lineLen < 2 {
return "", false
}
if line[0] == '[' && line[lineLen-1] == ']' {
return line[1 : lineLen-1], true
}
return "", false
}
// To check this line is a valid key-value pair or not.
func checkLine(line string) (string, string, bool) {
key := ""
value := ""
sp := strings.SplitN(line, "=", 2)
if len(sp) != 2 {
return key, value, false
}
key = strings.TrimSpace(sp[0])
value = strings.TrimSpace(sp[1])
return key, value, true
}
// To check this line is a whole line comment or not.
func checkComment(line string) bool {
line = strings.TrimSpace(line)
for p := range commentPrefix {
if strings.HasPrefix(line, commentPrefix[p]) {
return true
}
}
return false
}

View File

@ -1,10 +0,0 @@
a = b
1 = 2
cc = dd, 2 ejkl ijfadjfl
// adkfa
# 12jfiahdoif
dd = c \
oadi
[test]
a = c c d

View File

@ -1,32 +0,0 @@
// Copyright 2013, Cong Ding. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// author: Cong Ding <dinggnu@gmail.com>
package main
import (
"fmt"
"github.com/ccding/go-config-reader/config"
)
func main() {
c := config.NewConfig("example.conf")
err := c.Read()
fmt.Println(err)
fmt.Println(c)
fmt.Println(c.Get("test", "a"))
fmt.Println(c.Get("", "dd"))
c.WriteTo("example2.conf")
}

View File

@ -1,22 +0,0 @@
# Compiled Object files, Static and Dynamic libs (Shared Objects)
*.o
*.a
*.so
# Folders
_obj
_test
# Architecture specific extensions/prefixes
*.[568vq]
[568vq].out
*.cgo1.go
*.cgo2.c
_cgo_defun.c
_cgo_gotypes.go
_cgo_export.*
_testmain.go
*.exe

View File

@ -1,191 +0,0 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction, and
distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by the copyright
owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all other entities
that control, are controlled by, or are under common control with that entity.
For the purposes of this definition, "control" means (i) the power, direct or
indirect, to cause the direction or management of such entity, whether by
contract or otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity exercising
permissions granted by this License.
"Source" form shall mean the preferred form for making modifications, including
but not limited to software source code, documentation source, and configuration
files.
"Object" form shall mean any form resulting from mechanical transformation or
translation of a Source form, including but not limited to compiled object code,
generated documentation, and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or Object form, made
available under the License, as indicated by a copyright notice that is included
in or attached to the work (an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object form, that
is based on (or derived from) the Work and for which the editorial revisions,
annotations, elaborations, or other modifications represent, as a whole, an
original work of authorship. For the purposes of this License, Derivative Works
shall not include works that remain separable from, or merely link (or bind by
name) to the interfaces of, the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including the original version
of the Work and any modifications or additions to that Work or Derivative Works
thereof, that is intentionally submitted to Licensor for inclusion in the Work
by the copyright owner or by an individual or Legal Entity authorized to submit
on behalf of the copyright owner. For the purposes of this definition,
"submitted" means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems, and
issue tracking systems that are managed by, or on behalf of, the Licensor for
the purpose of discussing and improving the Work, but excluding communication
that is conspicuously marked or otherwise designated in writing by the copyright
owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity on behalf
of whom a Contribution has been received by Licensor and subsequently
incorporated within the Work.
2. Grant of Copyright License.
Subject to the terms and conditions of this License, each Contributor hereby
grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free,
irrevocable copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the Work and such
Derivative Works in Source or Object form.
3. Grant of Patent License.
Subject to the terms and conditions of this License, each Contributor hereby
grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free,
irrevocable (except as stated in this section) patent license to make, have
made, use, offer to sell, sell, import, and otherwise transfer the Work, where
such license applies only to those patent claims licensable by such Contributor
that are necessarily infringed by their Contribution(s) alone or by combination
of their Contribution(s) with the Work to which such Contribution(s) was
submitted. If You institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work or a
Contribution incorporated within the Work constitutes direct or contributory
patent infringement, then any patent licenses granted to You under this License
for that Work shall terminate as of the date such litigation is filed.
4. Redistribution.
You may reproduce and distribute copies of the Work or Derivative Works thereof
in any medium, with or without modifications, and in Source or Object form,
provided that You meet the following conditions:
You must give any other recipients of the Work or Derivative Works a copy of
this License; and
You must cause any modified files to carry prominent notices stating that You
changed the files; and
You must retain, in the Source form of any Derivative Works that You distribute,
all copyright, patent, trademark, and attribution notices from the Source form
of the Work, excluding those notices that do not pertain to any part of the
Derivative Works; and
If the Work includes a "NOTICE" text file as part of its distribution, then any
Derivative Works that You distribute must include a readable copy of the
attribution notices contained within such NOTICE file, excluding those notices
that do not pertain to any part of the Derivative Works, in at least one of the
following places: within a NOTICE text file distributed as part of the
Derivative Works; within the Source form or documentation, if provided along
with the Derivative Works; or, within a display generated by the Derivative
Works, if and wherever such third-party notices normally appear. The contents of
the NOTICE file are for informational purposes only and do not modify the
License. You may add Your own attribution notices within Derivative Works that
You distribute, alongside or as an addendum to the NOTICE text from the Work,
provided that such additional attribution notices cannot be construed as
modifying the License.
You may add Your own copyright statement to Your modifications and may provide
additional or different license terms and conditions for use, reproduction, or
distribution of Your modifications, or for any such Derivative Works as a whole,
provided Your use, reproduction, and distribution of the Work otherwise complies
with the conditions stated in this License.
5. Submission of Contributions.
Unless You explicitly state otherwise, any Contribution intentionally submitted
for inclusion in the Work by You to the Licensor shall be under the terms and
conditions of this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify the terms of
any separate license agreement you may have executed with Licensor regarding
such Contributions.
6. Trademarks.
This License does not grant permission to use the trade names, trademarks,
service marks, or product names of the Licensor, except as required for
reasonable and customary use in describing the origin of the Work and
reproducing the content of the NOTICE file.
7. Disclaimer of Warranty.
Unless required by applicable law or agreed to in writing, Licensor provides the
Work (and each Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied,
including, without limitation, any warranties or conditions of TITLE,
NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A PARTICULAR PURPOSE. You are
solely responsible for determining the appropriateness of using or
redistributing the Work and assume any risks associated with Your exercise of
permissions under this License.
8. Limitation of Liability.
In no event and under no legal theory, whether in tort (including negligence),
contract, or otherwise, unless required by applicable law (such as deliberate
and grossly negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special, incidental,
or consequential damages of any character arising as a result of this License or
out of the use or inability to use the Work (including but not limited to
damages for loss of goodwill, work stoppage, computer failure or malfunction, or
any and all other commercial damages or losses), even if such Contributor has
been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability.
While redistributing the Work or Derivative Works thereof, You may choose to
offer, and charge a fee for, acceptance of support, warranty, indemnity, or
other liability obligations and/or rights consistent with this License. However,
in accepting such obligations, You may act only on Your own behalf and on Your
sole responsibility, not on behalf of any other Contributor, and only if You
agree to indemnify, defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason of your
accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work
To apply the Apache License to your work, attach the following boilerplate
notice, with the fields enclosed by brackets "[]" replaced with your own
identifying information. (Don't include the brackets!) The text should be
enclosed in the appropriate comment syntax for the file format. We also
recommend that a file or class name and description of purpose be included on
the same "printed page" as the copyright notice for easier identification within
third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

View File

@ -1,217 +0,0 @@
#go-logging
go-logging is a high-performance logging library for golang.
* Simple: It supports only necessary operations and easy to get start.
* Fast: Asynchronous logging without runtime-related fields has an extremely
low delay of about 800 nano-seconds.
## Getting Started
### Installation
The step below will download the library source code to
`${GOPATH}/src/github.com/ccding/go-logging`.
```bash
go get github.com/ccding/go-logging/logging
```
Given the source code downloaded, it makes you be able to run the examples,
tests, and benchmarks.
```bash
cd ${GOPATH}/src/github.com/ccding/go-logging/logging
go get
go run ../example.go
go test -v -bench .
```
### Example
go-logging is used like any other Go libraries. You can simply use the library
in this way.
```go
import "github.com/ccding/go-logging/logging"
```
Here is a simple example.
```go
package main
import (
"github.com/ccding/go-logging/logging"
)
func main() {
logger, _ := logging.SimpleLogger("main")
logger.Error("this is a test from error")
logger.Destroy()
}
```
### Configuration
#### Construction Functions
It has the following functions to create a logger.
```go
// with BasicFormat and writing to stdout
SimpleLogger(name string) (*Logger, error)
// with BasicFormat and writing to DefaultFileName
BasicLogger(name string) (*Logger, error)
// with RichFormatand writing to DefaultFileName
RichLogger(name string) (*Logger, error)
// with detailed configuration and writing to file
FileLogger(name string, level Level, format string, timeFormat string, file string, sync bool) (*Logger, error)
// with detailed configuration and writing to a writer
WriterLogger(name string, level Level, format string, timeFormat string, out io.Writer, sync bool) (*Logger, error)
// read configurations from a config file
ConfigLogger(filename string) (*Logger, error)
```
The meanings of these fields are
```go
name string // logger name
level Level // record level higher than this will be printed
format string // format configuration
timeFormat string // format for time
file string // file name for logging
out io.Writer // writer for logging
sync bool // use sync or async way to record logs
```
The detailed description of these fields will be presented later.
#### Logging Functions
It supports the following functions for logging. All of these functions are
thread-safe.
```go
(*Logger) Logf(level Level, format string, v ...interface{})
(*Logger) Log(level Level, v ...interface{})
(*Logger) Criticalf(format string, v ...interface{})
(*Logger) Critical(v ...interface{})
(*Logger) Fatalf(format string, v ...interface{})
(*Logger) Fatal(v ...interface{})
(*Logger) Errorf(format string, v ...interface{})
(*Logger) Error(v ...interface{})
(*Logger) Warningf(format string, v ...interface{})
(*Logger) Warning(v ...interface{})
(*Logger) Warnf(format string, v ...interface{})
(*Logger) Warn(v ...interface{})
(*Logger) Infof(format string, v ...interface{})
(*Logger) Info(v ...interface{})
(*Logger) Debugf(format string, v ...interface{})
(*Logger) Debug(v ...interface{})
(*Logger) Notsetf(format string, v ...interface{})
(*Logger) Notset(v ...interface{})
```
#### Logger Operations
The logger supports the following operations. In these functions, `SetWriter`
and `Destroy` are not thread-safe, while others are. All these functions are
running in a synchronous way.
```go
// Getter functions
(*Logger) Name() string // get name
(*Logger) TimeFormat() string // get time format
(*Logger) Level() Level // get level [this function is thread safe]
(*Logger) RecordFormat() string // get the first part of the format
(*Logger) RecordArgs() []string // get the second part of the format
(*Logger) Writer() io.Writer // get writer
(*Logger) Sync() bool // get sync or async
// Setter functions
(*Logger) SetLevel(level Level) // set level [this function is thread safe]
(*Logger) SetWriter(out ...io.Writer) // set multiple writers
// Other functions
(*Logger) Flush() // flush the writer
(*Logger) Destroy() // destroy the logger
```
#### Fields Description
##### Name
Name field is a string, which can be written to the logging and used to
separate multiple loggers. It allows two logger having the same name. There
is not any default value for name.
##### Logging Levels
There are these levels in logging.
```go
CRITICAL 50
FATAL CRITICAL
ERROR 40
WARNING 30
WARN WARNING
INFO 20
DEBUG 10
NOTSET 0
```
##### Record Format
The record format is described by a string, which has two parts separated by
`\n`. The first part describes the format of the log, and the second part
lists all the fields to be shown in the log. In other word, the first part is
the first parameter `format` of `fmt.Printf(format string, v ...interface{})`,
and the second part describes the second parameter `v` of it. It is not
allowed to have `\n` in the first part. The fields in the second part are
separated by comma `,`, while extra blank spaces are allowed. An example of
the format string is
```go
const BasicFormat = "%s [%6s] %30s - %s\n name,levelname,time,message"
```
which is the pre-defined `BasicFormat` used by `BasicLogger()` and
`SimpleLogger()`.
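With `BasicFormat`, each record is rendered as `name [levelname] time - message`, so a logged line looks roughly like this (the exact timestamp and padding will differ):
```
example [ ERROR]  2013-09-28 16:58:57.123456789 - connection lost
```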
It supports the following fields for the second part of the format.
```go
"name" string %s // name of the logger
"seqid" uint64 %d // sequence number
"levelno" int32 %d // level number
"levelname" string %s // level name
"created" int64 %d // starting time of the logger
"nsecs" int64 %d // nanosecond of the starting time
"time" string %s // record created time
"timestamp" int64 %d // timestamp of record
"rtime" int64 %d // relative time since started
"filename" string %s // source filename of the caller
"pathname" string %s // filename with path
"module" string %s // executable filename
"lineno" int %d // line number in source code
"funcname" string %s // function name of the caller
"thread" int32 %d // thread id
"process" int %d // process id
"message" string %d // logger message
```
The following runtime-related fields are extremely expensive and slow to
obtain, so please be careful when using them.
```go
"filename" string %s // source filename of the caller
"pathname" string %s // filename with path
"lineno" int %d // line number in source code
"funcname" string %s // function name of the caller
"thread" int32 %d // thread id
```
There are a few pre-defined values for record format.
```go
BasicFormat = "%s [%6s] %30s - %s\n name,levelname,time,message"
RichFormat = "%s [%6s] %d %30s - %d - %s:%s:%d - %s\n name, levelname, seqid, time, thread, filename, funcname, lineno, message"
```
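A custom format only has to follow the same two-part convention. For instance, the following hypothetical format prints the sequence number, the level name, and the message:
```go
package main

import (
	"os"

	"github.com/ccding/go-logging/logging"
)

// Custom two-part format: printf-style layout, then the field list.
const MyFormat = "%d [%s] %s\n seqid, levelname, message"

func main() {
	logger, _ := logging.WriterLogger("custom", logging.NOTSET,
		MyFormat, logging.DefaultTimeFormat, os.Stdout, true)
	defer logger.Destroy()

	logger.Warning("printed with the custom format")
}
```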
##### Time Format
We use the same time format layout as Go's `time` package. The default time format is
```go
DefaultTimeFormat = "2006-01-02 15:04:05.999999999" // default time format
```
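Any layout string accepted by Go's `time.Time.Format` can be used instead, for example `time.RFC3339`:
```go
package main

import (
	"os"
	"time"

	"github.com/ccding/go-logging/logging"
)

func main() {
	logger, _ := logging.WriterLogger("t", logging.NOTSET,
		logging.BasicFormat, time.RFC3339, os.Stdout, true)
	defer logger.Destroy()

	logger.Error("timestamped with RFC3339")
}
```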
##### File Name, Writer, and Sync
The meanings of these fields are straightforward. The file name is used to
create the writer; alternatively, the user may create a writer directly and
pass it to the logger. Sync selects whether logs are written synchronously or
asynchronously: `true` means synchronous and `false` means asynchronous. We
suggest the asynchronous way because it adds very little extra delay to the
logging calls. See the sketch below.
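As a sketch, the only difference between the two modes is the final boolean argument; everything else stays the same (file names here are arbitrary):
```go
package main

import "github.com/ccding/go-logging/logging"

func main() {
	// Asynchronous (recommended): records are buffered and written by a
	// background goroutine; Flush and Destroy drain the buffer.
	a, _ := logging.FileLogger("app", logging.WARNING,
		logging.BasicFormat, logging.DefaultTimeFormat, "async.log", false)
	a.Error("buffered, written in the background")
	a.Destroy()

	// Synchronous: every call writes directly to the file.
	s, _ := logging.FileLogger("app", logging.WARNING,
		logging.BasicFormat, logging.DefaultTimeFormat, "sync.log", true)
	s.Error("written immediately")
	s.Destroy()
}
```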
## Contributors
In alphabetical order
* Cong Ding ([ccding][ccding])
* Xiang Li ([xiangli-cmu][xiangli])
* Zifei Tong ([5kg][5kg])
[ccding]: //github.com/ccding
[xiangli]: //github.com/xiangli-cmu
[5kg]: //github.com/5kg
## TODO List
1. logging server

View File

@ -1,3 +0,0 @@
name = example
sync = 0

View File

@ -1,45 +0,0 @@
// Copyright 2013, Cong Ding. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// author: Cong Ding <dinggnu@gmail.com>
package main
import (
"github.com/ccding/go-logging/logging"
"time"
)
func main() {
logger1, _ := logging.SimpleLogger("main")
logger1.SetLevel(logging.NOTSET)
logger1.Error("this is a test from error")
logger1.Debug("this is a test from debug")
logger1.Notset("orz", time.Now().UnixNano())
logger1.Destroy()
logger2, _ := logging.RichLogger("main")
logger2.SetLevel(logging.DEBUG)
logger2.Error("this is a test from error")
logger2.Debug("this is a test from debug")
logger2.Notset("orz", time.Now().UnixNano())
logger2.Destroy()
logger3, _ := logging.ConfigLogger("example.conf")
logger3.SetLevel(logging.DEBUG)
logger3.Error("this is a test from error")
logger3.Debug("this is a test from debug")
logger3.Notset("orz", time.Now().UnixNano())
logger3.Destroy()
}

View File

@ -1,98 +0,0 @@
// Copyright 2013, Cong Ding. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// author: Cong Ding <dinggnu@gmail.com>
package logging
// Log receives a log request from the client. The request includes a set of
// variables.
func (logger *Logger) Log(level Level, v ...interface{}) {
// Don't delete this calling. The calling is used to keep the same
// calldepth for all the logging functions. The calldepth is used to
// get runtime information such as line number, function name, etc.
logger.log(level, v...)
}
// Logf receives log request from the client. The request has a string
// parameter to describe the format of output.
func (logger *Logger) Logf(level Level, format string, v ...interface{}) {
logger.logf(level, format, v...)
}
// Shortcut functions for the different levels
func (logger *Logger) Critical(v ...interface{}) {
logger.log(CRITICAL, v...)
}
func (logger *Logger) Fatal(v ...interface{}) {
logger.log(CRITICAL, v...)
}
func (logger *Logger) Error(v ...interface{}) {
logger.log(ERROR, v...)
}
func (logger *Logger) Warn(v ...interface{}) {
logger.log(WARNING, v...)
}
func (logger *Logger) Warning(v ...interface{}) {
logger.log(WARNING, v...)
}
func (logger *Logger) Info(v ...interface{}) {
logger.log(INFO, v...)
}
func (logger *Logger) Debug(v ...interface{}) {
logger.log(DEBUG, v...)
}
func (logger *Logger) Notset(v ...interface{}) {
logger.log(NOTSET, v...)
}
func (logger *Logger) Criticalf(format string, v ...interface{}) {
logger.logf(CRITICAL, format, v...)
}
func (logger *Logger) Fatalf(format string, v ...interface{}) {
logger.logf(CRITICAL, format, v...)
}
func (logger *Logger) Errorf(format string, v ...interface{}) {
logger.logf(ERROR, format, v...)
}
func (logger *Logger) Warnf(format string, v ...interface{}) {
logger.logf(WARNING, format, v...)
}
func (logger *Logger) Warningf(format string, v ...interface{}) {
logger.logf(WARNING, format, v...)
}
func (logger *Logger) Infof(format string, v ...interface{}) {
logger.logf(INFO, format, v...)
}
func (logger *Logger) Debugf(format string, v ...interface{}) {
logger.logf(DEBUG, format, v...)
}
func (logger *Logger) Notsetf(format string, v ...interface{}) {
logger.logf(NOTSET, format, v...)
}

View File

@ -1,236 +0,0 @@
// Copyright 2013, Cong Ding. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// author: Cong Ding <dinggnu@gmail.com>
package logging
import (
"bitbucket.org/kardianos/osext"
"os"
"path"
"runtime"
"sync/atomic"
"time"
)
// The struct for each log record
type record struct {
level Level
seqid uint64
pathname string
filename string
module string
lineno int
funcname string
thread int
process int
message string
time time.Time
}
// This variable maps fields in recordArgs to relevant function signatures
var fields = map[string]func(*Logger, *record) interface{}{
"name": (*Logger).lname, // name of the logger
"seqid": (*Logger).nextSeqid, // sequence number
"levelno": (*Logger).levelno, // level number
"levelname": (*Logger).levelname, // level name
"created": (*Logger).created, // starting time of the logger
"nsecs": (*Logger).nsecs, // nanosecond of the starting time
"time": (*Logger).time, // record created time
"timestamp": (*Logger).timestamp, // timestamp of record
"rtime": (*Logger).rtime, // relative time since started
"filename": (*Logger).filename, // source filename of the caller
"pathname": (*Logger).pathname, // filename with path
"module": (*Logger).module, // executable filename
"lineno": (*Logger).lineno, // line number in source code
"funcname": (*Logger).funcname, // function name of the caller
"thread": (*Logger).thread, // thread id
"process": (*Logger).process, // process id
"message": (*Logger).message, // logger message
}
var runtimeFields = map[string]bool{
"name": false,
"seqid": false,
"levelno": false,
"levelname": false,
"created": false,
"nsecs": false,
"time": false,
"timestamp": false,
"rtime": false,
"filename": true,
"pathname": true,
"module": false,
"lineno": true,
"funcname": true,
"thread": true,
"process": false,
"message": false,
}
// If a string-typed field cannot be obtained, it is set to the errString
// value.
const errString = "???"
// GetGoID returns the id of the current goroutine; it is defined in ./get_go_id.c
func GetGoID() int32
// genRuntime generates the runtime information, including pathname, function
// name, filename, line number.
func genRuntime(r *record) {
calldepth := 5
pc, file, line, ok := runtime.Caller(calldepth)
if ok {
// Generate short function name
fname := runtime.FuncForPC(pc).Name()
fshort := fname
for i := len(fname) - 1; i > 0; i-- {
if fname[i] == '.' {
fshort = fname[i+1:]
break
}
}
r.pathname = file
r.funcname = fshort
r.filename = path.Base(file)
r.lineno = line
} else {
r.pathname = errString
r.funcname = errString
r.filename = errString
// Here we use -1 rather than 0, because the default value in
// Go is 0 and we need to tell whether the value is
// uninitialized or failed to be obtained
r.lineno = -1
}
}
// Logger name
func (logger *Logger) lname(r *record) interface{} {
return logger.name
}
// Next sequence number
func (logger *Logger) nextSeqid(r *record) interface{} {
if r.seqid == 0 {
r.seqid = atomic.AddUint64(&(logger.seqid), 1)
}
return r.seqid
}
// Log level number
func (logger *Logger) levelno(r *record) interface{} {
return int32(r.level)
}
// Log level name
func (logger *Logger) levelname(r *record) interface{} {
return levelNames[r.level]
}
// File name of calling logger, with whole path
func (logger *Logger) pathname(r *record) interface{} {
if r.pathname == "" {
genRuntime(r)
}
return r.pathname
}
// File name of calling logger
func (logger *Logger) filename(r *record) interface{} {
if r.filename == "" {
genRuntime(r)
}
return r.filename
}
// module name
func (logger *Logger) module(r *record) interface{} {
module, _ := osext.Executable()
return path.Base(module)
}
// Line number
func (logger *Logger) lineno(r *record) interface{} {
if r.lineno == 0 {
genRuntime(r)
}
return r.lineno
}
// Function name
func (logger *Logger) funcname(r *record) interface{} {
if r.funcname == "" {
genRuntime(r)
}
return r.funcname
}
// Timestamp of starting time
func (logger *Logger) created(r *record) interface{} {
return logger.startTime.UnixNano()
}
// RFC3339Nano time
func (logger *Logger) time(r *record) interface{} {
if r.time.IsZero() {
r.time = time.Now()
}
return r.time.Format(logger.timeFormat)
}
// Nanosecond of starting time
func (logger *Logger) nsecs(r *record) interface{} {
return logger.startTime.Nanosecond()
}
// Nanosecond timestamp
func (logger *Logger) timestamp(r *record) interface{} {
if r.time.IsZero() {
r.time = time.Now()
}
return r.time.UnixNano()
}
// Nanoseconds since logger created
func (logger *Logger) rtime(r *record) interface{} {
if r.time.IsZero() {
r.time = time.Now()
}
return r.time.Sub(logger.startTime).Nanoseconds()
}
// Thread ID
func (logger *Logger) thread(r *record) interface{} {
if r.thread == 0 {
r.thread = int(GetGoID())
}
return r.thread
}
// Process ID
func (logger *Logger) process(r *record) interface{} {
if r.process == 0 {
r.process = os.Getpid()
}
return r.process
}
// The log message
func (logger *Logger) message(r *record) interface{} {
return r.message
}

View File

@ -1,60 +0,0 @@
// Copyright 2013, Cong Ding. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// author: Cong Ding <dinggnu@gmail.com>
package logging
import (
"fmt"
"strconv"
"testing"
)
func empty() {
}
func TestGetGoID(t *testing.T) {
for i := 0; i < 1000; i++ {
goid := int(GetGoID())
go empty()
goid2 := int(GetGoID())
if goid != goid2 {
t.Errorf("%v, %v\n", goid, goid2)
}
}
}
func TestSeqid(t *testing.T) {
logger, _ := BasicLogger("test")
for i := 0; i < 1000; i++ {
r := new(record)
name := strconv.Itoa(i + 1)
seq := logger.nextSeqid(r)
if fmt.Sprintf("%d", seq) != name {
t.Errorf("%v, %v\n", seq, name)
}
}
logger.Destroy()
}
func TestName(t *testing.T) {
name := "test"
logger, _ := BasicLogger(name)
r := new(record)
if logger.lname(r) != name {
t.Errorf("%v, %v\n", logger.lname(r), name)
}
logger.Destroy()
}

View File

@ -1,62 +0,0 @@
// Copyright 2013, Cong Ding. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// author: Cong Ding <dinggnu@gmail.com>
package logging
import (
"errors"
"fmt"
"strings"
)
// pre-defined formats
const (
BasicFormat = "%s [%6s] %30s - %s\n name,levelname,time,message"
RichFormat = "%s [%6s] %d %30s - %d - %s:%s:%d - %s\n name, levelname, seqid, time, thread, filename, funcname, lineno, message"
)
// genLog generates log string from the format setting.
func (logger *Logger) genLog(level Level, message string) string {
fs := make([]interface{}, len(logger.recordArgs))
r := new(record)
r.message = message
r.level = level
for k, v := range logger.recordArgs {
fs[k] = fields[v](logger, r)
}
return fmt.Sprintf(logger.recordFormat, fs...)
}
// parseFormat checks the legality of format and parses it to recordFormat and recordArgs
func (logger *Logger) parseFormat(format string) error {
logger.runtime = false
fts := strings.Split(format, "\n")
if len(fts) != 2 {
return errors.New("logging format error")
}
logger.recordFormat = fts[0]
logger.recordArgs = strings.Split(fts[1], ",")
for k, v := range logger.recordArgs {
tv := strings.TrimSpace(v)
_, ok := fields[tv]
if ok == false {
return errors.New("logging format error")
}
logger.recordArgs[k] = tv
logger.runtime = logger.runtime || runtimeFields[tv]
}
return nil
}

View File

@ -1,25 +0,0 @@
// Copyright 2013, Cong Ding. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// author: Cong Ding <dinggnu@gmail.com>
// This file defines the GetGoID function, which is used to get the id of the
// current goroutine. More details about this function are available in the
// runtime.c file of the Go source code.
#include <runtime.h>
void ·GetGoID(int32 ret) {
ret = g->goid;
USED(&ret);
}

View File

@ -1,68 +0,0 @@
// Copyright 2013, Cong Ding. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// author: Cong Ding <dinggnu@gmail.com>
package logging
// Level is the type of level.
type Level int32
// Values of level
const (
CRITICAL Level = 50
FATAL Level = CRITICAL
ERROR Level = 40
WARNING Level = 30
WARN Level = WARNING
INFO Level = 20
DEBUG Level = 10
NOTSET Level = 0
)
// The mapping from level to level name
var levelNames = map[Level]string{
CRITICAL: "CRITICAL",
ERROR: "ERROR",
WARNING: "WARNING",
INFO: "INFO",
DEBUG: "DEBUG",
NOTSET: "NOTSET",
}
// The mapping from level name to level
var levelValues = map[string]Level{
"CRITICAL": CRITICAL,
"ERROR": ERROR,
"WARN": WARNING,
"WARNING": WARNING,
"INFO": INFO,
"DEBUG": DEBUG,
"NOTSET": NOTSET,
}
// String function casts level value to string
func (level *Level) String() string {
return levelNames[*level]
}
// GetLevelName returns the level name for a given level value.
func GetLevelName(levelValue Level) string {
return levelNames[levelValue]
}
// GetLevelValue returns the level value for a given level name.
func GetLevelValue(levelName string) Level {
return levelValues[levelName]
}

View File

@ -1,259 +0,0 @@
// Copyright 2013, Cong Ding. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// author: Cong Ding <dinggnu@gmail.com>
// Package logging implements a log library for other applications. It provides
// functions Debug, Info, Warning, Error, Critical, and their formatting
// versions such as Logf.
//
// Example:
//
// logger := logging.SimpleLogger("main")
// logger.SetLevel(logging.WARNING)
// logger.Error("test for error")
// logger.Warning("test for warning", "second parameter")
// logger.Debug("test for debug")
//
package logging
import (
"github.com/ccding/go-config-reader/config"
"io"
"os"
"strconv"
"sync"
"sync/atomic"
"time"
)
// Pre-defined formats
const (
DefaultFileName = "logging.log" // default logging filename
DefaultConfigFile = "logging.conf" // default logging configuration file
DefaultTimeFormat = "2006-01-02 15:04:05.999999999" // default time format
bufSize = 1000 // buffer size for writer
queueSize = 10000 // chan queue size in async logging
reqSize = 10000 // chan queue size in async logging
)
// Logger is the logging struct.
type Logger struct {
// Be careful of the alignment issue of the variable seqid because it
// uses the sync/atomic.AddUint64() operation. If the alignment is
// wrong, it will cause a panic. To solve the alignment issue in an
// easy way, we put seqid to the beginning of the structure.
// seqid is only visible internally.
seqid uint64 // last used sequence number in record
// These variables can be configured by users.
name string // logger name
level Level // record level higher than this will be printed
recordFormat string // format of the record
recordArgs []string // arguments to be used in the recordFormat
out io.Writer // writer
sync bool // use sync or async way to record logs
timeFormat string // format for time
// These variables are visible to users.
startTime time.Time // start time of the logger
// Internally used variables, which don't have get and set functions.
wlock sync.Mutex // writer lock
queue chan string // queue used in async logging
request chan request // queue used in non-runtime logging
flush chan bool // flush signal for the watcher to write
quit chan bool // quit signal for the watcher to quit
fd *os.File // file handler, used to close the file on destroy
runtime bool // with runtime operation or not
}
// SimpleLogger creates a new logger with simple configuration.
func SimpleLogger(name string) (*Logger, error) {
return createLogger(name, WARNING, BasicFormat, DefaultTimeFormat, os.Stdout, false)
}
// BasicLogger creates a new logger with basic configuration.
func BasicLogger(name string) (*Logger, error) {
return FileLogger(name, WARNING, BasicFormat, DefaultTimeFormat, DefaultFileName, false)
}
// RichLogger creates a new logger with rich configuration.
func RichLogger(name string) (*Logger, error) {
return FileLogger(name, NOTSET, RichFormat, DefaultTimeFormat, DefaultFileName, false)
}
// FileLogger creates a new logger with file output.
func FileLogger(name string, level Level, format string, timeFormat string, file string, sync bool) (*Logger, error) {
out, err := os.Create(file)
if err != nil {
return nil, err
}
logger, err := createLogger(name, level, format, timeFormat, out, sync)
if err == nil {
logger.fd = out
return logger, nil
} else {
return nil, err
}
}
// WriterLogger creates a new logger with a writer
func WriterLogger(name string, level Level, format string, timeFormat string, out io.Writer, sync bool) (*Logger, error) {
return createLogger(name, level, format, timeFormat, out, sync)
}
// ConfigLogger creates a new logger from a configuration file
func ConfigLogger(filename string) (*Logger, error) {
conf := config.NewConfig(filename)
err := conf.Read()
if err != nil {
return nil, err
}
name := conf.Get("", "name")
slevel := conf.Get("", "level")
if slevel == "" {
slevel = "0"
}
l, err := strconv.Atoi(slevel)
if err != nil {
return nil, err
}
level := Level(l)
format := conf.Get("", "format")
if format == "" {
format = BasicFormat
}
timeFormat := conf.Get("", "timeFormat")
if timeFormat == "" {
timeFormat = DefaultTimeFormat
}
ssync := conf.Get("", "sync")
if ssync == "" {
ssync = "0"
}
file := conf.Get("", "file")
if file == "" {
file = DefaultFileName
}
sync := true
if ssync == "0" {
sync = false
} else if ssync == "1" {
sync = true
} else {
return nil, err
}
return FileLogger(name, level, format, timeFormat, file, sync)
}
// createLogger creates a new logger
func createLogger(name string, level Level, format string, timeFormat string, out io.Writer, sync bool) (*Logger, error) {
logger := new(Logger)
err := logger.parseFormat(format)
if err != nil {
return nil, err
}
// assign values to the logger
logger.name = name
logger.level = level
logger.out = out
logger.seqid = 0
logger.sync = sync
logger.queue = make(chan string, queueSize)
logger.request = make(chan request, reqSize)
logger.flush = make(chan bool)
logger.quit = make(chan bool)
logger.startTime = time.Now()
logger.fd = nil
logger.timeFormat = timeFormat
// start the watcher goroutine to write logs if logging is asynchronous
if !logger.sync {
go logger.watcher()
}
return logger, nil
}
// Destroy sends quit signal to watcher and releases all the resources.
func (logger *Logger) Destroy() {
if !logger.sync {
// quit watcher
logger.quit <- true
// wait for watcher quit
<-logger.quit
}
// clean up
if logger.fd != nil {
logger.fd.Close()
}
}
// Flush the writer
func (logger *Logger) Flush() {
if !logger.sync {
// send flush signal
logger.flush <- true
// wait for flush finish
<-logger.flush
}
}
// Getter functions
func (logger *Logger) Name() string {
return logger.name
}
func (logger *Logger) StartTime() int64 {
return logger.startTime.UnixNano()
}
func (logger *Logger) TimeFormat() string {
return logger.timeFormat
}
func (logger *Logger) Level() Level {
return Level(atomic.LoadInt32((*int32)(&logger.level)))
}
func (logger *Logger) RecordFormat() string {
return logger.recordFormat
}
func (logger *Logger) RecordArgs() []string {
return logger.recordArgs
}
func (logger *Logger) Writer() io.Writer {
return logger.out
}
func (logger *Logger) Sync() bool {
return logger.sync
}
// Setter functions
func (logger *Logger) SetLevel(level Level) {
atomic.StoreInt32((*int32)(&logger.level), int32(level))
}
func (logger *Logger) SetWriter(out ...io.Writer) {
logger.out = io.MultiWriter(out...)
}

View File

@ -1,71 +0,0 @@
// Copyright 2013, Cong Ding. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// author: Cong Ding <dinggnu@gmail.com>
package logging
import (
"fmt"
"os"
"testing"
)
func BenchmarkSync(b *testing.B) {
logger, _ := RichLogger("main")
logger.SetLevel(NOTSET)
for i := 0; i < b.N; i++ {
logger.Error("this is a test from error")
}
logger.Flush()
logger.Destroy()
}
func BenchmarkAsync(b *testing.B) {
logger, _ := RichLogger("main")
logger.SetLevel(NOTSET)
for i := 0; i < b.N; i++ {
logger.Error("this is a test from error")
}
logger.Flush()
logger.Destroy()
}
func BenchmarkBasicSync(b *testing.B) {
logger, _ := BasicLogger("main")
logger.SetLevel(NOTSET)
for i := 0; i < b.N; i++ {
logger.Error("this is a test from error")
}
logger.Flush()
logger.Destroy()
}
func BenchmarkBasicAsync(b *testing.B) {
logger, _ := BasicLogger("main")
logger.SetLevel(NOTSET)
for i := 0; i < b.N; i++ {
logger.Error("this is a test from error")
}
logger.Flush()
logger.Destroy()
}
func BenchmarkPrintln(b *testing.B) {
out, _ := os.Create("logging.log")
for i := 0; i < b.N; i++ {
fmt.Fprintln(out, "this is a test from error")
}
out.Close()
}

View File

@ -1,24 +0,0 @@
// Copyright 2013, Cong Ding. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// author: Cong Ding <dinggnu@gmail.com>
package logging
// request struct stores the logger request
type request struct {
level Level
format string
v []interface{}
}

View File

@ -1,130 +0,0 @@
// Copyright 2013, Cong Ding. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// author: Cong Ding <dinggnu@gmail.com>
package logging
import (
"bytes"
"fmt"
"sync/atomic"
"time"
)
// watcher watches the logger.queue channel, and writes the logs to output
func (logger *Logger) watcher() {
var buf bytes.Buffer
for {
timeout := time.After(time.Second / 10)
for i := 0; i < bufSize; i++ {
select {
case msg := <-logger.queue:
fmt.Fprintln(&buf, msg)
case req := <-logger.request:
logger.flushReq(&buf, &req)
case <-timeout:
i = bufSize
case <-logger.flush:
logger.flushBuf(&buf)
logger.flush <- true
i = bufSize
case <-logger.quit:
// If quit signal received, cleans the channel
// and writes all of them to io.Writer.
for {
select {
case msg := <-logger.queue:
fmt.Fprintln(&buf, msg)
case req := <-logger.request:
logger.flushReq(&buf, &req)
case <-logger.flush:
// do nothing
default:
logger.flushBuf(&buf)
logger.quit <- true
return
}
}
}
}
logger.flushBuf(&buf)
}
}
// flushBuf flushes the content of the buffer to the writer and resets the buffer
func (logger *Logger) flushBuf(b *bytes.Buffer) {
if len(b.Bytes()) > 0 {
logger.out.Write(b.Bytes())
b.Reset()
}
}
// flushReq handles the request and writes the result to writer
func (logger *Logger) flushReq(b *bytes.Buffer, req *request) {
if req.format == "" {
msg := fmt.Sprint(req.v...)
msg = logger.genLog(req.level, msg)
fmt.Fprintln(b, msg)
} else {
msg := fmt.Sprintf(req.format, req.v...)
msg = logger.genLog(req.level, msg)
fmt.Fprintln(b, msg)
}
}
// flushMsg writes the log message to the file, stdout, or another writer.
func (logger *Logger) flushMsg(message string) {
if logger.sync {
logger.wlock.Lock()
defer logger.wlock.Unlock()
fmt.Fprintln(logger.out, message)
} else {
logger.queue <- message
}
}
// log records log v... with level `level'.
func (logger *Logger) log(level Level, v ...interface{}) {
if int32(level) >= atomic.LoadInt32((*int32)(&logger.level)) {
if logger.runtime || logger.sync {
message := fmt.Sprint(v...)
message = logger.genLog(level, message)
logger.flushMsg(message)
} else {
r := new(request)
r.level = level
r.v = v
logger.request <- *r
}
}
}
// logf records log v... with level `level'.
func (logger *Logger) logf(level Level, format string, v ...interface{}) {
if int32(level) >= atomic.LoadInt32((*int32)(&logger.level)) {
if logger.runtime || logger.sync {
message := fmt.Sprintf(format, v...)
message = logger.genLog(level, message)
logger.flushMsg(message)
} else {
r := new(request)
r.level = level
r.format = format
r.v = v
logger.request <- *r
}
}
}

View File

@ -2,4 +2,49 @@
golang client library for etcd
This etcd client library is under heavy development. Check back soon for more docs. In the meantime, check out [etcd](https://github.com/coreos/etcd) for details on the client protocol.
This etcd client library is under heavy development. Check back soon for more
docs. In the meantime, check out [etcd](https://github.com/coreos/etcd) for
details on the client protocol.
For usage see example below or look at godoc: [go-etcd/etcd](http://godoc.org/github.com/coreos/go-etcd/etcd)
## Install
```bash
go get github.com/coreos/go-etcd/etcd
```
## Examples
Error return values are not shown for the sake of simplicity, but you
should check them.
```go
package main
import (
"fmt"
"github.com/coreos/go-etcd/etcd"
)
func main() {
c := etcd.NewClient() // default binds to http://0.0.0.0:4001
// SET the value "bar" to the key "foo" with zero TTL
// returns a: *store.Response
res, _ := c.Set("foo", "bar", 0)
fmt.Printf("set response: %+v\n", res)
// GET the value that is stored for the key "foo"
// returns a slice: []*store.Response
values, _ := c.Get("foo")
for i, res := range values { // .. and print them out
fmt.Printf("[%d] get response: %+v\n", i, res)
}
// DELETE the key "foo"
// returns a: *store.Response
res, _ = c.Delete("foo")
fmt.Printf("delete response: %+v\n", res)
}
```
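The client can also refresh its view of the cluster from the machines it already knows about via `SyncCluster`; a minimal sketch (error handling omitted):
```go
package main

import (
	"fmt"

	"github.com/coreos/go-etcd/etcd"
)

func main() {
	c := etcd.NewClient() // uses the default local machine list

	// Ask the known machines for the current cluster membership.
	if !c.SyncCluster() {
		fmt.Println("could not sync cluster information")
	}
}
```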

View File

@ -40,10 +40,9 @@ func NewClient() *Client {
// default leader and machines
cluster := Cluster{
Leader: "http://0.0.0.0:4001",
Machines: make([]string, 1),
Leader: "http://127.0.0.1:4001",
Machines: []string{"http://127.0.0.1:4001"},
}
cluster.Machines[0] = "http://0.0.0.0:4001"
config := Config{
// default use http
@ -117,7 +116,7 @@ func (c *Client) SyncCluster() bool {
// sync cluster information by providing machine list
func (c *Client) internalSyncCluster(machines []string) bool {
for _, machine := range machines {
httpPath := c.createHttpPath(machine, "v1/machines")
httpPath := c.createHttpPath(machine, version+"/machines")
resp, err := c.httpClient.Get(httpPath)
if err != nil {
// try another machine in the cluster
@ -236,11 +235,12 @@ func (c *Client) sendRequest(method string, _path string, body string) (*http.Re
// try to connect the leader
continue
} else if resp.StatusCode == http.StatusInternalServerError {
resp.Body.Close()
retry++
if retry > 2*len(c.cluster.Machines) {
return nil, errors.New("Cannot reach servers")
}
resp.Body.Close()
continue
} else {
logger.Debug("send.return.response ", httpPath)

View File

@ -1,19 +1,27 @@
package etcd
import (
"github.com/ccding/go-logging/logging"
"github.com/coreos/go-log/log"
"os"
)
var logger, _ = logging.SimpleLogger("go-etcd")
var logger *log.Logger
func init() {
logger.SetLevel(logging.FATAL)
setLogger(log.PriErr)
}
func OpenDebug() {
logger.SetLevel(logging.NOTSET)
setLogger(log.PriDebug)
}
func CloseDebug() {
logger.SetLevel(logging.FATAL)
setLogger(log.PriErr)
}
func setLogger(priority log.Priority) {
logger = log.NewSimple(
log.PriorityFilter(
priority,
log.WriterSink(os.Stdout, log.BasicFormat, log.BasicFields)))
}

View File

@ -1,3 +1,3 @@
package etcd
var version = "v1"
const version = "v1"

View File

@ -0,0 +1,214 @@
package log
// Copyright 2013, David Fisher. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// author: David Fisher <ddf1991@gmail.com>
// based on previous package by: Cong Ding <dinggnu@gmail.com>
import (
"fmt"
"os"
)
var BasicFormat = "%s [%9s] %s- %s\n"
var BasicFields = []string{"time", "priority", "prefix", "message"}
var RichFormat = "%s [%9s] %d %s - %s:%s:%d - %s\n"
var RichFields = []string{"full_time", "priority", "seq", "prefix", "filename", "funcname", "lineno", "message"}
// This function has an unusual name to aid in finding it while walking the
// stack. We need to do some dead reckoning from this function to access the
// caller's stack, so there is a consistent call depth above this function.
func (logger *Logger) Log(priority Priority, v ...interface{}) {
fields := logger.fieldValues()
fields["priority"] = priority
fields["message"] = fmt.Sprint(v...)
for _, sink := range logger.sinks {
sink.Log(fields)
}
}
func (logger *Logger) Logf(priority Priority, format string, v ...interface{}) {
logger.Log(priority, fmt.Sprintf(format, v...))
}
func (logger *Logger) Emergency(v ...interface{}) {
logger.Log(PriEmerg, v...)
}
func (logger *Logger) Emergencyf(format string, v ...interface{}) {
logger.Log(PriEmerg, fmt.Sprintf(format, v...))
}
func (logger *Logger) Alert(v ...interface{}) {
logger.Log(PriAlert, v...)
}
func (logger *Logger) Alertf(format string, v ...interface{}) {
logger.Log(PriAlert, fmt.Sprintf(format, v...))
}
func (logger *Logger) Critical(v ...interface{}) {
logger.Log(PriCrit, v...)
}
func (logger *Logger) Criticalf(format string, v ...interface{}) {
logger.Log(PriCrit, fmt.Sprintf(format, v...))
}
func (logger *Logger) Error(v ...interface{}) {
logger.Log(PriErr, v...)
}
func (logger *Logger) Errorf(format string, v ...interface{}) {
logger.Log(PriErr, fmt.Sprintf(format, v...))
}
func (logger *Logger) Warning(v ...interface{}) {
logger.Log(PriWarning, v...)
}
func (logger *Logger) Warningf(format string, v ...interface{}) {
logger.Log(PriWarning, fmt.Sprintf(format, v...))
}
func (logger *Logger) Notice(v ...interface{}) {
logger.Log(PriNotice, v...)
}
func (logger *Logger) Noticef(format string, v ...interface{}) {
logger.Log(PriNotice, fmt.Sprintf(format, v...))
}
func (logger *Logger) Info(v ...interface{}) {
logger.Log(PriInfo, v...)
}
func (logger *Logger) Infof(format string, v ...interface{}) {
logger.Log(PriInfo, fmt.Sprintf(format, v...))
}
func (logger *Logger) Debug(v ...interface{}) {
logger.Log(PriDebug, v...)
}
func (logger *Logger) Debugf(format string, v ...interface{}) {
logger.Log(PriDebug, fmt.Sprintf(format, v...))
}
func Emergency(v ...interface{}) {
defaultLogger.Log(PriEmerg, v...)
}
func Emergencyf(format string, v ...interface{}) {
defaultLogger.Log(PriEmerg, fmt.Sprintf(format, v...))
}
func Alert(v ...interface{}) {
defaultLogger.Log(PriAlert, v...)
}
func Alertf(format string, v ...interface{}) {
defaultLogger.Log(PriAlert, fmt.Sprintf(format, v...))
}
func Critical(v ...interface{}) {
defaultLogger.Log(PriCrit, v...)
}
func Criticalf(format string, v ...interface{}) {
defaultLogger.Log(PriCrit, fmt.Sprintf(format, v...))
}
func Error(v ...interface{}) {
defaultLogger.Log(PriErr, v...)
}
func Errorf(format string, v ...interface{}) {
defaultLogger.Log(PriErr, fmt.Sprintf(format, v...))
}
func Warning(v ...interface{}) {
defaultLogger.Log(PriWarning, v...)
}
func Warningf(format string, v ...interface{}) {
defaultLogger.Log(PriWarning, fmt.Sprintf(format, v...))
}
func Notice(v ...interface{}) {
defaultLogger.Log(PriNotice, v...)
}
func Noticef(format string, v ...interface{}) {
defaultLogger.Log(PriNotice, fmt.Sprintf(format, v...))
}
func Info(v ...interface{}) {
defaultLogger.Log(PriInfo, v...)
}
func Infof(format string, v ...interface{}) {
defaultLogger.Log(PriInfo, fmt.Sprintf(format, v...))
}
func Debug(v ...interface{}) {
defaultLogger.Log(PriDebug, v...)
}
func Debugf(format string, v ...interface{}) {
defaultLogger.Log(PriDebug, fmt.Sprintf(format, v...))
}
// Standard library log functions
func (logger *Logger)Fatalln (v ...interface{}) {
logger.Log(PriCrit, v...)
os.Exit(1)
}
func (logger *Logger)Fatalf (format string, v ...interface{}) {
logger.Logf(PriCrit, format, v...)
os.Exit(1)
}
func (logger *Logger)Panicln (v ...interface{}) {
s := fmt.Sprint(v...)
logger.Log(PriErr, s)
panic(s)
}
func (logger *Logger)Panicf (format string, v ...interface{}) {
s := fmt.Sprintf(format, v...)
logger.Log(PriErr, s)
panic(s)
}
func (logger *Logger)Println (v ...interface{}) {
logger.Log(PriInfo, v...)
}
func (logger *Logger)Printf (format string, v ...interface{}) {
logger.Logf(PriInfo, format, v...)
}
func Fatalln (v ...interface{}) {
defaultLogger.Log(PriCrit, v...)
os.Exit(1)
}
func Fatalf (format string, v ...interface{}) {
defaultLogger.Logf(PriCrit, format, v...)
os.Exit(1)
}
func Panicln (v ...interface{}) {
s := fmt.Sprint(v...)
defaultLogger.Log(PriErr, s)
panic(s)
}
func Panicf (format string, v ...interface{}) {
s := fmt.Sprintf(format, v...)
defaultLogger.Log(PriErr, s)
panic(s)
}
func Println (v ...interface{}) {
defaultLogger.Log(PriInfo, v...)
}
func Printf (format string, v ...interface{}) {
defaultLogger.Logf(PriInfo, format, v...)
}

View File

@ -0,0 +1,69 @@
package log
// Copyright 2013, David Fisher. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// author: David Fisher <ddf1991@gmail.com>
// based on previous package by: Cong Ding <dinggnu@gmail.com>
import (
"os"
"path"
"runtime"
"strings"
"sync/atomic"
"time"
)
type Fields map[string]interface{}
func (logger *Logger) fieldValues() Fields {
now := time.Now()
fields := Fields{
"prefix": logger.prefix, // static field available to all sinks
"seq": logger.nextSeq(), // auto-incrementing sequence number
"start_time": logger.created, // start time of the logger
"time": now.Format(time.StampMilli), // formatted time of log entry
"full_time": now, // time of log entry
"rtime": time.Since(logger.created), // relative time of log entry since started
"pid": os.Getpid(), // process id
"executable": logger.executable, // executable filename
}
if logger.verbose {
setVerboseFields(fields)
}
return fields
}
func (logger *Logger) nextSeq() uint64 {
return atomic.AddUint64(&logger.seq, 1)
}
func setVerboseFields(fields Fields) {
callers := make([]uintptr, 10)
n := runtime.Callers(3, callers) // starts in (*Logger).Log or similar
callers = callers[:n]
for _, pc := range callers {
f := runtime.FuncForPC(pc)
if !strings.Contains(f.Name(), "logger.(*Logger)") {
fields["funcname"] = f.Name()
pathname, lineno := f.FileLine(pc)
fields["lineno"] = lineno
fields["pathname"] = pathname
fields["filename"] = path.Base(pathname)
return
}
}
}

View File

@ -0,0 +1,73 @@
package log
// Copyright 2013, David Fisher. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// author: David Fisher <ddf1991@gmail.com>
// based on previous package by: Cong Ding <dinggnu@gmail.com>
import (
"bitbucket.org/kardianos/osext"
"os"
"path"
"time"
)
// Logger is a user-immutable struct which can log to several outputs
type Logger struct {
sinks []Sink // the sinks this logger will log to
verbose bool // gather expensive logging data?
prefix string // static field available to all log sinks under this logger
created time.Time // time when this logger was created
seq uint64 // sequential number of log message, starting at 1
executable string // executable name
}
// New creates a new Logger which logs to all the supplied sinks. The prefix
// argument is passed to all loggers under the field "prefix" with every log
// message. If verbose is true, more expensive runtime fields will be computed
// and passed to loggers. These fields are funcname, lineno, pathname, and
// filename.
func New(prefix string, verbose bool, sinks ...Sink) *Logger {
return &Logger{
sinks: sinks,
verbose: verbose,
prefix: prefix,
created: time.Now(),
seq: 0,
executable: getExecutableName(),
}
}
func getExecutableName() string {
executablePath, err := osext.Executable()
if err != nil {
return "(UNKNOWN)"
} else {
return path.Base(executablePath)
}
}
// NewSimple(sinks...) is equivalent to New("", false, sinks...)
func NewSimple(sinks ...Sink) *Logger {
return New("", false, sinks...)
}
var defaultLogger *Logger
func init() {
defaultLogger = NewSimple(CombinedSink(os.Stdout, BasicFormat, BasicFields))
}

View File

@ -0,0 +1,54 @@
package log
// Copyright 2013, David Fisher. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// author: David Fisher <ddf1991@gmail.com>
// based on previous package by: Cong Ding <dinggnu@gmail.com>
type Priority int
const (
PriEmerg Priority = iota
PriAlert
PriCrit
PriErr
PriWarning
PriNotice
PriInfo
PriDebug
)
func (priority Priority) String() string {
switch priority {
case PriEmerg:
return "EMERGENCY"
case PriAlert:
return "ALERT"
case PriCrit:
return "CRITICAL"
case PriErr:
return "ERROR"
case PriWarning:
return "WARNING"
case PriNotice:
return "NOTICE"
case PriInfo:
return "INFO"
case PriDebug:
return "DEBUG"
default:
return "UNKNOWN"
}
}

View File

@ -0,0 +1,154 @@
package log
// Copyright 2013, David Fisher. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// author: David Fisher <ddf1991@gmail.com>
// based on previous package by: Cong Ding <dinggnu@gmail.com>
import (
"fmt"
"github.com/coreos/go-systemd/journal"
"io"
"strings"
"sync"
)
const AsyncBuffer = 100
type Sink interface {
Log(Fields)
}
type nullSink struct{}
func (sink *nullSink) Log(fields Fields) {}
func NullSink() Sink {
return &nullSink{}
}
type writerSink struct {
lock sync.Mutex
out io.Writer
format string
fields []string
}
func (sink *writerSink) Log(fields Fields) {
vals := make([]interface{}, len(sink.fields))
for i, field := range sink.fields {
var ok bool
vals[i], ok = fields[field]
if !ok {
vals[i] = "???"
}
}
sink.lock.Lock()
defer sink.lock.Unlock()
fmt.Fprintf(sink.out, sink.format, vals...)
}
func WriterSink(out io.Writer, format string, fields []string) Sink {
return &writerSink{
out: out,
format: format,
fields: fields,
}
}
type journalSink struct{}
func (sink *journalSink) Log(fields Fields) {
message := fields["message"].(string)
priority := toJournalPriority(fields["priority"].(Priority))
journalFields := make(map[string]string)
for k, v := range fields {
if k == "message" || k == "priority" {
continue
}
journalFields[strings.ToUpper(k)] = fmt.Sprint(v)
}
journal.Send(message, priority, journalFields)
}
func toJournalPriority(priority Priority) journal.Priority {
switch priority {
case PriEmerg:
return journal.PriEmerg
case PriAlert:
return journal.PriAlert
case PriCrit:
return journal.PriCrit
case PriErr:
return journal.PriErr
case PriWarning:
return journal.PriWarning
case PriNotice:
return journal.PriNotice
case PriInfo:
return journal.PriInfo
case PriDebug:
return journal.PriDebug
default:
return journal.PriErr
}
}
func JournalSink() Sink {
return &journalSink{}
}
type combinedSink struct {
sinks []Sink
}
func (sink *combinedSink) Log(fields Fields) {
for _, s := range sink.sinks {
s.Log(fields)
}
}
func CombinedSink(writer io.Writer, format string, fields []string) Sink {
sinks := make([]Sink, 0)
sinks = append(sinks, WriterSink(writer, format, fields))
if journal.Enabled() {
sinks = append(sinks, JournalSink())
}
return &combinedSink{
sinks: sinks,
}
}
type priorityFilter struct {
priority Priority
target Sink
}
func (filter *priorityFilter) Log(fields Fields) {
// lower priority values indicate more important messages
if fields["priority"].(Priority) <= filter.priority {
filter.target.Log(fields)
}
}
func PriorityFilter(priority Priority, target Sink) Sink {
return &priorityFilter{
priority: priority,
target: target,
}
}

View File

@ -31,7 +31,7 @@ func newAppendEntriesRequest(term uint64, prevLogIndex uint64, prevLogTerm uint6
// Encodes the AppendEntriesRequest to a buffer. Returns the number of bytes
// written and any error that may have occurred.
func (req *AppendEntriesRequest) encode(w io.Writer) (int, error) {
func (req *AppendEntriesRequest) Encode(w io.Writer) (int, error) {
protoEntries := make([]*protobuf.ProtoAppendEntriesRequest_ProtoLogEntry, len(req.Entries))
@ -63,7 +63,7 @@ func (req *AppendEntriesRequest) encode(w io.Writer) (int, error) {
// Decodes the AppendEntriesRequest from a buffer. Returns the number of bytes read and
// any error that occurs.
func (req *AppendEntriesRequest) decode(r io.Reader) (int, error) {
func (req *AppendEntriesRequest) Decode(r io.Reader) (int, error) {
data, err := ioutil.ReadAll(r)
if err != nil {

View File

@ -10,7 +10,7 @@ func BenchmarkAppendEntriesRequestEncoding(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
var buf bytes.Buffer
req.encode(&buf)
req.Encode(&buf)
}
b.SetBytes(int64(len(tmp)))
}
@ -19,7 +19,7 @@ func BenchmarkAppendEntriesRequestDecoding(b *testing.B) {
req, buf := createTestAppendEntriesRequest(2000)
b.ResetTimer()
for i := 0; i < b.N; i++ {
req.decode(bytes.NewReader(buf))
req.Decode(bytes.NewReader(buf))
}
b.SetBytes(int64(len(buf)))
}
@ -34,7 +34,7 @@ func createTestAppendEntriesRequest(entryCount int) (*AppendEntriesRequest, []by
req := newAppendEntriesRequest(1, 1, 1, 1, "leader", entries)
var buf bytes.Buffer
req.encode(&buf)
req.Encode(&buf)
return req, buf.Bytes()
}

View File

@ -30,7 +30,7 @@ func newAppendEntriesResponse(term uint64, success bool, index uint64, commitInd
// Encodes the AppendEntriesResponse to a buffer. Returns the number of bytes
// written and any error that may have occurred.
func (resp *AppendEntriesResponse) encode(w io.Writer) (int, error) {
func (resp *AppendEntriesResponse) Encode(w io.Writer) (int, error) {
pb := &protobuf.ProtoAppendEntriesResponse{
Term: proto.Uint64(resp.Term),
Index: proto.Uint64(resp.Index),
@ -47,7 +47,7 @@ func (resp *AppendEntriesResponse) encode(w io.Writer) (int, error) {
// Decodes the AppendEntriesResponse from a buffer. Returns the number of bytes read and
// any error that occurs.
func (resp *AppendEntriesResponse) decode(r io.Reader) (int, error) {
func (resp *AppendEntriesResponse) Decode(r io.Reader) (int, error) {
data, err := ioutil.ReadAll(r)
if err != nil {

View File

@ -10,7 +10,7 @@ func BenchmarkAppendEntriesResponseEncoding(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
var buf bytes.Buffer
req.encode(&buf)
req.Encode(&buf)
}
b.SetBytes(int64(len(tmp)))
}
@ -19,7 +19,7 @@ func BenchmarkAppendEntriesResponseDecoding(b *testing.B) {
req, buf := createTestAppendEntriesResponse(2000)
b.ResetTimer()
for i := 0; i < b.N; i++ {
req.decode(bytes.NewReader(buf))
req.Decode(bytes.NewReader(buf))
}
b.SetBytes(int64(len(buf)))
}
@ -28,7 +28,7 @@ func createTestAppendEntriesResponse(entryCount int) (*AppendEntriesResponse, []
resp := newAppendEntriesResponse(1, true, 1, 1)
var buf bytes.Buffer
resp.encode(&buf)
resp.Encode(&buf)
return resp, buf.Bytes()
}

View File

@ -89,7 +89,7 @@ func (t *HTTPTransporter) Install(server *Server, mux HTTPMuxer) {
// Sends an AppendEntries RPC to a peer.
func (t *HTTPTransporter) SendAppendEntriesRequest(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
var b bytes.Buffer
if _, err := req.encode(&b); err != nil {
if _, err := req.Encode(&b); err != nil {
traceln("transporter.ae.encoding.error:", err)
return nil
}
@ -106,7 +106,7 @@ func (t *HTTPTransporter) SendAppendEntriesRequest(server *Server, peer *Peer, r
defer httpResp.Body.Close()
resp := &AppendEntriesResponse{}
if _, err = resp.decode(httpResp.Body); err != nil && err != io.EOF {
if _, err = resp.Decode(httpResp.Body); err != nil && err != io.EOF {
traceln("transporter.ae.decoding.error:", err)
return nil
}
@ -117,7 +117,7 @@ func (t *HTTPTransporter) SendAppendEntriesRequest(server *Server, peer *Peer, r
// Sends a RequestVote RPC to a peer.
func (t *HTTPTransporter) SendVoteRequest(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
var b bytes.Buffer
if _, err := req.encode(&b); err != nil {
if _, err := req.Encode(&b); err != nil {
traceln("transporter.rv.encoding.error:", err)
return nil
}
@ -134,7 +134,7 @@ func (t *HTTPTransporter) SendVoteRequest(server *Server, peer *Peer, req *Reque
defer httpResp.Body.Close()
resp := &RequestVoteResponse{}
if _, err = resp.decode(httpResp.Body); err != nil && err != io.EOF {
if _, err = resp.Decode(httpResp.Body); err != nil && err != io.EOF {
traceln("transporter.rv.decoding.error:", err)
return nil
}
@ -162,13 +162,13 @@ func (t *HTTPTransporter) appendEntriesHandler(server *Server) http.HandlerFunc
traceln(server.Name(), "RECV /appendEntries")
req := &AppendEntriesRequest{}
if _, err := req.decode(r.Body); err != nil {
if _, err := req.Decode(r.Body); err != nil {
http.Error(w, "", http.StatusBadRequest)
return
}
resp := server.AppendEntries(req)
if _, err := resp.encode(w); err != nil {
if _, err := resp.Encode(w); err != nil {
http.Error(w, "", http.StatusInternalServerError)
return
}
@ -181,13 +181,13 @@ func (t *HTTPTransporter) requestVoteHandler(server *Server) http.HandlerFunc {
traceln(server.Name(), "RECV /requestVote")
req := &RequestVoteRequest{}
if _, err := req.decode(r.Body); err != nil {
if _, err := req.Decode(r.Body); err != nil {
http.Error(w, "", http.StatusBadRequest)
return
}
resp := server.RequestVote(req)
if _, err := resp.encode(w); err != nil {
if _, err := resp.Encode(w); err != nil {
http.Error(w, "", http.StatusInternalServerError)
return
}

View File

@ -180,26 +180,23 @@ func (l *Log) open(path string) error {
}
break
}
// Append entry.
l.entries = append(l.entries, entry)
if entry.Index <= l.commitIndex {
command, err := newCommand(entry.CommandName, entry.Command)
if err != nil {
continue
if entry.Index > l.startIndex {
// Append entry.
l.entries = append(l.entries, entry)
if entry.Index <= l.commitIndex {
command, err := newCommand(entry.CommandName, entry.Command)
if err != nil {
continue
}
l.ApplyFunc(command)
}
l.ApplyFunc(command)
debugln("open.log.append log index ", entry.Index)
}
debugln("open.log.append log index ", entry.Index)
readBytes += int64(n)
}
l.results = make([]*logResult, len(l.entries))
l.compact(l.startIndex, l.startTerm)
debugln("open.log.recovery number of log ", len(l.entries))
return nil
}
@ -273,6 +270,8 @@ func (l *Log) getEntriesAfter(index uint64, maxLogEntriesPerRequest uint64) ([]*
entries := l.entries[index-l.startIndex:]
length := len(entries)
traceln("log.entriesAfter: startIndex:", l.startIndex, " lenght", len(l.entries))
if uint64(length) < maxLogEntriesPerRequest {
// Determine the term at the given entry and return a subslice.
return entries, l.entries[index-1-l.startIndex].Term
@ -353,7 +352,10 @@ func (l *Log) lastInfo() (index uint64, term uint64) {
func (l *Log) updateCommitIndex(index uint64) {
l.mutex.Lock()
defer l.mutex.Unlock()
l.commitIndex = index
if index > l.commitIndex {
l.commitIndex = index
}
debugln("update.commit.index ", index)
}
// Updates the commit index and writes entries after that index to the stable storage.

View File

@ -255,6 +255,12 @@ func (p *Peer) sendSnapshotRecoveryRequest() {
req := newSnapshotRecoveryRequest(p.server.name, p.server.lastSnapshot)
debugln("peer.snap.recovery.send: ", p.Name)
resp := p.server.Transporter().SendSnapshotRecoveryRequest(p.server, p, req)
if resp == nil {
debugln("peer.snap.recovery.timeout: ", p.Name)
return
}
if resp.Success {
p.prevLogIndex = req.LastIndex
} else {

View File

@ -28,7 +28,7 @@ func newRequestVoteRequest(term uint64, candidateName string, lastLogIndex uint6
// Encodes the RequestVoteRequest to a buffer. Returns the number of bytes
// written and any error that may have occurred.
func (req *RequestVoteRequest) encode(w io.Writer) (int, error) {
func (req *RequestVoteRequest) Encode(w io.Writer) (int, error) {
pb := &protobuf.ProtoRequestVoteRequest{
Term: proto.Uint64(req.Term),
LastLogIndex: proto.Uint64(req.LastLogIndex),
@ -45,7 +45,7 @@ func (req *RequestVoteRequest) encode(w io.Writer) (int, error) {
// Decodes the RequestVoteRequest from a buffer. Returns the number of bytes read and
// any error that occurs.
func (req *RequestVoteRequest) decode(r io.Reader) (int, error) {
func (req *RequestVoteRequest) Decode(r io.Reader) (int, error) {
data, err := ioutil.ReadAll(r)
if err != nil {

View File

@ -24,7 +24,7 @@ func newRequestVoteResponse(term uint64, voteGranted bool) *RequestVoteResponse
// Encodes the RequestVoteResponse to a buffer. Returns the number of bytes
// written and any error that may have occurred.
func (resp *RequestVoteResponse) encode(w io.Writer) (int, error) {
func (resp *RequestVoteResponse) Encode(w io.Writer) (int, error) {
pb := &protobuf.ProtoRequestVoteResponse{
Term: proto.Uint64(resp.Term),
VoteGranted: proto.Bool(resp.VoteGranted),
@ -40,7 +40,7 @@ func (resp *RequestVoteResponse) encode(w io.Writer) (int, error) {
// Decodes the RequestVoteResponse from a buffer. Returns the number of bytes read and
// any error that occurs.
func (resp *RequestVoteResponse) decode(r io.Reader) (int, error) {
func (resp *RequestVoteResponse) Decode(r io.Reader) (int, error) {
data, err := ioutil.ReadAll(r)
if err != nil {

View File

@ -80,6 +80,8 @@ type Server struct {
lastSnapshot *Snapshot
stateMachine StateMachine
maxLogEntriesPerRequest uint64
connectionString string
}
// An event to be processed by the server's event loop.
@ -96,7 +98,7 @@ type event struct {
//------------------------------------------------------------------------------
// Creates a new server with a log at the given path.
func NewServer(name string, path string, transporter Transporter, stateMachine StateMachine, context interface{}) (*Server, error) {
func NewServer(name string, path string, transporter Transporter, stateMachine StateMachine, context interface{}, connectionString string) (*Server, error) {
if name == "" {
return nil, errors.New("raft.Server: Name cannot be blank")
}
@ -117,6 +119,7 @@ func NewServer(name string, path string, transporter Transporter, stateMachine S
electionTimeout: DefaultElectionTimeout,
heartbeatTimeout: DefaultHeartbeatTimeout,
maxLogEntriesPerRequest: MaxLogEntriesPerRequest,
connectionString: connectionString,
}
// Setup apply function.
@ -1009,10 +1012,17 @@ func (s *Server) TakeSnapshot() error {
state = []byte{0}
}
var peers []*Peer
peers := make([]*Peer, len(s.peers)+1)
i := 0
for _, peer := range s.peers {
peers = append(peers, peer.clone())
peers[i] = peer.clone()
i++
}
peers[i] = &Peer{
Name: s.Name(),
ConnectionString: s.connectionString,
}
s.currentSnapshot = &Snapshot{lastIndex, lastTerm, peers, state, path}
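
The effect of the TakeSnapshot change above is that the snapshot's peer list now includes the local server itself, identified by the connectionString threaded through the new NewServer signature, so a node restoring from a snapshot recovers the full cluster membership including its own address. A minimal sketch of the extended constructor call, with a hypothetical node name, log path, and helper function (only the trailing connectionString argument is the point):

package raftexample

import (
	"github.com/coreos/go-raft"
)

// startNode shows the extended NewServer signature; the trailing
// connectionString is what TakeSnapshot records for the local peer.
func startNode(transporter raft.Transporter, connectionString string) (*raft.Server, error) {
	return raft.NewServer("node0", "/tmp/node0.raft", transporter, nil, nil, connectionString)
}
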
@ -1253,7 +1263,7 @@ func (s *Server) readConf() error {
return err
}
s.log.commitIndex = conf.CommitIndex
s.log.updateCommitIndex(conf.CommitIndex)
return nil
}

View File

@ -428,7 +428,7 @@ func TestServerRecoverFromPreviousLogAndConf(t *testing.T) {
for _, name := range names {
server := servers[name]
if server.CommitIndex() != 17 {
t.Fatalf("%s commitIndex is invalid [%d/%d]", name, server.CommitIndex(), 16)
t.Fatalf("%s commitIndex is invalid [%d/%d]", name, server.CommitIndex(), 17)
}
server.Stop()
}

View File

@ -35,7 +35,7 @@ func newSnapshotRecoveryRequest(leaderName string, snapshot *Snapshot) *Snapshot
// Encodes the SnapshotRecoveryRequest to a buffer. Returns the number of bytes
// written and any error that may have occurred.
func (req *SnapshotRecoveryRequest) encode(w io.Writer) (int, error) {
func (req *SnapshotRecoveryRequest) Encode(w io.Writer) (int, error) {
protoPeers := make([]*protobuf.ProtoSnapshotRecoveryRequest_ProtoPeer, len(req.Peers))
@ -63,7 +63,7 @@ func (req *SnapshotRecoveryRequest) encode(w io.Writer) (int, error) {
// Decodes the SnapshotRecoveryRequest from a buffer. Returns the number of bytes read and
// any error that occurs.
func (req *SnapshotRecoveryRequest) decode(r io.Reader) (int, error) {
func (req *SnapshotRecoveryRequest) Decode(r io.Reader) (int, error) {
data, err := ioutil.ReadAll(r)
if err != nil {

View File

@ -31,7 +31,7 @@ func newSnapshotRecoveryResponse(term uint64, success bool, commitIndex uint64)
// Encodes the SnapshotRecoveryResponse to a buffer. Returns the number of bytes
// written and any error that may have occurred.
func (req *SnapshotRecoveryResponse) encode(w io.Writer) (int, error) {
func (req *SnapshotRecoveryResponse) Encode(w io.Writer) (int, error) {
pb := &protobuf.ProtoSnapshotRecoveryResponse{
Term: proto.Uint64(req.Term),
Success: proto.Bool(req.Success),
@ -47,7 +47,7 @@ func (req *SnapshotRecoveryResponse) encode(w io.Writer) (int, error) {
// Decodes the SnapshotRecoveryResponse from a buffer. Returns the number of bytes read and
// any error that occurs.
func (req *SnapshotRecoveryResponse) decode(r io.Reader) (int, error) {
func (req *SnapshotRecoveryResponse) Decode(r io.Reader) (int, error) {
data, err := ioutil.ReadAll(r)
if err != nil {

View File

@ -31,7 +31,7 @@ func newSnapshotRequest(leaderName string, snapshot *Snapshot) *SnapshotRequest
// Encodes the SnapshotRequest to a buffer. Returns the number of bytes
// written and any error that may have occurred.
func (req *SnapshotRequest) encode(w io.Writer) (int, error) {
func (req *SnapshotRequest) Encode(w io.Writer) (int, error) {
pb := &protobuf.ProtoSnapshotRequest{
LeaderName: proto.String(req.LeaderName),
LastIndex: proto.Uint64(req.LastIndex),
@ -47,7 +47,7 @@ func (req *SnapshotRequest) encode(w io.Writer) (int, error) {
// Decodes the SnapshotRequest from a buffer. Returns the number of bytes read and
// any error that occurs.
func (req *SnapshotRequest) decode(r io.Reader) (int, error) {
func (req *SnapshotRequest) Decode(r io.Reader) (int, error) {
data, err := ioutil.ReadAll(r)
if err != nil {

View File

@ -27,7 +27,7 @@ func newSnapshotResponse(success bool) *SnapshotResponse {
// Encodes the SnapshotResponse to a buffer. Returns the number of bytes
// written and any error that may have occurred.
func (resp *SnapshotResponse) encode(w io.Writer) (int, error) {
func (resp *SnapshotResponse) Encode(w io.Writer) (int, error) {
pb := &protobuf.ProtoSnapshotResponse{
Success: proto.Bool(resp.Success),
}
@ -41,7 +41,7 @@ func (resp *SnapshotResponse) encode(w io.Writer) (int, error) {
// Decodes the SnapshotResponse from a buffer. Returns the number of bytes read and
// any error that occurs.
func (resp *SnapshotResponse) decode(r io.Reader) (int, error) {
func (resp *SnapshotResponse) Decode(r io.Reader) (int, error) {
data, err := ioutil.ReadAll(r)
if err != nil {

View File

@ -65,12 +65,12 @@ func newTestServer(name string, transporter Transporter) *Server {
if err := os.MkdirAll(p, 0644); err != nil {
panic(err.Error())
}
server, _ := NewServer(name, p, transporter, nil, nil)
server, _ := NewServer(name, p, transporter, nil, nil, "")
return server
}
func newTestServerWithPath(name string, transporter Transporter, p string) *Server {
server, _ := NewServer(name, p, transporter, nil, nil)
server, _ := NewServer(name, p, transporter, nil, nil, "")
return server
}

View File

@ -0,0 +1,3 @@
# go-systemd
Go bindings to systemd socket activation, journal and D-BUS APIs.

View File

@ -0,0 +1,29 @@
package activation
import (
"os"
"strconv"
"syscall"
)
// based on: https://gist.github.com/alberts/4640792
const (
listenFdsStart = 3
)
func Files() []*os.File {
pid, err := strconv.Atoi(os.Getenv("LISTEN_PID"))
if err != nil || pid != os.Getpid() {
return nil
}
nfds, err := strconv.Atoi(os.Getenv("LISTEN_FDS"))
if err != nil || nfds == 0 {
return nil
}
files := []*os.File(nil)
for fd := listenFdsStart; fd < listenFdsStart+nfds; fd++ {
syscall.CloseOnExec(fd)
files = append(files, os.NewFile(uintptr(fd), "LISTEN_FD_" + strconv.Itoa(fd)))
}
return files
}
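
For context on the new vendored package: Files reads the LISTEN_PID and LISTEN_FDS environment variables that systemd sets for socket-activated services and returns the inherited descriptors, which start at fd 3. A small usage sketch, assuming the conventional import path github.com/coreos/go-systemd/activation and an arbitrary fallback port:

package main

import (
	"fmt"
	"net"
	"net/http"

	"github.com/coreos/go-systemd/activation"
)

func main() {
	files := activation.Files()
	if len(files) == 0 {
		// Not socket-activated: fall back to opening our own listener.
		fmt.Println("no sockets passed by systemd, listening on :4001")
		http.ListenAndServe(":4001", nil)
		return
	}
	// Wrap the first inherited fd in a net.Listener and serve on it.
	listener, err := net.FileListener(files[0])
	if err != nil {
		panic(err)
	}
	http.Serve(listener, nil)
}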

View File

@ -0,0 +1,148 @@
// Package journal provides write bindings to the systemd journal
package journal
import (
"bytes"
"errors"
"fmt"
"io"
"io/ioutil"
"net"
"os"
"strconv"
"strings"
"syscall"
"encoding/binary"
)
// Priority of a journal message
type Priority int
const (
PriEmerg Priority = iota
PriAlert
PriCrit
PriErr
PriWarning
PriNotice
PriInfo
PriDebug
)
var conn net.Conn
func init() {
conn, _ = net.Dial("unixgram", "/run/systemd/journal/socket")
}
// Enabled returns true iff the systemd journal is available for logging
func Enabled() bool {
return conn != nil
}
// Send a message to the systemd journal. vars is a map of journald fields to
// values. Fields must be composed of uppercase letters, numbers, and
// underscores, but must not start with an underscore. Within these
// restrictions, any arbitrary field name may be used. Some names have special
// significance: see the journalctl documentation
// (http://www.freedesktop.org/software/systemd/man/systemd.journal-fields.html)
// for more details. vars may be nil.
func Send(message string, priority Priority, vars map[string]string) error {
if conn == nil {
return journalError("could not connect to journald socket")
}
data := new(bytes.Buffer)
appendVariable(data, "PRIORITY", strconv.Itoa(int(priority)))
appendVariable(data, "MESSAGE", message)
for k, v := range vars {
appendVariable(data, k, v)
}
_, err := io.Copy(conn, data)
if err != nil && isSocketSpaceError(err) {
file, err := tempFd()
if err != nil {
return journalError(err.Error())
}
_, err = io.Copy(file, data)
if err != nil {
return journalError(err.Error())
}
rights := syscall.UnixRights(int(file.Fd()))
/* this connection should always be a UnixConn, but better safe than sorry */
unixConn, ok := conn.(*net.UnixConn)
if !ok {
return journalError("can't send file through non-Unix connection")
}
unixConn.WriteMsgUnix([]byte{}, rights, nil)
} else if err != nil {
return journalError(err.Error())
}
return nil
}
func appendVariable(w io.Writer, name, value string) {
if !validVarName(name) {
journalError("variable name contains invalid character, ignoring")
}
if strings.ContainsRune(value, '\n') {
/* When the value contains a newline, we write:
* - the variable name, followed by a newline
* - the size (in 64bit little endian format)
* - the data, followed by a newline
*/
fmt.Fprintln(w, name)
binary.Write(w, binary.LittleEndian, uint64(len(value)))
fmt.Fprintln(w, value)
} else {
/* just write the variable and value all on one line */
fmt.Fprintf(w, "%s=%s\n", name, value)
}
}
func validVarName(name string) bool {
/* The variable name must be in uppercase and consist only of characters,
* numbers and underscores, and may not begin with an underscore. (from the docs)
*/
valid := true
valid = valid && name[0] != '_'
for _, c := range name {
valid = valid && (('A' <= c && c <= 'Z') || ('0' <= c && c <= '9') || c == '_')
}
return valid
}
func isSocketSpaceError(err error) bool {
opErr, ok := err.(*net.OpError)
if !ok {
return false
}
sysErr, ok := opErr.Err.(syscall.Errno)
if !ok {
return false
}
return sysErr == syscall.EMSGSIZE || sysErr == syscall.ENOBUFS
}
func tempFd() (*os.File, error) {
file, err := ioutil.TempFile("/dev/shm/", "journal.XXXXX")
if err != nil {
return nil, err
}
err = syscall.Unlink(file.Name())
if err != nil {
return nil, err
}
return file, nil
}
func journalError(s string) error {
s = "journal error: " + s
fmt.Fprintln(os.Stderr, s)
return errors.New(s)
}
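
Usage of the journal package above is straightforward: Enabled reports whether the journald socket was reachable at init time, and Send writes a message with a priority plus optional extra fields (uppercase names, no leading underscore). A short sketch, assuming the vendored import path github.com/coreos/go-systemd/journal:

package main

import (
	"fmt"

	"github.com/coreos/go-systemd/journal"
)

func main() {
	if !journal.Enabled() {
		fmt.Println("journald socket not available")
		return
	}
	// Extra field names must be uppercase and must not start with "_".
	err := journal.Send("etcd peer joined", journal.PriInfo, map[string]string{
		"PEER_NAME": "node2",
	})
	if err != nil {
		fmt.Println("journal send failed:", err)
	}
}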

View File

@ -13,26 +13,33 @@ import (
"github.com/coreos/go-raft"
)
// Timeout for setting up the internal raft HTTP connection
// This should not exceed 3 * RTT
var dailTimeout = 3 * HeartbeatTimeout
// Timeout for setting up the internal raft HTTP connection plus receiving the response header
// This should not exceed 3 * RTT + RTT
var responseHeaderTimeout = 4 * HeartbeatTimeout
// Timeout for receiving the response body from the server
// This should not exceed the election timeout
var tranTimeout = ElectionTimeout
// Transporter layer for communication between raft nodes
type transporter struct {
client *http.Client
timeout time.Duration
}
// response struct
type transporterResponse struct {
resp *http.Response
err error
client *http.Client
transport *http.Transport
}
// Creates the transporter used by the raft server
// Creates an http or https transporter based on
// whether the user gives the server cert and key
func newTransporter(scheme string, tlsConf tls.Config, timeout time.Duration) *transporter {
func newTransporter(scheme string, tlsConf tls.Config) *transporter {
t := transporter{}
tr := &http.Transport{
Dial: dialTimeout,
Dial: dialWithTimeout,
ResponseHeaderTimeout: responseHeaderTimeout,
}
if scheme == "https" {
@ -41,14 +48,14 @@ func newTransporter(scheme string, tlsConf tls.Config, timeout time.Duration) *t
}
t.client = &http.Client{Transport: tr}
t.timeout = timeout
t.transport = tr
return &t
}
// Dial with timeout
func dialTimeout(network, addr string) (net.Conn, error) {
return net.DialTimeout(network, addr, HTTPTimeout)
func dialWithTimeout(network, addr string) (net.Conn, error) {
return net.DialTimeout(network, addr, dailTimeout)
}
// Sends AppendEntries RPCs to a peer when the server is the leader.
@ -66,29 +73,36 @@ func (t *transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.P
debugf("Send LogEntries to %s ", u)
thisPeerStats, ok := r.peersStats[peer.Name]
thisFollowerStats, ok := r.followersStats.Followers[peer.Name]
if !ok { // this is the first time this follower has been seen
thisFollowerStats = &raftFollowerStats{}
thisFollowerStats.Latency.Minimum = 1 << 63
r.followersStats.Followers[peer.Name] = thisFollowerStats
}
start := time.Now()
resp, err := t.Post(fmt.Sprintf("%s/log/append", u), &b)
resp, httpRequest, err := t.Post(fmt.Sprintf("%s/log/append", u), &b)
end := time.Now()
if err != nil {
debugf("Cannot send AppendEntriesRequest to %s: %s", u, err)
if ok {
thisPeerStats.Fail()
thisFollowerStats.Fail()
}
} else {
if ok {
thisPeerStats.Succ(end.Sub(start))
thisFollowerStats.Succ(end.Sub(start))
}
}
r.peersStats[peer.Name] = thisPeerStats
if resp != nil {
defer resp.Body.Close()
t.CancelWhenTimeout(httpRequest)
aersp = &raft.AppendEntriesResponse{}
if err := json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF {
return aersp
@ -108,7 +122,7 @@ func (t *transporter) SendVoteRequest(server *raft.Server, peer *raft.Peer, req
u, _ := nameToRaftURL(peer.Name)
debugf("Send Vote to %s", u)
resp, err := t.Post(fmt.Sprintf("%s/vote", u), &b)
resp, httpRequest, err := t.Post(fmt.Sprintf("%s/vote", u), &b)
if err != nil {
debugf("Cannot send VoteRequest to %s : %s", u, err)
@ -116,6 +130,9 @@ func (t *transporter) SendVoteRequest(server *raft.Server, peer *raft.Peer, req
if resp != nil {
defer resp.Body.Close()
t.CancelWhenTimeout(httpRequest)
rvrsp := &raft.RequestVoteResponse{}
if err := json.NewDecoder(resp.Body).Decode(&rvrsp); err == nil || err == io.EOF {
return rvrsp
@ -135,7 +152,7 @@ func (t *transporter) SendSnapshotRequest(server *raft.Server, peer *raft.Peer,
debugf("Send Snapshot to %s [Last Term: %d, LastIndex %d]", u,
req.LastTerm, req.LastIndex)
resp, err := t.Post(fmt.Sprintf("%s/snapshot", u), &b)
resp, httpRequest, err := t.Post(fmt.Sprintf("%s/snapshot", u), &b)
if err != nil {
debugf("Cannot send SendSnapshotRequest to %s : %s", u, err)
@ -143,6 +160,9 @@ func (t *transporter) SendSnapshotRequest(server *raft.Server, peer *raft.Peer,
if resp != nil {
defer resp.Body.Close()
t.CancelWhenTimeout(httpRequest)
aersp = &raft.SnapshotResponse{}
if err = json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF {
@ -163,7 +183,7 @@ func (t *transporter) SendSnapshotRecoveryRequest(server *raft.Server, peer *raf
debugf("Send SnapshotRecovery to %s [Last Term: %d, LastIndex %d]", u,
req.LastTerm, req.LastIndex)
resp, err := t.Post(fmt.Sprintf("%s/snapshotRecovery", u), &b)
resp, _, err := t.Post(fmt.Sprintf("%s/snapshotRecovery", u), &b)
if err != nil {
debugf("Cannot send SendSnapshotRecoveryRequest to %s : %s", u, err)
@ -172,6 +192,7 @@ func (t *transporter) SendSnapshotRecoveryRequest(server *raft.Server, peer *raf
if resp != nil {
defer resp.Body.Close()
aersp = &raft.SnapshotRecoveryResponse{}
if err = json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF {
return aersp
}
@ -181,46 +202,30 @@ func (t *transporter) SendSnapshotRecoveryRequest(server *raft.Server, peer *raf
}
// Sends a server-side POST request
func (t *transporter) Post(path string, body io.Reader) (*http.Response, error) {
func (t *transporter) Post(urlStr string, body io.Reader) (*http.Response, *http.Request, error) {
c := make(chan *transporterResponse, 1)
req, _ := http.NewRequest("POST", urlStr, body)
go func() {
tr := new(transporterResponse)
tr.resp, tr.err = t.client.Post(path, "application/json", body)
c <- tr
}()
resp, err := t.client.Do(req)
return t.waitResponse(c)
return resp, req, err
}
// Sends a server-side GET request
func (t *transporter) Get(path string) (*http.Response, error) {
func (t *transporter) Get(urlStr string) (*http.Response, *http.Request, error) {
c := make(chan *transporterResponse, 1)
req, _ := http.NewRequest("GET", urlStr, nil)
resp, err := t.client.Do(req)
return resp, req, err
}
// Cancels the in-flight HTTP transaction when a timeout happens
func (t *transporter) CancelWhenTimeout(req *http.Request) {
go func() {
tr := new(transporterResponse)
tr.resp, tr.err = t.client.Get(path)
c <- tr
time.Sleep(ElectionTimeout)
t.transport.CancelRequest(req)
}()
return t.waitResponse(c)
}
func (t *transporter) waitResponse(responseChan chan *transporterResponse) (*http.Response, error) {
timeoutChan := time.After(t.timeout)
select {
case <-timeoutChan:
return nil, fmt.Errorf("Wait Response Timeout: %v", t.timeout)
case r := <-responseChan:
return r.resp, r.err
}
// for the compiler
return nil, nil
}
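
The net effect of the transporter rewrite above: Post and Get now hand back the *http.Request along with the response, the dial and response-header deadlines are enforced by the http.Transport itself (3x and 4x the heartbeat interval, per the comments at the top of the file), and the body read is bounded separately by CancelWhenTimeout, which cancels the in-flight request after one election timeout. A condensed sketch of that pattern against a plain HTTP endpoint (the URL and the concrete durations are illustrative, not taken from this commit):

package main

import (
	"fmt"
	"io/ioutil"
	"net"
	"net/http"
	"time"
)

func main() {
	// Deadlines on the transport mirror dailTimeout (3 * heartbeat) and
	// responseHeaderTimeout (4 * heartbeat); 50ms is an illustrative heartbeat.
	tr := &http.Transport{
		Dial: func(network, addr string) (net.Conn, error) {
			return net.DialTimeout(network, addr, 150*time.Millisecond)
		},
		ResponseHeaderTimeout: 200 * time.Millisecond,
	}
	client := &http.Client{Transport: tr}

	req, _ := http.NewRequest("GET", "http://127.0.0.1:4001/version", nil)
	resp, err := client.Do(req)
	if err != nil {
		fmt.Println("request failed:", err)
		return
	}
	defer resp.Body.Close()

	// Bound the body read separately: cancel the in-flight request after an
	// election-timeout-sized window, like CancelWhenTimeout does above.
	go func() {
		time.Sleep(200 * time.Millisecond)
		tr.CancelRequest(req)
	}()

	body, err := ioutil.ReadAll(resp.Body)
	fmt.Println(len(body), err)
}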

View File

@ -2,33 +2,58 @@ package main
import (
"crypto/tls"
"fmt"
"io/ioutil"
"net/http"
"testing"
"time"
)
func TestTransporterTimeout(t *testing.T) {
http.HandleFunc("/timeout", func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "timeout")
w.(http.Flusher).Flush() // send headers and some body
time.Sleep(time.Second * 100)
})
go http.ListenAndServe(":8080", nil)
conf := tls.Config{}
ts := newTransporter("http", conf, time.Second)
ts := newTransporter("http", conf)
ts.Get("http://google.com")
_, err := ts.Get("http://google.com:9999") // it doesn't exisit
if err == nil || err.Error() != "Wait Response Timeout: 1s" {
t.Fatal("timeout error: ", err.Error())
_, _, err := ts.Get("http://google.com:9999")
if err == nil {
t.Fatal("timeout error")
}
_, err = ts.Post("http://google.com:9999", nil) // it doesn't exisit
if err == nil || err.Error() != "Wait Response Timeout: 1s" {
t.Fatal("timeout error: ", err.Error())
}
res, req, err := ts.Get("http://localhost:8080/timeout")
_, err = ts.Get("http://www.google.com")
if err != nil {
t.Fatal("get error")
t.Fatal("should not timeout")
}
_, err = ts.Post("http://www.google.com", nil)
ts.CancelWhenTimeout(req)
body, err := ioutil.ReadAll(res.Body)
if err == nil {
fmt.Println(string(body))
t.Fatal("expected an error reading the body")
}
_, _, err = ts.Post("http://google.com:9999", nil)
if err == nil {
t.Fatal("timeout error")
}
_, _, err = ts.Get("http://www.google.com")
if err != nil {
t.Fatal("get error: ", err.Error())
}
_, _, err = ts.Post("http://www.google.com", nil)
if err != nil {
t.Fatal("post error")
}

33
util.go
View File

@ -4,7 +4,6 @@ import (
"encoding/json"
"fmt"
"io"
"log"
"net"
"net/http"
"net/url"
@ -17,6 +16,7 @@ import (
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/store"
"github.com/coreos/etcd/web"
"github.com/coreos/go-log/log"
"github.com/coreos/go-raft"
)
@ -186,44 +186,39 @@ func getNodePath(urlPath string) string {
// Log
//--------------------------------------
var logger *log.Logger
var logger *log.Logger = log.New("etcd", false,
log.CombinedSink(os.Stdout, "[%s] %s %-9s | %s\n", []string{"prefix", "time", "priority", "message"}))
func init() {
logger = log.New(os.Stdout, "[etcd] ", log.Lmicroseconds)
func infof(format string, v ...interface{}) {
logger.Infof(format, v...)
}
func infof(msg string, v ...interface{}) {
logger.Printf("INFO "+msg+"\n", v...)
}
func debugf(msg string, v ...interface{}) {
func debugf(format string, v ...interface{}) {
if verbose {
logger.Printf("DEBUG "+msg+"\n", v...)
logger.Debugf(format, v...)
}
}
func debug(v ...interface{}) {
if verbose {
logger.Println("DEBUG " + fmt.Sprint(v...))
logger.Debug(v...)
}
}
func warnf(msg string, v ...interface{}) {
logger.Printf("WARN "+msg+"\n", v...)
func warnf(format string, v ...interface{}) {
logger.Warningf(format, v...)
}
func warn(v ...interface{}) {
logger.Println("WARN " + fmt.Sprint(v...))
logger.Warning(v...)
}
func fatalf(msg string, v ...interface{}) {
logger.Printf("FATAL "+msg+"\n", v...)
os.Exit(1)
func fatalf(format string, v ...interface{}) {
logger.Fatalf(format, v...)
}
func fatal(v ...interface{}) {
logger.Println("FATAL " + fmt.Sprint(v...))
os.Exit(1)
logger.Fatalln(v...)
}
//--------------------------------------