rabbitmq 消息队列安装和使用(文中附带rpm安装包)

rabbitmq+erlang.zip

需要注意的是,mq和erlang的版本是有匹配要求的,使用时请查明这两个软件版本是否匹配,该文中的版本就是相互匹配的。

版本情况:erlang-23.3.4.8-1.el7.x86_64   rabbitmq-server-3.9.8-1.el7.noarch

0、下载上面的zip包,解压到服务器,会得到2个rpm包。

1、安装erlang
yum install -y socat  # 安装依赖(rabbitmq需要它)
rpm -ivh erlang-23.3.4.8-1.el7.x86_64.rpm
安装完成后,运行:erl -v
Erlang/OTP 19 [erts-8.1] [source-77fb4f8] [64-bit] [async-threads:10] [hipe] [kernel-poll:false]
Eshell V8.1  (abort with ^G)
能进去说明安装成功。

2.安装RabbitMQ
rpm -ivh rabbitmq-server-3.9.8-1.el7.noarch.rpm

3.以服务的方式启动
service rabbitmq-server start

4.检查端口5672和相应端口是否打开,建议直接关闭防火墙
systemctl stop firewalld

5.启用维护插件(使网页可以访问rabbitmq控制台)
rabbitmq-plugins enable rabbitmq_management

6.重启rabbitmq
service rabbitmq-server restart

7.添加用户信息
rabbitmqctl add_user username password // 添加用户
rabbitmqctl set_user_tags rollen administrator // 设置用户角色
rabbitmqctl set_permissions -p / rollen ".*" ".*" ".*" // 设置用户权限

8.登录
http://192.168.110.60:15672

9.示例代码(想知道spring boot如何集成mq,自行百度)

 

1.生产者
import com.rabbitmq.client.Channel;;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

import java.util.concurrent.TimeoutException;

public class Producer {
    private static final String TASK_QUEUE_NAME = "task_queue";


    public static void main(String[] argv) throws java.io.IOException, TimeoutException, InterruptedException {
        System.out.println("----------main method is begin----------");
        //创建工厂
        ConnectionFactory factory = new ConnectionFactory();
        System.out.println("----------factory successful!----------");
        //设置IP,端口,账户和密码
        factory.setHost("47.254.16.3");
        factory.setPort(5672);
        factory.setUsername("sj");
        factory.setPassword("7503388");
        //创建连接对象
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        System.out.println("----------connected successful!----------");

        // 初始化队列信息
        /*
        a)   taskname:队列名。
        b)   Durable:持久化。(在重启服务之后,该队列的文件是否还在全靠这个)
        c)   Exclusive:排他队列,如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。
        d)   AutoDelete:自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。(当所有消费客户端连接断开后,是否自动删除队列)
        e)   这个属性不知道干嘛的
        小提示:  在消费者接受的时候queueDeclare方法里面的参数要和这边配合哟
        */
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        String message = getMessage(argv);
        //共发送10万条数据,每隔1S发送1        int index = 0;
        while (index < 100000) {
            Thread.sleep(1 * 1000);
            channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
            index++;
        }
        channel.close();
        connection.close();
    }


    private static String getMessage(String[] strings) {
        if (strings.length < 1)
            return "Hello World!";
        return joinStrings(strings, " ");
    }


    private static String joinStrings(String[] strings, String delimiter) {
        int length = strings.length;
        if (length == 0)
            return "";
        StringBuilder words = new StringBuilder(strings[0]);
        for (int i = 1; i < length; i++) {
            words.append(delimiter).append(strings[i]);
        }
        return words.toString();
    }
}
2.消费者
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {
    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void recv() throws java.io.IOException, java.lang.InterruptedException, TimeoutException {
        System.out.print("----------main method is begin----------");
        ConnectionFactory factory = new ConnectionFactory();
        System.out.println("----------factory successful!----------");
        factory.setHost("47.254.16.3");
        factory.setPort(5672);
        factory.setUsername("sj");
        factory.setPassword("7503388");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        System.out.println("----------connected successful!----------");

        // 指定队列持久化
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        // 指定该线程同时只接收一条消息
        channel.basicQos(1);

        QueueingConsumer consumer = new QueueingConsumer(channel);

        // 打开消息应答机制
        channel.basicConsume(TASK_QUEUE_NAME, false, consumer);

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());

            System.out.println(" [x] Received '" + message + "'");
            doWork(message);
            System.out.println(" [x] Done");

            // 返回接收到消息的确认信息
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }

    private static void doWork(String task) throws InterruptedException {
        for (char ch : task.toCharArray()) {
            if (ch == '.')
                Thread.sleep(1000);
        }
    }
    public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
        recv();
    }
}
3.消费者二号,上面的方法 QueueingConsumer 过期,下面是新的实现,两者都可以用,但是建议用下面的,上面的好像在极限的时候,会炸内存,下面的是在4.0以后才出现的代替的方法
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {
    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void recv() throws java.io.IOException, java.lang.InterruptedException, TimeoutException {
        System.out.print("----------main method is begin----------");
        ConnectionFactory factory = new ConnectionFactory();
        System.out.println("----------factory successful!----------");
        factory.setHost("47.254.16.3");
        factory.setPort(5672);
        factory.setUsername("sj");
        factory.setPassword("7503388");
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        System.out.println("----------connected successful!----------");

        // 指定队列持久化
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        // 指定该线程同时只接收一条消息
        channel.basicQos(1);

        DefaultConsumer consumer = new DefaultConsumer(channel){
            //重写DefaultConsumerhandleDelivery方法,在方法中获取消息
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException{
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
                doWork(message);
                System.out.println(" [x] Done");
                //告知channel本任务已完成,如果关闭应答模式,就不需要本句;如果开启应答模式,不执行本句,则到达channel.basicQos(1)后,本消费者将不会收到任务。
                getChannel().basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // false-开启应答模式(你不做完channel.basicQos(1)的任务数,就不会给新任务给你) true-关闭问答模式(你只管给我任务,别管我做没做完channel.basicQos(1)的任务数)
        // 相较于原来的方法,这样做之后,生产者发任务时,可以检查消费者是否异常,如果异常,就会停止任务的分发。原来只要发了就不管(不知道消费者的消费情况),消费没消费要看消费者是否回答。
        channel.basicConsume(TASK_QUEUE_NAME, false,consumer);
    }

    private static void doWork(String task){
        for (char ch : task.toCharArray()) {
            if (ch == '.')
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
        }
    }
    public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
        recv();
    }
}
发布者:songJian   点击数:398   发布时间:2017-12-12 23:08:01   更新时间:2022-03-25 18:59:35
正在加载评论...