zoukankan      html  css  js  c++  java
  • 大数据入门第三天——基础补充与ActiveMQ

    一、多线程基础回顾

      先导知识在基础随笔篇:http://www.cnblogs.com/jiangbei/p/6664555.html

      以下此部分以补充为主

      1.概念

        进程:进行中的程序,内存中有独立的内存空间

        线程:进程中的多个顺序控制流

      2.Java中实现线程的两种方式

        参考上文(继承thread类与实现runnable接口)

      3.同步synchronized的用法

        参考上文(同一时间只能有一个线程执行)

      4.lock

        外部的锁类,参考:https://www.cnblogs.com/dolphin0520/p/3923167.html

        更多的介绍与细节,将在JUC的基础随笔中进行补充...

      5.更多JUC特性

        线程池,队列blockqueue等其他特性,将在基础JUC中进行补充...

    二、JMS

      1.什么是JMS

      JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。

    JMS是一种与厂商无关的 API,用来访问消息收发系统消息,它类似于JDBC(Java Database Connectivity)。这里,JDBC 是可以用来访问许多不同关系数据库的 API,而 JMS 则提供同样与厂商无关的访问方法,以访问消息收发服务。许多厂商都支持 JMS,包括 IBM 的 MQSeries、BEA的 Weblogic JMS service和 Progress 的 SonicMQ。 JMS 使您能够通过消息收发服务(有时称为消息中介程序或路由器)从一个 JMS 客户机向另一个 JMS客户机发送消息。消息是 JMS 中的一种类型对象,由两部分组成:报头和消息主体。报头由路由信息以及有关该消息的元数据组成。消息主体则携带着应用程序的数据或有效负载。根据有效负载的类型来划分,可以将消息分为几种类型,它们分别携带:简单文本(TextMessage)、可序列化的对象 (ObjectMessage)、属性集合 (MapMessage)、字节流 (BytesMessage)、原始值流 (StreamMessage),还有无有效负载的消息 (Message)。
    查看更多JMS介绍

      // 介绍来自百度百科

        所以,实质上,JMS是类似JDBC的一套规范(一组接口)

      2.体系架构

    JMS由以下元素组成。
    JMS提供者provider:连接面向消息中间件的,JMS接口的一个实现。提供者可以是Java平台的JMS实现,也可以是非Java平台的面向消息中间件的适配器。
    JMS客户:生产或消费基于消息的Java的应用程序或对象。
    JMS生产者:创建并发送消息的JMS客户。
    JMS消费者:接收消息的JMS客户。
    JMS消息:包括可以在JMS客户之间传递的数据的对象
    JMS队列:一个容纳那些被发送的等待阅读的消息的区域。与队列名字所暗示的意思不同,消息的接受顺序并不一定要与消息的发送顺序相同。一旦一个消息被阅读,该消息将被从队列中移走。
    JMS主题:一种支持发送消息给多个订阅者的机制。

      3.JMS两种模型 

        1、 点对点或队列模型

          

        2、发布者/订阅者模型

          

      更多JMS基本概念介绍与编程接口讲解,参考http://blog.csdn.net/jiuqiyuliang/article/details/46701559

     三、ActiveMQ入门

      1.概述

        消息中间件的概念

    消息中间件
    
    我们简单的介绍一下消息中间件,对它有一个基本认识就好,消息中间件(MOM:Message Orient middleware)。
    
    消息中间件有很多的用途和优点: 
    1. 将数据从一个应用程序传送到另一个应用程序,或者从软件的一个模块传送到另外一个模块; 
    2. 负责建立网络通信的通道,进行数据的可靠传送。 
    3. 保证数据不重发,不丢失 
    4. 能够实现跨平台操作,能够为不同操作系统上的软件集成技工数据传送服务

        ActiveMQ就是JMS的一种具体实现,是一种受欢迎的消息中间件

    Apache ActiveMQ ™ is the most popular and powerful open source messaging and Integration Patterns server.

      2.下载

        这里示例使用windows版本,实际生产应该是Linux版了;这里就不下载最新版了,使用5.12.1演示(注意查看版本位置)

        下载地址:http://activemq.apache.org/download.html

      3.配置与安装

        和tomcat一样,解压

        

        打开conf/activemq.xml配置文件,修改相关的IP0.0.0.0为locaohost

        

        4.启动

    binactivemq start

      对于报错无法加载主类一闪而过的,是因为安装路径有空格,解决方法参考:https://www.cnblogs.com/anan1688/p/4681965.html

      对于双击bat文件一闪而过,应该是由于版本问题(老版本可以使用此方式启动),查看官方get started即可完美解决:点击这里

       5.测试

    Open the administrative interface
        URL: http://127.0.0.1:8161/admin/
        Login: admin
        Passwort: admin
    Navigate to "Queues"
    Add a queue name and click create
    Send test message by klicking on "Send to"

    四、Java连接Helloworld

      1.准备

        将解压出来的包中的activemq-all-5.11.1.jar加入lib(IDEA如何引入外部jar请参考IDEA相关随笔),maven依赖方式此处暂略

      2.开始

    package cn.itcast_03_mq.topic;
    import javax.jms.Connection;      
    import javax.jms.Destination;      
    import javax.jms.ExceptionListener;
    import javax.jms.JMSException;      
    import javax.jms.MessageConsumer;      
    import javax.jms.Session;      
    import javax.jms.MessageListener;      
    import javax.jms.Message;      
    import javax.jms.TextMessage;      
         
    import org.apache.activemq.ActiveMQConnection;      
    import org.apache.activemq.ActiveMQConnectionFactory;      
         
    public class ConsumerTool implements MessageListener,ExceptionListener {      
        private String user = ActiveMQConnection.DEFAULT_USER;      
        private String password = ActiveMQConnection.DEFAULT_PASSWORD;      
        private String url =ActiveMQConnection.DEFAULT_BROKER_URL;      
        private String subject = "mytopic";      
        private Destination destination = null;      
        private Connection connection = null;      
        private Session session = null;      
        private MessageConsumer consumer = null;  
        public static Boolean isconnection=false;
        // 初始化      
        private void initialize() throws JMSException, Exception {      
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(      
                    user, password, url);      
            connection = connectionFactory.createConnection();      
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);      
            destination = session.createTopic(subject);      
            consumer = session.createConsumer(destination);     
        }      
         
        // 消费消息      
        public void consumeMessage() throws JMSException, Exception {      
            initialize();      
            connection.start();
            consumer.setMessageListener(this);    
            connection.setExceptionListener(this);
            isconnection=true;
            System.out.println("Consumer:->Begin listening...");      
            // 开始监听  
            // Message message = consumer.receive();      
        }
        // 关闭连接      
        public void close() throws JMSException {      
            System.out.println("Consumer:->Closing connection");      
            if (consumer != null)      
                consumer.close();      
            if (session != null)      
                session.close();      
            if (connection != null)      
                connection.close();      
        }
        // 消息处理函数      
        public void onMessage(Message message) {      
            try {      
                if (message instanceof TextMessage) {      
                    TextMessage txtMsg = (TextMessage) message;      
                    String msg = txtMsg.getText();      
                    System.out.println("Consumer:->Received: " + msg);      
                } else {      
                    System.out.println("Consumer:->Received: " + message);      
                }      
            } catch (JMSException e) {      
                // TODO Auto-generated catch block      
                e.printStackTrace();      
            }      
        }
    
        public void onException(JMSException arg0) {
            isconnection=false;
        }      
    }      
         
    View Code

      更多helloworld示例,参考博文或者官网示例

      J2EE中使用activeMQ没问题,大数据方向后续将会有kafka的介绍

    五、反射与动态代理

      反射参考基础随笔篇:http://www.cnblogs.com/jiangbei/p/6829755.html

       动态代理参考基础增强篇:http://www.cnblogs.com/jiangbei/p/6828086.html

      其他基础(例如socket等)请在Java基础篇补充查看

  • 相关阅读:
    关于git 拉取的时候一直弹输入密码的问题
    开始日期结束日期check问题
    关于boostrap 排版问题
    【DP_树形DP专题】题单总结
    【DP_背包专题】 背包九讲
    Ubuntu不卸载ibus前提下安装搜狗输入法
    Ubuntu下Java环境配置
    Ubuntu下gcc及g++环境配置
    Ubuntu下VIM(GVIM)环境配置
    PAT 1065 A+B and C (64bit) (20)
  • 原文地址:https://www.cnblogs.com/jiangbei/p/8311148.html
Copyright © 2011-2022 走看看