  • Luigi Tutorial 2: Running Top Artists on Hadoop

    1. AggregateArtistsHadoop

    import luigi
    import luigi.contrib.hadoop
    import luigi.contrib.hdfs
    
    class AggregateArtistsHadoop(luigi.contrib.hadoop.JobTask):
        date_interval = luigi.DateIntervalParameter()
    
        def output(self):
            return luigi.contrib.hdfs.HdfsTarget("data/artist_streams_%s.tsv" % self.date_interval)
    
        def requires(self):
            return [StreamsHdfs(date) for date in self.date_interval]
    
        def mapper(self, line):
            # each input line is "<timestamp> <artist> <track>"
            timestamp, artist, track = line.strip().split()
            yield artist, 1
    
        def reducer(self, key, values):
            yield key, sum(values)

    This implements the same functionality as AggregateArtists; a few things to note:

    luigi.contrib.hadoop.JobTask does not require you to implement a run method. Instead, you implement mapper and reducer methods. mapper and combiner must yield two-element tuples, and either element may itself be a tuple, as the sketch below shows.
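
    Since the map output here is already (artist, 1) pairs, a combiner with the same contract could pre-aggregate counts on the map side. A minimal sketch (this method is an assumption, not part of the original task):

        # hypothetical combiner for AggregateArtistsHadoop: obeys the same
        # two-element-tuple contract as mapper/reducer and pre-sums counts
        # on the map side before data is shuffled to the reducers
        def combiner(self, key, values):
            yield key, sum(values)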

    This task depends on StreamsHdfs tasks. Let's look at StreamsHdfs next:

    2. StreamsHdfs

    class StreamsHdfs(Streams):
        def output(self):
            return luigi.contrib.hdfs.HdfsTarget(self.date.strftime('data/streams_%Y_%m_%d_faked.tsv'))

    This class does the same work as the Streams class, so it subclasses Streams and only overrides the output method; in other words, its final output lands on HDFS.
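
    For reference, a Streams task along the lines of the first tutorial might look like this (a sketch following the luigi documentation; the exact body may differ from part 1):

    import random
    
    class Streams(luigi.Task):
        date = luigi.DateParameter()
    
        def output(self):
            return luigi.LocalTarget(self.date.strftime('data/streams_%Y_%m_%d_faked.tsv'))
    
        def run(self):
            # fake one day of data: "<timestamp> <artist> <track>" per line
            with self.output().open('w') as output:
                for _ in range(1000):
                    output.write('{} {} {}\n'.format(
                        random.randint(0, 999),
                        random.randint(0, 999),
                        random.randint(0, 999)))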

    3. Running AggregateArtistsHadoop

    Running the command below fails. The error message contains a NoSectionError, which is a configuration-file problem; see the luigi configuration documentation for details (my blog also covers some of the commonly used settings):

     PYTHONPATH='' luigi --module top_artists AggregateArtistsHadoop --local-scheduler --date-interval 2012-06
      File "/root/miniconda2/envs/my_python_env/lib/python2.7/site-packages/luigi/configuration.py", line 103, in get
        return self._get_with_default(ConfigParser.get, section, option, default, **kwargs)
      File "/root/miniconda2/envs/my_python_env/lib/python2.7/site-packages/luigi/configuration.py", line 93, in _get_with_default
        return method(self, section, option, **kwargs)
      File "/root/miniconda2/envs/my_python_env/lib/python2.7/ConfigParser.py", line 607, in get
        raise NoSectionError(section)
    NoSectionError: No section: 'hadoop'
    Exception AttributeError: "'DefaultHadoopJobRunner' object has no attribute 'tmp_dir'" in <bound method DefaultHadoopJobRunner.__del__ of <luigi.contrib.hadoop.DefaultHadoopJobRunner object at 0x7fda9c9b38d0>> ignored

    Create a luigi.cfg in the current working directory with the configuration below, then re-run the command. The comments go on their own lines, since Python 2's ConfigParser does not strip a trailing # comment from a value:

    (my_python_env)[root@hadoop26 pythonCode]# cat luigi.cfg 
    [hadoop]
    # command used to submit hadoop jobs
    command=hadoop
    # local python executable used to run the scripts
    python-executable=python
    # hadoop scheduler
    scheduler=fair
    # location of the hadoop streaming jar
    streaming-jar=/usr/local/hadoop-2.6.0/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar
    # hadoop version
    version=apache1
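
    To confirm luigi actually picks the file up, you can query the parsed configuration from a Python shell (a quick sketch using luigi's configuration module, the same one that appears in the traceback above):

    from luigi import configuration
    
    # should print the streaming jar path instead of raising NoSectionError
    conf = configuration.get_config()
    print(conf.get('hadoop', 'streaming-jar'))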

    4. Finding the Top Artists

    If you've made it this far, you have computed each artist's play count and stored it in a file, either locally or on HDFS. Now we find the top 10 artists, this time with a plain Python task.

    from heapq import nlargest
    
    class Top10Artists(luigi.Task):
        date_interval = luigi.DateIntervalParameter()
        use_hadoop = luigi.BoolParameter()
    
        def requires(self):
            if self.use_hadoop:
                return AggregateArtistsHadoop(self.date_interval)
            else:
                return AggregateArtists(self.date_interval)
    
        def output(self):
            return luigi.LocalTarget("data/top_artists_%s.tsv" % self.date_interval)
    
        def run(self):
            top_10 = nlargest(10, self._input_iterator())
            with self.output().open('w') as out_file:
                for streams, artist in top_10:
                    print >> out_file, self.date_interval.date_a, self.date_interval.date_b, artist, streams
    
        def _input_iterator(self):
            with self.input().open('r') as in_file:
                for line in in_file:
                    artist, streams = line.strip().split()
                    # keep artist as a string; only the play count must be numeric
                    yield int(streams), artist
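
    A note on run: heapq.nlargest compares the (streams, artist) tuples element by element, so rows are ranked by play count first, with ties broken by the artist field. A standalone illustration:

    from heapq import nlargest
    
    pairs = [(47, '858'), (44, '75'), (46, '164'), (47, '594')]
    print(nlargest(3, pairs))  # -> [(47, '858'), (47, '594'), (46, '164')]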

    Run the following command to compute the top 10:

    PYTHONPATH='' luigi --module top_artists Top10Artists  --local-scheduler --date-interval 2012-06
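
    Because use_hadoop is a BoolParameter, adding the matching flag routes the aggregation through the Hadoop task instead (assuming luigi's usual underscore-to-dash flag mapping):

    PYTHONPATH='' luigi --module top_artists Top10Artists --use-hadoop --local-scheduler --date-interval 2012-06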

    This produces a new file under the data directory; each line of top_artists_2012-06.tsv holds the interval start, interval end, artist, and play count:

    (my_python_env)[root@hadoop26 pythonCode]# ls data/
    artist_streams_2012-06.tsv    streams_2012_06_06_faked.tsv  streams_2012_06_12_faked.tsv  streams_2012_06_18_faked.tsv  streams_2012_06_24_faked.tsv  streams_2012_06_30_faked.tsv
    streams_2012_06_01_faked.tsv  streams_2012_06_07_faked.tsv  streams_2012_06_13_faked.tsv  streams_2012_06_19_faked.tsv  streams_2012_06_25_faked.tsv  top_artists_2012-06.tsv
    streams_2012_06_02_faked.tsv  streams_2012_06_08_faked.tsv  streams_2012_06_14_faked.tsv  streams_2012_06_20_faked.tsv  streams_2012_06_26_faked.tsv  
    streams_2012_06_03_faked.tsv  streams_2012_06_09_faked.tsv  streams_2012_06_15_faked.tsv  streams_2012_06_21_faked.tsv  streams_2012_06_27_faked.tsv  
    streams_2012_06_04_faked.tsv  streams_2012_06_10_faked.tsv  streams_2012_06_16_faked.tsv  streams_2012_06_22_faked.tsv  streams_2012_06_28_faked.tsv  
    streams_2012_06_05_faked.tsv  streams_2012_06_11_faked.tsv  streams_2012_06_17_faked.tsv  streams_2012_06_23_faked.tsv  streams_2012_06_29_faked.tsv  
    (my_python_env)[root@hadoop26 pythonCode]# cat data/top_artists_2012-06.tsv 
    2012-06-01    2012-07-01    858    47
    2012-06-01    2012-07-01    594    47
    2012-06-01    2012-07-01    248    47
    2012-06-01    2012-07-01    164    46
    2012-06-01    2012-07-01    846    45
    2012-06-01    2012-07-01    776    44
    2012-06-01    2012-07-01    75    44
    2012-06-01    2012-07-01    345    44
    2012-06-01    2012-07-01    195    44
    2012-06-01    2012-07-01    750    43

    5. Inserting the Top 10 into MySQL

    import MySQLdb
    
    class ArtistToplistToMysql(luigi.Task):
        date_interval = luigi.DateIntervalParameter()
        use_hadoop = luigi.BoolParameter()
    
        def requires(self):
            return Top10Artists(self.date_interval, self.use_hadoop)
    
        def run(self):
            conn = MySQLdb.connect(host='localhost', port=3306, user='root', passwd='123456',
                                   db='test', charset='utf8', use_unicode=False)
            cursor = conn.cursor()
            with self.input().open('r') as in_file:
                for line in in_file:
                    start_date, end_date, artist, count = line.strip().split()
                    # a parameterized query avoids quoting bugs and SQL injection
                    cursor.execute(
                        "insert into artist (startdate, enddate, artist, score) values (%s, %s, %s, %s)",
                        (start_date, end_date, artist, int(count)))
            conn.commit()
            conn.close()
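
    The post never shows the artist table's schema; a hypothetical one-off setup matching the INSERT's columns might be (the column types are assumptions):

    import MySQLdb
    
    # hypothetical DDL: column names taken from the INSERT statement above
    conn = MySQLdb.connect(host='localhost', port=3306, user='root', passwd='123456', db='test')
    conn.cursor().execute("""
        create table if not exists artist (
            startdate date,
            enddate   date,
            artist    varchar(255),
            score     int
        )""")
    conn.commit()
    conn.close()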

    Run the following command to perform the insert:

    PYTHONPATH='' luigi --module top_artists ArtistToplistToMysql  --local-scheduler --date-interval 2012-06

    After it finishes, check the database contents to confirm the ten rows were inserted.

    With that, all of the tasks have been built and debugged.
