Linux下ActiveMQ如何用C语言实现消息交互?

99ANYc3cd6
预计阅读时长 26 分钟
位置: 首页 C语言 正文

这是一个比较专业的话题,因为 C 语言本身没有像 Java 那样内置的、与 ActiveMQ 完美集成的 JMS 客户端,我们需要借助第三方库,目前最主流、最推荐的方式是使用 Apache Qpid Proton 库,它是一个 AMQP 1.0 的 C 语言实现。

linux activemq c语言
(图片来源网络,侵删)

核心概念:AMQP vs. JMS

  • JMS (Java Message Service): 这是一个Java API标准,定义了一套通用的接口用于在Java应用中访问消息队列,ActiveMQ 对 JMS 提供了原生的、高质量的支持。
  • AMQP (Advanced Message Queuing Protocol): 这是一个开放标准的网络协议,用于在应用程序之间进行消息传递,它不依赖于任何特定语言,ActiveMQ 同时支持多种协议,包括 OpenWire (JMS 常用)、STOMP 和 AMQP。

为什么选择 AMQP + C? 因为 C 语言没有 JMS 标准,直接与 ActiveMQ 的 OpenWire 协议通信非常复杂,而 AMQP 是一个标准化的协议,有成熟的 C 库(如 Qpid Proton)支持,这使得 C 应用可以作为一个标准的 AMQP 客户端与 ActiveMQ(或其他任何支持 AMQP 的消息中间件,如 RabbitMQ)进行通信。


环境准备与安装

我们将使用 qpid-proton-c 库,并通过 pkg-config 来管理编译选项。

安装 ActiveMQ

如果你还没有 ActiveMQ,可以按照以下步骤安装:

# 下载 ActiveMQ (以 5.18.3 为例,请替换为最新版本)
wget https://archive.apache.org/dist/activemq/5.18.3/activemq-bin-5.18.3.tar.gz
tar -xzf activemq-bin-5.18.3.tar.gz
cd activemq-5.18.3
# 启动 ActiveMQ (前台运行,方便查看日志)
# 在生产环境中,通常会使用 `./bin/activemq start` 作为后台服务启动
./bin/activemq console

启动后,你可以访问管理界面 http://localhost:8161 (默认用户名/密码是 admin/admin) 来确认队列和主题是否创建成功。

linux activemq c语言
(图片来源网络,侵删)

安装 Qpid Proton C 库

你需要安装 qpid-proton-c 的开发包。

在基于 Debian/Ubuntu 的系统上:

sudo apt-get update
sudo apt-get install libqpid-proton-dev

在基于 RedHat/CentOS/Fedora 的系统上:

sudo yum install qpid-proton-c-devel
# 或者对于较新的系统
sudo dnf install qpid-proton-c-devel

编译配置

qpid-proton-c 库通常与 pkg-config 集成,我们可以使用 pkg-config 来获取正确的编译和链接标志。

linux activemq c语言
(图片来源网络,侵删)
# 检查 pkg-config 是否能找到库
pkg-config --exists qpid-proton && echo "Found qpid-proton" || echo "Not found"
# 获取编译标志 (通常用于 CFLAGS)
pkg-config --cflags qpid-proton
# 输出示例: -I/usr/include/qpid-proton
# 获取链接标志 (通常用于 LDLIBS)
pkg-config --libs qpid-proton
# 输出示例: -lqpid-proton

C 语言代码示例

下面是一个完整的 C 语言示例,它包含一个生产者(发送消息)和一个消费者(接收消息),我们将使用一个名为 "test.queue" 的队列。

生产者 (producer.c)

这个程序会连接到 ActiveMQ,并向 "test.queue" 发送一条文本消息。

#include <proton/message.h>
#include <proton/messenger.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#define AMQP_URL "amqp://localhost:5672" // ActiveMQ 默认 AMQP 端口是 5672
#define ADDRESS "queue://test.queue"
#define MESSAGE_BODY "Hello from C Client!"
int main(void) {
    pn_messenger_t *messenger;
    pn_message_t *message;
    pn_data_t *body;
    const char *text;
    // 1. 创建 Messenger 对象
    messenger = pn_messenger(NULL);
    if (!messenger) {
        fprintf(stderr, "Error: failed to create messenger.\n");
        return 1;
    }
    pn_messenger_start(messenger); // 启动 messenger
    // 2. 创建 Message 对象
    message = pn_message();
    if (!message) {
        fprintf(stderr, "Error: failed to create message.\n");
        pn_messenger_stop(messenger);
        pn_messenger_free(messenger);
        return 1;
    }
    // 3. 设置消息地址
    pn_message_set_address(message, ADDRESS);
    // 4. 设置消息内容
    body = pn_message_body(message);
    pn_data_put_string(body, pn_bytes(strlen(MESSAGE_BODY), MESSAGE_BODY));
    // 5. 发送消息
    printf("Sending message: %s\n", MESSAGE_BODY);
    pn_messenger_put(messenger, message);
    pn_messenger_send(messenger, -1); // -1 表示发送所有缓存的消息
    // 6. 清理资源
    pn_message_free(message);
    pn_messenger_stop(messenger);
    pn_messenger_free(messenger);
    printf("Message sent successfully.\n");
    return 0;
}

消费者 (consumer.c)

这个程序会连接到 ActiveMQ,并从 "test.queue" 接收消息。

#include <proton/message.h>
#include <proton/messenger.h>
#include <stdio.h>
#include <stdlib.h>
#define AMQP_URL "amqp://localhost:5672"
#define ADDRESS "queue://test.queue"
int main(void) {
    pn_messenger_t *messenger;
    pn_message_t *message;
    pn_data_t *body;
    pn_bytes_t bytes;
    // 1. 创建 Messenger 对象
    messenger = pn_messenger(NULL);
    if (!messenger) {
        fprintf(stderr, "Error: failed to create messenger.\n");
        return 1;
    }
    pn_messenger_start(messenger);
    // 2. 订阅地址
    pn_messenger_subscribe(messenger, ADDRESS);
    printf("Listening for messages on %s...\n", ADDRESS);
    // 3. 接收消息
    message = pn_message();
    int status = pn_messenger_recv(messenger, -1); // -1 表示接收直到没有更多消息
    if (status < 0) {
        fprintf(stderr, "Error: failed to receive message. %d\n", status);
    } else if (status == 0) {
        printf("No messages received.\n");
    } else {
        // 4. 解析并打印消息
        pn_messenger_get(messenger, message);
        printf("Received message:\n");
        pn_message_dump(message, stdout); // 打印消息所有信息
        // 只打印消息体
        body = pn_message_body(message);
        if (pn_data_type(body) == PN_STRING) {
            bytes = pn_data_get_string(body);
            printf("  Body: %.*s\n", (int)bytes.size, bytes.start);
        }
    }
    // 5. 清理资源
    pn_message_free(message);
    pn_messenger_stop(messenger);
    pn_messenger_free(messenger);
    return 0;
}

编译与运行

编译代码

使用 pkg-config 提供的标志来编译两个文件。

# 编译生产者
gcc -o producer $(pkg-config --cflags qpid-proton) producer.c $(pkg-config --libs qpid-proton)
# 编译消费者
gcc -o consumer $(pkg-config --cflags qpid-proton) consumer.c $(pkg-config --libs qpid-proton)

运行测试

重要提示:请确保你的 ActiveMQ 已经启动并且启用了 AMQP 协议,默认情况下,ActiveMQ 可能只启用了 OpenWire,你需要检查 conf/activemq.xml 文件,确保有类似下面的配置(通常默认是开启的):

<transportConnectors>
    <!-- DOS protection, limit concurrent connections to 1000 and frame size to 1MB -->
    <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=1048576"/>
    <transportConnector name="amqps" uri="amqps://0.0.0.0:5671?transport.enabledProtocols=SSL" sslContext="sslContext"/>
    <transportConnector name="stomp" uri="stomp://0.0.0.0:61613"/>
    <transportConnector name="amqp" uri="amqp://0.0.0.0:5672"/> <!-- 确保这一行存在 -->
</transportConnectors>

打开三个终端窗口:

  • 终端 1: 启动 ActiveMQ (如果还没启动)

    ./activemq-5.18.3/bin/activemq console
  • 终端 2: 运行消费者,它会等待消息

    ./consumer
    # 输出:
    # Listening for messages on queue://test.queue...
    # (会一直等待)
  • 终端 3: 运行生产者,发送一条消息

    ./producer
    # 输出:
    # Sending message: Hello from C Client!
    # Message sent successfully.

观察结果

  • 终端 2 (消费者) 中,你会看到消费者接收到了消息并打印出来:
    Listening for messages on queue://test.queue...
    Received message:
      (消息的详细 dump 信息)
      Body: Hello from C Client!
  • 终端 3 (生产者) 中,程序执行完毕后退出。
  • 你可以多次运行生产者,消费者会按顺序接收到每一条消息。

总结与注意事项

  1. 核心库: 使用 qpid-proton-c 是在 C 语言中与 ActiveMQ 交互最推荐的方式,因为它基于标准化的 AMQP 协议。
  2. 协议: 确保你的 ActiveMQ 实例配置了 AMQP 传输连接器 (amqp://0.0.0.0:5672)。
  3. 编译: 善用 pkg-config 可以简化库的编译和链接过程,避免手动指定路径。
  4. 生命周期管理: C 语言中需要手动管理内存,务必遵循“谁创建,谁释放”的原则,正确调用 pn_messenger_free(), pn_message_free() 等函数,避免内存泄漏。
  5. 错误处理: 示例中的错误处理比较简单,在实际应用中,你应该检查所有 pn_* 函数的返回值,并根据错误码进行相应的处理。
  6. 其他选择: 除了 Qpid Proton,还有一些其他库,如 rabbitmq-c,它也支持 AMQP 0-9-1(一个略有不同的 AMQP 版本,但也很流行),如果你只关心 RabbitMQ,这个库是很好的选择,但对于 ActiveMQ,Qpid Proton 是更直接的官方风格支持。

通过以上步骤,你就可以成功地在 Linux 环境下使用 C 语言与 ActiveMQ 进行通信了。

-- 展开阅读全文 --
头像
织梦网站如何去掉index.html?
« 上一篇 2025-12-02
dede channel orderby参数如何正确使用?
下一篇 » 2025-12-02

相关文章

取消
微信二维码
支付宝二维码

目录[+]