zoukankan      html  css  js  c++  java
  • Kettle 分布式集群环境实践

    1. 环境搭建

    1.1 资源规划

    本次搭建一主两从分布式Kettle环境。资源规划情况如下表

    ip角色
    192.168.21.107 Master
    192.168.21.108 Slave1
    192.168.21.109 Slave2

    1.2 软件环境

    1.3 解压Kettle

    unzip pdi-ce-8.2.0.0-342.zip -d /usr/local/

    1.4 集群配置

    修改配置文件及所在的目录

    carte-config-master-8080.xml是主节点需要修改的配置文件. carte-config-808x.xml是从节点需要修改的配置文件,需要多少个从节点就修改多少个。 Master节点上 修改carte-config-master-8080.xml的内容为

      <slaveserver>
        <name>master</name>
        <hostname>192.168.21.107</hostname>
        <port>8080</port>
        <master>Y</master>
      </slaveserver>

    参数名称:

    1. name:节点名称
    2. hostname:本机ip或主机名
    3. port:端口号
    4. master:Y为本节点是主节点,N为本节点是从节点

    salve1节点

    修改carte-config-8081.xml

     <masters>
    
        <slaveserver>
          <name>master</name>
          <hostname>192.168.21.107</hostname>
          <port>8080</port>
          <username>cluster</username>
          <password>cluster</password>
          <master>Y</master>
        </slaveserver>
      </masters>
      <report_to_masters>Y</report_to_masters>
    
      <slaveserver>
        <name>slave1</name>
        <hostname>192.168.21.108</hostname>
        <port>8081</port>
        <username>cluster</username>
        <password>cluster</password>
        <master>N</master>
      </slaveserver>

     

    slave2节点
    修改carte-config-8082.xml
     <masters>
        <slaveserver>
          <name>master</name>
          <hostname>192.168.21.107</hostname>
          <port>8080</port>
          <username>cluster</username>
          <password>cluster</password>
          <master>Y</master>
        </slaveserver>
    
      </masters>
    
      <report_to_masters>Y</report_to_masters>
    
      <slaveserver>
        <name>slave2</name>
        <hostname>192.168.21.109</hostname>
        <port>8082</port>
        <username>cluster</username>
        <password>cluster</password>
        <master>N</master>
      </slaveserver>

    1.5 分发

    将master上配置的信息分发到各集群中

    scp -r /usr/local/data-integration root@192.168.21.107:/usr/local
    

    1.6 下载MySQL驱动包

    将mysql驱动包放到data-integration/lib 目录下,后续如果需要测试其他数据库,也应存放到该目录下,主从节点都需要。

    mv mysql-connector-java-5.1.47.jar /usr/local/data-integration/lib/
    

    1.7 启动各个节点

    • 启动master

      cd /usr/local/data-integration/
      ./carte.sh 192.168.21.107 8080
      

    • 启动slave1

      cd /usr/local/data-integration/
      ./carte.sh 192.168.21.108 8081
      

    • 启动slave2
     cd /usr/local/data-integration/
     ./carte.sh 192.168.21.109 8082
    

    2. Kettle 图形化方式使用集群模式

    2.1 启动Kettle 图形化工具

    Kettle 图形化工具安装在192.168.21.105节点上,与主从环境进行区分。 启动kettle图形化

    cd /data/software/data-integration/
    ./spoon.sh  
    

    2.2 在spoon中新建一个转换

    在新建的转换中选择主对象树--->子服务器--->新建子服务器。 主节点

    从节点1 从节点2

    注:填写的信息要跟服务器上配置的XML文件要一致,master子服务器填写master中配置文件的信息,slave则填写相应的slave信息。

    2.2 配置schema

    在主对象树中选择Kettle集群schemas,新建一个schema,在弹出的对话框中,选择子服务器。

    2.3 测试

    选择核心对象--->输入--->表输入,配置相应的数据库连接信息及抽取字段信息。

    同理,在核心对象,输出中配置相应的输出。此处测试使用表输出。 在表输出处,右键,选择集群,选择刚才配置的集群名称 选择完成后,可以看到C*2,表示有两个slave进行任务处理。(主节点只负责接受客户端请求及收集处理结果)

    点击启动 在Run configuration中选择集群,如图,c1,点击启动即可。

    3. java提交任务到集群运行

    3.1 创建maven工程

    POM依赖

     <properties>
            <kettle.version>8.2.0.0-342</kettle.version>
            <jersey.version>2.25.1</jersey.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>pentaho-kettle</groupId>
                <artifactId>kettle-core</artifactId>
                <version>${kettle.version}</version>
            </dependency>
    
            <dependency>
                <groupId>pentaho-kettle</groupId>
                <artifactId>kettle-engine</artifactId>
                <version>${kettle.version}</version>
            </dependency>
    
            <dependency>
                <groupId>pentaho-kettle</groupId>
                <artifactId>metastore</artifactId>
                <version>${kettle.version}</version>
            </dependency>
    
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.12</version>
                <scope>test</scope>
            </dependency>
    
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
                <version>1.7.26</version>
            </dependency>
    
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
                <version>1.7.26</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-vfs2</artifactId>
                <version>2.1</version>
            </dependency>
    
            <dependency>
                <groupId>com.google.guava</groupId>
                <artifactId>guava</artifactId>
                <version>20.0</version>
            </dependency>
    
            <dependency>
                <groupId>commons-lang</groupId>
                <artifactId>commons-lang</artifactId>
                <version>2.2</version>
            </dependency>
    
            <dependency>
                <groupId>commons-io</groupId>
                <artifactId>commons-io</artifactId>
                <version>2.2</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.httpcomponents</groupId>
                <artifactId>httpclient</artifactId>
                <version>4.5.13</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.httpcomponents</groupId>
                <artifactId>httpcore</artifactId>
                <version>4.4.13</version>
            </dependency>
    
            <!--MySQL-->
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>5.1.46</version>
            </dependency>
            <dependency>
                <groupId>cn.hutool</groupId>
                <artifactId>hutool-all</artifactId>
                <version>4.5.6</version>
            </dependency>
    
            <!--encoders-->
            <dependency>
                <groupId>org.owasp.encoder</groupId>
                <artifactId>encoder</artifactId>
                <version>1.2.1</version>
            </dependency>
    
            <!--apache cli-->
            <dependency>
                <groupId>commons-cli</groupId>
                <artifactId>commons-cli</artifactId>
                <version>1.2</version>
            </dependency>
    
            <!--lombok-->
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>1.18.16</version>
            </dependency>
    
        </dependencies>

    对于kettle相关的包,需要本地maven install 安装。

    将图形化生成的ktr文件存放至resource文件目录下。

    log4j.properties配置文件

    log4j.rootLogger=DEBUG,stdout
    #log4j.rootLogger=DEBUG,stdout,file
    log4j.additivity.org.apache=true
    
    log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.threshold=INFO
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern=%-5p %c{1}:%L - %m%n
    
    # 将日志写入到指定文件中
    #log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
    #log4j.appender.file.layout=org.apache.log4j.PatternLayout
    #log4j.appender.file.DatePattern='.'yyyy-MM-dd-HH-mm
    #log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
    #log4j.appender.file.Threshold=INFO
    #log4j.appender.file.append=true
    

    Kettle集群模式调用代码

            KettleEnvironment.init();
    
                //加载ktr文件
                String fileName = "diner_test.ktr";
                TransMeta transMeta = new TransMeta(fileName);
    
                //设置执行模式
                TransExecutionConfiguration config = new TransExecutionConfiguration();
               //设置执行模式
                config.setExecutingClustered(true);
                config.setExecutingLocally(false);
                config.setExecutingRemotely(false);
    
                config.setClusterPosting(true);
                config.setClusterPreparing(true);
                config.setClusterStarting(true);
                config.setLogLevel(LogLevel.DEBUG);
    
                TransSplitter transSplitter = Trans.executeClustered(transMeta, config);
                // 配置clusterMonitor
                Object clusterMonitor = new Object();
                long nrErrors = Trans.monitorClusteredTransformation(new LogChannel(clusterMonitor),transSplitter, null, 1);
                System.out.println(nrErrors);
    
                System.out.println(transSplitter.getCarteObjectMap());
    
                System.out.println(transSplitter.getMaster());
                System.out.println(transSplitter.getSlaves()[0]);
                System.out.println(transSplitter.getSlaves()[1].getStepNames()[0]);
    
    
  • 相关阅读:
    TensorFlow笔记-初识
    SMP、NUMA、MPP体系结构介绍
    Page Cache, the Affair Between Memory and Files.页面缓存-内存与文件的那些事
    How The Kernel Manages Your Memory.内核是如何管理内存的
    Anatomy of a Program in Memory.剖析程序的内存布局
    Cache: a place for concealment and safekeeping.Cache: 一个隐藏并保存数据的场所
    Memory Translation and Segmentation.内存地址转换与分段
    CPU Rings, Privilege, and Protection.CPU的运行环, 特权级与保护
    The Kernel Boot Process.内核引导过程
    How Computers Boot Up.计算机的引导过程
  • 原文地址:https://www.cnblogs.com/shine-rainbow/p/kettle-fen-bu-shi-ji-qun-huan-jing-shi-jian.html
Copyright © 2011-2022 走看看