ZooKeeper C语言开发如何高效实现分布式协调?

99ANYc3cd6
预计阅读时长 41 分钟
位置: 首页 C语言 正文

ZooKeeper 本身是用 Java 编写的,但它提供了非常健壮的 C 语言客户端库,名为 libzookeeper,这个库允许 C 程序员创建、管理 ZooKeeper 中的数据节点(znodes),并监听数据变化。

zookeeper c语言
(图片来源网络,侵删)

这篇指南将涵盖以下几个部分:

  1. 准备工作:如何安装和配置 libzookeeper。
  2. 核心概念:理解 C API 的关键数据结构和回调函数。
  3. 基本操作示例:连接、创建、获取、更新和删除 znodes。
  4. 高级特性示例:实现 Watcher(监听器)来响应数据变化。
  5. 完整代码示例:一个可运行的 C 程序,演示所有基本操作。
  6. 注意事项与最佳实践

准备工作:安装 libzookeeper

在使用 C 语言之前,你需要在你的系统上安装 libzookeeper 开发包。

在 Ubuntu/Debian 系统上:

sudo apt-get update
sudo apt-get install zookeeperd zookeeper-clients libzookeeper-dev libzookeeper-mt-dev
  • zookeeperd: 启动 ZooKeeper 服务器。
  • zookeeper-clients: 提供 zkCli.sh 等命令行工具。
  • libzookeeper-dev: 这是最重要的,它包含了头文件(.h)和链接库(.a.so)。
  • libzookeeper-mt-dev: 多线程版本的库,如果你需要在多线程程序中使用,请安装这个。

在 CentOS/RHEL 系统上:

sudo yum install zookeeper-server zookeeper-c-client-devel

编译时链接:

在编译你的 C 程序时,你需要链接 zookeeper_mt (多线程) 或 zookeeper (单线程) 库。

gcc -o my_zk_app my_zk_app.c -lzookeeper_mt
  • -lzookeeper_mt: 告诉链接器链接多线程版本的 ZooKeeper 库。
  • -o my_zk_app: 指定输出的可执行文件名。

核心概念

libzookeeper 的 API 是基于句柄和回调的异步模型,这意味着很多操作不会立即返回结果,而是会稍后通过一个你提供的回调函数来通知你。

zookeeper c语言
(图片来源网络,侵删)

a. zhandle_t - 句柄

zhandle_t 是与 ZooKeeper 服务器通信的核心,它包含了连接信息、会话状态、Watcher 回调函数等,几乎所有 libzookeeper 的函数都需要这个句柄作为第一个参数,你需要使用 zh_initzk_init 来创建它,并在程序结束时用 zh_close 关闭它。

b. watcherCtx - 上下文

这是一个 void* 类型的指针,当你注册一个 Watcher 时,你可以传入一个自定义的上下文指针,当 Watcher 被触发时,这个指针会一同传给你的回调函数,这非常有用,因为它可以让你在回调函数中访问到你的程序中的特定数据结构(比如一个结构体,里面包含句柄、路径等)。

c. watcher_fn - Watcher 回调函数

这是一个函数指针,定义如下:

void watcher_fn(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx);
  • zh: ZooKeeper 句柄。
  • type: 事件类型,ZOO_CREATED_EVENT, ZOO_DELETED_EVENT, ZOO_CHANGED_EVENT, ZOO_CHILD_EVENT
  • state: 连接状态,ZOO_CONNECTED_STATE, ZOO_EXPIRED_SESSION_STATE, ZOO_CONNECTING_STATE
  • path: 发生事件的 znode 路径。
  • watcherCtx: 你当初传入的上下文指针。

d. 异步 vs. 同步

libzookeeper 同时提供了同步和异步 API。

zookeeper c语言
(图片来源网络,侵删)
  • 同步 API: 函数名以 zk_ 开头(如 zk_get, zk_create),它们会阻塞,直到服务器返回结果或超时,对于简单的脚本或一次性操作,同步 API 更容易使用。
  • 异步 API: 函数名以 zk_a_ 开头(如 zk_a_get, zk_a_create),它们立即返回,你需要提供回调函数来处理最终结果,对于高性能、高并发的服务器程序,异步 API 是首选,因为它不会阻塞线程。

对于初学者,我们强烈建议从同步 API 开始


基本操作示例 (同步 API)

假设你已经启动了一个 ZooKeeper 服务器,默认运行在 0.0.1:2181

a. 连接到 ZooKeeper

#include <zookeeper/zookeeper.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
int main() {
    // 连接字符串,可以包含多个服务器地址,用逗号分隔
    const char *host = "127.0.0.1:2181";
    // 会话超时时间(毫秒)
    int timeout = 30000;
    // 创建 ZooKeeper 句柄
    // 参数: host, timeout, 默认的 watcher 回调, 默认的 watcherCtx, 是否启用日志
    zhandle_t *zh = zookeeper_init(host, NULL, timeout, NULL, NULL, 0);
    if (!zh) {
        fprintf(stderr, "Error connecting to ZooKeeper server!\n");
        exit(EXIT_FAILURE);
    }
    printf("Successfully connected to ZooKeeper.\n");
    // ... 在这里执行其他操作 ...
    // 关闭连接
    zookeeper_close(zh);
    return 0;
}

b. 创建 Znode

// ... 在 main 函数中 ...
const char *path = "/my_c_app/node1";
const char *data = "hello from c";
int data_len = strlen(data);
// ZOO_EPHEMERAL: 会话结束时,znode 会被自动删除
// ZOO_SEQUENCE: 如果路径名以结尾,会自动在后面追加一个单调递增的序号
int flags = ZOO_EPHEMERAL | ZOO_SEQUENCE;
char realpath_buffer[1024];
int rc = zk_create(zh, path, (const_byte*)data, data_len, &ZOO_OPEN_ACL_UNSAFE, flags, realpath_buffer, sizeof(realpath_buffer));
if (rc != ZOK) {
    fprintf(stderr, "Error %d creating znode %s\n", rc, path);
} else {
    printf("Created znode at %s with data: %s\n", realpath_buffer, data);
}

c. 获取 Znode 数据和设置 Watcher

同步 zk_get 的一个特点是,你可以同时设置一个 Watcher,这个 Watcher 只对下一次特定的操作有效。

// ... 在 main 函数中 ...
// 定义一个简单的 Watcher 回调函数
void my_watcher(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx) {
    printf("\n--- Watcher Triggered! ---\n");
    printf("Path: %s\n", path);
    printf("Event Type: %d\n", type);
    printf("Connection State: %d\n", state);
    printf("-------------------------\n\n");
}
const char *get_path = "/my_c_app/node1";
char buffer[1024];
int buffer_len = sizeof(buffer);
struct Stat stat; // 用于存储 znode 的元数据信息(如版本号、创建时间等)
// 注意:第四个参数是 watcher 回调函数
int rc = zk_get(zh, get_path, &my_watcher, (void*)&zh, (byte*)buffer, &buffer_len, &stat);
if (rc != ZOK) {
    fprintf(stderr, "Error %d getting data from %s\n", rc, get_path);
} else {
    printf("Got data from %s: %s (version: %d)\n", get_path, buffer, stat.version);
}
printf("Waiting for 30 seconds to see if the watcher fires...\n");
sleep(30); // 等待 30 秒,以便手动在 zkCli.sh 中修改这个节点的数据

d. 更新 Znode 数据

更新数据需要提供该节点的 version 号,以确保你是在更新一个未被别人修改过的版本,我们可以从 Stat 结构体中获取它。

// ... 在获取数据之后 ...
const char *new_data = "updated data from c";
int new_data_len = strlen(new_data);
// 使用获取数据时得到的版本号
int version = stat.version;
// zk_set 会返回新的版本号
rc = zk_set(zh, get_path, (const_byte*)new_data, new_data_len, version, &stat);
if (rc != ZOK) {
    fprintf(stderr, "Error %d setting data for %s\n", rc, get_path);
} else {
    printf("Successfully updated data for %s. New version: %d\n", get_path, stat.version);
}

e. 删除 Znode

删除同样需要提供正确的 version 号。

// ... 在更新数据之后 ...
// 使用更新后的版本号
version = stat.version;
rc = zk_delete(zh, get_path, version);
if (rc != ZOK) {
    fprintf(stderr, "Error %d deleting znode %s\n", rc, get_path);
} else {
    printf("Successfully deleted znode %s\n", get_path);
}

完整代码示例

下面是一个完整的 C 程序,它按顺序执行了连接、创建、获取、更新和删除操作,并演示了 Watcher。

// zk_example.c
#include <zookeeper/zookeeper.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
// 全局变量,用于在回调中访问句柄
zhandle_t *g_zh = NULL;
// Watcher 回调函数
void connection_watcher(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx) {
    // 当连接状态发生变化时被调用
    if (type == ZOO_SESSION_EVENT) {
        if (state == ZOO_CONNECTED_STATE) {
            printf("Connection established.\n");
        } else if (state == ZOO_EXPIRED_SESSION_STATE) {
            printf("Session expired.\n");
        } else if (state == ZOO_CONNECTING_STATE) {
            printf("Connecting to ZooKeeper...\n");
        }
    }
}
// 数据变更的 Watcher 回调
void data_watcher(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx) {
    printf("\n--- Data Watcher Triggered on %s ---\n", path);
    printf("Event Type: %s\n", (type == ZOO_CHANGED_EVENT) ? "ZOO_CHANGED_EVENT" : "Other");
    printf("Connection State: %d\n", state);
    printf("-----------------------------------\n\n");
}
int main() {
    const char *host = "127.0.0.1:2181";
    int timeout = 30000;
    const char *node_path = "/my_c_app/example_node";
    const char *initial_data = "initial data";
    const char *updated_data = "this is the updated data";
    // 1. 初始化连接
    printf("Connecting to ZooKeeper at %s...\n", host);
    g_zh = zookeeper_init(host, connection_watcher, timeout, NULL, NULL, 0);
    if (!g_zh) {
        fprintf(stderr, "Error %d: %s\n", errno, strerror(errno));
        exit(EXIT_FAILURE);
    }
    // 等待连接建立
    zookeeper_exists(g_zh, node_path, 0, NULL); // 一个简单的阻塞方法
    sleep(1);
    // 2. 创建 Znode (如果不存在)
    printf("Attempting to create znode %s...\n", node_path);
    char real_path[1024];
    int rc = zk_create(g_zh, node_path, (const_byte*)initial_data, strlen(initial_data), &ZOO_OPEN_ACL_UNSAFE, 0, real_path, sizeof(real_path));
    if (rc == ZNODEEXISTS) {
        printf("Znode %s already exists. Skipping creation.\n", node_path);
    } else if (rc != ZOK) {
        fprintf(stderr, "Error %d creating znode %s\n", rc, node_path);
        zookeeper_close(g_zh);
        exit(EXIT_FAILURE);
    } else {
        printf("Created znode at %s with data: %s\n", real_path, initial_data);
    }
    // 3. 获取数据和设置 Watcher
    printf("\nGetting data and setting a watcher...\n");
    char buffer[1024];
    int buffer_len = sizeof(buffer);
    struct Stat stat;
    rc = zk_get(g_zh, node_path, &data_watcher, NULL, (byte*)buffer, &buffer_len, &stat);
    if (rc != ZOK) {
        fprintf(stderr, "Error %d getting data from %s\n", rc, node_path);
    } else {
        printf("Got data: %s (version: %d)\n", buffer, stat.version);
    }
    // 4. 更新数据
    printf("\nUpdating data...\n");
    rc = zk_set(g_zh, node_path, (const_byte*)updated_data, strlen(updated_data), stat.version, &stat);
    if (rc != ZOK) {
        fprintf(stderr, "Error %d updating data for %s (version mismatch?)\n", rc, node_path);
    } else {
        printf("Successfully updated data to: %s (new version: %d)\n", updated_data, stat.version);
    }
    // 5. 再次获取数据,验证 Watcher 是否触发
    printf("\nGetting data again to verify watcher...\n");
    // 注意:这个 get 也会设置一个 watcher,但我们已经在上一步设置了
    // ZooKeeper 的 watcher 是一次性的,所以这个新的 watcher 只对下一次变更有效
    rc = zk_get(g_zh, node_path, &data_watcher, NULL, (byte*)buffer, &buffer_len, &stat);
    if (rc == ZOK) {
        printf("Got data: %s (version: %d)\n", buffer, stat.version);
    }
    printf("\nNow, go to your zkCli.sh and run:\n");
    printf("  set %s \"manual change\"\n", node_path);
    printf("Then wait here for 10 seconds...\n");
    sleep(10);
    // 6. 删除 Znode
    printf("\nDeleting znode %s...\n", node_path);
    rc = zk_delete(g_zh, node_path, stat.version);
    if (rc != ZOK) {
        fprintf(stderr, "Error %d deleting znode %s\n", rc, node_path);
    } else {
        printf("Successfully deleted znode %s\n", node_path);
    }
    // 7. 关闭连接
    printf("\nClosing ZooKeeper connection.\n");
    zookeeper_close(g_zh);
    return 0;
}

如何编译和运行:

  1. 确保你的 ZooKeeper 服务器正在运行。
  2. 将上述代码保存为 zk_example.c
  3. 编译:
    gcc -o zk_example zk_example.c -lzookeeper_mt
  4. 运行:
    ./zk_example
  5. 在程序等待时,打开另一个终端,连接到 ZooKeeper 并手动修改数据:
    zkCli.sh
    # 在 zkCli.sh 中
    set /my_c_app/example_node "manual change"
    quit

    你会看到第一个终端的 Watcher 被触发。


注意事项与最佳实践

  1. 错误处理:始终检查 libzookeeper 函数的返回值。ZOK (值为 0) 表示成功,任何其他值都是一个错误码,可以使用 zerror(rc) 函数将错误码转换为可读的字符串。
  2. 会话管理connection_watcher 至关重要,你必须监听 ZOO_EXPIRED_SESSION_STATE 事件,如果会话过期,你的 zhandle_t 句柄就失效了,你需要重新调用 zookeeper_init 来建立新会话。
  3. Watcher 的一次性:ZooKeeper 的 Watcher 是一次性的,一旦被触发,它就会被自动移除,如果你需要持续监听,必须在 Watcher 回调函数中重新注册 Watcher。
  4. 多线程安全
    • zhandle_t 不是线程安全的,一个句柄不能被多个线程同时使用。
    • 你可以为每个线程创建一个独立的 zhandle_t,或者使用一个专门的 I/O 线程来处理所有的 ZooKeeper 通信,然后通过队列将任务和结果传递给其他线程。
    • libzookeeper-mt 库内部使用线程来处理网络 I/O,这使得你可以在一个线程中安全地使用 zookeeper_init,然后从其他线程调用 API(只要你确保不会同时调用即可)。
  5. ACL (访问控制列表):示例中使用了 ZOO_OPEN_ACL_UNSAFE,这意味着任何连接到 ZooKeeper 的客户端都可以对这个 znode 进行任何操作,在生产环境中,你应该使用更严格的 ACL 来保护敏感数据。
  6. 异步 API:对于需要高性能的应用,学习使用异步 API (zk_a_get, zk_a_set 等) 是必要的,这需要你理解如何组织回调函数,逻辑会更复杂,但能避免阻塞,提高并发性能。
-- 展开阅读全文 --
头像
Binsearch C语言如何高效实现?
« 上一篇 今天
C语言数据损坏,原因何在?
下一篇 » 今天

相关文章

取消
微信二维码
支付宝二维码

目录[+]