zoukankan      html  css  js  c++  java
  • 通过AWS EMR降低集群计算成本

    本文首发于:行者AI

    AWS EMR是一个计算集群。可以通过ta创建自定义配置的虚拟机,并自动安装所需计算框架(Spark,Hadoop,Hive等),以便用来进行大数据计算。

    1. 项目背景

    公司目前有一个项目,通过爬虫收集数据,离线计算得到用户画像,并将最终结果写入rds,通过api向外展示数据。

    2. 架构演进

    2.1 技术栈

    • 计算框架 Spark
    • 调度框架 Airflow
    • 数据存储 Hadoop,Mysql
    • 数仓工具 Hive,Presto
    • 辅助工具 Zepplin
    • 脚本语言 Java,Scala,Python

    2.2 第一版

    环境

    我们在某云厂商开了6台虚拟器(4核8G),spark on yarn模式运行,其中1台作为主节点,运行hadoop主节点和airflow调度程序,其余作为数据节点。

    计算过程

    • 通过Spark Streaming将数据落地到Hadoop
    • Airflow定时向主节点通过Spark-submit方式提交命令
    • Spark计算后将最终结果写入Mysql
    • 平时开发人员可以在Zepplin进行查询

    效果

    计算流程可以正常进行

    思考

    通过一段时间的观察分析,我们发现

    • 大部分计算任务都能在较短时间内完成
    • 机器每天闲置时间很长
    • 业务没有很高的实时性要求
    • 高配置虚拟器成本很高

    结论

    基于现状,我们希望能有个即开即用的系统,就像电脑一样,要用就打开,用完就关闭。经过调研,最终选择了AWS的EMR。

    2.3 第二版

    环境

    在将系统迁移到AWS EMR之后,在AWS上开了一台虚拟器(1核2G)运行Airflow和Kinesis

    这台虚拟器需要一直运行,但Airflow本身不需要高配置

    计算过程

    • 通过Kinesis将数据落到S3
    • Airflow定时发起任务
      • 发起创建EMR请求

        可自定义机器配置,要安装的计算框架,也可覆盖框架配置。可通过Python脚本检测集群是否创建成功

      • 提交计算任务

    • 关闭集群

    效果

    计算流程可以正常进行,但不需要长开机器了,只需要一台低配来触发定时任务即可

    思考

    通过一段时间的观察

    • EMR费用比起虚拟器,确实便宜很多
    • 可以通过console台查看集群状态,控制集群开关
    • 不方便的地方,平时要查看Hadoop的数据,需要自己写脚本拉取,不能使用辅助工具了

    Talk is cheap, show me the code

    准备工作

    • 注册AWS账号,登录

    • 开通EMR,S3

      开通S3的目的是为了持久化数据,因为EMR集群本身不带额外硬盘,需要外部介质储存

    • 开通AWS内网可访问的Mysql

      如果不用Hive,可跳过这一步,同理,需要外部介质储存Hive的数据结构

    • 准备创建EMR集群的脚本

      这里有个坑,开始我们使用的AWS SDK来做这件事,但无法自定义计算框架配置(应该是BUG),最初我们通过修改SDK源码解决了这个问题,但后来发现基本没用到SDK其他功能时,我们将这部分代码提成了单独的文件,由于使用了Airflow进行调度,所以决定用了Python

    • 编写Spark任务,打包上传至S3

    EMR LIB

    # coding: UTF-8
    import boto3, json, requests, requests
    from datetime import datetime
    
    def get_region():
        # 这个地址不用改
        r = requests.get("http://169.254.169.254/latest/dynamic/instance-identity/document")
        response_json = r.json()
        return response_json.get('region')
    
    def client(region_name):
        global emr
        emr = boto3.client('emr', region_name=region_name)
    
    # 创建EMR
    def create_cluster(name):
        param = {
            # 修改需要的框架
            "Applications":[{
                "Name":"Hadoop"
            },{
                "Name":"Hive"
            },{
                "Name":"Spark"
            }],
            # 这里的名字会显示到控制台
            "Name":name,
            "ServiceRole":"EMR_DefaultRole",
            "Tags":[],
            "ReleaseLabel":"emr-5.26.0",
            "Instances":{
                "TerminationProtected":False,
                "EmrManagedMasterSecurityGroup":"sg-0085fba9c3a6818f5",
                "InstanceGroups":[{
                    "InstanceCount":1,
                    "Name":"主实例组 - 1",
                    "InstanceRole":"MASTER",
                    "EbsConfiguration":{
                        "EbsBlockDeviceConfigs":[{
                            "VolumeSpecification":{
                                "SizeInGB":32,
                                "VolumeType":"gp2"
                            },
                            "VolumesPerInstance":1
                        }]
                    },
                    # 修改需要的硬件配置
                    "InstanceType":"m4.large",
                    "Market":"ON_DEMAND",
                    "Configurations":[{
                        # 修改Hive的meta源
                        "Classification":"hive-site",
                        "Properties":{
                            "javax.jdo.option.ConnectionURL":"jdbc:mysql://host:port/db?useUnicode=true&characterEncoding=UTF-8",
                            "javax.jdo.option.ConnectionDriverName":"org.mariadb.jdbc.Driver",
                            "javax.jdo.option.ConnectionUserName":"user",
                            "javax.jdo.option.ConnectionPassword":"pwd"
                        }
                    },{
                        "Classification":"yarn-env",
                        "Properties":{},
                        "Configurations":[{
                            "Classification":"export",
                            "Properties":{
                                "AWS_REGION":"cn-northwest-1",
                                "S3_ENDPOINT":"s3.cn-northwest-1.amazonaws.com.cn",
                                "S3_USE_HTTPS":"0",
                                "S3_VERIFY_SSL":"0"
                            }
                        }]
                    }]
                },{
                    "InstanceRole":"CORE",
                    "InstanceCount":1,
                    "Name":"核心实例组 - 2",
                    "Market":"ON_DEMAND",
                    # 修改需要的硬件配置
                    "InstanceType":"r5d.2xlarge",
                    "Configurations":[{
                        "Classification":"hive-site",
                        "Properties":{
                            "javax.jdo.option.ConnectionURL":"jdbc:mysql://host:port/db?useUnicode=true&characterEncoding=UTF-8",
                            "javax.jdo.option.ConnectionDriverName":"org.mariadb.jdbc.Driver",
                            "javax.jdo.option.ConnectionUserName":"user",
                            "javax.jdo.option.ConnectionPassword":"pwd"
                        }
                    },{
                        "Classification":"yarn-env",
                        "Properties":{},
                        "Configurations":[{
                            "Classification":"export",
                            "Properties":{
                                "AWS_REGION":"cn-northwest-1",
                                "S3_ENDPOINT":"s3.cn-northwest-1.amazonaws.com.cn",
                                "S3_USE_HTTPS":"0",
                                "S3_VERIFY_SSL":"0"
                            }
                        }]
                    }]
                },{
                    # 修改需要的工作节点数
                    "InstanceCount":4,
                    "Name":"任务实例组 - 4",
                    "InstanceRole":"TASK",
                    "EbsConfiguration":{
                        "EbsBlockDeviceConfigs":[{
                            "VolumeSpecification":{
                                "SizeInGB":32,
                                "VolumeType":"gp2"
                            },
                            "VolumesPerInstance":4
                        }]
                    },
                    # 修改需要的硬件配置
                    "InstanceType":"r5d.2xlarge",
                    "Market":"ON_DEMAND",
                    "Configurations":[{
                        "Classification":"hive-site",
                        "Properties":{
                            "javax.jdo.option.ConnectionURL":"jdbc:mysql://host:port/db?useUnicode=true&characterEncoding=UTF-8",
                            "javax.jdo.option.ConnectionDriverName":"org.mariadb.jdbc.Driver",
                            "javax.jdo.option.ConnectionUserName":"user",
                            "javax.jdo.option.ConnectionPassword":"pwd"
                        }
                    },{
                        "Classification":"yarn-env",
                        "Properties":{},
                        "Configurations":[{
                            "Classification":"export",
                            "Properties":{
                                "AWS_REGION":"cn-northwest-1",
                                "S3_ENDPOINT":"s3.cn-northwest-1.amazonaws.com.cn",
                                "S3_USE_HTTPS":"0",
                                "S3_VERIFY_SSL":"0"
                            }
                        }]
                    }]
                }],
                "KeepJobFlowAliveWhenNoSteps":True,
                "Ec2SubnetId":"subnet-027bff297ea95039b",
                "Ec2KeyName":"hifive.airflow",
                "EmrManagedSlaveSecurityGroup":"sg-05a0e076ee7babb9e"
            },
            "JobFlowRole":"EMR_EC2_DefaultRole",
            "Steps":[{
                "HadoopJarStep":{
                    "Args":["state-pusher-script"],
                    "Jar":"command-runner.jar"
                },
                "Name":"Setup Hadoop Debugging"
            }],
            "ScaleDownBehavior":"TERMINATE_AT_TASK_COMPLETION",
            "VisibleToAllUsers":True,
            "EbsRootVolumeSize":10,
            "LogUri":"s3n://aws-logs-550775287661-cn-northwest-1/elasticmapreduce/",
            "AutoScalingRole":"EMR_AutoScaling_DefaultRole"
        }
        cluster_response = emr.run_job_flow(**param)
        return cluster_response['JobFlowId']
    
    # 获取EMR访问入口
    def get_cluster_dns(cluster_id):
        response = emr.describe_cluster(ClusterId=cluster_id)
        return response['Cluster']['MasterPublicDnsName']
    
    # 等待集群创建完成
    def wait_for_cluster_creation(cluster_id):
        emr.get_waiter('cluster_running').wait(ClusterId=cluster_id)
    
    # 关闭EMR
    def terminate_cluster(cluster_id):
        emr.terminate_job_flows(JobFlowIds=[cluster_id])
    
    

    调用测试

    # 创建6台机器的集群(1 master,1 core,4 worker)
    cluster_id = create_cluster("biz_daily_2020_10_09")
    # 阻塞直到创建成功
    wait_for_cluster_creation(cluster_id)
    # dns相当于虚拟机的ssh地址,每次都不同
    # ssh登录这个地址可以提交spark命令了,这里使用Airflow的SSHOperator模拟登录并提交命令
    cluster_dns = get_cluster_dns(cluster_id)
    # 关闭集群
    terminate_cluster(cluster_id)
    

    3. 其他坑

    • Airflow 1.9.0的时间模板{{ ds }}生成的是格林尼治时间,要改为我国时间,需手动加8小时,不知道新版本是否支持本地时间。

    • ssh登录dns用户名hadoop,这个用户是AWS生成的,似乎无法修改。


    PS:更多技术干货,快关注【公众号 | xingzhe_ai】,与行者一起讨论吧!

  • 相关阅读:
    Jackson学习资料
    servlet/filter/listener/interceptor区别与联系
    JDBC、JTA、Spring的事务管理
    hibernate 的SessionFactory的getCurrentSession 与 openSession() 的区别
    Spring MVC如何配置OpenSessionInViewInterceptor并结合Hibernate使用
    SpringMVC中使用Interceptor拦截器
    Hibernate条件查询
    Spring MVC之@RequestBody, @ResponseBody 详解
    Sping Environment为Null的原因和解决方法
    SpringMVC+Thymeleaf如何处理URL中的动态查询参数
  • 原文地址:https://www.cnblogs.com/xingzheai/p/14478784.html
Copyright © 2011-2022 走看看