RabbitMQ 远程调用测试:使用外部机器 192.168.174.132 上的 RabbitMQ。使用之前需要先对远程访问进行配置,操作过程见博文《解决rabbitmq远程不能访问的问题》。
ReceiveTest:
package com.mq.rabbitmq.rabbitmqtest;
import java.util.Date;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
public class ReceiveTest {
// Name of the queue that main consumes from.
private final static String QUEUE_NAME = "ftpAgent";
// Credentials handed to the ConnectionFactory in main.
private final static String userName = "admin";
private final static String password = "admin";
// Not used by any active (uncommented) code visible in this class.
private final static String virtualHost = "/";
// Broker port handed to factory.setPort in main.
private final static int portNumber = 5672;
// Not referenced anywhere in the visible code.
private final static String hostName = "master";
// Broker address handed to factory.setHost in main.
private final static String host = "192.168.174.132";
/**
 * Connects to the broker at {@code host}:{@code portNumber} and prints every
 * message delivered on {@code QUEUE_NAME} until the process is terminated.
 *
 * @param argv ignored
 * @throws java.io.IOException if connecting, opening the channel or
 *         registering the consumer fails
 * @throws java.lang.InterruptedException if the thread is interrupted while
 *         waiting for the next delivery
 */
public static void main(String[] argv) throws java.io.IOException,
        java.lang.InterruptedException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setUsername(userName);
    factory.setPassword(password);
    // factory.setVirtualHost(virtualHost);
    factory.setHost(host);
    factory.setPort(portNumber);

    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    // The queue is intentionally not declared here (the declaration was
    // commented out), so it is expected to exist on the broker already.
    // channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    QueueingConsumer consumer = new QueueingConsumer(channel);
    // Second argument true = automatic acknowledgement.
    channel.basicConsume(QUEUE_NAME, true, consumer);

    while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        // Take the timestamp per delivery; a single Date created before the
        // loop would report the start-up time for every message.
        Date receiveTime = new Date();
        // Decode with an explicit charset instead of the platform default.
        // NOTE(review): assumes the producer encodes text as UTF-8 - confirm.
        String message = new String(delivery.getBody(), "UTF-8");
        System.out.println("RecieveTime: " + receiveTime);
        System.out.println(" [x] Received '" + message + "'");
    }
}
}
打开 IDEA,创建一个 Maven 工程(普通的 Java 工程即可)。
pom.xml文件如下
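<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.zhenqi</groupId>
  <artifactId>rabbitmq-study</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>rabbitmq-study</name>
  <url>http://maven.apache.org</url>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>4.1.0</version>
      <exclusions>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-api</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.21</version>
    </dependency>
    <dependency>
      <groupId>commons-lang</groupId>
      <artifactId>commons-lang</artifactId>
      <version>2.6</version>
    </dependency>
  </dependencies>
</project>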
为了能远程访问 RabbitMQ,需要编辑 /etc/rabbitmq/rabbitmq.config(下面的 Erlang 项格式对应经典配置文件 rabbitmq.config;新式的 rabbitmq.conf 使用 key = value 格式),添加以下内容。
立即学习“Java免费学习笔记(深入)”;
[
{rabbit, [{tcp_listeners, [5672]}, {loopback_users, ["asdf"]}]}
].
添加 administrator 角色
rabbitmqctl set_user_tags openstack administrator
创建抽象队列 EndPoint.java
package com.zhenqi;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* Created by wuming on 2017/7/16.
*/
public abstract class EndPoint {
protected Channel channel;
protected Connection connection;
protected String endPointName;
/**
 * Connects to the RabbitMQ server, opens a channel and declares the queue
 * named {@code endpointName} (non-durable, non-exclusive, no auto-delete,
 * no extra arguments).
 *
 * @param endpointName name of the queue this endpoint works with
 * @throws Exception if connecting, opening the channel or declaring the
 *         queue fails; the connection is closed again in that case
 */
public EndPoint(String endpointName) throws Exception {
    this.endPointName = endpointName;
    // Create a connection factory.
    ConnectionFactory factory = new ConnectionFactory();
    // Address and credentials of the rabbitmq-server instance.
    factory.setHost("192.168.146.128");
    factory.setUsername("openstack");
    factory.setPassword("rabbitmq");
    factory.setPort(5672);
    factory.setVirtualHost("/");
    // Open the connection.
    connection = factory.newConnection();
    try {
        // Create the channel and make sure the queue exists.
        channel = connection.createChannel();
        channel.queueDeclare(endpointName, false, false, false, null);
    } catch (Exception e) {
        // The caller never receives this instance, so nobody else could
        // release the already-open connection.
        try {
            connection.close();
        } catch (Exception ignored) {
            // Secondary failure while cleaning up; the original error below
            // is the one worth reporting.
        }
        throw e;
    }
}
/**
 * Closes the channel and then the connection.
 * <p>
 * The original note says calling this is not mandatory because the close
 * happens implicitly. NOTE(review): confirm which implicit close is meant.
 *
 * @throws Exception if closing the channel or the connection fails
 */
public void close() throws Exception {
    try {
        this.channel.close();
    } finally {
        // Release the connection even when closing the channel failed.
        this.connection.close();
    }
}
}
生产者 Producer.java
生产者类的任务是向队列里写一条消息
package com.zhenqi;
import org.apache.commons.lang.SerializationUtils;
import java.io.Serializable;
/**
* Created by wuming on 2017/7/16.
*/
public class Producer extends EndPoint {
/**
 * Creates a producer for the queue named {@code endpointName} by delegating
 * to the {@code EndPoint} constructor.
 *
 * @param endpointName name of the queue to publish to
 * @throws Exception if the {@code EndPoint} constructor fails
 */
public Producer(String endpointName) throws Exception {
super(endpointName);
}
/**
 * Serializes {@code object} and publishes the bytes to the default exchange
 * (""), using the queue name {@code endPointName} as the routing key.
 *
 * @param object the message payload to serialize and send
 * @throws Exception if serialization or publishing fails
 */
public void sendMessage(Serializable object) throws Exception {
    final byte[] payload = SerializationUtils.serialize(object);
    channel.basicPublish("", endPointName, null, payload);
}
}
消费者 QueueConsumer.java
消费者可以以线程方式运行,对于不同的事件有不同的回调函数,其中最主要的是处理新消息到来的事件。
package com.zhenqi;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import org.apache.commons.lang.SerializationUtils;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
* Created by wuming on 2017/7/16.
*/
public class QueueConsumer extends EndPoint implements Runnable, Consumer {
private Logger LOG=Logger.getLogger(QueueConsumer.class);
/**
 * Creates a consumer for the queue named {@code endpointName} by delegating
 * to the {@code EndPoint} constructor.
 *
 * @param endpointName name of the queue to consume from
 * @throws Exception if the {@code EndPoint} constructor fails
 */
public QueueConsumer(String endpointName) throws Exception {
super(endpointName);
}
/**
 * Called when this consumer has been registered through
 * {@code basicConsume}; logs the consumer tag.
 *
 * @param s the consumer tag
 */
public void handleConsumeOk(String s) {
    // The "registered" message belongs here, not in handleRecoverOk, where
    // it would only fire after a basic.recover round trip.
    LOG.info("Consumer "+s +" registered");
}

/**
 * Called when the consumer is cancelled via {@code basicCancel}. No action.
 *
 * @param s the consumer tag
 */
public void handleCancelOk(String s) {
}

/**
 * Called when the consumer is cancelled for a reason other than
 * {@code basicCancel}. No action.
 *
 * @param s the consumer tag
 * @throws IOException never thrown by this implementation
 */
public void handleCancel(String s) throws IOException {
}

/**
 * Called when the channel or its connection has been shut down. No action.
 *
 * @param s the consumer tag
 * @param e the shutdown signal
 */
public void handleShutdownSignal(String s, ShutdownSignalException e) {
}

/**
 * Called when a basic.recover-ok is received. No action.
 *
 * @param s the consumer tag
 */
public void handleRecoverOk(String s) {
}
/**
 * Handles a newly delivered message: deserializes the body into a map and
 * logs the value stored under the key "message number".
 *
 * @param s the consumer tag
 * @param envelope delivery metadata (unused)
 * @param basicProperties message properties (unused)
 * @param bytes the serialized message body
 * @throws IOException never thrown directly by this implementation
 */
public void handleDelivery(String s, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
    // NOTE(review): Java-native deserialization of broker data is only safe
    // while every publisher on this queue is trusted.
    // Cast to the Map interface (not HashMap) so any serialized Map works.
    Map<?, ?> payload = (Map<?, ?>) SerializationUtils.deserialize(bytes);
    LOG.info("Message Number "+ payload.get("message number") + " received.");
}
/**
 * Registers this object as a consumer on the queue {@code endPointName}
 * with automatic acknowledgement. A failure to register is logged rather
 * than propagated, since {@code Runnable.run} cannot throw checked
 * exceptions.
 */
public void run() {
    try {
        // Second argument true = automatic acknowledgement.
        channel.basicConsume(endPointName, true, this);
    } catch (IOException e) {
        // Report through the class logger, like the rest of this consumer.
        LOG.error("Failed to start consuming from queue " + endPointName, e);
    }
}
}
测试
运行一个消费者线程,然后开始产生大量的消息,这些消息会被消费者取走
package com.zhenqi;
import java.util.HashMap;
/**
* Created by wuming on 2017/7/16.
*/
public class TestRabbitmq {
    /**
     * Starts a consumer thread on the queue "queue", then publishes 100000
     * messages to that queue. Each message is a map holding its sequence
     * number under the key "message number".
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        try {
            QueueConsumer consumer = new QueueConsumer("queue");
            Thread consumerThread = new Thread(consumer);
            consumerThread.start();

            Producer producer = new Producer("queue");
            try {
                for (int i = 0; i < 100000; i++) {
                    // Stays a HashMap because that is the type the consumer
                    // side expects; only the raw type is removed.
                    HashMap<String, Integer> message = new HashMap<String, Integer>();
                    message.put("message number", i);
                    producer.sendMessage(message);
                    System.out.println("Message Number "+ i +" sent.");
                }
            } finally {
                // Release the producer's channel and connection when done.
                // The consumer is left open so it can keep receiving.
                producer.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}











