def output_mapper(line):
    """Parse one (position, text) record read via GBKFileInputFormat.

    The input file is GBK encoded; Spark's GBKFileInputFormat converts each
    line to UTF-8 Text automatically. Keys are the position in the file and
    values are the line of text.

    Args:
        line: tuple of (position, "bidword sp tag_info").

    Returns:
        [bidword, sp, tag_info, theDate] on success, or None when the record
        cannot be parsed (malformed rows are logged and filtered out by the
        downstream .filter()).
    """
    try:
        # NOTE: `theDate` is a module-level value read here; `global` is only
        # required for assignment, so the original `global theDate` was removed.
        value = line[1]
        # Exactly three space-separated fields are expected; anything else
        # raises ValueError and the record is dropped.
        bidword, sp, tag_info = value.strip().split(' ')
        return [bidword, sp, tag_info, theDate]
    except Exception as e:
        # Fixed: the message previously referenced "add_date_mapper", which is
        # not this function's name — corrected so log lines point to the
        # actual failing mapper.
        logging.error("output_mapper error: {}".format(e))
        return None


# Read the GBK-encoded file with the custom input format, parse each record,
# drop unparsable rows, and convert the result to a DataFrame.
test_df = (
    sc.hadoopFile(
        test_file,
        "org.apache.spark.input.GBKFileInputFormat",
        "org.apache.hadoop.io.LongWritable",
        "org.apache.hadoop.io.Text",
    )
    .map(output_mapper)
    .filter(lambda x: x is not None)
    .toDF()
)
Reference (Javadoc of GBKFileInputFormat):
/**
 * FileInputFormat for gbk encoded files. Files are broken into lines. Either linefeed
* or carriage-return are used to signal end of line. Keys are the position in the file,
* and values are the line of text and will be converted to UTF-8 Text.
*/