使用Java Curator操作ZooKeeper获取Kafka的Broker和Topic信息

Curator是Netflix公司开源的一个ZooKeeper客户端。与ZooKeeper提供的原生客户端相比,Curator的抽象层次更高,大大减少了ZooKeeper客户端的开发工作量。

Curator的Maven依赖

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.3.0</version>
</dependency>

本地启动kafka

启动zookeeper

1
./bin/zookeeper-server-start.sh config/zookeeper.properties

启动kafka

1
./bin/kafka-server-start.sh config/server.properties

启动kafkaManager

1
sudo ./bin/kafka-manager

代码

项目结构

先看测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.xinxiucan.test;
import java.util.List;
import com.xinxiucan.api.BrokerApi;
import com.xinxiucan.api.TopicApi;
import com.xinxiucan.zookeeper.ZKBean;
/**
 * Small command-line demo that points a {@link ZKBean} at a local ZooKeeper
 * instance and prints what {@link BrokerApi} and {@link TopicApi} return for it.
 */
public class AppTest {

    /**
     * Builds the ZooKeeper descriptor and prints, in order: the info of every
     * broker id, every topic name, and every topic entry.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        ZKBean zk = new ZKBean();
        zk.setZkURL("127.0.0.1:2181");

        printBrokers(zk);
        printTopicNames(zk);
        printTopics(zk);
    }

    /** Looks up each broker id and prints the info string returned for it. */
    private static void printBrokers(ZKBean zk) {
        List<String> brokerIds = BrokerApi.findBrokerIds(zk);
        for (String brokerId : brokerIds) {
            System.out.println(BrokerApi.findBrokerById(zk, brokerId));
        }
    }

    /** Prints every topic name, one per line, prefixed with {@code topicName:}. */
    private static void printTopicNames(ZKBean zk) {
        List<String> names = TopicApi.findAllTopicName(zk);
        for (String name : names) {
            System.out.println("topicName:" + name);
        }
    }

    /** Prints every entry of {@code findAllTopics}, one per line, prefixed with {@code topic:}. */
    private static void printTopics(ZKBean zk) {
        List<String> entries = TopicApi.findAllTopics(zk);
        for (String entry : entries) {
            System.out.println("topic:" + entry);
        }
    }
}

运行结果

1
2
3
4
5
{"jmx_port":-1,"timestamp":"1501636761404","endpoints":["PLAINTEXT://can:9092"],"host":"can","version":3,"port":9092}
topicName:test_xin_1
topicName:xin
topic:{"version":1,"partitions":{"0":[0]}}
topic:{"version":1,"partitions":{"0":[0]}}

剩余代码

ZKBean

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package com.xinxiucan.zookeeper;
import java.io.Serializable;
public class ZKBean implements Serializable {
private static final long serialVersionUID = -6057956208558425192L;
private int id = -1;
private String name;
private String zkURL;
private String version = "0.8.2.2";
private boolean jmxEnable;
private String jmxAuthUsername;
private String jmxAuthPassword;
private boolean jmxWithSsl;
private int zkConnectionTimeout = 30;
private int maxRetryCount = 3;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getZkURL() {
return zkURL;
}
public void setZkURL(String zkURL) {
this.zkURL = zkURL;
}
public boolean isJmxEnable() {
return jmxEnable;
}
public void setJmxEnable(boolean jmxEnable) {
this.jmxEnable = jmxEnable;
}
public String getJmxAuthUsername() {
return jmxAuthUsername;
}
public void setJmxAuthUsername(String jmxAuthUsername) {
this.jmxAuthUsername = jmxAuthUsername;
}
public String getJmxAuthPassword() {
return jmxAuthPassword;
}
public void setJmxAuthPassword(String jmxAuthPassword) {
this.jmxAuthPassword = jmxAuthPassword;
}
public boolean isJmxWithSsl() {
return jmxWithSsl;
}
public void setJmxWithSsl(boolean jmxWithSsl) {
this.jmxWithSsl = jmxWithSsl;
}
public int getZkConnectionTimeout() {
return zkConnectionTimeout;
}
public void setZkConnectionTimeout(int zkConnectionTimeout) {
this.zkConnectionTimeout = zkConnectionTimeout;
}
public int getMaxRetryCount() {
return maxRetryCount;
}
public void setMaxRetryCount(int maxRetryCount) {
this.maxRetryCount = maxRetryCount;
}
public String getVersion() {
return version;
}
public void setVersion(String version) {
this.version = version;
}
@Override
public int hashCode() {
return this.id;
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (this == obj) {
return true;
}
if (obj instanceof ZKBean) {
ZKBean anotherZKCluster = (ZKBean) obj;
return this.id == anotherZKCluster.getId();
} else {
return false;
}
}
}

ZKConnectionFactory

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package com.xinxiucan.zookeeper;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
/**
 * Creates started {@link CuratorFramework} clients from a {@link ZKBean}.
 */
public class ZKConnectionFactory {

    /** Initial back-off between retry attempts, in milliseconds. */
    private static final int RETRY_BASE_SLEEP_MS = 1000;

    /**
     * Builds and starts a Curator client for the ZooKeeper ensemble described
     * by {@code zk}.
     *
     * <p>{@code zk.getZkConnectionTimeout()} is multiplied by 1000 and applied as
     * the client's connection timeout in milliseconds. It is deliberately not
     * passed to {@link ExponentialBackoffRetry}: that constructor's first
     * argument is the base sleep time between retries, so feeding it the
     * connection timeout made every retry wait at least that long while the
     * actual connection timeout stayed at Curator's default.
     *
     * <p>The caller owns the returned client and is responsible for closing it.
     *
     * @param zk connection settings; its connect string, connection timeout and
     *           max retry count are read
     * @return a client on which {@code start()} has already been called
     */
    public static CuratorFramework buildCuratorFramework(ZKBean zk) {
        int connectionTimeoutMs = zk.getZkConnectionTimeout() * 1000;
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(RETRY_BASE_SLEEP_MS, zk.getMaxRetryCount());

        // The builder leaves the session timeout at Curator's default while
        // letting the connection timeout come from the bean.
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString(zk.getZkURL())
                .connectionTimeoutMs(connectionTimeoutMs)
                .retryPolicy(retryPolicy)
                .build();
        client.start();
        return client;
    }
}

BrokerApi

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package com.xinxiucan.api;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.curator.framework.CuratorFramework;
import com.xinxiucan.KafkaZKPath;
import com.xinxiucan.zookeeper.ZKBean;
import com.xinxiucan.zookeeper.ZKConnectionFactory;
/**
* 获取broker信息
* @author xinxiucan
*/