0

0

分享Java如何远程连接调用Rabbitmq?

黄舟

黄舟

发布时间:2018-05-24 11:51:25

|

3475人浏览过

|

来源于php中文网

原创

rabbitmq远程调用测试,使用外部机器192.168.174.132上的rabbitmq,使用之前需要对远程调用进行配置,操作过程见博文“解决rabbitmq远程不能访问的问题”。

SendTest:

package com.mq.rabbitmq.rabbitmqtest;
  
import java.util.Date;
  
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
  
public class ReceiveTest {
    private final static String QUEUE_NAME = "ftpAgent";
    private final static String userName = "admin";
    private final static String password = "admin";
    private final static String virtualHost = "/";
    private final static int portNumber = 5672;
    private final static String hostName = "master";
    private final static String host = "192.168.174.132";
  
    public static void main(String[] argv) throws java.io.IOException,
            java.lang.InterruptedException {
  
        ConnectionFactory factory = new ConnectionFactory();
//      factory.setHost("192.168.174.160");
        factory.setUsername(userName);
        factory.setPassword(password);
//      factory.setVirtualHost(virtualHost);
        factory.setHost(host);
        factory.setPort(portNumber);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
//      channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
          
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(QUEUE_NAME, true, consumer);
  
        Date nowTime = new Date();
          
        while (true) {
          QueueingConsumer.Delivery delivery = consumer.nextDelivery();
          String message = new String(delivery.getBody());
          System.out.println("RecieveTime: " + nowTime);
          System.out.println(" [x] Received '" + message + "'");
        }
  
    }
}

打开IDEA创建一个maven工程(Java就可以了)。 

这里写图片描述 

pom.xml文件如下


 4.0.0

 com.zhenqi
 rabbitmq-study
 1.0-SNAPSHOT
 jar

 rabbitmq-study
 http://maven.apache.org

 
  UTF-8
 

 
  
   junit
   junit
   4.12
   test
  

  
  
   com.rabbitmq
   amqp-client
   4.1.0
   
    
     org.slf4j
     slf4j-api
    
   
  

  
   org.slf4j
   slf4j-log4j12
   1.7.21
  

  
   commons-lang
   commons-lang
   2.6
  

 

为了能远程访问rabbitmq,则需要编辑 /etc/rabbitmq/rabbitmq.conf,添加以下内容。

立即学习Java免费学习笔记(深入)”;

[
  {rabbit, [{tcp_listeners, [5672]}, {loopback_users, ["asdf"]}]}
 ]

添加administrator角色

rabbitmqctl set_user_tags openstack administrator

创建抽象队列 EndPoint.java

FreeTTS
FreeTTS

FreeTTS是一个免费开源的在线文本到语音生成解决方案,可以将文本转换成MP3,

下载
package com.zhenqi;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 * Created by wuming on 2017/7/16.
 */
public abstract class EndPoint {

  protected Channel channel;
  protected Connection connection;
  protected String endPointName;

  public EndPoint(String endpointName) throws Exception {

    this.endPointName = endpointName;

    //创建一个连接工厂 connection factory
    ConnectionFactory factory = new ConnectionFactory();

    //设置rabbitmq-server服务IP地址
    factory.setHost("192.168.146.128");
    factory.setUsername("openstack");
    factory.setPassword("rabbitmq");
    factory.setPort(5672);
    factory.setVirtualHost("/");

    //得到 连接
    connection = factory.newConnection();

    //创建 channel实例
    channel = connection.createChannel();

    channel.queueDeclare(endpointName, false, false, false, null);
  }

  /**
   * 关闭channel和connection。并非必须,因为隐含是自动调用的。
   * @throws IOException
   */
  public void close() throws Exception{
    this.channel.close();
    this.connection.close();
  }
}

生产者Producer.java

生产者类的任务是向队列里写一条消息

package com.zhenqi;
import org.apache.commons.lang.SerializationUtils;
import java.io.Serializable;
/**
 * Created by wuming on 2017/7/16.
 */
public class Producer extends EndPoint {

  public Producer(String endpointName) throws Exception {
    super(endpointName);
  }

  public void sendMessage(Serializable object) throws Exception {
    channel.basicPublish("",endPointName, null, SerializationUtils.serialize(object));
  }
}

消费者QueueConsumer.java

消费者可以以线程方式运行,对于不同的事件有不同的回调函数,其中最主要的是处理新消息到来的事件。

package com.zhenqi;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import org.apache.commons.lang.SerializationUtils;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
 * Created by wuming on 2017/7/16.
 */
public class QueueConsumer extends EndPoint implements Runnable, Consumer {

  private Logger LOG=Logger.getLogger(QueueConsumer.class);

  public QueueConsumer(String endpointName) throws Exception {
    super(endpointName);
  }

  public void handleConsumeOk(String s) {

  }

  public void handleCancelOk(String s) {

  }

  public void handleCancel(String s) throws IOException {

  }

  public void handleShutdownSignal(String s, ShutdownSignalException e) {

  }

  public void handleRecoverOk(String s) {
    LOG.info("Consumer "+s +" registered");
  }

  public void handleDelivery(String s, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
    Map map = (HashMap) SerializationUtils.deserialize(bytes);
    LOG.info("Message Number "+ map.get("message number") + " received.");
  }

  public void run() {
    try{
      channel.basicConsume(endPointName, true,this);
    }catch(IOException e){
      e.printStackTrace();
    }
  }
}

 测试

运行一个消费者线程,然后开始产生大量的消息,这些消息会被消费者取走

package com.zhenqi;

import java.util.HashMap;

/**
 * Created by wuming on 2017/7/16.
 */
public class TestRabbitmq {

  public static void main(String[] args){
    try{
      QueueConsumer consumer = new QueueConsumer("queue");
      Thread consumerThread = new Thread(consumer);
      consumerThread.start();

      Producer producer = new Producer("queue");

      for (int i = 0; i < 100000; i++){
        HashMap message = new HashMap();
        message.put("message number", i);
        producer.sendMessage(message);
        System.out.println("Message Number "+ i +" sent.");
      }
    }catch(Exception e){
      e.printStackTrace();
    }

  }
}

相关文章

java速学教程(入门到精通)
java速学教程(入门到精通)

java怎么学习?java怎么入门?java在哪学?java怎么学才快?不用担心,这里为大家提供了java速学教程(入门到精通),有需要的小伙伴保存下载就能学习啦!

下载

相关标签:

本站声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn

相关专题

更多
c++主流开发框架汇总
c++主流开发框架汇总

本专题整合了c++开发框架推荐,阅读专题下面的文章了解更多详细内容。

26

2026.01.09

c++框架学习教程汇总
c++框架学习教程汇总

本专题整合了c++框架学习教程汇总,阅读专题下面的文章了解更多详细内容。

24

2026.01.09

学python好用的网站推荐
学python好用的网站推荐

本专题整合了python学习教程汇总,阅读专题下面的文章了解更多详细内容。

72

2026.01.09

学python网站汇总
学python网站汇总

本专题整合了学python网站汇总,阅读专题下面的文章了解更多详细内容。

9

2026.01.09

python学习网站
python学习网站

本专题整合了python学习相关推荐汇总,阅读专题下面的文章了解更多详细内容。

10

2026.01.09

俄罗斯手机浏览器地址汇总
俄罗斯手机浏览器地址汇总

汇总俄罗斯Yandex手机浏览器官方网址入口,涵盖国际版与俄语版,适配移动端访问,一键直达搜索、地图、新闻等核心服务。

52

2026.01.09

漫蛙稳定版地址大全
漫蛙稳定版地址大全

漫蛙稳定版地址大全汇总最新可用入口,包含漫蛙manwa漫画防走失官网链接,确保用户随时畅读海量正版漫画资源,建议收藏备用,避免因域名变动无法访问。

183

2026.01.09

php学习网站大全
php学习网站大全

精选多个优质PHP入门学习网站,涵盖教程、实战与文档,适合零基础到进阶开发者,助你高效掌握PHP编程。

12

2026.01.09

php网站搭建教程大全
php网站搭建教程大全

本合集专为零基础用户打造,涵盖PHP网站搭建全流程,从环境配置到实战开发,免费、易懂、系统化,助你快速入门建站!

8

2026.01.09

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
Kotlin 教程
Kotlin 教程

共23课时 | 2.4万人学习

C# 教程
C# 教程

共94课时 | 6.5万人学习

Java 教程
Java 教程

共578课时 | 44.7万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号