MyException - 我的异常网
当前位置:我的异常网» 软件架构设计 » ZeroMQ指南-第1章-基础-放出讯息

ZeroMQ指南-第1章-基础-放出讯息

www.MyException.Cn  网友分享于:2015-08-26  浏览:27次
ZeroMQ指南-第1章-基础-放出消息

放出消息

第二个经典模式是单向数据分发,服务器推送更新到一组客户端。让我们看一个推送天气情况变化的例子,包含地区编码、温度、和相对湿度。我们会生成随机值来模拟真实气象站。

这是服务器代码,这个程序我们使用5556端口。

wuserver: Weather update server in C

//
// Weather update server
// Binds PUB socket to tcp://*:5556
// Publishes random weather updates
//
#include "zhelpers.h"

int main (void)
{
    // Prepare our context and publisher
    void *context = zmq_ctx_new ();
    void *publisher = zmq_socket (context, ZMQ_PUB);
    int rc = zmq_bind (publisher, "tcp://*:5556");
    assert (rc == 0);
    rc = zmq_bind (publisher, "ipc://weather.ipc");
    assert (rc == 0);

    // Initialize random number generator
    srandom ((unsigned) time (NULL));
    while (1) {
        // Get values that will fool the boss
        int zipcode, temperature, relhumidity;
        zipcode = randof (100000);
        temperature = randof (215) - 80;
        relhumidity = randof (50) + 10;

        // Send message to all subscribers
        char update [20];
        sprintf (update, "%05d %d %d", zipcode, temperature, relhumidity);
        s_send (publisher, update);
    }
    zmq_close (publisher);
    zmq_ctx_destroy (context);
    return 0;
}

更新流既无开始也无结束,像一个永不结束的天气预报。

图 4 – 发布-订阅

这是客户端程序,监听更新流并捕获符合特定地区编码的所有消息,默认为纽约市因为那是个冒险的好地方:

wuclient: Weather update client in C

//
// Weather update client
// Connects SUB socket to tcp://localhost:5556
// Collects weather updates and finds avg temp in zipcode
//
#include "zhelpers.h"

int main (int argc, char *argv [])
{
    void *context = zmq_ctx_new ();

    // Socket to talk to server
    printf ("Collecting updates from weather server…\n");
    void *subscriber = zmq_socket (context, ZMQ_SUB);
    int rc = zmq_connect (subscriber, "tcp://localhost:5556");
    assert (rc == 0);

    // Subscribe to zipcode, default is NYC, 10001
    char *filter = (argc > 1) ? argv [1] : "10001 ";
    rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, filter, strlen (filter));
    assert (rc == 0);

    // Process 100 updates
    int update_nbr;
    long total_temp = 0;
    for (update_nbr = 0; update_nbr < 100; update_nbr++) {
        char *string = s_recv (subscriber);

        int zipcode, temperature, relhumidity;
        sscanf (string, "%d %d %d",
                &zipcode, &temperature, &relhumidity);
        total_temp += temperature;
        free (string);
    }
    printf ("Average temperature for zipcode '%s' was %dF\n",
            filter, (int) (total_temp / update_nbr));

    zmq_close (subscriber);
    zmq_ctx_destroy (context);
    return 0;
}

注意当你使用一个订阅套接字时你必须使用zmq_setsockopt()和SUBSCRIBE设置一个订阅,就像这段代码中那样。如果你不设置任何订阅,就得不到任何消息。这是初学者的常见错误。订阅者可以设置很多订阅,会合并到一起。就是说,如果一个更新匹配任意一个订阅,订阅者都会接收。订阅者也可以取消特定的订阅。一个订阅通常是一个可打印字符串,但也不是必须的。参考zmq_setsockopt()看这是怎么工作的。

发布订阅套接字对是异步的。客户端在循环中(或单次如果有必要)做zmq_msg_recv()。尝试发送消息到订阅套接字将导致错误。同样的,服务按所需频率做zmq_msg_send(),但绝不能对发布套接字做zmq_msg_recv()。

理论上,ØMQ套接字不在乎哪一端来连接哪一端来绑定。但是在实践中会有未公开的差异,待会我会提及。现在,绑定发布并连接订阅,除非你的网络设计导致这无法实现。

还有一个关于发布订阅套接字的重要事项:你无法精确知道订阅者什么时候开始获取消息。即使你先启动一个订阅者,过一会再启动发布者,订阅者总是会错过发布者发送的第一条消息。这是因为订阅者连接到发布者时(占用了短暂但非零的时间),发布者可能已经将消息发送出去了。

这种“迟钝加入者”症状击中了很多人、很多次,我们需要详细解释一下。记住ØMQ是异步I/O的,也就是在后台。比如说你有两个节点按这个顺序这么做:

  • 订阅者连接到一个端点并接收和计数消息
  • 发布者绑定到一个端点并立刻发送1000条消息

那么订阅者很可能不会接收到任何东西。你可以眨眨眼,检查一下是否设置了正确的过滤器,再试试,然而订阅者还是不会接收任何东西。

建立TCP连接牵涉到大概几毫秒的来回握手,这取决于网络状况和节点之间跳跃的次数。这个时间里,ØMQ已经可以发送好多消息了。为了证明,假设花费5毫秒来建立连接,而相同的链路可以搞定1M每秒的消息。在订阅者连接到发布者的5毫秒里,发布者仅耗费1毫秒就将这1K的消息发送出去了。

在第2章 - 套接字与模式中我们会解释如何让发布者和订阅者同步,以便无需等待订阅者(们)都已连接并就绪时才开始发布数据。有一个简单的笨办法来延迟发布者,通过睡眠sleep。但是在真实程序中可不要这么做,因为这极为脆弱、不雅而缓慢。先用sleep向自己证明到底发生了什么,然后等到第2章再看正确做法。

同步的另一种替代方案是简单的假设发布的数据流是无限的,没有开始也没有结束。还假设订阅者不在乎它启动之前发生过什么。这就是我们建造的例子天气客户端的方式。

客户端订阅到选定的地区编码并收集1000个更新。也就是大概1000万服务器更新,如果地区编码是随机分发的。你可以先启动客户端,再启动服务器,而客户端会持续工作。你可以随时停止并重启服务器,而客户端会持续工作。当客户端收集到了1000个更新,它计算出平均值,打印输出,然后退出。

关于发布订阅模式的一些要点:

  • 订阅者可以每次调用“connect”来连接到一个以上的发布者。数据会交错到达(“公平队列”)以避免发布者相互淹没。
  • 如果一个发布者没有已连接的订阅者,那么它直接丢弃所有消息。
  • 如果你使用TCP时一个订阅者很慢,消息会在发布者那里排上队。我们待会看看这种情况下如何用“高水位线”来保护发布者。
  • 从ØMQ 3.x开始,使用已连接协议(tcp: 或 ipc:)将在发布者方面进行过滤,使用epgm://协议时,会在订阅者一方进行过滤。在ØMQ2.x中所有过滤都在订阅者方面进行。

如下是在我的笔记本电脑中接收过滤10M消息花费的时间,电脑配置是2011-era Intel i5,体面但也没啥特殊的。

$ time wuclient
Collecting updates from weather server...
Average temperature for zipcode '10001 ' was 28F

real    0m4.470s
user    0m0.000s
sys     0m0.008s

文章评论

Google伦敦新总部 犹如星级庄园
Google伦敦新总部 犹如星级庄园
2013年美国开发者薪资调查报告
2013年美国开发者薪资调查报告
“肮脏的”IT工作排行榜
“肮脏的”IT工作排行榜
程序猿的崛起——Growth Hacker
程序猿的崛起——Growth Hacker
团队中“技术大拿”并非越多越好
团队中“技术大拿”并非越多越好
 程序员的样子
程序员的样子
程序员应该关注的一些事儿
程序员应该关注的一些事儿
初级 vs 高级开发者 哪个性价比更高?
初级 vs 高级开发者 哪个性价比更高?
程序员的一天:一寸光阴一寸金
程序员的一天:一寸光阴一寸金
为啥Android手机总会越用越慢?
为啥Android手机总会越用越慢?
老美怎么看待阿里赴美上市
老美怎么看待阿里赴美上市
我的丈夫是个程序员
我的丈夫是个程序员
程序员最害怕的5件事 你中招了吗?
程序员最害怕的5件事 你中招了吗?
代码女神横空出世
代码女神横空出世
程序员的鄙视链
程序员的鄙视链
那些性感的让人尖叫的程序员
那些性感的让人尖叫的程序员
要嫁就嫁程序猿—钱多话少死的早
要嫁就嫁程序猿—钱多话少死的早
写给自己也写给你 自己到底该何去何从
写给自己也写给你 自己到底该何去何从
5款最佳正则表达式编辑调试器
5款最佳正则表达式编辑调试器
Java 与 .NET 的平台发展之争
Java 与 .NET 的平台发展之争
中美印日四国程序员比较
中美印日四国程序员比较
亲爱的项目经理,我恨你
亲爱的项目经理,我恨你
总结2014中国互联网十大段子
总结2014中国互联网十大段子
那些争议最大的编程观点
那些争议最大的编程观点
当下全球最炙手可热的八位少年创业者
当下全球最炙手可热的八位少年创业者
我是如何打败拖延症的
我是如何打败拖延症的
Java程序员必看电影
Java程序员必看电影
旅行,写作,编程
旅行,写作,编程
程序员和编码员之间的区别
程序员和编码员之间的区别
做程序猿的老婆应该注意的一些事情
做程序猿的老婆应该注意的一些事情
鲜为人知的编程真相
鲜为人知的编程真相
聊聊HTTPS和SSL/TLS协议
聊聊HTTPS和SSL/TLS协议
我跳槽是因为他们的显示器更大
我跳槽是因为他们的显示器更大
如何成为一名黑客
如何成为一名黑客
Web开发者需具备的8个好习惯
Web开发者需具备的8个好习惯
10个调试和排错的小建议
10个调试和排错的小建议
科技史上最臭名昭著的13大罪犯
科技史上最臭名昭著的13大罪犯
“懒”出效率是程序员的美德
“懒”出效率是程序员的美德
为什么程序员都是夜猫子
为什么程序员都是夜猫子
看13位CEO、创始人和高管如何提高工作效率
看13位CEO、创始人和高管如何提高工作效率
编程语言是女人
编程语言是女人
每天工作4小时的程序员
每天工作4小时的程序员
2013年中国软件开发者薪资调查报告
2013年中国软件开发者薪资调查报告
程序员都该阅读的书
程序员都该阅读的书
60个开发者不容错过的免费资源库
60个开发者不容错过的免费资源库
十大编程算法助程序员走上高手之路
十大编程算法助程序员走上高手之路
Web开发人员为什么越来越懒了?
Web开发人员为什么越来越懒了?
一个程序员的时间管理
一个程序员的时间管理
程序员周末都喜欢做什么?
程序员周末都喜欢做什么?
软件开发程序错误异常ExceptionCopyright © 2009-2015 MyException 版权所有