zoukankan      html  css  js  c++  java
  • 一种异步消费kafka消息的实现机制

    本文将从消息流转过程以及各步骤实现方式来进行阐述,代码基于springboot项目,配置文件yml格式:

    1. 项目启动时启动kafka消息消费线程
    2. 接收kafka消息
    3. 将kafka消息添加进对应的阻塞队列,消费消息
    4. 程序出错处理办法
    5. 总结

    1.项目启动时启动kafka消息消费线程

    ​ 消费kafka消息的类实现一个生命周期管理接口,这个接口自己定义,我这设为LifeCycle。

    public interface LifeCycle {
    	/**
    	 * start
    	 */
    	void startup();
    
    	/**
    	 * 生命周期结束时调用
    	 */
    	void shutdown();
    }
    

    ​ 该LIfeCycle类在组件生命周期管理类ComponentContainer(自定义)中进行管理,该管理类实现org.springframework.context中的ApplicationListener接口。

    /**
     *  组件生命周期管理
     */
    @Slf4j
    @Component
    public class ComponentContainer implements ApplicationListener<ContextRefreshedEvent> {
    
    	@Override
    	public void onApplicationEvent(ContextRefreshedEvent event) {
    		//  get beans of LifeCycle
    		Map<String, LifeCycle> components = event.getApplicationContext().getBeansOfType(LifeCycle.class);
    		Collection<LifeCycle> instances = retrievalInstance(components);
    		//  startup
    		instances.forEach(LifeCycle::startup);
    		//  shutdown
    		Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    			instances.forEach(LifeCycle::shutdown);
    		}));
    	}
    	/**
    	 * retrieval instance of LifeCycle
    	 */
    	private Collection<LifeCycle> retrievalInstance(Map<String, LifeCycle> components) {
    		Collection<LifeCycle> allInstances = components == null ? new ArrayList<>() : new ArrayList<>(components.values());
    		return allInstances;
    	}
    }
    
    

    这样程序启动时,就会执行LifeCycle接口的实现类的startup方法了。


    2.接收kafka消息

    ​ 注解org.springframework.kafka.annotation.KafkaListener监听kafka消息,在yml配置文件中配置好topics和containerFactory的值

    @KafkaListener(topics = {
    		"${kafka.XXX.topics}"
    	}, containerFactory = "${kafka.XXX.properties.listener-names}")
    	public void onMessage1(ConsumerRecord<String, String> record) {
    		try {
    			LOGGER.info("收到消息 record = {}",record.value());
    			doDealMessage(record.value());
    		} catch (Exception e) {
    			LOGGER.info("处理消息出错 record = {}",record.value());
    		}
    	}
    
    

    3.将kafka消息添加进对应的阻塞队列,消费消息

    ​ kafka消息消费类MessageConsumer:

    ​ 两个具体的消息消费类:Message1Consumer ,Message2Consumer

    
    @Slf4j
    @Service
    public class MessageConsumer implements LifeCycle {
    
    	/**
    	 *  数据中转队列
    	 */
    	private CommonQueue<String> queue1;
    	private CommonQueue<String> queue2;
    	/**
    	 *  收到kafka消息
    	 * @param record
    	 */
    	@KafkaListener(topics = {
    		"${kafka.XXX.topics}"
    	}, containerFactory = "${kafka.XXX.properties.listener-name}")
    	public void onMessage1(ConsumerRecord<String, String> record) {
    		try {
    			LOGGER.info("收到消息 record = {}",record.value());
    			doDealMessage1(record.value());
    		} catch (Exception e) {
    			LOGGER.info("处理消息出错 record = {}",record.value());
    		}
    	}
    
    	/**
    	 *  收到kafka消息
    	 * @param record
    	 */
    	@KafkaListener(topics = {
    		"${kafka.XXX.topics}"
    	}, containerFactory = "${kafka.XXX1.properties.listener-name}")
    	public void onMessage(ConsumerRecord<String, String> record) {
    		try {
    			LOGGER.info("收到消息 record ={}",record.value());
    			doDealMessage2(record.value());
    		} catch (Exception e) {
    			LOGGER.info("处理消息出错 record = {}",record.value());
    		}
    	}
    
    	public void doDealMessage1(String data) {
    		queue1.add(data);
    	}
    	public void doDealMessage2(String data) {
    		queue2.add(data);
    	}
    
    	@Override
    	public void startup() {
    		queue1 = new CommonQueue<>(new Message1Consumer());
    		queue2 = new CommonQueue<>(new Message2Consumer());
    	}
    
    	@Override
    	public void shutdown() {
    		if (queue1 != null) {
    			queue1.shutdown();
    		}
    		if (queue2 != null) {
    			queue2.shutdown();
    		}
    	}
    
    	/**
    	 *  数据1消费队列
    	 */
    	private class Message1Consumer implements QueueConsumer<String> {
    
    		@Override
    		public void accept(String messageVo) {
    			// 处理
    			try {
    				//处理消息1
    				}
    			} catch (Exception e) {
    				LOGGER.error("处理图谱数据出现异常,data={}", messageVo, e);
    			}
    		}
    	}
    	/**
    	 *  数据2消费队列
    	 */
    	private class Message2Consumer implements QueueConsumer<String> {
    
    		@Override
    		public void accept(String messageVo) {
    			try {
    				//处理消息2
    			} catch (Exception e) {
    				LOGGER.error("处理消息出现异常,data={}", messageVo, e);
    			}
    		}
    	}
    }
    
    

    CommonQueue类:队列类,初始化阻塞队列,并开启线程

    public class CommonQueue<T> {
    	private final Queue<T> queue;
    	private final Thread consumerThread;
    	private volatile boolean actived = true;
    
    	public CommonQueue(QueueConsumer<T> consumer) {
    		this.queue = new ArrayBlockingQueue<>(2000);
    		this.consumerThread = new Thread(new Consumer(queue, consumer), "common-queue-consumer-thread");
    		this.consumerThread.start();
    	}
    
    	public boolean add(T e) {
    		return queue.add(e);
    	}
    
    	public void shutdown() {
    		this.actived = false;
    	}
    
    	private class Consumer implements Runnable {
    		private Queue<T> queue;
    		private QueueConsumer<T> consumer;
    
    		public Consumer(Queue<T> queue, QueueConsumer<T> consumer) {
    			this.queue = queue;
    			this.consumer = consumer;
    		}
    
    		@Override
    		public void run() {
    			while (actived) {
    				T e = queue.poll();
    				if (e != null) {
    					this.consumer.accept(e);
    				} else {
    					try {
    						Thread.sleep(100);
    					} catch (InterruptedException e1) {
    						e1.printStackTrace();
    					}
    				}
    			}
    		}
    	}
    }
    
    

    QueueConsumer接口:具体的消息消费类实现该接口

    public interface QueueConsumer<T> {
    
       void accept(T e);
    }
    

    4.程序异常处理机制

    ​ 当程序出错时,停止线程处理阻塞队列中的消息

    public void shutdown() {
    		this.actived = false;
    	}
    

    ​ JDK提供了Java.Runtime.addShutdownHook(Thread hook)方法,可以在一下几种场景中被调用:

    1. 程序正常退出
    2. 使用System.exit()
    3. 终端使用Ctrl+C触发的中断
    4. 系统关闭
    5. OutOfMemory宕机
    6. 使用Kill pid命令干掉进程(注:在使用kill -9 pid时,是不会被调用的)

    5.总结

    ​ 该实现机制在获取到kafka消息后,将消息存到本地阻塞队列ArrayBlockingQueue中,一类消息拥有自己的队列,让对应的线程去取并处理该阻塞队列中的消息;一方面可以尽快的消费kafka的消息,防止消费者无法跟上数据生成的速度;另一方面容易扩展,具体的消息消费类实现通用accept()方法,实现方法的具体逻辑即可在新线程中异步执行消费,不需要在具体的消费类中关注是否开启新线程执行。

  • 相关阅读:
    JS 获取网页内容高度 和 网页可视高度 支持IE6789 Firefox Chrome
    JS 回车快捷键登陆页面 兼容火狐和IE
    如何设置span宽度
    实现三行布局页面自适应不同分辨率下的屏幕高度
    电脑个别网站打不开 POSTMAN可以请求通
    克隆DataTable
    Microsoft Visual SourceSafe 6.0 关联VS
    asp.net 网站设置访问超时时长
    sql server management studio(ssms)连接多个数据库注意事项
    ASP.NET Core 开发中间件(StaticFiles)使用
  • 原文地址:https://www.cnblogs.com/bb-ben99/p/14011534.html
Copyright © 2011-2022 走看看