ZooKeeper 本身是用 Java 编写的,但它提供了非常健壮的 C 语言客户端库,名为 libzookeeper,这个库允许 C 程序员创建、管理 ZooKeeper 中的数据节点(znodes),并监听数据变化。

这篇指南将涵盖以下几个部分:
- 准备工作:如何安装和配置 libzookeeper。
- 核心概念:理解 C API 的关键数据结构和回调函数。
- 基本操作示例:连接、创建、获取、更新和删除 znodes。
- 高级特性示例:实现 Watcher(监听器)来响应数据变化。
- 完整代码示例:一个可运行的 C 程序,演示所有基本操作。
- 注意事项与最佳实践。
准备工作:安装 libzookeeper
在使用 C 语言之前,你需要在你的系统上安装 libzookeeper 开发包。
在 Ubuntu/Debian 系统上:
sudo apt-get update sudo apt-get install zookeeperd zookeeper-clients libzookeeper-dev libzookeeper-mt-dev
zookeeperd: 启动 ZooKeeper 服务器。zookeeper-clients: 提供zkCli.sh等命令行工具。libzookeeper-dev: 这是最重要的,它包含了头文件(.h)和链接库(.a或.so)。libzookeeper-mt-dev: 多线程版本的库,如果你需要在多线程程序中使用,请安装这个。
在 CentOS/RHEL 系统上:
sudo yum install zookeeper-server zookeeper-c-client-devel
编译时链接:
在编译你的 C 程序时,你需要链接 zookeeper_mt (多线程) 或 zookeeper (单线程) 库。
gcc -o my_zk_app my_zk_app.c -lzookeeper_mt
-lzookeeper_mt: 告诉链接器链接多线程版本的 ZooKeeper 库。-o my_zk_app: 指定输出的可执行文件名。
核心概念
libzookeeper 的 API 是基于句柄和回调的异步模型,这意味着很多操作不会立即返回结果,而是会稍后通过一个你提供的回调函数来通知你。

a. zhandle_t - 句柄
zhandle_t 是与 ZooKeeper 服务器通信的核心,它包含了连接信息、会话状态、Watcher 回调函数等,几乎所有 libzookeeper 的函数都需要这个句柄作为第一个参数,你需要使用 zh_init 或 zk_init 来创建它,并在程序结束时用 zh_close 关闭它。
b. watcherCtx - 上下文
这是一个 void* 类型的指针,当你注册一个 Watcher 时,你可以传入一个自定义的上下文指针,当 Watcher 被触发时,这个指针会一同传给你的回调函数,这非常有用,因为它可以让你在回调函数中访问到你的程序中的特定数据结构(比如一个结构体,里面包含句柄、路径等)。
c. watcher_fn - Watcher 回调函数
这是一个函数指针,定义如下:
void watcher_fn(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx);
zh: ZooKeeper 句柄。type: 事件类型,ZOO_CREATED_EVENT,ZOO_DELETED_EVENT,ZOO_CHANGED_EVENT,ZOO_CHILD_EVENT。state: 连接状态,ZOO_CONNECTED_STATE,ZOO_EXPIRED_SESSION_STATE,ZOO_CONNECTING_STATE。path: 发生事件的 znode 路径。watcherCtx: 你当初传入的上下文指针。
d. 异步 vs. 同步
libzookeeper 同时提供了同步和异步 API。

- 同步 API: 函数名以
zk_开头(如zk_get,zk_create),它们会阻塞,直到服务器返回结果或超时,对于简单的脚本或一次性操作,同步 API 更容易使用。 - 异步 API: 函数名以
zk_a_开头(如zk_a_get,zk_a_create),它们立即返回,你需要提供回调函数来处理最终结果,对于高性能、高并发的服务器程序,异步 API 是首选,因为它不会阻塞线程。
对于初学者,我们强烈建议从同步 API 开始。
基本操作示例 (同步 API)
假设你已经启动了一个 ZooKeeper 服务器,默认运行在 0.0.1:2181。
a. 连接到 ZooKeeper
#include <zookeeper/zookeeper.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
int main() {
// 连接字符串,可以包含多个服务器地址,用逗号分隔
const char *host = "127.0.0.1:2181";
// 会话超时时间(毫秒)
int timeout = 30000;
// 创建 ZooKeeper 句柄
// 参数: host, timeout, 默认的 watcher 回调, 默认的 watcherCtx, 是否启用日志
zhandle_t *zh = zookeeper_init(host, NULL, timeout, NULL, NULL, 0);
if (!zh) {
fprintf(stderr, "Error connecting to ZooKeeper server!\n");
exit(EXIT_FAILURE);
}
printf("Successfully connected to ZooKeeper.\n");
// ... 在这里执行其他操作 ...
// 关闭连接
zookeeper_close(zh);
return 0;
}
b. 创建 Znode
// ... 在 main 函数中 ...
const char *path = "/my_c_app/node1";
const char *data = "hello from c";
int data_len = strlen(data);
// ZOO_EPHEMERAL: 会话结束时,znode 会被自动删除
// ZOO_SEQUENCE: 如果路径名以结尾,会自动在后面追加一个单调递增的序号
int flags = ZOO_EPHEMERAL | ZOO_SEQUENCE;
char realpath_buffer[1024];
int rc = zk_create(zh, path, (const_byte*)data, data_len, &ZOO_OPEN_ACL_UNSAFE, flags, realpath_buffer, sizeof(realpath_buffer));
if (rc != ZOK) {
fprintf(stderr, "Error %d creating znode %s\n", rc, path);
} else {
printf("Created znode at %s with data: %s\n", realpath_buffer, data);
}
c. 获取 Znode 数据和设置 Watcher
同步 zk_get 的一个特点是,你可以同时设置一个 Watcher,这个 Watcher 只对下一次特定的操作有效。
// ... 在 main 函数中 ...
// 定义一个简单的 Watcher 回调函数
void my_watcher(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx) {
printf("\n--- Watcher Triggered! ---\n");
printf("Path: %s\n", path);
printf("Event Type: %d\n", type);
printf("Connection State: %d\n", state);
printf("-------------------------\n\n");
}
const char *get_path = "/my_c_app/node1";
char buffer[1024];
int buffer_len = sizeof(buffer);
struct Stat stat; // 用于存储 znode 的元数据信息(如版本号、创建时间等)
// 注意:第四个参数是 watcher 回调函数
int rc = zk_get(zh, get_path, &my_watcher, (void*)&zh, (byte*)buffer, &buffer_len, &stat);
if (rc != ZOK) {
fprintf(stderr, "Error %d getting data from %s\n", rc, get_path);
} else {
printf("Got data from %s: %s (version: %d)\n", get_path, buffer, stat.version);
}
printf("Waiting for 30 seconds to see if the watcher fires...\n");
sleep(30); // 等待 30 秒,以便手动在 zkCli.sh 中修改这个节点的数据
d. 更新 Znode 数据
更新数据需要提供该节点的 version 号,以确保你是在更新一个未被别人修改过的版本,我们可以从 Stat 结构体中获取它。
// ... 在获取数据之后 ...
const char *new_data = "updated data from c";
int new_data_len = strlen(new_data);
// 使用获取数据时得到的版本号
int version = stat.version;
// zk_set 会返回新的版本号
rc = zk_set(zh, get_path, (const_byte*)new_data, new_data_len, version, &stat);
if (rc != ZOK) {
fprintf(stderr, "Error %d setting data for %s\n", rc, get_path);
} else {
printf("Successfully updated data for %s. New version: %d\n", get_path, stat.version);
}
e. 删除 Znode
删除同样需要提供正确的 version 号。
// ... 在更新数据之后 ...
// 使用更新后的版本号
version = stat.version;
rc = zk_delete(zh, get_path, version);
if (rc != ZOK) {
fprintf(stderr, "Error %d deleting znode %s\n", rc, get_path);
} else {
printf("Successfully deleted znode %s\n", get_path);
}
完整代码示例
下面是一个完整的 C 程序,它按顺序执行了连接、创建、获取、更新和删除操作,并演示了 Watcher。
// zk_example.c
#include <zookeeper/zookeeper.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
// 全局变量,用于在回调中访问句柄
zhandle_t *g_zh = NULL;
// Watcher 回调函数
void connection_watcher(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx) {
// 当连接状态发生变化时被调用
if (type == ZOO_SESSION_EVENT) {
if (state == ZOO_CONNECTED_STATE) {
printf("Connection established.\n");
} else if (state == ZOO_EXPIRED_SESSION_STATE) {
printf("Session expired.\n");
} else if (state == ZOO_CONNECTING_STATE) {
printf("Connecting to ZooKeeper...\n");
}
}
}
// 数据变更的 Watcher 回调
void data_watcher(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx) {
printf("\n--- Data Watcher Triggered on %s ---\n", path);
printf("Event Type: %s\n", (type == ZOO_CHANGED_EVENT) ? "ZOO_CHANGED_EVENT" : "Other");
printf("Connection State: %d\n", state);
printf("-----------------------------------\n\n");
}
int main() {
const char *host = "127.0.0.1:2181";
int timeout = 30000;
const char *node_path = "/my_c_app/example_node";
const char *initial_data = "initial data";
const char *updated_data = "this is the updated data";
// 1. 初始化连接
printf("Connecting to ZooKeeper at %s...\n", host);
g_zh = zookeeper_init(host, connection_watcher, timeout, NULL, NULL, 0);
if (!g_zh) {
fprintf(stderr, "Error %d: %s\n", errno, strerror(errno));
exit(EXIT_FAILURE);
}
// 等待连接建立
zookeeper_exists(g_zh, node_path, 0, NULL); // 一个简单的阻塞方法
sleep(1);
// 2. 创建 Znode (如果不存在)
printf("Attempting to create znode %s...\n", node_path);
char real_path[1024];
int rc = zk_create(g_zh, node_path, (const_byte*)initial_data, strlen(initial_data), &ZOO_OPEN_ACL_UNSAFE, 0, real_path, sizeof(real_path));
if (rc == ZNODEEXISTS) {
printf("Znode %s already exists. Skipping creation.\n", node_path);
} else if (rc != ZOK) {
fprintf(stderr, "Error %d creating znode %s\n", rc, node_path);
zookeeper_close(g_zh);
exit(EXIT_FAILURE);
} else {
printf("Created znode at %s with data: %s\n", real_path, initial_data);
}
// 3. 获取数据和设置 Watcher
printf("\nGetting data and setting a watcher...\n");
char buffer[1024];
int buffer_len = sizeof(buffer);
struct Stat stat;
rc = zk_get(g_zh, node_path, &data_watcher, NULL, (byte*)buffer, &buffer_len, &stat);
if (rc != ZOK) {
fprintf(stderr, "Error %d getting data from %s\n", rc, node_path);
} else {
printf("Got data: %s (version: %d)\n", buffer, stat.version);
}
// 4. 更新数据
printf("\nUpdating data...\n");
rc = zk_set(g_zh, node_path, (const_byte*)updated_data, strlen(updated_data), stat.version, &stat);
if (rc != ZOK) {
fprintf(stderr, "Error %d updating data for %s (version mismatch?)\n", rc, node_path);
} else {
printf("Successfully updated data to: %s (new version: %d)\n", updated_data, stat.version);
}
// 5. 再次获取数据,验证 Watcher 是否触发
printf("\nGetting data again to verify watcher...\n");
// 注意:这个 get 也会设置一个 watcher,但我们已经在上一步设置了
// ZooKeeper 的 watcher 是一次性的,所以这个新的 watcher 只对下一次变更有效
rc = zk_get(g_zh, node_path, &data_watcher, NULL, (byte*)buffer, &buffer_len, &stat);
if (rc == ZOK) {
printf("Got data: %s (version: %d)\n", buffer, stat.version);
}
printf("\nNow, go to your zkCli.sh and run:\n");
printf(" set %s \"manual change\"\n", node_path);
printf("Then wait here for 10 seconds...\n");
sleep(10);
// 6. 删除 Znode
printf("\nDeleting znode %s...\n", node_path);
rc = zk_delete(g_zh, node_path, stat.version);
if (rc != ZOK) {
fprintf(stderr, "Error %d deleting znode %s\n", rc, node_path);
} else {
printf("Successfully deleted znode %s\n", node_path);
}
// 7. 关闭连接
printf("\nClosing ZooKeeper connection.\n");
zookeeper_close(g_zh);
return 0;
}
如何编译和运行:
- 确保你的 ZooKeeper 服务器正在运行。
- 将上述代码保存为
zk_example.c。 - 编译:
gcc -o zk_example zk_example.c -lzookeeper_mt
- 运行:
./zk_example
- 在程序等待时,打开另一个终端,连接到 ZooKeeper 并手动修改数据:
zkCli.sh # 在 zkCli.sh 中 set /my_c_app/example_node "manual change" quit
你会看到第一个终端的 Watcher 被触发。
注意事项与最佳实践
- 错误处理:始终检查 libzookeeper 函数的返回值。
ZOK(值为 0) 表示成功,任何其他值都是一个错误码,可以使用zerror(rc)函数将错误码转换为可读的字符串。 - 会话管理:
connection_watcher至关重要,你必须监听ZOO_EXPIRED_SESSION_STATE事件,如果会话过期,你的zhandle_t句柄就失效了,你需要重新调用zookeeper_init来建立新会话。 - Watcher 的一次性:ZooKeeper 的 Watcher 是一次性的,一旦被触发,它就会被自动移除,如果你需要持续监听,必须在 Watcher 回调函数中重新注册 Watcher。
- 多线程安全:
zhandle_t不是线程安全的,一个句柄不能被多个线程同时使用。- 你可以为每个线程创建一个独立的
zhandle_t,或者使用一个专门的 I/O 线程来处理所有的 ZooKeeper 通信,然后通过队列将任务和结果传递给其他线程。 libzookeeper-mt库内部使用线程来处理网络 I/O,这使得你可以在一个线程中安全地使用zookeeper_init,然后从其他线程调用 API(只要你确保不会同时调用即可)。
- ACL (访问控制列表):示例中使用了
ZOO_OPEN_ACL_UNSAFE,这意味着任何连接到 ZooKeeper 的客户端都可以对这个 znode 进行任何操作,在生产环境中,你应该使用更严格的 ACL 来保护敏感数据。 - 异步 API:对于需要高性能的应用,学习使用异步 API (
zk_a_get,zk_a_set等) 是必要的,这需要你理解如何组织回调函数,逻辑会更复杂,但能避免阻塞,提高并发性能。
