  • Java: Reading Pulsar catalog metadata with pulsar-flink-connector

    Introduction

    Use pulsar-flink-connector to read the metadata of namespaces and topics in Apache Pulsar.
    pulsar-flink-connector on GitHub: https://github.com/streamnative/pulsar-flink

    Maven

    
    <dependency>
        <groupId>io.streamnative.connectors</groupId>
        <artifactId>pulsar-flink-connector-2.11-1.12</artifactId>
        <version>2.7.3</version>
    </dependency>

    <!-- JAR repositories -->
    <repositories>
        <repository>
            <id>central</id>
            <layout>default</layout>
            <url>https://repo1.maven.org/maven2</url>
        </repository>
        <repository>
            <id>bintray-streamnative-maven</id>
            <name>bintray</name>
            <url>https://dl.bintray.com/streamnative/maven</url>
        </repository>
    </repositories>
    

    Code

    Use PulsarMetadataReader to fetch the metadata

    package com.levi.demo;
    
    import org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader;
    import org.apache.pulsar.client.admin.PulsarAdminException;
    import org.apache.pulsar.client.impl.auth.AuthenticationToken;
    import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
    import org.apache.pulsar.common.schema.SchemaInfo;
    import org.apache.pulsar.common.schema.SchemaType;
    
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    /**
     * Test.
     *
     * @author levi
     * @version 1.0
     **/
    public class Test {
    
        public static void main(String[] args)  {
            final ClientConfigurationData configurationData = new ClientConfigurationData();
            configurationData.setServiceUrl("pulsar://127.0.0.1:6650");
            // Your Pulsar token (JWT); replace the placeholder below
            final AuthenticationToken token =
                    new AuthenticationToken(
                            "eyJxxxxxxxxxxx.eyxxxxxxxxxxxxx.xxxxxxxxxxx"); 
            configurationData.setAuthentication(token);
     
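            try (final PulsarMetadataReader reader =
                         new PulsarMetadataReader("http://127.0.0.1:8443", // Pulsar admin (HTTP) URL
                                 configurationData,
                                 "",                  // subscription name, unused here
                                 new HashMap<>(),     // no extra connector parameters
                                 -1,                  // subtask index, not running inside a Flink job
                                 -1)) {               // number of parallel subtasks, likewise unused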
                // List the namespaces
                final List<String> namespaces = reader.listNamespaces();
                System.out.println("namespaces: " + namespaces.toString());
                
                for (final String namespace : namespaces) {
                    // List the topics in this namespace
                    final List<String> topics = reader.getTopics(namespace);
                    System.out.println("topic: " + topics.toString());
                    
                    for (String topic : topics) {
                        // Fetch the SchemaInfo, which describes the topic's fields
                        final SchemaInfo schemaInfo = reader.getPulsarSchema(topic);
                        final String name = schemaInfo.getName();
                        System.out.println("SchemaName:" + name); //topicName
                        final SchemaType type = schemaInfo.getType(); 
                        System.out.println("SchemaType:" + type.toString());// "JSON"...
                        final Map<String, String> properties = schemaInfo.getProperties();
                        System.out.println(properties); 
                        final String schemaDefinition = schemaInfo.getSchemaDefinition();
                        System.out.println(schemaDefinition); // Field info.
                    }
                }
    
            } catch (IOException | PulsarAdminException e) {
                e.printStackTrace();
            }
    
    
        }
    
    
    }
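
The topic names printed above are normally fully qualified, in the form persistent://tenant/namespace/topic. As a minimal sketch, assuming that format, the helper below splits such a name into its parts so the output can be grouped or filtered by tenant and namespace. TopicNameParts and its fields are hypothetical names introduced for illustration; they are not part of pulsar-flink-connector (the Pulsar client libraries ship their own TopicName class, which is likely the better choice in real code).

```java
/**
 * Hypothetical helper (not part of pulsar-flink-connector): splits a fully
 * qualified Pulsar topic name of the form domain://tenant/namespace/topic.
 */
final class TopicNameParts {
    final String domain;     // "persistent" or "non-persistent"
    final String tenant;
    final String namespace;
    final String localName;  // topic name without tenant/namespace

    private TopicNameParts(String domain, String tenant, String namespace, String localName) {
        this.domain = domain;
        this.tenant = tenant;
        this.namespace = namespace;
        this.localName = localName;
    }

    /** Parses a fully qualified topic name; rejects anything that does not match the assumed format. */
    static TopicNameParts parse(String topic) {
        final int sep = topic.indexOf("://");
        if (sep <= 0) {
            throw new IllegalArgumentException("Not a fully qualified topic name: " + topic);
        }
        final String domain = topic.substring(0, sep);
        // Limit the split to 3 so any further '/' stays in the local name.
        final String[] parts = topic.substring(sep + 3).split("/", 3);
        if (parts.length != 3 || parts[0].isEmpty() || parts[1].isEmpty() || parts[2].isEmpty()) {
            throw new IllegalArgumentException("Expected tenant/namespace/topic in: " + topic);
        }
        return new TopicNameParts(domain, parts[0], parts[1], parts[2]);
    }

    /** Returns "tenant/namespace", the form used for namespace names in the listing above. */
    String fullNamespace() {
        return tenant + "/" + namespace;
    }
}
```

Inside the inner loop of the example, something like TopicNameParts.parse(topic).localName could then be printed instead of the full name, provided the reader really returns fully qualified names in your version of the connector.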
    
    
  • Original article: https://www.cnblogs.com/levi125/p/14500436.html