1、在main方法中直接消费(Kafka 1.0及以上版本)
import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.Arrays; import java.util.Properties; /** * Kafka消息消费者 * 〈功能详细描述〉 * * @author 17090889 * @see [相关类/方法](可选) * @since [产品/模块版本] (可选) */ public class ConsumerSample { public static void main(String[] args) { String topic = "test-topic"; Properties props = new Properties(); // Kafka集群,多台服务器地址之间用逗号隔开 props.put("bootstrap.servers", "localhost:9092"); // 消费组ID props.put("group.id", "test_group1"); // Consumer的offset是否自动提交 props.put("enable.auto.commit", "true"); // 自动提交offset到zk的时间间隔,时间单位是毫秒 props.put("auto.commit.interval.ms", "1000"); // 消息的反序列化类型 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); Consumer<String, String> consumer = new KafkaConsumer<String, String>(props); // 订阅的话题 consumer.subscribe(Arrays.asList(topic)); // Consumer调用poll方法来轮询Kafka集群的消息,一直等到Kafka集群中没有消息或者达到超时时间100ms为止 while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord record : records) { System.out.println(record.partition() + record.offset()); System.out.println(record.key()); System.out.println(record.value()); } } } }
2、Spring下Kafka 1.0以上版本(不依赖Spring-Kafka)
3、Spring下kafka 0.8版本
1)kafka消费者抽象工厂类
/** * kafka消费者抽象工厂类 * 〈功能详细描述〉 * * @author * @see [相关类/方法](可选) * @since [产品/模块版本] (可选) */ import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.message.MessageAndMetadata; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Value; import java.io.UnsupportedEncodingException; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public abstract class BaseKafkaConsumerFactory implements InitializingBean, DisposableBean { private static final Logger logger = LoggerFactory.getLogger(BaseKafkaConsumerFactory.class); /** * 消费的Topic与消费线程数组成的Map */ private Map<String, Integer> topicThreadMap; /** * Consumer实例所需的配置 */ private Properties properties; /** * 线程池 */ private ThreadPoolExecutor taskExecutor; private ConsumerConnector consumerConnector; /** * zkConnect */ private String zkConnect; @Value("${kafka.groupId}") private String groupId; /** * sessionTimeOut */ @Value("${kafka.sessionTimeOut}") private String sessionTimeOut; /** * syncTime */ @Value("${kafka.syncTime}") private String syncTime; /** * commitInterval */ @Value("${kafka.commitInterval}") private String commitInterval; /** * offsetReset */ @Value("${kafka.offsetReset}") private String offsetReset; @Override public void afterPropertiesSet() { logger.info("afterPropertiesSet-start"); // 初始化properties if(properties==null){ properties = new Properties(); properties.put("zookeeper.connect", zkConnect); logger.info("zkConnect={}", zkConnect); // group 代表一个消费组 properties.put("group.id", groupId); logger.info("groupId={}", groupId); // zk连接超时 
properties.put("zookeeper.session.timeout.ms", sessionTimeOut); properties.put("zookeeper.sync.time.ms", syncTime); properties.put("auto.commit.interval.ms", commitInterval); properties.put("auto.offset.reset", offsetReset); // 序列化类 properties.put("serializer.class", "kafka.serializer.StringEncoder"); properties.put("rebalance.max.retries", "10"); // 当rebalance发生时,两个相邻retry操作之间需要间隔的时间。 properties.put("rebalance.backoff.ms", "3100"); } ConsumerConfig consumerConfig = new ConsumerConfig(properties); consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig); Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams = consumerConnector.createMessageStreams(topicThreadMap); // 实际有多少个stream,就设置多少个线程处理 // int messageProcessThreadNum = 0; // for (List<KafkaStream<byte[], byte[]>> streamList : topicMessageStreams.values()) { // messageProcessThreadNum = messageProcessThreadNum + streamList.size(); // } // 创建实际处理消息的线程池 taskExecutor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10000)); for (List<KafkaStream<byte[], byte[]>> streams : topicMessageStreams.values()) { for (final KafkaStream<byte[], byte[]> stream : streams) { taskExecutor.submit(new Runnable() { @Override public void run() { ConsumerIterator<byte[], byte[]> it = stream.iterator(); while (it.hasNext()) { MessageAndMetadata<byte[], byte[]> data = it.next(); try { String kafkaMsg = new String(data.message(),"UTF-8"); logger.info("来自topic:{}的消息:{}", topicThreadMap.keySet(), kafkaMsg); // 消息处理 onMessage(data); } catch (RuntimeException e) { logger.error("处理消息异常.", e); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } } }); } } } /** * 消息处理类 * @param data */ protected abstract void onMessage(MessageAndMetadata<byte[], byte[]> data); @Override public void destroy() throws Exception { try { if (consumerConnector != null) { consumerConnector.shutdown(); } } catch (Exception e) { logger.warn("shutdown consumer failed", e); } try { if 
(taskExecutor != null) { taskExecutor.shutdown(); } } catch (Exception e) { logger.warn("shutdown messageProcessExecutor failed", e); } logger.info("shutdown consumer successfully"); } public Properties getProperties() { return properties; } public void setProperties(Properties properties) { this.properties = properties; } public Map<String, Integer> getTopicThreadMap() { return topicThreadMap; } public void setTopicThreadMap(Map<String, Integer> topicThreadMap) { this.topicThreadMap = topicThreadMap; } public String getZkConnect() { return zkConnect; } public void setZkConnect(String zkConnect) { this.zkConnect = zkConnect; } }
2)具体的kafka消费者实现类
import com.xxx.sfmms.common.util.JsonConvertUtil; import com.xxx.sfmms.common.util.RedisUtil; import com.xxx.sfmms.common.util.StringUtil; import com.xxx.sfmms.service.intf.RecommendService; import kafka.message.MessageAndMetadata; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; import org.springframework.beans.factory.annotation.Autowired; import java.io.UnsupportedEncodingException; import java.util.HashMap; import java.util.Map; /** * 实名kafka消费者 * 〈功能详细描述〉 * * @author 17090889 * @see [相关类/方法](可选) * @since [产品/模块版本] (可选) */ public class RealNameKafkaConsumer extends BaseKafkaConsumerFactory { private final Logger LOGGER = LoggerFactory.getLogger(RealNameKafkaConsumer.class); private static final String STR_INVOKENO = "invokeNo"; @Autowired private RecommendService recommendService; /** * 消息处理 * @param data */ @Override protected void onMessage(MessageAndMetadata<byte[], byte[]> data) { MDC.put(STR_INVOKENO, StringUtil.getUuid()); String msg=""; try { msg=new String(data.message(),"UTF-8"); LOGGER.info("RealNameKafkaConsumer-data={},topic={}",msg,data.topic()); } catch (UnsupportedEncodingException e) { LOGGER.info("字节数组转字符串异常"); e.printStackTrace(); } // 实名的事后kafka数据 Map<String, String> map = JsonConvertUtil.json2pojo(msg, Map.class); LOGGER.info("RealNameKafkaConsumer-map={}", map); String userNo = map.get("eppAccountNO"); LOGGER.info("RealNameKafkaConsumer-userNo={}", userNo); String flag = RedisUtil.getString("PULLNEW:RACCOUNTNO_" + userNo, "MEIS"); // 不是渠道6被邀请用户 if(!"1".equals(flag)){ LOGGER.info("不是渠道6拉新用户"); return; } // 20-初级认证 30-高级实名认证 40- 实名申诉降级、50-高级到期降级 60-实名撤销(人工手动降级) 70-申诉找回身份降级 String authenStatus=map.get("authenStatus"); // 真实姓名 String realName=map.get("realName"); // 身份证号码 String idNo = map.get("idNO"); // apptoken String appToken=map.get("appToken"); // 校验任务 Map<String, String> paramMap = new HashMap<String, String>(4); paramMap.put("userNo", userNo); paramMap.put("authenStatus",authenStatus); 
paramMap.put("realName",realName); paramMap.put("idNo", idNo); paramMap.put("appToken",appToken); Map<String,String> resultMap=recommendService.checkRulesAndRiskSendMoney(paramMap); LOGGER.info("resultMap={}", resultMap); MDC.remove(STR_INVOKENO); } }
3)实现类的bean注入配置
<!-- Real-name consumer: 5 streams on ${realTopic}, connecting through ${realZkConnect}. -->
<bean id="realNameKafkaConsumer" class="com.xxx.sfmms.service.RealNameKafkaConsumer">
    <property name="topicThreadMap">
        <map>
            <entry key="${realTopic}" value="5"/>
        </map>
    </property>
    <property name="zkConnect" value="${realZkConnect}"/>
</bean>

<!-- Pre-credit consumer: 5 streams on ${rxdTopic}, connecting through ${rxdZkConnect}. -->
<bean id="preCreditKafkaConsumer" class="com.xxx.sfmms.service.PreCreditKafkaConsumer">
    <property name="topicThreadMap">
        <map>
            <entry key="${rxdTopic}" value="5"/>
        </map>
    </property>
    <property name="zkConnect" value="${rxdZkConnect}"/>
</bean>
4)kafka consumer参数配置
# Kafka listener configuration

# ZooKeeper connect string injected into realNameKafkaConsumer
realZkConnect=xxx
# Topic consumed by realNameKafkaConsumer
realTopic=xxx

# ZooKeeper connect string injected into preCreditKafkaConsumer
rxdZkConnect=xxx
# Topic consumed by preCreditKafkaConsumer
rxdTopic=xxx

# Shared consumer settings read through @Value in BaseKafkaConsumerFactory
kafka.sessionTimeOut=6000
kafka.syncTime=2000
kafka.commitInterval=30000
kafka.offsetReset=smallest
kafka.groupId=xxx
5)依赖包配置
<!-- Kafka 0.8.1.1 client; the jmxtools and jmxri transitive dependencies are excluded. -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.9.2</artifactId>
    <version>0.8.1.1</version>
    <exclusions>
        <exclusion>
            <groupId>com.sun.jdmk</groupId>
            <artifactId>jmxtools</artifactId>
        </exclusion>
        <exclusion>
            <groupId>com.sun.jmx</groupId>
            <artifactId>jmxri</artifactId>
        </exclusion>
    </exclusions>
</dependency>
END