zoukankan      html  css  js  c++  java
  • Java: 使用pulsar-flink-connector读取pulsar catalog元数据

    简介

    通过 pulsar-flink-connector 读取到 Apache pulsar 中的namespaces、topics的元数据信息。
    pulsar-flink-connector 的 github: https://github.com/streamnative/pulsar-flink

    Maven

    
     <dependency>
       <groupId>io.streamnative.connectors</groupId>
       <artifactId>pulsar-flink-connector-2.11-1.12</artifactId>
       <version>2.7.3</version>
     </dependency>
    
       <!-- JAR repositories -->
       <repositories>
            <repository>
                <id>central</id>
                <layout>default</layout>
                <url>https://repo1.maven.org/maven2</url>
            </repository>
            <repository>
                <id>bintray-streamnative-maven</id>
                <name>bintray</name>
                <url>https://dl.bintray.com/streamnative/maven</url>
            </repository>
        </repositories>
    

    CODE

    使用PulsarMetadataReader获取元数据

    package com.levi.demo;
    
    import org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader;
    import org.apache.pulsar.client.admin.PulsarAdminException;
    import org.apache.pulsar.client.impl.auth.AuthenticationToken;
    import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
    import org.apache.pulsar.common.schema.SchemaInfo;
    import org.apache.pulsar.common.schema.SchemaType;
    
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    /**
     * Test.
     *
     * @author levi
     * @version 1.0
     **/
    public class Test {
    
        public static void main(String[] args)  {
            final ClientConfigurationData configurationData = new ClientConfigurationData();
            configurationData.setServiceUrl("pulsar://127.0.0.1:6650");
            //Your Pulsar Token
            final AuthenticationToken token =
                    new AuthenticationToken(
                            "eyJxxxxxxxxxxx.eyxxxxxxxxxxxxx.xxxxxxxxxxx"); 
            configurationData.setAuthentication(token);
     
            try (final PulsarMetadataReader reader =
                         new PulsarMetadataReader("http://127.0.0.1:8443",
                                 configurationData,
                                 "",
                                 new HashMap(),
                                 -1,
                                 -1)) {
                //获取namespaces
                final List<String> namespaces = reader.listNamespaces();
                System.out.println("namespaces: " + namespaces.toString());
                
                for (final String namespace : namespaces) {
                    //获取Topics
                    final List<String> topics = reader.getTopics(namespace);
                    System.out.println("topic: " + topics.toString());
                    
                    for (String topic : topics) {
                        //获取字段SchemaInfo
                        final SchemaInfo schemaInfo = reader.getPulsarSchema(topic);
                        final String name = schemaInfo.getName();
                        System.out.println("SchemaName:" + name); //topicName
                        final SchemaType type = schemaInfo.getType(); 
                        System.out.println("SchemaType:" + type.toString());// "JSON"...
                        final Map<String, String> properties = schemaInfo.getProperties();
                        System.out.println(properties); 
                        final String schemaDefinition = schemaInfo.getSchemaDefinition();
                        System.out.println(schemaDefinition); // Field info.
                    }
                }
    
            } catch (IOException | PulsarAdminException e) {
                e.printStackTrace();
            }
    
    
        }
    
    
    }
    
    
  • 相关阅读:
    基于Python的人脸动漫转换
    let 与 var的区别
    【LeetCode】汇总
    【HDU】4632 Palindrome subsequence(回文子串的个数)
    【算法】均匀的生成圆内的随机点
    【LeetCode】725. Split Linked List in Parts
    【LeetCode】445. Add Two Numbers II
    【LeetCode】437. Path Sum III
    【LeetCode】222. Count Complete Tree Nodes
    【LeetCode】124. Binary Tree Maximum Path Sum
  • 原文地址:https://www.cnblogs.com/levi125/p/14500436.html
Copyright © 2011-2022 走看看