python3实现从kafka获取数据,并解析为json格式,写入到mysql中
今天小编就为大家分享一篇python3实现从kafka获取数据,并解析为json格式,写入到mysql中,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
项目需求:从kafka中的日志解析出数据库的变更记录,按照订单级别和订单明细级别写入数据库;一条订单的所有信息(包括各种维度信息)均保存在一条json中,写入mysql5.7中。
配置信息:
[Global]
kafka_server=xxxxxxxxxxx:9092
kafka_topic=mes
consumer_group=test100
passwd = tracking
port = 3306
host = xxxxxxxxxx
user = track
schema = track
dd_socket =
dd_host = xxxxxxxxxxxx
dd_port = 3306
dd_user = xxxxxxxxx
dd_passwd = xxxxxxxx
代码又长又丑,半吊子,只完成了面向过程的编程,没做到面向对象,将就看,有问题可以联系我
代码:
#encoding=utf-8
import datetime
import configparser
import re
import pymysql
from vertica_python import connect
import vertica_python
import json
from confluent_kafka import Consumer, KafkaError
import csv
import logging
import os
import time
import signal
import sys
# Logging setup: records at WARN level and above are appended to
# log_tracking.txt in the current working directory.
logging.basicConfig(
    filename=os.path.join(os.getcwd(), 'log_tracking.txt'),
    level=logging.WARN,
    filemode='a',
    format='%(asctime)s - %(levelname)s: %(message)s',
)
def writeErrorLog(errSrc, errType, errMsg):
    """Append one timestamped error entry to ``err_tracking.log``.

    The entry has the form ``<YYYY-mm-dd HH:MM:SS> - <errSrc> - <errType> :
    <errMsg>`` and is terminated by a newline so that successive entries land
    on separate lines.

    Args:
        errSrc: Name of the place the error came from (e.g. a method name).
        errType: Short severity/category label such as ``'Error'``.
        errMsg: Error text; non-string values are formatted with ``str()``.

    The function never raises on a failed write: if the log file cannot be
    opened or written, the problem and the dropped entry are reported through
    the ``logging`` module instead.
    """
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    entry = "{} - {} - {} : {}\n".format(timestamp, errSrc, errType, errMsg)
    try:
        # The context manager closes the handle on every path, including a
        # failed open, where there is no handle to close at all.
        # UTF-8 is fixed explicitly so non-ASCII error text cannot fail to
        # encode under a narrower locale default.
        with open('err_tracking.log', 'a', encoding='utf-8') as log_file:
            log_file.write(entry)
    except OSError as exc:
        # Retrying against the same path would fail the same way, so fall
        # back to the logging module rather than crash the caller.
        logging.error('writeErrorLog could not write err_tracking.log (%s); entry: %s',
                      exc, entry.rstrip('\n'))
class RH_Consumer:
#读取配置文件的配置信息,并初始化一些类需要的变量
def __init__(self):
self.config = configparser.ConfigParser()
self.config.read('config.ini')
self.host = self.config.get('Global', 'host')
self.user = self.config.get('Global', 'user')
self.passwd = self.config.get('Global', 'passwd')
self.schema = self.config.get('Global', 'schema')
self.port = int(self.config.get('Global', 'port'))
self.kafka_server = self.config.get('Global', 'kafka_server')
self.kafka_topic = self.config.get('Global', 'kafka_topic')
self.consumer_group = self.config.get('Global', 'consumer_group')
self.dd_host = self.config.get('Global', 'dd_host')
self.dd_user = self.config.get('Global', 'dd_user')
self.dd_passwd = self.config.get('Global', 'dd_passwd')
self.dd_port = int(self.config.get('Global', 'dd_port'))
self.dd_socket = self.config.get('Global', 'dd_socket')
self.operation_time = datetime.datetime.now()
self.stop_flag = 0
self.src_table_name = []
self.__init_db()
self.__init_mes_db()
self._get_all_src_table()
#连接写入目标数据库
def __init_db(self):
    """Connect to the target MySQL database and create a cursor on it.

    The connection parameters are kept in ``self.conn_info``; the database
    name is fixed to ``'tracking'``. On success ``self.mysql_db`` and
    ``self.mysql_cur`` are set. Any exception is written to the error log
    and not re-raised, so after a failure those attributes may be missing.
    """
    try:
        self.conn_info = dict(
            host=self.host,
            port=self.port,
            user=self.user,
            password=self.passwd,
            db='tracking',
        )
        connection = pymysql.connect(charset="utf8", **self.conn_info)
        self.mysql_db = connection
        self.mysql_cur = connection.cursor()
    except Exception as err:
        writeErrorLog('__init_db', 'Error', str(err))
#连接生产数据库,用于获取相关维度信息
def __init_mes_db(self):
    """Connect to the production (MES) MySQL database and create a cursor.

    Uses the ``dd_*`` settings read in ``__init__``. On success
    ``self.mes_mysql_db`` and ``self.mes_mysql_cur`` are set. Any exception
    is written to the error log under this method's own name and is not
    re-raised, so after a failure those attributes may be missing.
    """
    try:
        self.mes_mysql_db = pymysql.connect(
            host=self.dd_host,
            user=self.dd_user,
            # ``password`` is the same keyword __init_db passes; ``passwd``
            # is only a compatibility alias for it.
            password=self.dd_passwd,
            port=self.dd_port,
            # NOTE(review): a blank dd_socket in config.ini arrives here as
            # '' - presumably pymysql then connects over TCP; confirm.
            unix_socket=self.dd_socket,
            charset="utf8",
        )
        self.mes_mysql_cur = self.mes_mysql_db.cursor()
    except Exception as data:
        # Log under the correct source name so failures of this connection
        # are not mistaken for failures of the target database.
        writeErrorLog('__init_mes_db', 'Error', str(data))
#关闭数据库
def _release_db(self):
self.mysql_cur.close()
self.mysql_db.close()
self.mes_mysql_cur.close()
self.mes_mysql_db.close()
#获取所有的配置表信息(需要获取的表)
def _get_all_src_table(self):
try:
# 获取table的信息
select_src_table_names = "select distinct src_table_name from tracking.tracking_table_mapping_rule"
self.mysql_cur.execute(select_src_table_names)
rows = self.mysql_cur.fetchall()
for item in rows:
self.src_table_name.append(item[0])
return self.src_table_name
except Exception as data:
writeErrorLog('_get_all_src_table', 'Error', str(data))
logging.error('_get_all_src_table: ' + str(data))
#获取src表的目标表信息
def _get_tgt_table_name(self,table_name,table_schema):
try:
# 获取table的信息(table_name是schema|tablename)
select_tgt_table_names = "select distinct tgt_table_name from tracking.tracking_table_mapping_rule where src_table_name = '%s' and src_table_schema = '%s'" %(table_name,table_schema)
self.mysql_cur.execute(select_tgt_table_names)
rows = self.mysql_cur.fetchall()
tgt_table_names=[]
评论0
最新资源