package nsqd
import (
"bytes"
"encoding/json"
"net"
"os"
"strconv"
"time"
"github.com/nsqio/go-nsq"
"github.com/nsqio/nsq/internal/version"
)
func connectCallback(n *NSQD, hostname string, syncTopicChan chan *lookupPeer) func(*lookupPeer) { return func(lp *lookupPeer) { ci := make(map[string]interface{}) ci["version"] = version.Binary
ci["tcp_port"] = n.RealTCPAddr().Port
ci["http_port"] = n.RealHTTPAddr().Port
ci["hostname"] = hostname
ci["broadcast_address"] = n.getOpts().BroadcastAddress
cmd, err := nsq.Identify(ci)
if err != nil { lp.Close()
return
}
resp, err := lp.Command(cmd)
if err != nil { n.logf("LOOKUPD(%s): ERROR %s - %s", lp, cmd, err) } else if bytes.Equal(resp, []byte("E_INVALID")) { n.logf("LOOKUPD(%s): lookupd returned %s", lp, resp) } else { err = json.Unmarshal(resp, &lp.Info)
if err != nil { n.logf("LOOKUPD(%s): ERROR parsing response - %s", lp, resp) } else { n.logf("LOOKUPD(%s): peer info %+v", lp, lp.Info) }
}
go func() { syncTopicChan <- lp
}()
}
}
func (n *NSQD) lookupLoop() { var lookupPeers []*lookupPeer
var lookupAddrs []string
syncTopicChan := make(chan *lookupPeer)
connect := true
hostname, err := os.Hostname()
if err != nil { n.logf("FATAL: failed to get hostname - %s", err) os.Exit(1)
}
// for announcements, lookupd determines the host automatically
ticker := time.Tick(15 * time.Second)
for { if connect { for _, host := range n.getOpts().NSQLookupdTCPAddresses { if in(host, lookupAddrs) { continue
}
n.logf("LOOKUP(%s): adding peer", host) lookupPeer := newLookupPeer(host, n.getOpts().MaxBodySize, n.getOpts().Logger,
connectCallback(n, hostname, syncTopicChan))
lookupPeer.Command(nil) // start the connection
lookupPeers = append(lookupPeers, lookupPeer)
lookupAddrs = append(lookupAddrs, host)
}
n.lookupPeers.Store(lookupPeers)
connect = false
}
select { case <-ticker:
// send a heartbeat and read a response (read detects closed conns)
for _, lookupPeer := range lookupPeers { n.logf("LOOKUPD(%s): sending heartbeat", lookupPeer) cmd := nsq.Ping()
_, err := lookupPeer.Command(cmd)
if err != nil { n.logf("LOOKUPD(%s): ERROR %s - %s", lookupPeer, cmd, err) }
}
case val := <-n.notifyChan:
var cmd *nsq.Command
var branch string
switch val.(type) { case *Channel:
// notify all nsqlookupds that a new channel exists, or that it's removed
branch = "channel"
channel := val.(*Channel)
if channel.Exiting() == true { cmd = nsq.UnRegister(channel.topicName, channel.name)
} else { cmd = nsq.Register(channel.topicName, channel.name)
}
case *Topic:
// notify all nsqlookupds that a new topic exists, or that it's removed
branch = "topic"
topic := val.(*Topic)
if topic.Exiting() == true { cmd = nsq.UnRegister(topic.name, "")
} else { cmd = nsq.Register(topic.name, "")
}
}
for _, lookupPeer := range lookupPeers { n.logf("LOOKUPD(%s): %s %s", lookupPeer, branch, cmd) _, err := lookupPeer.Command(cmd)
if err != nil { n.logf("LOOKUPD(%s): ERROR %s - %s", lookupPeer, cmd, err) }
}
case lookupPeer := <-syncTopicChan:
var commands []*nsq.Command
// build all the commands first so we exit the lock(s) as fast as possible
n.RLock()
for _, topic := range n.topicMap { topic.RLock()
if len(topic.channelMap) == 0 { commands = append(commands, nsq.Register(topic.name, ""))
} else { for _, channel := range topic.channelMap { commands = append(commands, nsq.Register(channel.topicName, channel.name))
}
}
topic.RUnlock()
}
n.RUnlock()
for _, cmd := range commands { n.logf("LOOKUPD(%s): %s", lookupPeer, cmd) _, err := lookupPeer.Command(cmd)
if err != nil { n.logf("LOOKUPD(%s): ERROR %s - %s", lookupPeer, cmd, err) break
}
}
case <-n.optsNotificationChan:
var tmpPeers []*lookupPeer
var tmpAddrs []string
for _, lp := range lookupPeers { if in(lp.addr, n.getOpts().NSQLookupdTCPAddresses) { tmpPeers = append(tmpPeers, lp)
tmpAddrs = append(tmpAddrs, lp.addr)
continue
}
n.logf("LOOKUP(%s): removing peer", lp) lp.Close()
}
lookupPeers = tmpPeers
lookupAddrs = tmpAddrs
connect = true
case <-n.exitChan:
goto exit
}
}
exit:
n.logf("LOOKUP: closing")}
// in reports whether s is equal to at least one element of lst.
// A nil or empty lst never contains anything.
func in(s string, lst []string) bool {
	for i := 0; i < len(lst); i++ {
		if lst[i] == s {
			return true
		}
	}
	return false
}
// lookupdHTTPAddrs returns the "host:port" HTTP address of every lookup peer
// currently stored in n.lookupPeers whose Info carries a non-empty
// BroadcastAddress. Peers with an empty BroadcastAddress are skipped.
//
// The result is nil when no peer list has been stored yet or when no peer
// qualifies.
func (n *NSQD) lookupdHTTPAddrs() []string {
	stored := n.lookupPeers.Load()
	if stored == nil {
		return nil
	}

	var addrs []string
	for _, peer := range stored.([]*lookupPeer) {
		host := peer.Info.BroadcastAddress
		if len(host) > 0 {
			port := strconv.Itoa(peer.Info.HTTPPort)
			addrs = append(addrs, net.JoinHostPort(host, port))
		}
	}
	return addrs
}