from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
import numpy as np
import sys
from os.path import abspath, join, dirname
sys.path.append(abspath(join(dirname(__file__), "../../")))
from src.models import *
import cplex
import json
from time import time,strftime
#设置时间段的数量
TIME_NUM = 96
DATABASE_URL = "mysql+mysqlconnector://h_spot_manager:Hsp584785@139.198.17.229:33081/app_spot"
class MarketSimulation:
def __init__(self, TIME_NUM,DATABASE_URL):
'''
:param TIME_NUM: 时间段的数目
:param DATABASE_URL: 数据库的URL
'''
self.TIME_NUM = TIME_NUM #时间段的数目(96)
self.DATABASE_URL = DATABASE_URL #数据库URL
self.start_time = 0 #开始运行时的时间
self.end_time = 0 #运行完毕时的时间
self.type = 0 #用于识别是在进行日前出清还是实时出清
self.type_to_string = ['日前市场:','实时市场:'] #打印时通过self.type对应输出字符
self.case = None #如果不为None,则视为在进行场景仿真
'''下面三个成员用于保存从数据库中读取的发电机、线路、负荷数据'''
self.generator_data = []
self.line_data = []
self.load_data = []
'''=============下面的成员用于控制打印=================================='''
self.log_status = {} #用于控制各种输出是否执行的dict
self.log_status['total'] = True #若为False,则静默执行
self.log_status['add_con'] = True #控制是否显示添加约束的进度
self.log_status['node_price'] = False #控制是否展示节点电价
self.log_status['gen_status'] = False #控制是否展示SCUC的计算结果,即机组开停组合
self.log_status['show_time'] = True #控制是否展示总耗时
self.log_status['upload'] = True #控制是否展示数据库的写入情况
self.log_status['process'] = False #控制是否展示规划问题的求解过程
def solve(self):
'''调用后开始进行日前SCUC+SCED、实时SCUC+SCED出清,并对数据库的相应表进行新增,若主键已存在,则不新增'''
self.start_time = time()
TIME_NUM = self.TIME_NUM
self.load_data, self.line_data, self.generator_data = self._read_datebase()
GSDF = self._calculate_GSDF(self.line_data)
for type in [0,1]:#0表示日前、1表示实时
self.type = type
self._set_state(type=type)
gen_status = self._SCUC(GSDF=GSDF)
self._SCED(GSDF=GSDF,gen_status=gen_status)
self.end_time = time()
if self.log_status['show_time'] and self.log_status['total']:
self.show_cost_time()
def solve_case(self,case_num):
'''调用时输入case的编号,调用后进行日前SCUC+SCED、实时SCUC+SCED出清,
并对数据库的相应表进行新增,若主键已存在,则不新增'''
self.start_time = time()
self.load_data, self.line_data, self.generator_data = self._read_case_conf(case_num)
TIME_NUM = self.TIME_NUM
GSDF = self._calculate_GSDF(self.line_data)
self.case = case_num
for type in [0,1]:#0表示日前、1表示实时
self.type = type
self._set_state(type=type)
gen_status = self._SCUC(GSDF=GSDF)
self._SCED(GSDF=GSDF,gen_status=gen_status)
self.end_time = time()
if self.log_status['show_time'] and self.log_status['total']:
self.show_cost_time()
def show_cost_time(self):
'''用于展示耗时'''
print('总耗费的时间为%.2f秒:'%(self.end_time-self.start_time))
def set_log(self,name='total', status=True):
'''用于设置各种print是否执行,具体看本类的__init__()'''
self.log_status[name] = bool(status)
pass
'''下面的方法都是不需要在外部调用'''
def _read_datebase(self):
'''读取发电机、线路、负荷数据,根据type不同,决定是实时(type=1)还是日前(type=0)'''
eng = create_engine(self.DATABASE_URL)
Session = sessionmaker(bind=eng)
session = Session()
# 从表st_node_load 读取负荷数据
load_data = []
for instance in session.query(Node):
load_data.append(Load(instance.get_ID()))
for instance in session.query(Node_Load):
load_data[instance.get_node_num() - 1].update_load(instance.get_serial_num(), instance.get_load())
# 从表base_line 读取线路数据,state不为1的线路将被忽视,最终输出线路数据时也不会包含被忽视的线路
line_data = []
for instance in session.query(Edge).filter(Edge.state==1):
line_data.append(instance)
# 从表base_gen 读取读取发电机数据
generator_data = []
for instance in session.query(Generator):
generator_data.append(instance)
generator_data[-1].prepare()
return load_data, line_data, generator_data
def _set_state(self,type):
'''为了区分日前、实时,对机组和负荷的type进行设置'''
if type==1:
for gen in self.generator_data:
gen.type = type
for load in self.load_data:
load.type = type
load.update_load_real()
def _calculate_GSDF(self,line_data):
'''输入类Edge组成的list,返回值为潮流分布转移矩阵GSDF'''
def get_admittance_matrix(row_list):
'''返回一个list,索引0为导纳矩阵。1为传输线路数量'''
nrows = len(row_list)
# 求取母线数目
bus_list = []
for row_num in range(nrows):
bus_list.append(row_list[row_num].start_node_num - 1)
bus_list.append(row_list[row_num].end_node_num - 1)
busnum = max(bus_list)
# 计算自导纳
selfconduct = []
for bus_num in range(int(busnum + 1)):
y = 0
for row_num in range(nrows):
if bus_num == row_list[row_num].start_node_num - 1 or bus_num == row_list[row_num].end_node_num - 1:
y = y + (1 / row_list[row_num].rea)
selfconduct.append(y)
# print(selfconduct)
# 计算互导纳并形成导纳矩阵
huconduct = []
for bus_num in range(int(busnum + 1)):
singleline = []
for b in range(int(busnum + 1)):
# 判断a!=b则计算互阻抗
if bus_num != b:
y = 0
for row_num in range(nrows):
if ((bus_num == row_list[row_num].start_node_num - 1 and b == row_list[
row_num].end_node_num - 1) \
or (bus_num == row_list[row_num].end_node_num - 1 and b == row_list[
row_num].start_node_num - 1)) \
and (row_list[row_num].end_node_num - 1 != row_list[row_num].start_node_num - 1):
y = y + (1 / row_list[row_num].rea)
singleline.append(-y)
# 判断a=b添加自阻抗
if bus_num == b:
singleline.append(selfconduct[bus_num])
huconduct.append(singleline)
# 输出生成的导纳矩阵
return [np.mat(huconduct), nrows]
matrix_list = get_admittance_matrix(line_data)
matrix = matrix_list[0]
line_num = matrix_list[1]
X_matrix = matrix.I.tolist() #阻抗矩阵
GSDF = []
for i in range(line_num):
GSDF.append([])
for j in range(118):
bus_num_start = i