  • Adding a Connector to Presto through a RESTful API

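    When running Presto in practice, the following needs come up often:

    • Adding a new catalog
    • Removing a catalog that is no longer used
    • Changing the parameters of an existing catalog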

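    In Presto, however, any of these changes takes effect only after the Presto service is restarted. That adds maintenance work for the cluster and gives upper-layer applications a poor experience.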

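    If you cannot yet run Presto in a development environment, see "Running Presto in IDEA on Windows" first.
    The run method of PrestoServer shows that Presto is split into multiple modules, and that many of them depend on airlift (the Airlift framework for building REST services); it is fair to say that airlift is the foundation of Presto. To be honest, my understanding of Presto is still limited, but that does not stop me from tinkering with it.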

    1. Add a Hello world test endpoint

    In presto-main, add a CatalogResource class to the com.facebook.presto.server package. It plays roughly the role a controller plays in Spring.

    /*
     * Licensed under the Apache License, Version 2.0 (the "License");
     * you may not use this file except in compliance with the License.
     * You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    package com.facebook.presto.server;
    
    import javax.inject.Inject;
    import javax.ws.rs.GET;
    import javax.ws.rs.Path;
    import javax.ws.rs.core.Response;
    
    /**
     * @author Gin
     * @since 2018/8/15.
     */
    @Path("/v1/catalog")
    public class CatalogResource
    {
        @Inject
        public CatalogResource()
        {
        }
    
        @GET
        @Path("test")
        public Response test()
        {
            return Response.ok("Hello world").build();
        }
    }
    

    Register CatalogResource on the last line of the setup() method in ServerMainModule:

    // Catalogs
    jaxrsBinder(binder).bind(CatalogResource.class);
    

    Start the server and visit http://localhost:8080/v1/catalog/test. The following steps only work if the response is Hello world.
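
The same check can be scripted instead of done in a browser. The following is a minimal sketch, assuming a JDK 11+ client that is run separately from the Presto build and a server listening on localhost:8080; the class CatalogSmokeTest and the helper buildRequest are hypothetical names, not part of Presto.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper class for illustration; not part of Presto.
class CatalogSmokeTest
{
    // Builds a GET request for the test endpoint exposed at /v1/catalog/test.
    static HttpRequest buildRequest(String baseUrl)
    {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/v1/catalog/test"))
                .GET()
                .build();
    }

    public static void main(String[] args)
            throws IOException, InterruptedException
    {
        // Assumption: the modified Presto server is running on localhost:8080.
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(buildRequest("http://localhost:8080"), HttpResponse.BodyHandlers.ofString());
        // Prints the HTTP status code followed by the response body.
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

If the resource is registered correctly, the printed body should be the string returned by test().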

    2. Add a RESTful endpoint for adding a connector

    Create a CatalogInfo class to receive the request parameters:

    package com.facebook.presto.server;
    
    import com.fasterxml.jackson.annotation.JsonCreator;
    import com.fasterxml.jackson.annotation.JsonProperty;
    
    import java.util.Map;
    
    import static java.util.Objects.requireNonNull;
    
    /**
     * @author Gin
     * @since 2018/8/17.
     */
    public class CatalogInfo
    {
        private final String catalogName;
    
        private final String connectorName;
    
        private final Map<String, String> properties;
    
        @JsonCreator
        public CatalogInfo(
                @JsonProperty("catalogName") String catalogName,
                @JsonProperty("connectorName") String connectorName,
                @JsonProperty("properties") Map<String, String> properties)
        {
            this.catalogName = requireNonNull(catalogName, "catalogName is null");
            this.connectorName = requireNonNull(connectorName, "connectorName is null");
            this.properties = requireNonNull(properties, "properties is null");
        }
    
        @JsonProperty
        public String getCatalogName()
        {
            return catalogName;
        }
    
        @JsonProperty
        public String getConnectorName()
        {
            return connectorName;
        }
    
        @JsonProperty
        public Map<String, String> getProperties()
        {
            return properties;
        }
    }
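
For comparison, a static catalog file such as etc/catalog/test.properties carries the same three pieces of information: the file name gives the catalog name, the connector.name key gives the connector name, and the remaining keys are the connector properties. The sketch below shows that split using only the JDK; it is an illustration under those assumptions, and the class CatalogFileSketch and its methods are hypothetical names rather than Presto code.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Hypothetical helper class for illustration; not part of Presto.
class CatalogFileSketch
{
    // Parses the text of a .properties file into a mutable, sorted map.
    static Map<String, String> parse(String fileContents)
    {
        Properties loaded = new Properties();
        try {
            loaded.load(new StringReader(fileContents));
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Map<String, String> result = new TreeMap<>();
        for (String key : loaded.stringPropertyNames()) {
            result.put(key, loaded.getProperty(key));
        }
        return result;
    }

    // Removes connector.name from the map and returns its value,
    // leaving only the connector-specific properties behind.
    static String extractConnectorName(Map<String, String> properties)
    {
        String connectorName = properties.remove("connector.name");
        if (connectorName == null) {
            throw new IllegalArgumentException("connector.name is missing");
        }
        return connectorName;
    }

    public static void main(String[] args)
    {
        String file = "connector.name=mysql\n"
                + "connection-url=jdbc:mysql://localhost:3306\n"
                + "connection-user=root\n"
                + "connection-password=root\n";
        Map<String, String> properties = parse(file);
        String connectorName = extractConnectorName(properties);
        System.out.println("connectorName = " + connectorName);
        System.out.println("properties    = " + properties);
    }
}
```

The values left in the map after extractConnectorName are what would go into the properties field of CatalogInfo.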
    

    Add the corresponding endpoint to CatalogResource; the services it uses are declared through constructor injection.

    /*
     * Licensed under the Apache License, Version 2.0 (the "License");
     * you may not use this file except in compliance with the License.
     * You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    package com.facebook.presto.server;
    
    import com.facebook.presto.connector.ConnectorId;
    import com.facebook.presto.connector.ConnectorManager;
    import com.google.common.base.Joiner;
    import com.google.common.base.Splitter;
    import io.airlift.discovery.client.Announcer;
    import io.airlift.discovery.client.ServiceAnnouncement;
    
    import javax.inject.Inject;
    import javax.ws.rs.Consumes;
    import javax.ws.rs.GET;
    import javax.ws.rs.PUT;
    import javax.ws.rs.Path;
    import javax.ws.rs.Produces;
    import javax.ws.rs.core.MediaType;
    import javax.ws.rs.core.Response;
    
    import java.util.LinkedHashMap;
    import java.util.LinkedHashSet;
    import java.util.Map;
    import java.util.Set;
    
    import static com.google.common.base.Strings.nullToEmpty;
    import static io.airlift.discovery.client.ServiceAnnouncement.serviceAnnouncement;
    import static java.util.Objects.requireNonNull;
    
    /**
     * @author Gin
     * @since 2018/8/15.
     */
    @Path("/v1/catalog")
    public class CatalogResource
    {
        private final ConnectorManager connectorManager;
        private final Announcer announcer;
    
        @Inject
        public CatalogResource(
                ConnectorManager connectorManager,
                Announcer announcer)
        {
            this.connectorManager = requireNonNull(connectorManager, "connectorManager is null");
            this.announcer = requireNonNull(announcer, "announcer is null");
        }
    
        @GET
        @Path("test")
        public Response test()
        {
            return Response.ok("Hello world").build();
        }
    
        @PUT
        @Consumes(MediaType.APPLICATION_JSON)
        @Produces(MediaType.APPLICATION_JSON)
        public Response createCatalog(CatalogInfo catalogInfo)
        {
            requireNonNull(catalogInfo, "catalogInfo is null");
    
            ConnectorId connectorId = connectorManager.createConnection(
                    catalogInfo.getCatalogName(),
                    catalogInfo.getConnectorName(),
                    catalogInfo.getProperties());
    
            updateConnectorIdAnnouncement(announcer, connectorId);
            return Response.status(Response.Status.OK).build();
        }
    
        private static void updateConnectorIdAnnouncement(Announcer announcer, ConnectorId connectorId)
        {
            //
            // This code was copied from PrestoServer, and is a hack that should be removed when the connectorId property is removed
            //
    
            // get existing announcement
            ServiceAnnouncement announcement = getPrestoAnnouncement(announcer.getServiceAnnouncements());
    
            // update connectorIds property
            Map<String, String> properties = new LinkedHashMap<>(announcement.getProperties());
            String property = nullToEmpty(properties.get("connectorIds"));
            Set<String> connectorIds = new LinkedHashSet<>(Splitter.on(',').trimResults().omitEmptyStrings().splitToList(property));
            connectorIds.add(connectorId.toString());
            properties.put("connectorIds", Joiner.on(',').join(connectorIds));
    
            // update announcement
            announcer.removeServiceAnnouncement(announcement.getId());
            announcer.addServiceAnnouncement(serviceAnnouncement(announcement.getType()).addProperties(properties).build());
            announcer.forceAnnounce();
        }
    
        private static ServiceAnnouncement getPrestoAnnouncement(Set<ServiceAnnouncement> announcements)
        {
            for (ServiceAnnouncement announcement : announcements) {
                if (announcement.getType().equals("presto")) {
                    return announcement;
                }
            }
            throw new RuntimeException("Presto announcement not found: " + announcements);
        }
    }
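
The core of updateConnectorIdAnnouncement is the merge of the new connector ID into the comma-separated connectorIds property. The sketch below isolates that step with JDK classes only, standing in for the Guava Splitter and Joiner calls; the class ConnectorIdsSketch and the method addConnectorId are hypothetical names, and the sample IDs are made up for illustration.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper class for illustration; not part of Presto.
class ConnectorIdsSketch
{
    // Splits the existing property on commas, trims each entry, drops empty
    // entries, adds the new ID if it is not present, and joins the result again.
    // Insertion order is preserved by the LinkedHashSet.
    static String addConnectorId(String existing, String newId)
    {
        String property = existing == null ? "" : existing;
        Set<String> ids = Arrays.stream(property.split(","))
                .map(String::trim)
                .filter(id -> !id.isEmpty())
                .collect(Collectors.toCollection(LinkedHashSet::new));
        ids.add(newId);
        return String.join(",", ids);
    }

    public static void main(String[] args)
    {
        System.out.println(addConnectorId("system, tpch", "test")); // prints system,tpch,test
        System.out.println(addConnectorId("system,test", "test"));  // prints system,test
        System.out.println(addConnectorId(null, "test"));           // prints test
    }
}
```

Because the IDs are kept in a set, sending the same catalog twice does not duplicate its entry in the announcement.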
    

    3. Test the RESTful endpoint

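    This step requires the relevant connector plugin to be installed, so check that first. Using Postman or a similar tool, send a PUT request with Content-Type application/json to http://localhost:8080/v1/catalog/ with the following body: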

    {
        "catalogName": "test",
        "connectorName": "mysql",
        "properties": {
            "connection-url": "jdbc:mysql://localhost:3306",
            "connection-user": "root",
            "connection-password": "root"
        }
    }
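
For readers who prefer code to Postman, the same PUT request can be sent from Java. This is a minimal sketch, assuming a JDK 11+ client run separately from the Presto build and a server on localhost:8080; the class AddCatalogClient and its members are hypothetical names, not part of Presto.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper class for illustration; not part of Presto.
class AddCatalogClient
{
    // The same JSON body as in the request shown above.
    static final String BODY = "{"
            + "\"catalogName\": \"test\","
            + "\"connectorName\": \"mysql\","
            + "\"properties\": {"
            + "\"connection-url\": \"jdbc:mysql://localhost:3306\","
            + "\"connection-user\": \"root\","
            + "\"connection-password\": \"root\""
            + "}}";

    // Builds a PUT request carrying the JSON body for the /v1/catalog/ endpoint.
    static HttpRequest buildRequest(String baseUrl, String json)
    {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/v1/catalog/"))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    public static void main(String[] args)
            throws IOException, InterruptedException
    {
        // Assumption: the modified Presto server is running on localhost:8080
        // and the mysql connector plugin is installed.
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(buildRequest("http://localhost:8080", BODY), HttpResponse.BodyHandlers.ofString());
        System.out.println("HTTP status: " + response.statusCode());
    }
}
```

A 200 status corresponds to the Response.Status.OK returned by createCatalog.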
    

    The console then prints the connector's configuration again:

    2018-08-19T14:09:03.502+0800	INFO	main	com.facebook.presto.server.PrestoServer	======== SERVER STARTED ========
    2018-08-19T14:09:23.496+0800	INFO	http-worker-133	Bootstrap	PROPERTY                  DEFAULT     RUNTIME                      DESCRIPTION
    2018-08-19T14:09:23.496+0800	INFO	http-worker-133	Bootstrap	connection-password       [REDACTED]  [REDACTED]
    2018-08-19T14:09:23.496+0800	INFO	http-worker-133	Bootstrap	connection-url            null        jdbc:mysql://localhost:3306
    2018-08-19T14:09:23.496+0800	INFO	http-worker-133	Bootstrap	connection-user           null        root
    2018-08-19T14:09:23.496+0800	INFO	http-worker-133	Bootstrap	allow-drop-table          false       false                        Allow connector to drop tables
    2018-08-19T14:09:23.496+0800	INFO	http-worker-133	Bootstrap	mysql.auto-reconnect      true        true
    2018-08-19T14:09:23.496+0800	INFO	http-worker-133	Bootstrap	mysql.connection-timeout  10.00s      10.00s
    2018-08-19T14:09:23.496+0800	INFO	http-worker-133	Bootstrap	mysql.max-reconnects      3           3
    2018-08-19T14:09:23.876+0800	INFO	http-worker-133	io.airlift.bootstrap.LifeCycleManager	Life cycle starting...
    2018-08-19T14:09:23.877+0800	INFO	http-worker-133	io.airlift.bootstrap.LifeCycleManager	Life cycle startup complete. System ready.
    

    How to put this to use from here is up to your imagination :)

    References:
    Presto技术内幕 (a Chinese-language book on Presto internals)

  • Original article: https://www.cnblogs.com/ginponson/p/9500775.html