ZooKeeper实现配置管理、分布式锁


使用ZooKeeper可以实现命名服务、集群管理、配置管理、分布式锁功能。

配置管理

对于配置的修改在项目中是非常常见的,如果是一两台机器,可以直接手动进行修改。但是对于集群环境,如果要修改这些相同的配置项,那么就必须同时修改每台运行机器,像这样的配置信息完全可以交给Zookeeper来管理。

ZooKeeper实现配置管理

对于需要修改的配置信息,可以保存在Zookeeper的某个目录节点中,然后让所有需要使用这些配置的应用机器监听该节点的状态,一旦配置信息发生变化,每台应用机器就会收到Zookeeper的通知,然后从Zookeeper获取新的配置信息应用到系统中。如图:

conf_manager.png

代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package com.zookeeper.config.manager;

/**
 * Holds the connection settings whose values are mirrored in ZooKeeper nodes.
 *
 * <p>Each field corresponds to one node below {@link #CONFIG_NODE}: {@code host} to
 * {@link #HOST_NODE}, {@code username} to {@link #USERNAME_NODE} and {@code passwd} to
 * {@link #PASSWD_NODE}.
 */
public class Config {
    // ZooKeeper node paths
    public static final String CONFIG_NODE = "/config";
    public static final String HOST_NODE = "/config/host";
    public static final String USERNAME_NODE = "/config/username";
    public static final String PASSWD_NODE = "/config/passwd";

    // Data stored in the corresponding ZooKeeper nodes. Declared volatile because the
    // setters may be called from a watcher callback while another thread reads the
    // values (as the Client example in this article does).
    private volatile String host;
    private volatile String username;
    private volatile String passwd;

    /**
     * Creates a configuration snapshot.
     *
     * @param host     value of the host node
     * @param username value of the username node
     * @param passwd   value of the password node
     */
    public Config(String host, String username, String passwd) {
        this.host = host;
        this.username = username;
        this.passwd = passwd;
    }

    /** @return the current host value */
    public String getHost() {
        return host;
    }

    /** @param host the new host value */
    public void setHost(String host) {
        this.host = host;
    }

    /** @return the current user name */
    public String getUsername() {
        return username;
    }

    /** @param username the new user name */
    public void setUsername(String username) {
        this.username = username;
    }

    /** @return the current password */
    public String getPasswd() {
        return passwd;
    }

    /** @param passwd the new password */
    public void setPasswd(String passwd) {
        this.passwd = passwd;
    }

    /**
     * Formats the settings as {@code username:passwd@host}.
     *
     * <p>NOTE: the result contains the password in clear text.
     */
    @Override
    public String toString() {
        return username + ":" + passwd + "@" + host;
    }
}

如上代码,定义一个Config配置类,Config类中的host、username、passwd分别对应于Zookeeper中/config/host、/config/username、/config/passwd节点中的数据,当这几个节点中的数据发生变化时,如果客户端对这些节点设置了watch,则会收到相应的通知。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package com.zookeeper.config.manager;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

public class ZookeeperConfig implements Watcher {
private CountDownLatch countDownLatch = new CountDownLatch(1);
private ZooKeeper zooKeeper = null;

public void connect(String connectString, int sessionTimeout)
throws InterruptedException, IOException {
zooKeeper = new ZooKeeper(connectString, sessionTimeout, this);

// 等待Zookeeper服务器连接成功
countDownLatch.await();
}

public void createNode(String path, String data) throws KeeperException,
InterruptedException {
if (zooKeeper != null && zooKeeper.exists(path, false) == null) {
zooKeeper.create(path, data.getBytes(), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}
}

public void close() throws InterruptedException {
if (zooKeeper != null) {
zooKeeper.close();
}
}

public static void main(String[] args) {
String connectString = "192.168.1.128:2181";
int sessionTimeout = 5000;
ZookeeperConfig zookeeperConfig = new ZookeeperConfig();

try {
Config config = new Config("192.168.1.128:3306", "test", "123456");

// 连接Zookeeper服务器
zookeeperConfig.connect(connectString, sessionTimeout);

// 创建节点
zookeeperConfig.createNode(Config.CONFIG_NODE, "");
zookeeperConfig.createNode(Config.HOST_NODE, config.getHost());
zookeeperConfig.createNode(Config.USERNAME_NODE,
config.getUsername());
zookeeperConfig.createNode(Config.PASSWD_NODE, config.getPasswd());
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
zookeeperConfig.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

@Override
public void process(WatchedEvent event) {
if (event.getState() == KeeperState.SyncConnected) {
// 连接上服务器
countDownLatch.countDown();
}
}
}

ZookeeperConfig负责创建Config配置类中的节点,并为节点设置相应的默认值。

运行ZookeeperConfig,可以看到Zookeeper服务器创建了上述节点,并设置了相应的默认值:

[zk: 127.0.0.1:2181(CONNECTED) 7] ls /config
[host, passwd, username]
[zk: 127.0.0.1:2181(CONNECTED) 8] get /config/host
192.168.1.128:3306
cZxid = 0x697
ctime = Wed Jun 27 21:00:32 CST 2018
mZxid = 0x697
mtime = Wed Jun 27 21:00:32 CST 2018
pZxid = 0x697
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 18
numChildren = 0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package com.zookeeper.config.manager;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.KeeperState;

public class Client implements Watcher {
private CountDownLatch countDownLatch = new CountDownLatch(1);
private ZooKeeper zooKeeper = null;
private static Config config;

public void connect(String connectString, int sessionTimeout)
throws InterruptedException, IOException {
zooKeeper = new ZooKeeper(connectString, sessionTimeout, this);

// 等待Zookeeper服务器连接成功
countDownLatch.await();
}

public String getData(String path) throws KeeperException,
InterruptedException {
if (zooKeeper != null && zooKeeper.exists(path, true) != null) {
return new String(zooKeeper.getData(path, true, null));
}
return "";
}

public void close() throws InterruptedException {
if (zooKeeper != null) {
zooKeeper.close();
}
}

private void updateConfig() throws KeeperException, InterruptedException {
// 更新配置项
config.setHost(getData(Config.HOST_NODE));
config.setUsername(getData(Config.USERNAME_NODE));
config.setPasswd(getData(Config.PASSWD_NODE));
}

public static void main(String[] args) {
String connectString = "192.168.1.129:2181";
int sessionTimeout = 5000;
Client client = new Client();

try {
client.connect(connectString, sessionTimeout);

// 获得节点的数据
String host = client.getData(Config.HOST_NODE);
String username = client.getData(Config.USERNAME_NODE);
String passwd = client.getData(Config.PASSWD_NODE);

config = new Config(host, username, passwd);

while (true) {
System.out.println(config.toString());
Thread.sleep(5000);
}

// client.close();
} catch (Exception e) {
e.printStackTrace();
}
}

@Override
public void process(WatchedEvent event) {
if (event.getState() == KeeperState.SyncConnected) {
// 连接上服务器
countDownLatch.countDown();
}

if (event.getType() == EventType.NodeDataChanged) {
try {
updateConfig();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}

客户端Client设置了对节点的Watcher,当节点的数据发生变化时,则会更新配置项。

运行Client程序,输出如下:

test:123456@192.168.1.128:3306

修改host节点的值:

[zk: 127.0.0.1:2181(CONNECTED) 12] set /config/host 127.0.0.1:2379
cZxid = 0x697
ctime = Wed Jun 27 21:00:32 CST 2018
mZxid = 0x69e
mtime = Wed Jun 27 21:09:36 CST 2018
pZxid = 0x697
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 14
numChildren = 0

程序输出如下:

test:123456@127.0.0.1:2379

修改username节点的值:

[zk: 127.0.0.1:2181(CONNECTED) 13] set /config/username heql

程序输出如下:

heql:123456@127.0.0.1:2379

分布式锁

在分布式系统中,多个进程会竞争同一资源。为了保证这些进程能够有序地访问该临界资源,就需要使用分布式锁。互斥锁在同一个进程中很容易实现,但是在跨进程或者不同Server之间就不好实现,而使用Zookeeper却很容易实现这个功能。

ZooKeeper实现分布式锁

ZooKeeper分布式锁的流程:

  1. 在zookeeper指定节点(lock)下创建临时顺序节点(EPHEMERAL_SEQUENTIAL)
  2. 获取lock下所有子节点children
  3. 对子节点按节点自增序号从小到大排序
  4. 判断本节点是不是第一个子节点,若是,则获取锁;若不是,则监听排在该节点前一位(序号比它小且最接近)的那个节点的删除事件
  5. 若监听事件生效,则回到第二步重新进行判断,直到获取到锁

如图:

zookeeper_lock.png

代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package com.zookeeper.distribute.lock;

import java.util.concurrent.TimeUnit;

/**
 * Contract for a lock that can be acquired and released across processes.
 */
public interface Lock {

    /**
     * Acquires the lock, blocking until it is available.
     *
     * @throws Exception if the lock cannot be acquired
     */
    void lock() throws Exception;

    /**
     * Tries to acquire the lock, blocking for at most the given time.
     *
     * @param time maximum time to wait
     * @param unit unit of {@code time}
     * @return whether the lock was acquired
     * @throws Exception if the attempt fails for a reason other than timing out
     */
    boolean lock(long time, TimeUnit unit) throws Exception;

    /**
     * Releases the lock.
     *
     * @throws Exception if the lock cannot be released
     */
    void unLock() throws Exception;
}

Lock是一个接口:lock()表示以阻塞方式获得锁;lock(long time, TimeUnit unit)表示在指定时间内未获得锁则直接返回;unLock()用于释放锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
package com.zookeeper.distribute.lock;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ZkDistributeLock implements Lock {
private static Logger logger = LoggerFactory
.getLogger(ZkDistributeLock.class);
private ZooKeeper zooKeeper;
private String lockPath; // 节点的路径
private String lockName; // 锁的前缀
private String currentLockPath; // 用于保存当前客户端创建成功的顺序节点
private final static int MAX_RETRY_COUNT = 3;

public ZkDistributeLock(ZooKeeper zooKeeper, String lockPath,
String lockName) {
this.zooKeeper = zooKeeper;
this.lockPath = lockPath;
this.lockName = lockName;
init();
}

private void init() {
try {
if (zooKeeper.exists(lockPath, false) == null) {
zooKeeper.create(lockPath, null, Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}
} catch (Exception e) {
e.printStackTrace();
}
}

private String createLockNode(String path) throws KeeperException,
InterruptedException {
// 创建临时顺序节点
return zooKeeper.create(path, null, Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
}

private List<String> getSortedChildren() throws KeeperException,
InterruptedException {

// 获得lockPath节点下的所有子节点,并排序
List<String> children = zooKeeper.getChildren(lockPath, false);
if (children != null && !children.isEmpty()) {
Collections.sort(children);
}

logger.info("children:{}", children);
return children;
}

private boolean waitForLock(long startMillis, long millisToWait)
throws Exception {
boolean hasLock = false;
boolean deleteNode = false;

try {
while (!hasLock) {

// 获取locker节点下的所有节点,并按从小到大排序
List<String> children = getSortedChildren();
String lockNodeName = currentLockPath.substring(lockPath
.length() + 1);

// 查看当前客户端创建的节点在所有节点排序后的位置,如果是位置为0,则表示获取到了锁
int currentIndex = children.indexOf(lockNodeName);

if (currentIndex < 0) {
// 如果网络断开,创建的临时顺序节点将会被删除
throw new Exception("not find the node: " + lockNodeName);
} else if (currentIndex == 0) {
logger.info("get the lock, currentLockPath:{}",
currentLockPath);
hasLock = true;
} else {
// 其他客户端已经获取了锁 ,需要等待其他客户端释放锁
// 从节点列表中获取到比自己次小的那个节点,并对其建立监听
String beforePath = lockPath.concat("/").concat(
children.get(currentIndex - 1));

// 如果次小的节点被删除了,则表示当前客户端的节点应该是最小
// 使用CountDownLatch来实现等待
final CountDownLatch latch = new CountDownLatch(1);
Watcher beforePathListener = new Watcher() {

@Override
public void process(WatchedEvent event) {
// 节点被删除,唤醒应用程序
if (event.getType() == EventType.NodeDeleted) {
latch.countDown();
}
}
};

if (zooKeeper.exists(beforePath, beforePathListener) == null) {
// 节点不存在
continue;
}

if (millisToWait != -1) {
millisToWait -= (System.currentTimeMillis() - startMillis);
if (millisToWait <= 0) {
deleteNode = true;
break;
}
if (!latch.await(millisToWait, TimeUnit.MILLISECONDS)) {
// 超时, 删除节点
deleteNode = true;
break;
}
} else {
// 未设置超时时间,则一直阻塞
latch.await();
}

}
}

} catch (Exception e) {
// 发生异常需要删除节点
logger.error("waitForLock exception", e);
deleteNode = true;
throw e;
} finally {
if (deleteNode) {
unLock();
}
}

logger.info("end for waitForLock , hasLock = " + hasLock);
return hasLock;
}

private boolean attemptLock(long time, TimeUnit unit) throws Exception {
long startMillis = System.currentTimeMillis();
long millisToWait = (unit != null) ? unit.toMillis(time) : -1;

boolean hasLock = false;

// 网络出现问题,进行重试
for (int i = 0; i < MAX_RETRY_COUNT; i++) {
try {
currentLockPath = createLockNode(lockPath.concat("/").concat(
lockName));
hasLock = waitForLock(startMillis, millisToWait);
break;
} catch (Exception e) {
if (i == MAX_RETRY_COUNT - 1) {
throw e;
}
}
}

return hasLock;
}

@Override
public void lock() throws Exception {
attemptLock(-1, null);
}

@Override
public boolean lock(long time, TimeUnit unit) throws Exception {
if (time <= 0) {
throw new Exception("Lock wait for time must be greater than 0");
}

if (unit == null) {
throw new Exception("TimeUnit can not be null");
}

return attemptLock(time, unit);
}

@Override
public void unLock() throws Exception {
zooKeeper.delete(currentLockPath, -1);
}

}

ZkDistributeLock实现了Lock接口,在调用attemptLock获得锁时,实现了上述流程。假设有3个客户端分别创建了/lock/lock-0000000000、/lock/lock-0000000001、/lock/lock-0000000002三个节点,那么当前能获得锁的是/lock/lock-0000000000对应的客户端;当/lock/lock-0000000000被删除时,/lock/lock-0000000001节点对应的客户端将会得到通知。

测试程序如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package com.zookeeper.distribute.lock;

import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Manual test that lets {@code THREAD_NUM} threads, each with its own ZooKeeper session,
 * compete for the same {@link ZkDistributeLock}.
 *
 * <p>Each instance serves as the default watcher of exactly one session and carries its
 * own latch, so a worker proceeds only once its own session is connected.
 */
public class ZkDistributeLockTest implements Watcher {
    private static final int THREAD_NUM = 10;
    private static final Logger logger = LoggerFactory
            .getLogger(ZkDistributeLockTest.class);

    // Released when the session this watcher belongs to is connected.
    private final CountDownLatch connected = new CountDownLatch(1);

    public static void main(String[] args) {
        Thread[] workers = new Thread[THREAD_NUM];
        for (int i = 0; i < THREAD_NUM; i++) {
            workers[i] = new Thread() {
                @Override
                public void run() {
                    contend();
                }
            };
            workers[i].start();
        }

        // Wait for all workers instead of sleeping forever.
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /** Connects, takes the lock, logs one line and releases everything again. */
    private static void contend() {
        ZooKeeper zooKeeper = null;
        try {
            ZkDistributeLockTest sessionWatcher = new ZkDistributeLockTest();
            zooKeeper = new ZooKeeper("192.168.1.130:2181", 5000, sessionWatcher);
            sessionWatcher.connected.await();

            ZkDistributeLock zkDistributeLock = new ZkDistributeLock(zooKeeper, "/lock", "lock-");
            zkDistributeLock.lock();
            try {
                logger.info("{}: 正在处理逻辑......", Thread.currentThread().getName());
            } finally {
                // Release even if the guarded work throws.
                zkDistributeLock.unLock();
            }
        } catch (Exception e) {
            logger.error("lock test worker failed", e);
        } finally {
            if (zooKeeper != null) {
                try {
                    zooKeeper.close();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /** Releases the worker waiting for this watcher's session to connect. */
    @Override
    public void process(WatchedEvent event) {
        if (event.getState() == KeeperState.SyncConnected) {
            // Connected to the server
            connected.countDown();
        }
    }
}

上面的测试程序,开启了10个线程,进行测试。

运行程序,输出如下:

 INFO - children:[lock-0000000030, lock-0000000031, lock-0000000032, lock-0000000033, lock-0000000034, lock-0000000035, lock-0000000036, lock-0000000037, lock-0000000038, lock-0000000039]
INFO - children:[lock-0000000030, lock-0000000031, lock-0000000032, lock-0000000033, lock-0000000034, lock-0000000035, lock-0000000036, lock-0000000037, lock-0000000038, lock-0000000039]
INFO - get the lock, currentLockPath:/lock/lock-0000000030
INFO - end for waitForLock , hasLock = true
INFO - Thread-6: 正在处理逻辑......
INFO - children:[lock-0000000030, lock-0000000031, lock-0000000032, lock-0000000033, lock-0000000034, lock-0000000035, lock-0000000036, lock-0000000037, lock-0000000038, lock-0000000039]
INFO - children:[lock-0000000030, lock-0000000031, lock-0000000032, lock-0000000033, lock-0000000034, lock-0000000035, lock-0000000036, lock-0000000037, lock-0000000038, lock-0000000039]
INFO - children:[lock-0000000030, lock-0000000031, lock-0000000032, lock-0000000033, lock-0000000034, lock-0000000035, lock-0000000036, lock-0000000037, lock-0000000038, lock-0000000039]
INFO - children:[lock-0000000030, lock-0000000031, lock-0000000032, lock-0000000033, lock-0000000034, lock-0000000035, lock-0000000036, lock-0000000037, lock-0000000038, lock-0000000039]
INFO - children:[lock-0000000030, lock-0000000031, lock-0000000032, lock-0000000033, lock-0000000034, lock-0000000035, lock-0000000036, lock-0000000037, lock-0000000038, lock-0000000039]
INFO - children:[lock-0000000030, lock-0000000031, lock-0000000032, lock-0000000033, lock-0000000034, lock-0000000035, lock-0000000036, lock-0000000037, lock-0000000038, lock-0000000039]
INFO - children:[lock-0000000030, lock-0000000031, lock-0000000032, lock-0000000033, lock-0000000034, lock-0000000035, lock-0000000036, lock-0000000037, lock-0000000038, lock-0000000039]
INFO - children:[lock-0000000030, lock-0000000031, lock-0000000032, lock-0000000033, lock-0000000034, lock-0000000035, lock-0000000036, lock-0000000037, lock-0000000038, lock-0000000039]
INFO - children:[lock-0000000031, lock-0000000032, lock-0000000033, lock-0000000034, lock-0000000035, lock-0000000036, lock-0000000037, lock-0000000038, lock-0000000039]
INFO - get the lock, currentLockPath:/lock/lock-0000000031
INFO - end for waitForLock , hasLock = true
INFO - Thread-0: 正在处理逻辑......
INFO - children:[lock-0000000032, lock-0000000033, lock-0000000034, lock-0000000035, lock-0000000036, lock-0000000037, lock-0000000038, lock-0000000039]
INFO - get the lock, currentLockPath:/lock/lock-0000000032
INFO - end for waitForLock , hasLock = true
INFO - Thread-8: 正在处理逻辑......
INFO - children:[lock-0000000033, lock-0000000034, lock-0000000035, lock-0000000036, lock-0000000037, lock-0000000038, lock-0000000039]
INFO - get the lock, currentLockPath:/lock/lock-0000000033
INFO - end for waitForLock , hasLock = true
INFO - Thread-4: 正在处理逻辑......
INFO - children:[lock-0000000034, lock-0000000035, lock-0000000036, lock-0000000037, lock-0000000038, lock-0000000039]
INFO - get the lock, currentLockPath:/lock/lock-0000000034
INFO - end for waitForLock , hasLock = true
INFO - Thread-7: 正在处理逻辑......
INFO - children:[lock-0000000035, lock-0000000036, lock-0000000037, lock-0000000038, lock-0000000039]
INFO - get the lock, currentLockPath:/lock/lock-0000000035
INFO - end for waitForLock , hasLock = true
INFO - Thread-3: 正在处理逻辑......
INFO - children:[lock-0000000036, lock-0000000037, lock-0000000038, lock-0000000039]
INFO - get the lock, currentLockPath:/lock/lock-0000000036
INFO - end for waitForLock , hasLock = true
INFO - Thread-1: 正在处理逻辑......
INFO - children:[lock-0000000037, lock-0000000038, lock-0000000039]
INFO - get the lock, currentLockPath:/lock/lock-0000000037
INFO - end for waitForLock , hasLock = true
INFO - Thread-2: 正在处理逻辑......
INFO - children:[lock-0000000038, lock-0000000039]
INFO - get the lock, currentLockPath:/lock/lock-0000000038
INFO - end for waitForLock , hasLock = true
INFO - Thread-5: 正在处理逻辑......
INFO - children:[lock-0000000039]
INFO - get the lock, currentLockPath:/lock/lock-0000000039
INFO - end for waitForLock , hasLock = true
INFO - Thread-9: 正在处理逻辑......