zoukankan      html  css  js  c++  java
  • Clojure:ZeroMQ的入门DEMO

    假设你已经知道什么是ZeroMQ(不知道的话可以看这个:http://zh.wikipedia.org/wiki/%C3%98MQ),以下就给出在Clojure中如何使用ZeroMQ(感谢此文作者:http://patternhatch.com/2013/06/12/messaging-using-clojure-and-zeromq/)。
    1.    创建一个Clojure项目,这里我们用leiin。lein new app zmq-test
    2.    在project.clj文件中添加
    [com.rmoquin.bundle/jeromq "0.2.0"]
    cheshire "5.3.1"]]
    其中jeromq就是我们需要使用的ZeroMQ类库(纯Java实现),cheshire用于双向处理json。
    3.    打开core.clj文件,输入如下代码:

     1 (ns zmq-test.core
     2   (:import [org.jeromq ZMQ])
     3   (:require (cheshire [core :as c])))
     4 
     5 (def ctx (ZMQ/context 1))
     6 
     7 ;; REQ/REP [Request-Reply] Pattern
     8 ;; In REPL, input
     9 ;;   (future-call echo-server)
    10 ;;   (echo "hi")
    11 ;; to run the demo function
    12 (defn echo-server
    13   []
    14   (let [s (.socket ctx ZMQ/REP)]
    15     (.bind s "tcp:// 127.0.0.1:5555")
    16     (loop [msg (.recv s)]
    17       (.send s msg)
    18       (recur (.recv s)))))
    19 
    20 (defn echo
    21   [msg]
    22   (let [s (.socket ctx ZMQ/REQ)]
    23     (.connect s "tcp:// 127.0.0.1:5555")
    24     (.send s msg)
    25     (println "Server replied:" (String. (.recv s)))
    26     (.close s)))
    27 
    28 ;; PUB/SUB [Publish-Subscribe] Pattern
    29 ;; In REPL, input
    30 ;;   (future-call market-data-publisher)
    31 ;;   (get-market-data 100)
    32 ;; to run the demo function
    33 (defn market-data-publisher
    34   []
    35   (let [s (.socket ctx ZMQ/PUB)
    36         market-data-event (fn []
    37                             {:symbol (rand-nth ["CAT" "UTX"])
    38                              :size (rand-int 1000)
    39                              :price (format "%.2f" (rand 50.0))})]
    40     (.bind s "tcp:// 127.0.0.1:6666")
    41     (while :true
    42       (.send s (c/generate-string (market-data-event))))))
    43 
    44 (defn get-market-data
    45   [num-events]
    46   (let [s (.socket ctx ZMQ/SUB)]
    47     (.subscribe s "")
    48     (.connect s "tcp://127.0.0.1:6666")
    49     (dotimes [_ num-events]
    50       (println (c/parse-string (String. (.recv s)))))
    51     (.close s)))
    52 
    53 ;; PUSH/PULL [Pipeline] Pattern
    54 ;; In REPL, input
    55 ;;   (future-call collector)
    56 ;;   (future-call worker)
    57 ;;   (future-call worker)
    58 ;;   (future-call worker)
    59 ;;   (dispatcher 100)
    60 ;; to run the demo function
    61 (defn dispatcher
    62   [jobs]
    63   (let [s (.socket ctx ZMQ/PUSH)]
    64     (.bind s "tcp://127.0.0.1:7777")
    65     (Thread/sleep 1000)
    66     (dotimes [n jobs]
    67       (.send s (str n)))
    68     (.close s)))
    69 
    70 (defn worker
    71   []
    72   (let [rcv (.socket ctx ZMQ/PULL)
    73         snd (.socket ctx ZMQ/PUSH)
    74         id (str (gensym "w"))]
    75     (.connect rcv "tcp://127.0.0.1:7777")
    76     (.connect snd "tcp://127.0.0.1:8888")
    77     (while :true
    78       (let [job-id (String. (.recv rcv))
    79             proc-time (rand-int 100)]
    80         (Thread/sleep proc-time)
    81         (.send snd (c/generate-string {:worker-id id
    82                                        :job-id job-id
    83                                        :processing-time proc-time}))))))
    84 
    85 (defn collector
    86   []
    87   (let [s (.socket ctx ZMQ/PULL)]
    88     (.bind s "tcp://127.0.0.1:8888")
    89     (while :true
    90       (->> (.recv s)
    91            (String.)
    92            (c/parse-string)
    93            (println "Job completed:")))))

    代码中包括了ZeroMQ的三种模式,可以直接在REPL中进行测试。但是这只是很简单的Hello World程序,如果要将ZeroMQ用于实际生产环境中的话,还有很多环节需要考虑和完善。

  • 相关阅读:
    shell脚本快速配置yum源
    RAID 5+1
    RAID 10
    TFTP
    CentOS7 初始化脚本 2.0
    CentOS7 初始化脚本 1.0
    Tomcat CGI 轻松打造 Web 服务
    Python 变量类型 —— type() 函数和 isinstance() 函数
    Python源码换行
    RFC 文档搜索与阅读
  • 原文地址:https://www.cnblogs.com/ilovewindy/p/3984269.html
Copyright © 2011-2022 走看看