zoukankan      html  css  js  c++  java
  • Node——微服务架构(一)

    new ServiceBroker

    default settings

    const { ServiceBroker } = require("moleculer");
    const broker = new ServiceBroker();
    

    custom settings

    const { ServiceBroker } = require("moleculer");
    const broker = new ServiceBroker({
        logLevel: "info"
    });
    

    communicate with remote nodes

    const { ServiceBroker } = require("moleculer");
    const broker = new ServiceBroker({
        nodeID: "node-1",
        transporter: "nats://localhost:4222",
        logLevel: "debug",
        requestTimeout: 5 * 1000,
        requestRetry: 3
    });
    

    broker options

    • logLevel
      • type:string
      • default:info
      • des:可选项目还有 trace、debug、 info、 warn、 error、 fatal
    • middlewares
      • type:Array<Function>
      • default:null
      • des:中间件
    • created
      • type:Function
      • default:null
      • des:broker 实例被创建的时候将会触发此函数
    • started
      • type:Function
      • default:null
      • des:broker 实例开始执行时触发此函数
    • stopped
      • type:Function
      • default:null
      • des:broker 实例停止执行时触发此函数
    • hotReload
      • type:Boolean
      • default:false
      • des:是否启动热加载
    • cacher
      • type:String、Object、Cacher
      • default:null
      • des:若是启动缓存,两个相同模型的 broker.call,只有第一个 call 会让 action 中对应 handler 完整的执行一遍,第二个 call 就不会了,它会直接从缓存中取数据,不常用
      • https://moleculer.services/docs/0.13/caching.html
    • transporter
    • serializer
    • nodeID
      • type:string
      • default:hostname + PID
      • des:这是节点的id,挂载在 某一个 namespace 中是不能够同名的
    • namespace
      • type:string
      • defalut:”“
      • des:分割一个网咯中的不同区域,基本上用不到,除非项目特别复杂,子服务特别多
    • requestTimeout
      • type:Number
      • default:0
      • des:请求超时设置,单位毫秒

    createService

    该服务表示Moleculer框架中的一个微服务。您可以定义操作并订阅事件。若要创建服务,必须定义架构。服务模式类似于VueJS的一个组件

    // 定义了两个actions
    broker.createService({
        name: "math",
        actions: {
            add(ctx) {
                return Number(ctx.params.a) + Number(ctx.params.b);
            },
    
            sub(ctx) {
                return Number(ctx.params.a) - Number(ctx.params.b);
            }
        }
    });
    

    name

    • 强制属性,最后去 call 某一个微服务的时候必须带上 name

    version

    settings

    • 此属性相当于仓库
      • 赋值可以是对象,对象中设置任意键值对,action 中通过 this.settings.xxxx 能够访问到设置项
      • 远程节点上可以获得这些设置项
      • 有一些内部设置是由核心模块使用的。这些设置名称以$(美元符号)开头
        • $noVersionPrefix
          • type:Boolean
          • default:false
          • des:禁用 action 版本前缀
        • $noServiceNamePrefix
          • type:Boolean
          • default:false
          • des:禁用 action 中的服务名称前缀。
        • $dependencyTimeout
          • type:Number
          • default:0
          • des:依赖等待超时
        • $shutdownTimeout
          • type:Number
          • default:0
          • des:关闭时等待活动请求的超时

    mixins

    Mixins是一种为Moleculer服务分发可重用功能的灵活方法。服务构造函数将这些混合与当前架构合并。它是在您的服务中扩展其他服务。当服务使用混音时,混音中的所有属性都将“混合”到当前服务中。

    const ApiGwService = require("moleculer-web");
    
    module.exports = {
        name: "api",
        mixins: [ApiGwService]
        settings: {
            // Change port setting
            port: 8080
        },
        actions: {
            myAction() {
                // Add a new action to apiGwService service
            }
        }
    }
    

    上面的示例创建了一个API服务,该服务继承了ApiGwService的所有内容,但是覆盖了端口设置,并使用新的myAction操作对其进行了扩展

    actions

    • action 是服务中可调用的公共方法,broker.call 或 ctx.call,具体 action 必须在 action 中,可以是一个函数,可以是一个对象

    events

    • 事件订阅

    lifecycle events

    • 有一些生命周期服务事件,这些事件将由代理触发。它们被放置在模式的根中
      • created:broker.loadService 或者 broker.createService 触发
      • started:broker.start() 触发
      • stopped:broker.stop() 触发

    methods

    • 创建私有方法,以供 action、event、lifecycle event 使用

    dependencies

    • 如果您的服务依赖于其他服务,请使用架构中的依赖项属性。服务在调用已启动的生命周期事件处理程序之前等待依赖服务
    • 除了配做中添加 dependencies 属性,也可以用 broker 实例进行外部设置
      • broker.waitForServices(["posts", "users"]),返回以恶 promise 对象
      • broker.waitForServices("accounts", 10 * 1000, 500),设置超时事件和

    metadata

    • 元数据属性,您可以在这里存储有关服务的任何元信息。在服务函数中可以访问到元数据
    • 元数据时可以被远程节点获取的

    this

    broker.createService

    // 创建微服务实例方式之一
    broker.createService({
        name: "math",
        actions: {
            add(ctx) {
                return Number(ctx.params.a) + Number(ctx.params.b);
            }
        }
    });
    

    load service from file

    math.service.js

    // Export the schema of service
    module.exports = {
        name: "math",
        actions: {
            add(ctx) {
                return Number(ctx.params.a) + Number(ctx.params.b);
            },
            sub(ctx) {
                return Number(ctx.params.a) - Number(ctx.params.b);
            }
        }
    }
    
    // Create broker
    const broker = new ServiceBroker();
    
    // Load service
    broker.loadService("./math.service");
    
    // Start broker
    broker.start();
    

    推荐使用这样的方式,一目了然,不会在一个文件写过多的代码

    Load multiple services from a folder

    如果您有很多服务,建议将它们放到一个服务文件夹中,并使用 Serge.loadService s方法加载所有这些服务

    broker.loadServices(folder = "./services", fileMask = "**/*.service.js");
    
    // 从 ./services 文件夹(包括子文件夹)加载每个 *.service.js 文件
    broker.loadServices();
    // 从当前文件夹(包括子文件夹)加载每个 *.service.js 文件
    broker.loadServices("./");
    // 从“./svc”文件夹加载每个用户*.service.js文件
    broker.loadServices("./svc", "user*.service.js");
    

    hot reloading services

    Moleculer具有内置的热重加载功能.在开发期间,注意只针对 service.js 文件的修改被启动热重启,其他位置可以使用 nodemon

    const broker = new ServiceBroker({
        hotReload: true
    });
    
    broker.loadService("./services/test.service.js");
    

    Internal services

    // 列出所有已知节点(包括本地节点)
    broker.call("$node.list").then(res => console.log(res))
    
    // 列出所有注册的服务(本地和远程)
    broker.call("$node.services").then(res => console.log(res))
    
    // 列出所有已注册 action(本地和远程)。
    broker.call("$node.actions").then(res => console.log(res))
    
    // 列出所有订阅的事件
    broker.call("$node.events").then(res => console.log(res))
    
    // 列出本地节点的健康信息(包括进程和OS信息)
    broker.call("$node.health").then(res => console.log(res));
    

    action

    action 是服务的可调用的公共方法。action 调用表示远程过程调用(RPC)。它有请求参数&返回响应,就像HTTP请求一样。如果您有多个服务实例,代理将在实例之间负载平衡请求

    call services

    若要调用服务,请使用 broke.Call 方法。代理查找具有给定 action service (可能在某一个节点上)并调用它。调用之后将会返回一个承诺

    const res = await broker.call(actionName, params, opts)
    
    • params:参数是作为上下文的一部分传递给 action,action service 可以通过 ctx.params 访问传递参数,这是可选的
    • ops:一个对象,用于设置或者覆盖某些请求参数,例如:timeout、retry Count,这是可选的
      • tiemout:请求超时,以毫秒为单位。如果请求超时,而您没有定义应急响应,将会报错。若要禁用设置0,请执行以下操作。如果未定义,将会启用 new ServiceBroker 中的 requestTimeout 设置
      • retries :请求重试次数,如果请求超时,代理将再次尝试调用。若要禁用设置0。如果没有定义,将启用 new ServiceBroker 中的配置
      • fallbackResponse :若请求失败就返回,这是一个 Function
      • nodeID:目标节点,如果设置,它将直接调用给定的节点
      • meta:请求元数据,通过操作处理程序中的 ctx.meta 访问它,它也将在嵌套调用中被传输和合并
      • parentCtx:父亲的上下文实例
      • requestID:请求ID或相关ID。它出现在标准事件中
    broker.call("user.recommendation", { limit: 5 }, {
        timeout: 500,
        retries: 3,
        fallbackResponse: defaultRecommendation
    }).then(res => console.log("Result: ", res));
    

    meta

    • 元信息发送到具有元属性的服务,通过 action 处理程序中的ctx.meta访问它。请注意,在嵌套调用时,元被合并。

    Streaming

    • Moleculer支持Node.js流作为请求参数和响应。使用它来传输从网关上传的文件,或者编码/解码或压缩/解压缩流
    const stream = fs.createReadStream(fileName);
    
    broker.call("storage.save", stream, { meta: { filename: "avatar-123.jpg" }});	
    
    • 请注意,参数应该是一个流,您不能向参数中添加更多的变量。使用元属性传输其他数据。
    • 服务中接受流
    module.exports = {
        name: "storage",
        actions: {
            save(ctx) {
                const s = fs.createWriteStream(`/tmp/${ctx.meta.filename}`);
                ctx.params.pipe(s);
            }
        }
    };
    
    • 将流作为服务中的响应返回
    module.exports = {
        name: "storage",
        actions: {
            get: {
                params: {
                    filename: "string"
                },
                handler(ctx) {
                    return fs.createReadStream(`/tmp/${ctx.params.filename}`);
                }
            }
        }
    };
    
    • 调用方接受流
    const filename = "avatar-123.jpg";
    broker.call("storage.get", { filename })
        .then(stream => {
            const s = fs.createWriteStream(`./${filename}`);
            stream.pipe(s);
            s.on("close", () => broker.logger.info("File has been received"));
        })
    
    • AES编解码示例服务
    const crypto = require("crypto");
    const password = "moleculer";
    
    module.exports = {
        name: "aes",
        actions: {
            encrypt(ctx) {
                const encrypt = crypto.createCipher("aes-256-ctr", password);
                return ctx.params.pipe(encrypt);
            },
    
            decrypt(ctx) {
                const decrypt = crypto.createDecipher("aes-256-ctr", password);
                return ctx.params.pipe(decrypt);
            }
        }
    };
    

    action visibility

    • visibility:该属性控制 action service 是否可见、可调用
      • published:公共的 action,它可以在本地调用,也可以远程调用,并且可以通过API网关发布
      • pulic:公共的 action ,可以在本地或者远程调用,但不能通过APIGW发布
      • protected:只能在本地 action service 调用(从本地服务调用)
      • private:只能在内部调用(通过 this.actions.xy() 内部服务)
      • 不设置,默认是 null,也就是 published,公共的
    module.exports = {
        name: "posts",
        actions: {
            // It's published by default
            find(ctx) {},
            clean: {
                // Callable only via `this.actions.clean`
                visibility: "private",
                handler(ctx) {}
            }
        },
        methods: {
            cleanEntities() {
                // Call the action directly
                return this.actions.clean();
            }
        }
    }
    

    action hooks

    • 定义 action 钩子来包装来自混合器的某些 action
    • 有 before、after、error 钩子,将其分配给指定的 action 或者所有 action service (*)
    • 钩子可以是函数,也可以是字符串。字符串必须是本地服务方法名。
    const DbService = require("moleculer-db");
    // before hook
    module.exports = {
        name: "posts",
        mixins: [DbService]
        hooks: {
            before: {
                // Define a global hook for all actions
                // The hook will call the `resolveLoggedUser` method.
                "*": "resolveLoggedUser",
    
                // Define multiple hooks
                remove: [
                    function isAuthenticated(ctx) {
                        if (!ctx.user)
                            throw new Error("Forbidden");
                    },
                    function isOwner(ctx) {
                        if (!this.checkOwner(ctx.params.id, ctx.user.id))
                            throw new Error("Only owner can remove it.");
                    }
                ]
            }
        },
    
        methods: {
            async resolveLoggedUser(ctx) {
                if (ctx.meta.user)
                    ctx.user = await ctx.call("users.get", { id: ctx.meta.user.id });
            }
        }
    }
    
    const DbService = require("moleculer-db");
    // after hook
    // error hook
    module.exports = {
        name: "users",
        mixins: [DbService]
        hooks: {
            after: {
                // Define a global hook for all actions to remove sensitive data
                "*": function(ctx, res) {
                    // Remove password
                    delete res.password;
    
                    // Please note, must return result (either the original or a new)
                    return res;
                },
                get: [
                    // Add a new virtual field to the entity
                    async function (ctx, res) {
                        res.friends = await ctx.call("friends.count", { query: { follower: res._id }});
    
                        return res;
                    },
                    // Populate the `referrer` field
                    async function (ctx, res) {
                        if (res.referrer)
                            res.referrer = await ctx.call("users.get", { id: res._id });
    
                        return res;
                    }
                ]
            },
            error: {
                // Global error handler
                "*": function(ctx, err) {
                    this.logger.error(`Error occurred when '${ctx.action.name}' action was called`, err);
    
                    // Throw further the error
                    throw err;
                }
            }
        }
    };
    
    • 推荐的用例是创建混合元素,用方法填充服务,并在钩子中设置方法名
    module.exports = {
        methods: {
            checkIsAuthenticated(ctx) {
                if (!ctx.meta.user)
                    throw new Error("Unauthenticated");
            },
            checkUserRole(ctx) {
                if (ctx.action.role && ctx.meta.user.role != ctx.action.role)
                    throw new Error("Forbidden");
            },
            checkOwner(ctx) {
                // Check the owner of entity
            }
        }
    }
    
    // Use mixin methods in hooks
    const MyAuthMixin = require("./my.mixin");
    
    module.exports = {
        name: "posts",
        mixins: [MyAuthMixin]
        hooks: {
            before: {
                "*": ["checkIsAuthenticated"],
                create: ["checkUserRole"],
                update: ["checkUserRole", "checkOwner"],
                remove: ["checkUserRole", "checkOwner"]
            }
        },
    
        actions: {
            find: {
                // No required role
                handler(ctx) {}
            },
            create: {
                role: "admin",
                handler(ctx) {}
            },
            update: {
                role: "user",
                handler(ctx) {}
            }
        }
    };
    

    context

    • 当你去 call 一个 action service,broker 就会创建一个上下文 context 实例,这个实例包含着所有的请求信息,最后这些信息都会被当做 action service 中 handler 的一个参数 ctx 进行传递使用
    • 在 handler 中可以点出的上下文信息(属性或者方法)
      • ctx.id:context id
      • ctx.broker:broker 对象实例
      • ctx.action:action 定义实例
      • ctx.nodeID:caller 或者 目标节点 id
      • ctx.requestID:请求ID,如果在 nested-calls 中使用,它将是相同的ID。
      • ctx.parentID:父亲上下文实例 id(在 nested-calls 中使用)
      • ctx.params:请求参数,也就是 broker.call 中第二个参数具体设置
      • ctx.meta:请求元数据,它将会传递到 nested-calls 中
      • ctx.level:请求等级(在 nested-calls 内部使用),第一层等级为1
      • ctx.call():在 nested-calls 中触发 action service,参数形式与 broker.call 一样
      • ctx.emit():emit an event,same as broker.emit
      • ctx.broadcast():Broadcast an event, same as broker.broadcast
    • 优雅地关闭服务,请在代理选项中启用上下文跟踪功能。如果启用它,所有服务都将在关闭之前等待所有正在运行的上下文
      • 一个超时值可以通过关闭Timeout Broker选项来定义。默认值为5秒
      • 在 action services 中,关闭超时设置可以通过 $Shupdown Timeout 属性重写
    const broker = new ServiceBroker({
        nodeID: "node-1",
        tracking: {
            enabled: true,
            shutdownTimeout: 10 * 1000
        }
    });
    
    broker.call("posts.find", {}, { tracking: false }) // 关闭追踪
    

    event

    • Broker 有一个内置的事件总线来支持事件驱动体系结构,并将事件发送到本地和远程服务
    • 事件侦听器被排列成逻辑组,这意味着每个组中只触发一个侦听器
    • 例如你有两个主要服务 users、payments,这两个服务都订阅了 user.created 事件。此时,从 users 服务上注册 3 个具体实例,同时从 paymengs 服务上注册 2 个具体实例,当 emit 触发 user.created 事件,只有一个 user 和一个 payments 服务会被触发,效果如下

    • 组名来自服务名称,但可以在服务中的事件定义中覆盖它。
    module.exports = {
        name: "payment",
        events: {
            "order.created": {
                // Register handler to the "other" group instead of "payment" group.
                group: "other",
                handler(payload) {
                    // ...
                }
            }
        }
    }
    

    Emit balanced events

    • broker.emit 函数发送平衡的事件,第一个参数是事件的名称,第二个参数是传递的载荷,如果是复杂数据,可以传递一个对象
    // The `user` will be serialized to transportation.
    broker.emit("user.created", user);
    
    • 指定哪些组/服务接收事件
    // Only the `mail` & `payments` services receives it
    broker.emit("user.created", user, ["mail", "payments"]);
    

    Broadcast event

    • 广播事件被发送到所有可用的本地和远程服务,它是不平衡的,所有服务实例都会收到它

    • 利用 broker.broadcast 发送广播
    broker.broadcast("config.changed", config);
    
    • 指定哪些组/服务接收事件
    // Send to all "mail" service instances
    broker.broadcast("user.created", { user }, "mail");
    
    // Send to all "user" & "purchase" service instances.
    broker.broadcast("user.created", { user }, ["user", "purchase"]);
    

    Local broadcast event

    • Send broadcast events to only all local services with broker.broadcastLocal method
    broker.broadcastLocal("config.changed", config);
    

    Subscribe to events

    • 通过 service 中的属性 event 可以订阅具体事件,在事件名称中可以使用通配符
    module.exports = {
        events: {
            // Subscribe to `user.created` event
            "user.created"(user) {
                console.log("User created:", user);
            },
    
            // Subscribe to all `user` events
            "user.*"(user) {
                console.log("User event:", user);
            }
    
            // Subscribe to all internal events
            "$**"(payload, sender, event) {
                console.log(`Event '${event}' received from ${sender} node:`, payload);
            }
        }
    }
    

    Internal events

    • broker broadcasts 广播内部事件,这些事件总是以$前缀开头
    • $services.changed
      • 如果本地节点或远程节点加载或破坏服务,代理将发送此事件
    • $circuit-breaker.opened
      • The broker sends this event when the circuit breaker module change its state to open
    • $circuit-breaker.half-opened
      • The broker sends this event when the circuit breaker module change its state to half-open.
    • $circuit-breaker.closed
      • The broker sends this event when the circuit breaker module change its state to closed.
    • $node.connected
      • The broker sends this event when a node connected or reconnected.
    • $node.updated
      • The broker sends this event when it has received an INFO message from a node, (i.e. a service is loaded or destroyed).
    • $node.disconnected
      • The broker sends this event when a node disconnected (gracefully or unexpectedly).
    • $broker.started
      • The broker sends this event once broker.start() is called and all local services are started.
    • $broker.stopped
      • The broker sends this event once broker.stop() is called and all local services are stopped.
    • $transporter.connected
      • The transporter sends this event once the transporter is connected.
    • $transporter.disconnected
      • The transporter sends this event once the transporter is disconnected.

    lifecycle

    Broker lifecycle

    • starting logic
      • broker 启动传输连接,但是不会将本地服务列表发送到远程节点
      • 完成后,broker 将启动所有服务(call service started handler)
      • 一旦所有服务启动成功,broker 就会将本地服务列表发布到远程节点上
      • 因此,远程节点只有在所有本地服务正确启动之后才能发送请求

    • avoid deadlocks
      • broker start...
      • user service has dependencies: ["post"]
      • posts service has dependencies: ["users"]
      • 这就死锁了,按照顺序加载,user 永元无法加载到依赖项 post
    • stopping logic
      • call broker.stop 或者停止进程
      • 首先,broker 会向远程节点发送一个空的服务列表,所以他们可以将请求路由到其他实例而不是停止服务
      • 之后,broker 开始停止所有本地服务,之后 transporter 断开连接

    Service lifecycle

    • created event handler
      • broker.createService or broker.loadService 会触发此事件
      • 函数内部拿到 broker 实例(this),还可以创建其他模块实例,例如 http 服务器、数据库模块
    const http = require("http");
    
    module.exports = {
        name: "www",
        created() {
            // Create HTTP server
            this.server = http.createServer(this.httpHandler);
        }
    };
    // created function is sync event handler,can not use async/await
    
    • started event handler
      • 它被触发的时候,代理会启动所有的本地服务,而 broker 会启动所有的本地服务。使用它连接到数据库,侦听服务器…等
    module.exports = {
        name: "users",
        async started() {
            try {
                await this.db.connect();
            } catch(e) {
                throw new MoleculerServerError("Unable to connect to database.", e.message);
            }
        }
    };
    // started function is async handler. you can use async/await
    
    • stopped event handler
      • 它被触发的时候,broker.stop 被调用和 broker 开始停止所有的本地服务。使用它关闭数据库连接,关闭套接字…等
    module.exports = {
        name: "users",
        async stopped() {
            try {
                await this.db.disconnect();
            } catch(e) {
                this.logger.warn("Unable to stop database connection gracefully.", e);
            }
        }
    };
    // stopped function is async handler. you can use async/await
    

    logging

    • 在Moleculer框架中,所有核心模块都有一个自定义记录器实例。它们是从Broker记录器实例继承的,该实例可以在Broker选项中进行配置。

    Built-in logger

    • Moleculer有一个内置控制台记录器。这是默认的记录器
    const { ServiceBroker } = require("moleculer");
    const broker = new ServiceBroker({
        nodeID: "node-100",
        // logger: true,
        logLevel: "info"
    });
    
    broker.createService({
        name: "posts",
        actions: {
            get(ctx) {
                this.logger.info("Log message via Service logger");
            }
        }
    });
    
    broker.start()
        .then(() => broker.call("posts.get"))
        .then(() => broker.logger.info("Log message via Broker logger"));
    
    [2018-06-26T11:38:06.728Z] INFO  node-100/POSTS: Log message via Service logger
    [2018-06-26T11:38:06.728Z] INFO  node-100/BROKER: Log message via Broker logger
    [2018-06-26T11:38:06.730Z] INFO  node-100/BROKER: ServiceBroker is stopped. Good bye.
    
    • 可以使用Broker选项中的logLevel选项更改日志级别。只与内置控制台记录器一起使用
    const broker = new ServiceBroker({
        logger: true, // the `true` is same as `console`
        logLevel: "warn" // only logs the 'warn' & 'error' entries to the console
    });
    
    • Available log levels: fatalerrorwarninfodebugtrace
    • 可以为每个Moleculer模块设置日志级别。允许通配符使用
    const broker = new ServiceBroker({
        logLevel: {
            "MY.**": false,         // Disable log
            "TRANS": "warn",        // Only 'warn ' and 'error' log entries
            "*.GREETER": "debug",   // All log entries
            "**": "info",           // All other modules use this level
        }
    });
    // 此设置是从上到下计算的,因此*级别必须是最后一项。
    
    • 有一些内置的日志格式化程序
      • default:[2018-06-26T13:36:05.761Z] INFO node-100/BROKER: Message
      • simple:INFO - Message
      • short:[13:36:30.968Z] INFO BROKER: Message
    • 可以为内置控制台记录器设置自定义日志格式化程序函数
    const broker = new ServiceBroker({ 
        logFormatter(level, args, bindings) {
            return level.toUpperCase() + " " + bindings.nodeID + ": " + args.join(" ");
        }
    });
    broker.logger.warn("Warn message");
    broker.logger.error("Error message");
    
    WARN dev-pc: Warn message
    ERROR dev-pc: Error message
    
    • 自定义对象&数组打印格式化程序

      • 设置一个自定义格式化程序函数来打印对象和数组。默认函数将对象和数组打印到一行,以便便于使用外部日志工具进行处理。但是,当您正在开发时,将对象打印成人类可读的多行格式将是有用的。为此,在代理选项中覆盖logObjectPrint函数。
    const util = require("util");
    
    const broker = new ServiceBroker({  
        logObjectPrinter: o => util.inspect(o, { depth: 4, breakLength: 100 })
    });
    broker.logger.warn(process.release);
    
    [2017-08-18T12:37:25.720Z] INFO  dev-pc/BROKER: { name: 'node',
      lts: 'Carbon',
      sourceUrl: 'https://nodejs.org/download/release/v8.10.0/node-v8.10.0.tar.gz',
      headersUrl: 'https://nodejs.org/download/release/v8.10.0/node-v8.10.0-headers.tar.gz' }
    

    External loggers

    • 外部记录器可以与Moleculer一起使用。在这种情况下,将创建者函数设置为LOGER。当一个新模块继承一个新的记录器实例时,ServiceBroker将调用它
    // pino
    const pino = require("pino")({ level: "info" });
    const broker = new ServiceBroker({ 
        logger: bindings => pino.child(bindings)
    });
    
    // bunyan
    const bunyan = require("bunyan");
    const logger = bunyan.createLogger({ name: "moleculer", level: "info" });
    const broker = new ServiceBroker({ 
        logger: bindings => logger.child(bindings)
    });
    

    middlewares

    networking

    要通信其他节点(ServiceBrokers),您需要配置一个传输程序。大多数传输者连接到中心消息代理服务器,该服务器负责节点之间的消息传输。这些消息代理主要支持发布/订阅消息传递模式

    Transporters

    如果要在多个节点上运行服务,传输程序是一个重要的模块。传送器与其他节点通信。它传输事件、调用请求和处理响应… 如果一个服务在不同节点上的多个实例上运行,则请求将在活动节点之间实现负载平衡

    整个通信逻辑是在传输类之外的。这意味着在不改变代码行的情况下,在传送器之间切换是很容易的。

    Moleculer框架中有几个内置的运输机。

    NATS

    NATS服务器是一个简单、高性能的开源消息传递系统,用于云本机应用程序、物联网消息传递和微服务体系结构。

    let { ServiceBroker } = require("moleculer");
    
    const broker = new ServiceBroker({
        nodeID: "server-1",
        transporter: "nats://nats.server:4222"
    });
    

    使用 nats 传输需要安装 nats 模块 npm install nats

    // Connect to 'nats://localhost:4222'
    const broker = new ServiceBroker({
        transporter: "NATS"
    });
    
    // Connect to a remote NATS server
    const broker = new ServiceBroker({
        transporter: "nats://nats-server:4222"
    });
    
    // Connect with options
    const broker = new ServiceBroker({
        transporter: {
            type: "NATS",
            options: {
                url: "nats://localhost:4222"
                user: "admin",
                pass: "1234"
            }
        }
    });
    
    // Connect with TLS
    const broker = new ServiceBroker({
        transporter: {
            type: "NATS",
            options: {
                url: "nats://localhost:4222"
                // More info: https://github.com/nats-io/node-nats#tls
                tls: {
                    key: fs.readFileSync('./client-key.pem'),
                    cert: fs.readFileSync('./client-cert.pem'),
                    ca: [ fs.readFileSync('./ca.pem') ]
                }
            }
        }
    });
    

    Serialization

    传输程序需要一个序列化模块来序列化和反序列化传输的数据包。默认的串行化程序是JSONS序列化程序,但是有几个内置的串行化程序。

    const { ServiceBroker } = require("moleculer");
    
    const broker = new ServiceBroker({
        nodeID: "server-1",
        transporter: "NATS",
        serializer: "ProtoBuf"
    });
    
    • JSON serializer:这是内置的默认序列化程序。它将数据包序列化为JSON字符串,并将接收到的数据反序列化为数据包。
    const broker = new ServiceBroker({
        // serializer: "JSON" // don't need to set, because it is the default
    });
    

    Load balancing

    Built-in strategies

    若要配置策略,请在注册表属性下设置策略代理选项。它可以是一个名称(在内置策略的情况下),也可以是一个策略类(在自定义策略的情况下)。

    Random strategy

    const broker = new ServiceBroker({
        registry: {
            strategy: "Random"
        }
    });
    

    RoundRobin strategy

    const broker = new ServiceBroker({
        registry: {
            strategy: "RoundRobin"
        }
    });
    

    CPU usage-based strategy

    const broker = new ServiceBroker({
        registry: {
            strategy: "CpuUsage"
        }
    });
    

    Fault tolerance

    Circuit Breaker

    • Moleculer有一个内置的断路器解决方案.这是一个基于阈值的实现。它使用一个时间窗口来检查失败的请求率。一旦达到阈值,它就会触发断路器。
    • 电路断路器可以防止应用程序重复尝试执行可能失败的操作。允许它继续,而不等待故障被修复或浪费CPU周期,而它确定故障是长期的。断路器模式还允许应用程序检测故障是否已经解决。如果问题似乎已经解决,应用程序可以尝试调用操作。
    • 如果启用它,所有服务调用都将受到此内置断路器的保护。
    • 在代理选项中启用它
    const broker = new ServiceBroker({
        circuitBreaker: {
            enabled: true,
            threshold: 0.5,
            minRequestCount: 20,
            windowTime: 60, // in seconds
            halfOpenTime: 5 * 1000, // in milliseconds
            check: err => err && err.code >= 500
        }
    });
    
    • settings
      • enabled:是否启动此功能,默认是 false
      • threshold:阈值,默认0.5,意味着50%的跳闸失败
      • minRequestCount:最小请求数,默认20,在它下面,回调函数不会触发
      • windowTime:时间窗口的秒数,默认60秒
      • halfOpenTime:从打开状态切换到半打开状态的毫秒数,默认10000毫秒
      • check:检查失败请求的函数,默认 err && err.code >= 500
    • 如果断路器状态发生更改,ServiceBroker将发送内部事件
    • 这些全局选项也可以在操作定义中重写。
    // users.service.js
    module.export = {
        name: "users",
        actions: {
            create: {
                circuitBreaker: {
                    // All CB options can be overwritten from broker options.
                    threshold: 0.3,
                    windowTime: 30
                },
                handler(ctx) {}
            }
        }
    };
    

    Retry

    • 重试解决方案
    const broker = new ServiceBroker({
        retryPolicy: {
            enabled: true,
            retries: 5,
            delay: 100,
            maxDelay: 2000,
            factor: 2,
            check: err => err && !!err.retryable
        }
    });
    
    • settings
      • enabled:是否启用,默认 false
      • retries:重试的次数,默认5次
      • delay:第一次延迟以毫秒为单位,默认 100
      • maxDelay:最大延迟(以毫秒为单位),默认 2000
      • factor:延迟退避系数,默认是 2,表示指数退避
      • check:检查失败请求的函数,err && !!err.retryable
    • 在调用选项中覆盖retry值
    broker.call("posts.find", {}, { retries: 3 });
    
    • 在操作定义中覆盖重试策略值
    // users.service.js
    module.export = {
        name: "users",
        actions: {
            find: {
                retryPolicy: {
                    // All Retry policy options can be overwritten from broker options.
                    retries: 3,
                    delay: 500
                },
                handler(ctx) {}
            },
            create: {
                retryPolicy: {
                    // Disable retries for this action
                    enabled: false
                },
                handler(ctx) {}
            }
        }
    };
    

    Timeout

    • 可以为服务调用设置超时。它可以在代理选项或调用选项中全局设置。如果定义了超时并且请求超时,代理将抛出RequestTimeoutError错误。
    const broker = new ServiceBroker({
        requestTimeout: 5 * 1000 // in seconds
    });
    
    • 覆盖调用选项中的超时值
    broker.call("posts.find", {}, { timeout: 3000 });
    
    • 分布式超时:Moleculer使用分布式超时。在嵌套调用的情况下,超时值会随着时间的推移而递减。如果超时值小于或等于0,则跳过下一个嵌套调用(RequestSkippedError),因为第一个调用已被RequestTimeoutError错误拒绝。

    Bulkhead

    • 在Moleculer框架中实现了舱壁特性,以控制动作的并发请求处理。
    const broker = new ServiceBroker({
        bulkhead: {
            enabled: true,
            concurrency: 3,
            maxQueueSize: 10,
        }
    });
    
    • settings

      • enabled:是否启动,默认 false

      • concurreny:最大限度的并行数量,默认3

      • maxQueueSize:最大队列大小,默认10

      • concurreny 值限制并发请求执行

      • 如果 maxQueueSize大于0,则如果所有插槽都被占用,则 broker 将额外的请求存储在队列中

      • 如果队列大小达到maxQueueSize限制或为0,则 Broker 将对每个添加请求抛出QueueIsFull异常

    • 这些全局选项也可以在操作定义中重写

    // users.service.js
    // 在操作定义中覆盖重试策略值
    module.export = {
        name: "users",
        actions: {
            find: {
                bulkhead: {
                    enabled: false
                },
                handler(ctx) {}
            },
            create: {
                bulkhead: {
                    // Increment the concurrency value
                    // for this action
                    concurrency: 10
                },
                handler(ctx) {}
            }
        }
    };
    

    Fallback

    • 当您不想将错误返回给用户时,回退功能是非常有用的。相反,调用其他操作或返回一些常见的内容。可以在调用选项或操作定义中设置回退响应。
    • 它应该是一个返回包含任何内容的承诺的函数。borker 将当前 context&Error对象作为参数传递给此函数。
    // fallback settings in calling options 
    const result = await broker.call("users.recommendation", { userID: 5 }, {
        timeout: 500,
        fallbackResponse(ctx, err) {
            // Return a common response from cache
            return broker.cacher.get("users.fallbackRecommendation:" + ctx.params.userID);
        }
    });
    
    • 回退响应也可以在接收端,在 action 中定义
    • 请注意,只有在action 处理程序中发生错误时,才会使用此回退响应。如果从远程节点调用请求,并且请求在远程节点上超时,则不使用回退响应。在这种情况下,在调用选项中使用回退响应。
    // fallback as a function
    module.exports = {
        name: "recommends",
        actions: {
            add: {
                fallback: (ctx, err) => "Some cached result",
                //fallback: "fakeResult",
                handler(ctx) {
                    // Do something
                }
            }
        }
    };
    
    // fallback as method name
    module.exports = {
        name: "recommends",
        actions: {
            add: {
                // Call the 'getCachedResult' method when error occurred
                fallback: "getCachedResult",
                handler(ctx) {
                    // Do something
                }
            }
        },
    
        methods: {
            getCachedResult(ctx, err) {
                return "Some cached result";
            }
        }
    };
    

    NATS

    帮助文档

  • 相关阅读:
    内置函数
    递归函数:
    函数(迭代器与生成器)
    函数的装饰器
    函数:(函数的名字,闭包)
    函数(命名空间,作用域,嵌套)
    函数:(定义,调用,返回值和参数)
    hdu 4267 A Simple Problem with Integers(线段树)
    hdu 2089 不要62 hdu 3555 Bomb (数位DP)
    poj 2955 Brackets (区间DP)
  • 原文地址:https://www.cnblogs.com/cnloop/p/9434497.html
Copyright © 2011-2022 走看看