zoukankan      html  css  js  c++  java
  • Storm框架:Storm整合springboot

    我们知道Storm本身是一个独立运行的分布式流式数据处理框架,Springboot也是一个独立运行的web框架。那么如何在Strom框架中集成Springboot使得我们能够在Storm开发中运用Spring的Ioc容器及其他如Spring Jpa等功能呢?我们先来了解以下概念:

    • Storm主要的三个Component:Topology、Spout、Bolt。Topology作为主进程控制着spout、bolt线程的运行,他们相当于独立运行的容器分布于storm集群中的各个机器节点。
    • SpringApplication:是配置Spring应用上下文的起点。通过调用SpringApplication.run()方法它将创建ApplicationContext实例,这是我们能够使用Ioc容器的主要BeanFactory。之后Spring将会加载所有单例模式的beans,并启动后台运行的CommandLineRunner beans等。
    • ApplicationContextAware:这是我们能够在普通Java类中调用Spring容器里的beans的关键接口。

    实现原理

    Storm框架中的每个Spout和Bolt都相当于独立的应用,Strom在启动spout和bolt时提供了一个open方法(spout)和prepare方法(bolt)。我们可以把初始化Spring应用的操作放在这里,这样可以保证每个spout/bolt应用在后续执行过程中都能获取到Spring的ApplicationContext,有了ApplicationContext实例对象,Spring的所有功能就都能用上了。

    • Spout.open方法实现
    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        //启动Springboot应用
        SpringStormApplication.run();
    
        this.map = map;
        this.topologyContext = topologyContext;
        this.spoutOutputCollector = spoutOutputCollector;
    }
    
    • Bolt.prepare方法实现
    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        //启动Springboot应用
        SpringStormApplication.run();
    
        this.map = map;
        this.topologyContext = topologyContext;
        this.outputCollector = outputCollector;
    }
    
    • SpringStormApplication启动类
    @SpringBootApplication
    @ComponentScan(value = "com.xxx.storm")
    public class SpringStormApplication {
        /**
         * 非工程启动入口,所以不用main方法
         * 加上synchronized的作用是由于storm在启动多个bolt线程实例时,如果Springboot用到Apollo分布式配置,会报ConcurrentModificationException错误
         * 详见:https://github.com/ctripcorp/apollo/issues/1658
         * @param args
         */
        public synchronized static void run(String ...args) {
            SpringApplication app = new SpringApplication(SpringStormApplication.class);
            //我们并不需要web servlet功能,所以设置为WebApplicationType.NONE
            app.setWebApplicationType(WebApplicationType.NONE);
            //忽略掉banner输出
            app.setBannerMode(Banner.Mode.OFF);
            //忽略Spring启动信息日志
            app.setLogStartupInfo(false);
            app.run(args);
        }
    }
    

    与我们传统的Springboot应用启动入口稍微有点区别,主要禁用了web功能,看下正常的启动方式:

    @SpringBootApplication
    @ComponentScan(value = "com.xxx.web")
    public class PlatformApplication {
    	public static void main(String[] args) {
    		SpringApplication.run(PlatformApplication.class, args);
    	}
    }
    
    • 在spout/bolt中调用了SpringStormApplication.run方法后,我们还需要能够拿到ApplicationContext容器对象,这时候我们还需要实现ApplicationContextAware接口,写个工具类BeanUtils:
    @Component
    public class BeanUtils implements ApplicationContextAware {
        private static ApplicationContext applicationContext = null;
    
        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            if (BeanUtils.applicationContext == null) {
                BeanUtils.applicationContext = applicationContext;
            }
        }
        
        public static ApplicationContext getApplicationContext() {
            return applicationContext;
        }
        
        public static Object getBean(String name) {
            return getApplicationContext().getBean(name);
        }
        
        public static <T> T getBean(Class<T> clazz) {
            return getApplicationContext().getBean(clazz);
        }
        
        public static <T> T getBean(String name, Class<T> clazz) {
            return getApplicationContext().getBean(name, clazz);
        }
    }
    

    通过@Component注解使得Spring在启动时能够扫描到该bean,因为BeanUtils实现了ApplicationContextAware接口,Spring会在启动成功时自动调用BeanUtils.setApplicationContext方法,将ApplicationContext对象保存到工具类的静态变量中,之后我们就可以使用BeanUtils.getBean()去获取Spring容器中的bean了。

    写个简单例子

    • 在FilterBolt的execute方法中获取Spring bean
    @Override
    public void execute(Tuple tuple) {
        FilterService filterService = (FilterService) BeanUtils.getBean("filterService");
        filterService.deleteAll();
    }
    
    • 定义FilterService类,这时候我们就可以使用Spring的相关注解,自动注入,Spring Jpa等功能了。
    @Service("filterService")
    public class FilterService {
        @Autowired
        UserRepository userRepository;
        
        public void deleteAll() {
            userRepository.deleteAll();
        }
    }
    

    将storm应用作为Springboot工程的一个子模块

    工程主目录的pom文件还是springboot相关的依赖,在storm子模块中引入storm依赖,这时候启动Strom的topology应用会有一个日志包依赖冲突。

    SLF4J: Class path contains multiple SLF4J bindings.
    SLF4J: Found binding in [jar:file:/Applications/IntelliJ%20IDEA.app/Contents/bin/~/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.11.1/log4j-slf4j-impl-2.11.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/Applications/IntelliJ%20IDEA.app/Contents/bin/~/.m2/repository/ch/qos/logback/logback-classic/1.2.3/logback-classic-1.2.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
    SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
    

    我们需要在storm子模块的pom文件中重写org.springframework.boot:spring-boot-starter包依赖,将Springboot的相关日志包排除掉,如下:

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
        <exclusions>
            <exclusion>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-to-slf4j2</artifactId>
            </exclusion>
            <exclusion>
                <groupId>ch.qos.logback</groupId>
                <artifactId>logback-classic2</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    

    使用maven-shade-plugin打包注意的问题

    因为springboot有自己的打包插件,如果使用maven-shade-plugin需要将spring-boot-maven-plugin作为依赖引入,另外spring-boot-starter-parent需要使用dependencyManagement引入。

    parent部分替换如下:

    <dependencyManagement>
            <dependencies>
                <dependency>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-dependencies</artifactId>
                    <version>2.1.0.RELEASE</version>
                    <type>pom</type>
                    <scope>import</scope>
                </dependency>
            </dependencies>
    </dependencyManagement>
    

    build如下:

    <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.8.0</version>
                    <configuration>
                        <encoding>UTF-8</encoding>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-jar-plugin</artifactId>
                    <version>2.6</version>
                    <configuration>
                        <archive>
                            <manifest>
                                <addClasspath>true</addClasspath>
                                <classpathPrefix>lib/</classpathPrefix>
                                <mainClass>com.xxx.storm.pointer.XdPointerTopology</mainClass>
                            </manifest>
                        </archive>
                    </configuration>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <version>3.1.1</version>
                    <dependencies>
                        <dependency>
                            <groupId>org.springframework.boot</groupId>
                            <artifactId>spring-boot-maven-plugin</artifactId>
                            <version>2.1.0.RELEASE</version>
                        </dependency>
                    </dependencies>
                    <configuration>
                        <keepDependenciesWithProvidedScope>true</keepDependenciesWithProvidedScope>
                        <createDependencyReducedPom>true</createDependencyReducedPom>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <artifactSet>
                            <excludes>
                                <exclude>com.xxx.storm:xxx-storm</exclude>
                                <exclude>org.slf4j:slf4j-api</exclude>
                                <exclude>javax.mail:javax.mail-api</exclude>
                                <exclude>org.apache.storm:storm-core</exclude>
                                <exclude>org.apache.storm:storm-kafka</exclude>
                                <exclude>org.apache.logging.log4j:log4j-slf4j-impl</exclude>
                            </excludes>
                        </artifactSet>
                    </configuration>
                    <executions>
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>shade</goal>
                            </goals>
                            <configuration>
                                <transformers>
                                    <transformer
                                            implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                        <resource>META-INF/spring.handlers</resource>
                                    </transformer>
                                    <transformer
                                            implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
                                        <resource>META-INF/spring.factories</resource>
                                    </transformer>
                                    <transformer
                                            implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                        <resource>META-INF/spring.schemas</resource>
                                    </transformer>
                                    <transformer
                                            implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                                    <transformer
                                            implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                        <mainClass>com.xxx.storm.pointer.XdPointerTopology</mainClass>
                                    </transformer>
                                </transformers>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    

    注意:META-INF/spring.* 文件不需要我们创建,这是springboot包内部的文件,这里只是需要显示的引入进来。

    OK,完美整合!

  • 相关阅读:
    【转】内网yum源搭建
    IO-同步,异步,阻塞,非阻塞,阅读摘要
    java如何获取当前时间,精确到毫秒
    java编写创建数据库和表的程序
    Java得到当前系统时间,精确到毫秒的几种方法
    linux学习 XShell上传、下载本地文件到linux服务器
    java的InputStream和OutputStream的理解
    SpringMVC使用session实现简单登录
    spring MVC 的MultipartFile转File读取
    SpringMvc文件上传和下载
  • 原文地址:https://www.cnblogs.com/gouyg/p/storm-springboot.html
Copyright © 2011-2022 走看看