import os
import email
import email.policy
1. 读取邮件数据
SPAM_PATH = os.path.join(
"E:\3.Study\机器学习\Hand on Machine Learning\第三章:分类\spam_data")
spam_path = os.path.join(SPAM_PATH, "spam")
ham_path = os.path.join(SPAM_PATH, "easy_ham")
spam_list = [name for name in os.listdir(spam_path) if len(name) > 20]
ham_list = [name for name in os.listdir(ham_path) if len(name) > 20]
def load_email(is_spam, filename, spam_path=SPAM_PATH):
directory = "spam" if is_spam else "easy_ham"
with open(os.path.join(spam_path, directory, filename), "rb") as f:
return email.parser.BytesParser(policy=email.policy.default).parse(f)
# email.message.EmailMessage'类型,没法用list.append接收
# return email.parser.BytesParser(policy=email.policy.default).parse(f)
# 这里有类型问题,应该记住这种加载email文件的形式。尝试list.append添加数据,加入的数据是generator类型,非email类型
ham_emails = [load_email(is_spam=False, filename=name) for name in ham_list]
spam_emails = [load_email(is_spam=True, filename=name) for name in spam_list]
def get_email_structure(email):
# isinstance 函数:判断一个对象是否是已知类型。第一个参数为对象,第二个参数为类型名或者是类型名的列表。返回True/False
if isinstance(email, str):
return email
# get_pyload()函数:返回当前邮件的正文。
# 如果正文含有多个部分的话(is_multipart=True),返回一个message对象的list列表;
# 如果is_multipart=False,即正文没有多部份的话,返回一个string类型。
payload = email.get_payload()
if isinstance(payload, list):
return "multipart({})".format(", ".join([
for sub_email in payload
return email.get_content_type()
from collections import Counter
def structures_counter(emails):
# Counter类的目的是用来跟踪值出现的次数
structures = Counter()
for email in emails:
structure = get_email_structure(email)
structures[structure] += 1
return structures
array = structures_counter(ham_emails).most_common()
array2 = structures_counter(spam_emails).most_common()
for head, value in spam_emails[0].items():
print(head, ":", value)
4. 划分训练集,测试集
import numpy as np
X = np.array(ham_emails+spam_emails) # ham_emails和span_emails是list类型
Y = np.array([0]*len(ham_emails)+[1]*len(spam_emails))
from sklearn.model_selection import train_test_split
x_train, x_test, y_train, y_test = train_test_split(
X, Y, test_size=0.33, random_state=42)
5. 邮件文本预处理(转换HTML)
import re # regular expressions(正则)
from html import unescape
def html_to_plain_text(html):
# sub->substitute(替换)
# 参数1:pattern 正则;
# 参数2:repl:replacement,被替换的字符串/函数
# 参数3: string:需要被处理的内容
# 参数4: count: 匹配的数目 如果正则表达式在string中有多个匹配结果,count控制匹配的数目
# 参数5: flag : 匹配模式
# re.I 匹配对大小写不敏感
# re.M 多行匹配(以行为单位匹配)
# re.S 使 . 匹配包括换行在内的所有字符
# ->用pattern模式将string里面count个的字符换成repl
text = re.sub('<head.*?>.*?</head>', '', html, flags=re.M | re.S | re.I)
text = re.sub('<as.*?>', 'HYPERLINK', text, flags=re.M | re.S | re.I)
text = re.sub('<.*?>', '', text, flags=re.M | re.S)
text = re.sub(r'(s*
)+', '
', text, flags=re.M | re.S)
return unescape(text)
html_spam_emails = [email for email in x_train[y_train == 1]
if get_email_structure(email) == "text/html"]
sample_html_spam = html_spam_emails[2]
# 输出html辣鸡邮件的前1000个字符,strip()->去除首尾空格
print(sample_html_spam.get_content().strip()[:1000], "...")
print(html_to_plain_text(sample_html_spam.get_content())[:1000], "...")
def email_to_text(email):
html = None
# email->part->part.part 以树的结构存储,walk()用来循环遍历各个树及其子树
for part in email.walk():
ctype = part.get_content_type()
if not ctype in ("text/plain", "text/html"):
continue # 跳过不是以上两种类型的部分
content = part.get_content()
content = str(part.get_payload())
if ctype == "text/plain":
return content
html = content
if html:
return html_to_plain_text(html)
print(email_to_text(sample_html_spam)[:100], "...")
7. 自然语言处理
import nltk
stemmer = nltk.PorterStemmer() # 建立一个波特词干算法(分析单词的词干)
for word in ("Conputations", "Computation", "Computing", "Computed", "Compulsive"):
print(word, "=>", stemmer.stem(word))
except ImportError:
print("Error: stemming requires the NLTK module.")
stemmer = None
import urlextract
url_extracror = urlextract.URLExtract()
print(url_extracror.find_urls("will it detect and"))
except ImportError:
print("Error:url_extracror requires the urlextract module.")
urlextract = None
9. 对邮件内的所有单词进行计数
from sklearn.base import BaseEstimator, TransformerMixin
class EmailToWordCounterTransformer(BaseEstimator, TransformerMixin):
def __init__(self, strip_headers=True, low_case=True, remove_punctuation=True,
repalce_urls=True, replace_numbers=True, stemming=True):
self.strip_headers = strip_headers
self.low_case = low_case
self.remove_punctuation = remove_punctuation
self.replace_urls = repalce_urls
self.replace_numbers = replace_numbers
self.stemming = stemming
def fit(self, X, Y=None):
return self
def transform(self, X, Y=None):
X_transform = []
for email in X:
text = email_to_text(email) or ""
if self.low_case:
text = text.lower()
if self.replace_urls and url_extracror is not None:
# list(set()) 创建一个不重复的元素集
urls = list(set(url_extracror.find_urls(text)))
urls.sort(key=lambda url: len(url),
reverse=True) # 根据url的长度对url进行排序
for url in urls:
text = text.replace(url, "URL") # 用“URL”换所有真实的url
if self.replace_numbers: # 将所有数字转换为NUMBER字符
text = re.sub(r'd+(?:.d*(?:[eE]d+))?', 'NUMBER', text)
if self.remove_punctuation: # 删除所有标点符号
text = re.sub(r'W+', ' ', text, flags=re.M) # W 匹配任何非单词字符
# Counter()返回一个特殊的字典,包含单词种类和单词数量。eg:{"a":3,"b""2}
word_count = Counter(text.split())
if self.stemming and stemmer is not None:
stemmed_word_counts = Counter()
for word, count in word_count.items(): # 分析单词的词干,统计词干的数量
stemmed_word = stemmer.stem(word)
stemmed_word_counts[stemmed_word] += count
word_count = stemmed_word_counts
X_transform.append(word_count) # 将每个邮件的字符字典存到list中
return np.array(X_transform)
X_few = x_train[:3]
X_few_wordcounts = EmailToWordCounterTransformer().fit_transform(X_few)
from scipy.sparse import csr_matrix # 压缩稀疏行矩阵
class WordCounterToVectorTransformer(BaseEstimator, TransformerMixin):
def __init__(self, vocabulary_size=100):
self.vocabulary_size = vocabulary_size
def fit(self, X, Y=None):
total_count = Counter()
for word_count in X:
for word, count in word_count.items(): # X是上个函数内的字典,不是X数据集
total_count[word] += min(count, 10) # 次数超过10的存10
# most_common 字典里面出现次数最多的.当most_common没有参数时,返回字典所有的item,从大到小排列
# 查看前vocabulaty_size个出现次数最多的
most_common = total_count.most_common()[:self.vocabulary_size]
self.most_common_ = most_common
# most_commoon [('number', 15), ('i', 7), ('the', 7), ('url', 7), ('to', 4), ('chri', 3), ('wa', 3), ('from', 3), ('list', 3), ('of', 3)]
# 将most_common里面的出现频率最多的词从多到少依次排序,返回{(单词,序号)}
self.vocabulary_ = {word: index + 1 for index,
(word, count) in enumerate(most_common)}
# vocabulary {'number': 1, 'i': 2, 'the': 3, 'url': 4, 'to': 5, 'chri': 6, 'wa': 7, 'from': 8, 'list': 9, 'of': 10}
return self
def transform(self, X, Y=None):
rows = []
cols = []
data = []
for row, word_count in enumerate(X):
for word, count in word_count.items():
cols.append(self.vocabulary_.get(word, 0))
return csr_matrix((data, (rows, cols)), shape=(len(X), self.vocabulary_size+1))
vocab_transformer = WordCounterToVectorTransformer(vocabulary_size=10)
X_few_vectors = vocab_transformer.fit_transform(X_few_wordcounts)
from sklearn.pipeline import Pipeline # 创建流水线处理
preprocess_pipeline = Pipeline([
("email_to_wordcount", EmailToWordCounterTransformer()),
("wordcount_to_vector", WordCounterToVectorTransformer()),
X_train_transformed = preprocess_pipeline.fit_transform(x_train)
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score
log_clf = LogisticRegression()
score = cross_val_score(log_clf, X_train_transformed,
y_train, cv=3, verbose=3, n_jobs=-1)
from sklearn.metrics import precision_score, recall_score
X_test_transformed = preprocess_pipeline.transform(x_test)
# solver 优化算法的参数,包括newton-cg,lbfgs,liblinear,sag,saga,对损失的优化的方法
log_clf2 = LogisticRegression(solver="liblinear", random_state=42), y_train)
y_pred = log_clf2.predict(X_test_transformed)
print(precision_score(y_test, y_pred))
print(recall_score(y_test, y_pred))
from sklearn.naive_bayes import MultinomialNB
mnb = MultinomialNB(), y_train)
mnb_y_pred = mnb.predict(X_test_transformed)
print(precision_score(y_test, mnb_y_pred))
print(recall_score(y_test, mnb_y_pred))