  • Common Python Utility Functions, Part 2

     Contents of this section

    • Function 1: scel file conversion

    • Function 2: RefWorks file conversion

    • Function 3: XML document parsing

    • Function 4: Text segmentation

     Function 1: scel file conversion

        scel is a special file format used by Sogou's cell dictionaries (搜狗细胞词库). Text processing and word segmentation often need domain dictionaries, and the Sogou cell dictionaries are a handy source, but a .scel file cannot be used directly: it has to be converted to a plain txt file first. The conversion code below was found online; I have used it many times myself and it can be used as-is.
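
    Before the full script, the core trick is worth spelling out: the entries in a scel file are stored as UTF-16LE, so every character occupies two little-endian bytes and can be decoded with struct.unpack. A minimal sketch of just that decoding step (the sample string is made up for illustration):

    import struct

    raw = '医学'.encode('utf-16-le')  # two bytes per character, as stored in a scel file
    chars = [chr(struct.unpack('<H', raw[i:i + 2])[0]) for i in range(0, len(raw), 2)]
    print(''.join(chars))  # -> 医学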

    # -*- coding:utf-8 -*-
    """
    @author:Zhang Yafei
    @time: 2019/12/26
    Description: scel file format conversion
    """
    import struct
    import os
    
    # A Sogou scel file stores text as UTF-16: every character (Chinese or Latin)
    # takes two bytes, so parsing comes down to finding the offset of each part.
    # There are two main sections:
    # 1. Global pinyin table — apparently all pinyin syllables, in dictionary order.
    #       A list of (index, len, pinyin) entries:
    #       index: 2-byte integer, index of this pinyin syllable
    #       len: 2-byte integer, byte length of the pinyin string
    #       pinyin: the syllable itself, two bytes per character, len bytes in total
    #
    # 2. Chinese word table
    #       A list of (same, py_table_len, py_table, {word_len, word, ext_len, ext}) entries:
    #       same: 2-byte integer, number of homophones (words sharing this pinyin)
    #       py_table_len: 2-byte integer
    #       py_table: list of 2-byte integers, each the index of a pinyin syllable
    #
    #       word_len: 2-byte integer, byte length of the Chinese word
    #       word: the Chinese word, two bytes per character, word_len bytes in total
    #       ext_len: 2-byte integer, length of the extension block (seems to always be 10)
    #       ext: extension data; the first two bytes are an integer (possibly the word frequency), the remaining eight bytes are all zero
    #
    #      {word_len, word, ext_len, ext} is repeated `same` times — homophones share the same pinyin table
    
    
    # offset of the pinyin table
    startPy = 0x1540

    # offset of the Chinese word table
    startChinese = 0x2628

    # global pinyin table
    GPy_Table = {}

    # parsing result:
    # a list of (frequency, pinyin, word) tuples
    GTable = []
    
    
    # convert raw UTF-16 bytes to a string
    def byte2str(data):
        pos = 0
        ret = ''
        while pos < len(data):
            c = chr(struct.unpack('H', bytes([data[pos], data[pos + 1]]))[0])
            if c != chr(0):
                ret += c
            pos += 2
        return ret
    
    
    # parse the pinyin table
    def getPyTable(data):
        data = data[4:]
        pos = 0
        while pos < len(data):
            index = struct.unpack('H', bytes([data[pos], data[pos + 1]]))[0]
            pos += 2
            lenPy = struct.unpack('H', bytes([data[pos], data[pos + 1]]))[0]
            pos += 2
            py = byte2str(data[pos:pos + lenPy])
    
            GPy_Table[index] = py
            pos += lenPy
    
    
    # get the pinyin string of a word
    def getWordPy(data):
        pos = 0
        ret = ''
        while pos < len(data):
            index = struct.unpack('H', bytes([data[pos], data[pos + 1]]))[0]
            ret += GPy_Table[index]
            pos += 2
        return ret
    
    
    # parse the Chinese word table
    def getChinese(data):
        pos = 0
        while pos < len(data):
            # number of homophones sharing this pinyin
            same = struct.unpack('H', bytes([data[pos], data[pos + 1]]))[0]

            # length of the pinyin index table
            pos += 2
            py_table_len = struct.unpack('H', bytes([data[pos], data[pos + 1]]))[0]

            # pinyin index table
            pos += 2
            py = getWordPy(data[pos: pos + py_table_len])

            # the Chinese words themselves
            pos += py_table_len
            for i in range(same):
                # byte length of the Chinese word
                c_len = struct.unpack('H', bytes([data[pos], data[pos + 1]]))[0]
                # the Chinese word
                pos += 2
                word = byte2str(data[pos: pos + c_len])
                # length of the extension data
                pos += c_len
                ext_len = struct.unpack('H', bytes([data[pos], data[pos + 1]]))[0]
                # word frequency (first two bytes of the extension data)
                pos += 2
                count = struct.unpack('H', bytes([data[pos], data[pos + 1]]))[0]

                # save the entry
                GTable.append((count, py, word))

                # jump to the offset of the next word
                pos += ext_len
    
    
    def scel2txt(file_name):
        print('-' * 60)
        with open(file_name, 'rb') as f:
            data = f.read()
    
        print("词库名:", byte2str(data[0x130:0x338]))  # .encode('GB18030')
        print("词库类型:", byte2str(data[0x338:0x540]))
        print("描述信息:", byte2str(data[0x540:0xd40]))
        print("词库示例:", byte2str(data[0xd40:startPy]))
    
        getPyTable(data[startPy:startChinese])
        getChinese(data[startChinese:])
    
    
    def run(to_file, file=None, dir_path=None):
        """
        Convert one or more scel files.
        :param file: path of a single scel file; converts just this file
        :param dir_path: path of a folder; if given, every scel file in it is converted
        :param to_file: path of the txt file the result is written to
        :return:
        """
        if dir_path:
            fin = [fname for fname in os.listdir(dir_path) if fname.endswith(".scel")]
            for f in fin:
                f = os.path.join(dir_path, f)
                scel2txt(f)
        elif file:
            scel2txt(file)
        else:
            raise Exception('参数必须包含file或者dir_path')
        # save the result
        with open(to_file, 'w', encoding='utf8') as f:
            f.writelines([word + '\n' for count, py, word in GTable])
    
    
    def dict_merge():
        """
        Merge several word dictionaries into one
        :return:
        """
        with open('data/medical_dict.txt', encoding='utf8') as f:
            word_set1 = {word.strip() for word in f}
        with open('data/medical_dict2.txt', encoding='utf8') as f:
            word_set2 = {word.strip() for word in f}
        with open('data/medical_dict3.txt', encoding='utf8') as f:
            word_set3 = {word.strip() for word in f}
        word_set = word_set1 | word_set2 | word_set3
        with open('data/words_dict.txt', encoding='utf-8', mode='w') as f:
            for word in word_set:
                f.write(word + '\n')
    
    
    if __name__ == '__main__':
        # run(file='data/细胞词库/医学词汇大全【官方推荐】.scel', to_file='医学词库.txt',)
        run(dir_path="data/细胞词库", to_file="data/cell_dict.txt")  

    Tip: use it as-is.
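
    As a quick follow-up (not part of the original script), the txt file produced by run() can be loaded straight into jieba as a user dictionary, since jieba.load_userdict accepts one word per line; a minimal sketch, assuming the output path used above:

    import jieba

    jieba.load_userdict('data/cell_dict.txt')  # file produced by run() above, one word per line
    print(list(jieba.cut('葡萄糖耐量试验结果正常')))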

    Function 2: RefWorks file conversion

        RefWorks is a bibliographic record format that various research-analysis tools can work with. In some scenarios we need to convert an Excel file into a RefWorks file, and the code below does exactly that.
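
    For reference, each record in the output file is a block of tagged lines, one field per line, matching what the script below writes (the field values here are placeholders, not real data):

    RT Journal Article
    A1 Author Name
    T1 Article title
    YR 2019
    JF Journal name
    K1 keyword1;keyword2
    AB Abstract text
    AD Author affiliation
    DS CNKI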

    # -*- coding: utf-8 -*-
    """
    Datetime: 2020/03/04
    author: Zhang Yafei
    description: refwork格式转换
    数据格式
    列  RT,A1,T1,JF,YR,K1,AB,AD
        ...
    """
    import pandas as pd
    
    
    def main(ref_file, to_file):
        """
        :param ref_file: path of the csv or excel file to convert
        :param to_file: path of the RefWorks file to write
        """
        if ref_file.endswith('csv'):
            rawdata = pd.read_csv(ref_file)
        elif ref_file.endswith('xls') or ref_file.endswith('xlsx'):
            rawdata = pd.read_excel(ref_file)
        else:
            raise ValueError('ref_file must be a csv, xls or xlsx file')
        with open(to_file, 'a', encoding='utf-8') as f:
            for index, item in rawdata.iterrows():
                f.write('RT ' + item.RT)
                f.write('\n' + 'A1 ' + item.A1)
                f.write('\n' + 'T1 ' + item.T1)
                f.write('\n' + 'YR ' + item.YR)
                f.write('\n' + 'JF ' + item.JF)
                f.write('\n' + 'K1 ' + item.K1)
                if pd.notna(item.AB):
                    f.write('\n' + 'AB ' + item.AB)
                if pd.notna(item.AD):
                    f.write('\n' + 'AD ' + item.AD)
                # every record ends with its data source
                f.write('\nDS CNKI')
                # blank lines between records, except after the last one
                if index < rawdata.shape[0] - 1:
                    f.write('\n\n\n')
    
    
    if __name__ == '__main__':
        main(ref_file='data.xlsx', to_file='result.txt')

    Tip: use it as-is.

    Function 3: XML document parsing

        XML is widely used as a data-exchange format on the web. It has many advantages, but the data we usually handle is csv or Excel, so being able to parse an XML document is a necessary skill. The example below parses XML records downloaded from PubMed and walks through the whole parsing workflow.
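
    The script below handles the full job; as a warm-up, the basic lxml pattern it relies on (parse the file, then pull fields out with XPath) looks roughly like this, where the file name is hypothetical:

    from lxml import etree

    tree = etree.parse('pubmed_sample.xml')  # hypothetical PubMed XML export
    for article in tree.xpath('//PubmedArticle'):
        pmid = article.xpath('MedlineCitation/PMID/text()')[0]
        year = article.xpath('MedlineCitation/Article/Journal/JournalIssue/PubDate/Year/text()')
        print(pmid, year[0] if year else 'n/a')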

    # -*- coding: utf-8 -*-
    
    """
    @Datetime: 2019/4/26
    @Author: Zhang Yafei
    @Description: 07_XML document parsing
    """
    import os
    import re
    import threading
    from concurrent.futures import ThreadPoolExecutor
    
    from lxml import etree
    import pandas as pd

    # module-level lock shared by the worker threads that append to the same csv file
    lock = threading.Lock()

    
    def pubmed_xpath_parse(path):
        tree = etree.parse(path)
        # If the xml declares a DTD, lxml has to be told about it when parsing, e.g.:
        # parser = etree.XMLParser(load_dtd=True)  # build a parser that loads the dtd (the dtd file must sit next to the xml file)
        # tree = etree.parse('1.xml', parser=parser)  # parse the xml into a tree with that parser
        data_list = []
        pmid_set = []
        for articles in tree.xpath('//PubmedArticle'):
            pmid = articles.xpath('MedlineCitation/PMID/text()')[0]
            if pmid in pmid_set:
                continue
            pmid_set.append(pmid)
            Article = articles.xpath('MedlineCitation/Article')[0]
            journal = Article.xpath('Journal/ISOAbbreviation/text()')[0]
            try:
                authors = Article.xpath('AuthorList/Author')
                affiliations_info = set()
                for author in authors:
                    # author_name = author.find('LastName').text + ' ' + author.find('ForeName').text
                    affiliations = [x.xpath('Affiliation/text()')[0] for x in author.xpath('AffiliationInfo')]
                    # author = author_name + ':' + ';'.join(affiliations)
                    for affiliation in affiliations:
                        affiliations_info.add(affiliation)
                affiliations_info = ';'.join(affiliations_info)
            except AttributeError:
                affiliations_info = ''
            try:
                date = Article.xpath('Journal/JournalIssue/PubDate/Year/text()')[0]
            except IndexError:
                date = Article.xpath('Journal/JournalIssue/PubDate/MedlineDate/text()')[0]
                date = re.search(r'\d+', date).group(0)
            try:
                mesh_words = []
                for mesh_heading in articles.xpath('MedlineCitation/MeshHeadingList/MeshHeading'):
                    if len(mesh_heading.xpath('child::*')) == 1:
                        mesh_words.append((mesh_heading.xpath('child::*'))[0].text)
                        continue
                    mesh_name = ''
                    for mesh in mesh_heading.xpath('child::*'):
                        if mesh.tag == 'DescriptorName':
                            mesh_name = mesh.xpath('string()')
                            continue
                        if mesh_name and mesh.tag == 'QualifierName':
                            mesh_word = mesh_name + '/' + mesh.xpath('string()')
                            mesh_words.append(mesh_word)
                mesh_words = ';'.join(mesh_words)
            except AttributeError:
                mesh_words = ''
            article_type = '/'.join([x.xpath('./text()')[0] for x in Article.xpath('PublicationTypeList/PublicationType')])
            country = articles.xpath('MedlineCitation/MedlineJournalInfo/Country/text()')[0]
            data_list.append(
                {'PMID': pmid, 'journal': journal, 'affiliations_info': affiliations_info, 'pub_year': date,
                 'mesh_words': mesh_words,
                 'country': country, 'article_type': article_type, 'file_path': path})
            print(pmid + '\t解析完成')
        # write this file's records once, after the loop, so rows are not duplicated on every iteration
        df = pd.DataFrame(data_list)
        with lock:
            df.to_csv('pubmed.csv', encoding='utf_8_sig', mode='a', index=False, header=False)
    
    
    def to_excel(data, path):
        writer = pd.ExcelWriter(path)
        data.to_excel(writer, sheet_name='table', index=False)
        writer.save()
    
    
    def get_files_path(dir_name):
        xml_files = []
        for base_path, folders, files in os.walk(dir_name):
            xml_files = xml_files + [os.path.join(base_path, file) for file in files if file.endswith('.xml')]
        return xml_files
    
    
    if __name__ == '__main__':
        files = get_files_path(dir_name='data')
        if not files:
            print('全部解析完成')
        else:
            with ThreadPoolExecutor() as pool:
                pool.map(pubmed_xpath_parse, files)
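
    Because the script appends to pubmed.csv with header=False, the column names have to be supplied when the result is read back; a small sketch of that step (the column order matches the dict keys used above):

    import pandas as pd

    cols = ['PMID', 'journal', 'affiliations_info', 'pub_year',
            'mesh_words', 'country', 'article_type', 'file_path']
    df = pd.read_csv('pubmed.csv', names=cols, encoding='utf_8_sig')
    print(df.head())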

    Function 4: Text segmentation

    Method 1: jieba segmentation + stopwords + custom dictionary

    # -*- coding: utf-8 -*-
    
    """
    Datetime: 2020/06/25
    Author: Zhang Yafei
    Description: 文本分词
    输入 停用词文件路径 词典文件路径 分词文件路径 表名(可选) 列名 分词结果列名 保存文件名
    输出 分词结果-文件
    """
    import os
    import re
    
    import jieba
    import pandas as pd
    
    if not os.path.exists('res'):
        os.mkdir('res')
    
    
    class TextCut(object):
        def __init__(self, dictionary=None, stopwords=None, ):
            self.dictionary = dictionary
            self.word_list = None
            if self.dictionary:
                jieba.load_userdict(self.dictionary)
            if stopwords:
                with open(stopwords, 'r', encoding='utf-8') as swf:
                    self.stopwords = [line.strip() for line in swf]
            else:
                self.stopwords = None
    
        @staticmethod
        def clean_txt(raw):
            file = re.compile(r"[^0-9a-zA-Z\u4e00-\u9fa5]+")
            return file.sub(' ', raw)
    
        def cut(self, text):
            sentence = self.clean_txt(text.strip().replace('\n', ''))
            return ' '.join([i for i in jieba.cut(sentence) if i.strip() and i not in self.stopwords and len(i) > 1])
    
        def cut2(self, text):
            sentence = self.clean_txt(text.strip().replace('\n', ''))
            return ' '.join([i for i in jieba.cut(sentence) if
                             i.strip() and i not in self.stopwords and len(i) > 1 and i in self.word_list])
    
        def run(self, file_path, col_name, new_col_name, to_file, sheet_name=None, word_in_dict=False):
            if sheet_name:
                df = pd.read_excel(file_path, sheet_name=sheet_name)
            else:
                df = pd.read_excel(file_path)
            if word_in_dict:
                with open(self.dictionary, encoding='utf-8') as f:
                    self.word_list = [word.strip() for word in f]
                df[new_col_name] = df[col_name].apply(self.cut2)
            else:
                df[new_col_name] = df[col_name].apply(self.cut)
            df.to_excel(to_file, index=False)
            print('######### 处理完成 ############')
    
    
    if __name__ == "__main__":
        # 1. segmentation
        text_cut = TextCut(stopwords='data/stopwords.txt', dictionary='data/word_dict.txt')
        text_cut.run(file_path='data/山西政策.xlsx', sheet_name='1.21-2.20', col_name='全文', new_col_name='全文分词',
                     to_file='res/山西政策_分词.xlsx') 

    Method 2: jieba segmentation + information-entropy-based merging

    # -*- coding: utf-8 -*-
    
    """
    Datetime: 2020/03/01
    Author: Zhang Yafei
    Description: 基于信息熵对分词结果进行合并
    """
    from collections import Counter
    from functools import reduce
    from pandas import read_excel, DataFrame
    
    
    class InfoEntropyMerge(object):
        def __init__(self, data, stopwords='data/stopwords.txt'):
            self.data = data
            self.words_freq_one = {}
            self.words_freq_two = {}
            self.entropy_words_dict = {}
            if stopwords:
                with open(stopwords, 'r', encoding='utf-8') as f:
                    self.stopwords = {line.strip() for line in f}
            else:
                self.stopwords = None
    
        def count_word_freq_one(self, save_to_file=False, word_freq_file=None):
            keywords = (word for word_list in self.data for word in word_list if word)
            self.words_freq_one = Counter(keywords)
            if save_to_file:
                words = [word for word in self.words_freq_one]
                freqs = [self.words_freq_one[word] for word in words]
                words_df = DataFrame(data={'word': words, 'freq': freqs})
                words_df.sort_values('freq', ascending=False, inplace=True)
                words_df.to_excel(word_freq_file, index=False)
    
        def count_freq(self, word1, word2):
            """
            Count how many times word1 is immediately followed by word2
            :param word1:
            :param word2:
            :return:
            """
            if (word1, word2) not in self.words_freq_two:
                self.words_freq_two[(word1, word2)] = 1
            else:
                self.words_freq_two[(word1, word2)] += 1
            return word2
    
        def count_word_freq_two(self, save_to_file=False, word_freq_file=None):
            """
            Count the frequency of every adjacent word pair in the corpus
            :param save_to_file:
            :param word_freq_file:
            :return:
            """
            for word_list in self.data:
                reduce(self.count_freq, word_list)
            if save_to_file and word_freq_file:
                words_list = [(word1, word2) for word1, word2 in self.words_freq_two]
                freqs = [self.words_freq_two[w1_w2] for w1_w2 in words_list]
                words_df = DataFrame(data={'word': words_list, 'freq': freqs})
                words_df.sort_values('freq', ascending=False, inplace=True)
                words_df.to_excel(word_freq_file, index=False)
    
        @staticmethod
        def is_chinese(word):
            for ch in word:
                if '\u4e00' <= ch <= '\u9fff':
                    return True
            return False
    
        def clac_entropy(self, save_to_file=False, dict_path='data/entropy_dict.txt'):
            """
            计算信息熵: E(w1, w2) = P(w1,w2)/min(P(w1),P(w2))
            :param save_to_file: 是否将熵值大于0.5的新词保存到文件中
            :param dict_path: 保存字典路径
            :return:
            """
            for word1, word2 in self.words_freq_two:
                freq_two = self.words_freq_two[(word1, word2)]
                freq_one_min = min(self.words_freq_one[word1], self.words_freq_one[word2])
                freq_one_max = max(self.words_freq_one[word1], self.words_freq_one[word2])
                w1_w2_entropy = freq_two / freq_one_max
                if self.stopwords:
                    if w1_w2_entropy > 0.5 and word1 not in self.stopwords and word2 not in self.stopwords and self.is_chinese(word1) and self.is_chinese(word2):
                        # print(word1, word2, freq_two, freq_one_min, freq_one_max)
                        self.entropy_words_dict[word1+word2] = w1_w2_entropy
                else:
                    if w1_w2_entropy > 0.5:
                        self.entropy_words_dict[word1+word2] = w1_w2_entropy
    
            print('信息熵大于0.5的词语组合:\n', self.entropy_words_dict)
            if save_to_file and dict_path:
                with open(dict_path, mode='r+', encoding='utf-8') as f:
                    content = f.read()
                    f.seek(0, 0)
                    for word in self.entropy_words_dict:
                        f.write(word + '\n')
                    f.write(content)
                print(f'成功将信息熵大于0.5的词语保存到了{dict_path}中')
    
    
    def data_read(path, col_name):
        df = read_excel(path)
        texts = df.loc[df[col_name].notna(), col_name].str.split()
        return texts
    
    
    if __name__ == '__main__':
        text_list = data_read(path='res/国家政策_分词.xlsx', col_name='全文分词')
        info_entro = InfoEntropyMerge(data=text_list)
        info_entro.count_word_freq_one()
        info_entro.count_word_freq_two()
        info_entro.clac_entropy(save_to_file=False, dict_path='data/entropy_dict.txt')

    Tip: if you have a good dictionary and stopword list, prefer Method 1; otherwise use Method 2.
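
    To make the merge criterion in Method 2 concrete, here is a tiny worked example with made-up counts (the words and numbers are hypothetical):

    # hypothetical frequencies, not taken from any real corpus
    freq_one = {'数据': 40, '安全': 25}
    freq_two = {('数据', '安全'): 15}

    w1, w2 = '数据', '安全'
    score = freq_two[(w1, w2)] / max(freq_one[w1], freq_one[w2])
    print(score, score > 0.5)  # 0.375 False -> the pair is not merged into '数据安全'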

  • Original article: https://www.cnblogs.com/zhangyafei/p/13251458.html