zoukankan      html  css  js  c++  java
  • kafka+windows+java+springboot中的配置

    1.百度kafka+zookeeper+windows配置

    1.1  zookeeper配置

    # the directory where ZooKeeper keeps its data
    # NOTE(review): this is a Unix-style path in a Windows setup - confirm where it resolves on the target machine
    dataDir=/tmp/zookeeper
    # the port at which the clients will connect
    clientPort=2181
    # disable the per-ip limit on the number of connections since this is a non-production config
    maxClientCnxns=0

    1.2 kafka server.properties配置

    # Host name the broker advertises to clients; IP is presumably a placeholder for the broker machine's address
    advertised.host.name=IP
    
    # Directory in which the broker stores its log data
    log.dirs=D:/kafka_2.11-1.0.0/log
    
    # ZooKeeper connection string (host:port); 2181 matches clientPort in the zookeeper config above
    zookeeper.connect=IP:2181

    1.3 windows hosts配置

    IP localhost

    2.maven构建springboot项目

    2.1 intellij idea 新建kafka项目

    2.2 kafka配置pom.xml

    <!-- Maven build for the demo: a Spring Boot web application that sends messages to Kafka through spring-kafka. -->
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.tangxin.kafka</groupId>
        <artifactId>kafka</artifactId>
        <version>1.0</version>
        <name>kafka</name>
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        </properties>
        <!-- Inherits from the Spring Boot 1.4.1.RELEASE parent; the two Spring Boot starters below declare no version of their own. -->
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>1.4.1.RELEASE</version>
        </parent>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <!-- NOTE(review): none of the Java classes shown in this article import fastjson - confirm it is needed. -->
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.29</version>
            </dependency>
    
    
            <!-- Kafka 0.10.0.1 (Scala 2.11 build) with its slf4j-log4j12 binding excluded. -->
            <!-- NOTE(review): the broker directory used in this article is kafka_2.11-1.0.0 - confirm this 0.10.0.1 artifact
                 is compatible with that broker and with spring-kafka 1.2.0.RELEASE below. -->
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka_2.11</artifactId>
                <version>0.10.0.1</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.slf4j</groupId>
                        <artifactId>slf4j-log4j12</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
    
    
            <!-- Provides KafkaTemplate, ProducerFactory and DefaultKafkaProducerFactory used by the producer configuration class. -->
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
                <version>1.2.0.RELEASE</version>
            </dependency>
    
        </dependencies>
    
        <build>
            <resources>
                <!-- Package XML files located next to the Java sources, without property filtering. -->
                <resource>
                    <directory>src/main/java</directory>
                    <includes>
                        <include>**/*.xml</include>
                    </includes>
                    <filtering>false</filtering>
                </resource>
                <!-- Regular resources directory; the exclude rule below is intentionally left disabled. -->
                <resource>
                    <directory>src/main/resources</directory>
                    <!--<excludes>-->
                    <!--<exclude>*</exclude>-->
                    <!--</excludes>-->
                </resource>
            </resources>
            <plugins>
    
                <!-- Compile with source/target 1.8 and UTF-8 encoding; extdirs=lib is passed through as a compiler argument. -->
                <plugin>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                        <encoding>utf-8</encoding>
    
                        <compilerArguments>
                            <extdirs>lib</extdirs>
                        </compilerArguments>
                    </configuration>
                    <version>2.3.2</version>
                </plugin>
                <!-- Copy resources using UTF-8 encoding. -->
                <plugin>
                    <artifactId>maven-resources-plugin</artifactId>
                    <configuration>
                        <encoding>utf-8</encoding>
                    </configuration>
                    <version>2.4.3</version>
                </plugin>
            </plugins>
        </build>
    </project>

    2.3 新建springboot启动类Application

    package com.tangxin.kafka;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    
    /**
     * Entry point of the Kafka demo application.
     */
    @SpringBootApplication
    public class Application {
    
        /**
         * Starts Spring Boot with this class as the configuration source.
         *
         * @param args command-line arguments, forwarded unchanged to Spring Boot
         */
        public static void main(String[] args) {
            SpringApplication app = new SpringApplication(Application.class);
            app.run(args);
        }
    
    }

    2.4 新建springboot项目中resources目录的配置文件

    application.yml

    # Embedded server settings: display name, HTTP port and context path.
    server:
      display-name: kafka
      port: 8888
      contextPath: /kafka
      
    # Activates the "dev" profile (presumably what makes application-dev.properties apply - confirm).
    spring:
        profiles:
            active: dev

    application-dev.properties

    # Kafka broker address (host:port), read by KafkaProducerConfig through @Value("${kafka.bootstrap-servers}").
    # x.x.x.x is presumably a placeholder for the broker machine's IP.
    kafka.bootstrap-servers=x.x.x.x:9092

    log4j2.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <!-- Log4j2 configuration: one console appender and two rolling-file appenders under /data/logs/work. -->
    <!-- NOTE(review): status="trace" presumably makes Log4j2's own status output very verbose - confirm it is wanted outside debugging. -->
    <!-- NOTE(review): the pom shown in this article declares spring-boot-starter without a Log4j2 starter - confirm this file
         is actually picked up at runtime. -->
    <Configuration status="trace" dest="/data/logs/work/log.log">
        <appenders>
            <!-- Console appender on SYSTEM_OUT; the threshold filter accepts INFO and above and denies everything else. -->
            <Console name="Console" target="SYSTEM_OUT">
                <PatternLayout>
                    <charset>UTF-8</charset>
                    <Pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n</Pattern>
                </PatternLayout>
                <ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY"/>
            </Console>
            <!-- Main log file: time-based and size-based (1000 MB) rollover, rollover strategy max=20, same INFO threshold. -->
            <RollingFile name="RollingFile" fileName="/data/logs/work/work.log"
                         filePattern="/data/logs/work/work-%d{yyyy-MM-dd}-%i.log">
                <PatternLayout>
                    <charset>UTF-8</charset>
                    <Pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n</Pattern>
                </PatternLayout>
                <Policies>
                    <TimeBasedTriggeringPolicy/>
                    <SizeBasedTriggeringPolicy size="1000 MB"/>
                </Policies>
                <DefaultRolloverStrategy max="20"/>
                <ThresholdFilter level="info" onMatch="ACCEPT" onMismatch="DENY"/>
            </RollingFile>
    
            <!-- Error-only file (threshold filter at error): time-based and size-based (50 MB) rollover, fileIndex="min", max=100. -->
            <!-- NOTE(review): the pattern writes %msg immediately followed by %M with no separator - confirm that is intended. -->
            <RollingFile name="ErrorFile" fileName="/data/logs/work/error.log"  filePattern="/data/logs/work/error.%d{yyyy-MM-dd}.%i.log">
                <PatternLayout>
                    <charset>UTF-8</charset>
                    <Pattern>%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%M%n</Pattern>
                </PatternLayout>
                <Filters>
                    <ThresholdFilter level="error" onMatch="ACCEPT" onMismatch="DENY"/>
                </Filters>
                <Policies>
                    <TimeBasedTriggeringPolicy />
                    <SizeBasedTriggeringPolicy size="50 MB"/>
                </Policies>
                <DefaultRolloverStrategy fileIndex="min" max="100"/>
            </RollingFile>
        </appenders>
        <loggers>
            <!-- Root logger at info level, writing to the console and to the main rolling file. -->
            <Root level="info">
                <appender-ref ref="Console"/>
                <appender-ref ref="RollingFile"  level="info"/>
            </Root>
            <!-- Logger for the application package, attached to the error file; no level attribute is set on it. -->
            <Logger name="com.tangxin.kafka">
                <appender-ref ref="ErrorFile" />
            </Logger>
        </loggers>
    </Configuration>

    2.5 kafka配置类

    package com.tangxin.kafka.service;
    
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.core.DefaultKafkaProducerFactory;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.core.ProducerFactory;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * Spring configuration that wires the Kafka producer side of the application:
     * producer properties, a {@link ProducerFactory} and the {@link KafkaTemplate}
     * that other beans inject to send messages.
     */
    @Configuration
    @EnableKafka
    public class KafkaProducerConfig {
    
        /** Broker address list, injected from the {@code kafka.bootstrap-servers} property. */
        @Value("${kafka.bootstrap-servers}")
        private String bootstrapServers;
    
        /**
         * Builds the producer properties: the bootstrap servers plus String
         * serializers for both key and value.
         *
         * @return a new mutable map of producer configuration entries
         */
        public Map<String, Object> producerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            return props;
        }
    
        /**
         * Creates the producer factory from {@link #producerConfigs()}.
         *
         * <p>Declared as a bean so that a single factory instance is shared and the
         * container manages its lifecycle (presumably including closing the
         * underlying producer on shutdown - confirm against the spring-kafka
         * version in use).
         *
         * @return the producer factory for String keys and String values
         */
        @Bean
        public ProducerFactory<String, String> producerFactory() {
            return new DefaultKafkaProducerFactory<>(producerConfigs());
        }
    
        /**
         * Exposes the template used to send String key/value messages.
         *
         * @return a template backed by {@link #producerFactory()}
         */
        @Bean
        public KafkaTemplate<String, String> kafkaTemplate() {
            // Diamond instead of the raw type: avoids the unchecked conversion warning.
            return new KafkaTemplate<>(producerFactory());
        }
    }

    2.6 controller层调用kafka发送

    package com.tangxin.kafka.web;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestMethod;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    
    /**
     * REST endpoint that publishes a fixed test message to Kafka.
     */
    @RestController
    public class KafkaController {
    
        /** Template used to publish messages; typed to match the String/String template bean. */
        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;
    
        /**
         * Handles GET/POST {@code /send}: hands the payload {@code "1000"} for topic
         * {@code feed-info} to a background thread and returns immediately, without
         * waiting for the send to complete.
         *
         * @return the fixed acknowledgement string {@code "send done!"}
         * @throws Exception declared for interface compatibility with existing callers
         */
        @RequestMapping(value = "/send", method = { RequestMethod.GET, RequestMethod.POST })
        public String callFeedInfo() throws Exception {
            ExecutorService executorService = Executors.newSingleThreadExecutor();
            try {
                executorService.submit(() -> {
                    try {
                        kafkaTemplate.send("feed-info", "1000");
                    } catch (Exception e) {
                        // NOTE(review): only exceptions thrown synchronously by send() reach this
                        // handler; consider routing this through the application logger.
                        e.printStackTrace();
                    }
                });
            } finally {
                // The executor is created per request, so it must be released per request.
                // shutdown() still runs the task submitted above and then lets the worker
                // thread exit; without it every call leaves an idle thread behind.
                executorService.shutdown();
            }
            return "send done!";
        }
    
    }

    3.windows启动zookeeper和kafka

    4.遇到的问题

    2017-11-27 17:55:38.484 [kafka-producer-network-thread | producer-1] ERROR org.springframework.kafka.support.LoggingProducerListener - Exception thrown when sending a message with key='null' and payload='1' to topic mytopic:
    org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for mytopic-1

     之所以写这篇随笔,就是因为上面这个问题。在本机访问时没有问题,估计是因为本机的 localhost 和 IP 映射是正常的;但如果有两台电脑,一台做 server、一台做开发,就可能遇到这个问题。一开始查各种官方文档、各种百度,可能都无法解决,因为这个问题涉及 hostname(主机名)。说实话,我对网络这块确实不熟悉,之前入门 hadoop 和 spark 时也遇到过类似问题,纠结了很久。下面总结一下可能存在的原因。

    1. 防火墙是否关闭

    2. windows下是否安装了vmware软件:如果安装了,在 控制面板 → 网络和 Internet → 网络连接 中禁用 VMware Network Adapter

    3.kafka配置

    # Host name the broker advertises to clients; IP is presumably a placeholder for the broker machine's address
    advertised.host.name=IP
    
    # Directory in which the broker stores its log data
    log.dirs=D:/kafka_2.11-1.0.0/log
    
    # ZooKeeper connection string (host:port)
    zookeeper.connect=IP:2181

    windows hosts配置
    IP localhost
  • 相关阅读:
    中序遍历【递归算法】和【非递归算法】
    等价无穷小替换
    轮转访问MAC协议
    曲率
    Java I/O流 01
    Java 集合框架 04
    Java 集合框架 03
    Java 集合框架 02
    Java 集合框架 01
    Java 常见对象 05
  • 原文地址:https://www.cnblogs.com/fofawubian/p/7910913.html
Copyright © 2011-2022 走看看