import pandas as pd
import csv
for i in range(1,178): # 爬取全部页
tb = pd.read_html('http://s.askci.com/stock/a/?reportTime=2017-12-31&pageNum=%s' % (str(i)))[3]
tb.to_csv(r'1.csv', mode='a', encoding='utf_8_sig', header=1, index=0)
import requests
import pandas as pd
from bs4 import BeautifulSoup
from lxml import etree
import time
import pymysql
from sqlalchemy import create_engine
from urllib.parse import urlencode # 编码 URL 字符串
start_time = time.time() #计算程序运行时间
def get_one_page(i):
try:
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/66.0.3359.181 Safari/537.36'
}
paras = {
'reportTime': '2017-12-31',
#可以改报告日期,比如2018-6-30获得的就是该季度的信息
'pageNum': i #页码
}
url = 'http://s.askci.com/stock/a/?' + urlencode(paras)
response = requests.get(url,headers = headers)
if response.status_code == 200:
return response.text
return None
except RequestException:
print('爬取失败')
def parse_one_page(html):
soup = BeautifulSoup(html,'lxml')
content = soup.select('#myTable04')[0] #[0]将返回的list改为bs4类型
tbl = pd.read_html(content.prettify(),header = 0)[0]
# prettify()优化代码,[0]从pd.read_html返回的list中提取出DataFrame
tbl.rename(columns = {'序号':'serial_number', '股票代码':'stock_code', '股票简称':'stock_abbre', '公司名称':'company_name', '省份':'province', '城市':'city', '主营业务收入(201712)':'main_bussiness_income', '净利润(201712)':'net_profit', '员工人数':'employees', '上市日期':'listing_date', '招股书':'zhaogushu', '公司财报':'financial_report', '行业分类':'industry_classification', '产品类型':'industry_type', '主营业务':'main_business'},inplace = True)
return tbl
#创建数据库表
def generate_mysql():
conn = pymysql.connect(
host='localhost',
user='root',
password='',
port=3306,
charset = 'utf8',
db = 'wade')
cursor = conn.cursor()
sql = 'CREATE TABLE IF NOT EXISTS listed_company (serial_number INT(20) NOT NULL,stock_code INT(20) ,stock_abbre VARCHAR(20) ,company_name VARCHAR(20) ,province VARCHAR(20) ,city VARCHAR(20) ,main_bussiness_income VARCHAR(20) ,net_profit VARCHAR(20) ,employees INT(20) ,listing_date DATETIME(0) ,zhaogushu VARCHAR(20) ,financial_report VARCHAR(20) , industry_classification VARCHAR(20) ,industry_type VARCHAR(100) ,main_business VARCHAR(200) ,PRIMARY KEY (serial_number))'
cursor.execute(sql)
conn.close()
#写入数据库
def write_to_sql(tbl, db = 'wade'):
engine = create_engine('mysql+pymysql://root:@localhost:3306/{0}?charset=utf8'.format(db))
try:
tbl.to_sql('listed_company2',con = engine,if_exists='append',index=False)
# append表示在原有表基础上增加,但该表要有表头
except Exception as e:
print(e)
def main(page):
generate_mysql()
for i in range(1,page):
html = get_one_page(i)
tbl = parse_one_page(html)
write_to_sql(tbl)
# # 单进程
if __name__ == '__main__':
main(178)
endtime = time.time()-start_time
print('程序运行了%.2f秒' %endtime)
# 多进程
from multiprocessing import Pool
if __name__ == '__main__':
pool = Pool(4)
pool.map(main, [i for i in range(1,178)]) #共有178页
endtime = time.time()-start_time
print('程序运行了%.2f秒' %(time.time()-start_time))
为爬取网页表格数据,较容想到的常规方式是通过requests请求以及lxml解析定位获取元素,
此外还可以通过pandas库的read_html直接获取表格数据,得到的将是目标网页所有table表格的list集合
- requests请求+lxml解析+xpath定位
from lxml import etree import pandas as pd from pandas import DataFrame import time import requests import random def pagedataget(html): items=html.xpath('//*[@class="timeborder"]') compname=[];publish_date=[];ssdbk=[];pubtype=[];pubinfo=[] for item in items : name=str(item.xpath('./td/text()')[0]).replace(' ','').replace(' ','').replace(' ','') name=''.join(name.split()) if name: compname.append(name) date=str(item.xpath('./td/text()')[1]).replace(' ','').replace(' ','').replace(' ','') publish_date.append(date) sb=str(item.xpath('./td/text()')[2]).replace(' ','').replace(' ','').replace(' ','') ssdbk.append(sb) pt=str(item.xpath('./td/text()')[3]).replace(' ','').replace(' ','').replace(' ','') pubtype.append(pt) pinf=item.xpath('./td/a/text()') if len(pinf)==2: pinf=str(pinf[0]).replace(' ','').replace(' ','').replace(' ','')+'、'+str(pinf[1]).replace(' ','').replace(' ','').replace(' ','') else: pinf=str(pinf[0]).replace(' ','').replace(' ','').replace(' ','') pubinfo.append(pinf) ipopub=pd.concat([DataFrame(compname),DataFrame(publish_date),DataFrame(ssdbk),DataFrame(pubtype),DataFrame(pubinfo)],axis=1) ipopub.columns=['compname','publish_date','ssdbk','pubtype','pubinfo'] return ipopub if __name__ == '__main__': start = time.time() user_agent_list = ["Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36", "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36", "Mozilla/5.0 (Windows NT 10.0; WOW64; rv:60.0) Gecko/20100101 Firefox/61.0", "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/64.0.3282.186 Safari/537.36", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.62 Safari/537.36"] header={'User-Agent':'Mozilla/5.0'} ipopub=DataFrame(data=None,columns=['compname','publish_date','ssdbk','pubtype','pubinfo']) for i in range(1,199): header['User-Agent'] = random.choice(user_agent_list) pageurl='http://eid.csrc.gov.cn/ipo/infoDisplay.action?pageNo='+str(i)+'&temp=&temp1=&blockType=byTime' header['User-Agent'] = random.choice(user_agent_list) tx=requests.get(pageurl,headers=header) tx.encoding=tx.apparent_encoding html = etree.HTML(tx.text) ipopub=pd.concat([ipopub,pagedataget(html)]) ipopub.index=range(0len(ipopub)+1) end = time.time() print ('一共有',len(ipopub),'家公司,'+'运行时间为',(end-start)/60,'分钟')
pd.read_html 版本
import time
start = time.time()
import pandas as pd
from pandas import DataFrame
pubtable=DataFrame(data=None,columns=['compname','publish_date','ssdbk','pubtype','pubinfo'])
for i in range(1,199):
url ='http://eid.csrc.gov.cn/ipo/infoDisplay.action?pageNo='+str(i)+'&temp=&temp1=&blockType=byTime'
data = pd.read_html(url,header=0,encoding='utf-8')[2]
pub = data.iloc[0:len(data)-1,0:-1]
pub.columns=['compname','publish_date','ssdbk','pubtype','pubinfo']
pubtable=pd.concat([pubtable,pub])
end = time.time()
print ('一共有',len(pubtable),'家公司,'+'运行时间为',(end-start)/60,'分钟')
pandas+create_engine:连接数据库,返回DataFrame类型的结果
导包
from sqlalchemy import create_engine
import pandas as pd
连接
获取连接引擎engine
engine = create_engine('mysql+pymysql://root:dong@localhost:3306/stu?charset=utf8')
sql语句
sql = 'select * from student'
sql = 'select name,sum(year(age)) as 年龄和 from student'
查询
pd.read_sql_query(sql,engine)
返回值类型:pandas.core.frame.DataFrame
engine = create_engine('dialect+driver://username:password@host:port/database')
dialect -- 数据库类型
driver -- 数据库驱动选择
username -- 数据库用户名
password -- 用户密码
host 服务器地址
port 端口
database 数据库
mysql-python
engine = create_engine('mysql+mysqldb://scott:tiger@localhost/foo')
MySQL-connector-python
engine = create_engine('mysql+mysqlconnector://scott:tiger@localhost/foo')
Oracle
create_engine('oracle://scott:tiger@127.0.0.1:1521/sidname')
from sqlalchemy import create_engine, Table, Column, Integer, /
String, MetaData, ForeignKey
import MySQLdb
#创建数据库连接
engine = create_engine("mysql+mysqldb://liuyao:liuyao@121.42.195.15:3306/liuyao", max_overflow=5)
# 获取元数据
metadata = MetaData()
# 定义表
user = Table('user', metadata,
Column('id', Integer, primary_key=True),
Column('name', String(20)),
)
color = Table('color', metadata,
Column('id', Integer, primary_key=True),
Column('name', String(20)),
)
#将dataframe 添加到 tmp_formidinfo 如果表存在就添加,不存在创建并添加
pd.io.sql.to_sql(DataResultDF,'tmp_formidinfo',engine, schema='liuyao', if_exists='append')
# 执行sql语句
engine.execute(
"INSERT INTO liuyao.color(id, name) VALUES ('1', 'liuyao');"
)
result = engine.execute('select * from color')
print(result.fetchall())
python连接数据库——create_engine和conn.cursor
python操作数据库的方法:
一种是导入sqlalchemy包,另一种是导入psycopg2包。
具体用法如下(此处以postgre数据库举例)
第一种:
# 导入包
from sqlalchemy import create_engine
import pandas as pd
from string import Template
# 初始化引擎
engine = create_engine('postgresql+psycopg2://' + pg_username + ':' + pg_password + '@' + pg_host + ':' + str(
pg_port) + '/' + pg_database)
query_sql = """
select * from $arg1
"""
query_sql = Template(query_sql) # template方法
df = pd.read_sql_query(query_sql .substitute(arg1=tablename),engine) # 配合pandas的方法读取数据库值
# 配合pandas的to_sql方法使用十分方便(dataframe对象直接入库)
df.to_sql(table, engine, if_exists='replace', index=False) #覆盖入库
df.to_sql(table, engine, if_exists='append', index=False) #增量入库
第二种方法,与jdbc使用极为相似
# 导入包
import psycopg2
# 初始化
conn = psycopg2.connect(database = database, user = username, password = passwd, host = host, port = port)
cursor = conn.cursor()
# 执行sql
cursor.execute(sql, values) # 与jdbc的prepareStatement极为类似,执行的是一个具体的sql语句。
cursor也能调存储过程,并且获取返回值。
1、csv的写
python自带了csv模块提供用户对csv文件进行读写操作,要对csv文件进行写操作,首先要创建一个writer对象,参考help(csv.writer),情况如下
调用writer对象的前提是:需要传入一个文件对象,然后才能在这个文件对象的基础上调用csv的写入方法writerow(写入一行)writerrow(写入多行)。写入数据的代码如下:
import csv
headers = ['class','name','sex','height','year']
rows = [
[1,'xiaoming','male',168,23],
[1,'xiaohong','female',162,22],
[2,'xiaozhang','female',163,21],
[2,'xiaoli','male',158,21]
]
with open('test.csv','w')as f:
f_csv = csv.writer(f)
f_csv.writerow(headers)
f_csv.writerows(rows)
首先定义了写入csv文件的表头、每一列的内容,然后打开一个csv文件,将文件对象作为参数传给csv.writer(),最后将表头和每一行的内容写入到csv文件中。
注意:如果打开csv文件出现空行的情况,那么需要添加一个参数 newline=”(我使用windows出现了这种情况,使用linux mint没有出现)
with open('test.csv','w',newline='')as f:
2、写入字典序列的数据
在写入字典序列类型数据的时候,需要传入两个参数,一个是文件对象——f,一个是字段名称——fieldnames.
到时候要写入表头的时候,只需要调用writerheader方法,写入一行字典系列数据调用writerrow方法,并传入相应字典参数,写入多行调用writerows
具体代码如下:
import csv
headers = ['class','name','sex','height','year']
rows = [
{'class':1,'name':'xiaoming','sex':'male','height':168,'year':23},
{'class':1,'name':'xiaohong','sex':'female','height':162,'year':22},
{'class':2,'name':'xiaozhang','sex':'female','height':163,'year':21},
{'class':2,'name':'xiaoli','sex':'male','height':158,'year':21},
]
with open('test2.csv','w',newline='')as f:
f_csv = csv.DictWriter(f,headers)
f_csv.writeheader()
f_csv.writerows(rows)
3、csv文件的读
读取csv时需要使用reader,并传如一个文件对象,而且reader返回的是一个可迭代的对象,需要使用for循环遍历,代码如下:
import csv
with open('test.csv')as f:
f_csv = csv.reader(f)
for row in f_csv:
print(row)
在上面,row是一个列表,如果想要查看固定的某列,则需要加上下标,例如我想要查看name,那么只需要改为row[1]
import csv
with open('test.csv')as f:
f_csv = csv.reader(f)
for row in f_csv:
print(row[1])
import os
import csv
path = "5255_1"
# "newline="就是说因为我们的csv文件的类型,如果不加这个东西,当我们写入东西的时候,就会出现空行
out = open("test.csv","w",newline='')
# 定义一个变量进行写入,两个参数
csv_write = csv.writer(out,dialect='excel')
for root,dirs,files in os.walk(path): #root是根文件夹,dirs是该根文件夹下的文件夹,files是该根文件夹下的文件
content=[]
content.append(root)
content.append(len(files))
print(content)
# 数据写入
csv_write.writerow(content)
1、引入模块
1 import os #获取当前工作路径 2 import pandas as pd #将数据保存至相应文件中
2、将数据保存至制定文件下
1 file = os.getcwd() + '\1.csv' #保存文件位置,即当前工作路径下的csv文件
2 data = pd.DataFrame({'a':[1, 2, 3], 'b': [4, 5, 6]}) #要保存的数据
3 data.to_csv(file, index=False) #数据写入,index=False表示不加索引
3、产生新的数据,添加至上述csv文件中已有数据的后面
1 #新数据,与data具有相同的和列与列名
2 data2 = pd.DataFrame({'a':[7, 8, 9], 'b': [1, 2, 3]})
3 #保存至file文件中,index=False表示文件中不添加索引,header=False表示不添加列名,mode='a+'表示在已有数据基础上添加新数据,并不覆盖已有数据
4 data2.to_csv(file, index=False, mode='a+', header=False)
4、将数据保存至同一excel不同sheet中
直接用pd.to_excel()指定sheet_name时,即使指定sheet_name也没用,后一次的写入数据会覆盖前一次写入的数据,解决方法如下,借助pandas中的ExcelWriter方法;
# 方法1,推荐方法
with pd.ExcelWriter('test.xlsx') as writer:
data.to_excel(writer, sheet_name='data')
data2.to_excel(writer, sheet_name='data2')
# 写法2
writer = pd.ExcelWriter('test.xlsx')
data.to_excel(writer, sheet_name='data')
data.to_excel(writer, sheet_name='data2')
writer.save()
writer.close()
语句分析:
q_table6 = pd.read_csv('dl_data.csv',encoding = "utf-8",header = 0,names = range(0,50))
功能:将原来的列索引[-25......25]替换成[0....49]。
介绍:header = 0是默认情况(即不标明,默认就是header = 0),表示以数据的第一行为列索引。
encoding = "utf-8"表明以utf-8为编码规则。
names = range(0,50))表示以[0....49]为列索引的名字
q_table6 = pd.read_csv('dl_data.csv',encoding = "utf-8",header = None,names = range(0,50))
功能:给数据添加一行列索引[0....49],header = None表示原来的数据是没有列索引的,就算你的数据里面有列索引,这时就把 原来的列索引当成了数据。
上面两个语句都会默认为数据添加行索引,即会把原来的行索引当成数据,自己再添加新的行索引[0,1,2.....]
如果不想添加新的行索引,代码如下:
q_table6 = pd.read_csv('dl_data.csv',encoding = "utf-8",index_col=0)
index_col=0表示以原有数据的第一列(索引为0)当作行索引。
df.read(file,sep,shkiprows,usecols,nrows,chunksize...)
sep=',' # 以 “,” 作为数据的分隔符
shkiprows= 10 # 跳过前十行
usecols=['column1', 'column2', 'column3'] # 读取指定列
nrows = 10 # 只取前10行
chunksize=1000 # 分块大小来读取文件(每次读取多少行),不一次性把文件数据读入内存中,而是分多次
parse_dates = ['col_name'] # 指定某行读取为日期格式
index_col = ['col_1','col_2'] # 读取指定的几列
error_bad_lines = False # 当某行数据有问题时,不报错,直接跳过,处理脏数据时使用
na_values = 'NULL' # 将NULL识别为空值
dt.to_csv('file.csv',sep='?')#使用?分隔需要保存的数据,如果不写,默认是,
dt.to_csv('file.csv',na_rep='NA') #确实值保存为NA,如果不写,默认是空
dt.to_csv('file.csv',float_format='%.2f') #保留两位小数
dt.to_csv('file.csv',columns=['name']) #保存索引列和name列
dt.to_csv('file.csv',header=0) #不保存列名
dt.to_csv('file.csv',index=0) #不保存行索引
解决python中 .to_csv() 的乱码问题
解决方法:添加参数 encoding='utf_8_sig'
df.to_csv('users.csv', encoding='utf_8_sig')