zoukankan      html  css  js  c++  java
  • Golang 实现UDPServer并发送消息到ActiveMQ

    示例代码

    package main
    
    import (
        "net"
        "os"
        "github.com/gpmgo/gopm/modules/goconfig"
        "github.com/go-stomp/stomp"
        "time"
        "strconv"
        "log"
        "strings"
    )
    
    
    
    
    // 限制goroutine数量
    var limitChan = make(chan bool, 10000) // Todo 从配置文件中读取
    
    // 限制同时处理消息数量
    var msgChan = make(chan string, 10000) // Todo 从配置文件中读取
    var activeMqLimitedChan = make(chan bool, 100)
    var activeMq *stomp.Conn
    var activeQueue string
    var host string
    var port string
    var connectTimes = 0
    var udpAddress = "0.0.0.0"    // Todo 从配置文件中读取
    var udpPort = "514"    // Todo 从配置文件中读取
    var logFilePath = "/var/log/syslog_server/"
    var configFilePath = "./config.ini"
    // UDP goroutine 实现并发读取UDP数据
    func udpProcess(conn *net.UDPConn) {
        defer func() {
            if e := recover(); e != nil {
                // 初始化日志,每天生成一个日志文件,日志文件名以日志结尾
                logFileName := logFilePath + "server-" + strings.Split(time.Now().String(), " ")[0] + ".log"
                logFile, err := os.OpenFile(logFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0766) // 应该判断error,此处简略
                defer logFile.Close()
                if err != nil {
                    log.Fatalln("open log file error: ", err.Error())
                }
                logger := log.New(logFile, "[Error] ", log.Ldate|log.Ltime|log.Lshortfile)
    
                // 记录错误日志
                logger.Println("udpProcess error:", e)
            }
            // 释放出一个协程
            <- limitChan
        }()
    
        // 最大读取数据大小
        data := make([]byte, 1024)
        n, _, err := conn.ReadFromUDP(data)
        if err != nil {
            panic(err)
        }
    
        // 获取对端的IP地址
        // remoteAddr := conn.RemoteAddr()
        // msgChan <- remoteAddr.String() + " " + string(data[:n])
    
        msgChan <- string(data[:n])
    
    
    }
    
    
    func udpServer(address, port string) {
        // @todo 如何防止udpServer 一直Panic导致无限循环重启
        defer func() {
            if e := recover(); e != nil {
                // 初始化日志,每天生成一个日志文件,日志文件名以日志结尾
                logFileName := logFilePath + "server-" + strings.Split(time.Now().String(), " ")[0] + ".log"
                logFile, err := os.OpenFile(logFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0766) // 应该判断error,此处简略
                defer logFile.Close()
                if err != nil {
                    log.Fatalln("open log file error: ", err.Error())
                }
                logger := log.New(logFile, "[Error] ", log.Ldate|log.Ltime|log.Lshortfile)
    
                // 记录错误日志
                logger.Println("udpServer error:", e)
    
                // udpServer启动失败后,间隔10秒后重试
                time.Sleep(10 * time.Second)
                udpServer(udpAddress, udpPort)
            }
        }()
    
        udpAddr, err := net.ResolveUDPAddr("udp", net.JoinHostPort(address, port))
        conn, err := net.ListenUDP("udp", udpAddr)
        defer conn.Close()
        if err != nil {
            panic(err)
        }
    
        for {
            limitChan <- true
            go udpProcess(conn)
        }
    }
    
    
    // 读取ActiveMQ配置信息
    func getConfiguration(){
        defer func() {
            if e := recover(); e != nil {
                // 初始化日志,每天生成一个日志文件,日志文件名以日志结尾
                logFileName := logFilePath + "server-" + strings.Split(time.Now().String(), " ")[0] + ".log"
                logFile, err := os.OpenFile(logFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0766) // 应该判断error,此处简略
                defer logFile.Close()
                if err != nil {
                    log.Fatalln("open log file error: ", err.Error(), ", programing exit.")
                    os.Exit(1)
                }
                logger := log.New(logFile, "[Error] ", log.Ldate|log.Ltime|log.Lshortfile)
    
                // 记录错误日志
                logger.Println("Get Configuration error:", e)
            }
        }()
    
        configFile, err := goconfig.LoadConfigFile(configFilePath)
        if err != nil {
            panic(err)
        }
    
        host, err = configFile.GetValue("active_mq", "host")
        if err != nil {
            // 如果没有配置主机,则使用本地主机
            host = "127.0.0.1"
        }
        port, err = configFile.GetValue("active_mq", "port")
        if err != nil {
            // 如果没配置端口,则使用默认端口
            port = "61613"
        }
    
        activeQueue, err = configFile.GetValue("active_mq", "queue")
        if err != nil {
            // 如果没配置端口,则使用默认队列名
            activeQueue = "syslog.queue"
        }
    }
    
    // 使用IP和端口连接到ActiveMQ服务器, 返回ActiveMQ连接对象
    func connActiveMq(){
        // @todo 如何防止无限循环
        defer func() {
            if e := recover(); e != nil {
                // 初始化日志,每天生成一个日志文件,日志文件名以日志结尾
                logFileName := logFilePath + "server-" + strings.Split(time.Now().String(), " ")[0] + ".log"
                logFile, err := os.OpenFile(logFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0766) // 应该判断error,此处简略
                defer logFile.Close()
                if err != nil {
                    log.Fatalln("open log file error: ", err.Error())
                }
                logger := log.New(logFile, "[Error] ", log.Ldate|log.Ltime|log.Lshortfile)
    
                // 记录错误日志
                logger.Println("connActiveMq error:", e)
    
                // ActiveMQ服务器连接失败后,间隔3秒后重试
                time.Sleep(3 * time.Second)
                activeMq = nil
                connActiveMq()
            }
        }()
    
        // @todo 实现断开重连
        if activeMq == nil {
            var err error
            activeMq, err = stomp.Dial("tcp", net.JoinHostPort(host, port))
            if err != nil {
                connectTimes ++
                if connectTimes >= 100 {
                    time.Sleep(60 * time.Second)
                }else if connectTimes >= 10 {
                    time.Sleep(10 * time.Second)
                }else {
                    time.Sleep(3 * time.Second)
                }
                panic(err.Error() + ", 重新连接ActiveMQ, 已重试次数: " + strconv.Itoa(connectTimes))
    
            }else {
                connectTimes = 0
            }
        }
    }
    
    
    func activeMqProducer(c chan string){
        // @todo 如何防止activeMqProducer 退出
        defer func() {
            if e := recover(); e != nil {
                // 初始化日志,每天生成一个日志文件,日志文件名以日志结尾
                logFileName := logFilePath + "server-" + strings.Split(time.Now().String(), " ")[0] + ".log"
                logFile, err := os.OpenFile(logFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0766) // 应该判断error,此处简略
                defer logFile.Close()
                if err != nil {
                    log.Fatalln("open log file error: ", err.Error())
                }
                logger := log.New(logFile, "[Error] ", log.Ldate|log.Ltime|log.Lshortfile)
    
                // 记录错误日志
                logger.Println("activeMqProducer error:", e)
    
                // 重试
                go activeMqProducer(msgChan)
            }
        }()
        for{
            activeMqLimitedChan <- true    // 限制开启协程数量
            contentMsg := <-c
            go func() {
                defer func() {
                    if e := recover(); e != nil {
                        err := os.MkdirAll(logFilePath, 777)
                        log.Fatalln("create log dirctory error: ", err.Error())
                        // 初始化日志,每天生成一个日志文件,日志文件名以日志结尾
                        logFileName := logFilePath + "server-" + strings.Split(time.Now().String(), " ")[0] + ".log"
                        logFile, err := os.OpenFile(logFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0766) // 应该判断error,此处简略
                        defer logFile.Close()
                        if err != nil {
                            log.Fatalln("open log file error: ", err.Error())
                        }
                        logger := log.New(logFile, "[Error] ", log.Ldate|log.Ltime|log.Lshortfile)
    
                        // 记录错误日志
                        logger.Println("activeMqProducer error:", e)
                    }
                    // 释放出一个协程
                    <- activeMqLimitedChan
                }()
    
                err := activeMq.Send(activeQueue, "text/plain", []byte(contentMsg))
                if err != nil {
                    if err.Error() == "connection already closed"{
                        activeMq = nil
                        connActiveMq()
                        activeMq.Send(activeQueue, "text/plain", []byte(contentMsg))
                    }
                    panic(err)
                }
            }()
    
        }
    
    }
    
    
    func init(){
        // 初始化 ActiveMQ 配置
        getConfiguration()
    
        // 连接到 ActiveMQ 服务器
        connActiveMq()
    
        // 启动一个协程将Syslog消息放入ActiveMQ队列中
        go activeMqProducer(msgChan)
    
    }
    
    func main() {
        defer activeMq.Disconnect()
        udpServer(udpAddress, udpPort)
    }
  • 相关阅读:
    政府网文件搜索列表页
    docker 容器间相互连接
    asp.net core 3.1 中使用cookie
    abp vnext 去IdentityServer 精简单体项目基于abp vnext 3.2版本
    c# 图片加文字,横线,圆
    淘宝客根据优惠券url更新优惠券信息
    软件产品ui模型制作工具
    在类库中获取配置信息 asp.net core 3.1
    vs 或 Android Studio 使用Visual Studio Emulator for Android调试安卓程序
    ABP 替换异常过滤器
  • 原文地址:https://www.cnblogs.com/vincenshen/p/10869432.html
Copyright © 2011-2022 走看看