zoukankan      html  css  js  c++  java
  • Spring 4 + Reactor Integration Example--转

    原文地址:http://www.concretepage.com/spring-4/spring-4-reactor-integration-example

    Reactor is a framework to make event driven programming much easier. This is based on Reactor Design Pattern. Reactor is good for asynchronous applications on the JVM. Here we will create asynchronous and event driven application using Spring 4 and Reactor. Reactor uses Selectors, Consumers and Events as core module. Consumer is event consumer which needs to be notified for the event. Reactor is event gateway where event consumers are registered with a notification key. Selector is an abstraction to find consumer by invoking event. Find the example for detailed understanding.

    Software Required to Run Example

    To run the example we need the following software. 
    1. JDK 6 
    2. Gradle 
    3. Eclipse

    Project Structure in Eclipse

    Find our demo project structure in eclipse.

    Spring 4 + Reactor Integration Example
     

    Java Class for JSON

    For the event driven example, we have created sample page that will return the JSON as below.

    {"result":"success","employee":{"id":1,"name":"Ram"}}

    To get this output we are using local URL as http://localhost:8080/empdata.jsp
    To consume the JSON, we have two classes as below. 
    Employee.java

    package com.concretepage.reactor;
    import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
    @JsonIgnoreProperties(ignoreUnknown=true)
    public class Employee {
    	private Integer id;
    	private String name;
    	public Integer getId() { 
    		return id;
    	}
    	public void setId(Integer id) {
    		this.id = id; 
    	}
    	public String getName() {
    		return name;
    	}
    	public void setName(String name) {
    		this.name = name;
    	}
    }

    Find the EmployeeResource.java. 
    EmployeeResource.java

    package com.concretepage.reactor;
    public class EmployeeResource {
            private String result;
            private Employee employee;
    	public String getResult() {
    		return result;
    	}
    	public void setResult(String result) {
    		this.result = result;
    	}
    	public Employee getEmployee() {
    		return employee;
    	}
    	public void setEmployee(Employee employee) {
    		this.employee = employee;
    	}
    }

    Create Consumer using reactor.function.Consumer

    To create the consumer, our class must implement reactor.function.Consumer. We need to override accept method which has the argument as reactor.event.Event. To get the event data we can use the method as Event.getData. Here in this method, we are accessing a REST URL that will return the JSON data. 
    MessageConsumer.java

    package com.concretepage.reactor;
    import java.util.concurrent.CountDownLatch;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.client.RestTemplate;
    import reactor.event.Event;
    import reactor.function.Consumer;
    public class MessageConsumer implements Consumer<Event<Integer>> {
        @Autowired
        CountDownLatch latch;
        RestTemplate restTemplate = new RestTemplate();
        @Override
        public void accept(Event<Integer> event) {
    	EmployeeResource  empResource = restTemplate.getForObject("http://localhost:8080/empdata.jsp", EmployeeResource.class);
            System.out.println("Employee " + event.getData() + ":" + empResource.getEmployee().getName());
            latch.countDown();
        }
    }

    Create Producer

    In the producer class, we are notifying consumer with a key and an event that is ready to be processed. 
    MessageProducer.java

    package com.concretepage.reactor;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.atomic.AtomicInteger;
    import org.springframework.beans.factory.annotation.Autowired;
    import reactor.core.Reactor;
    import reactor.event.Event;
    public class MessageProducer {
        @Autowired
        Reactor reactor;
        @Autowired
        CountDownLatch latch;
        public void publishEmployee(int numberOfEmp) throws InterruptedException {
            AtomicInteger counter = new AtomicInteger(1);
            for (int i=0; i < numberOfEmp; i++) {
                reactor.notify("employees", Event.wrap(counter.getAndIncrement()));
            }
            latch.await();
            System.out.println("-------Done-------");
        }
    }

    Configuration Class for Reactor

    We need to create a reactor.core.Reactor bean that needs reactor.core.Environment as an argument. 
    ReactorConfig.java

    package com.concretepage.reactor;
    import java.util.concurrent.CountDownLatch;
    import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import reactor.core.Environment;
    import reactor.core.Reactor;
    import reactor.core.spec.Reactors;
    @Configuration
    @EnableAutoConfiguration
    public class ReactorConfig {
        private static final int NUMBER_OF_EMP = 5;
        @Bean
        Environment env() {
            return new Environment();
        }
        @Bean
        Reactor reactor(Environment env) {
            return Reactors.reactor()
                    .env(env)
                    .dispatcher(Environment.THREAD_POOL)
                    .get(); 
        }
        @Bean
        MessageConsumer consumer(){
        	return new MessageConsumer();
        }
        @Bean
        MessageProducer producer(){
        	return new MessageProducer();
        }
        @Bean
        public CountDownLatch latch() {
            return new CountDownLatch(NUMBER_OF_EMP);
        }
    }

    Main Class to Run Example

    Find the main class to run the example. Reactor provides Selectors to run our asynchronous and event-driven application. Here we using $ selector. Reactor.on method registers a consumer that is triggered when notification matches the given selector. 
    Main.java

    package com.concretepage.reactor;
    import static reactor.event.selector.Selectors.$;
    import org.springframework.context.annotation.AnnotationConfigApplicationContext;
    import reactor.core.Reactor;
    public class Main {
        public static void main(String[] args) throws InterruptedException {
           final int NUMBER_OF_EMP = 5;
           AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext();
           ctx.register(ReactorConfig.class);
           ctx.refresh();
           Reactor reactor = (Reactor)ctx.getBean("reactor");
           MessageConsumer consumer = (MessageConsumer)ctx.getBean("consumer");
           MessageProducer publisher = (MessageProducer)ctx.getBean("producer");
           reactor.on($("employees"), consumer);
           publisher.publishEmployee(NUMBER_OF_EMP);
       }
    }

    Find the output. 

    Employee 2:Ram
    Employee 4:Ram
    Employee 3:Ram
    Employee 5:Ram
    Employee 1:Ram
    -------Done-------

    Gradle for Spring and Reactor JAR Depedency

    Find the gradle file to resolve the JAR dependency. 
    build.gradle

    apply plugin: 'java'
    apply plugin: 'eclipse'
    archivesBaseName = 'Concretepage'
    version = '1.0-SNAPSHOT' 
    repositories {
        maven { url "https://repo.spring.io/libs-release" }
        mavenLocal()
        mavenCentral()
    }
    dependencies {
     	compile 'org.springframework.boot:spring-boot-starter:1.2.0.RELEASE'
     	compile 'org.projectreactor.spring:reactor-spring-context:1.1.3.RELEASE'
     	compile 'org.springframework:spring-web:4.1.3.RELEASE'
     	compile 'com.fasterxml.jackson.core:jackson-databind:2.5.0-rc1'
     	compile 'org.springframework.data:spring-data-commons:1.9.1.RELEASE'
     	compile 'org.springframework.boot:spring-boot-starter-security:1.2.0.RELEASE'
    }

    Now we are done. Enjoy Learning.

  • 相关阅读:
    谈谈服务限流算法的几种实现
    使用 MongoDB 存储日志数据
    MongoDB存储引擎选择
    下载一线视频
    spring-boot-starter-redis配置详解
    SpringBoot学习笔记(6) SpringBoot数据缓存Cache [Guava和Redis实现]
    Guava 源码分析(Cache 原理)
    分布式链路跟踪 Sleuth 与 Zipkin【Finchley 版】
    Dubbo x Cloud Native 服务架构长文总结(很全)
    区块链使用Java,以太坊 Ethereum, web3j, Spring Boot
  • 原文地址:https://www.cnblogs.com/davidwang456/p/5690525.html
Copyright © 2011-2022 走看看