  • Adding a Connector to Presto Through a RESTful Interface

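    When running Presto in practice, requirements like the following come up frequently:

    • Add a new catalog
    • Remove a catalog that is no longer used
    • Change the parameters of an existing catalog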

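    In Presto, however, any of these changes only takes effect after the Presto service is restarted. This adds work for whoever maintains the cluster and gives the applications built on top of it a poor experience.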

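    If you cannot yet run Presto comfortably in a development environment, see "Running Presto in IDEA on Windows".
    Looking at the run method of PrestoServer, you can see that Presto is split into several modules, many of which depend on the airlift project (Airlift framework for building REST services); it is fair to say that airlift is the foundation of Presto. To be honest, my understanding of Presto is probably still only a partial glimpse, but that does not stop me from tinkering with it.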

    1. Add a Hello world test endpoint

    In presto-main, add a CatalogResource class to the com.facebook.presto.server package. Its role is roughly that of a controller in Spring.

    /*
     * Licensed under the Apache License, Version 2.0 (the "License");
     * you may not use this file except in compliance with the License.
     * You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    package com.facebook.presto.server;
    
    import javax.inject.Inject;
    import javax.ws.rs.GET;
    import javax.ws.rs.Path;
    import javax.ws.rs.core.Response;
    
    /**
     * @author Gin
     * @since 2018/8/15.
     */
    @Path("/v1/catalog")
    public class CatalogResource
    {
        @Inject
        public CatalogResource()
        {
        }
    
        @GET
        @Path("test")
        public Response test()
        {
            return Response.ok("Hello world").build();
        }
    }
    

    Register CatalogResource on the last line of the setup() method in ServerMainModule:

    // Catalogs
    jaxrsBinder(binder).bind(CatalogResource.class);
    

    Start the server and visit http://localhost:8080/v1/catalog/test. The remaining steps only work if this returns Hello world.
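
    The same check can be scripted rather than done in a browser. The following is a minimal sketch, assuming Java 11 or later (for java.net.http) and a server listening on the base URL you pass in; the class name CatalogTestRequest and its methods are hypothetical helpers, not part of Presto.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper for illustration only; not part of Presto.
final class CatalogTestRequest
{
    private CatalogTestRequest() {}

    // Builds (but does not send) a GET request for the Hello world endpoint.
    static HttpRequest buildTestRequest(String baseUrl)
    {
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/v1/catalog/test"))
                .GET()
                .build();
    }

    // Sends the request and returns the response body as a string.
    // Requires a running server; fails with an IOException otherwise.
    static String fetchBody(String baseUrl)
            throws IOException, InterruptedException
    {
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(buildTestRequest(baseUrl), HttpResponse.BodyHandlers.ofString());
        return response.body();
    }
}
```

    With the server from this step running, CatalogTestRequest.fetchBody("http://localhost:8080") should return the same text the browser shows.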

    2. Add an Add Connector RESTful endpoint

    Create a CatalogInfo class to receive the request parameters:

    package com.facebook.presto.server;
    
    import com.fasterxml.jackson.annotation.JsonCreator;
    import com.fasterxml.jackson.annotation.JsonProperty;
    
    import java.util.Map;
    
    import static java.util.Objects.requireNonNull;
    
    /**
     * @author Gin
     * @since 2018/8/17.
     */
    public class CatalogInfo 
    {
    
        private final String catalogName;
    
        private final String connectorName;
    
        private final Map<String, String> properties;
    
        @JsonCreator
        public CatalogInfo(
                @JsonProperty("catalogName") String catalogName,
                @JsonProperty("connectorName") String connectorName,
                @JsonProperty("properties") Map<String, String> properties)
        {
            this.catalogName = requireNonNull(catalogName, "catalogName is null");
            this.connectorName = requireNonNull(connectorName, "connectorName is null");
            this.properties = requireNonNull(properties, "properties is null");
        }
    
        @JsonProperty
        public String getCatalogName() {
            return catalogName;
        }
    
        @JsonProperty
        public String getConnectorName() {
            return connectorName;
        }
    
        @JsonProperty
        public Map<String, String> getProperties() {
            return properties;
        }
    }
    

    Add the corresponding endpoint to CatalogResource, declaring the services it uses through constructor injection.

    /*
     * Licensed under the Apache License, Version 2.0 (the "License");
     * you may not use this file except in compliance with the License.
     * You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    package com.facebook.presto.server;
    
    import com.facebook.presto.connector.ConnectorId;
    import com.facebook.presto.connector.ConnectorManager;
    import com.google.common.base.Joiner;
    import com.google.common.base.Splitter;
    import io.airlift.discovery.client.Announcer;
    import io.airlift.discovery.client.ServiceAnnouncement;
    
    import javax.inject.Inject;
    import javax.ws.rs.Consumes;
    import javax.ws.rs.GET;
    import javax.ws.rs.PUT;
    import javax.ws.rs.Path;
    import javax.ws.rs.Produces;
    import javax.ws.rs.core.MediaType;
    import javax.ws.rs.core.Response;
    
    import java.util.LinkedHashMap;
    import java.util.LinkedHashSet;
    import java.util.Map;
    import java.util.Set;
    
    import static com.google.common.base.Strings.nullToEmpty;
    import static io.airlift.discovery.client.ServiceAnnouncement.serviceAnnouncement;
    import static java.util.Objects.requireNonNull;
    
    /**
     * @author Gin
     * @since 2018/8/15.
     */
    @Path("/v1/catalog")
    public class CatalogResource
    {
        private final ConnectorManager connectorManager;
        private final Announcer announcer;
    
        @Inject
        public CatalogResource(
                ConnectorManager connectorManager,
                Announcer announcer)
        {
            this.connectorManager = requireNonNull(connectorManager, "connectorManager is null");
            this.announcer = requireNonNull(announcer, "announcer is null");
        }
    
        @GET
        @Path("test")
        public Response test()
        {
            return Response.ok("Hello world").build();
        }
    
        @PUT
        @Consumes(MediaType.APPLICATION_JSON)
        @Produces(MediaType.APPLICATION_JSON)
        public Response createCatalog(CatalogInfo catalogInfo)
        {
            requireNonNull(catalogInfo, "catalogInfo is null");
    
            ConnectorId connectorId = connectorManager.createConnection(
                    catalogInfo.getCatalogName(),
                    catalogInfo.getConnectorName(),
                    catalogInfo.getProperties());
    
            updateConnectorIdAnnouncement(announcer, connectorId);
            return Response.status(Response.Status.OK).build();
        }
    
        private static void updateConnectorIdAnnouncement(Announcer announcer, ConnectorId connectorId)
        {
            //
            // This code was copied from PrestoServer, and is a hack that should be removed when the connectorId property is removed
            //
    
            // get existing announcement
            ServiceAnnouncement announcement = getPrestoAnnouncement(announcer.getServiceAnnouncements());
    
            // update connectorIds property
            Map<String, String> properties = new LinkedHashMap<>(announcement.getProperties());
            String property = nullToEmpty(properties.get("connectorIds"));
            Set<String> connectorIds = new LinkedHashSet<>(Splitter.on(',').trimResults().omitEmptyStrings().splitToList(property));
            connectorIds.add(connectorId.toString());
            properties.put("connectorIds", Joiner.on(',').join(connectorIds));
    
            // update announcement
            announcer.removeServiceAnnouncement(announcement.getId());
            announcer.addServiceAnnouncement(serviceAnnouncement(announcement.getType()).addProperties(properties).build());
            announcer.forceAnnounce();
        }
    
        private static ServiceAnnouncement getPrestoAnnouncement(Set<ServiceAnnouncement> announcements)
        {
            for (ServiceAnnouncement announcement : announcements) {
                if (announcement.getType().equals("presto")) {
                    return announcement;
                }
            }
            throw new RuntimeException("Presto announcement not found: " + announcements);
        }
    }
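
    The core of updateConnectorIdAnnouncement is merging the new connector ID into the comma-separated connectorIds property. That step can be looked at in isolation. The sketch below is an assumption-laden rewrite that uses only JDK classes instead of Guava's Splitter and Joiner; ConnectorIdsProperty and addConnectorId are hypothetical names, not Presto APIs.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper for illustration only; not part of Presto.
final class ConnectorIdsProperty
{
    private ConnectorIdsProperty() {}

    // Adds connectorId to a comma-separated list, trimming entries,
    // dropping empty ones, and keeping insertion order without duplicates.
    static String addConnectorId(String existing, String connectorId)
    {
        String property = (existing == null) ? "" : existing;
        Set<String> ids = Arrays.stream(property.split(","))
                .map(String::trim)
                .filter(id -> !id.isEmpty())
                .collect(Collectors.toCollection(LinkedHashSet::new));
        ids.add(connectorId);
        return String.join(",", ids);
    }
}
```

    Using a LinkedHashSet means that announcing the same catalog twice does not produce a duplicate entry, and the existing order of IDs is preserved.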
    

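    3. Test the RESTful endpoint

    This step requires the plugin for the connector being added to be installed, so check that first. Using Postman or a similar tool, send a PUT request with Content-Type application/json to http://localhost:8080/v1/catalog/ with the following body: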

    {
        "catalogName": "test",
        "connectorName": "mysql",
        "properties": {
            "connection-url": "jdbc:mysql://localhost:3306",
            "connection-user": "root",
            "connection-password": "root"
        }
    }
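
    If you would rather send this request from code than from Postman, something like the following should work. It is a minimal sketch assuming Java 11 or later; CreateCatalogRequest and its methods are hypothetical names introduced here, and the JSON string is the body shown above.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper for illustration only; not part of Presto.
final class CreateCatalogRequest
{
    private CreateCatalogRequest() {}

    // Builds (but does not send) the PUT request described in this step.
    static HttpRequest build(String baseUrl, String jsonBody)
    {
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/v1/catalog/"))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
    }

    // Sends the request and returns the HTTP status code.
    // Requires a running server with the modified CatalogResource.
    static int send(String baseUrl, String jsonBody)
            throws IOException, InterruptedException
    {
        return HttpClient.newHttpClient()
                .send(build(baseUrl, jsonBody), HttpResponse.BodyHandlers.discarding())
                .statusCode();
    }
}
```

    Since createCatalog returns Response.Status.OK on success, a call to send should yield 200 once the connector has been created.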
    

    In the console we can see the connector's configuration being printed after the server has already started:

    2018-08-19T14:09:03.502+0800	INFO	main	com.facebook.presto.server.PrestoServer	======== SERVER STARTED ========
    2018-08-19T14:09:23.496+0800	INFO	http-worker-133	Bootstrap	PROPERTY                  DEFAULT     RUNTIME                      DESCRIPTION
    2018-08-19T14:09:23.496+0800	INFO	http-worker-133	Bootstrap	connection-password       [REDACTED]  [REDACTED]
    2018-08-19T14:09:23.496+0800	INFO	http-worker-133	Bootstrap	connection-url            null        jdbc:mysql://localhost:3306
    2018-08-19T14:09:23.496+0800	INFO	http-worker-133	Bootstrap	connection-user           null        root
    2018-08-19T14:09:23.496+0800	INFO	http-worker-133	Bootstrap	allow-drop-table          false       false                        Allow connector to drop tables
    2018-08-19T14:09:23.496+0800	INFO	http-worker-133	Bootstrap	mysql.auto-reconnect      true        true
    2018-08-19T14:09:23.496+0800	INFO	http-worker-133	Bootstrap	mysql.connection-timeout  10.00s      10.00s
    2018-08-19T14:09:23.496+0800	INFO	http-worker-133	Bootstrap	mysql.max-reconnects      3           3
    2018-08-19T14:09:23.876+0800	INFO	http-worker-133	io.airlift.bootstrap.LifeCycleManager	Life cycle starting...
    2018-08-19T14:09:23.877+0800	INFO	http-worker-133	io.airlift.bootstrap.LifeCycleManager	Life cycle startup complete. System ready.
    

    What you do with this next is up to your imagination :)

    References:
    Presto技术内幕 (a Chinese-language book on Presto internals)

  • Original article: https://www.cnblogs.com/ginponson/p/9500775.html