使用Consul做服务发现


Consul是golang开发的一个高可用的分布式服务注册系统,有service discovery和key/value,健康检查, 节点选举,多数据中心等功能,与zookeeper和etcd等相似。

zookeeper和Consul比较

  • 开发语言方面,zookeeper采用java开发,安装的时候需要部署java环境;Consul采用golang开发,所有依赖都编译到了可执行程序中。

  • 部署方面,zookeeper一般部署奇数个节点方便做简单多数的选举机制。Consul部署的时候分server节点和client节点(通过不同的启动参数区分),server节点做leader选举和数据一致性维护,client节点部署在服务机器上,作为服务程序访问Consul的接口。

  • 一致性协议方面,zookeeper使用自定义的zab协议,Consul的一致性协议采用更流行的Raft。

  • zookeeper不支持多数据中心,Consul可以跨机房支持多数据中心部署,有效避免了单数据中心故障不能访问的情况。

  • 链接方式上,zookeeper client api和服务器保持长连接,需要服务程序自行管理和维护链接有效性,服务程序注册回调函数处理zookeeper事件,并自己维护在zookeeper上建立的目录结构有效性(如临时节点维护;Consul采用DNS或者http获取服务信息,没有主动通知,需要自己轮训获取。

Consul工作原理

下面这张图来源于Consul官网,很好的解释了Consul的工作原理:

consul_arch.png

Consul支持多数据中心,在上图中有两个DataCenter,他们通过Internet互联,同时请注意为了提高通信效率,只有Server节点才加入跨数据中心的通信。

在单个数据中心中,Consul分为Client和Server两种节点(所有的节点也被称为Agent),Server节点保存数据,Client负责健康检查及转发数据请求到Server;Server节点有一个Leader和多个Follower,Leader节点会将数据同步到Follower,Server的数量推荐是3个或者5个,在Leader挂掉的时候会启动选举机制产生一个新的Leader。

集群内的Consul节点通过gossip协议(流言协议)维护成员关系,也就是说某个节点了解集群内现在还有哪些节点,这些节点是Client还是Server。单个数据中心的流言协议同时使用TCP和UDP通信,并且都使用8301端口。跨数据中心的流言协议也同时使用TCP和UDP通信,端口使用8302。

集群内数据的读写请求既可以直接发到Server,也可以通过Client使用RPC转发到Server,请求最终会到达Leader节点,在允许数据轻微陈旧的情况下,读请求也可以在普通的Server节点完成,集群内数据的读写和复制都是通过TCP的8300端口完成。

安装Consul

https://www.consul.io/downloads.html下载对应平台的安装包,在Linux平台,只需要把对应的可执行文件拷贝到可执行路径即可。安装成功后,执行如下命令可以看到:

[heql@ubuntu ~]$ consul --version
Consul v1.3.0
Protocol 2 spoken by default, understands 2 to 3 (agent will automatically use protocol >2 when speaking to compatible agents)

Consul一般运行在集群模式下,通常一个数据中心需要3个或者5个server mode, 其他的运行client mode。Consul也提供开发模式用于启动单节点服务供开发调试用,运行下面的命令的可以启动Consul的开发模式:

[heql@ubuntu ~]$ consul agent -dev

Consul集群

Consul运行在集群模式时,一般需要3个或者5个server mode。如下使用Docker,启动4个Consul Agent,3个Server(会选举出一个leader),1个Client,这些Consul节点在Docker的容器内是互通的,他们通过桥接的模式通信:

启动第1个Server节点,将容器8500端口映射到主机8900端口,同时开启管理界面:

[heql@ubuntu ~]$ docker run -d --name=consul1 -p 8900:8500 -e CONSUL_BIND_INTERFACE=eth0 consul agent --server=true --bootstrap-expect=3 --client=0.0.0.0 -ui

启动第2个Server节点,并加入集群:

[heql@ubuntu ~]$ docker run -d --name=consul2 -e CONSUL_BIND_INTERFACE=eth0 consul agent --server=true --client=0.0.0.0 --join 172.17.0.2

启动第3个Server节点,并加入集群:

[heql@ubuntu ~]$ docker run -d --name=consul3 -e CONSUL_BIND_INTERFACE=eth0 consul agent --server=true --client=0.0.0.0 --join 172.17.0.2

启动第4个Client节点,并加入集群:

[heql@ubuntu ~]$ docker run -d --name=consul4 -e CONSUL_BIND_INTERFACE=eth0 consul agent --server=false --client=0.0.0.0 --join 172.17.0.2

查看集群的状态

启动成功后,就可以运行如下命令,查看集群的状态:

[heql@ubuntu ~]$ docker exec consul1 consul members
Node          Address          Status  Type    Build  Protocol  DC   Segment
52e765ec782b  172.17.0.3:8301  alive   server  1.3.0  2         dc1  <all>
c33456162dfb  172.17.0.2:8301  alive   server  1.3.0  2         dc1  <all>
e6ce79caaba9  172.17.0.4:8301  alive   server  1.3.0  2         dc1  <all>
c57535032e4e  172.17.0.5:8301  alive   client  1.3.0  2         dc1  <default>

在启动第一个节点时,把节点的8500端口映射到了主机的8900端口,所以也可以通过浏览器访问127.0.0.1:8900,查看集群的信息:

consul_members.png

Consul服务发现原理

如下图,描述了Consul服务发现的流程:

consul_service_discovery.png

在服务器Server1、Server2、Server3上分别部署了Consul Server,假设他们选举了Server2上的Consul Server节点为Leader。这些服务器上最好只部署Consul程序,以尽量维护Consul Server的稳定。

然后在服务器Server4和Server5上通过Consul Client分别注册Service A、B、C,这里每个Service分别部署在了两个服务器上,这样可以避免Service的单点问题。服务注册到Consul可以通过HTTP API(8500端口)的方式,也可以通过Consul配置文件的方式。
Consul Client可以认为是无状态的,它将注册信息通过RPC转发到Consul Server,服务信息保存在Server的各个节点中,并且通过Raft实现了强一致性。

最后在服务器Server6中Program D需要访问Service B,这时候Program D首先访问本机Consul Client提供的HTTP API,本机Client会将请求转发到Consul Server,Consul Server查询到Service B当前的信息返回,最终Program D拿到了Service B的所有部署的IP和端口,然后就可以选择Service B的其中一个部署并向其发起请求了。
如果服务发现采用的是DNS方式,则Program D中直接使用Service B的服务发现域名,域名解析请求首先到达本机DNS代理,然后转发到本机Consul Client,本机Client会将请求转发到Consul Server,Consul Server查询到Service B当前的信息返回,最终Program D拿到了Service B的某个部署的IP和端口。

服务注册

服务可以通过提供服务定义的配置文件或通过对HTTP API进行适当的调用来注册。

如下,是使用golang开发的一个echo的服务端程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package main

import (
"fmt"
"bufio"
"io"
"log"
"net"
)

func main() {
listener, err := net.Listen("tcp", ":8000")
if err != nil {
panic(err)
}

for {
conn, err := listener.Accept()
if err != nil {
log.Print(err)
continue
}
go handleConn(conn)
}
}

func handleConn(c net.Conn) {
defer c.Close()
for {
var msg string
var err error

reader := bufio.NewReader(c)

if msg, err = reader.ReadString('\n'); err != nil {
if err != io.EOF {
log.Println(err)
}
return
}

if _, err = c.Write([]byte(fmt.Sprintf("Response: %s", msg))); err != nil {
log.Println(err)
return
}
}
}

编译程序,因为要上传到consul docker容器中运行,consul docker使用的是alpine系统,使用如下命令进行编译:

[heql@ubuntu service]$ CGO_ENABLED=0 go build -a -installsuffix cgo .

把编译生成的二进制文件hello上传到容器中:

[heql@ubuntu service]$ docker cp hello consul4:/
通过配置文件注册服务

这里将上面的程序定义为hello服务,通过配置文件的方式注册到Consul,服务的相关信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
{
"services": [
{
"id": "hello",
"name": "hello",
"tags": [
"primary"
],
"address": "172.17.0.5",
"port": 8000,
"checks": [
{
"DeregisterCriticalServiceAfter": "1m",
"tcp": "172.17.0.5:8000",
"tls_skip_verify": false,
"interval": "10s",
"timeout": "1s"
}
]
}
]
}
  • name:hello,服务名称,需要能够区分不同的业务服务,可以部署多份并使用相同的name注册。

  • id:hello,服务id,在每个节点上需要唯一,如果有重复会被覆盖

  • address:172.17.0.5,服务所在机器的地址

  • port:5000,服务的端口。

  • checks:健康检查,使用tcp的方式,地址为:172.17.0.5:8000,interval:每10秒Consul请求一次,timeout:请求超时时间为1秒。DeregisterCriticalServiceAfter:服务不可用时,超过1分钟,则删除。Consul也提供了其他方式的健康检查,如http,详情可以参考:https://learn.hashicorp.com/consul/getting-started/checks.html

将上面的内容保存成文件services.json,并上传到容器的/consul/config目录中,consul启动时默认加载配置的路径:

[heql@ubuntu service]$ docker cp service.json consul4:/consul/config

运行hello服务:

[heql@ubuntu service]$ docker exec -it consul4 /bin/sh
/ # ./hello &

从新加载consul的配置:

/ # consul reload
Configuration reload triggered

然后这个服务就注册成功了,可以在界面上看到hello服务,目前运行在172.17.0.5节点上:

consul_hello_service.png

因为上面的consul容器是通过桥接的模式进行通信的,所以本机也是可以和consul的容器进行通信的,所以可以在本机启动一个consul节点,把service.json配置的address和tcp的ip地址改为本机的ip地址192.168.1.136,启动节点,节点加载的配置为当前目录:

[heql@ubuntu service]$ consul agent --server=false -config-dir=$(pwd) -data-dir=/tmp --bind=192.168.1.136 --join 172.17.0.2

在本机启动一个hello服务:

[heql@ubuntu service]$ ./hello

在界面上看到hello服务,目前运行在172.17.0.5192.168.1.136两个节点上:

consul_hello_service_2.png

通过HTTP API注册服务

consul也可以使用HTTP API进行注册,但是官方更推荐使用配置文件的方式进行注册,如下程序是用通过go的consul提供的api进行注册服务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package main

import (
consulapi "github.com/hashicorp/consul/api"
)

func main() {
config := consulapi.DefaultConfig()
client, err := consulapi.NewClient(config)
if err != nil {
panic(err)
}

registration := new(consulapi.AgentServiceRegistration)
registration.ID = "hello"
registration.Name = "hello"
registration.Tags = []string{"primary"}
registration.Port = 8000
registration.Address = "192.168.1.136"
registration.Check = &consulapi.AgentServiceCheck{
TCP: "192.168.1.136:8000",
Interval: "10s",
Timeout: "1s",
DeregisterCriticalServiceAfter: "1m",
}

err = client.Agent().ServiceRegister(registration)
if err != nil {
panic(err)
}
}

运行上面的程序,即可看到与通过配置文件方式注册的hello的服务。

服务的发现

服务注册成功以后,consul提供以DNS的方式和HTTP的方式来发现服务:

DNS方式

对于DNS的方式,服务的DNS名称是NAME.service.consul。如上面注册的hello服务为:hello.service.consul

[heql@ubuntu ~]$ dig  @127.0.0.1 -p 8600 hello.service.consul

; <<>> DiG 9.9.5-3ubuntu0.17-Ubuntu <<>> @127.0.0.1 -p 8600 hello.service.consul
; (1 server found)
;; global options: +cmd
;; Got answer:
;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 4356
;; flags: qr aa rd; QUERY: 1, ANSWER: 2, AUTHORITY: 0, ADDITIONAL: 3
;; WARNING: recursion requested but not available

;; OPT PSEUDOSECTION:
; EDNS: version: 0, flags:; udp: 4096
;; QUESTION SECTION:
;hello.service.consul.        IN    A

;; ANSWER SECTION:
hello.service.consul.    0    IN    A    172.17.0.5
hello.service.consul.    0    IN    A    192.168.1.136

;; ADDITIONAL SECTION:
hello.service.consul.    0    IN    TXT    "consul-network-segment="
hello.service.consul.    0    IN    TXT    "consul-network-segment="

;; Query time: 6 msec
;; SERVER: 127.0.0.1#8600(127.0.0.1)
;; WHEN: Wed Nov 21 21:01:42 CST 2018
;; MSG SIZE  rcvd: 153

如上面的输出信息,两个A记录返回了服务可用的节点的IP地址。 A记录只能保存IP地址。

也可以使用DNS来检索整个地址/端口的SRV记录:

[heql@ubuntu ~]$ dig  @127.0.0.1 -p 8600 hello.service.consul SRV

; <<>> DiG 9.9.5-3ubuntu0.17-Ubuntu <<>> @127.0.0.1 -p 8600 hello.service.consul SRV
; (1 server found)
;; global options: +cmd
;; Got answer:
;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 62070
;; flags: qr aa rd; QUERY: 1, ANSWER: 2, AUTHORITY: 0, ADDITIONAL: 5
;; WARNING: recursion requested but not available

;; OPT PSEUDOSECTION:
; EDNS: version: 0, flags:; udp: 4096
;; QUESTION SECTION:
;hello.service.consul.        IN    SRV

;; ANSWER SECTION:
hello.service.consul.    0    IN    SRV    1 1 8000 f9a30e055338.node.dc1.consul.
hello.service.consul.    0    IN    SRV    1 1 8000 ubuntu.node.dc1.consul.

;; ADDITIONAL SECTION:
f9a30e055338.node.dc1.consul. 0    IN    A    172.17.0.5
f9a30e055338.node.dc1.consul. 0    IN    TXT    "consul-network-segment="
ubuntu.node.dc1.consul.    0    IN    A    192.168.1.136
ubuntu.node.dc1.consul.    0    IN    TXT    "consul-network-segment="

;; Query time: 14 msec
;; SERVER: 127.0.0.1#8600(127.0.0.1)
;; WHEN: Wed Nov 21 21:05:37 CST 2018
;; MSG SIZE  rcvd: 243

SRV记录表示该服务正在端口8000上运行,并且存在于节点f9a30e055338.node.dc1.consulubuntu.node.dc1.consul上。DNS使用该记录的A记录返回附加部分。

HTTP API方式

如下面的命令会返回所有注册的Consul节点信息、服务信息及服务的健康检查信息:

[heql@ubuntu ~]$ curl http://localhost:8500/v1/catalog/service/hello
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
[{
"ID": "3088d97d-9009-5254-77e9-4c087abe3862",
"Node": "f9a30e055338",
"Address": "172.17.0.5",
"Datacenter": "dc1",
"TaggedAddresses": {
"lan": "172.17.0.5",
"wan": "172.17.0.5"
},
"NodeMeta": {
"consul-network-segment": ""
},
"ServiceKind": "",
"ServiceID": "hello",
"ServiceName": "hello",
"ServiceTags": ["primary"],
"ServiceAddress": "172.17.0.5",
"ServiceWeights": {
"Passing": 1,
"Warning": 1
},
"ServiceMeta": {},
"ServicePort": 8000,
"ServiceEnableTagOverride": false,
"ServiceProxyDestination": "",
"ServiceProxy": {},
"ServiceConnect": {},
"CreateIndex": 516,
"ModifyIndex": 516
}, {
"ID": "551cbb63-f693-3d1b-35fc-e6b141595373",
"Node": "ubuntu",
"Address": "192.168.1.136",
"Datacenter": "dc1",
"TaggedAddresses": {
"lan": "192.168.1.136",
"wan": "192.168.1.136"
},
"NodeMeta": {
"consul-network-segment": ""
},
"ServiceKind": "",
"ServiceID": "hello",
"ServiceName": "hello",
"ServiceTags": ["primary"],
"ServiceAddress": "192.168.1.136",
"ServiceWeights": {
"Passing": 1,
"Warning": 1
},
"ServiceMeta": {},
"ServicePort": 8000,
"ServiceEnableTagOverride": false,
"ServiceProxyDestination": "",
"ServiceProxy": {},
"ServiceConnect": {},
"CreateIndex": 497,
"ModifyIndex": 497
}]

上面的查询的是服务的所有节点,如果需要自动过滤掉不健康的服务,包括本身不健康的服务和不健康的Consul节点上的服务,可以使用下面的命令:

[heql@ubuntu ~]$ curl http://localhost:8500/v1/health/service/hello?passing
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
[{
"Node": {
"ID": "3088d97d-9009-5254-77e9-4c087abe3862",
"Node": "f9a30e055338",
"Address": "172.17.0.5",
"Datacenter": "dc1",
"TaggedAddresses": {
"lan": "172.17.0.5",
"wan": "172.17.0.5"
},
"Meta": {
"consul-network-segment": ""
},
"CreateIndex": 12,
"ModifyIndex": 13
},
"Service": {
"ID": "hello",
"Service": "hello",
"Tags": ["primary"],
"Address": "172.17.0.5",
"Meta": null,
"Port": 8000,
"Weights": {
"Passing": 1,
"Warning": 1
},
"EnableTagOverride": false,
"ProxyDestination": "",
"Proxy": {},
"Connect": {},
"CreateIndex": 516,
"ModifyIndex": 516
},
"Checks": [{
"Node": "f9a30e055338",
"CheckID": "serfHealth",
"Name": "Serf Health Status",
"Status": "passing",
"Notes": "",
"Output": "Agent alive and reachable",
"ServiceID": "",
"ServiceName": "",
"ServiceTags": [],
"Definition": {},
"CreateIndex": 12,
"ModifyIndex": 12
}, {
"Node": "f9a30e055338",
"CheckID": "service:hello",
"Name": "Service 'hello' check",
"Status": "passing",
"Notes": "",
"Output": "TCP connect 172.17.0.5:8000: Success",
"ServiceID": "hello",
"ServiceName": "hello",
"ServiceTags": ["primary"],
"Definition": {},
"CreateIndex": 516,
"ModifyIndex": 517
}]
}, {
"Node": {
"ID": "551cbb63-f693-3d1b-35fc-e6b141595373",
"Node": "ubuntu",
"Address": "192.168.1.136",
"Datacenter": "dc1",
"TaggedAddresses": {
"lan": "192.168.1.136",
"wan": "192.168.1.136"
},
"Meta": {
"consul-network-segment": ""
},
"CreateIndex": 497,
"ModifyIndex": 497
},
"Service": {
"ID": "hello",
"Service": "hello",
"Tags": ["primary"],
"Address": "192.168.1.136",
"Meta": null,
"Port": 8000,
"Weights": {
"Passing": 1,
"Warning": 1
},
"EnableTagOverride": false,
"ProxyDestination": "",
"Proxy": {},
"Connect": {},
"CreateIndex": 497,
"ModifyIndex": 497
},
"Checks": [{
"Node": "ubuntu",
"CheckID": "serfHealth",
"Name": "Serf Health Status",
"Status": "passing",
"Notes": "",
"Output": "Agent alive and reachable",
"ServiceID": "",
"ServiceName": "",
"ServiceTags": [],
"Definition": {},
"CreateIndex": 498,
"ModifyIndex": 498
}, {
"Node": "ubuntu",
"CheckID": "service:hello",
"Name": "Service 'hello' check",
"Status": "passing",
"Notes": "",
"Output": "TCP connect 192.168.1.136:8000: Success",
"ServiceID": "hello",
"ServiceName": "hello",
"ServiceTags": ["primary"],
"Definition": {},
"CreateIndex": 497,
"ModifyIndex": 501
}]
}]

服务发现使用的例子

如下程序:

  • 通过consul发现上面的hello服务,获得所有hello服务的ip地址和端口号,并建立tcp的连接池。

  • 通过轮询的方式,向每个hello服务发送数据。

  • 每隔3秒查询可用的服务,并动态的更新连接池,当有新的服务可用时,添加到连接池,当服务不可用时,从连接池中删除。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163

package main

import (
"bufio"
"fmt"
"log"
"net"
"time"

consulapi "github.com/hashicorp/consul/api"
)

var clientPool = make(map[string]*tcpClient)

type tcpClient struct {
host string
conn *net.TCPConn
canBeUsed bool
}

func discoveryService() ([]string, error) {
config := consulapi.DefaultConfig()
client, err := consulapi.NewClient(config)
if err != nil {
return nil, err
}

var hosts []string

serviceEntry, _, err := client.Health().Service("hello", "", true, &consulapi.QueryOptions{})
if err != nil {
return nil, err
}

for _, entry := range serviceEntry {
for _, health := range entry.Checks {
if health.ServiceName != "hello" {
continue
}
ip := entry.Service.Address
port := entry.Service.Port
host := fmt.Sprintf("%s:%d", ip, port)
hosts = append(hosts, host)
}
}
log.Printf("discovery service: %v\n", hosts)
return hosts, nil
}

func addClientToPool(hosts []string) {
// create client pool
for _, host := range hosts {
conn, err := net.Dial("tcp", host)
if err != nil {
log.Printf("Error connecting to: %s", host)
} else {
log.Printf("connecting to: %s", host)
var client tcpClient
client.conn = conn.(*net.TCPConn)
client.canBeUsed = true
client.host = host
clientPool[host] = &client
}
}
}

func request(host string, conn *net.TCPConn) error {
if _, err := conn.Write([]byte("Hello, " + host + "\n")); err != nil {
return err
}

reader := bufio.NewReader(conn)
if msg, err := reader.ReadString('\n'); err != nil {
return err
} else {
fmt.Printf("%s", msg)
}
return nil
}

func removeClientFromPool(host string) {
// close client and delete the client from client pool
clientPool[host].conn.Close()
delete(clientPool, host)
}

func differentHosts(hosts []string) ([]string, map[string]struct{}) {
var newHosts []string
deleteHosts := make(map[string]struct{}, len(clientPool))

for host := range clientPool {
deleteHosts[host] = struct{}{}
}

for _, host := range hosts {
if _, ok := clientPool[host]; !ok {
newHosts = append(newHosts, host)
} else {
delete(deleteHosts, host)
}
}
return newHosts, deleteHosts
}

func updateClientPool(hosts []string) {
newHosts, deleteHosts := differentHosts(hosts)

// add new host
if len(newHosts) != 0 {
log.Printf("add host: %v\n", newHosts)
addClientToPool(newHosts)
}

// delete host
for host := range deleteHosts {
log.Printf("delete host: %v\n", host)
removeClientFromPool(host)
}
}

func main() {
t := time.NewTicker(time.Second * 3)

hosts, err := discoveryService()
if err != nil {
panic(err)
}

addClientToPool(hosts)
if len(clientPool) == 0 {
log.Fatal("no have any client !!!")
}

clientChan := make(chan tcpClient)

go func(clientChan chan tcpClient) {
for client := range clientChan {
if err := request(client.host, client.conn); err != nil {
log.Printf("client: %v\n", err)
clientPool[client.host].canBeUsed = false
}
}
}(clientChan)

for {
select {
case <-t.C:
if hosts, err := discoveryService(); err != nil {
log.Println(err)
} else {
updateClientPool(hosts)
}
default:
for _, client := range clientPool {
if client.canBeUsed {
clientChan <- *client
}
}
time.Sleep(time.Second * 1)
}
}
}

运行程序,输出如下信息:

[heql@ubuntu client]$ go run client.go 
2018/11/21 21:34:32 discovery service: [172.17.0.5:8000 192.168.1.136:8000]
2018/11/21 21:34:32 connecting to: 172.17.0.5:8000
2018/11/21 21:34:32 connecting to: 192.168.1.136:8000
Response: Hello, 192.168.1.136:8000
Response: Hello, 172.17.0.5:8000
Response: Hello, 172.17.0.5:8000
Response: Hello, 192.168.1.136:8000
Response: Hello, 172.17.0.5:8000
Response: Hello, 192.168.1.136:8000

停止192.168.1.136的hello服务,输出如下:

2018/11/21 21:35:26 discovery service: [172.17.0.5:8000]
2018/11/21 21:35:26 delete host: 192.168.1.136:8000
Response: Hello, 172.17.0.5:8000
Response: Hello, 172.17.0.5:8000
Response: Hello, 172.17.0.5:8000

重新启动192.168.1.136的hello服务,输出如下:

2018/11/21 21:36:41 discovery service: [172.17.0.5:8000 192.168.1.136:8000]
2018/11/21 21:36:41 add host: [192.168.1.136:8000]
2018/11/21 21:36:41 connecting to: 192.168.1.136:8000
Response: Hello, 192.168.1.136:8000
Response: Hello, 172.17.0.5:8000
Response: Hello, 172.17.0.5:8000
Response: Hello, 192.168.1.136:8000
Response: Hello, 172.17.0.5:8000
Response: Hello, 192.168.1.136:8000