gen_sample_by_captcha.py 生成验证码图片
# -*- coding: UTF-8 -*-
"""
使用captcha lib生成验证码(前提:pip install captcha)
"""
from captcha.image import ImageCaptcha
import os
import random
import time
def gen_special_img(text, file_path):
# 生成img文件
generator = ImageCaptcha(width=width, height=height) # 指定大小
img = generator.generate_image(text) # 生成图片
img.save(file_path) # 保存图片
if __name__ == '__main__':
# 配置参数
root_dir = "../sample/origin/" # 图片储存路径
image_suffix = "png" # 图片储存后缀
characters = "0123456789" # 图片上显示的字符集
# characters = "0123456789abcdefghijklmnopqrstuvwxyz"
count = 10000 # 生成多少张样本
char_count = 4 # 图片上的字符数量
# 设置图片高度和宽度
width = 100
height = 60
# 判断文件夹是否存在
if not os.path.exists(root_dir):
os.mkdir(root_dir)
for i in range(count):
text = ""
for j in range(char_count):
text += random.choice(characters)
timec = str(time.time()).replace(".", "")
p = os.path.join(root_dir, "{}_{}.{}".format(text, timec, image_suffix))
gen_special_img(text, p)
sample.py 配置文件
from easydict import EasyDict
import os
import json
# 可以使得以属性的方式去访问字典的值
sample_conf = EasyDict()
# 图片文件夹
sample_conf.origin_image_dir = "./sample/origin/"
sample_conf.train_image_dir = "./sample/train/"
sample_conf.test_image_dir = "./sample/test/"
sample_conf.api_image_dir = "./sample/api/"
sample_conf.online_image_dir = "./sample/online/"
sample_conf.local_image_dir = "./sample/local/"
# 模型文件夹
sample_conf.model_save_dir = "./model/"
# 图片相关参数
sample_conf.image_width = 100
sample_conf.image_height = 60
sample_conf.max_captcha = 4
sample_conf.image_suffix = "png"
# 验证码字符相关参数
sample_conf.char_set = ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i',
'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z']
# char_set = ['a','b','c','d','e','f','g','h','i','j','k','l','m','n','o','p','q','r','s','t','u','v','w','x','y','z']
# char_set = ['A','B','C','D','E','F','G','H','I','J','K','L','M','N','O','P','Q','R','S','T','U','V','W','X','Y','Z']
use_labels_json_file = False
if use_labels_json_file:
if os.path.exists("gen_image/labels.json"):
with open("gen_image/labels.json", "r") as f:
content = f.read()
if content:
sample_conf.char_set = json.loads(content)
else:
pass
else:
pass
sample_conf.remote_url = "https://www.xxxxx.com/getImg"
verify_and_split_data.py
"""
验证图片尺寸和划分测试集(5%)和训练集(95%)
"""
from PIL import Image
import random
import os
import shutil
from sample import sample_conf
def verify(origin_dir, real_width, real_height, image_suffix):
"""
校验图片大小
:return:
"""
print("开始校验原始图片集")
# 图片真实尺寸
real_size = (real_width, real_height)
# 图片名称列表和数量
img_list = os.listdir(origin_dir)
total_count = len(img_list)
print("原始集共有图片: {}张".format(total_count))
# 无效图片列表
bad_img = []
# 遍历所有图片进行验证
for index, img_name in enumerate(img_list):
file_path = os.path.join(origin_dir, img_name)
# 过滤图片不正确的后缀
if not img_name.endswith(image_suffix):
bad_img.append((index, img_name, "文件后缀不正确"))
continue
# 过滤图片标签不标准的情况
prefix, posfix = img_name.split("_")
if prefix == "" or posfix == "":
bad_img.append((index, img_name, "图片标签异常"))
continue
# 图片无法正常打开
try:
img = Image.open(file_path)
except OSError:
bad_img.append((index, img_name, "图片无法正常打开"))
continue
# 图片尺寸有异常
if real_size == img.size:
print("{} pass".format(index), end='
')
else:
bad_img.append((index, img_name, "图片尺寸异常为:{}".format(img.size)))
print("====以下{}张图片有异常====".format(len(bad_img)))
if bad_img:
for b in bad_img:
print("[第{}张图片] [{}] [{}]".format(b[0], b[1], b[2]))
else:
print("未发现异常(共 {} 张图片)".format(len(img_list)))
print("========end
")
return bad_img
def split(origin_dir, train_dir, test_dir, bad_imgs):
"""
分离训练集和测试集
:return:
"""
print("开始分离原始图片集为:测试集(5%)和训练集(95%)")
# 图片名称列表和数量
img_list = os.listdir(origin_dir)
for img in bad_imgs:
img_list.remove(img)
total_count = len(img_list)
print("共分配{}张图片到训练集和测试集,其中{}张为异常留在原始目录".format(total_count, len(bad_imgs)))
# 创建文件夹
if not os.path.exists(train_dir):
os.mkdir(train_dir)
if not os.path.exists(test_dir):
os.mkdir(test_dir)
# 测试集
test_count = int(total_count * 0.05)
test_set = set()
for i in range(test_count):
while True:
file_name = random.choice(img_list)
if file_name in test_set:
pass
else:
test_set.add(file_name)
img_list.remove(file_name)
break
test_list = list(test_set)
print("测试集数量为:{}".format(len(test_list)))
for file_name in test_list:
src = os.path.join(origin_dir, file_name)
dst = os.path.join(test_dir, file_name)
shutil.move(src, dst)
# 训练集
train_list = img_list
print("训练集数量为:{}".format(len(train_list)))
for file_name in train_list:
src = os.path.join(origin_dir, file_name)
dst = os.path.join(train_dir, file_name)
shutil.move(src, dst)
if os.listdir(origin_dir) == 0:
print("migration done")
def main():
# 图片路径
origin_dir = sample_conf["origin_image_dir"]
train_dir = sample_conf["train_image_dir"]
test_dir = sample_conf["test_image_dir"]
# 图片尺寸
real_width = sample_conf["image_width"]
real_height = sample_conf["image_height"]
# 图片后缀
image_suffix = sample_conf["image_suffix"]
bad_images_info = verify(origin_dir, real_width, real_height, image_suffix)
bad_imgs = []
for info in bad_images_info:
bad_imgs.append(info[1])
split(origin_dir, train_dir, test_dir, bad_imgs)
if __name__ == '__main__':
main()
train_model_v2.py 训练模型,训练过程中同时输出训练集和验证集的准确率
# -*- coding: utf-8 -*-
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import time
from PIL import Image
import random
import os
from sample import sample_conf
from tensorflow.python.framework.errors_impl import NotFoundError
# 设置以下环境变量可开启CPU识别
# os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
# os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
class TrainError(Exception):
pass
class TrainModel(object):
def __init__(self, train_img_path, verify_img_path, char_set, model_save_dir, verify=False):
# 模型路径
self.model_save_dir = model_save_dir
# 打乱文件顺序+校验图片格式
self.train_img_path = train_img_path
self.train_images_list = os.listdir(train_img_path)
# 校验格式
if verify:
self.confirm_image_suffix()
# 打乱文件顺序
random.seed(time.time())
random.shuffle(self.train_images_list)
# 验证集文件
self.verify_img_path = verify_img_path
self.verify_images_list = os.listdir(verify_img_path)
# 获得图片宽高和字符长度基本信息
label, captcha_array = self.gen_captcha_text_image(train_img_path, self.train_images_list[0])
captcha_shape = captcha_array.shape
captcha_shape_len = len(captcha_shape)
if captcha_shape_len == 3:
image_height, image_width, channel = captcha_shape
self.channel = channel
elif captcha_shape_len == 2:
image_height, image_width = captcha_shape
else:
raise TrainError("图片转换为矩阵时出错,请检查图片格式")
# 初始化变量
# 图片尺寸
self.image_height = image_height
self.image_width = image_width
# 验证码长度(位数)
self.max_captcha = len(label)
# 验证码字符类别
self.char_set = char_set
self.char_set_len = len(char_set)
# 相关信息打印
print("-->图片尺寸: {} X {}".format(image_height, image_width))
print("-->验证码长度: {}".format(self.max_captcha))
print("-->验证码共{}类 {}".format(self.char_set_len, char_set))
print("-->使用测试集为 {}".format(train_img_path))
print("-->使验证集为 {}".format(verify_img_path))
# tf初始化占位符
self.X = tf.placeholder(tf.float32, [None, image_height * image_width]) # 特征向量
self.Y = tf.placeholder(tf.float32, [None, self.max_captcha * self.char_set_len]) # 标签
self.keep_prob = tf.placeholder(tf.float32) # dropout值
self.w_alpha = 0.01
self.b_alpha = 0.1
# test model input and output
print(">>> Start model test")
batch_x, batch_y = self.get_batch(0, size=100)
print(">>> input batch images shape: {}".format(batch_x.shape))
print(">>> input batch labels shape: {}".format(batch_y.shape))
@staticmethod
def gen_captcha_text_image(img_path, img_name):
"""
返回一个验证码的array形式和对应的字符串标签
:return:tuple (str, numpy.array)
"""
# 标签
label = img_name.split("_")[0]
# 文件
img_file = os.path.join(img_path, img_name)
captcha_image = Image.open(img_file)
captcha_array = np.array(captcha_image) # 向量化
return label, captcha_array
@staticmethod
def convert2gray(img):
"""
图片转为灰度图,如果是3通道图则计算,单通道图则直接返回
:param img:
:return:
"""
if len(img.shape) > 2:
r, g, b = img[:, :, 0], img[:, :, 1], img[:, :, 2]
gray = 0.2989 * r + 0.5870 * g + 0.1140 * b
return gray
else:
return img
def text2vec(self, text):
"""
转标签为oneHot编码
:param text: str
:return: numpy.array
"""
text_len = len(text)
if text_len > self.max_captcha:
raise ValueError('验证码最长{}个字符'.format(self.max_captcha))
vector = np.zeros(self.max_captcha * self.char_set_len)
for i, ch in enumerate(text):
idx = i * self.char_set_len + self.char_set.index(ch)
vector[idx] = 1
return vector
def get_batch(self, n, size=128):
batch_x = np.zeros([size, self.image_height * self.image_width]) # 初始化
batch_y = np.zeros([size, self.max_captcha * self.char_set_len]) # 初始化
max_batch = int(len(self.train_images_list) / size)
# print(max_batch)
if max_batch - 1 < 0:
raise TrainError("训练集图片数量需要大于每批次训练的图片数量")
if n > max_batch - 1:
n = n % max_batch
s = n * size
e = (n + 1) * size
this_batch = self.train_images_list[s:e]
# print("{}:{}".format(s, e))
for i, img_name in enumerate(this_batch):
label, image_array = self.gen_captcha_text_image(self.train_img_path, img_name)
image_array = self.convert2gray(image_array) # 灰度化图片
batch_x[i, :] = image_array.flatten() / 255 # flatten 转为一维
batch_y[i, :] = self.text2vec(label) # 生成 oneHot
return batch_x, batch_y
def get_verify_batch(self, size=100):
batch_x = np.zeros([size, self.image_height * self.image_width]) # 初始化
batch_y = np.zeros([size, self.max_captcha * self.char_set_len]) # 初始化
verify_images = []
for i in range(size):
verify_images.append(random.choice(self.verify_images_list))
for i, img_name in enumerate(verify_images):
label, image_array = self.gen_captcha_text_image(self.verify_img_path, img_name)
image_array = self.convert2gray(image_array) # 灰度化图片
batch_x[i, :] = image_array.flatten() / 255 # flatten 转为一维
batch_y[i, :] = self.text2vec(label) # 生成 oneHot
return batch_x, batch_y
def confirm_image_suffix(self):
# 在训练前校验所有文件格式
print("开始校验所有图片后缀")
for index, img_name in enumerate(self.train_images_list):
print("{} image pass".format(index), end='
')
if not img_name.endswith(sample_conf['image_suffix']):
raise TrainError('confirm images suffix:you request [.{}] file but get file [{}]'
.format(sample_conf['image_suffix'], img_name))
print("所有图片格式校验通过")
def model(self):
x = tf.reshape(self.X, shape=[-1, self.image_height, self.image_width, 1])
print(">>> input x: {}".format(x))
# 卷积层1
wc1 = tf.get_variable(name='wc1', shape=[3, 3, 1, 32], dtype=tf.float32,
initializer=tf.contrib.layers.xavier_initializer())
bc1 = tf.Variable(self.b_alpha * tf.random_normal([32]))
conv1 = tf.nn.relu(tf.nn.bias_add(tf.nn.conv2d(x, wc1, strides=[1, 1, 1, 1], padding='SAME'), bc1))
conv1 = tf.nn.max_pool(conv1, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME')
conv1 = tf.nn.dropout(conv1, self.keep_prob)
# 卷积层2
wc2 = tf.get_variable(name='wc2', shape=[3, 3, 32, 64], dtype=tf.float32,
initializer=tf.contrib.layers.xavier_initializer())
bc2 = tf.Variable(self.b_alpha * tf.random_normal([64]))
conv2 = tf.nn.relu(tf.nn.bias_add(tf.nn.conv2d(conv1, wc2, strides=[1, 1, 1, 1], padding='SAME'), bc2))
conv2 = tf.nn.max_pool(conv2, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME')
conv2 = tf.nn.dropout(conv2, self.keep_prob)
# 卷积层3
wc3 = tf.get_variable(name='wc3', shape=[3, 3, 64, 128], dtype=tf.float32,
initializer=tf.contrib.layers.xavier_initializer())
bc3 = tf.Variable(self.b_alpha * tf.random_normal([128]))
conv3 = tf.nn.relu(tf.nn.bias_add(tf.nn.conv2d(conv2, wc3, strides=[1, 1, 1, 1], padding='SAME'), bc3))
conv3 = tf.nn.max_pool(conv3, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME')
conv3 = tf.nn.dropout(conv3, self.keep_prob)
print(">>> convolution 3: ", conv3.shape)
next_shape = conv3.shape[1] * conv3.shape[2] * conv3.shape[3]
# 全连接层1
wd1 = tf.get_variable(name='wd1', shape=[next_shape, 1024], dtype=tf.float32,
initializer=tf.contrib.layers.xavier_initializer())
bd1 = tf.Variable(self.b_alpha * tf.random_normal([1024]))
dense = tf.reshape(conv3, [-1, wd1.get_shape().as_list()[0]])
dense = tf.nn.relu(tf.add(tf.matmul(dense, wd1), bd1))
dense = tf.nn.dropout(dense, self.keep_prob)
# 全连接层2
wout = tf.get_variable('name', shape=[1024, self.max_captcha * self.char_set_len], dtype=tf.float32,
initializer=tf.contrib.layers.xavier_initializer())
bout = tf.Variable(self.b_alpha * tf.random_normal([self.max_captcha * self.char_set_len]))
y_predict = tf.add(tf.matmul(dense, wout), bout)
return y_predict
def train_cnn(self):
y_predict = self.model()
print(">>> input batch predict shape: {}".format(y_predict.shape))
print(">>> End model test")
# 计算概率 损失
cost = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(logits=y_predict, labels=self.Y))
# 梯度下降
optimizer = tf.train.AdamOptimizer(learning_rate=0.0001).minimize(cost)
# 计算准确率
predict = tf.reshape(y_predict, [-1, self.max_captcha, self.char_set_len]) # 预测结果
max_idx_p = tf.argmax(predict, 2) # 预测结果
max_idx_l = tf.argmax(tf.reshape(self.Y, [-1, self.max_captcha, self.char_set_len]), 2) # 标签
correct_pred = tf.equal(max_idx_p, max_idx_l)
accuracy_char_count = tf.reduce_mean(tf.cast(correct_pred, tf.float32))
accuracy_image_count = tf.reduce_mean(tf.reduce_min(tf.cast(correct_pred, tf.float32), axis=1))
# 模型保存对象
saver = tf.train.Saver()
with tf.Session() as sess:
init = tf.global_variables_initializer()
sess.run(init)
# 恢复模型
if os.path.exists(self.model_save_dir):
try:
saver.restore(sess, self.model_save_dir)
# 判断捕获model文件夹中没有模型文件的错误
except ValueError:
print("model文件夹为空,将创建新模型")
else:
pass
step = 1
for i in range(3000):
batch_x, batch_y = self.get_batch(i, size=128)
# 梯度下降训练
_, cost_ = sess.run([optimizer, cost], feed_dict={self.X: batch_x, self.Y: batch_y, self.keep_prob: 0.75})
if step % 10 == 0:
# 基于训练集的测试
batch_x_test, batch_y_test = self.get_batch(i, size=100)
acc_char = sess.run(accuracy_char_count, feed_dict={self.X: batch_x_test, self.Y: batch_y_test, self.keep_prob: 1.})
acc_image = sess.run(accuracy_image_count, feed_dict={self.X: batch_x_test, self.Y: batch_y_test, self.keep_prob: 1.})
print("第{}次训练 >>> ".format(step))
print("[训练集] 字符准确率为 {:.5f} 图片准确率为 {:.5f} >>> loss {:.10f}".format(acc_char, acc_image, cost_))
# 基于验证集的测试
batch_x_verify, batch_y_verify = self.get_verify_batch(size=100)
acc_char = sess.run(accuracy_char_count, feed_dict={self.X: batch_x_verify, self.Y: batch_y_verify, self.keep_prob: 1.})
acc_image = sess.run(accuracy_image_count, feed_dict={self.X: batch_x_verify, self.Y: batch_y_verify, self.keep_prob: 1.})
print("[验证集] 字符准确率为 {:.5f} 图片准确率为 {:.5f} >>> loss {:.10f}".format(acc_char, acc_image, cost_))
# 准确率达到99%后保存并停止
if acc_image > 0.99:
saver.save(sess, self.model_save_dir)
print("验证集准确率达到99%,保存模型成功")
break
# 每训练500轮就保存一次
if i % 500 == 0:
saver.save(sess, self.model_save_dir)
print("定时保存模型成功")
step += 1
saver.save(sess, self.model_save_dir)
def main():
train_image_dir = sample_conf["train_image_dir"]
verify_image_dir = sample_conf["test_image_dir"]
char_set = sample_conf["char_set"]
model_save_dir = sample_conf["model_save_dir"]
tm = TrainModel(train_image_dir, verify_image_dir, char_set, model_save_dir, verify=False)
tm.train_cnn() # 开始训练模型
if __name__ == '__main__':
main()
训练结果
第2960次训练 >>>
[训练集] 字符准确率为 0.87500 图片准确率为 0.61000 >>> loss 0.0337208398
[验证集] 字符准确率为 0.81500 图片准确率为 0.45000 >>> loss 0.0337208398
第2970次训练 >>>
[训练集] 字符准确率为 0.88500 图片准确率为 0.62000 >>> loss 0.0343154743
[验证集] 字符准确率为 0.80750 图片准确率为 0.39000 >>> loss 0.0343154743
第2980次训练 >>>
[训练集] 字符准确率为 0.89250 图片准确率为 0.65000 >>> loss 0.0298477933
[验证集] 字符准确率为 0.80000 图片准确率为 0.38000 >>> loss 0.0298477933
第2990次训练 >>>
[训练集] 字符准确率为 0.90250 图片准确率为 0.71000 >>> loss 0.0316790938
[验证集] 字符准确率为 0.83500 图片准确率为 0.48000 >>> loss 0.0316790938
第3000次训练 >>>
[训练集] 字符准确率为 0.89000 图片准确率为 0.69000 >>> loss 0.0330378339
[验证集] 字符准确率为 0.83750 图片准确率为 0.53000 >>> loss 0.0330378339
test_batch.py 批量验证
# -*- coding: utf-8 -*-
import tensorflow as tf
import numpy as np
import time
from PIL import Image
import random
import os
from sample import sample_conf
class TestError(Exception):
pass
class TestBatch(object):
def __init__(self, img_path, char_set, model_save_dir, total):
# 模型路径
self.model_save_dir = model_save_dir
# 打乱文件顺序
self.img_path = img_path
self.img_list = os.listdir(img_path)
random.seed(time.time())
random.shuffle(self.img_list)
# 获得图片宽高和字符长度基本信息
label, captcha_array = self.gen_captcha_text_image()
captcha_shape = captcha_array.shape
captcha_shape_len = len(captcha_shape)
if captcha_shape_len == 3:
image_height, image_width, channel = captcha_shape
self.channel = channel
elif captcha_shape_len == 2:
image_height, image_width = captcha_shape
else:
raise TestError("图片转换为矩阵时出错,请检查图片格式")
# 初始化变量
# 图片尺寸
self.image_height = image_height
self.image_width = image_width
# 验证码长度(位数)
self.max_captcha = len(label)
# 验证码字符类别
self.char_set = char_set
self.char_set_len = len(char_set)
# 测试个数
self.total = total
# 相关信息打印
print("-->图片尺寸: {} X {}".format(image_height, image_width))
print("-->验证码长度: {}".format(self.max_captcha))
print("-->验证码共{}类 {}".format(self.char_set_len, char_set))
print("-->使用测试集为 {}".format(img_path))
# tf初始化占位符
self.X = tf.placeholder(tf.float32, [None, image_height * image_width]) # 特征向量
self.Y = tf.placeholder(tf.float32, [None, self.max_captcha * self.char_set_len]) # 标签
self.keep_prob = tf.placeholder(tf.float32) # dropout值
self.w_alpha = 0.01
self.b_alpha = 0.1
def gen_captcha_text_image(self):
"""
返回一个验证码的array形式和对应的字符串标签
:return:tuple (str, numpy.array)
"""
img_name = random.choice(self.img_list)
# 标签
label = img_name.split("_")[0]
# 文件
img_file = os.path.join(self.img_path, img_name)
captcha_image = Image.open(img_file)
captcha_array = np.array(captcha_image) # 向量化
return label, captcha_array
@staticmethod
def convert2gray(img):
"""
图片转为灰度图,如果是3通道图则计算,单通道图则直接返回
:param img:
:return:
"""
if len(img.shape) > 2:
r, g, b = img[:, :, 0], img[:, :, 1], img[:, :, 2]
gray = 0.2989 * r + 0.5870 * g + 0.1140 * b
return gray
else:
return img
def text2vec(self, text):
"""
转标签为oneHot编码
:param text: str
:return: numpy.array
"""
text_len = len(text)
if text_len > self.max_captcha:
raise ValueError('验证码最长{}个字符'.format(self.max_captcha))
vector = np.zeros(self.max_captcha * self.char_set_len)
for i, ch in enumerate(text):
idx = i * self.char_set_len + self.char_set.index(ch)
vector[idx] = 1
return vector
def model(self):
x = tf.reshape(self.X, shape=[-1, self.image_height, self.image_width, 1])
print(">>> input x: {}".format(x))
# 卷积层1
wc1 = tf.get_variable(name='wc1', shape=[3, 3, 1, 32], dtype=tf.float32,
initializer=tf.contrib.layers.xavier_initializer())
bc1 = tf.Variable(self.b_alpha * tf.random_normal([32]))
conv1 = tf.nn.relu(tf.nn.bias_add(tf.nn.conv2d(x, wc1, strides=[1, 1, 1, 1], padding='SAME'), bc1))
conv1 = tf.nn.max_pool(conv1, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME')
conv1 = tf.nn.dropout(conv1, self.keep_prob)
# 卷积层2
wc2 = tf.get_variable(name='wc2', shape=[3, 3, 32, 64], dtype=tf.float32,
initializer=tf.contrib.layers.xavier_initializer())
bc2 = tf.Variable(self.b_alpha * tf.random_normal([64]))
conv2 = tf.nn.relu(tf.nn.bias_add(tf.nn.conv2d(conv1, wc2, strides=[1, 1, 1, 1], padding='SAME'), bc2))
conv2 = tf.nn.max_pool(conv2, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME')
conv2 = tf.nn.dropout(conv2, self.keep_prob)
# 卷积层3
wc3 = tf.get_variable(name='wc3', shape=[3, 3, 64, 128], dtype=tf.float32,
initializer=tf.contrib.layers.xavier_initializer())
bc3 = tf.Variable(self.b_alpha * tf.random_normal([128]))
conv3 = tf.nn.relu(tf.nn.bias_add(tf.nn.conv2d(conv2, wc3, strides=[1, 1, 1, 1], padding='SAME'), bc3))
conv3 = tf.nn.max_pool(conv3, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME')
conv3 = tf.nn.dropout(conv3, self.keep_prob)
print(">>> convolution 3: ", conv3.shape)
next_shape = conv3.shape[1]*conv3.shape[2]*conv3.shape[3]
# 全连接层1
wd1 = tf.get_variable(name='wd1', shape=[next_shape, 1024], dtype=tf.float32,
initializer=tf.contrib.layers.xavier_initializer())
bd1 = tf.Variable(self.b_alpha * tf.random_normal([1024]))
dense = tf.reshape(conv3, [-1, wd1.get_shape().as_list()[0]])
dense = tf.nn.relu(tf.add(tf.matmul(dense, wd1), bd1))
dense = tf.nn.dropout(dense, self.keep_prob)
# 全连接层2
wout = tf.get_variable('name', shape=[1024, self.max_captcha * self.char_set_len], dtype=tf.float32,
initializer=tf.contrib.layers.xavier_initializer())
bout = tf.Variable(self.b_alpha * tf.random_normal([self.max_captcha * self.char_set_len]))
y_predict = tf.add(tf.matmul(dense, wout), bout)
return y_predict
def test_batch(self):
y_predict = self.model()
total = self.total
right = 0
saver = tf.train.Saver()
with tf.Session() as sess:
saver.restore(sess, self.model_save_dir)
s = time.time()
for i in range(total):
# test_text, test_image = gen_special_num_image(i)
test_text, test_image = self.gen_captcha_text_image() # 随机
test_image = self.convert2gray(test_image)
test_image = test_image.flatten() / 255
predict = tf.argmax(tf.reshape(y_predict, [-1, self.max_captcha, self.char_set_len]), 2)
text_list = sess.run(predict, feed_dict={self.X: [test_image], self.keep_prob: 1.})
predict_text = text_list[0].tolist()
p_text = ""
for p in predict_text:
p_text += str(self.char_set[p])
print("origin: {} predict: {}".format(test_text, p_text))
if test_text == p_text:
right += 1
else:
pass
e = time.time()
rate = str(right/total) + "%"
print("测试结果: {}/{}".format(right, total))
print("{}个样本识别耗时{}秒,准确率{}".format(total, e-s, rate))
def main():
test_image_dir = sample_conf["test_image_dir"]
model_save_dir = sample_conf["model_save_dir"]
char_set = sample_conf["char_set"]
total = 100
tb = TestBatch(test_image_dir, char_set, model_save_dir, total)
tb.test_batch()
if __name__ == '__main__':
main()
程序结果
origin: 4958 predict: 4958
origin: 0409 predict: 0409
origin: 1328 predict: 1228
origin: 6181 predict: 6181
origin: 7017 predict: 7002
origin: 5355 predict: 5355
origin: 1780 predict: 7180
origin: 4122 predict: 4122
测试结果: 46/100
100个样本识别耗时3.113262891769409秒,准确率0.46%
封装识别类
# -*- coding: utf-8 -*-
"""
识别图像的类,为了快速进行多次识别可以调用此类下面的方法:
R = Recognizer(image_height, image_width, max_captcha)
for i in range(10):
r_img = Image.open(str(i) + ".jpg")
t = R.rec_image(r_img)
简单的图片每张基本上可以达到毫秒级的识别速度
"""
import tensorflow as tf
import numpy as np
from PIL import Image
from sample import sample_conf
class Recognizer(object):
def __init__(self, image_height, image_width, max_captcha, char_set, model_save_dir):
self.w_alpha = 0.01
self.b_alpha = 0.1
self.image_height = image_height
self.image_width = image_width
self.max_captcha = max_captcha
self.char_set = char_set
self.char_set_len = len(self.char_set)
self.model_save_dir = model_save_dir
# 新建图和会话
self.g = tf.Graph()
self.sess = tf.Session(graph=self.g)
# 使用指定的图和会话
with self.g.as_default():
# 迭代循环前,写出所有用到的张量的计算表达式,如果写在循环中,会发生内存泄漏,拖慢识别的速度
# tf初始化占位符
self.X = tf.placeholder(tf.float32, [None, self.image_height * self.image_width]) # 特征向量
self.Y = tf.placeholder(tf.float32, [None, self.max_captcha * self.char_set_len]) # 标签
self.keep_prob = tf.placeholder(tf.float32) # dropout值
# 加载网络和模型参数
self.y_predict = self.model()
self.predict = tf.argmax(tf.reshape(self.y_predict, [-1, self.max_captcha, self.char_set_len]), 2)
saver = tf.train.Saver()
with self.sess.as_default() as sess:
saver.restore(sess, self.model_save_dir)
# def __del__(self):
# self.sess.close()
# print("session close")
@staticmethod
def convert2gray(img):
"""
图片转为灰度图,如果是3通道图则计算,单通道图则直接返回
:param img:
:return:
"""
if len(img.shape) > 2:
r, g, b = img[:, :, 0], img[:, :, 1], img[:, :, 2]
gray = 0.2989 * r + 0.5870 * g + 0.1140 * b
return gray
else:
return img
def text2vec(self, text):
"""
转标签为oneHot编码
:param text: str
:return: numpy.array
"""
text_len = len(text)
if text_len > self.max_captcha:
raise ValueError('验证码最长{}个字符'.format(self.max_captcha))
vector = np.zeros(self.max_captcha * self.char_set_len)
for i, ch in enumerate(text):
idx = i * self.char_set_len + self.char_set.index(ch)
vector[idx] = 1
return vector
def model(self):
x = tf.reshape(self.X, shape=[-1, self.image_height, self.image_width, 1])
print(">>> input x: {}".format(x))
# 卷积层1
wc1 = tf.get_variable(name='wc1', shape=[3, 3, 1, 32], dtype=tf.float32,
initializer=tf.contrib.layers.xavier_initializer())
bc1 = tf.Variable(self.b_alpha * tf.random_normal([32]))
conv1 = tf.nn.relu(tf.nn.bias_add(tf.nn.conv2d(x, wc1, strides=[1, 1, 1, 1], padding='SAME'), bc1))
conv1 = tf.nn.max_pool(conv1, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME')
conv1 = tf.nn.dropout(conv1, self.keep_prob)
# 卷积层2
wc2 = tf.get_variable(name='wc2', shape=[3, 3, 32, 64], dtype=tf.float32,
initializer=tf.contrib.layers.xavier_initializer())
bc2 = tf.Variable(self.b_alpha * tf.random_normal([64]))
conv2 = tf.nn.relu(tf.nn.bias_add(tf.nn.conv2d(conv1, wc2, strides=[1, 1, 1, 1], padding='SAME'), bc2))
conv2 = tf.nn.max_pool(conv2, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME')
conv2 = tf.nn.dropout(conv2, self.keep_prob)
# 卷积层3
wc3 = tf.get_variable(name='wc3', shape=[3, 3, 64, 128], dtype=tf.float32,
initializer=tf.contrib.layers.xavier_initializer())
bc3 = tf.Variable(self.b_alpha * tf.random_normal([128]))
conv3 = tf.nn.relu(tf.nn.bias_add(tf.nn.conv2d(conv2, wc3, strides=[1, 1, 1, 1], padding='SAME'), bc3))
conv3 = tf.nn.max_pool(conv3, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME')
conv3 = tf.nn.dropout(conv3, self.keep_prob)
print(">>> convolution 3: ", conv3.shape)
next_shape = conv3.shape[1] * conv3.shape[2] * conv3.shape[3]
# 全连接层1
wd1 = tf.get_variable(name='wd1', shape=[next_shape, 1024], dtype=tf.float32,
initializer=tf.contrib.layers.xavier_initializer())
bd1 = tf.Variable(self.b_alpha * tf.random_normal([1024]))
dense = tf.reshape(conv3, [-1, wd1.get_shape().as_list()[0]])
dense = tf.nn.relu(tf.add(tf.matmul(dense, wd1), bd1))
dense = tf.nn.dropout(dense, self.keep_prob)
# 全连接层2
wout = tf.get_variable('name', shape=[1024, self.max_captcha * self.char_set_len], dtype=tf.float32,
initializer=tf.contrib.layers.xavier_initializer())
bout = tf.Variable(self.b_alpha * tf.random_normal([self.max_captcha * self.char_set_len]))
y_predict = tf.add(tf.matmul(dense, wout), bout)
return y_predict
def rec_image(self, img):
# 读取图片
img_array = np.array(img)
test_image = self.convert2gray(img_array)
test_image = test_image.flatten() / 255
# 使用指定的图和会话
with self.g.as_default():
with self.sess.as_default() as sess:
text_list = sess.run(self.predict, feed_dict={self.X: [test_image], self.keep_prob: 1.})
# 获取结果
predict_text = text_list[0].tolist()
p_text = ""
for p in predict_text:
p_text += str(self.char_set[p])
# 返回识别结果
return p_text
def main():
image_height = sample_conf["image_height"]
image_width = sample_conf["image_width"]
max_captcha = sample_conf["max_captcha"]
char_set = sample_conf["char_set"]
model_save_dir = sample_conf["model_save_dir"]
R = Recognizer(image_height, image_width, max_captcha, char_set, model_save_dir)
r_img = Image.open("./sample/test/0059_15553933348531582.png")
t = R.rec_image(r_img)
print(t)
if __name__ == '__main__':
main()
使用flask写的提供在线识别功能的接口
# -*- coding: UTF-8 -*-
"""
构建flask接口服务
接收 files={'image_file': ('captcha.jpg', BytesIO(bytes), 'application')} 参数识别验证码
需要配置参数:
image_height = 40
image_width = 80
max_captcha = 4
"""
import json
from io import BytesIO
import os
from recognition_object import Recognizer
import time
from flask import Flask, request, jsonify, Response
from PIL import Image
from sample import sample_conf
# 默认使用CPU
os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
# 配置参数
image_height = sample_conf["image_height"]
image_width = sample_conf["image_width"]
max_captcha = sample_conf["max_captcha"]
api_image_dir = sample_conf["api_image_dir"]
model_save_dir = sample_conf["model_save_dir"]
image_suffix = sample_conf["image_suffix"] # 文件后缀
char_set = sample_conf["char_set"]
# Flask对象
app = Flask(__name__)
basedir = os.path.abspath(os.path.dirname(__file__))
# 生成识别对象,需要配置参数
R = Recognizer(image_height, image_width, max_captcha, char_set, model_save_dir)
# 如果你需要使用多个模型,可以参照原有的例子配置路由和编写逻辑
# Q = Recognizer(image_height, image_width, max_captcha, char_set, model_save_dir)
def response_headers(content):
resp = Response(content)
resp.headers['Access-Control-Allow-Origin'] = '*'
return resp
@app.route('/b', methods=['POST'])
def up_image():
if request.method == 'POST' and request.files.get('image_file'):
timec = str(time.time()).replace(".", "")
file = request.files.get('image_file')
img = file.read()
img = BytesIO(img)
img = Image.open(img, mode="r")
# username = request.form.get("name")
print("接收图片尺寸: {}".format(img.size))
s = time.time()
value = R.rec_image(img)
e = time.time()
print("识别结果: {}".format(value))
# 保存图片
print("保存图片: {}{}_{}.{}".format(api_image_dir, value, timec, image_suffix))
file_name = "{}_{}.{}".format(value, timec, image_suffix)
file_path = os.path.join(api_image_dir + file_name)
img.save(file_path)
result = {
'time': timec, # 时间戳
'value': value, # 预测的结果
'speed_time(ms)': int((e - s) * 1000) # 识别耗费的时间
}
img.close()
return jsonify(result)
else:
content = json.dumps({"error_code": "1001"})
resp = response_headers(content)
return resp
if __name__ == '__main__':
app.run(debug=True, port=9999)
为了测试这个接口是否工作正常,还得写一个页面
<form id="form1" method="post" action=" http://127.0.0.1:9999/b" enctype="multipart/form-data">
<div>
<input id="image_file" type="file" name="image_file"/>
<input type="submit">提交</input>
</div>
</form>
测试结果
{
"speed_time(ms)": 13,
"time": "15553999504148507",
"value": "0069"
}