  • hazelcast-jet: an open-source distributed stream and batch processing framework

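    hazelcast-jet is an open-source distributed stream and batch processing framework from the Hazelcast team. Compared with many other stream and batch frameworks, it only needs a jar dependency (Siddhi is also a good choice in this respect).
    hazelcast-jet supports both stream processing and batch processing. Other stream processing platforms (tools) such as StreamSets and Apache NiFi are comparatively complex, and often we
    don't need anything that complex. Kafka Streams is tied to the Kafka ecosystem, which in many cases also complicates system development.
    Below is a simple trial run for learning purposes.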

    Maven project

    • Directory structure
     
    ├── pom.xml
    └── src
        ├── main
        │   ├── java
        │   │   └── com
        │   │       └── dalong
        │   │           ├── Application.java
        │   │          
        │   └── resources
        │       └── mybooks
        │           └── books
     
     
    • pom.xml
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.dalong</groupId>
        <artifactId>myid</artifactId>
        <version>1.0-SNAPSHOT</version>
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <encoding>UTF-8</encoding>
            <java.version>1.8</java.version>
            <maven.compiler.source>1.8</maven.compiler.source>
            <maven.compiler.target>1.8</maven.compiler.target>
        </properties>
        <dependencies>
            <dependency>
                <groupId>com.hazelcast.jet</groupId>
                <artifactId>hazelcast-jet</artifactId>
                <version>4.1</version>
            </dependency>
        </dependencies>
    </project>
     
    • File content (src/main/resources/mybooks/books)
    demo
    app
    rong
    fengliang
    rong
    demo
    app
    rong
    rong
     
     
    • Batch mode
    package com.dalong;
    import com.hazelcast.jet.Jet;
    import com.hazelcast.jet.JetInstance;
    import com.hazelcast.jet.Traversers;
    import com.hazelcast.jet.aggregate.AggregateOperations;
    import com.hazelcast.jet.pipeline.Pipeline;
    import com.hazelcast.jet.pipeline.Sinks;
    import com.hazelcast.jet.pipeline.Sources;
    import com.hazelcast.jet.pipeline.WindowDefinition;
    public class Application {
        public static void main(String[] args) {
            String path = "src/main/resources/mybooks";
            // batch mode
            BatchMode(path);
            // stream mode
            // StreamMode(path);
        }
        // stream mode: count words in 4 ms tumbling windows as lines are added to files in the directory
        private static void StreamMode(String path) {
            JetInstance jet = Jet.bootstrappedInstance();
            Pipeline p = Pipeline.create();
            p.readFrom(Sources.fileWatcher(path))
                    .withIngestionTimestamps()
                    .setLocalParallelism(1)
                    .flatMap(line -> Traversers.traverseArray(line.toLowerCase().split("\\W+")))
                    .filter(word -> !word.isEmpty())
                    .groupingKey(word -> word)
                    .window(WindowDefinition.tumbling(4))
                    .aggregate(AggregateOperations.counting())
                    .writeTo(Sinks.logger());
            jet.newJob(p).join();
        }
        private static void BatchMode(String path) {
            JetInstance jet = Jet.bootstrappedInstance();
            Pipeline p = Pipeline.create();
            p.readFrom(Sources.files(path))
                    .flatMap(line -> Traversers.traverseArray(line.toLowerCase().split("\\W+")))
                    .filter(word -> !word.isEmpty())
                    .groupingKey(word -> word)
                    .aggregate(AggregateOperations.counting())
                    .writeTo(Sinks.logger());
            jet.newJob(p).join();
        }
    }
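    For readers who want to see what each stage does without starting a Jet instance, here is a minimal plain-Java sketch of the batch pipeline's logic. It uses only java.util.stream and no Jet API:
    - flatMap plus filter become tokenize;
    - groupingKey plus aggregate(counting()) become Collectors.groupingBy with Collectors.counting().
    It also shows why the filter step matters: when a line starts with a delimiter, split returns an empty first token. The class name WordCountStreamsSketch is illustrative. This is an analogue of the pipeline, not how Jet executes it.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class WordCountStreamsSketch {
    // Same tokenizing as the pipeline: lowercase, then split on runs of non-word characters.
    // In Java source the regex \W+ must be written "\\W+"; "\W+" is an illegal escape and does not compile.
    static List<String> tokenize(String line) {
        return Arrays.stream(line.toLowerCase().split("\\W+"))
                .filter(word -> !word.isEmpty()) // a leading delimiter yields an empty first token
                .collect(Collectors.toList());
    }

    // groupingKey(word -> word) + aggregate(counting()) corresponds to groupingBy + counting().
    // A TreeMap keeps the keys sorted so the printed result is deterministic.
    static Map<String, Long> countWords(List<String> lines) {
        return lines.stream()
                .flatMap(line -> tokenize(line).stream())
                .collect(Collectors.groupingBy(word -> word, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString("  Demo, app".split("\\W+"))); // [, Demo, app]
        List<String> lines = Arrays.asList(
                "demo", "app", "rong", "fengliang", "rong", "demo", "app", "rong", "rong");
        System.out.println(countWords(lines)); // {app=2, demo=2, fengliang=1, rong=4}
    }
}
```

    By default Java's \W treats every non-ASCII character, Chinese text included, as a separator. Prefixing the pattern with the (?U) flag makes \w and \W Unicode-aware.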

    Output

    14:18:02.150 [ INFO] [c.h.j.Jet] Bootstrapped instance requested but application wasn't called from jet submit script. Creating a standalone Jet instance instead.
    14:18:02.278 [ INFO] [c.h.i.c.AbstractConfigLocator] Loading 'hazelcast-jet-default.yaml' from the classpath.
    14:18:02.299 [ INFO] [c.h.j.c.JetConfig] jet.home is /Users/dalong/Downloads/myid
    14:18:02.761 [ INFO] [c.h.i.c.AbstractConfigLocator] Loading 'hazelcast-jet-member-default.yaml' from the classpath.
    14:18:03.017 [ INFO] [c.h.i.AddressPicker] Prefer IPv4 stack is true, prefer IPv6 addresses is false
    14:18:03.078 [ INFO] [c.h.i.AddressPicker] Picked [10.6.205.88]:5701, using socket ServerSocket[addr=/0:0:0:0:0:0:0:0,localport=5701], bind any local is true
    14:18:03.186 [ INFO] [c.h.system] Hazelcast Jet 4.2 (20200714 - 0e841ef) starting at [10.6.205.88]:5701
    14:18:03.188 [ INFO] [c.h.system] Cluster name: jet
    14:18:03.189 [ INFO] [c.h.system] 
        o   o   o   o---o o---o o     o---o   o   o---o o-o-o        o o---o o-o-o
        |   |  /      /  |     |     |      /   |       |          | |       |
        o---o o---o   o   o-o   |     o     o---o o---o   |          | o-o     |
        |   | |   |  /    |     |     |     |   |     |   |         | |       |
        o   o o   o o---o o---o o---o o---o o   o o---o   o       o--o o---o   o
    14:18:03.191 [ INFO] [c.h.system] Copyright (c) 2008-2020, Hazelcast, Inc. All Rights Reserved.
    14:18:04.425 [ INFO] [c.h.s.i.o.i.BackpressureRegulator] Backpressure is disabled
    14:18:05.926 [ WARN] [c.h.c.CPSubsystem] CP Subsystem is not enabled. CP data structures will operate in UNSAFE mode! Please note that UNSAFE mode will not provide strong consistency guarantees.
    14:18:07.140 [ INFO] [c.h.j.i.e.TaskletExecutionService] Creating idler with jet.idle.cooperative.min.microseconds=25µs,jet.idle.cooperative.max.microseconds=500µs
    14:18:07.143 [ INFO] [c.h.j.i.e.TaskletExecutionService] Creating idler with jet.idle.noncooperative.min.microseconds=25µs,jet.idle.noncooperative.max.microseconds=5000µs
    14:18:07.373 [ INFO] [c.h.j.i.JetService] Setting number of cooperative threads and default parallelism to 8
    14:18:07.386 [ INFO] [c.h.s.i.o.i.OperationExecutorImpl] Starting 8 partition threads and 5 generic threads (1 dedicated for priority tasks)
    14:18:07.391 [ INFO] [c.h.i.d.Diagnostics] Diagnostics disabled. To enable add -Dhazelcast.diagnostics.enabled=true to the JVM arguments.
    14:18:07.416 [ INFO] [c.h.c.LifecycleService] [10.6.205.88]:5701 is STARTING
    14:18:07.487 [ WARN] [c.h.i.i.Node] No join method is enabled! Starting standalone.
    14:18:07.534 [ INFO] [c.h.c.LifecycleService] [10.6.205.88]:5701 is STARTED
    14:18:22.713 [ INFO] [c.h.i.p.i.PartitionStateManager] Initializing cluster partition table arrangement...
    14:18:22.910 [ INFO] [c.h.j.i.JobCoordinationService] Starting job 04a8-86cc-9300-0001 based on submit request
    14:18:22.930 [ INFO] [c.h.j.i.MasterJobContext] Didn't find any snapshot to restore for job '04a8-86cc-9300-0001', execution 04a8-86cc-9301-0001
    14:18:22.931 [ INFO] [c.h.j.i.MasterJobContext] Start executing job '04a8-86cc-9300-0001', execution 04a8-86cc-9301-0001, execution graph in DOT format:
    digraph DAG {
        "filesSource(src/main/resources/mybooks/*)" [localParallelism=4];
        "fused(flat-map, filter)" [localParallelism=8];
        "group-and-aggregate-prepare" [localParallelism=8];
        "group-and-aggregate" [localParallelism=8];
        "loggerSink" [localParallelism=1];
        "filesSource(src/main/resources/mybooks/*)" -> "fused(flat-map, filter)" [queueSize=1024];
        "fused(flat-map, filter)" -> "group-and-aggregate-prepare" [label="partitioned", queueSize=1024];
        subgraph cluster_0 {
            "group-and-aggregate-prepare" -> "group-and-aggregate" [label="distributed-partitioned", queueSize=1024];
        }
        "group-and-aggregate" -> "loggerSink" [queueSize=1024];
    }
    HINT: You can use graphviz or http://viz-js.com to visualize the printed graph.
    14:18:23.050 [ INFO] [c.h.j.i.JobExecutionService] Execution plan for jobId=04a8-86cc-9300-0001, jobName='04a8-86cc-9300-0001', executionId=04a8-86cc-9301-0001 initialized
    14:18:23.056 [ INFO] [c.h.j.i.JobExecutionService] Start execution of job '04a8-86cc-9300-0001', execution 04a8-86cc-9301-0001 from coordinator [10.6.205.88]:5701
    14:18:23.113 [ INFO] [c.h.j.i.c.W.loggerSink#0] rong=4
    14:18:23.114 [ INFO] [c.h.j.i.c.W.loggerSink#0] fengliang=1
    14:18:23.114 [ INFO] [c.h.j.i.c.W.loggerSink#0] app=2
    14:18:23.114 [ INFO] [c.h.j.i.c.W.loggerSink#0] demo=2
    14:18:23.121 [ INFO] [c.h.j.i.MasterJobContext] Execution of job '04a8-86cc-9300-0001', execution 04a8-86cc-9301-0001 completed successfully
        Start time: 2020-07-15T14:18:22.912
        Duration: 208 ms
        To see additional job metrics enable JobConfig.storeMetricsAfterJobCompletion
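    The execution graph above splits the aggregation into two vertices, group-and-aggregate-prepare and group-and-aggregate, joined by a distributed-partitioned edge. As I read it, the first vertex pre-aggregates locally and the second combines the partial results for each key across the cluster.

    The sketch below is a simplified single-JVM analogue of that two-stage idea, not Jet's implementation:
    - each hypothetical "worker" (think: cluster member) counts its own round-robin slice of the lines;
    - the partial counts are then merged per word.
    With the sample file it reproduces the counts in the log (rong=4, app=2, demo=2, fengliang=1) whatever the worker count.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TwoStageWordCount {
    // Stage 1 (like "prepare"): one worker counts the words in its own slice of the input.
    static Map<String, Long> localCounts(List<String> lines) {
        Map<String, Long> counts = new HashMap<>();
        for (String line : lines) {
            for (String word : line.toLowerCase().split("\\W+")) {
                if (!word.isEmpty()) {
                    counts.merge(word, 1L, Long::sum);
                }
            }
        }
        return counts;
    }

    // Stage 2 (combine): partial counts for the same word are summed into one total.
    static Map<String, Long> combine(List<Map<String, Long>> partials) {
        Map<String, Long> total = new TreeMap<>();
        for (Map<String, Long> partial : partials) {
            partial.forEach((word, n) -> total.merge(word, n, Long::sum));
        }
        return total;
    }

    // Hypothetical driver: lines are dealt round-robin to `workers` slices.
    static Map<String, Long> countWords(List<String> lines, int workers) {
        List<Map<String, Long>> partials = new ArrayList<>();
        for (int w = 0; w < workers; w++) {
            List<String> slice = new ArrayList<>();
            for (int i = w; i < lines.size(); i += workers) {
                slice.add(lines.get(i));
            }
            partials.add(localCounts(slice));
        }
        return combine(partials);
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "demo", "app", "rong", "fengliang", "rong", "demo", "app", "rong", "rong");
        System.out.println(countWords(lines, 2)); // {app=2, demo=2, fengliang=1, rong=4}
    }
}
```

    The design point is that stage 1 only ships one partial count per word per worker instead of every single word, which is what makes the distributed-partitioned edge cheap.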
     
     
    • Stream mode

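    Note how this mode is run: start the application first, then create a file in the watched directory and fill in some content; the counts then show up in the log. To switch modes, comment out the BatchMode call and uncomment the StreamMode call in main: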

    public static void main(String[] args) {
        String path = "src/main/resources/mybooks";
        // batch mode
        // BatchMode(path);
        // stream mode
        StreamMode(path);
    }
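    The stream job only produces output once content is added to a file in the watched directory after the job has started. Besides doing this by hand, a small helper using only the standard library can append lines to such a file.

    In the sketch below, the file name more-books.txt is a hypothetical choice and the directory is the path used in main. Files.write appends a line separator after every line, so each appended word ends up as a complete, newline-terminated line.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.List;

public class FeedWatchedDir {
    // Appends the given lines to dir/fileName, creating the directory and file if needed.
    static Path appendLines(Path dir, String fileName, List<String> lines) throws IOException {
        Files.createDirectories(dir);
        Path file = dir.resolve(fileName);
        return Files.write(file, lines, StandardCharsets.UTF_8,
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical file name; run this while the stream job is running.
        Path file = appendLines(Paths.get("src/main/resources/mybooks"), "more-books.txt",
                Arrays.asList("demo", "app", "rong"));
        System.out.println("appended to " + file);
    }
}
```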
     
     

    Output

    14:59:23.648 [ INFO] [c.h.j.Jet] Bootstrapped instance requested but application wasn't called from jet submit script. Creating a standalone Jet instance instead.
    14:59:23.733 [ INFO] [c.h.i.c.AbstractConfigLocator] Loading 'hazelcast-jet-default.yaml' from the classpath.
    14:59:23.737 [ INFO] [c.h.j.c.JetConfig] jet.home is /Users/dalong/Downloads/myid
    14:59:24.023 [ INFO] [c.h.i.c.AbstractConfigLocator] Loading 'hazelcast-jet-member-default.yaml' from the classpath.
    14:59:24.116 [ INFO] [c.h.i.AddressPicker] Prefer IPv4 stack is true, prefer IPv6 addresses is false
    14:59:24.156 [ INFO] [c.h.i.AddressPicker] Picked [10.6.205.88]:5701, using socket ServerSocket[addr=/0:0:0:0:0:0:0:0,localport=5701], bind any local is true
    14:59:24.203 [ INFO] [c.h.system] Hazelcast Jet 4.2 (20200714 - 0e841ef) starting at [10.6.205.88]:5701
    14:59:24.204 [ INFO] [c.h.system] Cluster name: jet
    14:59:24.205 [ INFO] [c.h.system] 
        o   o   o   o---o o---o o     o---o   o   o---o o-o-o        o o---o o-o-o
        |   |  /      /  |     |     |      /   |       |          | |       |
        o---o o---o   o   o-o   |     o     o---o o---o   |          | o-o     |
        |   | |   |  /    |     |     |     |   |     |   |         | |       |
        o   o o   o o---o o---o o---o o---o o   o o---o   o       o--o o---o   o
    14:59:24.205 [ INFO] [c.h.system] Copyright (c) 2008-2020, Hazelcast, Inc. All Rights Reserved.
    14:59:24.523 [ INFO] [c.h.s.i.o.i.BackpressureRegulator] Backpressure is disabled
    14:59:24.830 [ WARN] [c.h.c.CPSubsystem] CP Subsystem is not enabled. CP data structures will operate in UNSAFE mode! Please note that UNSAFE mode will not provide strong consistency guarantees.
    14:59:25.218 [ INFO] [c.h.j.i.e.TaskletExecutionService] Creating idler with jet.idle.cooperative.min.microseconds=25µs,jet.idle.cooperative.max.microseconds=500µs
    14:59:25.219 [ INFO] [c.h.j.i.e.TaskletExecutionService] Creating idler with jet.idle.noncooperative.min.microseconds=25µs,jet.idle.noncooperative.max.microseconds=5000µs
    14:59:25.281 [ INFO] [c.h.j.i.JetService] Setting number of cooperative threads and default parallelism to 8
    14:59:25.285 [ INFO] [c.h.s.i.o.i.OperationExecutorImpl] Starting 8 partition threads and 5 generic threads (1 dedicated for priority tasks)
    14:59:25.287 [ INFO] [c.h.i.d.Diagnostics] Diagnostics disabled. To enable add -Dhazelcast.diagnostics.enabled=true to the JVM arguments.
    14:59:25.297 [ INFO] [c.h.c.LifecycleService] [10.6.205.88]:5701 is STARTING
    14:59:25.322 [ WARN] [c.h.i.i.Node] No join method is enabled! Starting standalone.
    14:59:25.336 [ INFO] [c.h.c.LifecycleService] [10.6.205.88]:5701 is STARTED
    14:59:25.562 [ INFO] [c.h.i.p.i.PartitionStateManager] Initializing cluster partition table arrangement...
    14:59:25.685 [ INFO] [c.h.j.i.JobCoordinationService] Starting job 04a8-9031-ad80-0001 based on submit request
    14:59:25.700 [ INFO] [c.h.j.i.MasterJobContext] Didn't find any snapshot to restore for job '04a8-9031-ad80-0001', execution 04a8-9031-ad81-0001
    14:59:25.700 [ INFO] [c.h.j.i.MasterJobContext] Start executing job '04a8-9031-ad80-0001', execution 04a8-9031-ad81-0001, execution graph in DOT format:
    digraph DAG {
        "fileWatcherSource(src/main/resources/mybooks/*)" [localParallelism=1];
        "fileWatcherSource(src/main/resources/mybooks/*)-add-timestamps" [localParallelism=1];
        "fused(flat-map, filter)" [localParallelism=8];
        "sliding-window-prepare" [localParallelism=8];
        "sliding-window" [localParallelism=8];
        "loggerSink" [localParallelism=1];
        "fileWatcherSource(src/main/resources/mybooks/*)" -> "fileWatcherSource(src/main/resources/mybooks/*)-add-timestamps" [label="isolated", queueSize=1024];
        "fileWatcherSource(src/main/resources/mybooks/*)-add-timestamps" -> "fused(flat-map, filter)" [queueSize=1024];
        "fused(flat-map, filter)" -> "sliding-window-prepare" [label="partitioned", queueSize=1024];
        subgraph cluster_0 {
            "sliding-window-prepare" -> "sliding-window" [label="distributed-partitioned", queueSize=1024];
        }
        "sliding-window" -> "loggerSink" [queueSize=1024];
    }
    HINT: You can use graphviz or http://viz-js.com to visualize the printed graph.
    14:59:25.838 [ INFO] [c.h.j.i.JobExecutionService] Execution plan for jobId=04a8-9031-ad80-0001, jobName='04a8-9031-ad80-0001', executionId=04a8-9031-ad81-0001 initialized
    14:59:25.856 [ INFO] [c.h.j.i.JobExecutionService] Start execution of job '04a8-9031-ad80-0001', execution 04a8-9031-ad81-0001 from coordinator [10.6.205.88]:5701
    14:59:25.935 [ INFO] [c.h.j.i.c.S.fileWatcherSource(src/main/resources/mybooks/*)#0] Started to watch directory: src/main/resources/mybooks
    14:59:41.981 [ INFO] [c.h.j.i.c.W.loggerSink#0] KeyedWindowResult{start=14:59:41.936, end=14:59:41.940, key='rong', value='21', isEarly=false}
    14:59:41.982 [ INFO] [c.h.j.i.c.W.loggerSink#0] KeyedWindowResult{start=14:59:41.936, end=14:59:41.940, key='fengliang', value='3', isEarly=false}
    14:59:41.982 [ INFO] [c.h.j.i.c.W.loggerSink#0] KeyedWindowResult{start=14:59:41.936, end=14:59:41.940, key='app', value='7', isEarly=false}
    14:59:41.982 [ INFO] [c.h.j.i.c.W.loggerSink#0] KeyedWindowResult{start=14:59:41.936, end=14:59:41.940, key='emo', value='3', isEarly=false}
    14:59:41.983 [ INFO] [c.h.j.i.c.W.loggerSink#0] KeyedWindowResult{start=14:59:41.936, end=14:59:41.940, key='dmo', value='3', isEarly=false}
    14:59:41.983 [ INFO] [c.h.j.i.c.W.loggerSink#0] KeyedWindowResult{start=14:59:41.936, end=14:59:41.940, key='demo', value='22', isEarly=false}
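    Each log line above is one result per key for a single tumbling window. withIngestionTimestamps() stamps every item with its arrival time, and WindowDefinition.tumbling(4) groups those timestamps into consecutive, non-overlapping 4 ms windows; the logged window runs from 14:59:41.936 to 14:59:41.940. The file's lines were presumably read in one burst, so all of its words landed in the same window, hence counts like rong=21.

    The plain-Java sketch below illustrates the concept rather than Jet's code. It assumes windows aligned to timestamp 0 and shows how a timestamp maps to its window start and how counts are kept per window and per key. The timestamps are made up.

```java
import java.util.Map;
import java.util.TreeMap;

public class TumblingWindowSketch {
    // Start of the tumbling window containing ts, assuming windows aligned to 0.
    // floorDiv rounds toward negative infinity, so negative timestamps are handled too.
    static long windowStart(long ts, long size) {
        return Math.floorDiv(ts, size) * size;
    }

    // Counts words per window: window start -> (word -> count), both maps sorted.
    static Map<Long, Map<String, Long>> countPerWindow(long[] timestamps, String[] words, long size) {
        Map<Long, Map<String, Long>> result = new TreeMap<>();
        for (int i = 0; i < timestamps.length; i++) {
            long start = windowStart(timestamps[i], size);
            result.computeIfAbsent(start, k -> new TreeMap<>())
                  .merge(words[i], 1L, Long::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        // Illustrative millisecond timestamps (hypothetical); window size 4 ms as in tumbling(4).
        long[] ts = {936, 937, 939, 940, 943, 944};
        String[] words = {"rong", "demo", "rong", "app", "rong", "demo"};
        System.out.println(countPerWindow(ts, words, 4));
        // {936={demo=1, rong=2}, 940={app=1, rong=1}, 944={demo=1}}
    }
}
```

    With a 4 ms window, 939 still falls in [936, 940) while 940 opens the next window. This is why words arriving even a few milliseconds apart can be reported in separate results.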
     

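    Supported features

    • Low latency
    • Zero dependencies
    • Cloud-native support
    • Elasticity
    • Fault tolerance
    • In-memory storage
    • Many sources and sinks: Kafka, CDC, JDBC, Redis, ...
    • Dynamic node discovery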

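    Notes

    Many of hazelcast-jet's features come from Hazelcast's own industrial experience with distributed caching, for example distributed in-memory storage and dynamic discovery.

    References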

    https://jet-start.sh/
    https://github.com/hazelcast/hazelcast-jet
    https://github.com/siddhi-io/siddhi
    https://github.com/espertechinc/esper

  • Original post: https://www.cnblogs.com/rongfengliang/p/13305702.html