  • Java: Reading Pulsar catalog metadata with pulsar-flink-connector

    Introduction

    This example uses pulsar-flink-connector to read the metadata of namespaces and topics (including each topic's schema) from Apache Pulsar.
    pulsar-flink-connector on GitHub: https://github.com/streamnative/pulsar-flink

    Maven

    
    <dependency>
        <groupId>io.streamnative.connectors</groupId>
        <artifactId>pulsar-flink-connector-2.11-1.12</artifactId>
        <version>2.7.3</version>
    </dependency>

    <!-- JAR repositories -->
    <repositories>
        <repository>
            <id>central</id>
            <layout>default</layout>
            <url>https://repo1.maven.org/maven2</url>
        </repository>
        <!-- Note: JFrog Bintray was shut down in 2021, so this repository may no longer resolve. -->
        <repository>
            <id>bintray-streamnative-maven</id>
            <name>bintray</name>
            <url>https://dl.bintray.com/streamnative/maven</url>
        </repository>
    </repositories>
    

    CODE

    Fetching metadata with PulsarMetadataReader

    package com.levi.demo;
    
    import org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader;
    import org.apache.pulsar.client.admin.PulsarAdminException;
    import org.apache.pulsar.client.impl.auth.AuthenticationToken;
    import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
    import org.apache.pulsar.common.schema.SchemaInfo;
    import org.apache.pulsar.common.schema.SchemaType;
    
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    /**
     * Test.
     *
     * @author levi
     * @version 1.0
     **/
    public class Test {
    
        public static void main(String[] args)  {
            final ClientConfigurationData configurationData = new ClientConfigurationData();
            configurationData.setServiceUrl("pulsar://127.0.0.1:6650");
            //Your Pulsar Token
            final AuthenticationToken token =
                    new AuthenticationToken(
                            "eyJxxxxxxxxxxx.eyxxxxxxxxxxxxx.xxxxxxxxxxx"); 
            configurationData.setAuthentication(token);
     
            try (final PulsarMetadataReader reader =
                         new PulsarMetadataReader("http://127.0.0.1:8443",
                                 configurationData,
                                 "",
                                 new HashMap<>(),
                                 -1,
                                 -1)) {
                // List the namespaces
                final List<String> namespaces = reader.listNamespaces();
                System.out.println("namespaces: " + namespaces.toString());
                
                for (final String namespace : namespaces) {
                    // List the topics in this namespace
                    final List<String> topics = reader.getTopics(namespace);
                    System.out.println("topic: " + topics.toString());
                    
                    for (String topic : topics) {
                        // Fetch the SchemaInfo that describes the topic's fields
                        final SchemaInfo schemaInfo = reader.getPulsarSchema(topic);
                        final String name = schemaInfo.getName();
                        System.out.println("SchemaName:" + name); //topicName
                        final SchemaType type = schemaInfo.getType(); 
                        System.out.println("SchemaType:" + type.toString());// "JSON"...
                        final Map<String, String> properties = schemaInfo.getProperties();
                        System.out.println(properties); 
                        final String schemaDefinition = schemaInfo.getSchemaDefinition();
                        System.out.println(schemaDefinition); // Field info.
                    }
                }
    
            } catch (IOException | PulsarAdminException e) {
                e.printStackTrace();
            }
    
    
        }
    
    
    }
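
The topic names printed by the loop above are assumed here to be fully qualified, in Pulsar's `persistent://tenant/namespace/topic` form. If that holds, a small helper can split each name into its parts, for example to group topics by namespace or to show only the short topic name. `TopicNameParts` below is a hypothetical class written for this illustration and is not part of pulsar-flink-connector; it is a minimal sketch using only the JDK and does not handle the legacy layout that carries a cluster segment.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper (not part of pulsar-flink-connector): splits a fully qualified topic name. */
final class TopicNameParts {
    final String domain;     // "persistent" or "non-persistent"
    final String tenant;
    final String namespace;
    final String localName;  // topic name without domain, tenant and namespace

    private TopicNameParts(String domain, String tenant, String namespace, String localName) {
        this.domain = domain;
        this.tenant = tenant;
        this.namespace = namespace;
        this.localName = localName;
    }

    /** Parses names of the assumed form domain://tenant/namespace/topic. */
    static TopicNameParts parse(String fullName) {
        int sep = fullName.indexOf("://");
        if (sep < 0) {
            throw new IllegalArgumentException("Missing domain prefix: " + fullName);
        }
        String domain = fullName.substring(0, sep);
        // A limit of 3 keeps any further '/' characters inside the local topic name.
        String[] rest = fullName.substring(sep + 3).split("/", 3);
        if (rest.length != 3 || rest[0].isEmpty() || rest[1].isEmpty() || rest[2].isEmpty()) {
            throw new IllegalArgumentException("Expected tenant/namespace/topic: " + fullName);
        }
        return new TopicNameParts(domain, rest[0], rest[1], rest[2]);
    }

    /** Returns "tenant/namespace", the form in which Pulsar addresses a namespace. */
    String namespacePath() {
        return tenant + "/" + namespace;
    }

    public static void main(String[] args) {
        // Sample names made up for this sketch; real ones would come from reader.getTopics(namespace).
        List<String> topics = Arrays.asList(
                "persistent://public/default/orders",
                "non-persistent://public/default/metrics");
        for (String topic : topics) {
            TopicNameParts parts = parse(topic);
            System.out.println(parts.namespacePath() + " -> " + parts.localName
                    + " (" + parts.domain + ")");
        }
    }
}
```

Inside the inner loop of `Test`, `TopicNameParts.parse(topic).localName` would then give the short name of each topic, provided the names really have the assumed form.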
    
    
  • Original article: https://www.cnblogs.com/levi125/p/14500436.html