zoukankan      html  css  js  c++  java
  • 03 . Go开发一个日志平台之Elasticsearch使用及kafka消费消息发送到Elasticsearch

    Elasticsearch使用

    详细使用请看我写的Go操作Elasticsearch专篇

    https://www.cnblogs.com/you-men/p/13391265.html

    example1

    package main
    
    import (
    	"context"
    	"fmt"
    	"github.com/olivere/elastic/v7"
    )
    
    
    // Package-level state shared by the example.
    var (
    	// eshost is the base URL of the Elasticsearch node the example talks to.
    	eshost = "http://192.168.43.176:9200"
    	// client is the Elasticsearch client; main assigns it.
    	client *elastic.Client
    )
    
    // Tyweet is the sample document that main indexes. It carries no json
    // tags, so the Go field names are used as-is when it is serialized.
    type Tyweet struct {
    	User, Message string
    }
    
    
    // main connects to the Elasticsearch node at eshost and indexes a single
    // Tyweet document with id "1" into the "user" index under type "tweet".
    //
    // A connection failure is reported and the program stops; a failed index
    // request panics.
    func main() {
    	var err error
    	client, err = elastic.NewClient(elastic.SetSniff(false), elastic.SetURL(eshost))
    	if err != nil {
    		fmt.Println("connect es error", err)
    		// Stop here: without a working client the index request below
    		// cannot succeed.
    		return
    	}

    	// Build the document from a struct; BodyJson serializes it.
    	tweet := Tyweet{User: "youmen", Message: "Take Five"}
    	_, err = client.Index().
    		Index("user").
    		Type("tweet").
    		Id("1").
    		BodyJson(tweet).
    		Do(context.Background())
    	if err != nil {
    		panic(err)
    	}
    	fmt.Println("Insert index success")
    }
    

    example2

    package main
    
    import (
    	"context"
    	"fmt"
    	"github.com/olivere/elastic/v7"
    )
    
    // Package-level state shared by init and create.
    var (
    	// client is the Elasticsearch client; init assigns it.
    	client *elastic.Client
    	// host is the base URL of the Elasticsearch node.
    	host = "http://192.168.43.176:9200"
    )
    
    
    // Employee is the document that create stores in the "megacorp" index.
    // The json tags give the snake_case keys used in the serialized document;
    // they match the keys of the raw JSON strings that create also indexes.
    type Employee struct {
    	FirstName string   `json:"first_name"`
    	LastName  string   `json:"last_name"`
    	Age       int      `json:"age"`
    	About     string   `json:"about"`
    	Interests []string `json:"interests"`
    }
    
    //初始化
    func init() {
    	//errorlog := log.New(os.Stdout, "APP", log.LstdFlags)
    	var err error
    	//这个地方有个小坑 不加上elastic.SetSniff(false) 会连接不上
    	client, err = elastic.NewClient(elastic.SetSniff(false), elastic.SetURL(host))
    	if err != nil {
    		panic(err)
    	}
    	_,_,err = client.Ping(host).Do(context.Background())
    	if err != nil {
    		panic(err)
    	}
    	//fmt.Printf("Elasticsearch returned with code %d and version %s
    ", code, info.Version.Number)
    
    	_,err = client.ElasticsearchVersion(host)
    	if err != nil {
    		panic(err)
    	}
    	//fmt.Printf("Elasticsearch version %s
    ", esversion)
    }
    
    //创建
    func create() {
    
    	//使用结构体
    	e1 := Employee{"Jane", "Smith", 32, "I like to collect rock albums", []string{"music"}}
    	put1, err := client.Index().
    		Index("megacorp").
    		Type("employee").
    		Id("1").
    		BodyJson(e1).
    		Do(context.Background())
    	if err != nil {
    		panic(err)
    	}
    	fmt.Printf("Indexed tweet %s to index s%s, type %s
    ", put1.Id, put1.Index, put1.Type)
    
    	//使用字符串
    	e2 := `{"first_name":"John","last_name":"Smith","age":25,"about":"I love to go rock climbing","interests":["sports","music"]}`
    	put2, err := client.Index().
    		Index("megacorp").
    		Type("employee").
    		Id("2").
    		BodyJson(e2).
    		Do(context.Background())
    	if err != nil {
    		panic(err)
    	}
    	fmt.Printf("Indexed tweet %s to index s%s, type %s
    ", put2.Id, put2.Index, put2.Type)
    
    	e3 := `{"first_name":"Douglas","last_name":"Fir","age":35,"about":"I like to build cabinets","interests":["forestry"]}`
    	put3, err := client.Index().
    		Index("megacorp").
    		Type("employee").
    		Id("3").
    		BodyJson(e3).
    		Do(context.Background())
    	if err != nil {
    		panic(err)
    	}
    	fmt.Printf("Indexed tweet %s to index s%s, type %s
    ", put3.Id, put3.Index, put3.Type)
    }
    
    // main runs the employee indexing example; the client it relies on is
    // prepared by init.
    func main() {
    	create()
    }
    

    kafka消费消息并发送到ES

    kafka消费消息
    package Initial
    
    import (
    	"github.com/Shopify/sarama"
    	"github.com/astaxie/beego/logs"
    	"time"
    )
    
    func Run() (err error) {
    	partitionList, err := kafkaClient.client.Partitions(kafkaClient.topic)
    	if err != nil {
    		logs.Error("Failed to get the list of partitions: ", err)
    		return
    	}
    	for partition := range partitionList {
    		pc, errRet := kafkaClient.client.ConsumePartition(kafkaClient.topic, int32(partition), sarama.OffsetNewest)
    		if errRet != nil {
    			err = errRet
    			logs.Error("Failed to start consumer for partition %d: %s
    ", partition, err)
    			return
    		}
    		defer pc.AsyncClose()
    		go func(pc sarama.PartitionConsumer) {
    			kafkaClient.wg.Add(1)
    			for msg := range pc.Messages() {
    				logs.Debug("Partition:%d, Offset:%d, Key:%s, Value:%s", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
    
    				// 发送日志到es
    				err = sendToES(kafkaClient.topic,msg.Value)
    				if err != nil{
    					logs.Warn("send to es failed, err:%v",err)
    				}
    			}
    			kafkaClient.wg.Done()
    		}(pc)
    	}
    	kafkaClient.wg.Wait()
    	time.Sleep(time.Hour)
    	return
    }
    
    发送到es
    package Initial
    
    import (
    	"context"
    	"fmt"
    	"github.com/olivere/elastic/v7"
    )
    
    // esclient is the package-level Elasticsearch client. InitEs assigns it
    // and sendToES uses it to index messages.
    var esclient *elastic.Client
    
    
    // LogMessage is the document that sendToES writes to Elasticsearch.
    // sendToES fills in Topic and Message; App is left at its zero value by
    // the code shown here.
    type LogMessage struct {
    	App, Topic, Message string
    }
    
    
    // Tyweet is a sample document type with a user name and a message. None
    // of the code visible in this file references it.
    type Tyweet struct {
    	User, Message string
    }
    
    // InitEs creates the package-level Elasticsearch client for the node at
    // addr, with sniffing disabled, and stores it in esclient. A construction
    // failure is printed to stdout and also returned to the caller.
    func InitEs(addr string) (err error) {
    	options := []elastic.ClientOptionFunc{
    		elastic.SetSniff(false),
    		elastic.SetURL(addr),
    	}
    	if esclient, err = elastic.NewClient(options...); err != nil {
    		fmt.Println("connect es error", err)
    	}
    	return err
    }
    
    func sendToES(topic string, data []byte) (err error) {
    	msg := &LogMessage{}
    	msg.Topic = topic
    	msg.Message = string(data)
    
    	_, err = esclient.Index().
    		Index(topic).
    		Type(topic).
    		//Id(fmt.Sprintf("%d", i)).
    		BodyJson(msg).
    		Do(context.Background())
    	if err != nil {
    		// Handle error
    		panic(err)
    		return
    	}
    	return
    }
    
    验证kafka消息是否被消费并发送到es

  • 相关阅读:
    css3阴影效果
    应该了解的9种CSS技巧
    position
    MyEclipse设置Java代码注释模板
    Struts2 常用的常量配置
    CSS 中文字体对应英文和Unicode编码
    MyEclipse使用前优化与配置
    MyEclipse 快捷键收集
    Ajax 调用WebServices之一 基本应用
    C#控制台显示进度条
  • 原文地址:https://www.cnblogs.com/you-men/p/13588182.html
Copyright © 2011-2022 走看看