zoukankan      html  css  js  c++  java
  • spring-integration-kafka

    1、pom.xml配置

    <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-kafka</artifactId>
            <version>1.1.1.RELEASE</version>
        </dependency>
    </dependencies>
    

    2、producer.xml配置

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
        xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
        xmlns:task="http://www.springframework.org/schema/task"
        xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
            http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
            http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
        <int:channel id="inputToKafka">
            <int:queue />
        </int:channel>
        <int-kafka:outbound-channel-adapter
            id="kafkaOutboundChannelAdapter" kafka-producer-context-ref="kafkaProducerContext"
            auto-startup="false" channel="inputToKafka" order="3">
            <int:poller fixed-delay="1000" time-unit="MILLISECONDS"
                receive-timeout="0" task-executor="taskExecutor" />
        </int-kafka:outbound-channel-adapter>
        <task:executor id="taskExecutor" pool-size="5"
            keep-alive="120" queue-capacity="500" />
        <bean id="producerProperties"
            class="org.springframework.beans.factory.config.PropertiesFactoryBean">
            <property name="properties">
                <props>
                    <prop key="topic.metadata.refresh.interval.ms">3600000</prop>
                    <prop key="message.send.max.retries">5</prop>
                    <prop key="serializer.class">kafka.serializer.StringEncoder</prop>
                    <prop key="request.required.acks">1</prop>
                </props>
            </property>
        </bean>
        <int-kafka:producer-context id="kafkaProducerContext"
            producer-properties="producerProperties">
            <int-kafka:producer-configurations>
                <int-kafka:producer-configuration
                    broker-list="10.20.0.248:9092" topic="test" compression-codec="default" />
            </int-kafka:producer-configurations>
        </int-kafka:producer-context>
    </beans>
    

    3、consumer.xml配置

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
        xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
        xmlns:task="http://www.springframework.org/schema/task"
        xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka 
                            http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
                            http://www.springframework.org/schema/integration 
                            http://www.springframework.org/schema/integration/spring-integration.xsd
                            http://www.springframework.org/schema/beans 
                            http://www.springframework.org/schema/beans/spring-beans.xsd
                            http://www.springframework.org/schema/task 
                            http://www.springframework.org/schema/task/spring-task.xsd">
     
        <int:channel id="inputFromKafka">
            <int:queue/>
        </int:channel>
         
        <int-kafka:inbound-channel-adapter
            id="kafkaInboundChannelAdapter" kafka-consumer-context-ref="consumerContext"
            auto-startup="false" channel="inputFromKafka">
            <int:poller fixed-delay="10" time-unit="MILLISECONDS"
                max-messages-per-poll="5" />
        </int-kafka:inbound-channel-adapter>
     
        <bean id="consumerProperties"
            class="org.springframework.beans.factory.config.PropertiesFactoryBean">
            <property name="properties">
                <props>
                    <prop key="auto.offset.reset">smallest</prop>
                    <prop key="socket.receive.buffer.bytes">10485760</prop> <!-- 10M -->
                    <prop key="fetch.message.max.bytes">5242880</prop>
                    <prop key="auto.commit.interval.ms">1000</prop>
                </props>
            </property>
        </bean>
     
        <int-kafka:consumer-context id="consumerContext"
            consumer-timeout="4000" zookeeper-connect="zookeeperConnect" consumer-properties="consumerProperties">
            <int-kafka:consumer-configurations>
                <int-kafka:consumer-configuration
                    group-id="mygroup" max-messages="5000">
                    <int-kafka:topic id="test" streams="4" />
                </int-kafka:consumer-configuration>
                <!-- <int-kafka:consumer-configuration group-id="default3" value-decoder="kafkaSpecificDecoder"
                    key-decoder="kafkaReflectionDecoder" max-messages="10"> <int-kafka:topic-filter 
                    pattern="regextopic.*" streams="4" exclude="false" /> </int-kafka:consumer-configuration> -->
            </int-kafka:consumer-configurations>
        </int-kafka:consumer-context>
     
        <int-kafka:zookeeper-connect id="zookeeperConnect"
            zk-connect="10.20.0.248:2181" zk-connection-timeout="40000"
            zk-session-timeout="40000" zk-sync-time="200" />
    </beans>
    

    4、producer.java示例

    package kafka.spring_kafka;
     
    import java.util.Random;
     
    import org.springframework.context.support.ClassPathXmlApplicationContext;
    import org.springframework.integration.support.MessageBuilder;
    import org.springframework.messaging.MessageChannel;
     
    public class Producer {
        private static final String CONFIG = "/context.xml";
        private static Random rand = new Random();
     
        public static void main(String[] args) {
             
            final ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext(CONFIG, Producer.class);
            ctx.start();
     
            final MessageChannel channel = ctx.getBean("inputToKafka", MessageChannel.class);
     
            for (int i = 0; i < 10; i++) {
                //默认
                //boolean flag = channel.send(MessageBuilder.withPayload("消息" + rand.nextInt()).build());
                //System.out.println(flag);
                //指定partition 和 topic
                boolean flag = channel.send(MessageBuilder.withPayload("消息" + rand.nextInt()).setHeader("messageKey", String.valueOf(i)).setHeader("topic", "test").build());
                System.out.println(flag);
            }
            ctx.close();
        }
    }
    

    5、consumer.java示例

    package kafka.spring_kafka;
     
    import java.util.Collection;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
     
    import org.springframework.context.support.ClassPathXmlApplicationContext;
    import org.springframework.integration.channel.QueueChannel;
    import org.springframework.messaging.Message;
     
     
    public class Consumer {
        private static final String CONFIG = "/consumer_context.xml";
        @SuppressWarnings({ "unchecked", "rawtypes" })
        public static void main(String[] args) {
            final ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext(CONFIG, Consumer.class);
            ctx.start();
     
            final QueueChannel channel = ctx.getBean("inputFromKafka", QueueChannel.class);
            Message msg;        
            while((msg = channel.receive()) != null) {
                HashMap map = (HashMap)msg.getPayload();
                Set<Map.Entry> set = map.entrySet();
                for (Map.Entry entry : set) {
                    String topic = (String)entry.getKey();
                    System.out.println("Topic:" + topic);
                    ConcurrentHashMap<Integer,List<byte[]>> messages = (ConcurrentHashMap<Integer,List<byte[]>>)entry.getValue();
                    Collection<List<byte[]>> values = messages.values();
                    for (Iterator<List<byte[]>> iterator = values.iterator(); iterator.hasNext();) {
                        List<byte[]> list = iterator.next();
                        for (byte[] object : list) {
                            String message = new String(object);
                            System.out.println("	Message: " + message);
                        }
                    }
                }
            }
            ctx.close();
        }
    }
    

    链接请参见:https://www.oschina.net/code/snippet_1866237_52084http://colobu.com/2014/11/19/kafka-spring-integration-in-practice/

  • 相关阅读:
    redis安装
    VMware安装Centos
    Nacos简单配置
    RAS非对称加密
    uLua Unity工作机制
    一个长期主义者的内与外
    MacOSX 运行Unity卡顿 [gethostname]
    着眼于长远,走的更稳
    物质趋于无穷, 人群趋于发散.符合熵增加的规律
    论PM与团队与敏捷开发
  • 原文地址:https://www.cnblogs.com/moonandstar08/p/6403640.html
Copyright © 2011-2022 走看看