  • Luigi study notes 2: running Top Artists on Hadoop

    1. AggregateArtistsHadoop

    import luigi
    import luigi.contrib.hadoop
    import luigi.contrib.hdfs

    class AggregateArtistsHadoop(luigi.contrib.hadoop.JobTask):
        date_interval = luigi.DateIntervalParameter()
    
        def output(self):
            # The aggregated counts are written to HDFS instead of the local filesystem.
            return luigi.contrib.hdfs.HdfsTarget("data/artist_streams_%s.tsv" % self.date_interval)
    
        def requires(self):
            # One StreamsHdfs task per day in the interval.
            return [StreamsHdfs(date) for date in self.date_interval]
    
        def mapper(self, line):
            # Each input line is "<timestamp> <artist> <track>".
            timestamp, artist, track = line.strip().split()
            yield artist, 1
    
        def reducer(self, key, values):
            yield key, sum(values)

    The functionality is similar to AggregateArtists, but note the following:

    luigi.contrib.hadoop.JobTask does not ask you to implement a run method; instead you implement mapper and reducer. mapper and combiner must yield two-element tuples, and either element may itself be a tuple, as the sketch below shows.
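    For illustration, here is a hypothetical variant (not part of the tutorial) whose mapper yields a tuple as the key, counting plays per (artist, day); it assumes the timestamp field is ISO-formatted:

    import luigi
    import luigi.contrib.hadoop

    class AggregateArtistsByDay(luigi.contrib.hadoop.JobTask):
        date_interval = luigi.DateIntervalParameter()

        def mapper(self, line):
            timestamp, artist, track = line.strip().split()
            day = timestamp.split('T')[0]   # assumes an ISO-like timestamp
            yield (artist, day), 1          # the key is itself a tuple

        def reducer(self, key, values):
            yield key, sum(values)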

    This task depends on StreamsHdfs tasks. Let's look at StreamsHdfs next:

    2. StreamsHdfs

    class StreamsHdfs(Streams):
        def output(self):
            # Same fake stream data as Streams, but written to HDFS.
            return luigi.contrib.hdfs.HdfsTarget(self.date.strftime('data/streams_%Y_%m_%d_faked.tsv'))

    This class does the same work as the Streams class, so it inherits from Streams and only overrides the output method; in other words, its final output is written to HDFS.
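    As an aside, requires() in AggregateArtistsHadoop can fan out over the interval because a luigi date interval is iterable day by day. A quick illustration, assuming the luigi.date_interval module behaves as in the version used here:

    from luigi import date_interval

    # '2012-06' parses into a Month interval; iterating it yields one
    # datetime.date per day, which is what requires() loops over.
    month = date_interval.Month.parse('2012-06')
    days = list(month)
    print days[0], days[-1]   # 2012-06-01 2012-06-30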

    3. Running AggregateArtistsHadoop

    Running the command below produced an error. The NoSectionError in the message is a configuration-file problem; see Luigi's configuration documentation for details (my blog also covers some of the commonly used settings).

     PYTHONPATH='' luigi --module top_artists AggregateArtistsHadoop --local-scheduler --date-interval 2012-06
      File "/root/miniconda2/envs/my_python_env/lib/python2.7/site-packages/luigi/configuration.py", line 103, in get
        return self._get_with_default(ConfigParser.get, section, option, default, **kwargs)
      File "/root/miniconda2/envs/my_python_env/lib/python2.7/site-packages/luigi/configuration.py", line 93, in _get_with_default
        return method(self, section, option, **kwargs)
      File "/root/miniconda2/envs/my_python_env/lib/python2.7/ConfigParser.py", line 607, in get
        raise NoSectionError(section)
    NoSectionError: No section: 'hadoop'
    Exception AttributeError: "'DefaultHadoopJobRunner' object has no attribute 'tmp_dir'" in <bound method DefaultHadoopJobRunner.__del__ of <luigi.contrib.hadoop.DefaultHadoopJobRunner object at 0x7fda9c9b38d0>> ignored

    Create a luigi.cfg in the current working directory, write the following configuration into it, and rerun the command. Note that ConfigParser only treats # as a comment at the start of a line, so the comments below sit on lines of their own rather than inline.

    (my_python_env)[root@hadoop26 pythonCode]# cat luigi.cfg 
    [hadoop]
    # command used to submit Hadoop jobs
    command=hadoop
    # local command used to run Python scripts
    python-executable=python
    # Hadoop scheduler to use
    scheduler=fair
    # location of the Hadoop streaming jar
    streaming-jar=/usr/local/hadoop-2.6.0/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar
    # Hadoop version
    version=apache1
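    To confirm that Luigi is actually picking up the file, you can query the parsed configuration from a Python shell started in the same directory; a quick check, assuming this luigi version's configuration API:

    import luigi.configuration

    # Returns the ConfigParser-backed config that luigi loaded (luigi.cfg
    # in the current working directory is one of the default locations).
    config = luigi.configuration.get_config()
    print config.get('hadoop', 'streaming-jar')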

    4. Finding the Top Artists

    If you have made it this far, each artist's play count has been computed and saved to a file, locally or on HDFS. Now we will pick out the top 10 artists, this time with a plain Python task.

    from heapq import nlargest

    import luigi

    class Top10Artists(luigi.Task):
        date_interval = luigi.DateIntervalParameter()
        use_hadoop = luigi.BoolParameter()
    
        def requires(self):
            # Choose the aggregation backend based on the --use-hadoop flag.
            if self.use_hadoop:
                return AggregateArtistsHadoop(self.date_interval)
            else:
                return AggregateArtists(self.date_interval)
    
        def output(self):
            return luigi.LocalTarget("data/top_artists_%s.tsv" % self.date_interval)
    
        def run(self):
            top_10 = nlargest(10, self._input_iterator())
            with self.output().open('w') as out_file:
                for streams, artist in top_10:
                    print >> out_file, self.date_interval.date_a, self.date_interval.date_b, artist, streams
    
        def _input_iterator(self):
            with self.input().open('r') as in_file:
                for line in in_file:
                    artist, streams = line.strip().split()
                    # Keep the artist id as a string; only the count needs to be numeric.
                    yield int(streams), artist
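    Since nlargest compares the yielded tuples element by element, putting the stream count first is what makes it rank by play count; a toy example with made-up pairs:

    from heapq import nlargest

    # Tuples compare on the first element (the count) before the artist id.
    pairs = [(47, '858'), (3, '12'), (46, '164')]
    print nlargest(2, pairs)   # [(47, '858'), (46, '164')]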

    Run the following command to compute the top 10:

    PYTHONPATH='' luigi --module top_artists Top10Artists  --local-scheduler --date-interval 2012-06

    When it finishes, a new file is produced in the data directory:

    (my_python_env)[root@hadoop26 pythonCode]# ls data/
    artist_streams_2012-06.tsv    streams_2012_06_06_faked.tsv  streams_2012_06_12_faked.tsv  streams_2012_06_18_faked.tsv  streams_2012_06_24_faked.tsv  streams_2012_06_30_faked.tsv
    streams_2012_06_01_faked.tsv  streams_2012_06_07_faked.tsv  streams_2012_06_13_faked.tsv  streams_2012_06_19_faked.tsv  streams_2012_06_25_faked.tsv  top_artists_2012-06.tsv
    streams_2012_06_02_faked.tsv  streams_2012_06_08_faked.tsv  streams_2012_06_14_faked.tsv  streams_2012_06_20_faked.tsv  streams_2012_06_26_faked.tsv  
    streams_2012_06_03_faked.tsv  streams_2012_06_09_faked.tsv  streams_2012_06_15_faked.tsv  streams_2012_06_21_faked.tsv  streams_2012_06_27_faked.tsv  
    streams_2012_06_04_faked.tsv  streams_2012_06_10_faked.tsv  streams_2012_06_16_faked.tsv  streams_2012_06_22_faked.tsv  streams_2012_06_28_faked.tsv  
    streams_2012_06_05_faked.tsv  streams_2012_06_11_faked.tsv  streams_2012_06_17_faked.tsv  streams_2012_06_23_faked.tsv  streams_2012_06_29_faked.tsv  
    (my_python_env)[root@hadoop26 pythonCode]# cat data/top_artists_2012-06.tsv 
    2012-06-01    2012-07-01    858    47
    2012-06-01    2012-07-01    594    47
    2012-06-01    2012-07-01    248    47
    2012-06-01    2012-07-01    164    46
    2012-06-01    2012-07-01    846    45
    2012-06-01    2012-07-01    776    44
    2012-06-01    2012-07-01    75    44
    2012-06-01    2012-07-01    345    44
    2012-06-01    2012-07-01    195    44
    2012-06-01    2012-07-01    750    43

    5. Inserting the Top 10 into MySQL

    import MySQLdb

    import luigi

    class ArtistToplistToMysql(luigi.Task):
        date_interval = luigi.DateIntervalParameter()
        use_hadoop = luigi.BoolParameter()
    
        def requires(self):
            return Top10Artists(self.date_interval, self.use_hadoop)
    
        def run(self):
            conn = MySQLdb.connect(host='localhost', port=3306, user='root', passwd='123456',
                                   db='test', charset='utf8', use_unicode=False)
            cursor = conn.cursor()
            with self.input().open('r') as in_file:
                for line in in_file:
                    start_date, end_date, artist, count = line.strip().split()
                    # Parameterized query avoids quoting bugs and SQL injection.
                    cursor.execute(
                        "insert into artist (startdate, enddate, artist, score) values (%s, %s, %s, %s)",
                        (start_date, end_date, artist, int(count)))
            conn.commit()
            conn.close()
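    Note that this task defines no output(), so Luigi will rerun it (and re-insert the rows) every time it is scheduled. One possible fix, sketched here under the assumption that luigi.contrib.mysqldb.MySqlTarget is available in your luigi version, is to record completion with a marker target:

    from luigi.contrib.mysqldb import MySqlTarget

    class ArtistToplistToMysqlOnce(ArtistToplistToMysql):
        def output(self):
            # Marker stored in MySQL; once touched, Luigi treats the task as done.
            return MySqlTarget(host='localhost:3306', database='test',
                               user='root', password='123456',
                               table='artist',
                               update_id='top10_%s' % self.date_interval)

        def run(self):
            super(ArtistToplistToMysqlOnce, self).run()
            self.output().touch()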

    Run the following command to perform the insert:

    PYTHONPATH='' luigi --module top_artists ArtistToplistToMysql  --local-scheduler --date-interval 2012-06

    After it completes, check the contents of the database.

    With that, all of the tasks have been run and verified.
