zoukankan      html  css  js  c++  java
  • Packetbeat协议扩展开发教程(3)

    前面介绍了Packetbeat的项目结构,今天终于要开始写代码了,想想还是有点小激动呢。(你快点吧,拖半天了)
    网络传输两大协议TCP和UDP,我们的所有协议都不离这两种,HTTP、MySQL走的是TCP传输协议,DNS走的是UDP协议,在Packetbeat里面,实现一个自己的协议非常简单,继承并实现这两者对应的接口就行了,我们看一下长什么样:
    打开一个现有的UDP和HTTP协议接口定义:

    /~/go/src/github.com/elastic/beats/packetbeat/protos/protos.go

    // Functions to be exported by a protocol plugin
    type ProtocolPlugin interface {
    	// Called to initialize the Plugin
    	Init(test_mode bool, results publisher.Client) error
     
    	// Called to return the configured ports
    	GetPorts() []int
    }
     
    type TcpProtocolPlugin interface {
    	ProtocolPlugin
     
    	// Called when TCP payload data is available for parsing.
    	Parse(pkt *Packet, tcptuple *common.TcpTuple,
    		dir uint8, private ProtocolData) ProtocolData
     
    	// Called when the FIN flag is seen in the TCP stream.
    	ReceivedFin(tcptuple *common.TcpTuple, dir uint8,
    		private ProtocolData) ProtocolData
     
    	// Called when a packets are missing from the tcp
    	// stream.
    	GapInStream(tcptuple *common.TcpTuple, dir uint8, nbytes int,
    		private ProtocolData) (priv ProtocolData, drop bool)
     
    	// ConnectionTimeout returns the per stream connection timeout.
    	// Return <=0 to set default tcp module transaction timeout.
    	ConnectionTimeout() time.Duration
    }
     
    type UdpProtocolPlugin interface {
    	ProtocolPlugin
     
    	// ParseUdp is invoked when UDP payload data is available for parsing.
    	ParseUdp(pkt *Packet)
    }

    TcpProtocolPlugin:TCP协议插件的接口定义,依次是:Parse() 解析Packet,ReceivedFin()处理TCP断开连接,GapInStream()处理空包丢包,ConnectionTimeout()超时时间;

    UdpProtocolPlugin: UDP协议的接口定义,UDP协议是不需要握手和保障数据可靠性的,扔出去就结束,速度快,不保证数据可靠送达,所以只有ParseUdp一个方法需要实现,比较简单;

    ProtocolPlugin:TCP和UDP都需要实现ProtocolPlugin的基础接口,其实就定义了获取端口和初始化接口。

    请问:Packetbeat怎么工作的?

    回答:每一个协议都有一个固定的端口用于通信,你要做的事情就是定义协议端口,然后按协议是TCP还是UDP来实现对应的接口,Packetbeat将会截获指定端口的数据包(Packet),然后如果交给你定义的方法来进行解析,TCP是Parse,UDP是ParseUdp,都在上面的接口定义好的,然后将解析出来的结构化数据封装成Json,然后扔给Elasticsearch,后续的就的如何对这些数据做一些有趣的分析和应用了。

    貌似很简单嘛!

    进入每个端口的数据包,我们假设是一个自来水管,拧开80端口,哗啦啦出来的全是HTTP请求的数据包,Packetbeat里面Http协议监听的是80端口啊,所有这些包统统都交给Packetbeat里面的Http协议模块来进行解析,Http协议会一个个的检查这些数据包,也就是每个数据包都会调用一次Parse接口,到这里提到了传过来一个Packet,我们看看它的数据结构长什么样?

    type Packet struct {
    	Ts      time.Time
    	Tuple   common.IpPortTuple
    	Payload []byte
    }

    Packet结构简单,
    Ts是收到数据包的时间戳;
    Tuple是一个来源IP+来源端口和目的IP+目的端口的元组;
    Payload就是这个包里面的传输的有用的数据,应用层的字节数据,不包括IP和TCP/UDP头信息,是不是处理起来简单许多。
    首选我们确定SMTP协议的配置,每个协议在packetbeat.yml的protocol下面都应该有一个配置节点,如下:

    protocols:
      smtp:
        # Configure the ports where to listen for Smtp traffic. You can disable
        # the Smtp protocol by commenting out the list of ports.
        ports: [25]

    还需要在对应的config类文件:packetbeat/config/config.go,增加SMTP的结构体,目前只支持一个端口参数,继承基类ProtocolCommon就行,如下:

    git diff config/config.go
    @@ -42,6 +42,7 @@ type Protocols struct {
            Pgsql    Pgsql
            Redis    Redis
            Thrift   Thrift
    +       Smtp     Smtp
     }
     
     type Dns struct {
    @@ -118,5 +119,9 @@ type Redis struct {
            Send_response *bool
     }
     
    +type Smtp struct {
    +	ProtocolCommon        `yaml:",inline"`
    +}
    +
     // Config Singleton
     var ConfigSingleton Config

    在protos文件夹下面,新增smtp目录,并新增空白文件smtp.go,路径:packetbeat/protos/smtp/smtp.go,这里就是解析SMTP协议的地方,也是我们扩展协议的主要的工作。

    ...TODO...

    修改protos/protos.go,增加SMTP协议枚举,这里记得保证顺序一致,并且protocol名称必须和配置的节点名称一致,如这里都是smtp。

    git diff protos/protos.go
    @@ -103,6 +103,7 @@ const (
            MongodbProtocol
            DnsProtocol
            MemcacheProtocol
    +       SmtpProtocol
     )
     
     // Protocol names
    @@ -116,6 +117,7 @@ var ProtocolNames = []string{
            "mongodb",
            "dns",
            "memcache",
    +       "smtp",
     }

    继续修改packetbeat.go主文件,允许SMTP协议并加载。

    git diff packetbeat.go
    @@ -27,6 +27,7 @@ import (
            "github.com/elastic/packetbeat/protos/tcp"
            "github.com/elastic/packetbeat/protos/thrift"
            "github.com/elastic/packetbeat/protos/udp"
    +       "github.com/elastic/packetbeat/protos/smtp"
            "github.com/elastic/packetbeat/sniffer"
     )
     
    @@ -43,6 +44,7 @@ var EnabledProtocolPlugins map[protos.Protocol]protos.ProtocolPlugin = map[proto
            protos.ThriftProtocol:   new(thrift.Thrift),
            protos.MongodbProtocol:  new(mongodb.Mongodb),
            protos.DnsProtocol:      new(dns.Dns),
    +       protos.SmtpProtocol:      new(smtp.Smtp),
     }

    做完上面一系列修改之后,一个空白的SMTP协议的插件的架子就搭好了,并且插件也注册到了Packetbeat里面了,接下来我们再把packetbeat/protos/smtp/smtp.go按照TCPplugin接口的要求实现一下。

    说实话TCP处理起来很难,开始之前,我们先明确几个概念,TCP协议是有状态的,并且是流式的,我们关注的是七层应用层的消息,如HTTP里面的一个HTTP请求和返回,但是TCP底层都是一系列数据包,并且不同的请求的数据包是混杂在一起的,也就是说一个数据包里面可能只是一个HTTP请求的一部分也可能包含多条HTTP请求的一部分,所以Parse()里面需要处理跨数据包的状态信息,我们要把这些数据包和具体的七层的应用层的消息关联起来。

    现在我们仔细看看Parse()接口的各个参数定义是做什么用的

    Parse(pkt *Packet, tcptuple *common.TcpTuple,
    		dir uint8, private ProtocolData) ProtocolData

    pkt不用说了,是送进来的数据包,前面已经介绍了其数据结构,tcptuple是该数据包所属的TCP数据流所在的唯一标示(一个未关闭的TCP数据量包含若干数据包,直到TCP链接关闭),使用tcptuple.Hashable()获取唯一值;dir参数标示数据包在TCP数据流中的流向,和第一个TCP数据包方向一致是TcpDirectionOriginal,否则是TcpDirectionReverse;private参数可用来在TCP流中存储状态信息,可在运行时转换成具体的强类型,任意修改和传递给下一个Parse方法,简单来说就是进行中间数据的共享。

    下面看段MySQL模块里面的例子

    priv := mysqlPrivateData{}
            if private != nil {
                    var ok bool
                    priv, ok = private.(mysqlPrivateData)
                    if !ok {
                            priv = mysqlPrivateData{}
                    }
            }
     
            [ ... ]
     
            return priv

    上面的代码就是将private强制转换成mysqlPrivateData结构,然后再使用。我们再继续看后续怎么处理这些包的一个逻辑例子

    ok, complete := mysqlMessageParser(priv.Data[dir])
                    if !ok {
                            // drop this tcp stream. Will retry parsing with the next
                            // segment in it
                            priv.Data[dir] = nil
                            logp.Debug("mysql", "Ignore MySQL message. Drop tcp stream.")
                            return priv
                    }
     
                    if complete {
                            mysql.messageComplete(tcptuple, dir, stream)
                    } else {
                            // wait for more data
                            break
                    }

    mysqlMessageParser是一个解析mysql消息的方法,细节我们忽略,我们只需要关心它的返回,ok标示成功或者失败,true则继续处理,false表示数据包不能用,那就直接忽略;第二个参数complete表示判断这一个MySQL消息是否已经完整了,如果完整了,我们就可以扔出去了,否则继续等待剩下的消息内容。

    好的,我们看看SMTP协议怎么折腾吧,先看看一个邮件交互的流程图,来自 RFC5321 (https://tools.ietf.org/html/rfc5321):


    由上图可见,发送端和邮件服务器通过一系列命令来执行邮件的发送,下面看看一个具体的命令操作流程(来源: https://zh.wikipedia.org/wiki/简单邮件传输协议 )

    S: 220 www.example.com ESMTP Postfix
    C: HELO mydomain.com
    S: 250 Hello mydomain.com
    C: MAIL FROM:   
       
     
    S: 250 Ok
    C: RCPT TO:    
        
      
    S: 250 Ok
    C: DATA
    S: 354 End data with 
      
          
    C: Subject: test message
    C: From:""< sender@mydomain.com>
    C: To:""< friend@example.com>
    C:
    C: Hello,
    C: This is a test.
    C: Goodbye.
    C: .
    S: 250 Ok: queued as 12345
    C: quit
    S: 221 Bye

    上面的过程可以看到就几个命令就能将邮件发送出去,但是其实SMTP协议比较复杂,还包括身份认证、附件、多媒体编码等等,我们今天精简一下,我们目前只关心谁给谁发了邮件,发送内容先不管,这样相比完整的SMTP协议( RFC5321 ),我们只需要关注以下几个命令:

    MAIL:开始一份邮件 mail from: xxx@xx.com

    RCPT: 标识单个的邮件接收人;常在mail命令后面 可有多个rcpt to: xx@xx.com

    QUIT:结束SMTP会话,不一定发送了邮件,注意

    RESET:重置会话,当前传输被取消

    最终希望通过Packetbeat将这些数据解析并处理成我们想要的如下JSON数据,即大功告成:

    {
    "timestamp":"2016-1-15 12:00:00",
    "from":"medcl@example.co",
    "to":["lcdem@example.co"]
    }

    我们还需要一个测试数据,这里有一个下载各种协议测试数据包的地方,由wireshark站点提供: https://wiki.wireshark.org/SampleCaptures/

    Ctrl+F找到SMTP的 下载地址(https://wiki.wireshark.org/SampleCaptures?action=AttachFile&do=get&target=smtp.pcap)

    用wireshark打开我们刚刚下载的smtp.pcap文件,然后再输入过滤条件:tcp.port == 25,只看25端口的数据,如下图:


    上图可以看到25端口的跑的数据有很多,不过我们只关心我们需要的那几个命令就好了。

    打开/~/go/src/github.com/elastic/beats/packetbeat/protos/smtp/smtp.go定义smtpPrivateData,里面的Data是一个数组,分别是TCP两个方向的数据,SmtpMessage是解析出来的邮件信息

    type smtpPrivateData struct{
    	Data [2]*SmtpStream
    }
     
    type SmtpStream struct {
    	tcptuple *common.TcpTuple
     
    	data []byte
     
    	parseOffset int
    	isClient    bool
    	message *SmtpMessage
    }
     
    type SmtpMessage struct {
    	Ts   time.Time
    	From string
    	To []string
    }

    然后参照MySQL协议,定义相应的方法,最终如下:

    package smtp
     
    import (
    	"github.com/elastic/beats/libbeat/common"
    	"github.com/elastic/beats/libbeat/logp"
    	"github.com/elastic/beats/libbeat/publisher"
    	"github.com/elastic/beats/packetbeat/config"
    	"github.com/elastic/beats/packetbeat/protos"
    	"github.com/elastic/beats/packetbeat/protos/tcp"
    	"bytes"
    	"time"
    	"strings"
    )
     
    type smtpPrivateData struct{
    	Data [2]*SmtpStream
    }
     
    type SmtpStream struct {
    	tcptuple *common.TcpTuple
     
    	data []byte
     
    	parseOffset int
    	isClient    bool
     
    	message *SmtpMessage
    }
     
    type SmtpMessage struct {
    	start int
    	end   int
     
    	Ts   time.Time
    	From string
    	To []string
    	IgnoreMessage bool
    }
     
    type Smtp struct {
    	SendRequest         bool
    	SendResponse        bool
    	transactionTimeout time.Duration
    	Ports         []int
    	results publisher.Client
    }
     
    func (smtp *Smtp) initDefaults() {
    	smtp.SendRequest = false
    	smtp.SendResponse = false
    	smtp.transactionTimeout = protos.DefaultTransactionExpiration
    }
     
    func (smtp *Smtp) setFromConfig(config config.Smtp) error {
    	smtp.Ports = config.Ports
    	if config.SendRequest != nil {
    		smtp.SendRequest = *config.SendRequest
    	}
    	if config.SendResponse != nil {
    		smtp.SendResponse = *config.SendResponse
    	}
     
    	if config.TransactionTimeout != nil && *config.TransactionTimeout > 0 {
    		smtp.transactionTimeout = time.Duration(*config.TransactionTimeout) * time.Second
    	}
     
    	return nil
    }
     
    func (smtp *Smtp) GetPorts() []int {
    	return smtp.Ports
    }
     
    func (smtp *Smtp) Init(test_mode bool, results publisher.Client) error {
    	smtp.initDefaults()
     
    	if !test_mode {
    		err := smtp.setFromConfig(config.ConfigSingleton.Protocols.Smtp)
    		if err != nil {
    			return err
    		}
    	}
    	smtp.results = results
     
    	return nil
    }
     
    func readLine(data []byte, offset int) (bool, string, int) {
    	q := bytes.Index(data[offset:], []byte("
    "))
    	if q == -1 {
    		return false, "", 0
    	}
    	return true, string(data[offset : offset+q]), offset + q + 2
    }
     
    func (smtp *Smtp) Parse(pkt *protos.Packet, tcptuple *common.TcpTuple, dir uint8, private protos.ProtocolData, ) protos.ProtocolData {
     
    	defer logp.Recover("ParseSmtp exception")
     
    	priv := smtpPrivateData{}
    	if private != nil {
    		var ok bool
    		priv, ok = private.(smtpPrivateData)
    		if !ok {
    			priv = smtpPrivateData{}
    		}
    	}
     
    	if priv.Data[dir] == nil {
    		priv.Data[dir] = &SmtpStream{
    			tcptuple: tcptuple,
    			data:     pkt.Payload,
    			message:  &SmtpMessage{Ts: pkt.Ts},
    		}
    	} else {
    		// concatenate bytes
    		priv.Data[dir].data = append(priv.Data[dir].data, pkt.Payload...)
    		if len(priv.Data[dir].data) > tcp.TCP_MAX_DATA_IN_STREAM {
    			logp.Debug("smtp", "Stream data too large, dropping TCP stream")
    			priv.Data[dir] = nil
    			return priv
    		}
    	}
     
    	stream := priv.Data[dir]
    	for len(stream.data) > 0 {
    		if stream.message == nil {
    			stream.message = &SmtpMessage{Ts: pkt.Ts}
    		}
     
    		ok, complete := stmpMessageParser(priv.Data[dir])
    		if !ok {
    			// drop this tcp stream. Will retry parsing with the next
    			// segment in it
    			priv.Data[dir] = nil
    			logp.Debug("smtp", "Ignore SMTP message. Drop tcp stream. Try parsing with the next segment")
    			return priv
    		}
     
    		if complete {
    			smtp.messageComplete(tcptuple, dir, stream)
    		} else {
    			logp.Debug("smtp","still wait message...")
    			// wait for more data
    			break
    		}
    	}
     
    	return priv
    }
     
    func (smtp *Smtp) ConnectionTimeout() time.Duration {
    	return smtp.transactionTimeout
    }
     
    func stmpMessageParser(s *SmtpStream) (bool, bool) {
     
    	var value string=""
     
    	for s.parseOffset < len(s.data) {
     
     
    		logp.Info("smtp", "Parse message: %s", string(s.data[s.parseOffset]))
     
     
    		if strings.HasPrefix(string(s.data[s.parseOffset]),"MAIL" ) {
     
    			logp.Debug("smtp", "Hit MAIL command: %s", string(s.data[s.parseOffset]))
     
    			found, line, off := readLine(s.data, s.parseOffset)
    			if !found {
    				return true, false
    			}
     
    			value = line[1:]
    			logp.Debug("smtp", "value  %s", value)
     
    			s.parseOffset = off
    		} else {
    			logp.Debug("smtp", "Unexpected message starting with %s", s.data[s.parseOffset:])
    			return false, false
    		}
    	}
     
    	return true, false
    }
     
    func handleSmtp(stmp *Smtp, m *SmtpMessage, tcptuple *common.TcpTuple,
    dir uint8, raw_msg []byte) {
    	logp.Info("smtp","handle smtp message...")
     
    	//TODO
     
    }
     
    // Called when the parser has identified a full message.
    func (smtp *Smtp) messageComplete(tcptuple *common.TcpTuple, dir uint8, stream *SmtpStream) {
     
    	logp.Info("smtp","message completed...")
     
    	// all ok, ship it
    	msg := stream.data[stream.message.start:stream.message.end]
     
    	if !stream.message.IgnoreMessage {
    		handleSmtp(smtp, stream.message, tcptuple, dir, msg)
    	}
     
    	// and reset message
    	stream.PrepareForNewMessage()
    }
     
    func (stream *SmtpStream) PrepareForNewMessage() {
    	logp.Info("smtp","prepare for new message...")
     
    	stream.data = stream.data[stream.parseOffset:]
    	stream.parseOffset = 0
    	stream.isClient = false
    	stream.message = nil
    }
     
     
     
    func (smtp *Smtp) GapInStream(tcptuple *common.TcpTuple, dir uint8,
    nbytes int, private protos.ProtocolData) (priv protos.ProtocolData, drop bool) {
     
    	defer logp.Recover("GapInStream(smtp) exception")
     
    	if private == nil {
    		return private, false
    	}
     
    	return private, true
    }
     
    func (smtp *Smtp) ReceivedFin(tcptuple *common.TcpTuple, dir uint8,
    private protos.ProtocolData) protos.ProtocolData {
     
    	logp.Info("smtp","stream closed...")
     
    	// TODO: check if we have data pending and either drop it to free
    	// memory or send it up the stack.
    	return private
    }

    现在切换到命令行,编译一下

    cd ~/go/src/github.com/elastic/beats/packetbeat
    make

    编译成功,一个滚烫的packetbeat可执行文件就躺在当前目录下了,运行一下先,参数-I 指定pcap文件(还记得前面下载的那个测试文件吧)

    ./packetbeat -d "smtp" -c etc/packetbeat.yml -I ~/Downloads/smtp.pcap  -e -N

    运行查看控制台输出结果:

    ➜  packetbeat git:(smtpbeat) ✗ ./packetbeat -d "smtp" -c etc/packetbeat.yml -I ~/Downloads/smtp.pcap  -e -N 
    2016/01/15 10:12:19.058535 publish.go:191: INFO Dry run mode. All output types except the file based one are disabled.
    2016/01/15 10:12:19.058570 geolite.go:24: INFO GeoIP disabled: No paths were set under output.geoip.paths
    2016/01/15 10:12:19.058592 publish.go:262: INFO Publisher name: medcls-MacBook.local
    2016/01/15 10:12:19.058724 beat.go:145: INFO Init Beat: packetbeat; Version: 1.0.0
    2016/01/15 10:12:19.059758 beat.go:171: INFO packetbeat sucessfully setup. Start running.
    2016/01/15 10:12:20.155335 smtp.go:163: DBG  Parse message: 2
    2016/01/15 10:12:20.155416 smtp.go:180: DBG  Unexpected message starting with 250-xc90.websitewelcome.com Hello GP [122.162.143.157]
    250-SIZE 52428800
    250-PIPELINING
    250-AUTH PLAIN LOGIN
    250-STARTTLS
    250 HELP
    2016/01/15 10:12:22.310974 smtp.go:163: DBG  Parse message: F
    2016/01/15 10:12:22.311025 smtp.go:180: DBG  Unexpected message starting with From: "Gurpartap Singh"   
       
     
    To:    
        
      
    Subject: SMTP
    Date: Mon, 5 Oct 2009 11:36:07 +0530
    Message-ID: <000301ca4581$ef9e57f0$cedb07d0$@in>
    MIME-Version: 1.0
    ...

    成功了,邮件内容都在控制台输出了,但这还不是我们要的最终结果,我需要里面的关键信息,我们继续修改smtp.go这个文件。

  • 相关阅读:
    Word 转换为 PDf 的技术方案
    [转载]sql server 常用存储过程
    Redmine 初体验
    Quartz.net Tutorial Lesson 1
    [转载]sql server TSQL 区分字符串大小写 的两种方法
    [原创]sql server inner join 效率测试
    java实现树的一般操作
    【转】Mybatis中进行批量更新
    【转载】单例模式的7种写法(整合自两篇文章)
    mybtis批量insert传入参数为list
  • 原文地址:https://www.cnblogs.com/beautiful-code/p/6416711.html
Copyright © 2011-2022 走看看