  • hazelcast-jet: an open-source distributed stream and batch processing framework

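    hazelcast-jet is a distributed stream and batch processing framework open-sourced by the Hazelcast team. Compared with some other stream/batch frameworks, it needs nothing more than a jar dependency (Siddhi is also a good choice).
    hazelcast-jet supports both stream processing and batch processing. Other stream processing platforms (tools) such as StreamSets and Apache NiFi are relatively complex, and often we don't need anything that heavy. Kafka Streams, on the other hand, is tied to the Kafka ecosystem, which also tends to complicate system development.
    Below is a simple trial run for learning purposes.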

    Maven project

    • Directory structure
     
    ├── pom.xml
    └── src
        ├── main
        │   ├── java
        │   │   └── com
        │   │       └── dalong
        │   │           ├── Application.java
        │   │          
        │   └── resources
        │       └── mybooks
        │           └── books
     
     
    • pom.xml
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.dalong</groupId>
        <artifactId>myid</artifactId>
        <version>1.0-SNAPSHOT</version>
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <encoding>UTF-8</encoding>
            <java.version>1.8</java.version>
            <maven.compiler.source>1.8</maven.compiler.source>
            <maven.compiler.target>1.8</maven.compiler.target>
        </properties>
        <dependencies>
            <dependency>
                <groupId>com.hazelcast.jet</groupId>
                <artifactId>hazelcast-jet</artifactId>
                <version>4.2</version>
            </dependency>
        </dependencies>
    </project>
     
    • Text content (mybooks/books)
    demo
    app
    rong
    fengliang
    rong
    demo
    app
    rong
    rong
     
     
    • Batch mode
    package com.dalong;

    import com.hazelcast.jet.Jet;
    import com.hazelcast.jet.JetInstance;
    import com.hazelcast.jet.Traversers;
    import com.hazelcast.jet.aggregate.AggregateOperations;
    import com.hazelcast.jet.pipeline.Pipeline;
    import com.hazelcast.jet.pipeline.Sinks;
    import com.hazelcast.jet.pipeline.Sources;
    import com.hazelcast.jet.pipeline.WindowDefinition;

    public class Application {
        public static void main(String[] args) {
            // directory that holds the input text files
            String path = "src/main/resources/mybooks";
            // batch mode
            BatchMode(path);
            // stream mode (comment out BatchMode above and uncomment this line)
            // StreamMode(path);
        }

        // Stream mode: watch the directory and count words in 4 ms tumbling windows
        private static void StreamMode(String path) {
            // Outside the `jet submit` script this starts a standalone embedded Jet member
            JetInstance jet = Jet.bootstrappedInstance();
            Pipeline p = Pipeline.create();
            p.readFrom(Sources.fileWatcher(path))        // lines added to files in `path`
                    .withIngestionTimestamps()           // timestamp each line on arrival
                    .setLocalParallelism(1)
                    .flatMap(line -> Traversers.traverseArray(line.toLowerCase().split("\\W+")))
                    .filter(word -> !word.isEmpty())
                    .groupingKey(word -> word)
                    .window(WindowDefinition.tumbling(4)) // window size in milliseconds
                    .aggregate(AggregateOperations.counting())
                    .writeTo(Sinks.logger());
            // a streaming job never completes on its own, so join() keeps blocking
            jet.newJob(p).join();
        }

        // Batch mode: read every file in the directory once and count the words
        private static void BatchMode(String path) {
            JetInstance jet = Jet.bootstrappedInstance();
            Pipeline p = Pipeline.create();
            p.readFrom(Sources.files(path))
                    // "\\W+" must be escaped in Java source; "\W+" does not compile
                    .flatMap(line -> Traversers.traverseArray(line.toLowerCase().split("\\W+")))
                    .filter(word -> !word.isEmpty())
                    .groupingKey(word -> word)
                    .aggregate(AggregateOperations.counting())
                    .writeTo(Sinks.logger());
            jet.newJob(p).join();
        }
    }
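    To sanity-check the batch job's result without starting Jet, the same steps (split into words, drop empty tokens, group by word, count) can be written with plain `java.util.stream` on a single JVM. This is only a local equivalent for comparison, with no distribution or fault tolerance, and `LocalWordCount` is a hypothetical name. A `TreeMap` keeps the output sorted.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical helper: a single-JVM, plain-JDK equivalent of the batch pipeline
// (no Jet involved), useful only for checking the expected counts.
class LocalWordCount {

    // Mirrors the pipeline stages: flatMap(split) -> filter(non-empty)
    // -> groupingKey(word) -> aggregate(counting()).
    static Map<String, Long> count(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.toLowerCase().split("\\W+")))
                .filter(word -> !word.isEmpty())
                .collect(Collectors.groupingBy(Function.identity(), TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        // Same content as the sample file src/main/resources/mybooks/books
        List<String> lines = Arrays.asList(
                "demo", "app", "rong", "fengliang", "rong",
                "demo", "app", "rong", "rong");
        System.out.println(count(lines)); // {app=2, demo=2, fengliang=1, rong=4}
    }
}
```

    To run it on the real file, pass `Files.readAllLines(Paths.get("src/main/resources/mybooks/books"))` to `count`. The counts match what the logger sink prints for the batch job, only in a different order.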

    Output

    14:18:02.150 [ INFO] [c.h.j.Jet] Bootstrapped instance requested but application wasn't called from jet submit script. Creating a standalone Jet instance instead.
    14:18:02.278 [ INFO] [c.h.i.c.AbstractConfigLocator] Loading 'hazelcast-jet-default.yaml' from the classpath.
    14:18:02.299 [ INFO] [c.h.j.c.JetConfig] jet.home is /Users/dalong/Downloads/myid
    14:18:02.761 [ INFO] [c.h.i.c.AbstractConfigLocator] Loading 'hazelcast-jet-member-default.yaml' from the classpath.
    14:18:03.017 [ INFO] [c.h.i.AddressPicker] Prefer IPv4 stack is true, prefer IPv6 addresses is false
    14:18:03.078 [ INFO] [c.h.i.AddressPicker] Picked [10.6.205.88]:5701, using socket ServerSocket[addr=/0:0:0:0:0:0:0:0,localport=5701], bind any local is true
    14:18:03.186 [ INFO] [c.h.system] Hazelcast Jet 4.2 (20200714 - 0e841ef) starting at [10.6.205.88]:5701
    14:18:03.188 [ INFO] [c.h.system] Cluster name: jet
    14:18:03.189 [ INFO] [c.h.system] 
        o   o   o   o---o o---o o     o---o   o   o---o o-o-o        o o---o o-o-o
        |   |  /      /  |     |     |      /   |       |          | |       |
        o---o o---o   o   o-o   |     o     o---o o---o   |          | o-o     |
        |   | |   |  /    |     |     |     |   |     |   |         | |       |
        o   o o   o o---o o---o o---o o---o o   o o---o   o       o--o o---o   o
    14:18:03.191 [ INFO] [c.h.system] Copyright (c) 2008-2020, Hazelcast, Inc. All Rights Reserved.
    14:18:04.425 [ INFO] [c.h.s.i.o.i.BackpressureRegulator] Backpressure is disabled
    14:18:05.926 [ WARN] [c.h.c.CPSubsystem] CP Subsystem is not enabled. CP data structures will operate in UNSAFE mode! Please note that UNSAFE mode will not provide strong consistency guarantees.
    14:18:07.140 [ INFO] [c.h.j.i.e.TaskletExecutionService] Creating idler with jet.idle.cooperative.min.microseconds=25µs,jet.idle.cooperative.max.microseconds=500µs
    14:18:07.143 [ INFO] [c.h.j.i.e.TaskletExecutionService] Creating idler with jet.idle.noncooperative.min.microseconds=25µs,jet.idle.noncooperative.max.microseconds=5000µs
    14:18:07.373 [ INFO] [c.h.j.i.JetService] Setting number of cooperative threads and default parallelism to 8
    14:18:07.386 [ INFO] [c.h.s.i.o.i.OperationExecutorImpl] Starting 8 partition threads and 5 generic threads (1 dedicated for priority tasks)
    14:18:07.391 [ INFO] [c.h.i.d.Diagnostics] Diagnostics disabled. To enable add -Dhazelcast.diagnostics.enabled=true to the JVM arguments.
    14:18:07.416 [ INFO] [c.h.c.LifecycleService] [10.6.205.88]:5701 is STARTING
    14:18:07.487 [ WARN] [c.h.i.i.Node] No join method is enabled! Starting standalone.
    14:18:07.534 [ INFO] [c.h.c.LifecycleService] [10.6.205.88]:5701 is STARTED
    14:18:22.713 [ INFO] [c.h.i.p.i.PartitionStateManager] Initializing cluster partition table arrangement...
    14:18:22.910 [ INFO] [c.h.j.i.JobCoordinationService] Starting job 04a8-86cc-9300-0001 based on submit request
    14:18:22.930 [ INFO] [c.h.j.i.MasterJobContext] Didn't find any snapshot to restore for job '04a8-86cc-9300-0001', execution 04a8-86cc-9301-0001
    14:18:22.931 [ INFO] [c.h.j.i.MasterJobContext] Start executing job '04a8-86cc-9300-0001', execution 04a8-86cc-9301-0001, execution graph in DOT format:
    digraph DAG {
        "filesSource(src/main/resources/mybooks/*)" [localParallelism=4];
        "fused(flat-map, filter)" [localParallelism=8];
        "group-and-aggregate-prepare" [localParallelism=8];
        "group-and-aggregate" [localParallelism=8];
        "loggerSink" [localParallelism=1];
        "filesSource(src/main/resources/mybooks/*)" -> "fused(flat-map, filter)" [queueSize=1024];
        "fused(flat-map, filter)" -> "group-and-aggregate-prepare" [label="partitioned", queueSize=1024];
        subgraph cluster_0 {
            "group-and-aggregate-prepare" -> "group-and-aggregate" [label="distributed-partitioned", queueSize=1024];
        }
        "group-and-aggregate" -> "loggerSink" [queueSize=1024];
    }
    HINT: You can use graphviz or http://viz-js.com to visualize the printed graph.
    14:18:23.050 [ INFO] [c.h.j.i.JobExecutionService] Execution plan for jobId=04a8-86cc-9300-0001, jobName='04a8-86cc-9300-0001', executionId=04a8-86cc-9301-0001 initialized
    14:18:23.056 [ INFO] [c.h.j.i.JobExecutionService] Start execution of job '04a8-86cc-9300-0001', execution 04a8-86cc-9301-0001 from coordinator [10.6.205.88]:5701
    14:18:23.113 [ INFO] [c.h.j.i.c.W.loggerSink#0] rong=4
    14:18:23.114 [ INFO] [c.h.j.i.c.W.loggerSink#0] fengliang=1
    14:18:23.114 [ INFO] [c.h.j.i.c.W.loggerSink#0] app=2
    14:18:23.114 [ INFO] [c.h.j.i.c.W.loggerSink#0] demo=2
    14:18:23.121 [ INFO] [c.h.j.i.MasterJobContext] Execution of job '04a8-86cc-9300-0001', execution 04a8-86cc-9301-0001 completed successfully
        Start time: 2020-07-15T14:18:22.912
        Duration: 208 ms
        To see additional job metrics enable JobConfig.storeMetricsAfterJobCompletion
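    The DAG printed above splits the aggregation into two vertices: `group-and-aggregate-prepare` receives words over a locally `partitioned` edge and accumulates partial counts, and `group-and-aggregate` receives those partial results over a `distributed-partitioned` edge and combines them. The single-threaded sketch below illustrates this accumulate-then-combine idea with plain JDK collections. It is a simplified illustration rather than Jet's implementation; the class and method names are hypothetical, and round-robin chunks stand in for input arriving at different members.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified illustration of two-stage aggregation (hypothetical names, not Jet code).
class TwoStageCount {

    // Stage 1 ("prepare"): count only the words this chunk received.
    static Map<String, Long> accumulate(List<String> words) {
        Map<String, Long> partial = new HashMap<>();
        for (String w : words) {
            partial.merge(w, 1L, Long::sum);
        }
        return partial;
    }

    // Stage 2: combine the partial counts for each key into the final count.
    static Map<String, Long> combine(List<Map<String, Long>> partials) {
        Map<String, Long> total = new TreeMap<>();
        for (Map<String, Long> p : partials) {
            p.forEach((word, n) -> total.merge(word, n, Long::sum));
        }
        return total;
    }

    // Spread the words round-robin over `parallelism` simulated chunks,
    // run stage 1 on each chunk, then stage 2 on all partial maps.
    static Map<String, Long> count(List<String> words, int parallelism) {
        List<List<String>> chunks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            chunks.add(new ArrayList<>());
        }
        for (int i = 0; i < words.size(); i++) {
            chunks.get(i % parallelism).add(words.get(i));
        }
        List<Map<String, Long>> partials = new ArrayList<>();
        for (List<String> chunk : chunks) {
            partials.add(accumulate(chunk));
        }
        return combine(partials);
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList(
                "demo", "app", "rong", "fengliang", "rong",
                "demo", "app", "rong", "rong");
        System.out.println(count(words, 3)); // {app=2, demo=2, fengliang=1, rong=4}
    }
}
```

    The result does not depend on how the input is split, which is why the first stage can run on partial data before anything is sent over the network.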
     
     
    • Stream mode

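      Note how this mode runs: start the application first, then create a file in the watched directory and write some content into it; the word counts then appear in the log. Also switch main over to StreamMode (comment out BatchMode and uncomment StreamMode):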

    public static void main(String[] args) {
        String path = "src/main/resources/mybooks";
        // batch mode
        // BatchMode(path);
        // stream mode
        StreamMode(path);
    }
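    The stream job only reacts to content written into the watched directory while it is running. Creating the file by hand or with `echo demo >> src/main/resources/mybooks/books` is enough; the sketch below does the same from Java by appending lines with `java.nio.file.Files`. `FeedWatchedDir` and the file name `feed.txt` are assumptions for illustration. Run it as a separate process, because `jet.newJob(p).join()` keeps the stream job's `main` blocked.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper that appends lines to a file in the watched directory,
// so the running stream job has new content to pick up.
class FeedWatchedDir {

    static void appendLines(Path file, List<String> lines) throws IOException {
        Files.createDirectories(file.getParent());
        // CREATE: create the file if it is missing; APPEND: write at the end instead of overwriting.
        Files.write(file, lines, StandardCharsets.UTF_8,
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }

    public static void main(String[] args) throws IOException {
        // "feed.txt" is an assumed name; any file inside the watched directory should do.
        Path file = Paths.get("src/main/resources/mybooks", "feed.txt");
        appendLines(file, Arrays.asList("demo", "app", "rong"));
    }
}
```

    Appending, rather than rewriting the whole file, keeps the already-processed lines untouched so only the new lines are added.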
     
     

    Output

    14:59:23.648 [ INFO] [c.h.j.Jet] Bootstrapped instance requested but application wasn't called from jet submit script. Creating a standalone Jet instance instead.
    14:59:23.733 [ INFO] [c.h.i.c.AbstractConfigLocator] Loading 'hazelcast-jet-default.yaml' from the classpath.
    14:59:23.737 [ INFO] [c.h.j.c.JetConfig] jet.home is /Users/dalong/Downloads/myid
    14:59:24.023 [ INFO] [c.h.i.c.AbstractConfigLocator] Loading 'hazelcast-jet-member-default.yaml' from the classpath.
    14:59:24.116 [ INFO] [c.h.i.AddressPicker] Prefer IPv4 stack is true, prefer IPv6 addresses is false
    14:59:24.156 [ INFO] [c.h.i.AddressPicker] Picked [10.6.205.88]:5701, using socket ServerSocket[addr=/0:0:0:0:0:0:0:0,localport=5701], bind any local is true
    14:59:24.203 [ INFO] [c.h.system] Hazelcast Jet 4.2 (20200714 - 0e841ef) starting at [10.6.205.88]:5701
    14:59:24.204 [ INFO] [c.h.system] Cluster name: jet
    14:59:24.205 [ INFO] [c.h.system] 
        o   o   o   o---o o---o o     o---o   o   o---o o-o-o        o o---o o-o-o
        |   |  /      /  |     |     |      /   |       |          | |       |
        o---o o---o   o   o-o   |     o     o---o o---o   |          | o-o     |
        |   | |   |  /    |     |     |     |   |     |   |         | |       |
        o   o o   o o---o o---o o---o o---o o   o o---o   o       o--o o---o   o
    14:59:24.205 [ INFO] [c.h.system] Copyright (c) 2008-2020, Hazelcast, Inc. All Rights Reserved.
    14:59:24.523 [ INFO] [c.h.s.i.o.i.BackpressureRegulator] Backpressure is disabled
    14:59:24.830 [ WARN] [c.h.c.CPSubsystem] CP Subsystem is not enabled. CP data structures will operate in UNSAFE mode! Please note that UNSAFE mode will not provide strong consistency guarantees.
    14:59:25.218 [ INFO] [c.h.j.i.e.TaskletExecutionService] Creating idler with jet.idle.cooperative.min.microseconds=25µs,jet.idle.cooperative.max.microseconds=500µs
    14:59:25.219 [ INFO] [c.h.j.i.e.TaskletExecutionService] Creating idler with jet.idle.noncooperative.min.microseconds=25µs,jet.idle.noncooperative.max.microseconds=5000µs
    14:59:25.281 [ INFO] [c.h.j.i.JetService] Setting number of cooperative threads and default parallelism to 8
    14:59:25.285 [ INFO] [c.h.s.i.o.i.OperationExecutorImpl] Starting 8 partition threads and 5 generic threads (1 dedicated for priority tasks)
    14:59:25.287 [ INFO] [c.h.i.d.Diagnostics] Diagnostics disabled. To enable add -Dhazelcast.diagnostics.enabled=true to the JVM arguments.
    14:59:25.297 [ INFO] [c.h.c.LifecycleService] [10.6.205.88]:5701 is STARTING
    14:59:25.322 [ WARN] [c.h.i.i.Node] No join method is enabled! Starting standalone.
    14:59:25.336 [ INFO] [c.h.c.LifecycleService] [10.6.205.88]:5701 is STARTED
    14:59:25.562 [ INFO] [c.h.i.p.i.PartitionStateManager] Initializing cluster partition table arrangement...
    14:59:25.685 [ INFO] [c.h.j.i.JobCoordinationService] Starting job 04a8-9031-ad80-0001 based on submit request
    14:59:25.700 [ INFO] [c.h.j.i.MasterJobContext] Didn't find any snapshot to restore for job '04a8-9031-ad80-0001', execution 04a8-9031-ad81-0001
    14:59:25.700 [ INFO] [c.h.j.i.MasterJobContext] Start executing job '04a8-9031-ad80-0001', execution 04a8-9031-ad81-0001, execution graph in DOT format:
    digraph DAG {
        "fileWatcherSource(src/main/resources/mybooks/*)" [localParallelism=1];
        "fileWatcherSource(src/main/resources/mybooks/*)-add-timestamps" [localParallelism=1];
        "fused(flat-map, filter)" [localParallelism=8];
        "sliding-window-prepare" [localParallelism=8];
        "sliding-window" [localParallelism=8];
        "loggerSink" [localParallelism=1];
        "fileWatcherSource(src/main/resources/mybooks/*)" -> "fileWatcherSource(src/main/resources/mybooks/*)-add-timestamps" [label="isolated", queueSize=1024];
        "fileWatcherSource(src/main/resources/mybooks/*)-add-timestamps" -> "fused(flat-map, filter)" [queueSize=1024];
        "fused(flat-map, filter)" -> "sliding-window-prepare" [label="partitioned", queueSize=1024];
        subgraph cluster_0 {
            "sliding-window-prepare" -> "sliding-window" [label="distributed-partitioned", queueSize=1024];
        }
        "sliding-window" -> "loggerSink" [queueSize=1024];
    }
    HINT: You can use graphviz or http://viz-js.com to visualize the printed graph.
    14:59:25.838 [ INFO] [c.h.j.i.JobExecutionService] Execution plan for jobId=04a8-9031-ad80-0001, jobName='04a8-9031-ad80-0001', executionId=04a8-9031-ad81-0001 initialized
    14:59:25.856 [ INFO] [c.h.j.i.JobExecutionService] Start execution of job '04a8-9031-ad80-0001', execution 04a8-9031-ad81-0001 from coordinator [10.6.205.88]:5701
    14:59:25.935 [ INFO] [c.h.j.i.c.S.fileWatcherSource(src/main/resources/mybooks/*)#0] Started to watch directory: src/main/resources/mybooks
    14:59:41.981 [ INFO] [c.h.j.i.c.W.loggerSink#0] KeyedWindowResult{start=14:59:41.936, end=14:59:41.940, key='rong', value='21', isEarly=false}
    14:59:41.982 [ INFO] [c.h.j.i.c.W.loggerSink#0] KeyedWindowResult{start=14:59:41.936, end=14:59:41.940, key='fengliang', value='3', isEarly=false}
    14:59:41.982 [ INFO] [c.h.j.i.c.W.loggerSink#0] KeyedWindowResult{start=14:59:41.936, end=14:59:41.940, key='app', value='7', isEarly=false}
    14:59:41.982 [ INFO] [c.h.j.i.c.W.loggerSink#0] KeyedWindowResult{start=14:59:41.936, end=14:59:41.940, key='emo', value='3', isEarly=false}
    14:59:41.983 [ INFO] [c.h.j.i.c.W.loggerSink#0] KeyedWindowResult{start=14:59:41.936, end=14:59:41.940, key='dmo', value='3', isEarly=false}
    14:59:41.983 [ INFO] [c.h.j.i.c.W.loggerSink#0] KeyedWindowResult{start=14:59:41.936, end=14:59:41.940, key='demo', value='22', isEarly=false}
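    `WindowDefinition.tumbling(4)` combined with `withIngestionTimestamps()` groups words into consecutive, non-overlapping 4 ms windows. Every result above has start=14:59:41.936 and end=14:59:41.940, so the window start is a multiple of 4 ms. The sketch below computes window bounds under the assumption that windows are aligned to multiples of the window size counted from the epoch, which is consistent with those values. It illustrates tumbling-window assignment in general; `TumblingWindow` and its methods are hypothetical, not Jet API.

```java
import java.time.Instant;

// Tumbling-window assignment sketch, assuming windows aligned to multiples
// of the window size since the epoch (hypothetical names, not Jet code).
class TumblingWindow {

    static long windowStart(long timestampMs, long sizeMs) {
        // floorMod keeps the alignment correct for negative timestamps as well
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    static long windowEnd(long timestampMs, long sizeMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs; // exclusive end
    }

    public static void main(String[] args) {
        long size = 4; // as in WindowDefinition.tumbling(4): 4 ms with ingestion timestamps
        long ts = Instant.parse("2020-07-15T14:59:41.938Z").toEpochMilli();
        // prints 2020-07-15T14:59:41.936Z - 2020-07-15T14:59:41.940Z
        System.out.println(Instant.ofEpochMilli(windowStart(ts, size)) + " - "
                + Instant.ofEpochMilli(windowEnd(ts, size)));
    }
}
```

    The log uses local time while the sketch uses UTC, which does not change the millisecond alignment. A larger size, for example `WindowDefinition.tumbling(1000)`, would produce one count per word per second instead.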
     

    Supported features

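    • Low latency
    • Zero dependencies
    • Cloud-native support
    • Elastic scaling
    • Fault tolerance
    • In-memory storage
    • Many sources and sinks: Kafka, CDC, JDBC, Redis, ...
    • Dynamic member discovery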

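    Notes

    Many of hazelcast-jet's features come from Hazelcast's own industrial experience with distributed caching, for example distributed in-memory storage and dynamic discovery.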

    References

    https://jet-start.sh/
    https://github.com/hazelcast/hazelcast-jet
    https://github.com/siddhi-io/siddhi
    https://github.com/espertechinc/esper

  • Original article: https://www.cnblogs.com/rongfengliang/p/13305702.html