这是一个比较专业的话题,因为 C 语言本身没有像 Java 那样内置的、与 ActiveMQ 完美集成的 JMS 客户端,我们需要借助第三方库,目前最主流、最推荐的方式是使用 Apache Qpid Proton 库,它是一个 AMQP 1.0 的 C 语言实现。

核心概念:AMQP vs. JMS
- JMS (Java Message Service): 这是一个Java API标准,定义了一套通用的接口用于在Java应用中访问消息队列,ActiveMQ 对 JMS 提供了原生的、高质量的支持。
- AMQP (Advanced Message Queuing Protocol): 这是一个开放标准的网络协议,用于在应用程序之间进行消息传递,它不依赖于任何特定语言,ActiveMQ 同时支持多种协议,包括 OpenWire (JMS 常用)、STOMP 和 AMQP。
为什么选择 AMQP + C? 因为 C 语言没有 JMS 标准,直接与 ActiveMQ 的 OpenWire 协议通信非常复杂,而 AMQP 是一个标准化的协议,有成熟的 C 库(如 Qpid Proton)支持,这使得 C 应用可以作为一个标准的 AMQP 客户端与 ActiveMQ(或其他任何支持 AMQP 的消息中间件,如 RabbitMQ)进行通信。
环境准备与安装
我们将使用 qpid-proton-c 库,并通过 pkg-config 来管理编译选项。
安装 ActiveMQ
如果你还没有 ActiveMQ,可以按照以下步骤安装:
# 下载 ActiveMQ (以 5.18.3 为例,请替换为最新版本) wget https://archive.apache.org/dist/activemq/5.18.3/activemq-bin-5.18.3.tar.gz tar -xzf activemq-bin-5.18.3.tar.gz cd activemq-5.18.3 # 启动 ActiveMQ (前台运行,方便查看日志) # 在生产环境中,通常会使用 `./bin/activemq start` 作为后台服务启动 ./bin/activemq console
启动后,你可以访问管理界面 http://localhost:8161 (默认用户名/密码是 admin/admin) 来确认队列和主题是否创建成功。

安装 Qpid Proton C 库
你需要安装 qpid-proton-c 的开发包。
在基于 Debian/Ubuntu 的系统上:
sudo apt-get update sudo apt-get install libqpid-proton-dev
在基于 RedHat/CentOS/Fedora 的系统上:
sudo yum install qpid-proton-c-devel # 或者对于较新的系统 sudo dnf install qpid-proton-c-devel
编译配置
qpid-proton-c 库通常与 pkg-config 集成,我们可以使用 pkg-config 来获取正确的编译和链接标志。

# 检查 pkg-config 是否能找到库 pkg-config --exists qpid-proton && echo "Found qpid-proton" || echo "Not found" # 获取编译标志 (通常用于 CFLAGS) pkg-config --cflags qpid-proton # 输出示例: -I/usr/include/qpid-proton # 获取链接标志 (通常用于 LDLIBS) pkg-config --libs qpid-proton # 输出示例: -lqpid-proton
C 语言代码示例
下面是一个完整的 C 语言示例,它包含一个生产者(发送消息)和一个消费者(接收消息),我们将使用一个名为 "test.queue" 的队列。
生产者 (producer.c)
这个程序会连接到 ActiveMQ,并向 "test.queue" 发送一条文本消息。
#include <proton/message.h>
#include <proton/messenger.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#define AMQP_URL "amqp://localhost:5672" // ActiveMQ 默认 AMQP 端口是 5672
#define ADDRESS "queue://test.queue"
#define MESSAGE_BODY "Hello from C Client!"
int main(void) {
pn_messenger_t *messenger;
pn_message_t *message;
pn_data_t *body;
const char *text;
// 1. 创建 Messenger 对象
messenger = pn_messenger(NULL);
if (!messenger) {
fprintf(stderr, "Error: failed to create messenger.\n");
return 1;
}
pn_messenger_start(messenger); // 启动 messenger
// 2. 创建 Message 对象
message = pn_message();
if (!message) {
fprintf(stderr, "Error: failed to create message.\n");
pn_messenger_stop(messenger);
pn_messenger_free(messenger);
return 1;
}
// 3. 设置消息地址
pn_message_set_address(message, ADDRESS);
// 4. 设置消息内容
body = pn_message_body(message);
pn_data_put_string(body, pn_bytes(strlen(MESSAGE_BODY), MESSAGE_BODY));
// 5. 发送消息
printf("Sending message: %s\n", MESSAGE_BODY);
pn_messenger_put(messenger, message);
pn_messenger_send(messenger, -1); // -1 表示发送所有缓存的消息
// 6. 清理资源
pn_message_free(message);
pn_messenger_stop(messenger);
pn_messenger_free(messenger);
printf("Message sent successfully.\n");
return 0;
}
消费者 (consumer.c)
这个程序会连接到 ActiveMQ,并从 "test.queue" 接收消息。
#include <proton/message.h>
#include <proton/messenger.h>
#include <stdio.h>
#include <stdlib.h>
#define AMQP_URL "amqp://localhost:5672"
#define ADDRESS "queue://test.queue"
int main(void) {
pn_messenger_t *messenger;
pn_message_t *message;
pn_data_t *body;
pn_bytes_t bytes;
// 1. 创建 Messenger 对象
messenger = pn_messenger(NULL);
if (!messenger) {
fprintf(stderr, "Error: failed to create messenger.\n");
return 1;
}
pn_messenger_start(messenger);
// 2. 订阅地址
pn_messenger_subscribe(messenger, ADDRESS);
printf("Listening for messages on %s...\n", ADDRESS);
// 3. 接收消息
message = pn_message();
int status = pn_messenger_recv(messenger, -1); // -1 表示接收直到没有更多消息
if (status < 0) {
fprintf(stderr, "Error: failed to receive message. %d\n", status);
} else if (status == 0) {
printf("No messages received.\n");
} else {
// 4. 解析并打印消息
pn_messenger_get(messenger, message);
printf("Received message:\n");
pn_message_dump(message, stdout); // 打印消息所有信息
// 只打印消息体
body = pn_message_body(message);
if (pn_data_type(body) == PN_STRING) {
bytes = pn_data_get_string(body);
printf(" Body: %.*s\n", (int)bytes.size, bytes.start);
}
}
// 5. 清理资源
pn_message_free(message);
pn_messenger_stop(messenger);
pn_messenger_free(messenger);
return 0;
}
编译与运行
编译代码
使用 pkg-config 提供的标志来编译两个文件。
# 编译生产者 gcc -o producer $(pkg-config --cflags qpid-proton) producer.c $(pkg-config --libs qpid-proton) # 编译消费者 gcc -o consumer $(pkg-config --cflags qpid-proton) consumer.c $(pkg-config --libs qpid-proton)
运行测试
重要提示:请确保你的 ActiveMQ 已经启动并且启用了 AMQP 协议,默认情况下,ActiveMQ 可能只启用了 OpenWire,你需要检查 conf/activemq.xml 文件,确保有类似下面的配置(通常默认是开启的):
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 1MB -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=1048576"/>
<transportConnector name="amqps" uri="amqps://0.0.0.0:5671?transport.enabledProtocols=SSL" sslContext="sslContext"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672"/> <!-- 确保这一行存在 -->
</transportConnectors>
打开三个终端窗口:
-
终端 1: 启动 ActiveMQ (如果还没启动)
./activemq-5.18.3/bin/activemq console
-
终端 2: 运行消费者,它会等待消息
./consumer # 输出: # Listening for messages on queue://test.queue... # (会一直等待)
-
终端 3: 运行生产者,发送一条消息
./producer # 输出: # Sending message: Hello from C Client! # Message sent successfully.
观察结果:
- 在 终端 2 (消费者) 中,你会看到消费者接收到了消息并打印出来:
Listening for messages on queue://test.queue... Received message: (消息的详细 dump 信息) Body: Hello from C Client! - 在 终端 3 (生产者) 中,程序执行完毕后退出。
- 你可以多次运行生产者,消费者会按顺序接收到每一条消息。
总结与注意事项
- 核心库: 使用
qpid-proton-c是在 C 语言中与 ActiveMQ 交互最推荐的方式,因为它基于标准化的 AMQP 协议。 - 协议: 确保你的 ActiveMQ 实例配置了 AMQP 传输连接器 (
amqp://0.0.0.0:5672)。 - 编译: 善用
pkg-config可以简化库的编译和链接过程,避免手动指定路径。 - 生命周期管理: C 语言中需要手动管理内存,务必遵循“谁创建,谁释放”的原则,正确调用
pn_messenger_free(),pn_message_free()等函数,避免内存泄漏。 - 错误处理: 示例中的错误处理比较简单,在实际应用中,你应该检查所有
pn_*函数的返回值,并根据错误码进行相应的处理。 - 其他选择: 除了 Qpid Proton,还有一些其他库,如 rabbitmq-c,它也支持 AMQP 0-9-1(一个略有不同的 AMQP 版本,但也很流行),如果你只关心 RabbitMQ,这个库是很好的选择,但对于 ActiveMQ,Qpid Proton 是更直接的官方风格支持。
通过以上步骤,你就可以成功地在 Linux 环境下使用 C 语言与 ActiveMQ 进行通信了。
