zoukankan      html  css  js  c++  java
  • elasticSearch6源码分析(6)http和transport模块

    1.http模块概述

    The http module allows exposing Elasticsearch APIs over HTTP.
    
    The http mechanism is completely asynchronous in nature, meaning that there is no blocking thread waiting for a response. The benefit of using asynchronous communication for HTTP is solving the C10k problem.
    
    When possible, consider using HTTP keep alive when connecting for better performance and try to get your favorite client not to do HTTP chunking.

    2.http配置类HttpTransportSettings

    /**
     * Holder for the node-scoped settings of the HTTP layer (all keys start with {@code http.}).
     * The class only declares {@code static final} {@link Setting} constants and cannot be instantiated.
     */
    public final class HttpTransportSettings {
    
        // ---- CORS settings (http.cors.*); CORS is disabled by default ----
        public static final Setting<Boolean> SETTING_CORS_ENABLED =
            Setting.boolSetting("http.cors.enabled", false, Property.NodeScope);
        public static final Setting<String> SETTING_CORS_ALLOW_ORIGIN =
            new Setting<>("http.cors.allow-origin", "", (value) -> value, Property.NodeScope);
        public static final Setting<Integer> SETTING_CORS_MAX_AGE =
            Setting.intSetting("http.cors.max-age", 1728000, Property.NodeScope);
        public static final Setting<String> SETTING_CORS_ALLOW_METHODS =
            new Setting<>("http.cors.allow-methods", "OPTIONS,HEAD,GET,POST,PUT,DELETE", (value) -> value, Property.NodeScope);
        public static final Setting<String> SETTING_CORS_ALLOW_HEADERS =
            new Setting<>("http.cors.allow-headers", "X-Requested-With,Content-Type,Content-Length", (value) -> value, Property.NodeScope);
        public static final Setting<Boolean> SETTING_CORS_ALLOW_CREDENTIALS =
            Setting.boolSetting("http.cors.allow-credentials", false, Property.NodeScope);
        // ---- pipelining and compression ----
        public static final Setting<Integer> SETTING_PIPELINING_MAX_EVENTS =
            Setting.intSetting("http.pipelining.max_events", 10000, Property.NodeScope);
        public static final Setting<Boolean> SETTING_HTTP_COMPRESSION =
            Setting.boolSetting("http.compression", true, Property.NodeScope);
        // we intentionally use a different compression level as Netty here as our benchmarks have shown that a compression level of 3 is the
        // best compromise between reduction in network traffic and added latency. For more details please check #7309.
        public static final Setting<Integer> SETTING_HTTP_COMPRESSION_LEVEL =
            Setting.intSetting("http.compression_level", 3, Property.NodeScope);
        // ---- host addresses: publish_host and bind_host are declared with SETTING_HTTP_HOST as their second argument
        // (presumably the fallback used when they are not set explicitly -- confirm against Setting.listSetting) ----
        public static final Setting<List<String>> SETTING_HTTP_HOST =
            listSetting("http.host", emptyList(), Function.identity(), Property.NodeScope);
        public static final Setting<List<String>> SETTING_HTTP_PUBLISH_HOST =
            listSetting("http.publish_host", SETTING_HTTP_HOST, Function.identity(), Property.NodeScope);
        public static final Setting<List<String>> SETTING_HTTP_BIND_HOST =
            listSetting("http.bind_host", SETTING_HTTP_HOST, Function.identity(), Property.NodeScope);
    
        // ---- ports: the bind port is parsed as a PortsRange with the default string "9200-9300" ----
        public static final Setting<PortsRange> SETTING_HTTP_PORT =
            new Setting<>("http.port", "9200-9300", PortsRange::new, Property.NodeScope);
        // NOTE(review): the two -1 arguments look like (default, minimum); -1 presumably means "not configured" -- confirm
        public static final Setting<Integer> SETTING_HTTP_PUBLISH_PORT =
            Setting.intSetting("http.publish_port", -1, -1, Property.NodeScope);
        public static final Setting<Boolean> SETTING_HTTP_DETAILED_ERRORS_ENABLED =
            Setting.boolSetting("http.detailed_errors.enabled", true, Property.NodeScope);
        // Deprecated setting whose parser rejects "false": the only accepted value is true.
        public static final Setting<Boolean> SETTING_HTTP_CONTENT_TYPE_REQUIRED =
            new Setting<>("http.content_type.required", (s) -> Boolean.toString(true), (s) -> {
                final boolean value = Booleans.parseBoolean(s);
                if (value == false) {
                    throw new IllegalArgumentException("http.content_type.required cannot be set to false. It exists only to make a rolling" +
                        " upgrade easier");
                }
                return true;
            }, Property.NodeScope, Property.Deprecated);
        // ---- request size limits ----
        // default 100 MB, declared with bounds of 0 bytes and Integer.MAX_VALUE bytes
        public static final Setting<ByteSizeValue> SETTING_HTTP_MAX_CONTENT_LENGTH =
            Setting.byteSizeSetting(
                    "http.max_content_length",
                    new ByteSizeValue(100, ByteSizeUnit.MB),
                    new ByteSizeValue(0, ByteSizeUnit.BYTES),
                    new ByteSizeValue(Integer.MAX_VALUE, ByteSizeUnit.BYTES),
                    Property.NodeScope);
        public static final Setting<ByteSizeValue> SETTING_HTTP_MAX_CHUNK_SIZE =
            Setting.byteSizeSetting("http.max_chunk_size", new ByteSizeValue(8, ByteSizeUnit.KB), Property.NodeScope);
        public static final Setting<ByteSizeValue> SETTING_HTTP_MAX_HEADER_SIZE =
            Setting.byteSizeSetting("http.max_header_size", new ByteSizeValue(8, ByteSizeUnit.KB), Property.NodeScope);
        // warning header limits both default to -1 (presumably "unlimited" -- confirm against the consumer of these settings)
        public static final Setting<Integer> SETTING_HTTP_MAX_WARNING_HEADER_COUNT =
            Setting.intSetting("http.max_warning_header_count", -1, -1, Property.NodeScope);
        public static final Setting<ByteSizeValue> SETTING_HTTP_MAX_WARNING_HEADER_SIZE =
            Setting.byteSizeSetting("http.max_warning_header_size", new ByteSizeValue(-1), Property.NodeScope);
        public static final Setting<ByteSizeValue> SETTING_HTTP_MAX_INITIAL_LINE_LENGTH =
            Setting.byteSizeSetting("http.max_initial_line_length", new ByteSizeValue(4, ByteSizeUnit.KB), Property.NodeScope);
        // don't reset cookies by default, since I don't think we really need to
        // note, parsing cookies was fixed in netty 3.5.1 regarding stack allocation, but still, currently, we don't need cookies
        public static final Setting<Boolean> SETTING_HTTP_RESET_COOKIES =
            Setting.boolSetting("http.reset_cookies", false, Property.NodeScope);
    
        // A default of 0 means that by default there is no read timeout
        public static final Setting<TimeValue> SETTING_HTTP_READ_TIMEOUT =
            Setting.timeSetting("http.read_timeout", new TimeValue(0), new TimeValue(0), Property.NodeScope);
    
        // ---- TCP socket options: each one is declared on top of the corresponding NetworkService setting ----
        public static final Setting<Boolean> SETTING_HTTP_TCP_NO_DELAY =
            boolSetting("http.tcp_no_delay", NetworkService.TCP_NO_DELAY, Setting.Property.NodeScope);
        public static final Setting<Boolean> SETTING_HTTP_TCP_KEEP_ALIVE =
            boolSetting("http.tcp.keep_alive", NetworkService.TCP_KEEP_ALIVE, Setting.Property.NodeScope);
        public static final Setting<Boolean> SETTING_HTTP_TCP_REUSE_ADDRESS =
            boolSetting("http.tcp.reuse_address", NetworkService.TCP_REUSE_ADDRESS, Setting.Property.NodeScope);
        public static final Setting<ByteSizeValue> SETTING_HTTP_TCP_SEND_BUFFER_SIZE =
            Setting.byteSizeSetting("http.tcp.send_buffer_size", NetworkService.TCP_SEND_BUFFER_SIZE, Setting.Property.NodeScope);
        public static final Setting<ByteSizeValue> SETTING_HTTP_TCP_RECEIVE_BUFFER_SIZE =
            Setting.byteSizeSetting("http.tcp.receive_buffer_size", NetworkService.TCP_RECEIVE_BUFFER_SIZE, Setting.Property.NodeScope);
    
        // Private constructor: this class is a constants holder and is never instantiated.
        private HttpTransportSettings() {
        }
    }

    3.使用Netty4HttpServerTransport

    /**
     * Resolves the configured bind hosts, binds every resolved address, resolves the publish
     * address and port, and stores the outcome in {@code boundAddress}.
     *
     * @throws BindHttpException      if the bind hosts cannot be resolved
     * @throws BindTransportException if the publish address cannot be resolved
     */
    protected void bindServer() {
        // Step 1: turn the configured bind hosts into concrete addresses.
        final InetAddress[] resolvedBindHosts;
        try {
            resolvedBindHosts = networkService.resolveBindHostAddresses(bindHosts);
        } catch (IOException resolveFailure) {
            throw new BindHttpException("Failed to resolve host [" + Arrays.toString(bindHosts) + "]", resolveFailure);
        }

        // Step 2: bind each resolved address and remember what was bound.
        final List<TransportAddress> bound = new ArrayList<>(resolvedBindHosts.length);
        for (int i = 0; i < resolvedBindHosts.length; i++) {
            bound.add(bindAddress(resolvedBindHosts[i]));
        }

        // Step 3: work out the single address that gets published.
        final InetAddress publishHost;
        try {
            publishHost = networkService.resolvePublishHostAddresses(publishHosts);
        } catch (Exception resolveFailure) {
            throw new BindTransportException("Failed to resolve publish address", resolveFailure);
        }
        final int port = resolvePublishPort(settings, bound, publishHost);

        // Step 4: record bound + publish addresses and log them.
        final TransportAddress published = new TransportAddress(new InetSocketAddress(publishHost, port));
        final TransportAddress[] boundArray = bound.toArray(new TransportAddress[0]);
        this.boundAddress = new BoundTransportAddress(boundArray, published);
        logger.info("{}", boundAddress);
    }

    4.http消息处理

    4.1 RestController 请求分发器

    /**
     * Entry point for an incoming REST request: favicon requests are answered directly, everything
     * else is offered to the registered handlers. A failure while handling is reported back on the
     * channel; if even that fails, the failure is logged.
     */
    @Override
    public void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) {
        if (request.rawPath().equals("/favicon.ico")) {
            handleFavicon(request, channel);
        } else {
            try {
                tryAllHandlers(request, channel, threadContext);
            } catch (Exception handlingFailure) {
                try {
                    // turn the failure into an error response for the caller
                    channel.sendResponse(new BytesRestResponse(channel, handlingFailure));
                } catch (Exception sendFailure) {
                    // keep the original cause attached to what gets logged
                    sendFailure.addSuppressed(handlingFailure);
                    logger.error(() ->
                        new ParameterizedMessage("failed to send failure response for uri [{}]", request.uri()), sendFailure);
                }
            }
        }
    }

    4.2 处理request的类RestHandler

    以search为例

    其具体实现为:RestSearchAction

        /**
         * Prepares the request, rejects it if any request parameter was neither consumed during
         * preparation nor is a response-formatting parameter, and otherwise executes the prepared
         * action on the channel.
         *
         * @throws IllegalArgumentException if unrecognized parameters remain after preparation
         */
        @Override
        public final void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
            // preparing the request marks the parameters it reads as consumed
            final RestChannelConsumer preparedAction = prepareRequest(request, client);

            // collect leftover params (excluding response-format params) in sorted order for a stable error message
            final SortedSet<String> leftoverParams = new TreeSet<>();
            for (final String param : request.unconsumedParams()) {
                if (responseParams().contains(param) == false) {
                    leftoverParams.add(param);
                }
            }

            if (leftoverParams.isEmpty() == false) {
                // everything that would have been accepted, used to build the error message
                final Set<String> knownParams = new HashSet<>(request.consumedParams());
                knownParams.addAll(responseParams());
                throw new IllegalArgumentException(unrecognized(request, leftoverParams, knownParams, "parameter"));
            }

            usageCount.increment();
            preparedAction.accept(channel);
        }

    5.transport概述

    The transport module is used for internal communication between nodes within the cluster. Each call that goes from one node to the other uses the transport module (for example, when an HTTP GET request is processed by one node, and should actually be processed by another node that holds the data).
    
    The transport mechanism is completely asynchronous in nature, meaning that there is no blocking thread waiting for a response. The benefit of using asynchronous communication is first solving the C10k problem, as well as being the ideal solution for scatter (broadcast) / gather operations such as search in Elasticsearch.

    6.transport配置类TcpTransport

     // Thread name prefixes for the transport server worker and client boss threads.
     public static final String TRANSPORT_SERVER_WORKER_THREAD_NAME_PREFIX = "transport_server_worker";
        public static final String TRANSPORT_CLIENT_BOSS_THREAD_NAME_PREFIX = "transport_client_boss";
    
        // ---- node-wide transport settings (transport.*) ----
        // bind_host and publish_host are declared with HOST as their second argument
        // (presumably the fallback when they are not set -- confirm against Setting.listSetting)
        public static final Setting<List<String>> HOST =
            listSetting("transport.host", emptyList(), Function.identity(), Setting.Property.NodeScope);
        public static final Setting<List<String>> BIND_HOST =
            listSetting("transport.bind_host", HOST, Function.identity(), Setting.Property.NodeScope);
        public static final Setting<List<String>> PUBLISH_HOST =
            listSetting("transport.publish_host", HOST, Function.identity(), Setting.Property.NodeScope);
        // the port is kept as a raw string here; the default string is the range "9300-9400"
        public static final Setting<String> PORT =
            new Setting<>("transport.tcp.port", "9300-9400", Function.identity(), Setting.Property.NodeScope);
        public static final Setting<Integer> PUBLISH_PORT =
            intSetting("transport.publish_port", -1, -1, Setting.Property.NodeScope);
        // name of the default profile
        public static final String DEFAULT_PROFILE = "default";
        // the scheduled internal ping interval setting, defaults to disabled (-1)
        public static final Setting<TimeValue> PING_SCHEDULE =
            timeSetting("transport.ping_schedule", TimeValue.timeValueSeconds(-1), Setting.Property.NodeScope);
        // TCP socket options, each declared on top of the corresponding NetworkService setting.
        // NOTE(review): the no-delay key is "transport.tcp_no_delay" while the others use the "transport.tcp." prefix.
        public static final Setting<Boolean> TCP_NO_DELAY =
            boolSetting("transport.tcp_no_delay", NetworkService.TCP_NO_DELAY, Setting.Property.NodeScope);
        public static final Setting<Boolean> TCP_KEEP_ALIVE =
            boolSetting("transport.tcp.keep_alive", NetworkService.TCP_KEEP_ALIVE, Setting.Property.NodeScope);
        public static final Setting<Boolean> TCP_REUSE_ADDRESS =
            boolSetting("transport.tcp.reuse_address", NetworkService.TCP_REUSE_ADDRESS, Setting.Property.NodeScope);
        public static final Setting<ByteSizeValue> TCP_SEND_BUFFER_SIZE =
            Setting.byteSizeSetting("transport.tcp.send_buffer_size", NetworkService.TCP_SEND_BUFFER_SIZE, Setting.Property.NodeScope);
        public static final Setting<ByteSizeValue> TCP_RECEIVE_BUFFER_SIZE =
            Setting.byteSizeSetting("transport.tcp.receive_buffer_size", NetworkService.TCP_RECEIVE_BUFFER_SIZE, Setting.Property.NodeScope);
    
    
        // ---- per-profile settings: affix settings with prefix "transport.profiles." and the given suffix ----
        // Each profile-level TCP option is declared on top of the node-wide TcpTransport setting above.
        public static final Setting.AffixSetting<Boolean> TCP_NO_DELAY_PROFILE = affixKeySetting("transport.profiles.", "tcp_no_delay",
            key -> boolSetting(key, TcpTransport.TCP_NO_DELAY, Setting.Property.NodeScope));
        public static final Setting.AffixSetting<Boolean> TCP_KEEP_ALIVE_PROFILE = affixKeySetting("transport.profiles.", "tcp_keep_alive",
            key -> boolSetting(key, TcpTransport.TCP_KEEP_ALIVE, Setting.Property.NodeScope));
        public static final Setting.AffixSetting<Boolean> TCP_REUSE_ADDRESS_PROFILE = affixKeySetting("transport.profiles.", "reuse_address",
            key -> boolSetting(key, TcpTransport.TCP_REUSE_ADDRESS, Setting.Property.NodeScope));
        public static final Setting.AffixSetting<ByteSizeValue> TCP_SEND_BUFFER_SIZE_PROFILE = affixKeySetting("transport.profiles.",
            "send_buffer_size", key -> Setting.byteSizeSetting(key, TcpTransport.TCP_SEND_BUFFER_SIZE, Setting.Property.NodeScope));
        public static final Setting.AffixSetting<ByteSizeValue> TCP_RECEIVE_BUFFER_SIZE_PROFILE = affixKeySetting("transport.profiles.",
            "receive_buffer_size", key -> Setting.byteSizeSetting(key, TcpTransport.TCP_RECEIVE_BUFFER_SIZE, Setting.Property.NodeScope));
    
        // Profile-level host and port settings, declared on top of BIND_HOST, PUBLISH_HOST and PORT.
        public static final Setting.AffixSetting<List<String>> BIND_HOST_PROFILE = affixKeySetting("transport.profiles.", "bind_host",
            key -> listSetting(key, BIND_HOST, Function.identity(), Setting.Property.NodeScope));
        public static final Setting.AffixSetting<List<String>> PUBLISH_HOST_PROFILE = affixKeySetting("transport.profiles.", "publish_host",
            key -> listSetting(key, PUBLISH_HOST, Function.identity(), Setting.Property.NodeScope));
        public static final Setting.AffixSetting<String> PORT_PROFILE = affixKeySetting("transport.profiles.", "port",
            key -> new Setting<>(key, PORT, Function.identity(), Setting.Property.NodeScope));
        // NOTE(review): unlike the other profile settings this one uses literal -1 values and does not reference PUBLISH_PORT.
        public static final Setting.AffixSetting<Integer> PUBLISH_PORT_PROFILE = affixKeySetting("transport.profiles.", "publish_port",
            key -> intSetting(key, -1, -1, Setting.Property.NodeScope));

    7.配置使用Netty4Transport

        /**
         * Creates the client bootstrap and, when the network server is enabled in the settings,
         * creates and binds a server bootstrap for every configured profile before delegating to
         * {@code super.doStart()}. If any step throws, {@code doStop()} is invoked before the
         * exception propagates.
         */
        @Override
        protected void doStart() {
            boolean started = false;
            try {
                clientBootstrap = createClientBootstrap();
                final boolean serverEnabled = NetworkService.NETWORK_SERVER.get(settings);
                if (serverEnabled) {
                    // one server bootstrap + bind per profile
                    for (ProfileSettings profile : profileSettings) {
                        createServerBootstrap(profile);
                        bindServer(profile);
                    }
                }
                super.doStart();
                started = true;
            } finally {
                // roll back a partial start
                if (!started) {
                    doStop();
                }
            }
        }

    8.服务端启动TransportService

        /**
         * Registers this service as a listener on the transport and the connection manager, starts
         * the transport, logs the bound addresses (when available and info logging is enabled),
         * creates the local node from the bound address and, if configured, initializes the
         * remote cluster connections.
         */
        @Override
        protected void doStart() {
            transport.addMessageListener(this);
            connectionManager.addListener(this);
            transport.start();

            final boolean logAddresses = transport.boundAddress() != null && logger.isInfoEnabled();
            if (logAddresses) {
                logger.info("{}", transport.boundAddress());
                // one line per profile
                transport.profileBoundAddresses().forEach(
                    (profileName, profileAddress) -> logger.info("profile [{}]: {}", profileName, profileAddress));
            }

            localNode = localNodeFactory.apply(transport.boundAddress());

            if (connectToRemoteCluster) {
                // the local transport is up, so remote cluster connections can be set up now
                remoteClusterService.initializeRemoteClusters();
            }
        }

    9.客户端启动TransportClient

        /**
         * Creates a new TransportClient with the given settings and plugins.
         * Delegates to {@code buildTemplate}, passing {@code Settings.EMPTY} as the default settings
         * and {@code null} as the host failure listener.
         *
         * @param settings the settings provided by the caller
         * @param plugins  the plugin classes to load into the client
         */
        public TransportClient(Settings settings, Collection<Class<? extends Plugin>> plugins) {
            this(buildTemplate(settings, Settings.EMPTY, plugins, null));
        }
    
    /**
     * Wires together everything a transport client needs (plugins, thread pool, modules, transport
     * service, nodes service, proxy) and returns it as a {@code ClientTemplate}.
     * Resources created along the way are tracked in a list and closed in the {@code finally} block
     * if construction does not complete; on success the list is cleared first so nothing is closed.
     *
     * @param providedSettings settings supplied by the caller; a node name of "_client_" is added when none is set
     * @param defaultSettings  settings used as the base of the final settings
     * @param plugins          plugin classes to load
     * @param failureListner   listener for host failures; a no-op listener is used when {@code null}
     * @return the assembled client template, with the transport service already started
     */
    private static ClientTemplate buildTemplate(Settings providedSettings, Settings defaultSettings,
                                                    Collection<Class<? extends Plugin>> plugins, HostFailureListener failureListner) {
            // give the client a default node name when the caller did not set one
            if (Node.NODE_NAME_SETTING.exists(providedSettings) == false) {
                providedSettings = Settings.builder().put(providedSettings).put(Node.NODE_NAME_SETTING.getKey(), "_client_").build();
            }
            final PluginsService pluginsService = newPluginService(providedSettings, plugins);
            // final settings: defaults, overlaid with the plugin-updated settings, plus the transport-client feature flag
            final Settings settings =
                    Settings.builder()
                            .put(defaultSettings)
                            .put(pluginsService.updatedSettings())
                            .put(TcpTransport.FEATURE_PREFIX + "." + TRANSPORT_CLIENT_FEATURE, true)
                            .build();
            // everything in this list is closed in the finally block unless the list is cleared on success
            final List<Closeable> resourcesToClose = new ArrayList<>();
            final ThreadPool threadPool = new ThreadPool(settings);
            // closing the thread pool means terminating it with a 10 second timeout
            resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));
            final NetworkService networkService = new NetworkService(Collections.emptyList());
            try {
                // settings registered by plugins and by the thread pool's executor builders
                final List<Setting<?>> additionalSettings = new ArrayList<>(pluginsService.getPluginSettings());
                final List<String> additionalSettingsFilter = new ArrayList<>(pluginsService.getPluginSettingsFilter());
                for (final ExecutorBuilder<?> builder : threadPool.builders()) {
                    additionalSettings.addAll(builder.getRegisteredSettings());
                }
                SettingsModule settingsModule =
                        new SettingsModule(settings, additionalSettings, additionalSettingsFilter, Collections.emptySet());
    
                // collect named writeables from the network, search, indices and cluster modules and from all plugins
                SearchModule searchModule = new SearchModule(settings, true, pluginsService.filterPlugins(SearchPlugin.class));
                IndicesModule indicesModule = new IndicesModule(Collections.emptyList());
                List<NamedWriteableRegistry.Entry> entries = new ArrayList<>();
                entries.addAll(NetworkModule.getNamedWriteables());
                entries.addAll(searchModule.getNamedWriteables());
                entries.addAll(indicesModule.getNamedWriteables());
                entries.addAll(ClusterModule.getNamedWriteables());
                entries.addAll(pluginsService.filterPlugins(Plugin.class).stream()
                                             .flatMap(p -> p.getNamedWriteables().stream())
                                             .collect(Collectors.toList()));
                NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(entries);
                // named x-content comes from the search module and from all plugins
                NamedXContentRegistry xContentRegistry = new NamedXContentRegistry(Stream.of(
                        searchModule.getNamedXContents().stream(),
                        pluginsService.filterPlugins(Plugin.class).stream()
                                .flatMap(p -> p.getNamedXContent().stream())
                        ).flatMap(Function.identity()).collect(toList()));
    
                ModulesBuilder modules = new ModulesBuilder();
                // plugin modules must be added here, before others or we can get crazy injection errors...
                for (Module pluginModule : pluginsService.createGuiceModules()) {
                    modules.add(pluginModule);
                }
                modules.add(b -> b.bind(ThreadPool.class).toInstance(threadPool));
                ActionModule actionModule = new ActionModule(true, settings, null, settingsModule.getIndexScopedSettings(),
                        settingsModule.getClusterSettings(), settingsModule.getSettingsFilter(), threadPool,
                        pluginsService.filterPlugins(ActionPlugin.class), null, null, null);
                modules.add(actionModule);
    
                // circuit breaker service and big arrays are closeable, so they are tracked for cleanup
                CircuitBreakerService circuitBreakerService = Node.createCircuitBreakerService(settingsModule.getSettings(),
                    settingsModule.getClusterSettings());
                resourcesToClose.add(circuitBreakerService);
                PageCacheRecycler pageCacheRecycler = new PageCacheRecycler(settings);
                BigArrays bigArrays = new BigArrays(pageCacheRecycler, circuitBreakerService);
                resourcesToClose.add(bigArrays);
                modules.add(settingsModule);
                // obtain the transport from the network module and wrap it in a TransportService
                NetworkModule networkModule = new NetworkModule(settings, true, pluginsService.filterPlugins(NetworkPlugin.class), threadPool,
                    bigArrays, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService, null);
                final Transport transport = networkModule.getTransportSupplier().get();
                // the local node factory ignores the bound address and uses META_ADDRESS with port 0 and a random id
                final TransportService transportService = new TransportService(settings, transport, threadPool,
                    networkModule.getTransportInterceptor(),
                    boundTransportAddress -> DiscoveryNode.createLocal(settings, new TransportAddress(TransportAddress.META_ADDRESS, 0),
                        UUIDs.randomBase64UUID()), null, Collections.emptySet());
                // expose the instances built above through the injector
                modules.add((b -> {
                    b.bind(BigArrays.class).toInstance(bigArrays);
                    b.bind(PluginsService.class).toInstance(pluginsService);
                    b.bind(CircuitBreakerService.class).toInstance(circuitBreakerService);
                    b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry);
                    b.bind(Transport.class).toInstance(transport);
                    b.bind(TransportService.class).toInstance(transportService);
                    b.bind(NetworkService.class).toInstance(networkService);
                }));
    
                Injector injector = modules.createInjector();
                // fall back to a no-op failure listener when the caller passed null
                final TransportClientNodesService nodesService =
                    new TransportClientNodesService(settings, transportService, threadPool, failureListner == null
                        ? (t, e) -> {} : failureListner);
    
                // construct the list of client actions
                final List<ActionPlugin> actionPlugins = pluginsService.filterPlugins(ActionPlugin.class);
                final List<Action> clientActions =
                        actionPlugins.stream().flatMap(p -> p.getClientActions().stream()).collect(Collectors.toList());
                // add all the base actions
                final List<? extends Action<?>> baseActions =
                        actionModule.getActions().values().stream().map(ActionPlugin.ActionHandler::getAction).collect(Collectors.toList());
                clientActions.addAll(baseActions);
                final TransportProxyClient proxy = new TransportProxyClient(settings, transportService, nodesService, clientActions);
    
                // instantiate the plugins' lifecycle components via the injector and track them for cleanup
                List<LifecycleComponent> pluginLifecycleComponents = new ArrayList<>(pluginsService.getGuiceServiceClasses().stream()
                    .map(injector::getInstance).collect(Collectors.toList()));
                resourcesToClose.addAll(pluginLifecycleComponents);
    
                // start the transport service and let it accept incoming requests
                transportService.start();
                transportService.acceptIncomingRequests();
    
                ClientTemplate transportClient = new ClientTemplate(injector, pluginLifecycleComponents, nodesService, proxy,
                    namedWriteableRegistry);
                // success: empty the list so the finally block below closes nothing
                resourcesToClose.clear();
                return transportClient;
            } finally {
                // on failure this closes whatever was created so far; on success the list is empty
                IOUtils.closeWhileHandlingException(resourcesToClose);
            }
        }
  • 相关阅读:
    爱斯达M2C服装定制系统介绍—在线播放—优酷网,视频高清在线观看
    衣云网,依托深圳发达的服装产业,致力于整合服装上下游各个服装利益者,以自主研发的服装软件为核心,聚集服装设计师、纸样师和版师,这三类会员在衣云网开设工作室上传原创服装纸样,推荐相应纸样的面料吸引大批的面辅料商和服装企业。从而形成一个良性的服装生态圈。
    男士休闲装设计
    新郎胸花佩戴法则 小胸花也有大学问_新郎_婚礼时光-关注婚礼的一切,分享最美好的时光。
    上海游侠电动汽车团队招募。iOS,Android,产品经理以及 SEVER 端工程师
    沈晨:衣冠自成气场·都市周报
    寻访上海西服定制店_Enjoy·雅趣频道_财新网
    金错刀对话口袋购物王珂:找到痛点,确认卖点,制造爆点!
    Wish | IT桔子
    Tradesy | IT桔子
  • 原文地址:https://www.cnblogs.com/davidwang456/p/10114126.html
Copyright © 2011-2022 走看看