zoukankan      html  css  js  c++  java
  • Kafka+OpenCV 实现实时流视频处理

     1. 启动Kafka Server

    bin/kafka-server-start.sh config/server.properties &

    2. 创建一个新topic

    bin/kafka-topics.sh --create --zookeeper xxxx --replication-factor 1 --partitions 1 --topic video

    3. 安装相关依赖

    sudo pip-3.6 install kafka-python opencv-contrib-python imutils

    4. 创建一个 Kafka Producer,并发送到kafka消息队列
    # kafkaProducer
    def publish_video(server, topic):
      
    # start producer
      
    producer = KafkaProducer(bootstrap_servers=server)

       vs = VideoStream(
    src=0).start()
       time.sleep(
    2.0)

      
    print("publishing video...")

      
    while True:
          frame = vs.read()
          frame = imutils.resize(frame,
    width=400)
          frame = detection(frame,
    'pretrained.prototxt.txt', 'pretrained.caffemodel')

         
    # send to kafka topic
         
    producer.send(topic, frame.tobytes())

       vs.stop()

    这里使用opencv 采集本地摄像头视频,读取每一帧数据,做图像识别处理,并转化为bytes数据发送到 kafka topic。

    这里选择了在Producer端直接对图像做处理,然后直接发送到流,Consumer端只做图像显示。

    其中detection方法为以一个已经训练好的深度学习模型,用于做图像识别以及描绘边框。网上类似模型很多,这里不多做赘述。这里对每帧做了一个resize是由于原视频采集的每帧数据较大,超过了kafka里默认的一个item大小,所以需要裁剪每帧,以减少传输数据量。

    5. 验证是否可以接收到流数据:

    bin/kafka-console-consumer.sh --bootstrap-server xxxx:port --topic video

    6. 创建一个Kafka Consumer,用于获取流数据

    from imutils.video import VideoStream
    from imutils.video import FPS
    import imutils
    import numpy as np
    import time
    import cv2
    from kafka import KafkaConsumer
    import sys

    def showCam(server, topic):
        consumer = KafkaConsumer(
            topic,
           
    bootstrap_servers=[server])

        fps = FPS().start()


       
    for msg in consumer:
           
    decoded = np.frombuffer(msg.value, np.uint8)
            decoded = decoded.reshape(
    225, 400, 3)

            cv2.imshow(
    "Cam", decoded)

            key = cv2.waitKey(
    1) & 0xFF
           
    if key == ord("q"):
               
    break

           
    fps.update()

        fps.stop()
        cv2.destroyAllWindows()

    这里使用np.frombuffer() 方法将每一帧的bytes数据转为一维numpy数组,由于采集的帧数据为3numpy数组,所以需要对此数组做reshape,以还原为原数据格式,最后显示在屏幕上。

    7. 执行代码:

    首先启动 Consumerpython3 kafkaCCam.py server:port topic

    然后启动 Producerpython3 KafkaPCam.py server:port topic

    即可在Consumer端获取到Producer送入到流里的实时视频图像:

  • 相关阅读:
    MVC OnActionExecuting,OnResultExecuted 的用法
    MindManager脑图之项目管理甘特图
    jQuery.Autocomplete实现自动完成功能(经典)
    常见26个jquery使用技巧详解(比如禁止右键点击、隐藏文本框文字等)
    用ATL创建COM组件详细解说
    STL中的常用的vector,map,set,Sort用法
    绘图效率完整解决方案——三种手段提高GDI/GDI+绘图效率
    C++面试
    iOS开发UI篇—iOS开发中三种简单的动画设置
    OS开发UI篇—IOS开发中Xcode的一些使用技巧
  • 原文地址:https://www.cnblogs.com/zackstang/p/10312176.html
Copyright © 2011-2022 走看看