zoukankan      html  css  js  c++  java
  • OSSIM传感器Agent传送机制初探

      OSSIM Agent的主要职责是收集网络上存在的各种设备发送的所有数据,然后按照一种标准方式有序发给OSSIM Server,Agent收集到数据后在发送给Server之前要对这些数据进行归一化处理,本文主要就如何有序发送数据与如何完成归一化进行讨论。

      OSSIM传感器在通过GET框架实现OSSIM代理和OSSIM服务器之间通信协议和数据格式的之间转换。下面我们先简要看一下ossim-agent脚本:

      #!/usr/bin/python -OOt

      import sys

      sys.path.append('/usr/share/ossim-agent/')

      sys.path.append('/usr/local/share/ossim-agent/')

      from ossim_agent.Agent import Agent

      agent = Agent()

      agent.main()

      这里需要GET作为OSSIM代理向OSSIM服务器输送数据。实现紧密整合所需的两个主要操作是“生成”(或)OSSIM兼容事件的“映射Mapping”)和此类数据向OSSIM的“传输”服务器。它负责此类操作的GET框架的两个组件是EventHandler和Sender Agent,如图1所示。

      图1 将Get框架内容集成到OSSIM

      Event Handler的主要任务是映射数据源插件采集的事件到SIEM实例警报的OSSIM标准化事件格式。为了执行这样的过程,原始消息经历由RAW LOG转换为现有归一化数据字段格式的一个转变;在上图中我们将这些机制表示为“归一化Normalization”和“OSSIM消息”。部分日志归一化代码:

      from Logger import Logger

      from time import mktime, strptime

      logger = Logger.logger

      class Event:

      EVENT_TYPE = 'event'

      EVENT_ATTRS = [

      "type",

      "date",

      "sensor",

      "interface",

      "plugin_id",

      "plugin_sid",

      "priority",

      "protocol",

      "src_ip",

      "src_port",

      "dst_ip",

      "dst_port",

      "username",

      "password",

      "filename",

      "userdata1",

      "userdata2",

      "userdata3",

      "userdata4",

      "userdata5",

      "userdata6",

      "userdata7",

      "userdata8",

      "userdata9",

      "occurrences",

      "log",

      "data",

      "snort_sid", # snort specific

      "snort_cid", # snort specific

      "fdate",

      "tzone"

      ]

      def __init__(self):

      self.event = {}

      self.event["event_type"] = self.EVENT_TYPE

      def __setitem__(self, key, value):

      if key in self.EVENT_ATTRS:

      self.event[key] = self.sanitize_value(value)

      if key == "date":

      # 以秒为单位

      self.event["fdate"]=self.event[key]

      try:

      self.event["date"]=int(mktime(strptime(self.event[key],"%Y-%m-%d %H:%M:%S")))

      except:

      logger.warning("There was an error parsing date (%s)" %

      (self.event[key]))

      elif key != 'event_type':

      logger.warning("Bad event attribute: %s" % (key))

      def __getitem__(self, key):

      return self.event.get(key, None)

      # 事件表示

      def __repr__(self):

      event = self.EVENT_TYPE

      for attr in self.EVENT_ATTRS:

      if self[attr]:

      event += ' %s="%s"' % (attr, self[attr])

      return event + " "

      # 返回内部哈希值

      def dict(self):

      return self.event

      def sanitize_value(self, string):

      return str(string).strip().replace(""", "\"").replace("'", "")

      class EventOS(Event):

      EVENT_TYPE = 'host-os-event'

      EVENT_ATTRS = [

      "host",

      "os",

      "sensor",

      "interface",

      "date",

      "plugin_id",

      "plugin_sid",

      "occurrences",

      "log",

      "fdate",

      ]

      class EventMac(Event):

      EVENT_TYPE = 'host-mac-event'

      EVENT_ATTRS = [

      "host",

      "mac",

      "vendor",

      "sensor",

      "interface",

      "date",

      "plugin_id",

      "plugin_sid",

      "occurrences",

      "log",

      "fdate",

      ]

      class EventService(Event):

      EVENT_TYPE = 'host-service-event'

      EVENT_ATTRS = [

      "host",

      "sensor",

      "interface",

      "port",

      "protocol",

      "service",

      "application",

      "date",

      "plugin_id",

      "plugin_sid",

      "occurrences",

      "log",

      "fdate",

      ]

      class EventHids(Event):

      EVENT_TYPE = 'host-ids-event'

      EVENT_ATTRS = [

      "host",

      "hostname",

      "hids_event_type",

      "target",

      "what",

      "extra_data",

      "sensor",

      "date",

      "plugin_id",

      "plugin_sid",

      "username",

      "password",

      "filename",

      "userdata1",

      "userdata2",

      "userdata3",

      "userdata4",

      "userdata5",

      "userdata6",

      "userdata7",

      "userdata8",

      "userdata9",

      "occurrences",

      "log",

      "fdate",

      ]

      class WatchRule(Event):

      EVENT_TYPE = 'event'

      EVENT_ATTRS = [

      "type",

      "date",

      "fdate",

      "sensor",

      "interface",

      "src_ip",

      "dst_ip",

      "protocol",

      "plugin_id",

      "plugin_sid",

      "condition",

      "value",

      "port_from",

      "src_port",

      "port_to",

      "dst_port",

      "interval",

      "from",

      "to",

      "absolute",

      "log",

      "userdata1",

      "userdata2",

      "userdata3",

      "userdata4",

      "userdata5",

      "userdata6",

      "userdata7",

      "userdata8",

      "userdata9",

      "filename",

      "username",

      ]

      class Snort(Event):

      EVENT_TYPE = 'snort-event'

      EVENT_ATTRS = [

      "sensor",

      "interface",

      "gzipdata",

      "unziplen",

      "event_type",

      "plugin_id",

      "type",

      "occurrences"

      ]

      日志编码代码:

      import threading, time

      from Logger import Logger

      logger = Logger.logger

      from Output import Output

      import Config

      import Event

      from Threshold import EventConsolidation

      from Stats import Stats

      from ConnPro import ServerConnPro

      class Detector(threading.Thread):

      def __init__(self, conf, plugin, conn):

      self._conf = conf

      self._plugin = plugin

      self.os_hash = {}

      self.conn = conn

      self.consolidation = EventConsolidation(self._conf)

      logger.info("Starting detector %s (%s).." %

      (self._plugin.get("config", "name"),

      self._plugin.get("config", "plugin_id")))

      threading.Thread.__init__(self)

      def _event_os_cached(self, event):

      if isinstance(event, Event.EventOS):

      import string

      current_os = string.join(string.split(event["os"]), ' ')

      previous_os = self.os_hash.get(event["host"], '')

      if current_os == previous_os:

      return True

      else:

      # 失败并添加到缓存

      self.os_hash[event["host"]] =

      string.join(string.split(event["os"]), ' ')

      return False

      def _exclude_event(self, event):

      if self._plugin.has_option("config", "exclude_sids"):

      exclude_sids = self._plugin.get("config", "exclude_sids")

      if event["plugin_sid"] in Config.split_sids(exclude_sids):

      logger.debug("Excluding event with " +

      "plugin_id=%s and plugin_sid=%s" %

      (event["plugin_id"], event["plugin_sid"]))

      return True

      return False

      def _thresholding(self):

      self.consolidation.process()

      def _plugin_defaults(self, event):

      # 从配置文件中获取默认参数

      if self._conf.has_section("plugin-defaults"):

      # 1) 日期

      default_date_format = self._conf.get("plugin-defaults",

      "date_format")

      if event["date"] is None and default_date_format and

      'date' in event.EVENT_ATTRS:

      event["date"] = time.strftime(default_date_format,

      time.localtime(time.time()))

      # 2) 传感器

      default_sensor = self._conf.get("plugin-defaults", "sensor")

      if event["sensor"] is None and default_sensor and

      'sensor' in event.EVENT_ATTRS:

      event["sensor"] = default_sensor

      # 3) 网络接口

      default_iface = self._conf.get("plugin-defaults", "interface")

      if event["interface"] is None and default_iface and

      'interface' in event.EVENT_ATTRS:

      event["interface"] = default_iface

      # 4) 源IP

      if event["src_ip"] is None and 'src_ip' in event.EVENT_ATTRS:

      event["src_ip"] = event["sensor"]

      # 5) 时区

      default_tzone = self._conf.get("plugin-defaults", "tzone")

      if event["tzone"] is None and 'tzone' in event.EVENT_ATTRS:

      event["tzone"] = default_tzone

      # 6) sensor,source ip and dest != localhost

      if event["sensor"] in ('127.0.0.1', '127.0.1.1'):

      event["sensor"] = default_sensor

      if event["dst_ip"] in ('127.0.0.1', '127.0.1.1'):

      event["dst_ip"] = default_sensor

      if event["src_ip"] in ('127.0.0.1', '127.0.1.1'):

      event["src_ip"] = default_sensor

      # 检测日志的类型

      if event["type"] is None and 'type' in event.EVENT_ATTRS:

      event["type"] = 'detector'

      return event

      def send_message(self, event):

      if self._event_os_cached(event):

      return

      if self._exclude_event(event):

      return

      #对于一些空属性使用默认值。

      event = self._plugin_defaults(event)

      # 合并之前检查

      if self.conn is not None:

      try:

      self.conn.send(str(event))

      except:

      id = self._plugin.get("config", "plugin_id")

      c = ServerConnPro(self._conf, id)

      self.conn = c.connect(0, 10)

      try:

      self.conn.send(str(event))

      except:

      return

      logger.info(str(event).rstrip())

      elif not self.consolidation.insert(event):

      Output.event(event)

      Stats.new_event(event)

      def stop(self):

      #self.consolidation.clear()

      pass

      #在子类中重写

      def process(self):

      pass

      def run(self):

      self.process()

      class ParserSocket(Detector):

      def process(self):

      self.process()

      class ParserDatabase(Detector):

      def process(self):

      self.process()

      … …郑州男科医院:http://mobile.zztongjiyiyuan.com/郑州男科医院哪家好:http://mobile.zztongjiyiyuan.com/郑州做包皮手术多少钱:http://mobile.zztongjiyiyuan.com/

      从上可以看出,传感器的归一化主要负责对每个LOG内数据字段进行重新编码,使其生成一个全新的可能用于发送到OSSIM服务器的完整事件。为达成这种目的GET框架中包含了一些特定的功能,以便将所有的功能转换需要BASE64转换的字段。“OSSIM消息”负责填充GET生成的原始事件中不存在的字段。所以上面讲的plugin_id、plugin_sid是用来表示日志消息来源类型和子类型,这也是生成SIEM事件的必填字段。为事件格式完整性考虑,有些时候在无法确认源或目标IP时,系统默认会采用0.0.0.0来填充该字段。

      注意:这种必填字段我们可利用phpmyadmin工具查看OSSIM的MySQL数据库。

      Sender Agent负责完成下面两个任务:

      发送由GET收集并由事件格式化的事件发送到OSSIM服务器,这项任务由Event Hander创建的消息组成消息队列发送到消息中间件实现,时序图如图2所示。

      图2 序列图: 从安全探测器的日志转换到OSSIM服务器事件

      2)管理GET框架和OSSIM服务器之间的通信,通信端口为TCP 40001通过双向握手实现。归一化原始日志是规范化过程的一个重要环节,OSSIM在归一化处理日志的同时保留了原始日志,可用于日志归档,提供了一种从规范化事件中提取原始日志的手段。

      经过归一化处理的EVENTS,存储到MySQL数据库中,如图3所示。接着就由关联引擎根据规则、优先级、可靠性等参数进行交叉关联分析,得出风险值并发出各种报警提示信息。

      图3 OSSIM平台日志存储机制

      接下来我们再看个实例,下面是一段Apache、CiscoASA以及SSH的原始日志,如图4、图5、图6所示。

      Apache插件中的正则表达式:

      [0001 - apache-access] 访问日志

      event_type=event

      regexp=((?Pd{1,3}.d{1,3}.d{1,3}.d{1,3})(:(?Pd{1,5}))? )?(?PS+) (?PS+) (?PS+) [(?Pd{2}/w{3}/d{4}:d{2}:d{2}:d{2})s+[+-]d{4}] "(?P[^"]*)" (?Pd{3}) ((?Pd+)|-)( "(?P[^"]*)" "(?P[^"]*)")?$

      src_ip={resolv($src)}

      dst_ip={resolv($dst)}

      dst_port={$port}

      date={normalize_date($date)}

      plugin_sid={$code}

      username={$user}

      userdata1={$request}

      userdata2={$size}

      userdata3={$referer_uri}

      userdata4={$useragent}

      filename={$id}

      [0002 - apache-error] 错误日志

      event_type=event

      regexp=[(?Pw{3} w{3} d{2} d{2}:d{2}:d{2} d{4})] [(?P(emerg|alert|crit|error|warn|notice|info|debug))] ([client (?PS+)] )?(?P.*)

      date={normalize_date($date)}

      plugin_sid={translate($type)}

      src_ip={resolv($src)}

      userdata1={$data}

      图4 Apache原始日志

  • 相关阅读:
    mysql实现主从复制
    go get时候 timeout
    linux 修改/etc/profile文件之后 没有效果
    初试 laravel
    php 实现单个大文件(视频)的 断点上传
    UEditor图片左对齐右对齐 要的作用显示之后 保存之后没有效果
    docker 实现 mysql+nginx+php
    redis
    easyPoi框架的excel导入导出
    从生产计划的角度认识精益生产
  • 原文地址:https://www.cnblogs.com/sushine1/p/11578928.html
Copyright © 2011-2022 走看看