Java curator操作zookeeper获取kafka

Curator是Netflix公司开源的一个Zookeeper客户端,与Zookeeper提供的原生客户端相比,Curator的抽象层次更高,简化了Zookeeper客户端的开发量。

Curator的Maven依赖

1
2
3
4
5
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.3.0</version>
</dependency>

本地启动kafka

启动zookeeper

1
./bin/zookeeper-server-start.sh config/zookeeper.properties

启动kafka

1
./bin/kafka-server-start.sh config/server.properties

启动kafkaManager

1
sudo ./bin/kafka-manager

代码

项目结构

先看测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.xinxiucan.test;
import java.util.List;
import com.xinxiucan.api.BrokerApi;
import com.xinxiucan.api.TopicApi;
import com.xinxiucan.zookeeper.ZKBean;
public class AppTest {
public static void main(String[] args) {
ZKBean zk = new ZKBean();
zk.setZkURL("127.0.0.1:2181");
List<String> ids = BrokerApi.findBrokerIds(zk);
for (String id : ids) {
String idInfo = BrokerApi.findBrokerById(zk, id);
System.out.println(idInfo);
}
List<String> topicNames = TopicApi.findAllTopicName(zk);
for (String topicName : topicNames) {
System.out.println("topicName:" + topicName);
}
List<String> topics = TopicApi.findAllTopics(zk);
for (String topic : topics) {
System.out.println("topic:" + topic);
}
}
}

运行结果

1
2
3
4
5
{"jmx_port":-1,"timestamp":"1501636761404","endpoints":["PLAINTEXT://can:9092"],"host":"can","version":3,"port":9092}
topicName:test_xin_1
topicName:xin
topic:{"version":1,"partitions":{"0":[0]}}
topic:{"version":1,"partitions":{"0":[0]}}

剩余代码

ZKBean

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package com.xinxiucan.zookeeper;
import java.io.Serializable;
public class ZKBean implements Serializable {
private static final long serialVersionUID = -6057956208558425192L;
private int id = -1;
private String name;
private String zkURL;
private String version = "0.8.2.2";
private boolean jmxEnable;
private String jmxAuthUsername;
private String jmxAuthPassword;
private boolean jmxWithSsl;
private int zkConnectionTimeout = 30;
private int maxRetryCount = 3;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getZkURL() {
return zkURL;
}
public void setZkURL(String zkURL) {
this.zkURL = zkURL;
}
public boolean isJmxEnable() {
return jmxEnable;
}
public void setJmxEnable(boolean jmxEnable) {
this.jmxEnable = jmxEnable;
}
public String getJmxAuthUsername() {
return jmxAuthUsername;
}
public void setJmxAuthUsername(String jmxAuthUsername) {
this.jmxAuthUsername = jmxAuthUsername;
}
public String getJmxAuthPassword() {
return jmxAuthPassword;
}
public void setJmxAuthPassword(String jmxAuthPassword) {
this.jmxAuthPassword = jmxAuthPassword;
}
public boolean isJmxWithSsl() {
return jmxWithSsl;
}
public void setJmxWithSsl(boolean jmxWithSsl) {
this.jmxWithSsl = jmxWithSsl;
}
public int getZkConnectionTimeout() {
return zkConnectionTimeout;
}
public void setZkConnectionTimeout(int zkConnectionTimeout) {
this.zkConnectionTimeout = zkConnectionTimeout;
}
public int getMaxRetryCount() {
return maxRetryCount;
}
public void setMaxRetryCount(int maxRetryCount) {
this.maxRetryCount = maxRetryCount;
}
public String getVersion() {
return version;
}
public void setVersion(String version) {
this.version = version;
}
@Override
public int hashCode() {
return this.id;
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (this == obj) {
return true;
}
if (obj instanceof ZKBean) {
ZKBean anotherZKCluster = (ZKBean) obj;
return this.id == anotherZKCluster.getId();
} else {
return false;
}
}
}

ZKConnectionFactory

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package com.xinxiucan.zookeeper;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class ZKConnectionFactory {
public static CuratorFramework buildCuratorFramework(ZKBean zk) {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(zk.getZkConnectionTimeout() * 1000, zk.getMaxRetryCount());
CuratorFramework client = CuratorFrameworkFactory.newClient(zk.getZkURL(), retryPolicy);
client.start();
return client;
}
}

BrokerApi

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package com.xinxiucan.api;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.curator.framework.CuratorFramework;
import com.xinxiucan.KafkaZKPath;
import com.xinxiucan.zookeeper.ZKBean;
import com.xinxiucan.zookeeper.ZKConnectionFactory;
/**
* 获取broker信息
* @author xinxiucan
*/
public class BrokerApi {
/**
* 根据zk获取Broker ID列表
* @param zkBean
* @return
*/
public static List<String> findBrokerIds(ZKBean zkBean) {
CuratorFramework client = null;
try {
client = ZKConnectionFactory.buildCuratorFramework(zkBean);
List<String> brokerIdStrs = client.getChildren().forPath(KafkaZKPath.BROKER_IDS_PATH);
return brokerIdStrs;
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
if (client != null) {
client.close();
}
}
}
/**
* 根据zk&brokerId获取Broker信息
* @param zkBean
* @return
*/
public static String findBrokerById(ZKBean zkBean, String id) {
CuratorFramework client = null;
try {
client = ZKConnectionFactory.buildCuratorFramework(zkBean);
String brokerIdPath = KafkaZKPath.getBrokerSummeryPath(id);
String brokerInfo = new String(client.getData().forPath(brokerIdPath), "UTF-8");
return brokerInfo;
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
if (client != null) {
client.close();
}
}
}
public static Map<String, String> findAllBrokerSummarys(ZKBean zkBean) {
CuratorFramework client = null;
try {
client = ZKConnectionFactory.buildCuratorFramework(zkBean);
List<String> brokerIds = client.getChildren().forPath(KafkaZKPath.BROKER_IDS_PATH);
if (brokerIds == null || brokerIds.size() == 0) {
return null;
}
Map<String, String> brokerMap = new HashMap<String, String>();
for (String brokerId : brokerIds) {
String brokerIdPath = KafkaZKPath.getBrokerSummeryPath(brokerId);
String brokerInfo = new String(client.getData().forPath(brokerIdPath), "UTF-8");
brokerMap.put(brokerId, brokerInfo);
}
return brokerMap;
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
if (client != null) {
client.close();
}
}
}
}

TopicApi

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package com.xinxiucan.api;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.data.Stat;
import com.xinxiucan.KafkaZKPath;
import com.xinxiucan.ReplicasAssignmentBuilder;
import com.xinxiucan.util.Object2Array;
import com.xinxiucan.zookeeper.ZKBean;
import com.xinxiucan.zookeeper.ZKConnectionFactory;
/**
* 获取Topic信息
* @author can
*/
public class TopicApi {
/**
* 创建Topic
* @param zkBean
* @param topic
* @param partitionsCount
* @param replicationFactorCount
* @param topicConfig
*/
public static void createTopic(ZKBean zkBean, String topic, int partitionsCount, int replicationFactorCount,
Map<String, String> topicConfig) {
CuratorFramework client = null;
try {
client = ZKConnectionFactory.buildCuratorFramework(zkBean);
List<String> brokerIds = client.getChildren().forPath(KafkaZKPath.BROKER_IDS_PATH);
List<Integer> ids = new ArrayList<>();
for (String str : brokerIds) {
int i = Integer.parseInt(str);
ids.add(i);
}
Map<String, List<Integer>> replicasAssignment = ReplicasAssignmentBuilder.assignReplicasToBrokers(ids, partitionsCount,
replicationFactorCount);
Map<String, Object> topicSummaryMap = new HashMap<String, Object>();
topicSummaryMap.put("version", 1);
topicSummaryMap.put("partitions", replicasAssignment);
byte[] topicSummaryData = Object2Array.objectToByteArray(topicSummaryMap);
String topicSummaryPath = KafkaZKPath.getTopicSummaryPath(topic);
Stat stat = client.checkExists().forPath(topicSummaryPath);
if (stat == null) {
// create
client.create().creatingParentsIfNeeded().forPath(topicSummaryPath, topicSummaryData);
} else {
// update
client.setData().forPath(topicSummaryPath, topicSummaryData);
}
Map<String, Object> topicConfigMap = new HashMap<String, Object>();
topicConfigMap.put("version", 1);
topicConfigMap.put("config", topicConfig);
byte[] topicConfigData = Object2Array.objectToByteArray(topicConfigMap);
String topicConfigPath = KafkaZKPath.getTopicConfigPath(topic);
Stat stat2 = client.checkExists().forPath(topicConfigPath);
if (stat2 == null) {
// create
client.create().creatingParentsIfNeeded().forPath(topicConfigPath, topicConfigData);
} else {
// update
client.setData().forPath(topicConfigPath, topicConfigData);
}
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
if (client != null) {
client.close();
}
}
}
/**
* 删除Topic
* @param zkBean
* @param topicName
*/
public static void deleteTopic(ZKBean zkBean, String topicName) {
CuratorFramework client = null;
try {
client = ZKConnectionFactory.buildCuratorFramework(zkBean);
String adminDeleteTopicPath = KafkaZKPath.getDeleteTopicPath(topicName);
client.create().creatingParentsIfNeeded().forPath(adminDeleteTopicPath, null);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
if (client != null) {
client.close();
}
}
}
/**
* 查询topic
* @param zkBean
* @return
*/
public static List<String> findAllTopics(ZKBean zkBean) {
CuratorFramework client = null;
try {
client = ZKConnectionFactory.buildCuratorFramework(zkBean);
List<String> topicNames = client.getChildren().forPath(KafkaZKPath.BROKER_TOPICS_PATH);
List<String> topicSummarys = new ArrayList<String>();
for (String topicName : topicNames) {
byte[] topicData = client.getData().forPath(KafkaZKPath.getTopicSummaryPath(topicName));
topicSummarys.add(new String(topicData, "UTF-8"));
}
// add delete flag
List<String> deleteTopics = client.getChildren().forPath(KafkaZKPath.ADMIN_DELETE_TOPICS_PATH);
return topicSummarys;
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
if (client != null) {
client.close();
}
}
}
public static List<String> findAllTopicName(ZKBean zkBean) {
CuratorFramework client = null;
try {
client = ZKConnectionFactory.buildCuratorFramework(zkBean);
return client.getChildren().forPath(KafkaZKPath.BROKER_TOPICS_PATH);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
if (client != null) {
client.close();
}
}
}
}

KafkaZKPath

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package com.xinxiucan;
public class KafkaZKPath {
public static final String COMSUMERS_PATH = "/consumers";
public static final String BROKER_IDS_PATH = "/brokers/ids";
public static final String BROKER_TOPICS_PATH = "/brokers/topics";
public static final String CONFIG_TOPICS_PATH = "/config/topics";
public static final String CONFIG_CHANGES_PATH = "/config/changes";
public static final String CONTROLLER_PATH = "/controller";
public static final String CONTROLLER_EPOCH_PATH = "/controller_epoch";
public static final String ADMIN_PATH = "/admin";
public static final String ADMIN_REASSGIN_PARTITIONS_PATH = "/admin/reassign_partitions";
public static final String ADMIN_DELETE_TOPICS_PATH = "/admin/delete_topics";
public static final String ADMIN_PREFERED_REPLICA_ELECTION_PATH = "/admin/preferred_replica_election";
public static String getTopicSummaryPath(String topic) {
return BROKER_TOPICS_PATH + "/" + topic;
}
public static String getTopicPartitionsStatePath(String topic,String partitionId) {
return getTopicSummaryPath(topic) + "/partitions/" + partitionId+ "/state";
}
public static String getTopicConfigPath(String topic) {
return CONFIG_TOPICS_PATH + "/" + topic;
}
public static String getDeleteTopicPath(String topic) {
return ADMIN_DELETE_TOPICS_PATH + "/" + topic;
}
public static String getBrokerSummeryPath(String id) {
return BROKER_IDS_PATH + "/" + id;
}
public static String getConsumerOffsetPath(String groupId) {
return COMSUMERS_PATH + "/" + groupId +"/offsets";
}
public static String getTopicConsumerOffsetPath(String groupId,String topic){
return COMSUMERS_PATH + "/" + groupId +"/offsets/"+topic;
}
public static String getTopicPartitionConsumerOffsetPath(String groupId, String topic,String partitionId){
return COMSUMERS_PATH + "/" + groupId +"/offsets/"+topic + "/" + partitionId;
}
public static String getTopicPartitionConsumerOwnerPath(String groupId, String topic, String partitionId) {
return COMSUMERS_PATH + "/" + groupId +"/owners/"+topic + "/" + partitionId;
}
}

ReplicasAssignmentBuilder

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package com.xinxiucan;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
public class ReplicasAssignmentBuilder {
public static Map<String, List<Integer>> assignReplicasToBrokers(List<Integer> brokerListSet, int nPartitions, int nReplicationFactor) {
return assignReplicasToBrokers(brokerListSet, nPartitions, nReplicationFactor, -1, -1);
}
/**
* There are 2 goals of replica assignment:
* 1. Spread the replicas evenly among brokers.
* 2. For partitions assigned to a particular broker, their other replicas are spread over the other brokers.
*
* To achieve this goal, we:
* 1. Assign the first replica of each partition by round-robin, starting from a random position in the broker list.
* 2. Assign the remaining replicas of each partition with an increasing shift.
*
* Here is an example of assigning
* broker-0 broker-1 broker-2 broker-3 broker-4
* p0 p1 p2 p3 p4 (1st replica)
* p5 p6 p7 p8 p9 (1st replica)
* p4 p0 p1 p2 p3 (2nd replica)
* p8 p9 p5 p6 p7 (2nd replica)
* p3 p4 p0 p1 p2 (3nd replica)
* p7 p8 p9 p5 p6 (3nd replica)
*/
public static Map<String, List<Integer>> assignReplicasToBrokers(List<Integer> brokerListSet, int nPartitions, int nReplicationFactor,
int fixedStartIndex, int startPartitionId) {
// sort
brokerListSet.sort(new Comparator<Integer>() {
@Override
public int compare(Integer o1, Integer o2) {
return o1 - o2;
}
});
if (nPartitions <= 0) {
throw new RuntimeException("num of partition cannot less than 1");
}
if (nReplicationFactor <= 0) {
throw new RuntimeException("num of replication factor cannot less than 1");
}
if (nReplicationFactor > brokerListSet.size()) {
throw new RuntimeException("broker num is less than replication factor num");
}
Random random = new Random();
Map<String, List<Integer>> result = new HashMap<String, List<Integer>>();
int startIndex = fixedStartIndex >= 0 ? fixedStartIndex : random.nextInt(brokerListSet.size());
int currentPartitionId = startPartitionId >= 0 ? startPartitionId : 0;
int nextReplicaShift = fixedStartIndex >= 0 ? fixedStartIndex : random.nextInt(brokerListSet.size());
for (int i = 0; i < nPartitions; i++) {
if (currentPartitionId > 0 && ((currentPartitionId % brokerListSet.size()) == 0)) {
nextReplicaShift++;
}
int firstReplicaIndex = (currentPartitionId + startIndex) % brokerListSet.size();
List<Integer> replicaList = new ArrayList<Integer>();
replicaList.add(brokerListSet.get(firstReplicaIndex));
for (int j = 0; j < (nReplicationFactor - 1); j++) {
replicaList.add(brokerListSet.get(getReplicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerListSet.size())));
}
result.put("" + currentPartitionId, getReverseList(replicaList));
currentPartitionId++;
}
return result;
}
private static List<Integer> getReverseList(List<Integer> list) {
List<Integer> result = new ArrayList<Integer>();
for (int i = list.size() - 1; i >= 0; i--) {
result.add(list.get(i));
}
return result;
}
private static int getReplicaIndex(int firstReplicaIndex, int nextReplicaShift, int replicaIndex, int nBrokers) {
int shift = 1 + (nextReplicaShift + replicaIndex) % (nBrokers - 1);
int result = (firstReplicaIndex + shift) % nBrokers;
// System.out.println(firstReplicaIndex+"|"+nextReplicaShift+"|"+replicaIndex+"="+result);
return result;
}
private static void printAssignReplicas(Map<String, List<Integer>> map, List<Integer> brokerListSet) {
Map<Integer, List<String>> brokerMap = new HashMap<Integer, List<String>>();
for (int i = 0; i < brokerListSet.size(); i++) {
List<String> partitions = new ArrayList<String>();
for (int j = 0; j < map.size(); j++) {
List<Integer> brokerIds = map.get(j + "");
if (brokerIds.contains(i)) {
partitions.add("" + j);
}
}
brokerMap.put(i, partitions);
}
}
}