zoukankan      html  css  js  c++  java
  • opcua协议介绍

    opc ua

    是一种应用层协议,基于tcp之上,其url通常为opc.tcp://127.0.0.1:4840/abc,在opc ua中常被称为endpoint

    两种模式

    opc ua支持c/s模式,同时也支持类似mqtt的发布订阅模式,通常各种设备作为opc ua的服务端提供各种服务。

    k1yyi693ha

    信息模型

    opc ua采用面向对象的设计思路, 使用了对象(objects)作为过程系统表示数据和活动的基础。对象包含了变量事件方法,它们通过引用(reference)来互相连接。

    OPC UA 信息模型是节点的网络(Network of Node,),或者称为结构化图(graph),由节点(node)和引用(References)组成,这种结构图称之为OPC UA 的地址空间。这种图形结构可以描述各种各样的结构化信息(对象)。

    20190403111143907

    注意⚠️:opc ua中所说的节点是在一个opc ua服务器中,不要理解为一个服务器对应一个node

    节点

    opc ua定义了8种类型的节点

    对象(Object)
    对象类型(Object Type)
    变量(Variable)
    变量类型(Variable Type)
    方法(Method)
    视图(View)
    引用(Reference)
    数据类型(Data Type)

    每种节点都包含一些公共属性,如下:

    属性 数据类型 说明
    NodeId NodeId 在OPC UA服务器内唯一确定的一个节点,并且在OPC UA服务器中定位该节点
    NodeClass Int32 该节点的类型(上面列出的8种之一)
    BrowseName QualifiedName 浏览OPC UA服务器事定义的节点。它是非本地化的
    DisplayName LocalizedText 包含节点的名字,用来在用户接口中显示名字,本地化
    Description LocalizedText 本地化的描述(可选)
    WriteMask Uint32 指示哪个节点属性是可写的,即可被OPC UA客户端修改(可选)
    UserWriteMask Uint32 指示哪个节点属性可以被当前连接到服务器上的用户修改(可选)

    除了数据类型节点之外,其他各个节点都有额外的专属属性

    20190403111321142

    引用

    引用描述了两个节点之间的关系,用来连接多个节点。OPC UA预定义了多种引用,常见的引用有:

    • hasTypeDefinition

    描述对象、变量和类型之间的关系

    • ObjectNode的hasTypeDefinition引用,指向了一个ObjectTypeNode,表示该ObjectNode的类型;
    • VariableNode的hasTypeDefinition引用,指向一个VariableTypeNode,表示该 VariableNode的类型。
    • hasSubType

    描述对象的挤成关系,当子类从父类继承后,子类拥有一个hasSubType引用指向父类。

    • hasComponents

    描述一种组合关系

    • ObjectNode一般都由多个VariableNode组成,ObjectNode包含某个VariableNode时,ObjectNode拥有一个hasComponents引用,指向该VariableNode;
    • VariableNode也可以包含子VariableNode,此时也用hasComponents描述它们的关系。
    • Organizes

    指明两个节点的层次结构,通过organizes可以把多个节点组织到同一个父节点下。

    完整引用如下

    opc_ua_refs

    服务

    服务可以看成是OPC UA服务器提供的API集合,OPC UA与定义了37个标准服务,常用的服务有:

    • 读写服务

    可以获取和修改服务器指定节点指定属性的值

    • 调用服务

    执行服务器上指定节点的方法

    • 订阅数据变化和订阅事件

    可以监控服务器数据的变化

    opc ua编程

    Sdk

    客户端

    • opcua-client-gui

      使用python(pyqt5)开发使用pip可以安装,跨平台

      sudo pip3 install pyqt5 -i https://pypi.mirrors.ustc.edu.cn/simple/
      sudo pip3 install numpy -i https://pypi.mirrors.ustc.edu.cn/simple/
      sudo pip3 install pyqtgraph -i https://pypi.mirrors.ustc.edu.cn/simple/
      sudo pip3 install cryptography -i https://pypi.mirrors.ustc.edu.cn/simple/
      sudo pip3 install opcua-client -i https://pypi.mirrors.ustc.edu.cn/simple/
      

    模拟设备

    可利用sdk自己开发 见下面的python demo

    golang Demo

    读取服务器数据

    package main
    
    import (
    	"context"
    	"log"
    
    	"github.com/gopcua/opcua"
    	"github.com/gopcua/opcua/ua"
    )
    
    func main() {
    	endpoint := "opc.tcp://milo.digitalpetri.com:62541/milo"
    	nodeID := "ns=2;s=Dynamic/RandomFloat"
    
    	ctx := context.Background()
    
    	c := opcua.NewClient(endpoint, opcua.SecurityMode(ua.MessageSecurityModeNone))
    	if err := c.Connect(ctx); err != nil {
    		log.Fatal(err)
    	}
    	defer c.Close()
    
    	id, err := ua.ParseNodeID(nodeID)
    	if err != nil {
    		log.Fatalf("invalid node id: %v", err)
    	}
    
    	req := &ua.ReadRequest{
    		MaxAge:             2000,
    		NodesToRead:        []*ua.ReadValueID{{NodeID: id}},
    		TimestampsToReturn: ua.TimestampsToReturnBoth,
    	}
    
    	resp, err := c.Read(req)
    	if err != nil {
    		log.Fatalf("Read failed: %s", err)
    	}
    	if resp.Results[0].Status != ua.StatusOK {
    		log.Fatalf("Status not OK: %v", resp.Results[0].Status)
    	}
    	log.Printf("%#v", resp.Results[0].Value.Value())
    }
    

    向服务器写数据

    package main
    
    import (
    	"context"
    	"github.com/gopcua/opcua"
    	"github.com/gopcua/opcua/ua"
    	"log"
    )
    
    func main() {
    	endpoint := "opc.tcp://milo.digitalpetri.com:62541/milo"
    	nodeID := "ns=2;s=Dynamic/RandomFloat"
    
    	ctx := context.Background()
    
    	c := opcua.NewClient(endpoint, opcua.SecurityMode(ua.MessageSecurityModeNone))
    	if err := c.Connect(ctx); err != nil {
    		log.Fatal(err)
    	}
    	defer c.Close()
    
    	id, err := ua.ParseNodeID(nodeID)
    	if err != nil {
    		log.Fatalf("invalid node id: %v", err)
    	}
    
    	v, err := ua.NewVariant(10.0)
    	if err != nil {
    		log.Fatalf("invalid value: %v", err)
    	}
    
    	req := &ua.WriteRequest{
    		NodesToWrite: []*ua.WriteValue{
    			{
    				NodeID:      id,
    				AttributeID: ua.AttributeIDValue,
    				Value: &ua.DataValue{
    					EncodingMask: ua.DataValueValue,
    					Value:        v,
    				},
    			},
    		},
    	}
    
    	resp, err := c.Write(req)
    	if err != nil {
    		log.Fatalf("Read failed: %s", err)
    	}
    	log.Printf("%v", resp.Results[0])
    }
    

    监听服务器数据变化

    package main
    
    import (
    	"context"
    	"github.com/gopcua/opcua/monitor"
    	"log"
    	"os"
    	"os/signal"
    	"sync"
    	"time"
    
    	"github.com/gopcua/opcua"
    	"github.com/gopcua/opcua/ua"
    )
    
    func cleanup(sub *monitor.Subscription, wg *sync.WaitGroup) {
    	log.Printf("stats: sub=%d delivered=%d dropped=%d", sub.SubscriptionID(), sub.Delivered(), sub.Dropped())
    	sub.Unsubscribe()
    	wg.Done()
    }
    
    func startCallbackSub(ctx context.Context, m *monitor.NodeMonitor, interval, lag time.Duration, wg *sync.WaitGroup, nodes ...string) {
    	sub, err := m.Subscribe(
    		ctx,
    		&opcua.SubscriptionParameters{
    			Interval: interval,
    		},
    		func(s *monitor.Subscription, msg *monitor.DataChangeMessage) {
    			if msg.Error != nil {
    				log.Printf("[callback] error=%s", msg.Error)
    			} else {
    				log.Printf("[callback] node=%s value=%v", msg.NodeID, msg.Value.Value())
    			}
    			time.Sleep(lag)
    		},
    		nodes...)
    
    	if err != nil {
    		log.Fatal(err)
    	}
    
    	defer cleanup(sub, wg)
    
    	<-ctx.Done()
    }
    
    func main() {
    	endpoint := "opc.tcp://milo.digitalpetri.com:62541/milo"
    	nodeID := "ns=2;s=Dynamic/RandomFloat"
    
    	signalCh := make(chan os.Signal, 1)
    	signal.Notify(signalCh, os.Interrupt)
    
    	ctx, cancel := context.WithCancel(context.Background())
    	defer cancel()
    
    	go func() {
    		<-signalCh
    		println()
    		cancel()
    	}()
    
    	c := opcua.NewClient(endpoint, opcua.SecurityMode(ua.MessageSecurityModeNone))
    	if err := c.Connect(ctx); err != nil {
    		log.Fatal(err)
    	}
    	defer c.Close()
    
    	m, err := monitor.NewNodeMonitor(c)
    	if err != nil {
    		log.Fatal(err)
    	}
    
    	m.SetErrorHandler(func(_ *opcua.Client, sub *monitor.Subscription, err error) {
    		log.Printf("error: sub=%d err=%s", sub.SubscriptionID(), err.Error())
    	})
    	wg := &sync.WaitGroup{}
    
    	// start callback-based subscription
    	wg.Add(1)
    	go startCallbackSub(ctx, m, time.Second, 0, wg, nodeID)
    
    	<-ctx.Done()
    	wg.Wait()
    }
    

    python opcua server demo

    #!/usr/bin/env python3
    
    from threading import Thread
    import random
    import time
    from opcua import ua, uamethod, Server
    
    @uamethod
    def set_temperature(parent, variant):
        print(f"set_temperature {variant.Value}")
        temperature_thread.temperature.set_value(variant.Value)
    
    @uamethod
    def set_onoff(parent, variant):
        print(f"set_onoff {variant.Value}")
        temperature_thread.temperature.set_value(variant.Value)
    
    # 这个类用于后台定时随机修改值
    class Temperature(Thread):
        def __init__(self, temperature, onoff):
            Thread.__init__(self)
            self._stop = False
            self.temperature = temperature
            self.onoff = onoff
    
        def stop(self):
            self._stop = True
    
        def run(self):
            count = 1
            while not self._stop:
                value = random.randint(-20, 100)
                self.temperature.set_value(value)
                print(f"random set temperature {value}")
    
                value = bool(random.randint(0, 1))
                self.onoff.set_value(value)
                print(f"random set onoff {value}")
    
                led_event.event.Message = ua.LocalizedText("high_temperature %d" % count)
                led_event.event.Severity = count
                #led_event.event.temperature = random.randint(60, 100)
                led_event.event.onoff = bool(random.randint(0, 1))
                led_event.trigger()
                count += 1
    
                time.sleep(10)
    
    if __name__ == "__main__":
        # now setup our server
        server = Server()
        server.set_endpoint("opc.tcp://0.0.0.0:40840/tuyaopcua/server/")
        server.set_server_name("TuyaOpcUa Driver Demo Device")
    
        # set all possible endpoint policies for clients to connect through
        server.set_security_policy([
            ua.SecurityPolicyType.NoSecurity,
            ua.SecurityPolicyType.Basic128Rsa15_SignAndEncrypt,
            ua.SecurityPolicyType.Basic128Rsa15_Sign,
            ua.SecurityPolicyType.Basic256_SignAndEncrypt,
            ua.SecurityPolicyType.Basic256_Sign])
    
        # setup our own namespace
        uri = "http://tuya.com"
        idx = server.register_namespace(uri)
    
        # 添加一个 `空调` 对象
        air_conditioner = server.nodes.objects.add_object(idx, "AirConditioner")
        temperature = air_conditioner.add_variable(idx, "temperature", 20)
        temperature.set_writable()
        onoff = air_conditioner.add_variable(idx, "onoff", True)
        onoff.set_writable()
    
    
        air_conditioner.add_method(idx, "set_temperature", set_temperature, [ua.VariantType.UInt32])
        air_conditioner.add_method(idx, "set_onoff", set_onoff, [ua.VariantType.Boolean])
    
        # creating a default event object, the event object automatically will have members for all events properties
        led_event_type = server.create_custom_event_type(idx,
                                                         'high_temperature',
                                                         ua.ObjectIds.BaseEventType,
                                                         [('temperature', ua.VariantType.UInt32), ('onoff', ua.VariantType.Boolean)])
    
        led_event = server.get_event_generator(led_event_type, air_conditioner)
        led_event.event.Severity = 300
    
        # start opcua server
        server.start()
        print("Start opcua server...")
    
        temperature_thread = Temperature(temperature, onoff)
        temperature_thread.start()
    
        try:
    
            led_event.trigger(message="This is BaseEvent")
    
            while True:
                time.sleep(5)
        finally:
            print("Exit opcua server...")
            temperature_thread.stop()
            server.stop()
    
  • 相关阅读:
    Redis的主从复制
    JVM之jps命令
    ThreadLocal源码分析
    ThreadLocal初体验
    Redis的逐出算法
    Redis的删除策略
    DagScheduler 和 TaskScheduler
    数据挖掘的数据预处理
    日志实时收集和计算的简单方案
    spark ml 的例子
  • 原文地址:https://www.cnblogs.com/kainhuck/p/14663473.html
Copyright © 2011-2022 走看看