import pymysql
import pandas as pd
import os
def connect():
# 连接数据库
host = 'localhost'
port = 3307
user = 'root'
password = 'root'
db_none = ''
charset = 'utf8'
# 创建连接
conn = pymysql.connect(host=host, port=port, user=user,
password=password, db=db_none, charset=charset)
return conn
def insert_data(db, table, df, index, dtypes):
# 插入数据
columns = df.columns.tolist()
if len(columns) != len(dtypes):
print('字段和数据类型数量不一致!')
return
conn = connect()
cursor = conn.cursor()
# 创建数据库
try:
creat_db = 'CREATE DATABASE ' + db + \
' CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci;'
cursor.execute(creat_db)
except:
pass
# print('数据库存在')
# 判断表是否存在
select_table = "select * from information_schema.TABLES where TABLE_SCHEMA='%s' and TABLE_NAME = '%s';" % (
db, table)
rows = cursor.execute(select_table)
# 不存在则创建
if rows == 0:
create_table = "CREATE TABLE " + db+"." + \
table + "(id INT AUTO_INCREMENT PRIMARY KEY "
for i in range(len(columns)):
create_table += ' , '+columns[i]+' '+dtypes[i]
create_table += ",INDEX index_name ("+index+"))"
cursor.execute(create_table)
# 写入数据
zd='' # 字段
for i in range(len(columns)):
if i==0:
zd += columns[i]
else:
zd += ' , '+columns[i]
for i in range(len(df)):
# ***插入数据时根据两个字段判断***
v1 = df['datetime'][i]
v2= df['contract'][i]
value=df.values[i]
insert_sql="INSERT INTO "+db+"."+table+" ("
insert_sql+=zd+") SELECT "
for j in range(len(value)): # 字段的值
if j==0:
insert_sql+="'"+str(value[j])+"'"
else:
insert_sql+=" , '"+str(value[j])+"'"
insert_sql+=" FROM DUAL WHERE NOT EXISTS(SELECT contract FROM "+db+"."+table+" WHERE datetime = '%s' and contract='%s')" % (v1,v2)
try:
# print(insert_sql)
# 执行sql语句
cursor.execute(insert_sql)
# print('写入成功'+str(datetime.datetime.now()))
except:
print('插入失败,语句为',insert_sql)
conn.commit()
# 关闭游标连接
cursor.close()
# 关闭数据库连接
conn.close()
def get_price_from_mysql(db,table,contract=None,start_date=None,end_date=None,freq=None):
conn = connect()
select_sql="select * from "+db+"."+table+" where 1=1 "
if contract!=None:
select_sql+=" and contract='%s' " % contract
if start_date!=None:
select_sql+=" and trading_date>='%s' " % start_date
if end_date!=None:
select_sql+=" and trading_date<='%s' " % end_date
# if freq!=None: # 该csv文件中无周期的字段
# pass
# 查询1 返回元祖格式
# cursor = conn.cursor()
# cursor.execute(select_sql)
# result = cursor.fetchall()
# cursor.close()
# 查询2 返回dataframe格式
result = pd.read_sql(select_sql,conn)
# 关闭数据库连接
conn.close()
return result
def main():
''' 查询数据 '''
# 当前地址
path = os.path.split(os.path.realpath(__file__))[0]
# 读取文件
df = pd.read_csv(path+"\A1101_20100104.csv")
#***设置索引***
index = 'contract' # 'contract,datetime'
# ***设置字段的类型***
dtypes = ['datetime', 'float', 'float', 'int', 'float',
'date', 'int', 'float', 'float', 'CHAR(200)']
insert_data('test_db', 'test_table', df, index, dtypes)
''' 查询数据 '''
result=get_price_from_mysql('test_db', 'test_table',contract='A1102',start_date='2010-01-04',end_date='2010-01-07',freq=None)
print('***查询结果***')
print(result)
if __name__ == "__main__":
main()
没有合适的资源?快使用搜索试试~ 我知道了~
温馨提示
python3.7.4版本,文件包含excel文件和py文件。 py文件中需要手动设置excel字段在mysql中的类型、index索引及写入时校验的字段。(搜索*查找对应的位置) 执行py文件,若不存在数据库及表会自动创建,并写入数据(对于指定字段重复的不会写入)
资源推荐
资源详情
资源评论
收起资源包目录
自动创建数据库,写入数据时校验.zip (2个子文件)
自动创建数据库,写入数据时校验
1.py 4KB
A1101_20100104.csv 15KB
共 2 条
- 1
月夜惹人醉
- 粉丝: 13
- 资源: 4
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功
- 1
- 2
- 3
- 4
- 5
前往页