zoukankan      html  css  js  c++  java
  • [Apache Atlas] Atlas 架构设计及源代码简单分析

    Apache Atlas 架构图

    image

    Atlas 支持多数据源接入:Hive、HBase、Storm等

    Type System

    Type

    Atlas 中定义了一些元数据类型

    ── AtlasBaseTypeDef
      │   ├── AtlasEnumDef
      │   └── AtlasStructDef
      │       ├── AtlasBusinessMetadataDef
      │       ├── AtlasClassificationDef
      │       ├── AtlasEntityDef
      │       └── AtlasRelationshipDef
      ├── AtlasStructType
      │   ├── AtlasBusinessMetadataType
      │   ├── AtlasClassificationType
      │   ├── AtlasRelationshipType
      │   └── AtlasEntityType
      │       └── AtlasRootEntityType
      ├── AtlasType
      │   ├── AtlasArrayType
      │   ├── AtlasBigDecimalType
      │   ├── AtlasBigIntegerType
      │   ├── AtlasByteType
      │   ├── AtlasDateType
      │   ├── AtlasDoubleType
      │   ├── AtlasEnumType
      │   ├── AtlasFloatType
      │   ├── AtlasIntType
      │   ├── AtlasLongType
      │   ├── AtlasMapType
      │   ├── AtlasObjectIdType
      │   ├── AtlasShortType
      │   ├── AtlasStringType
      │   └── AtlasStructType
      │       ├── AtlasBusinessMetadataType
      │       ├── AtlasClassificationType
      │       ├── AtlasEntityType
      │       └── AtlasRelationshipType
      ├── AtlasTypeDefStore
      │   └── AtlasTypeDefGraphStore
      │       └── AtlasTypeDefGraphStoreV2
      └── StructTypeDefinition
          └── HierarchicalTypeDefinition
              ├── ClassTypeDefinition
              └── TraitTypeDefinition
    

    Entity

    Entity 是基于类型的具体实现

    AtlasEntity
      ├── AtlasEntityExtInfo
      │   ├── AtlasEntitiesWithExtInfo
      │   └── AtlasEntityWithExtInfo
      ├── AtlasEntityStore
      │   └── AtlasEntityStoreV2
      ├── AtlasEntityStream
      │   └── AtlasEntityStreamForImport
      ├── AtlasEntityType
      │   └── AtlasRootEntityType
      └── IAtlasEntityChangeNotifier
          ├── AtlasEntityChangeNotifier
          └── EntityChangeNotifierNop
    

    Attributes

    针对模型定义属性

    AtlasAttributeDef
          └── AtlasRelationshipAttributeDef
    

    AtlasAttributeDef 属性字段:

    private String                   name;
    private String                   typeName;
    private boolean                  isOptional;
    private Cardinality              cardinality;
    private int                      valuesMinCount;
    private int                      valuesMaxCount;
    private boolean                  isUnique;
    private boolean                  isIndexable;
    private boolean                  includeInNotification;
    private String                   defaultValue;
    private String                   description;
    private int                      searchWeight = DEFAULT_SEARCHWEIGHT;
    private IndexType                indexType    = null;
    private List<AtlasConstraintDef> constraints;
    private Map<String, String>      options;
    private String                   displayName;
    
    具体实现:
    
    db:
        "name":        "db",
        "typeName":    "hive_db",
        "isOptional":  false,
        "isIndexable": true,
        "isUnique":    false,
        "cardinality": "SINGLE"
        
        
    columns:
        "name":        "columns",
        "typeName":    "array<hive_column>",
        "isOptional":  optional,
        "isIndexable": true,
        “isUnique":    false,
        "constraints": [ { "type": "ownedRef" } ]  
    
    • isComposite - 是否复合
    • isIndexable - 是否索引
    • isUnique - 是否唯一
    • multiplicity - 指示此属性是(必需的/可选的/还是可以是多值)的

    System specific types and their significance

    Referenceable

    This type represents all entities that can be searched for using a unique attribute called qualifiedName.

      ├── Referenceable
      ├── ReferenceableDeserializer
      ├── ReferenceableSerializer
      └── V1SearchReferenceableSerializer
    

    Hooks

    以Hive元信息采集为例分析采集过程:

    全量导入

    import-hive.sh

    "${JAVA_BIN}" ${JAVA_PROPERTIES} -cp "${CP}" 
    org.apache.atlas.hive.bridge.HiveMetaStoreBridge $IMPORT_ARGS
    
    importTables
      └── importDatabases        [addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java +295]
          └── importHiveMetadata [addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java +289]
    

    上面是调用过程:

    importTables -> importTable --> registerInstances

    AtlasEntitiesWithExtInfo ret = null;
    EntityMutationResponse   response        = atlasClientV2.createEntities(entities);
    List<AtlasEntityHeader>  createdEntities = response.getEntitiesByOperation(EntityMutations.EntityOperation.CREATE);
    
    if (CollectionUtils.isNotEmpty(createdEntities)) {
        ret = new AtlasEntitiesWithExtInfo();
    
        for (AtlasEntityHeader createdEntity : createdEntities) {
            AtlasEntityWithExtInfo entity = atlasClientV2.getEntityByGuid(createdEntity.getGuid());
    
            ret.addEntity(entity.getEntity());
    
            if (MapUtils.isNotEmpty(entity.getReferredEntities())) {
                for (Map.Entry<String, AtlasEntity> entry : entity.getReferredEntities().entrySet()) {
                    ret.addReferredEntity(entry.getKey(), entry.getValue());
                }
            }
    
            LOG.info("Created {} entity: name={}, guid={}", entity.getEntity().getTypeName(), entity.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME), entity.getEntity().getGuid());
        }
    }
    

    通过Http Post 的请求将库表数据更新至Atlas

    atlasClientV2有很多Http接口

    Atlas HTTP 客户端API:

    image

    实时监听

    HiveHook implements ExecuteWithHookContext

    ExecuteWithHookContext is a new interface that the Pre/Post Execute Hook can run with the HookContext.

    实现run()方法来对Hive 相关事件做处理

    Hive相关事件:

    BaseHiveEvent
          ├── AlterTableRename
          ├── CreateHiveProcess
          ├── DropDatabase
          ├── DropTable
          ├── CreateDatabase
          │   └── AlterDatabase
          └── CreateTable
              └── AlterTable
                  └── AlterTableRenameCol
    

    以create database 为例分析流程:

    //处理Hook 上下文信息
    AtlasHiveHookContext context = 
    new AtlasHiveHookContext(this, oper, hookContext, getKnownObjects(), isSkipTempTables());
    
    //建库事件处理,提取相关库信息
    event = new CreateDatabase(context);
    
    if (event != null) {
        final UserGroupInformation ugi = hookContext.getUgi() == null ? Utils.getUGI() : hookContext.getUgi();
        super.notifyEntities(ActiveEntityFilter.apply(event.getNotificationMessages()), ugi);
    }
    
    
    public enum HookNotificationType {
        TYPE_CREATE, TYPE_UPDATE, ENTITY_CREATE, ENTITY_PARTIAL_UPDATE, ENTITY_FULL_UPDATE, ENTITY_DELETE,
        ENTITY_CREATE_V2, ENTITY_PARTIAL_UPDATE_V2, ENTITY_FULL_UPDATE_V2, ENTITY_DELETE_V2
    }
    
    //操作用户获取
    if (context.isMetastoreHook()) {
        try {
            ugi = SecurityUtils.getUGI();
        } catch (Exception e) {
            //do nothing
        }
    } else {
        ret = getHiveUserName();
    
        if (StringUtils.isEmpty(ret)) {
            ugi = getUgi();
        }
    }
    
    if (ugi != null) {
        ret = ugi.getShortUserName();
    }
    
    if (StringUtils.isEmpty(ret)) {
        try {
            ret = UserGroupInformation.getCurrentUser().getShortUserName();
        } catch (IOException e) {
            LOG.warn("Failed for UserGroupInformation.getCurrentUser() ", e);
    
            ret = System.getProperty("user.name");
        }
    }
    

    主要:

    获取实体信息, 传递Hook message的类型、操作用户

    notifyEntities 可以看出其他组件HBase、impala也会调用该方法进行消息的发送

    image

    public static void notifyEntities(List<HookNotification> messages, UserGroupInformation ugi, int maxRetries) {
        if (executor == null) { // send synchronously
            notifyEntitiesInternal(messages, maxRetries, ugi, notificationInterface, logFailedMessages, failedMessagesLogger);
        } else {
            executor.submit(new Runnable() {
                @Override
                public void run() {
                    notifyEntitiesInternal(messages, maxRetries, ugi, notificationInterface, logFailedMessages, failedMessagesLogger);
                }
            });
        }
    }
    

    消息通知框架:

    NotificationInterface
          ├── AtlasFileSpool
          └── AbstractNotification
              ├── KafkaNotification
              └── Spooler
    

    数据写入Kafka中:

    @Override
    public void sendInternal(NotificationType notificationType, List<String> messages) throws NotificationException {
        KafkaProducer producer = getOrCreateProducer(notificationType);
    
        sendInternalToProducer(producer, notificationType, messages);
    }
    

    根据NotificationType写入指定topic 中:

    private static final Map<NotificationType, String> PRODUCER_TOPIC_MAP = new HashMap<NotificationType, String>() {
        {
            put(NotificationType.HOOK, ATLAS_HOOK_TOPIC);
            put(NotificationType.ENTITIES, ATLAS_ENTITIES_TOPIC);
        }
    };
    
    NOTIFICATION_HOOK_TOPIC_NAME("atlas.notification.hook.topic.name", "ATLAS_HOOK"),
    NOTIFICATION_ENTITIES_TOPIC_NAME("atlas.notification.entities.topic.name", "ATLAS_ENTITIES"),
    

    数据主要写入两个Topic中: ATLAS_ENTITIES、ATLAS_HOOK

    ATLAS_HOOK是写入Hook事件消息, 创建库的事件元数据信息会写入该Topic中

    如何唯一确定一个库:

    public String getQualifiedName(Database db) {
        return getDatabaseName(db) + QNAME_SEP_METADATA_NAMESPACE + getMetadataNamespace();
    }
    

    dbName@clusterName 确定唯一性

    外延应用

    一个基于Hive hook 实现Impala 元数据刷新的用例:
    AutoRefreshImpala:https://github.com/Observe-secretly/AutoRefreshImpala

    参考

    [1] Apache Atlas – Data Governance and Metadata framework for Hadoop
    [2] Apache Atlas 源码

    本文作者: chaplinthink, 关注领域:大数据、基础架构、系统设计, 一个热爱学习、分享的大数据工程师
  • 相关阅读:
    使用Jquery EasyUi常见问题解决方案
    短信平台接口调用方法参考
    linux查找日志技巧
    Linux 上传 启动 删除...命令总结
    java 验证手机号码、电话号码(包括最新的电信、联通和移动号码)
    Web Services 中XML、SOAP和WSDL的一些必要知识
    Mac环境下配置PhpStorm
    Python爬虫刷回复
    Django和layim实现websocket
    Python爬虫刷回复
  • 原文地址:https://www.cnblogs.com/bigdata1024/p/15306365.html
Copyright © 2011-2022 走看看