from __future__ import print_function

import json
import logging
import os
import time
from datetime import datetime, timedelta
from tempfile import TemporaryDirectory

from airflow import models
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.sensors.gcs_sensor import GoogleCloudStorageObjectSensor
from airflow.operators import python_operator
from airflow.operators.email_operator import EmailOperator
from airflow.operators.python_operator import PythonOperator
from google.cloud import bigquery
from google.cloud.bigquery import TimePartitioning, SchemaField

from ethereumetl_airflow.bigquery_utils import submit_bigquery_job
from ethereumetl_airflow.build_export_dag import upload_to_gcs

logging.basicConfig()
logging.getLogger().setLevel(logging.DEBUG)
def build_load_dag(
        dag_id,
        output_bucket,
        destination_dataset_project_id,
        chain='ethereum',
        notification_emails=None,
        load_start_date=datetime(2018, 7, 1),
        schedule_interval='0 0 * * *',
        load_all_partitions=True
):
    # The following datasets must be created in BigQuery:
    # - crypto_{chain}_raw
    # - crypto_{chain}_temp
    # - crypto_{chain}

    dataset_name = f'crypto_{chain}'
    dataset_name_raw = f'crypto_{chain}_raw'
    dataset_name_temp = f'crypto_{chain}_temp'

    if not destination_dataset_project_id:
        raise ValueError('destination_dataset_project_id is required')

    environment = {
        'dataset_name': dataset_name,
        'dataset_name_raw': dataset_name_raw,
        'dataset_name_temp': dataset_name_temp,
        'destination_dataset_project_id': destination_dataset_project_id,
        'load_all_partitions': load_all_partitions
    }
    def read_bigquery_schema_from_file(filepath):
        result = []
        file_content = read_file(filepath)
        json_content = json.loads(file_content)
        for field in json_content:
            result.append(bigquery.SchemaField(
                name=field.get('name'),
                field_type=field.get('type', 'STRING'),
                mode=field.get('mode', 'NULLABLE'),
                description=field.get('description')))
        return result

    def read_file(filepath):
        with open(filepath) as file_handle:
            content = file_handle.read()
            return content
    default_dag_args = {
        'depends_on_past': False,
        'start_date': load_start_date,
        'email_on_failure': True,
        'email_on_retry': False,
        'retries': 5,
        'retry_delay': timedelta(minutes=5)
    }

    if notification_emails and len(notification_emails) > 0:
        default_dag_args['email'] = [email.strip() for email in notification_emails.split(',')]

    # Define a DAG (directed acyclic graph) of tasks.
    dag = models.DAG(
        dag_id,
        catchup=False,
        schedule_interval=schedule_interval,
        default_args=default_dag_args)

    dags_folder = os.environ.get('DAGS_FOLDER', '/home/airflow/gcs/dags')
    def add_load_tasks(task, file_format, allow_quoted_newlines=False):
        # Wait until the exported file for the current date has landed in GCS.
        wait_sensor = GoogleCloudStorageObjectSensor(
            task_id='wait_latest_{task}'.format(task=task),
            timeout=60 * 60,
            poke_interval=60,
            bucket=output_bucket,
            object='export/{task}/block_date={datestamp}/{task}.{file_format}'.format(
                task=task, datestamp='{{ds}}', file_format=file_format),
            dag=dag
        )

        def load_task(ds, **kwargs):
            client = bigquery.Client()
            job_config = bigquery.LoadJobConfig()
            schema_path = os.path.join(dags_folder, 'resources/stages/raw/schemas/{task}.json'.format(task=task))
            schema = read_bigquery_schema_from_file(schema_path)
            # Apply chain-specific schema adjustments (e.g. for the Kovan testnet).
            schema = adjust_schema_for_kovan(dag_id, task, schema)
            job_config.schema = schema
            job_config.source_format = bigquery.SourceFormat.CSV if file_format == 'csv' else bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
            if file_format == 'csv':
                job_config.skip_leading_rows = 1
            job_config.write_disposition = 'WRITE_TRUNCATE'
            job_config.allow_quoted_newlines = allow_quoted_newlines
            job_config.ignore_unknown_values = True

            export_location_uri = 'gs://{bucket}/export'.format(bucket=output_bucket)
            if load_all_partitions:
                uri = '{export_location_uri}/{task}/*.{file_format}'.format(
                    export_location_uri=export_location_uri, task=task, file_format=file_format)
            else:
                uri = '{export_location_uri}/{task}/block_date={ds}/*.{file_format}'.format(
                    export_location_uri=export_location_uri, task=task, ds=ds, file_format=file_format)
            table_ref = client.dataset(dataset_name_raw).table(task)
            load_job = client.load_table_from_uri(uri, table_ref, job_config=job_config)
            submit_bigquery_job(load_job, job_config)
            assert load_job.state == 'DONE'

        load_operator = PythonOperator(
            task_id='load_{task}'.format(task=task),
            python_callable=load_task,
            provide_context=True,
            execution_timeout=timedelta(minutes=30),
            dag=dag
        )

        wait_sensor >> load_operator

        return load_operator
    def add_enrich_tasks(task, time_partitioning_field='block_timestamp', dependencies=None, always_load_all_partitions=False):
        def enrich_task(ds, **kwargs):
            template_context = kwargs.copy()
            template_context['ds'] = ds
            template_context['params'] = environment

            client = bigquery.Client()

            # Need to use a temporary table because bq query sets field modes to NULLABLE and descriptions to null
            # when writeDisposition is WRITE_TRUNCATE

            # Create a temporary table
            temp_table_name = '{task}_{milliseconds}'.format(task=task, milliseconds=int(round(time.time() * 1000)))
            temp_table_ref = client.dataset(dataset_name_temp).table(temp_table_name)

            schema_path = os.path.join(dags_folder, 'resources/stages/enrich/schemas/{task}.json'.format(task=task))
            schema = read_bigquery_schema_from_file(schema_path)
            schema = adjust_schema_for_kovan(dag_id, task, schema)
            table = bigquery.Table(temp_table_ref, schema=schema)

            description_path = os.path.join(
                dags_folder, 'resources/stages/enrich/descriptions/{task}.txt'.format(task=task))
            table.description = read_file(description_path)
            if time_partitioning_field is not None:
                table.time_partitioning = TimePartitioning(field=time_partitioning_field)
            logging.info('Creating table: ' + json.dumps(table.to_api_repr()))
            table = client.create_table(table)
            assert table.table_id == temp_table_name

            # Query from raw to temporary table
            query_job_config = bigquery.QueryJobConfig()
            # Finishes faster, query limit for concurrent interactive queries is 50
            query_job_config.priority = bigquery.QueryPriority.INTERACTIVE
            query_job_config.destination = temp_table_ref
            sql_path = os.path.join(dags_folder, 'resources/stages/enrich/sqls/{task}.sql'.format(task=task))
            sql_template = read_file(sql_path)
            sql = kwargs['task'].render_template(sql_template, template_context)
            print('Enrichment sql:')
            print(sql)
            query_job = client.query(sql, location='US', job_config=query_job_config)
            submit_bigquery_job(query_job, query_job_config)
            assert query_job.state == 'DONE'
            if load_all_partitions or always_load_all_partitions:
                # Copy temporary table to destination.
                # A minimal sketch of the copy step, assuming the destination table in the
                # crypto_{chain} dataset is named after the task:
                copy_job_config = bigquery.CopyJobConfig()
                copy_job_config.write_disposition = 'WRITE_TRUNCATE'

                dest_table_ref = client.dataset(dataset_name, project=destination_dataset_project_id).table(task)
                copy_job = client.copy_table(temp_table_ref, dest_table_ref, location='US', job_config=copy_job_config)
                submit_bigquery_job(copy_job, copy_job_config)
                assert copy_job.state == 'DONE'
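
# A minimal usage sketch (hypothetical bucket and project IDs): Airflow discovers DAGs
# from module-level variables, so a DAG definition file placed in the Composer dags/
# folder would typically invoke build_load_dag along these lines.
#
#   from ethereumetl_airflow.build_load_dag import build_load_dag
#
#   dag = build_load_dag(
#       dag_id='ethereum_load_dag',
#       output_bucket='your-export-bucket',                 # hypothetical
#       destination_dataset_project_id='your-gcp-project',  # hypothetical
#       chain='ethereum',
#       notification_emails='alerts@example.com',           # hypothetical
#       load_all_partitions=False,
#   )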
Airflow DAGs for exporting, loading and parsing Ethereum blockchain data: setup with Google Cloud Composer.

1. Create the BigQuery datasets: sign in to BigQuery (https://bigquery.cloud.google.com/) and create new datasets named crypto_ethereum, crypto_ethereum_raw and crypto_ethereum_temp.
2. Create a Google Cloud Storage bucket: create a new bucket to store the exported files (https://console.cloud.google.com/storage/browser).
3. Create a Google Cloud Composer environment: create a new Cloud Composer environment.

For more details and usage instructions, read the README.md file included in the package. (The first two steps are sketched programmatically below.)
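
The datasets and bucket can also be created programmatically. The sketch below is a minimal example, assuming hypothetical project and bucket names and the google-cloud-bigquery / google-cloud-storage client libraries; it creates the three datasets and the export bucket that build_load_dag expects.

from google.cloud import bigquery, storage

PROJECT_ID = 'your-gcp-project'        # hypothetical
EXPORT_BUCKET = 'your-export-bucket'   # hypothetical

# Create the raw, temp and destination datasets used by the load DAG.
bq_client = bigquery.Client(project=PROJECT_ID)
for name in ('crypto_ethereum', 'crypto_ethereum_raw', 'crypto_ethereum_temp'):
    dataset = bigquery.Dataset(f'{PROJECT_ID}.{name}')
    dataset.location = 'US'  # the DAG runs its BigQuery jobs in the US location
    bq_client.create_dataset(dataset, exists_ok=True)

# Create the bucket that the export DAG writes to and the load DAG reads from.
storage_client = storage.Client(project=PROJECT_ID)
storage_client.create_bucket(EXPORT_BUCKET, location='US')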