import os
import boto3
from botocore.exceptions import ClientError
def define_job(
    name,
    command,
    file_location,
    destination_key,
    trigger_name,
    trigger_type,
    trigger_definition=None,
    job_configuration=None,
    python_version="3",
    aws_access_key=None,
    aws_secret_key=None,
    region=None,
    role=None,
    glue_script_bucket=None,
    environment_prefix=None,
):
    """
    Create or update an AWS Glue job, upload its script to S3, and set up its trigger.

    Primary function to call when defining a new job in a Jupyter notebook or
    script. Designed to succeed regardless of the existing state of the system:
    every step uses create-or-update semantics.

    Parameters
    ----------
    name : str
        Base job name; the resolved environment prefix is prepended.
    command : str
        Glue command name (e.g. "glueetl" or "pythonshell").
    file_location : str
        Local path of the job script to upload.
    destination_key : str
        S3 key for the uploaded script; also used as the job's ScriptLocation.
    trigger_name : str
        Base trigger name; the environment prefix is prepended.
    trigger_type : str
        Glue trigger type, e.g. "SCHEDULED", "CONDITIONAL", "EVENT", "ON_DEMAND".
    trigger_definition : dict, optional
        Extra trigger settings (Schedule / Predicate / EventBatchingCondition /
        Actions) passed through to the trigger create/update calls.
    job_configuration : dict, optional
        Extra settings merged into the Glue JobUpdate payload.
    python_version : str, optional
        Python version recorded on the Glue command (default "3").
    aws_access_key, aws_secret_key, region, role, glue_script_bucket, environment_prefix : str, optional
        Explicit overrides; when None the corresponding environment variable is
        read instead (AWS_ACCESS_KEY, AWS_SECRET_KEY, AWS_REGION, GLUE_ROLE,
        GLUE_SCRIPT_BUCKET, ENVIRONMENT_PREFIX).

    Returns
    -------
    dict
        {"job": ..., "script": ..., "trigger": ...} — raw responses from the
        three underlying steps.
    """
    # Avoid shared mutable defaults: the helpers below mutate these dicts,
    # which with `={}` defaults would leak state between calls.
    trigger_definition = {} if trigger_definition is None else trigger_definition
    job_configuration = {} if job_configuration is None else job_configuration
    aws_access_key_id = (
        os.environ["AWS_ACCESS_KEY"] if aws_access_key is None else aws_access_key
    )
    aws_secret_key_id = (
        os.environ["AWS_SECRET_KEY"] if aws_secret_key is None else aws_secret_key
    )
    region_name = os.environ["AWS_REGION"] if region is None else region
    role_arn = os.environ["GLUE_ROLE"] if role is None else role
    glue_script_bucket_name = (
        os.environ["GLUE_SCRIPT_BUCKET"]
        if glue_script_bucket is None
        else glue_script_bucket
    )
    environment_prefix_name = (
        os.environ["ENVIRONMENT_PREFIX"]
        if environment_prefix is None
        else environment_prefix
    )
    glueclient = boto3.client(
        "glue",
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_key_id,
        region_name=region_name,
    )
    s3client = boto3.client(
        "s3",
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_key_id,
        region_name=region_name,
    )
    # All environment-facing names are namespaced with the prefix.
    job_name = f"{environment_prefix_name}{name}"
    trigger_environment_name = f"{environment_prefix_name}{trigger_name}"
    glue_script_bucket_name = f"{environment_prefix_name}{glue_script_bucket_name}"
    new_job = is_new_job(glueclient, job_name)
    response_job = handle_job(
        glueclient,
        new_job,
        job_name,
        command,
        destination_key,
        role_arn,
        python_version,
        job_configuration,
    )
    response_script = handle_script_upload(
        s3client,
        file_location,
        destination_key,
        glue_script_bucket_name,
    )
    response_trigger = handle_trigger(
        glueclient, job_name, trigger_environment_name, trigger_type, trigger_definition
    )
    return {"job": response_job, "script": response_script, "trigger": response_trigger}
def handle_job(
    glueclient,
    new_job,
    name,
    command,
    destination_key,
    role_arn,
    python_version,
    job_configuration=None,
):
    """
    Create the AWS Glue job if it is new, then apply the full configuration.

    Parameters
    ----------
    glueclient
        boto3 Glue client.
    new_job : bool
        True when no job named *name* exists yet (see ``is_new_job``).
    name : str
        Fully-prefixed Glue job name.
    command : str
        Glue command name (e.g. "glueetl").
    destination_key : str
        S3 location used as the command's ScriptLocation.
    role_arn : str
        IAM role ARN the job runs as.
    python_version : str
        Python version recorded on the command.
    job_configuration : dict, optional
        Extra JobUpdate settings; a caller-supplied "Role" wins over *role_arn*.

    Returns
    -------
    The response of the ``update_job`` call (the final applied state).
    """
    command_object = {
        "Name": command,
        "ScriptLocation": destination_key,
        "PythonVersion": python_version,
    }
    # BUG FIX: the original used a mutable default (`job_configuration={}`) and
    # mutated it in place, so the "Role" from one call leaked into the shared
    # default and silently overrode a different role_arn on the next call.
    # Build the update payload on a copy instead.
    job_update = dict(job_configuration) if job_configuration else {}
    job_update["Command"] = command_object
    job_update.setdefault("Role", role_arn)
    if new_job:
        glueclient.create_job(
            Name=name,
            Role=role_arn,
            Command=command_object,
        )
    # update_job runs in both branches so the create and update paths converge
    # on the same fully-applied configuration.
    response = glueclient.update_job(JobName=name, JobUpdate=job_update)
    return response
def handle_script_upload(s3client, file_location, destination_key, glue_script_bucket):
    """
    Upload the Glue job script to S3, creating the bucket if it doesn't exist.

    S3 bucket names may not contain underscores, so they are replaced with
    hyphens before use.

    Parameters
    ----------
    s3client
        boto3 S3 client.
    file_location : str
        Local path of the script to upload.
    destination_key : str
        S3 key to upload the script under.
    glue_script_bucket : str
        Target bucket name (prior to underscore sanitization).

    Returns
    -------
    The return of ``upload_file`` (None on success).

    Raises
    ------
    ClientError
        If bucket creation fails for any reason other than the bucket
        already existing.
    """
    bucket = glue_script_bucket.replace("_", "-")
    try:
        s3client.create_bucket(Bucket=bucket)
    except ClientError as e:
        # BUG FIX: the original swallowed *every* ClientError, hiding real
        # failures (permissions, invalid name, ...). Only tolerate the
        # "bucket already exists" outcomes.
        code = e.response.get("Error", {}).get("Code", "")
        if code not in ("BucketAlreadyOwnedByYou", "BucketAlreadyExists"):
            raise
    return s3client.upload_file(file_location, bucket, destination_key)
def handle_trigger(
    glueclient, job_name, trigger_name, trigger_type, trigger_definition
):
    """
    Create the AWS Glue trigger if it is new, then apply the full definition.

    Parameters
    ----------
    glueclient
        boto3 Glue client.
    job_name : str
        Fully-prefixed Glue job name the trigger fires by default.
    trigger_name : str
        Fully-prefixed trigger name.
    trigger_type : str
        Glue trigger type; "SCHEDULED" requires a "Schedule" key in
        *trigger_definition*, "CONDITIONAL" a "Predicate" key, and "EVENT"
        an "EventBatchingCondition" key.
    trigger_definition : dict
        Trigger settings; an "Actions" key overrides the default action of
        starting *job_name*. The caller's dict is not mutated.

    Returns
    -------
    The response of the ``update_trigger`` call (the final applied state).
    """
    new_trigger = is_new_trigger(glueclient, trigger_name)
    schedule = trigger_definition["Schedule"] if trigger_type == "SCHEDULED" else None
    predicate = (
        trigger_definition["Predicate"] if trigger_type == "CONDITIONAL" else {}
    )
    event_batching = (
        trigger_definition["EventBatchingCondition"]
        if trigger_type == "EVENT"
        else {"BatchSize": 1}
    )
    # BUG FIX: work on a copy so the caller's dict is not mutated, and default
    # the actions to starting this job.
    definition = dict(trigger_definition)
    actions = definition.setdefault("Actions", [{"JobName": job_name}])
    if new_trigger:
        glueclient.create_trigger(
            Name=trigger_name,
            Type=trigger_type,
            # BUG FIX: the original hardcoded [{"JobName": job_name}] here,
            # ignoring caller-supplied Actions on the create path while the
            # update path honored them. Use the resolved actions in both.
            Actions=actions,
            Schedule=schedule,
            Predicate=predicate,
            EventBatchingCondition=event_batching,
        )
    # update_trigger runs unconditionally so create and update paths converge
    # on the same fully-applied definition.
    response = glueclient.update_trigger(
        Name=trigger_name, TriggerUpdate=definition
    )
    return response
def is_new_job(glueclient, name):
    """Return True when no AWS Glue job named *name* exists yet."""
    try:
        glueclient.get_job(JobName=name)
    except ClientError:
        # get_job raising a ClientError is treated as "job not found".
        return True
    return False
def is_new_trigger(glueclient, name):
    """Return True when no AWS Glue trigger named *name* exists yet."""
    try:
        glueclient.get_trigger(Name=name)
    except ClientError:
        # get_trigger raising a ClientError is treated as "trigger not found".
        return True
    return False
挣扎的蓝藻
- 粉丝: 14w+
- 资源: 15万+
最新资源
- 基于matlab车牌识别,bp和模板匹配法的比较,这一套模板匹配法我给调好了都可以随便添加图片,目前有四十多张,这个是我自己做的,所以可讲解,程序不重要,重要的是讲解不是嘛 bp也就七八张,这个需要
- Java毕设项目:基于spring+mybatis+maven+mysql实现的汽车站车辆运管管理系统【含源码+数据库+毕业论文】
- 基于Matlab simulink的电力电子基础仿真 1.单相半波可控整流电路电阻负载 2. 单相半波可控整流电阻电感负载 3. 单相半波可控整流电路电阻电感负载带续流二极管4. 单相桥式全控整流电路
- 车牌识别matlab 操作简单到手直接用 可识别50多张图片 程序有注释 灰度化二值化定位分割识别等等 包含一份文档 需要的直接拿 保证可以运行 基于matlab的车牌识别,这是先读入图片,在裁剪
- matlab simulink 风电调频,双馈风机,四机两区系统,对风机附加惯性控制,下垂控制,风电渗透率为10%,故障设置为200MW负荷扰动,童叟无欺 用phasor模型,仿真只需要20秒 仿真速
- matlab 智能优化算法 基于自私羊群优化算法求解单目标优化问题
- Java毕设项目:基于spring+mybatis+maven+mysql实现的社区管理系统【含源码+数据库+毕业论文】
- 欧姆龙CP1H-XA40DT-D功能完好,可学习模拟量.通讯.脉冲等,开发机配台达通讯程序
- 电力系统机组调度 考虑了源荷不确定性 求解:matlab+yalmip+gurobi作为求解器) 内容:考虑源荷两侧不确定性的含风电的低碳调度,引入模糊机会约束,程序包括储能、风光、火电机组及水电机组
- Java毕设项目:基于spring+mybatis+maven+mysql实现的体育馆管理系统【含源码+数据库+毕业论文】
- Java毕设项目:基于spring+mybatis+maven+mysql实现的传统文化网站管理系统【含源码+数据库+毕业论文】
- 预弯成型切断机(sw18可编辑+工程图)全套设计资料100%好用.zip
- Java毕设项目:基于spring+mybatis+maven+mysql实现的台球厅管理系统台球管理系统【含源码+数据库+毕业论文】
- Java毕设项目:基于spring+mybatis+maven+mysql实现的培训学校教学管理系统【含源码+数据库+毕业论文】
- LabVIEW基于欧姆龙PLC串口通讯教学,
- 风电场可靠性评估序贯蒙特卡洛 标价即卖价,不议价,不,程序是可以运行的 (非完全复现) 仿真平台:MATLAB 拿后前可以看运行结果,参考文档见图,出不 不 出不 不 联系留邮箱,留邮箱
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈