MyException - 我的异常网
当前位置:我的异常网» 开源软件 » RocketMQ鏈€浣冲疄璺碉紙涓€锛?零鐗堟湰/姒傚康浠

RocketMQ鏈€浣冲疄璺碉紙涓€锛?零鐗堟湰/姒傚康浠嬬粛/瀹夎璋冭瘯/瀹㈡埛绔痙emo

www.MyException.Cn  网友分享于:2013-10-11  浏览:0次
RocketMQ鏈€浣冲疄璺碉紙涓€锛?.0鐗堟湰/姒傚康浠嬬粛/瀹夎璋冭瘯/瀹㈡埛绔痙emo

涓轰粈涔堥€夋嫨RocketMQ

鎴戜滑鏉ョ湅鐪嬪畼鏂瑰洖绛旓細

鈥滄垜浠爺绌跺彂鐜帮紝瀵逛簬ActiveMQ鑰岃█锛岄殢鐫€瓒婃潵瓒婂鐨勪娇鐢╭ueues鍜宼opics锛屽叾IO鎴愪负浜嗙摱棰堛€傛煇浜涙儏鍐典笅锛屾秷璐硅€呯紦鎱紙娑堣垂鑳藉姏涓嶈冻锛夎繕浼氭嫋鎱㈢敓浜ц€咃紙閫犳垚娑堟伅闃诲锛夈€傝櫧鐒舵垜浠仛浜嗘渶澶у姫鍔涜繘琛屼紭鍖栵細鑺傛祦銆佹柇璺櫒鎴栬€呭洖閫€锛屼絾鏄苟涓嶈兘杩涜浼橀泤鐨勬墿灞曘€傚洜姝ゆ垜浠紑濮嬩笓娉ㄤ簬浣跨敤鏃朵笅闈炲父娴佽鐨刱afka锛屼絾鏄粛鐒朵笉鑳芥弧瓒虫垜浠殑瑕佹眰锛屽浣庡欢杩熷拰楂樺彲闈犳€э紝璇︽儏瑙?a style="color: #336699;" target="_blank" href="http://rocketmq.incubator.apache.org/rocketmq/how-to-support-more-queues-in-rocketmq/">杩欓噷銆傚湪杩欐牱鐨勮儗鏅笅锛屾垜浠喅瀹氬紑鍙戜竴涓柊鐨勬秷鎭腑闂翠欢鏉ュ鐞嗕竴绯诲垪骞挎硾鐨勪娇鐢ㄥ満鏅紝鍖呮嫭浠庝紶缁熺殑鍙戝竷/璁㈤槄鍦烘櫙鍒伴珮瀹归噺鐨勫疄鏃朵氦鏄撶郴缁熶腑涓嶅厑璁告秷鎭涪澶辩殑鍦烘櫙銆傗€?/p>

鍚勪綅鐪嬪畼涔熷彲浠ユ挳杩欓噷鍘荤湅鐪?a style="color: #336699;" target="_blank" href="http://rocketmq.incubator.apache.org/docs/motivation/">RocketMQ涓嶢ctiveMQ浠ュ強Kafka鐨勬瘮杈?/a>銆?/p>

鏍稿績姒傚康

  • 鐢熶骇鑰咃紙Producer锛夛細娑堟伅鍙戦€佹柟锛屽皢涓氬姟绯荤粺涓骇鐢熺殑娑堟伅鍙戦€佸埌brokers锛坆rokers鍙互鐞嗚В涓烘秷鎭唬鐞嗭紝鐢熶骇鑰呭拰娑堣垂鑰呬箣闂存槸閫氳繃brokers杩涜娑堟伅鐨勯€氫俊锛夛紝rocketmq鎻愪緵浜嗕互涓嬫秷鎭彂閫佹柟寮?span style="white-space: pre;">锛氬悓姝ャ€佸紓姝ャ€佸崟鍚?/span>銆?/li>
  • 鐢熶骇鑰呯粍锛圥roducer Group锛夛細鐩稿悓瑙掕壊鐨勭敓浜ц€呰褰掍负鍚屼竴缁勶紝姣斿閫氬父鎯呭喌涓嬩竴涓湇鍔′細閮ㄧ讲澶氫釜瀹炰緥锛岃繖澶氫釜瀹炰緥灏辨槸涓€涓粍锛岀敓浜ц€呭垎缁勭殑浣滅敤鍙綋鐜板湪娑堟伅鍥炴煡鐨勬椂鍊欙紝鍗冲鏋滀竴涓敓浜ц€呯粍涓殑涓€涓敓浜ц€呭疄渚嬪彂閫佷竴涓簨鍔℃秷鎭埌broker鍚庢寕鎺変簡锛岄偅涔坆roker浼氬洖鏌ユ瀹炰緥鎵€鍦ㄧ粍鐨勫叾浠栧疄渚嬶紝浠庤€岃繘琛屾秷鎭殑鎻愪氦鎴栧洖婊氭搷浣溿€?/li>
  • 娑堣垂鑰咃紙Consumer锛夛細娑堟伅娑堣垂鏂癸紝浠巄rokers鎷夊彇娑堟伅銆傜珯鍦ㄧ敤鎴风殑瑙掑害锛屾湁浠ヤ笅涓ょ娑堣垂鑰呫€?/li>
  • 涓诲姩娑堣垂鑰咃紙PullConsumer锛夛細浠巄rokers鎷夊彇娑堟伅骞舵秷璐广€?/li>
  • 琚姩娑堣垂鑰咃紙PushConsumer锛夛細鍐呴儴涔熸槸閫氳繃pull鏂瑰紡鑾峰彇娑堟伅锛屽彧鏄繘琛屼簡鎵╁睍鍜屽皝瑁咃紝骞剁粰鐢ㄦ埛棰勭暀浜嗕竴涓洖璋冩帴鍙e幓瀹炵幇锛屽綋娑堟伅鍒板簳鐨勬椂鍊欎細鎵ц鐢ㄦ埛鑷畾涔夌殑鍥炶皟鎺ュ彛銆?/li>
  • 娑堣垂鑰呯粍锛圕onsumer Group锛夛細鍜岀敓浜ц€呯粍绫讳技銆傚叾浣滅敤浣撶幇鍦ㄥ疄鐜版秷璐硅€呯殑璐熻浇鍧囪 鍜屽閿欙紝鏈変簡娑堣垂鑰呯粍鍙樺緱寮傚父瀹规槗銆傞渶瑕佹敞鎰忕殑鏄細鍚屼竴涓秷璐硅€呯粍鐨勬瘡涓秷璐硅€呭疄渚嬭闃呯殑涓婚蹇呴』鐩稿悓銆?/li>
  • 涓婚锛圱opic锛夛細涓婚灏辨槸娑堟伅浼犻€掔殑绫诲瀷銆備竴涓敓浜ц€呭疄渚嬪彲浠ュ彂閫佹秷鎭埌澶氫釜涓婚锛屽涓敓浜ц€呭疄渚嬩篃鍙互鍙戦€佹秷鎭埌鍚屼竴涓富棰樸€傚悓鏍风殑锛屽浜庢秷璐硅€呯鏉ヨ锛屼竴涓秷璐硅€呯粍鍙互璁㈤槄澶氫釜涓婚鐨勬秷鎭紝涓€涓富棰樼殑娑堟伅涔熷彲浠ヨ澶氫釜娑堣垂鑰呯粍璁㈤槄銆?/li>
  • 娑堟伅锛圡essage锛夛細娑堟伅灏卞儚鏄綘浼犻€掍俊鎭殑淇″皝銆傛瘡涓秷鎭繀椤绘寚瀹氫竴涓富棰橈紝灏卞ソ姣旀瘡涓俊灏佷笂閮藉繀椤诲啓鏄庢敹浠朵汉銆?/li>
  • 娑堟伅闃熷垪锛圡essage Queues锛夛細鍦ㄤ富棰樺唴閮紝閫昏緫鍒掑垎浜嗗涓瓙涓婚锛屾瘡涓瓙涓婚琚О涓烘秷鎭槦鍒椼€傝繖涓蹇靛湪瀹炵幇鏈€澶у苟鍙戞暟銆佹晠闅滃垏鎹㈢瓑鍔熻兘涓婃湁宸ㄥぇ鐨勪綔鐢ㄣ€?/li>
  • 鏍囩锛圱ag锛夛細鏍囩锛屽彲浠ヨ璁や负鏄瓙涓婚銆傞€氬父鐢ㄤ簬鍖哄垎鍚屼竴涓富棰樹笅鐨勪笉鍚屼綔鐢ㄦ垨鑰呰涓嶅悓涓氬姟鐨勬秷鎭€傚悓鏃朵篃鏄伩鍏嶄富棰樺畾涔夎繃澶氬紩璧锋€ц兘闂锛岄€氬父鎯呭喌涓嬩竴涓敓浜ц€呯粍鍙悜涓€涓富棰樺彂閫佹秷鎭紝鍏朵腑涓嶅悓涓氬姟鐨勬秷鎭€氳繃鏍囩鎴栬€呰瀛愪富棰樻潵鍖哄垎銆?/li>
  • 娑堟伅浠g悊锛圔roker锛夛細娑堟伅浠g悊鏄疪ockerMQ涓緢閲嶈鐨勮鑹层€傚畠鎺ユ敹鐢熶骇鑰呭彂閫佺殑娑堟伅锛岃繘琛屾秷鎭瓨鍌紝涓烘秷璐硅€呮媺鍙栨秷鎭湇鍔°€傚畠杩樺瓨鍌ㄦ秷鎭秷鑰楃浉鍏崇殑鍏冩暟鎹紝鍖呮嫭娑堣垂缇や綋锛屾秷璐硅繘搴﹀亸绉诲拰涓婚/闃熷垪淇℃伅銆?/li>
  • 鍛藉悕鏈嶅姟锛圢ame Server锛夛細鍛藉悕鏈嶅姟浣滀负璺敱淇℃伅鎻愪緵绋嬪簭銆傜敓浜ц€?娑堣垂鑰呰繘琛屼富棰樻煡鎵俱€佹秷鎭唬鐞嗘煡鎵俱€佽鍙?鍐欏叆娑堟伅閮介渶瑕侀€氳繃鍛藉悕鏈嶅姟鑾峰彇璺敱淇℃伅銆?/li>
  • 娑堟伅椤哄簭锛圡essage Order锛夛細褰撴垜浠娇鐢―efaultMQPushConsumer鏃讹紝鎴戜滑鍙互閫夋嫨浣跨敤鈥渙rderly鈥濊繕鏄€渃oncurrently鈥濄€?
    • orderly锛?/strong>娑堣垂娑堟伅鐨勬湁搴忓寲鎰忓懗鐫€娑堟伅琚敓浜ц€呮寜鐓ф瘡涓秷鎭槦鍒楀彂閫佺殑椤哄簭娑堣垂銆傚鏋滄偍姝e湪澶勭悊鍏ㄥ眬椤哄簭涓哄己鍒剁殑鍦烘櫙锛岃纭繚鎮ㄤ娇鐢ㄧ殑涓婚鍙湁涓€涓秷鎭槦鍒椼€傛敞鎰忥細濡傛灉鎸囧畾浜嗘秷璐归『搴忥紝鍒欐秷鎭秷璐圭殑鏈€澶у苟鍙戞€ф槸娑堣垂缁勮闃呯殑娑堟伅闃熷垪鏁般€?/li>
    • concurrently锛?/strong>褰撳悓鏃舵秷璐规椂锛屾秷鎭秷璐圭殑鏈€澶у苟鍙戜粎闄愪簬涓烘瘡涓秷璐瑰鎴风鎸囧畾鐨勭嚎绋嬫睜銆傛敞鎰忥細姝ゆā寮忎笉鍐嶄繚璇佹秷鎭『搴忋€?/li>

瀹夎涓庤皟璇?/h1>

瀹樻柟瑕佹眰鐨勭幆澧冿細

  • 64bit OS, Linux/Unix/Mac is recommended;
  • 64bit JDK 1.7+;
  • Maven 3.2.x
  • Git

鎴戠殑鐜锛氾紙鎴戝枩娆娇鐢ㄨ緝鏂扮殑鐗堟湰寰楁剰锛?/h2>

  • CentOS Linux release 7.3.1611;
  • 64bit JDK 1.8.0_91;
  • apache-maven-3.5.0;
  • Git聽1.8.3.1

瀹夎jdk

楹荤儲鍚勪綅鐪嬪畼鑷鎼滅储锛岃祫鏂欏鐨勫悡浜恒€傘€傘€?img alt="寰瑧" style="border-style: none; max-width: 100%;" src="http://static.blog.csdn.net/xheditor/xheditor_emot/default/smile.gif">

瀹夎maven

鍏堝幓瀹樼綉涓嬭浇maven
鐒跺悗涓婁紶鍒板畨瑁呯洰褰曪紝瑙e帇锛?/blockquote>
[plain]聽view plain聽copy
  1. sudo聽tar聽zxvf聽apache-maven-3.5.0-bin.tar.gz聽聽
瑙e帇瀹屾垚璁剧疆鐜鍙橀噺锛?/blockquote>
[plain]聽view plain聽copy
  1. sudo聽vi聽/etc/profile聽聽
鐒跺悗浣跨幆澧冨彉閲忕敓鏁堬細
[plain]聽view plain聽copy
  1. source聽/etc/profile聽聽
鏈€鍚庨獙璇佹槸鍚﹀畨瑁呮垚鍔燂細
[plain]聽view plain聽copy
  1. mvn聽-v聽聽

瀹夎Git锛坰o easy澶х瑧锛?/h3>

鍏堟鏌ョ湅鐪嬫槸鍚﹀凡缁忓畨瑁呰繃浜嗭細
[plain]聽view plain聽copy
  1. git聽--version聽聽
濡傛灉娌℃湁灏卞紑濮嬪畨瑁咃細
[plain]聽view plain聽copy
  1. sudo聽yum聽install聽git聽聽
瀹夎瀹屾瘯鍐嶇湅鐪嬶細
[plain]聽view plain聽copy
  1. git聽--version聽聽

涓嬮潰杩涜RocketMq瀹夎

缂栬瘧锛?/div>
[plain]聽view plain聽copy
  1. >聽git聽clone聽https://github.com/apache/incubator-rocketmq.git聽聽
[plain]聽view plain聽copy
  1. >聽cd聽incubator-rocketmq聽聽
[plain]聽view plain聽copy
  1. >聽mvn聽clean聽package聽install聽-Prelease-all聽assembly:assembly聽-U聽聽
[plain]聽view plain聽copy
  1. >聽cd聽target/apache-rocketmq-all聽聽
鍦ㄦ墽琛宮vn缂栬瘧鐨勬椂鍊欙紝浣犲彲鑳戒細閬囧埌濡備笅鐨勯棶棰橈細
杩欐槸鐢变簬娌℃湁鏉冮檺鍒涘缓鐩綍閫犳垚鐨勩€傛墍浠ワ紝瑕佷箞浣犲垏鎹㈠埌root鐢ㄦ埛锛岃涔堜娇鐢╯udo锛?/div>
[plain]聽view plain聽copy
  1. sudo聽mvn聽clean聽package聽install聽-Prelease-all聽assembly:assembly聽-U聽聽
鎻愮ず锛歴udo: mvn: command not found銆傚ソ鍚э紝涔熸槸閱変簡銆傛垜浠繕闇€瑕佸湪浣犲綋鍓嶇敤鎴风殑Home鐩綍涓嬬殑涓€涓殣钘忔枃浠讹紙.bashrc锛変腑娣诲姞鐐逛笢瑗匡細
[plain]聽view plain聽copy
  1. >聽cd聽~聽聽
[plain]聽view plain聽copy
  1. >聽sudo聽vi聽.bashrc聽聽
娣诲姞瀹屾垚鍚庯紝鎵ц锛歴ource聽.bashrc 聽浣夸慨鏀圭敓鏁堛€傜劧鍚庡啀閲嶆柊鎵ц鐪嬬湅锛?/span>
[plain]聽view plain聽copy
  1. sudo聽mvn聽clean聽package聽install聽-Prelease-all聽assembly:assembly聽-U聽聽
鏃堕棿绋嶅井鏈夌偣闀匡紝鎴戠殑鐜鐢ㄤ簡16鍒嗛挓锛岃鐪嬪畼鑰愬績绛夊緟锛屽畬鎴愬悗濡備笅鍥撅細

鍚姩RocketMQ

淇敼榛樿閰嶇疆

鐢变簬RocketMQ榛樿閰嶇疆瑕佹眰寰堥珮锛屾瘮濡傚唴瀛樿嚦灏戝氨瑕?涓狦锛屽紑鍙戣皟璇曠幆澧冩牴鏈悆涓嶆秷锛屾墍浠ユ垜浠紑濮嬪惎鍔ㄥ墠闇€瑕佸厛淇敼杩欎簺鍙傛暟銆傚惁鍒欑殑璇濓紝鎴戜滑寰堟湁浼氶亣鍒板唴瀛樺垎閰嶆垨鑰呬笉澶熺殑闂銆?/div>
淇敼target/apache-rocketmq-all/bin/runserver.sh
[plain]聽view plain聽copy
  1. JAVA_OPT="${JAVA_OPT}聽-server聽-Xms256m聽-Xmx256m聽-Xmn128m聽-XX:PermSize=128m聽-XX:MaxPermSize=320m"聽聽
淇敼target/apache-rocketmq-all/bin/runbroker.sh
[plain]聽view plain聽copy
  1. JAVA_OPT="${JAVA_OPT}聽-server聽-Xms256m聽-Xmx256m聽-Xmn128m聽聽
淇敼target/apache-rocketmq-all/bin/tools.sh
[plain]聽view plain聽copy
  1. JAVA_OPT="${JAVA_OPT}聽-server聽-Xms256m聽-Xmx256m聽-Xmn128m聽-XX:PermSize=128m聽-XX:MaxPermSize=128m"聽聽

鍚姩NameServer

杩涘叆target/apache-rocketmq-all鐩綍涓?/div>
[plain]聽view plain聽copy
  1. >聽nohup聽sh聽bin/mqnamesrv聽&聽聽
[plain]聽view plain聽copy
  1. >聽tail聽-f聽~/logs/rocketmqlogs/namesrv.log聽聽
[plain]聽view plain聽copy
  1. The聽Name聽Server聽boot聽success...聽聽

鍚姩Broker

[plain]聽view plain聽copy
  1. >聽nohup聽sh聽bin/mqbroker聽-n聽localhost:9876聽&聽聽
[plain]聽view plain聽copy
  1. >聽tail聽-f聽~/logs/rocketmqlogs/broker.log聽聽聽
[plain]聽view plain聽copy
  1. The聽broker[%s,聽172.17.0.1:10911]聽boot聽success...聽聽

寮€鏀剧鍙?/h3>

[plain]聽view plain聽copy
  1. sudo聽vi聽/etc/sysconfig/iptables聽聽
鐒跺悗閲嶅惎鐢熸晥锛?/div>
[plain]聽view plain聽copy
  1. sudo聽systemctl聽restart聽iptables聽聽

娣诲姞ROCKETMQ_HOME鐜鍙橀噺

[plain]聽view plain聽copy
  1. sudo聽vi聽/etc/profile聽聽


[plain]聽view plain聽copy
  1. source聽/etc/profile聽聽


java瀹㈡埛绔?/h1>

pom.xml

[html]聽view plain聽copy
  1. <rocketmq.version>4.0.0-incubating</rocketmq.version>聽聽
  2. 聽聽
  3. <dependency>聽聽
  4. 聽聽聽聽<groupId>org.apache.rocketmq</groupId>聽聽
  5. 聽聽聽聽<artifactId>rocketmq-client</artifactId>聽聽
  6. 聽聽聽聽<version>${rocketmq.version}</version>聽聽
  7. </dependency>聽聽
  8. <dependency>聽聽
  9. 聽聽聽聽<groupId>org.apache.rocketmq</groupId>聽聽
  10. 聽聽聽聽<artifactId>rocketmq-common</artifactId>聽聽
  11. 聽聽聽聽<version>${rocketmq.version}</version>聽聽
  12. </dependency>聽聽

鐢熶骇鑰?/h2>

[java]聽view plain聽copy
  1. import聽org.apache.rocketmq.client.exception.MQClientException;聽聽
  2. import聽org.apache.rocketmq.client.producer.DefaultMQProducer;聽聽
  3. import聽org.apache.rocketmq.client.producer.SendResult;聽聽
  4. import聽org.apache.rocketmq.common.message.Message;聽聽
  5. 聽聽
  6. import聽java.util.concurrent.TimeUnit;聽聽
  7. 聽聽
  8. publicclass聽Producer聽{聽聽
  9. 聽聽聽聽publicstaticvoid聽main(String[]聽args)聽throws聽MQClientException,聽聽
  10. 聽聽聽聽聽聽聽聽聽聽聽聽InterruptedException聽{聽聽
  11. 聽聽聽聽聽聽聽聽/**
  12. 聽聽聽聽聽聽聽聽聽*聽涓€涓簲鐢ㄥ垱寤轰竴涓狿roducer锛岀敱搴旂敤鏉ョ淮鎶ゆ瀵硅薄锛屽彲浠ヨ缃负鍏ㄥ眬瀵硅薄鎴栬€呭崟渚?lt;br>
  13. 聽聽聽聽聽聽聽聽聽*聽娉ㄦ剰锛歅roducerGroupName闇€瑕佺敱搴旂敤鏉ヤ繚璇佸敮涓€<br>
  14. 聽聽聽聽聽聽聽聽聽*聽ProducerGroup杩欎釜姒傚康鍙戦€佹櫘閫氱殑娑堟伅鏃讹紝浣滅敤涓嶅ぇ锛屼絾鏄彂閫佸垎甯冨紡浜嬪姟娑堟伅鏃讹紝姣旇緝鍏抽敭锛?/span>聽
  15. 聽聽聽聽聽聽聽聽聽*聽鍥犱负鏈嶅姟鍣ㄤ細鍥炴煡杩欎釜Group涓嬬殑浠绘剰涓€涓狿roducer
  16. 聽聽聽聽聽聽聽聽聽*/聽聽
  17. 聽聽聽聽聽聽聽聽DefaultMQProducer聽producer聽=聽new聽DefaultMQProducer("ProducerGroupName");聽聽
  18. 聽聽聽聽聽聽聽聽producer.setNamesrvAddr("192.168.56.101:9876");聽聽
  19. 聽聽聽聽聽聽聽聽producer.setInstanceName("Producer");聽聽
  20. 聽聽聽聽聽聽聽聽producer.setVipChannelEnabled(false);聽聽
  21. 聽聽
  22. 聽聽聽聽聽聽聽聽/**
  23. 聽聽聽聽聽聽聽聽聽*聽Producer瀵硅薄鍦ㄤ娇鐢ㄤ箣鍓嶅繀椤昏璋冪敤start鍒濆鍖栵紝鍒濆鍖栦竴娆″嵆鍙?lt;br>
  24. 聽聽聽聽聽聽聽聽聽*聽娉ㄦ剰锛氬垏璁颁笉鍙互鍦ㄦ瘡娆″彂閫佹秷鎭椂锛岄兘璋冪敤start鏂规硶
  25. 聽聽聽聽聽聽聽聽聽*/聽聽
  26. 聽聽聽聽聽聽聽聽producer.start();聽聽
  27. 聽聽
  28. 聽聽聽聽聽聽聽聽/**
  29. 聽聽聽聽聽聽聽聽聽*聽涓嬮潰杩欐浠g爜琛ㄦ槑涓€涓狿roducer瀵硅薄鍙互鍙戦€佸涓猼opic锛屽涓猼ag鐨勬秷鎭€?/span>聽
  30. 聽聽聽聽聽聽聽聽聽*聽娉ㄦ剰锛歴end鏂规硶鏄悓姝ヨ皟鐢紝鍙涓嶆姏寮傚父灏辨爣璇嗘垚鍔熴€備絾鏄彂閫佹垚鍔熶篃鍙細鏈夊绉嶇姸鎬侊紝<br>
  31. 聽聽聽聽聽聽聽聽聽*聽渚嬪娑堟伅鍐欏叆Master鎴愬姛锛屼絾鏄疭lave涓嶆垚鍔燂紝杩欑鎯呭喌娑堟伅灞炰簬鎴愬姛锛屼絾鏄浜庝釜鍒簲鐢ㄥ鏋滃娑堟伅鍙潬鎬ц姹傛瀬楂橈紝<br>
  32. 聽聽聽聽聽聽聽聽聽*聽闇€瑕佸杩欑鎯呭喌鍋氬鐞嗐€傚彟澶栵紝娑堟伅鍙兘浼氬瓨鍦ㄥ彂閫佸け璐ョ殑鎯呭喌锛屽け璐ラ噸璇曠敱搴旂敤鏉ュ鐞嗐€?/span>聽
  33. 聽聽聽聽聽聽聽聽聽*/聽聽
  34. 聽聽聽聽聽聽聽聽for聽(int聽i聽=聽0;聽i聽<聽1;聽i++)聽{聽聽
  35. 聽聽聽聽聽聽聽聽聽聽聽聽try聽{聽聽
  36. 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽{聽聽
  37. 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽Message聽msg聽=聽new聽Message("TopicTest1",//聽topic聽聽
  38. 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽"TagA",//聽tag聽聽
  39. 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽"OrderID001",//聽key聽聽
  40. 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽("Hello聽MetaQ").getBytes());//聽body聽聽
  41. 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽SendResult聽sendResult聽=聽producer.send(msg);聽聽
  42. 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽System.out.println(sendResult);聽聽
  43. 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽}聽聽
  44. 聽聽
  45. 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽{聽聽
  46. 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽Message聽msg聽=聽new聽Message("TopicTest2",//聽topic聽聽
  47. 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽"TagB",//聽tag聽聽
  48. 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽"OrderID0034",//聽key聽聽
  49. 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽("Hello聽MetaQ").getBytes());//聽body聽聽
  50. 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽SendResult聽sendResult聽=聽producer.send(msg);聽聽
  51. 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽System.out.println(sendResult);聽聽
  52. 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽}聽聽
  53. 聽聽
  54. 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽{聽聽
  55. 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽Message聽msg聽=聽new聽Message("TopicTest3",//聽topic聽聽
  56. 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽"TagC",//聽tag聽聽
  57. 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽"OrderID061",//聽key聽聽
  58. 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽("Hello聽MetaQ").getBytes());//聽body聽聽
  59. 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽SendResult聽sendResult聽=聽producer.send(msg);聽聽
  60. 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽System.out.println(sendResult);聽聽
  61. 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽}聽聽
  62. 聽聽聽聽聽聽聽聽聽聽聽聽}聽catch聽(Exception聽e)聽{聽聽
  63. 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽e.printStackTrace();聽聽
  64. 聽聽聽聽聽聽聽聽聽聽聽聽}聽聽
  65. 聽聽聽聽聽聽聽聽聽聽聽聽TimeUnit.MILLISECONDS.sleep(1000);聽聽
  66. 聽聽聽聽聽聽聽聽}聽聽
  67. 聽聽
  68. 聽聽聽聽聽聽聽聽/**
  69. 聽聽聽聽聽聽聽聽聽*聽搴旂敤閫€鍑烘椂锛岃璋冪敤shutdown鏉ユ竻鐞嗚祫婧愶紝鍏抽棴缃戠粶杩炴帴锛屼粠MetaQ鏈嶅姟鍣ㄤ笂娉ㄩ攢鑷繁
  70. 聽聽聽聽聽聽聽聽聽*聽娉ㄦ剰锛氭垜浠缓璁簲鐢ㄥ湪JBOSS銆乀omcat绛夊鍣ㄧ殑閫€鍑洪挬瀛愰噷璋冪敤shutdown鏂规硶
  71. 聽聽聽聽聽聽聽聽聽*/聽聽
  72. 聽聽聽聽聽聽聽聽producer.shutdown();聽聽
  73. 聽聽聽聽}聽聽
  74. }聽聽

娑堣垂鑰?/h2>

[java]聽view plain聽copy
  1. import聽org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;聽聽
  2. import聽org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;聽聽
  3. import聽org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;聽聽
  4. import聽org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;聽聽
  5. import聽org.apache.rocketmq.client.exception.MQClientException;聽聽
  6. import聽org.apache.rocketmq.common.message.MessageExt;聽聽
  7. 聽聽
  8. import聽java.util.List;聽聽
  9. 聽聽
  10. publicclass聽PushConsumer聽{聽聽
  11. 聽聽
  12. 聽聽聽聽/**
  13. 聽聽聽聽聽*聽褰撳墠渚嬪瓙鏄疨ushConsumer鐢ㄦ硶锛屼娇鐢ㄦ柟寮忕粰鐢ㄦ埛鎰熻鏄秷鎭粠RocketMQ鏈嶅姟鍣ㄦ帹鍒颁簡搴旂敤瀹㈡埛绔€?lt;br>
  14. 聽聽聽聽聽*聽浣嗘槸瀹為檯PushConsumer鍐呴儴鏄娇鐢ㄩ暱杞Pull鏂瑰紡浠嶮etaQ鏈嶅姟鍣ㄦ媺娑堟伅锛岀劧鍚庡啀鍥炶皟鐢ㄦ埛Listener鏂规硶<br>
  15. 聽聽聽聽聽*/聽聽
  16. 聽聽聽聽publicstaticvoid聽main(String[]聽args)聽throws聽InterruptedException,聽聽
  17. 聽聽聽聽聽聽聽聽聽聽聽聽MQClientException聽{聽聽
  18. 聽聽聽聽聽聽聽聽/**
  19. 聽聽聽聽聽聽聽聽聽*聽涓€涓簲鐢ㄥ垱寤轰竴涓狢onsumer锛岀敱搴旂敤鏉ョ淮鎶ゆ瀵硅薄锛屽彲浠ヨ缃负鍏ㄥ眬瀵硅薄鎴栬€呭崟渚?lt;br>
  20. 聽聽聽聽聽聽聽聽聽*聽娉ㄦ剰锛欳onsumerGroupName闇€瑕佺敱搴旂敤鏉ヤ繚璇佸敮涓€
  21. 聽聽聽聽聽聽聽聽聽*/聽聽
  22. 聽聽聽聽聽聽聽聽DefaultMQPushConsumer聽consumer聽=聽new聽DefaultMQPushConsumer(聽聽
  23. 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽"ConsumerGroupName");聽聽
  24. 聽聽聽聽聽聽聽聽consumer.setNamesrvAddr("192.168.56.101:9876");聽聽
  25. 聽聽聽聽聽聽聽聽consumer.setInstanceName("Consumber");聽聽
  26. 聽聽
  27. 聽聽聽聽聽聽聽聽/**
  28. 聽聽聽聽聽聽聽聽聽*聽璁㈤槄鎸囧畾topic涓媡ags鍒嗗埆绛変簬TagA鎴朤agC鎴朤agD
  29. 聽聽聽聽聽聽聽聽聽*/聽聽
  30. 聽聽聽聽聽聽聽聽consumer.subscribe("TopicTest1",聽"TagA聽||聽TagC聽||聽TagD");聽聽
  31. 聽聽聽聽聽聽聽聽/**
  32. 聽聽聽聽聽聽聽聽聽*聽璁㈤槄鎸囧畾topic涓嬫墍鏈夋秷鎭?lt;br>
  33. 聽聽聽聽聽聽聽聽聽*聽娉ㄦ剰锛氫竴涓猚onsumer瀵硅薄鍙互璁㈤槄澶氫釜topic
  34. 聽聽聽聽聽聽聽聽聽*/聽聽
  35. 聽聽聽聽聽聽聽聽consumer.subscribe("TopicTest2",聽"*");聽聽
  36. 聽聽
  37. 聽聽聽聽聽聽聽聽consumer.registerMessageListener(new聽MessageListenerConcurrently()聽{聽聽
  38. 聽聽
  39. 聽聽聽聽聽聽聽聽聽聽聽聽/**
  40. 聽聽聽聽聽聽聽聽聽聽聽聽聽*聽榛樿msgs閲屽彧鏈変竴鏉℃秷鎭紝鍙互閫氳繃璁剧疆consumeMessageBatchMaxSize鍙傛暟鏉ユ壒閲忔帴鏀舵秷鎭?/span>聽
  41. 聽聽聽聽聽聽聽聽聽聽聽聽聽*/聽聽
  42. 聽聽聽聽聽聽聽聽聽聽聽聽@Override聽聽
  43. 聽聽聽聽聽聽聽聽聽聽聽聽public聽ConsumeConcurrentlyStatus聽consumeMessage(聽聽
  44. 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽List<MessageExt>聽msgs,聽ConsumeConcurrentlyContext聽context)聽{聽聽
  45. 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽System.out.println(Thread.currentThread().getName()聽聽
  46. 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽+聽"聽Receive聽New聽Messages:聽"聽+聽msgs.size());聽聽
  47. 聽聽
  48. 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽MessageExt聽msg聽=聽msgs.get(0);聽聽
  49. 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽if聽(msg.getTopic().equals("TopicTest1"))聽{聽聽
  50. 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽//聽鎵цTopicTest1鐨勬秷璐归€昏緫聽聽
  51. 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽if聽(msg.getTags()聽!=聽null聽&&聽msg.getTags().equals("TagA"))聽{聽聽
  52. 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽//聽鎵цTagA鐨勬秷璐?/span>聽聽
  53. 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽System.out.println(new聽String(msg.getBody()));聽聽
  54. 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽}聽elseif聽(msg.getTags()聽!=聽null聽聽
  55. 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽&&聽msg.getTags().equals("TagC"))聽{聽聽
  56. 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽//聽鎵цTagC鐨勬秷璐?/span>聽聽
  57. 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽}聽elseif聽(msg.getTags()聽!=聽null聽聽
  58. 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽&&聽msg.getTags().equals("TagD"))聽{聽聽
  59. 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽//聽鎵цTagD鐨勬秷璐?/span>聽聽
  60. 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽}聽聽
  61. 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽}聽elseif聽(msg.getTopic().equals("TopicTest2"))聽{聽聽
  62. 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽System.out.println(new聽String(msg.getBody()));聽聽
  63. 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽}聽聽
  64. 聽聽
  65. 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽return聽ConsumeConcurrentlyStatus.CONSUME_SUCCESS;聽聽
  66. 聽聽聽聽聽聽聽聽聽聽聽聽}聽聽
  67. 聽聽聽聽聽聽聽聽});聽聽
  68. 聽聽
  69. 聽聽聽聽聽聽聽聽/**
  70. 聽聽聽聽聽聽聽聽聽*聽Consumer瀵硅薄鍦ㄤ娇鐢ㄤ箣鍓嶅繀椤昏璋冪敤start鍒濆鍖栵紝鍒濆鍖栦竴娆″嵆鍙?lt;br>
  71. 聽聽聽聽聽聽聽聽聽*/聽聽
  72. 聽聽聽聽聽聽聽聽consumer.start();聽聽
  73. 聽聽
  74. 聽聽聽聽聽聽聽聽System.out.println("Consumer聽Started.");聽聽
  75. 聽聽聽聽}聽聽
  76. }聽聽

http://blog.csdn.net/jayjjb/article/details/69948357

文章评论

“懒”出效率是程序员的美德
“懒”出效率是程序员的美德
程序员眼里IE浏览器是什么样的
程序员眼里IE浏览器是什么样的
编程语言是女人
编程语言是女人
程序员必看的十大电影
程序员必看的十大电影
那些争议最大的编程观点
那些争议最大的编程观点
漫画:程序员的工作
漫画:程序员的工作
60个开发者不容错过的免费资源库
60个开发者不容错过的免费资源库
程序员应该关注的一些事儿
程序员应该关注的一些事儿
旅行,写作,编程
旅行,写作,编程
10个帮程序员减压放松的网站
10个帮程序员减压放松的网站
为啥Android手机总会越用越慢?
为啥Android手机总会越用越慢?
中美印日四国程序员比较
中美印日四国程序员比较
Web开发者需具备的8个好习惯
Web开发者需具备的8个好习惯
程序员都该阅读的书
程序员都该阅读的书
Java程序员必看电影
Java程序员必看电影
为什么程序员都是夜猫子
为什么程序员都是夜猫子
如何成为一名黑客
如何成为一名黑客
什么才是优秀的用户界面设计
什么才是优秀的用户界面设计
不懂技术不要对懂技术的人说这很容易实现
不懂技术不要对懂技术的人说这很容易实现
做程序猿的老婆应该注意的一些事情
做程序猿的老婆应该注意的一些事情
我跳槽是因为他们的显示器更大
我跳槽是因为他们的显示器更大
鲜为人知的编程真相
鲜为人知的编程真相
科技史上最臭名昭著的13大罪犯
科技史上最臭名昭著的13大罪犯
5款最佳正则表达式编辑调试器
5款最佳正则表达式编辑调试器
程序员最害怕的5件事 你中招了吗?
程序员最害怕的5件事 你中招了吗?
程序员和编码员之间的区别
程序员和编码员之间的区别
总结2014中国互联网十大段子
总结2014中国互联网十大段子
要嫁就嫁程序猿—钱多话少死的早
要嫁就嫁程序猿—钱多话少死的早
初级 vs 高级开发者 哪个性价比更高?
初级 vs 高级开发者 哪个性价比更高?
十大编程算法助程序员走上高手之路
十大编程算法助程序员走上高手之路
如何区分一个程序员是“老手“还是“新手“?
如何区分一个程序员是“老手“还是“新手“?
老程序员的下场
老程序员的下场
2013年中国软件开发者薪资调查报告
2013年中国软件开发者薪资调查报告
 程序员的样子
程序员的样子
2013年美国开发者薪资调查报告
2013年美国开发者薪资调查报告
Google伦敦新总部 犹如星级庄园
Google伦敦新总部 犹如星级庄园
Java 与 .NET 的平台发展之争
Java 与 .NET 的平台发展之争
代码女神横空出世
代码女神横空出世
10个调试和排错的小建议
10个调试和排错的小建议
团队中“技术大拿”并非越多越好
团队中“技术大拿”并非越多越好
老美怎么看待阿里赴美上市
老美怎么看待阿里赴美上市
我是如何打败拖延症的
我是如何打败拖延症的
聊聊HTTPS和SSL/TLS协议
聊聊HTTPS和SSL/TLS协议
那些性感的让人尖叫的程序员
那些性感的让人尖叫的程序员
程序员周末都喜欢做什么?
程序员周末都喜欢做什么?
我的丈夫是个程序员
我的丈夫是个程序员
亲爱的项目经理,我恨你
亲爱的项目经理,我恨你
看13位CEO、创始人和高管如何提高工作效率
看13位CEO、创始人和高管如何提高工作效率
“肮脏的”IT工作排行榜
“肮脏的”IT工作排行榜
软件开发程序错误异常ExceptionCopyright © 2009-2015 MyException 版权所有