当前访客身份:游客 [ 登录  | 注册加入尚学堂]
直播

我来了!

拥有积分:4024
尚学堂雄起!!威武。。。

博客分类

笔记中心

课题中心

提问中心

答题中心

解答题中心

activeMQ 点对点以及发布与订阅 - 以及spring的整合&集群方式

我来了! 发表于 2年前 (2014-11-10 16:32:20)  |  评论(0)  |  阅读次数(640)| 0 人收藏此文章,   我要收藏   

Overview

点对点

connect.createSession();
session.createQueue();

订阅与消费

connect.CreateTopicSession();
session.createTopic();

与spring的整合

Spring+JMS+ActiveMQ+Tomcat实现消息服务 

集群方式:master-slave; broker-cluster[static-discovery & dynamic-discovery]

ActiveMQ集群

======================================================

ActiveMQ 点对点消费者生产者示例

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://10.10.40.174:61616");
        Connection connection = null;
        Session session = null;
        MessageProducer producer = null;
        Destination destination = null;
        try {
            connection = connectionFactory.createConnection("zengjun", "zj");
            session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            destination = session.createQueue("TEST.QUEUE.ZJ_02");
            producer = session.createProducer(destination);
            //设置消息模式,有持久与非持久的
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            connection.start();
            double d = Math.random();
            InputStream in = new FileInputStream("D:\\2.09M.jar");
            BufferedInputStream objBufferedInputStream = new BufferedInputStream(in);
            int len = objBufferedInputStream.available();
            byte[] bBuffer = new byte[len];
            //创建StreamMessage
            StreamMessage message = session.createStreamMessage();
            objBufferedInputStream.read(bBuffer);
            //添加byte数组数据
            message.writeBytes(bBuffer);
            //添加整型属性
            message.setIntProperty("MessageLength1", len);
            //添加字符串属性
            message.setStringProperty("ID", String.valueOf(d));
            in.close();
            producer.send(message);
            System.out.println("发送消息成功 ID " + String.valueOf(d));
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            //关闭资源
            ConnectionUtil.closeSession(session);
            ConnectionUtil.closeConnection(connection);
            ConnectionUtil.closeMessageProducer(producer);
        }
        System.out.println("发送结束");

activeMQ 点对点消费者代码

// 创建连接工厂
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://10.10.40.174:61616");
        Session session = null;
        MessageConsumer consumer = null;
        Connection connection = null;
        Message message = null;
        try {
            // 访问的用户与密码
            connection = connectionFactory.createConnection("ll", "ll");
            connection.start();
            session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            Destination destination = session.createQueue("TEST.QUEUE.ZJ_02");
            consumer = session.createConsumer(destination);
            // 无时间参数表示一直等待,直到收到消息。
            // message = consumer.receive();
            // 有时间参数表示指定时间后没有消息则结束时,如果存在消息就在取完消息后结束
            message = consumer.receive(5 * 1000);
            // 立即往下执行
            // message = consumer.receiveNoWait();
            if (message != null) {
                System.out.println("收到消息");
                if (message instanceof TextMessage) {
                    TextMessage textMessage = (TextMessage) message;
                    String text = textMessage.getText();
                    System.out.println("TEXT:" + text);
                    textMessage.acknowledge();
                } else if (message instanceof StreamMessage) {
                    StreamMessage streamMessage = (StreamMessage) message;
                    String strId = streamMessage.getStringProperty("ID");
                    System.out.println("streammessage  ID:" + strId);
                    streamMessage.acknowledge();
                }
            } else {
                System.out.println("没有收到消息");
            }
        } catch (Exception e) {
            System.out.println("发生异常\n");
            e.printStackTrace();
        } finally {
            //关闭资源
            ConnectionUtil.closeAll(connection, session, consumer);
        }

activeMQ 发布/订阅,发布者代码示例  

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://10.10.40.174:61616");
        TopicConnection connection = null;
        ActiveMQTopicSession session = null;
        ActiveMQTopicPublisher publisher = null;
        ActiveMQTopic topic = null;
        try {
            connection = connectionFactory.createTopicConnection("zengjun", "zj");
            session = (ActiveMQTopicSession) connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
            topic = (ActiveMQTopic) session.createTopic("TEST.topic.zj");
            publisher = (ActiveMQTopicPublisher) session.createPublisher(topic);
            publisher.setDeliveryMode(DeliveryMode.PERSISTENT);
            connection.start();
            int flag = 2;
            if (flag == 1) {
                TextMessage messageText = session.createTextMessage();
                messageText.setText("tipic:" + System.currentTimeMillis());
                publisher.publish(messageText);
            } else {
                StreamMessage messageStream = session.createStreamMessage();
                FileInputStream fi = new FileInputStream("D:\\JavaXYQ.zip");
                byte[] btyes = new byte[fi.available()];
                fi.read(btyes);
                messageStream.writeBytes(btyes);
                publisher.publish(messageStream);
            }
            System.out.println("Topic消息发送成功");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            ConnectionUtil.closeSession(session);
            ConnectionUtil.closeConnection(connection);
            ConnectionUtil.closeTopicPublisher(publisher);
        }

activeMQ 发布/订阅,持久订阅代码示例

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://10.10.40.174:61616");
        TopicConnection connection = null;
        ActiveMQTopicSession session = null;
        ActiveMQTopic topic = null;
        ActiveMQTopicSubscriber subscriber = null;
        try {
            connection = connectionFactory.createTopicConnection("zengjun", "zj");
            connection.setClientID("client_ID_test");
            connection.start();
            session = (ActiveMQTopicSession) connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
            topic = (ActiveMQTopic) session.createTopic("TEST.topic.zj");
           //创建持久订阅
            subscriber = (ActiveMQTopicSubscriber) session.createDurableSubscriber(topic, "Subscriber_name_test");
            subscriber.setMessageListener(this);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // ConnectionUtil.closeSession(session);
            // ConnectionUtil.closeConnection(connection);
            // ConnectionUtil.closeTopicSubscriber(subscriber);
        }
    }

    public void onMessage(Message message) {
        System.out.println("收到消息");
        try {
            if (message != null) {
                if (message instanceof TextMessage) {
                    TextMessage textMessage = (TextMessage) message;
                    System.out.println(textMessage.getText());
                }else if(message instanceof StreamMessage){
                    System.out.println("收到steam消息");
                }
            } else {
                System.out.println("没有收到消息");
            }
            Thread.sleep(1 * 1000);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


分享到:0
关注微信,跟着我们扩展技术视野。每天推送IT新技术文章,每周聚焦一门新技术。微信二维码如下:
微信公众账号:尚学堂(微信号:bjsxt-java)
声明:博客文章版权属于原创作者,受法律保护。如果侵犯了您的权利,请联系管理员,我们将及时删除!
(邮箱:webmaster#sxt.cn(#换为@))
北京总部地址:北京市海淀区西三旗桥东建材城西路85号神州科技园B座三层尚学堂 咨询电话:400-009-1906 010-56233821
Copyright 2007-2015 北京尚学堂科技有限公司 京ICP备13018289号-1 京公网安备11010802015183