  • python + scala + kafka + spark: integrating a streaming pipeline project

    Goal: build the simplest possible Spark + Kafka real-time stream computation on Windows:
    a Python script generates random integers between 0 and 100 and sends them to Kafka, where Spark picks them up for statistics.

    Versions used:
    scala  2.11
    python 3.5
    java 1.8
    kafka_2.11-0.11.0.0.tgz
    zookeeper-3.4.9.tar.gz
    spark 2.2

    Step 1
    Configure ZooKeeper and start the ZK server.
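
    For example, a minimal standalone setup run from the unpacked zookeeper-3.4.9 directory (assuming the stock sample config is sufficient; paths are illustrative):

    copy conf\zoo_sample.cfg conf\zoo.cfg
    bin\zkServer.cmd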

    Step 2
    Configure and start Kafka, then create the topic.
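
    For example, from the unpacked Kafka directory, start the broker with the stock config and create the topic the producer below expects (the same create command is kept in the producer's docstring):

    bin\windows\kafka-server-start.bat config\server.properties
    bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafkaid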



    Producer side (data generation):
    # -*- coding: utf-8 -*-
    from kafka import KafkaProducer
    import random
    import time

    '''
    Create the topic (from the Kafka Windows bin directory):
    c:\p_not_imprt\kafka_2.11-0.11.0.0\bin\windows>kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafkaid
    '''

    '''
    Watch the topic with a console consumer:
    kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic kafkaid --from-beginning
    kafka-console-consumer.bat --bootstrap-server 127.0.0.1:9092 --topic kafkaid --from-beginning
    '''

    class Kafka_Producer():
        def __init__(self, kafkahost, kafkaport, kafkatopic):
            self.kafkahost = kafkahost
            self.kafkaport = kafkaport
            self.kafkatopic = kafkatopic
            self.producer = KafkaProducer(
                bootstrap_servers='{kafka_host}:{kafka_port}'.format(
                    kafka_host=self.kafkahost, kafka_port=self.kafkaport))

        def sendvalues(self):
            value = random.randint(0, 100)
            # Serialize the number as text. bytes(value) would NOT do that:
            # in Python 3 it creates `value` zero bytes, which is the bug
            # visible in the sample consumer output further down.
            self.producer.send(self.kafkatopic, str(value).encode('utf-8'))
            print(value)
            self.producer.flush()

    def main():
        producer = Kafka_Producer('127.0.0.1', 9092, "kafkaid")
        while True:     # emit one random number every 5 seconds
            producer.sendvalues()
            time.sleep(5)

    if __name__ == '__main__':
        main()
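
    Both scripts use the kafka-python client library (the source of the "from kafka import ..." lines above); if it is not installed yet:

    pip install kafka-python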


        
    Consumer side, used for testing:
    # encoding:utf-8
    from kafka import KafkaConsumer
    """
    测试数据可以消费
    """
    class Kafka_Consumer():
        def __init__(self, kafkahost, kafkaport, kafkatopic):
            self.kafkaHost = kafkahost
            self.kafkaPort = kafkaport
            self.kafkatopic = kafkatopic

            self.consumer = KafkaConsumer(
                self.kafkatopic,
                bootstrap_servers='{kafka_host}:{kafka_port}'.format(
                    kafka_host=self.kafkaHost,
                    kafka_port=self.kafkaPort))

        def dataToConsumer(self):
            # Block on the consumer and print every record as it arrives.
            for data in self.consumer:
                print(data)

    def main():
        con = Kafka_Consumer('127.0.0.1',9092,'kafkaid')
        con.dataToConsumer()

    if __name__ == '__main__':
        main()
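
    With ZooKeeper and Kafka running, start the producer and this test consumer in two separate terminals, e.g. (the file names are illustrative):

    python kafka_producer.py
    python kafka_consumer.py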


       """
    ConsumerRecord(topic='kafkaid', partition=0, offset=305, timestamp=1503113220641, timestamp_type=0, key=None, value=b'x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00', checksum=-1933145318, serialized_key_size=-1, serialized_value_size=48)
    ConsumerRecord(topic='kafkaid', partition=0, offset=306, timestamp=1503113225648, timestamp_type=0, key=None, value=b'x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00', checksum=-341178958, serialized_key_size=-1, serialized_value_size=77)
    ConsumerRecord(topic='kafkaid', partition=0, offset=307, timestamp=1503113230657, timestamp_type=0, key=None, value=b'x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00', checksum=825828613, serialized_key_size=-1, serialized_value_size=22)
    ConsumerRecord(topic='kafkaid', partition=0, offset=308, timestamp=1503113235667, timestamp_type=0, key=None, value=b'x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00', checksum=-1954167954, serialized_key_size=-1, serialized_value_size=21)
    ConsumerRecord(topic='kafkaid', partition=0, offset=309, timestamp=1503113240673, timestamp_type=0, key=None, value=b'x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00', checksum=341811256, serialized_key_size=-1, serialized_value_size=16)
    ConsumerRecord(topic='kafkaid', partition=0, offset=310, timestamp=1503113245678, timestamp_type=0, key=None, value=b'x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00', checksum=979573176, serialized_key_size=-1, serialized_value_size=88)
    ConsumerRecord(topic='kafkaid', partition=0, offset=311, timestamp=1503113250685, timestamp_type=0, key=None, value=b'x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00', checksum=1345984197, serialized_key_size=-1, serialized_value_size=90)
    ConsumerRecord(topic='kafkaid', partition=0, offset=312, timestamp=1503113255695, timestamp_type=0, key=None, value=b'x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00', checksum=-1404551612, serialized_key_size=-1, serialized_value_size=89)
    ConsumerRecord(topic='kafkaid', partition=0, offset=313, timestamp=1503113260700, timestamp_type=0, key=None, value=b'x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00', checksum=-96665387, serialized_key_size=-1, serialized_value_size=50)
    ConsumerRecord(topic='kafkaid', partition=0, offset=314, timestamp=1503113265708, timestamp_type=0, key=None, value=b'x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00', checksum=479929368, serialized_key_size=-1, serialized_value_size=25)
    ConsumerRecord(topic='kafkaid', partition=0, offset=315, timestamp=1503113270715, timestamp_type=0, key=None, value=b'x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00', checksum=-1410866262, serialized_key_size=-1, serialized_value_size=92)
    ConsumerRecord(topic='kafkaid', partition=0, offset=316, timestamp=1503113275722, timestamp_type=0, key=None, value=b'x00x00x00x00x00x00x00x00x00x00', checksum=149549586, serialized_key_size=-1, serialized_value_size=10)
       """



       
       

    Spark consumer side:


    It seems Spark cannot read data from a Kafka 0.11 broker yet (at the time
    of writing, the PySpark streaming integration only shipped for the Kafka
    0.8 API).

    To implement the statistics, though, two functions are essential:

    updateStateByKey : maintains running state (e.g. cumulative counts) across batches

    reduceByKey : aggregates values within a single batch
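
    A minimal sketch of what the Spark side could look like, assuming the
    spark-streaming-kafka-0-8 Python integration (the one available to PySpark
    in Spark 2.2); this is an illustration, not the author's tested code, and
    the app name, batch interval and checkpoint path are made up:

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    sc = SparkContext(appName="KafkaRandomStats")
    ssc = StreamingContext(sc, 5)        # 5-second micro-batches
    ssc.checkpoint("checkpoint")         # updateStateByKey requires a checkpoint dir

    # Each Kafka message arrives as a (key, value) pair; the value is the
    # random number serialized as text by the producer above.
    stream = KafkaUtils.createDirectStream(
        ssc, ["kafkaid"], {"metadata.broker.list": "127.0.0.1:9092"})

    def update(new_counts, total):
        # Fold this batch's counts for one number into its running total.
        return sum(new_counts) + (total or 0)

    counts = (stream.map(lambda kv: (kv[1], 1))        # number -> (number, 1)
                    .reduceByKey(lambda a, b: a + b)   # count within one batch
                    .updateStateByKey(update))         # accumulate across batches
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()

    Submitted with something like (package coordinates for Spark 2.2 / Scala 2.11; the script name is illustrative):

    spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 spark_consumer.py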




