1. AggregateArtistsHadoop
```python
class AggregateArtistsHadoop(luigi.contrib.hadoop.JobTask):
    date_interval = luigi.DateIntervalParameter()

    def output(self):
        return luigi.contrib.hdfs.HdfsTarget("data/artist_streams_%s.tsv" % self.date_interval)

    def requires(self):
        return [StreamsHdfs(date) for date in self.date_interval]

    def mapper(self, line):
        timestamp, artist, track = line.strip().split()
        yield artist, 1

    def reducer(self, key, values):
        yield key, sum(values)
```
This implements the same functionality as AggregateArtists. A couple of points to note:
With luigi.contrib.hadoop.JobTask you do not implement a run method; instead you implement mapper and reducer. The mapper and combiner must yield two-element tuples, and either of those two elements may itself be a tuple.
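To see what the mapper/reducer pair computes, here is a hypothetical pure-Python simulation of the same aggregation (no Hadoop needed); the grouping step mimics what Hadoop streaming does between the map and reduce phases, and the sample lines are made up:

```python
from collections import defaultdict

def mapper(line):
    # emit (artist, 1) for each input line of the form "timestamp artist track"
    timestamp, artist, track = line.strip().split()
    yield artist, 1

def reducer(key, values):
    yield key, sum(values)

lines = [
    "2012-06-01 artist_1 track_9",
    "2012-06-01 artist_2 track_3",
    "2012-06-02 artist_1 track_4",
]

# group mapper output by key, then reduce each group
grouped = defaultdict(list)
for line in lines:
    for k, v in mapper(line):
        grouped[k].append(v)

counts = {k: list(reducer(k, vs))[0][1] for k, vs in grouped.items()}
print(counts)  # {'artist_1': 2, 'artist_2': 1}
```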
This task depends on tasks of type StreamsHdfs. Let's look at what StreamsHdfs does:
2. StreamsHdfs
```python
class StreamsHdfs(Streams):
    def output(self):
        return luigi.contrib.hdfs.HdfsTarget(self.date.strftime('data/streams_%Y_%m_%d_faked.tsv'))
```
This class does the same work as the Streams class, so it inherits from Streams and only overrides the output method; in other words, its final result is written to HDFS.
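The only thing the override changes is where luigi looks for the result: luigi considers a task complete when its output target exists. A minimal sketch of that check, using a plain local file as a stand-in for an HDFS target (FakeTarget and the temp path are hypothetical names, not luigi API):

```python
import os
import tempfile

class FakeTarget(object):
    """Stand-in for a luigi Target: the task counts as complete when the path exists."""
    def __init__(self, path):
        self.path = path

    def exists(self):
        return os.path.exists(self.path)

tmpdir = tempfile.mkdtemp()
target = FakeTarget(os.path.join(tmpdir, "streams_2012_06_01_faked.tsv"))
before = target.exists()          # False: luigi would schedule the task

open(target.path, "w").close()    # the task "runs" and writes its output
after = target.exists()           # True: luigi would skip the task next time
print(before, after)
```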
3. Running AggregateArtistsHadoop
Running the command below produces an error. The error message shows a NoSectionError, which points to a problem with the configuration file; see luigi's configuration documentation for details (my blog also covers some of the common settings).
PYTHONPATH='' luigi --module top_artists AggregateArtistsHadoop --local-scheduler --date-interval 2012-06
```
File "/root/miniconda2/envs/my_python_env/lib/python2.7/site-packages/luigi/configuration.py", line 103, in get
    return self._get_with_default(ConfigParser.get, section, option, default, **kwargs)
File "/root/miniconda2/envs/my_python_env/lib/python2.7/site-packages/luigi/configuration.py", line 93, in _get_with_default
    return method(self, section, option, **kwargs)
File "/root/miniconda2/envs/my_python_env/lib/python2.7/ConfigParser.py", line 607, in get
    raise NoSectionError(section)
NoSectionError: No section: 'hadoop'
Exception AttributeError: "'DefaultHadoopJobRunner' object has no attribute 'tmp_dir'" in <bound method DefaultHadoopJobRunner.__del__ of <luigi.contrib.hadoop.DefaultHadoopJobRunner object at 0x7fda9c9b38d0>> ignored
```
Create a luigi.cfg in the current working directory with the following configuration, then rerun the command:
```
(my_python_env)[root@hadoop26 pythonCode]# cat luigi.cfg
[hadoop]
# command used to submit hadoop jobs
command=hadoop
# local python executable used to run the scripts
python-executable=python
# hadoop scheduler
scheduler=fair
# location of the hadoop streaming jar
streaming-jar=/usr/local/hadoop-2.6.0/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar
# hadoop version
version=apache1
```
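The NoSectionError above simply means this lookup failed because the `[hadoop]` section was missing. A quick way to sanity-check the file is to parse it yourself; this sketch uses Python 3's configparser (the traceback above is from Python 2's ConfigParser, which behaves the same way):

```python
import configparser

cfg_text = """\
[hadoop]
command=hadoop
python-executable=python
scheduler=fair
streaming-jar=/usr/local/hadoop-2.6.0/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar
version=apache1
"""

parser = configparser.ConfigParser()
parser.read_string(cfg_text)

has_hadoop = parser.has_section("hadoop")      # True once the section is present
scheduler = parser.get("hadoop", "scheduler")  # would raise NoSectionError without [hadoop]
print(has_hadoop, scheduler)  # True fair
```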
4. Finding the Top Artists
If you have made it this far, you have already computed each artist's play count and saved it to a file, either locally or on HDFS. Now we want to find the top 10 artists, and a plain Python task is enough for that.
```python
from heapq import nlargest

class Top10Artists(luigi.Task):
    date_interval = luigi.DateIntervalParameter()
    use_hadoop = luigi.BoolParameter()

    def requires(self):
        if self.use_hadoop:
            return AggregateArtistsHadoop(self.date_interval)
        else:
            return AggregateArtists(self.date_interval)

    def output(self):
        return luigi.LocalTarget("data/top_artists_%s.tsv" % self.date_interval)

    def run(self):
        top_10 = nlargest(10, self._input_iterator())
        with self.output().open('w') as out_file:
            for streams, artist in top_10:
                print >> out_file, self.date_interval.date_a, self.date_interval.date_b, artist, streams

    def _input_iterator(self):
        with self.input().open('r') as in_file:
            for line in in_file:
                artist, streams = line.strip().split()
                yield int(streams), int(artist)
```
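Note why _input_iterator yields (streams, artist) in that order: tuples compare lexicographically, so heapq.nlargest ranks by stream count first and only falls back to the artist id on ties. A small sketch with made-up pairs:

```python
from heapq import nlargest

# hypothetical (streams, artist) pairs, as yielded by _input_iterator
rows = [(47, 858), (12, 101), (47, 594), (44, 776), (46, 164)]

# lexicographic tuple comparison: highest stream count first, artist id breaks ties
top3 = nlargest(3, rows)
print(top3)  # [(47, 858), (47, 594), (46, 164)]
```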
Run the following command to compute the top 10:
PYTHONPATH='' luigi --module top_artists Top10Artists --local-scheduler --date-interval 2012-06
A new file is produced under the data directory:
```
(my_python_env)[root@hadoop26 pythonCode]# ls data/
artist_streams_2012-06.tsv    streams_2012_06_06_faked.tsv  streams_2012_06_12_faked.tsv  streams_2012_06_18_faked.tsv  streams_2012_06_24_faked.tsv  streams_2012_06_30_faked.tsv
streams_2012_06_01_faked.tsv  streams_2012_06_07_faked.tsv  streams_2012_06_13_faked.tsv  streams_2012_06_19_faked.tsv  streams_2012_06_25_faked.tsv  top_artists_2012-06.tsv
streams_2012_06_02_faked.tsv  streams_2012_06_08_faked.tsv  streams_2012_06_14_faked.tsv  streams_2012_06_20_faked.tsv  streams_2012_06_26_faked.tsv
streams_2012_06_03_faked.tsv  streams_2012_06_09_faked.tsv  streams_2012_06_15_faked.tsv  streams_2012_06_21_faked.tsv  streams_2012_06_27_faked.tsv
streams_2012_06_04_faked.tsv  streams_2012_06_10_faked.tsv  streams_2012_06_16_faked.tsv  streams_2012_06_22_faked.tsv  streams_2012_06_28_faked.tsv
streams_2012_06_05_faked.tsv  streams_2012_06_11_faked.tsv  streams_2012_06_17_faked.tsv  streams_2012_06_23_faked.tsv  streams_2012_06_29_faked.tsv
(my_python_env)[root@hadoop26 pythonCode]# cat data/top_artists_2012-06.tsv
2012-06-01  2012-07-01  858  47
2012-06-01  2012-07-01  594  47
2012-06-01  2012-07-01  248  47
2012-06-01  2012-07-01  164  46
2012-06-01  2012-07-01  846  45
2012-06-01  2012-07-01  776  44
2012-06-01  2012-07-01  75   44
2012-06-01  2012-07-01  345  44
2012-06-01  2012-07-01  195  44
2012-06-01  2012-07-01  750  43
```
5. Inserting the Top 10 into MySQL
```python
import MySQLdb

class ArtistToplistToMysql(luigi.Task):
    date_interval = luigi.DateIntervalParameter()
    use_hadoop = luigi.BoolParameter()

    def requires(self):
        return Top10Artists(self.date_interval, self.use_hadoop)

    def run(self):
        conn = MySQLdb.connect(host='localhost', port=3306, user='root', passwd='123456',
                               db='test', charset='utf8', use_unicode=False)
        cursor = conn.cursor()
        with self.input().open('r') as in_file:
            for line in in_file:
                start_date, end_date, artist, count = line.strip().split()
                insert_sql = "insert into artist (startdate,enddate,artist,score) values('%s','%s','%s','%d')" \
                             % (start_date, end_date, artist, int(count))
                cursor.execute(insert_sql)
        conn.commit()
        conn.close()
```
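The string-interpolated SQL above works for this data, but DB-API drivers also accept parameterized queries, which let the driver handle quoting and avoid injection issues. A sketch using sqlite3 as an in-memory stand-in for the MySQL `test` database (the table schema here is assumed from the insert statement above; MySQLdb uses `%s` placeholders where sqlite3 uses `?`):

```python
import sqlite3

# in-memory stand-in for the real MySQL database
conn = sqlite3.connect(":memory:")
cursor = conn.cursor()
cursor.execute("create table artist (startdate text, enddate text, artist text, score integer)")

row = ("2012-06-01", "2012-07-01", "858", 47)
# placeholders instead of string interpolation: the driver does the quoting
cursor.execute("insert into artist (startdate, enddate, artist, score) values (?, ?, ?, ?)", row)
conn.commit()

result = cursor.execute("select artist, score from artist").fetchall()
print(result)  # [('858', 47)]
```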
Run the following command to perform the insert:
PYTHONPATH='' luigi --module top_artists ArtistToplistToMysql --local-scheduler --date-interval 2012-06
After it finishes, check the database contents:
With that, all of the tasks have been written and debugged.