import time
import matplotlib.pyplot as plt
from datetime import *
import MetaTrader5 as mt5
import pandas as pd
import json
import numpy as np
import pytz
def 提取数据(name, start, untill):
    """Fetch one-minute bars for a symbol from MetaTrader 5, indexed by position.

    The requested window is anchored at the current time in the
    ``Europe/Helsinki`` time zone: it runs from ``start`` hours before now to
    ``untill`` hours after now.

    Args:
        name: Symbol name passed to ``mt5.copy_rates_range``.
        start: Number of hours to look back from now.
        untill: Number of hours to extend past now.

    Returns:
        A dict holding four parallel lists:
        ``'时间'``   - 0-based bar index (used instead of the bar timestamp),
        ``'high'``   - field 2 of each returned bar,
        ``'low'``    - field 3 of each returned bar,
        ``'k_line'`` - ``[index, high, low]`` triples.
        Fields 2 and 3 are the high and low prices according to the
        MetaTrader5 column order (time, open, high, low, ...).

    Raises:
        RuntimeError: If the terminal connection cannot be initialised, or the
            rate request returns ``None`` (previously this surfaced as an
            unrelated ``TypeError`` from ``len(None)``).
    """
    if not mt5.initialize():
        # Read the error before shutting down so the message stays meaningful.
        error = mt5.last_error()
        mt5.shutdown()
        raise RuntimeError(f"MetaTrader5 initialize() failed: {error}")

    try:
        # Original note (translated): "two extra days; now is 8.26, the
        # software shows 8.25".
        # NOTE(review): presumably this explains why a fixed Europe/Helsinki
        # zone is used instead of local time - confirm with the author.
        tz = pytz.timezone('Europe/Helsinki')
        now = datetime.now(tz)
        start_time = now - timedelta(hours=start)
        end_time = now + timedelta(hours=untill)
        rates = mt5.copy_rates_range(name, mt5.TIMEFRAME_M1, start_time, end_time)
        # Capture the error while the connection is still open.
        error = mt5.last_error() if rates is None else None
    finally:
        # Always release the terminal connection, even if the request raised.
        mt5.shutdown()

    if rates is None:
        raise RuntimeError(f"MetaTrader5 copy_rates_range({name!r}) failed: {error}")

    result = {'时间': [], 'high': [], 'low': [], 'k_line': []}
    for index, bar in enumerate(rates):
        high, low = bar[2], bar[3]
        result['时间'].append(index)
        result['high'].append(high)
        result['low'].append(low)
        result['k_line'].append([index, high, low])
    return result
import csv
import datetime
import time
import matplotlib.pyplot as plt#生成图要用matplotlib模块,若没有则得先安装。
import numpy as np
from datetime import datetime, timedelta
def 提取股票数据(file_path='C:\\Users\\f1970\\Desktop\\缠论\\实时红线\\能源\\XAUUSD\\1.csv'):
    """Load high/low bars from an exported file, indexed by position.

    Each non-blank row is expected to carry, in its first CSV field,
    whitespace-separated tokens ``<datetime> <high> <low>`` where the datetime
    looks like ``2023/08/25-09:30``.
    NOTE(review): this layout is inferred from how the original code indexed
    the row (it split the first field and read items 0, 1 and 2) - confirm it
    against a real export file.

    The original implementation discarded the result of ``split`` and indexed
    single characters of the field, so it raised ``ValueError`` on the first
    row; this version actually uses the split tokens.

    Args:
        file_path: Path of the file to read. Defaults to the path that was
            previously hard-coded, so existing zero-argument calls still work.

    Returns:
        A dict holding four parallel lists:
        ``'时间'``   - 0-based index of each accepted row,
        ``'high'``   - second token of the row as ``float``,
        ``'low'``    - third token of the row as ``float``,
        ``'k_line'`` - ``[index, high, low]`` triples.

    Raises:
        ValueError: If a row has fewer than three tokens, its datetime token
            does not match ``%Y/%m/%d-%H:%M``, or a price is not a number.
        OSError: If the file cannot be opened.
    """
    result = {'时间': [], 'high': [], 'low': [], 'k_line': []}
    # newline='' is what the csv module asks for when given a file object.
    with open(file_path, 'r', newline='') as file:
        for line_no, row in enumerate(csv.reader(file), start=1):
            # csv.reader yields [] for blank lines; skip them instead of
            # failing with IndexError.
            if not row or not row[0].strip():
                continue

            fields = row[0].split()
            if len(fields) < 3:
                raise ValueError(
                    f"line {line_no}: expected '<datetime> <high> <low>', got {row[0]!r}"
                )

            # Parsed only to validate the row; the bar index, not the
            # timestamp, is what gets stored. The old format string began with
            # a space, which Python's strptime treats as *required* whitespace
            # and which a split token can never contain.
            datetime.strptime(fields[0], "%Y/%m/%d-%H:%M")

            high = float(fields[1])
            low = float(fields[2])
            index = len(result['时间'])
            result['时间'].append(index)
            result['high'].append(high)
            result['low'].append(low)
            result['k_line'].append([index, high, low])
    return result
def redline(high,low,k_line):
lens = len(high)
bk_line = []
# 向上
d_baohan = 1
# 确定方向,找到包含。
j = 0
比较k线 = [k_line[0][0],k_line[0][1],k_line[0][2]]
time30s = (k_line[1][0] - k_line[0][0])/2
前一个包含 = 0
while(j < (lens-2)):
# 没有包含
if((比较k线[1]<k_line[j+1][1])&(比较k线[2]<k_line[j+1][2]))|((比较k线[1]>k_line[j+1][1])&(比较k线[2]>k_line[j+1][2])):
if(前一个包含 == 0):
bk_line.append(比较k线)
if(比较k线[1]<k_line[j+1][1]):
d_baohan = 1
else:
d_baohan = 0
比较k线 = k_line[j+1].copy()
前一个包含 = 0
j += 1
# 有包含
else:
if(d_baohan == 1):
比较k线 = [比较k线[0],max(比较k线[1],k_line[j+1][1]),max(比较k线[2],k_line[j+1][2])].copy()
elif(d_baohan == 0):
比较k线 = [比较k线[0],min(比较k线[1],k_line[j+1][1]),min(比较k线[2],k_line[j+1][2])].copy()
if(前一个包含 == 1):
bk_line[-1] = 比较k线.copy()
前一个包含 = 1
j += 1
continue
if(前一个包含 == 0):
前一个包含 = 1
bk_line.append(比较k线)
j += 1
kk_line = bk_line.copy()
pointer_last = 0
mybuffer = []
# 1顶 2底
last_xingzhuang = 0
i= 0
while(i < (len(bk_line)-3)):
# 是顶
if((bk_line[i][2] < bk_line[i+1][2])&(bk_line[i+1][2] > bk_line[i+2][2])&(bk_line[i][1] < bk_line[i+1][1])&(bk_line[i+1][1] > bk_line[i+2][1])):
if(last_xingzhuang == 1):
# 更高,删除左边
if(bk_line[i+1][1]>bk_line[pointer_last][1]):
bk_line.pop(pointer_last-1)
bk_line.pop(pointer_last-1)
bk_line.pop(pointer_last-1)
mybuffer.pop(pointer_last-1)
mybuffer.pop(pointer_last-1)
mybuffer.pop(pointer_last-1)
i -= 3
pointer_last = i+1
mybuffer.append(0)
mybuffer.append(1)
mybuffer.append(0)
i += 3
continue
# 更低,删除右边
elif(bk_line[i+1][1]<=bk_line[pointer_last][1]):
bk_line.pop(i)
bk_line.pop(i)
bk_line.pop(i)
continue
elif(last_xingzhuang == 2):
# 符合的低连顶
if(((i - pointer_last) >= 4)&(bk_line[i+1][1]>max(bk_line[pointer_last-1][1],bk_line[pointer_last][1],bk_line[pointer_last+1][1]))):
pointer_last = i+1
last_xingzhuang = 1
mybuffer.append(0)
mybuffer.append(1)
mybuffer.append(0)
i += 3
continue
# 不符合,删除右边
else:
bk_line.pop(i)
bk_line.pop(i)
bk_line.pop(i)
continue
elif(last_xingzhuang == 0):
# 确定第一个顶
pointer_last = i+1
last_xingzhuang = 1
mybuffer.append(0)
mybuffer.append(1)
mybuffer.append(0)
i += 3
continue
# 是底
if((bk_line[i][2] > bk_line[i+1][2])&(bk_line[i+1][2] < bk_line[i+2][2])&(bk_line[i][1] > bk_line[i+1][1])&(bk_line[i+1][1] < bk_line[i+2][1])):
if(last_xingzhuang == 2):
# 更低,删除左边
if(bk_line[i+1][2]<bk_line[pointer_last][2]):
bk_line.pop(pointer_last-1)
bk_line.pop(pointer_last-1)
bk_line.pop(pointer_last-1)
mybuffer.pop(pointer_last-1)
mybuffer.pop(pointer_last-1)
mybuffer.pop(pointer_last-1)
i -= 3
pointer_last = i+1
mybuffer.append(0)
mybuffer.append(2)
mybuffer.append(0)
i += 3
continue
# 更高,删除右边
elif(bk_line[i+1][2]>=bk_line[pointer_last][2]):
bk_line.pop(i)
bk_line.pop(i)
bk_line.pop(i)
continue
elif(last_xingzhuang == 1):
# 符合的顶连低
if(((i - pointer_last) >= 4)&(bk_line[i+1][1]<min(bk_line[pointer_last-1][2],bk_line[pointer_last][2],bk_line[pointer_last+1][2]))):
pointer_last = i+1
last_xingzhuang = 2
mybuffer.append(0)
mybuffer.append(2)