Go工程师体系课 012【学习笔记】

Go 中集成 Elasticsearch

1. 客户端库选择

1.1 主流 Go ES 客户端

  • olivere/elastic:功能最全面,API 设计优雅,支持 ES 7.x/8.x
  • elastic/go-elasticsearch:官方客户端,轻量级,更接近原生 REST API
  • go-elasticsearch/elasticsearch:社区维护的官方客户端分支

1.2 推荐选择

olivere/elastic 是生产环境首选,原因:

  • 类型安全的查询构建器
  • 完善的错误处理
  • 支持所有 ES 功能(聚合、批量操作、索引管理等)
  • 活跃维护,版本更新及时

2. olivere/elastic 快速入门

2.1 安装依赖

go mod init your-project
go get github.com/olivere/elastic/v7

2.2 基础连接

package main

import (
    "context"
    "fmt"
    "log"

    "github.com/olivere/elastic/v7"
)

func main() {
    // 创建客户端
    client, err := elastic.NewClient(
        elastic.SetURL("http://localhost:9200"),
        elastic.SetSniff(false), // 单节点环境关闭嗅探
        elastic.SetHealthcheck(false), // 关闭健康检查
    )
    if err != nil {
        log.Fatal(err)
    }
    defer client.Stop()

    // 检查连接
    info, code, err := client.Ping("http://localhost:9200").Do(context.Background())
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("ES 版本: %s, 状态码: %d\n", info.Version.Number, code)
}

2.3 连接配置选项

client, err := elastic.NewClient(
    elastic.SetURL("http://localhost:9200", "http://localhost:9201"), // 多节点
    elastic.SetBasicAuth("username", "password"), // 认证
    elastic.SetSniff(true), // 自动发现节点
    elastic.SetHealthcheckInterval(10*time.Second), // 健康检查间隔
    elastic.SetMaxRetries(3), // 最大重试次数
    elastic.SetRetryStatusCodes(502, 503, 504), // 重试状态码
    elastic.SetGzip(true), // 启用压缩
    elastic.SetErrorLog(log.New(os.Stderr, "ES ", log.LstdFlags)), // 错误日志
    elastic.SetInfoLog(log.New(os.Stdout, "ES ", log.LstdFlags)), // 信息日志
)

3. 索引管理

3.1 创建索引

// 创建索引
createIndex, err := client.CreateIndex("products").
    BodyString(`{
        "settings": {
            "number_of_shards": 1,
            "number_of_replicas": 1
        },
        "mappings": {
            "properties": {
                "title": {
                    "type": "text",
                    "analyzer": "ik_smart",
                    "fields": {
                        "keyword": {
                            "type": "keyword"
                        }
                    }
                },
                "price": {"type": "double"},
                "status": {"type": "keyword"},
                "created_at": {"type": "date"}
            }
        }
    }`).
    Do(context.Background())
if err != nil {
    log.Fatal(err)
}
fmt.Printf("索引创建结果: %v\n", createIndex.Acknowledged)

3.2 检查索引是否存在

exists, err := client.IndexExists("products").Do(context.Background())
if err != nil {
    log.Fatal(err)
}
fmt.Printf("索引存在: %v\n", exists)

3.3 删除索引

deleteIndex, err := client.DeleteIndex("products").Do(context.Background())
if err != nil {
    log.Fatal(err)
}
fmt.Printf("索引删除结果: %v\n", deleteIndex.Acknowledged)

4. 文档操作

4.1 定义文档结构

type Product struct {
    ID        string    `json:"id"`
    Title     string    `json:"title"`
    Price     float64   `json:"price"`
    Status    string    `json:"status"`
    CreatedAt time.Time `json:"created_at"`
    Tags      []string  `json:"tags"`
}

4.2 添加文档

// 添加单个文档
product := Product{
    ID:        "1",
    Title:     "iPhone 15 Pro",
    Price:     7999.0,
    Status:    "active",
    CreatedAt: time.Now(),
    Tags:      []string{"phone", "apple", "premium"},
}

put1, err := client.Index().
    Index("products").
    Id("1").
    BodyJson(product).
    Do(context.Background())
if err != nil {
    log.Fatal(err)
}
fmt.Printf("文档索引结果: %s\n", put1.Result)

4.3 获取文档

// 根据 ID 获取文档
get1, err := client.Get().
    Index("products").
    Id("1").
    Do(context.Background())
if err != nil {
    log.Fatal(err)
}

if get1.Found {
    var product Product
    err = json.Unmarshal(get1.Source, &product)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("获取到文档: %+v\n", product)
}

4.4 更新文档

// 部分更新
update, err := client.Update().
    Index("products").
    Id("1").
    Doc(map[string]interface{}{
        "price": 7599.0,
    }).
    Do(context.Background())
if err != nil {
    log.Fatal(err)
}
fmt.Printf("更新结果: %s\n", update.Result)

4.5 删除文档

delete, err := client.Delete().
    Index("products").
    Id("1").
    Do(context.Background())
if err != nil {
    log.Fatal(err)
}
fmt.Printf("删除结果: %s\n", delete.Result)

5. 批量操作

5.1 批量索引

bulkRequest := client.Bulk()

products := []Product{
    {ID: "2", Title: "Samsung Galaxy S24", Price: 5999.0, Status: "active", CreatedAt: time.Now()},
    {ID: "3", Title: "MacBook Pro M3", Price: 12999.0, Status: "active", CreatedAt: time.Now()},
    {ID: "4", Title: "iPad Air", Price: 3999.0, Status: "active", CreatedAt: time.Now()},
}

for _, product := range products {
    req := elastic.NewBulkIndexRequest().
        Index("products").
        Id(product.ID).
        Doc(product)
    bulkRequest = bulkRequest.Add(req)
}

bulkResponse, err := bulkRequest.Do(context.Background())
if err != nil {
    log.Fatal(err)
}

fmt.Printf("批量索引完成,处理了 %d 个请求\n", len(bulkResponse.Items))

5.2 批量更新

bulkRequest := client.Bulk()

// 批量更新价格
updates := map[string]float64{
    "2": 5799.0,
    "3": 11999.0,
    "4": 3799.0,
}

for id, price := range updates {
    req := elastic.NewBulkUpdateRequest().
        Index("products").
        Id(id).
        Doc(map[string]interface{}{"price": price})
    bulkRequest = bulkRequest.Add(req)
}

bulkResponse, err := bulkRequest.Do(context.Background())
if err != nil {
    log.Fatal(err)
}

6. 搜索查询

6.1 简单搜索

// 匹配所有文档
searchResult, err := client.Search().
    Index("products").
    Query(elastic.NewMatchAllQuery()).
    Size(10).
    Do(context.Background())
if err != nil {
    log.Fatal(err)
}

fmt.Printf("找到 %d 个文档\n", searchResult.TotalHits())

6.2 匹配查询

// 文本匹配查询
searchResult, err := client.Search().
    Index("products").
    Query(elastic.NewMatchQuery("title", "iPhone")).
    Size(10).
    Do(context.Background())
if err != nil {
    log.Fatal(err)
}

for _, hit := range searchResult.Hits.Hits {
    var product Product
    err := json.Unmarshal(hit.Source, &product)
    if err != nil {
        continue
    }
    fmt.Printf("文档 ID: %s, 标题: %s, 分数: %f\n",
        hit.Id, product.Title, *hit.Score)
}

6.3 复合查询

// 布尔查询
boolQuery := elastic.NewBoolQuery().
    Must(elastic.NewMatchQuery("title", "iPhone")).
    Filter(elastic.NewTermQuery("status", "active")).
    Filter(elastic.NewRangeQuery("price").Gte(1000).Lte(10000))

searchResult, err := client.Search().
    Index("products").
    Query(boolQuery).
    Sort("price", true). // 按价格升序
    Size(10).
    Do(context.Background())
if err != nil {
    log.Fatal(err)
}

6.4 多字段搜索

// 多字段匹配
multiMatchQuery := elastic.NewMultiMatchQuery("苹果手机", "title", "tags").
    Type("best_fields").
    FieldWithBoost("title", 3.0)

searchResult, err := client.Search().
    Index("products").
    Query(multiMatchQuery).
    Highlight(elastic.NewHighlight().Field("title")).
    Size(10).
    Do(context.Background())
if err != nil {
    log.Fatal(err)
}

7. 聚合分析

7.1 基础聚合

// 按状态分组统计
searchResult, err := client.Search().
    Index("products").
    Query(elastic.NewMatchAllQuery()).
    Aggregation("status_count", elastic.NewTermsAggregation().Field("status")).
    Size(0). // 不返回文档,只要聚合结果
    Do(context.Background())
if err != nil {
    log.Fatal(err)
}

statusAgg, found := searchResult.Aggregations.Terms("status_count")
if found {
    for _, bucket := range statusAgg.Buckets {
        fmt.Printf("状态: %s, 数量: %d\n", bucket.Key, bucket.DocCount)
    }
}

7.2 数值统计聚合

// 价格统计
searchResult, err := client.Search().
    Index("products").
    Query(elastic.NewMatchAllQuery()).
    Aggregation("price_stats", elastic.NewStatsAggregation().Field("price")).
    Size(0).
    Do(context.Background())
if err != nil {
    log.Fatal(err)
}

statsAgg, found := searchResult.Aggregations.Stats("price_stats")
if found {
    fmt.Printf("价格统计 - 最小值: %.2f, 最大值: %.2f, 平均值: %.2f, 总数: %.2f\n",
        statsAgg.Min, statsAgg.Max, statsAgg.Avg, statsAgg.Sum)
}

7.3 复合聚合

// 按状态分组,每组内统计价格
searchResult, err := client.Search().
    Index("products").
    Query(elastic.NewMatchAllQuery()).
    Aggregation("status_groups",
        elastic.NewTermsAggregation().Field("status").
            SubAggregation("price_stats", elastic.NewStatsAggregation().Field("price"))).
    Size(0).
    Do(context.Background())
if err != nil {
    log.Fatal(err)
}

statusAgg, found := searchResult.Aggregations.Terms("status_groups")
if found {
    for _, bucket := range statusAgg.Buckets {
        priceStats, found := bucket.Stats("price_stats")
        if found {
            fmt.Printf("状态: %s, 平均价格: %.2f, 数量: %d\n",
                bucket.Key, priceStats.Avg, bucket.DocCount)
        }
    }
}

8. 分页与滚动

8.1 基础分页

// 分页查询
page := 1
size := 10
from := (page - 1) * size

searchResult, err := client.Search().
    Index("products").
    Query(elastic.NewMatchAllQuery()).
    From(from).
    Size(size).
    Do(context.Background())
if err != nil {
    log.Fatal(err)
}

fmt.Printf("第 %d 页,共 %d 条记录\n", page, searchResult.TotalHits())

8.2 滚动查询(大数据量)

// 滚动查询,适合大数据量导出
scroll := client.Scroll("products").Size(100).KeepAlive("1m")
for {
    searchResult, err := scroll.Do(context.Background())
    if err == io.EOF {
        break // 数据读取完毕
    }
    if err != nil {
        log.Fatal(err)
    }

    for _, hit := range searchResult.Hits.Hits {
        var product Product
        err := json.Unmarshal(hit.Source, &product)
        if err != nil {
            continue
        }
        fmt.Printf("处理文档: %s\n", product.Title)
    }
}

9. 错误处理

9.1 常见错误处理

func handleESError(err error) {
    if err == nil {
        return
    }

    // 检查是否是 ES 错误
    if esErr, ok := err.(*elastic.Error); ok {
        switch esErr.Status {
        case 404:
            fmt.Println("文档或索引不存在")
        case 409:
            fmt.Println("版本冲突")
        case 400:
            fmt.Printf("请求错误: %s\n", esErr.Details)
        default:
            fmt.Printf("ES 错误: %d - %s\n", esErr.Status, esErr.Details)
        }
        return
    }

    // 网络或其他错误
    fmt.Printf("其他错误: %v\n", err)
}

10. 最佳实践

10.1 连接池配置

client, err := elastic.NewClient(
    elastic.SetURL("http://localhost:9200"),
    elastic.SetMaxRetries(3),
    elastic.SetRetryBackoff(func(i int) time.Duration {
        return time.Duration(i) * 100 * time.Millisecond
    }),
    elastic.SetHealthcheckInterval(30*time.Second),
    elastic.SetSniff(false), // 生产环境建议关闭
)

10.2 上下文管理

// 使用带超时的上下文
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

searchResult, err := client.Search().
    Index("products").
    Query(elastic.NewMatchAllQuery()).
    Do(ctx)

10.3 批量操作优化

// 批量大小控制
const batchSize = 1000

func bulkIndexProducts(products []Product) error {
    for i := 0; i < len(products); i += batchSize {
        end := i + batchSize
        if end > len(products) {
            end = len(products)
        }

        bulkRequest := client.Bulk()
        for j := i; j < end; j++ {
            req := elastic.NewBulkIndexRequest().
                Index("products").
                Id(products[j].ID).
                Doc(products[j])
            bulkRequest = bulkRequest.Add(req)
        }

        _, err := bulkRequest.Do(context.Background())
        if err != nil {
            return err
        }
    }
    return nil
}

11. 完整示例项目结构

project/
├── go.mod
├── go.sum
├── main.go
├── config/
│   └── es.go          # ES 配置
├── models/
│   └── product.go     # 数据模型
├── services/
│   └── es_service.go  # ES 服务层
└── handlers/
    └── product.go     # 业务处理

11.1 配置管理

// config/es.go
package config

import (
    "github.com/olivere/elastic/v7"
)

type ESConfig struct {
    URLs     []string
    Username string
    Password string
    Sniff    bool
}

func NewESClient(config ESConfig) (*elastic.Client, error) {
    options := []elastic.ClientOptionFunc{
        elastic.SetURL(config.URLs...),
        elastic.SetSniff(config.Sniff),
    }

    if config.Username != "" && config.Password != "" {
        options = append(options, elastic.SetBasicAuth(config.Username, config.Password))
    }

    return elastic.NewClient(options...)
}

11.2 服务层封装

// services/es_service.go
package services

import (
    "context"
    "encoding/json"

    "github.com/olivere/elastic/v7"
)

type ProductService struct {
    client *elastic.Client
    index  string
}

func NewProductService(client *elastic.Client) *ProductService {
    return &ProductService{
        client: client,
        index:  "products",
    }
}

func (s *ProductService) SearchProducts(query string, from, size int) ([]Product, int64, error) {
    searchResult, err := s.client.Search().
        Index(s.index).
        Query(elastic.NewMultiMatchQuery(query, "title", "tags")).
        From(from).
        Size(size).
        Do(context.Background())
    if err != nil {
        return nil, 0, err
    }

    var products []Product
    for _, hit := range searchResult.Hits.Hits {
        var product Product
        err := json.Unmarshal(hit.Source, &product)
        if err != nil {
            continue
        }
        products = append(products, product)
    }

    return products, searchResult.TotalHits(), nil
}

12. 总结

olivere/elastic 是 Go 语言中最成熟的 ES 客户端,提供了:

  • 类型安全:编译时检查查询语法
  • 功能完整:支持所有 ES 功能
  • 性能优化:连接池、批量操作、重试机制
  • 易于使用:链式 API 设计,代码可读性强

通过本文的示例,您可以快速在 Go 项目中集成 Elasticsearch,实现高效的搜索和分析功能。

主题测试文章,只做测试使用。发布者:Walker,转转请注明出处:https://joyjs.cn/archives/4785

(0)
Walker的头像Walker
上一篇 2025年11月25日 12:00
下一篇 2025年11月25日 10:00

相关推荐

  • Go工程师体系课 003【学习笔记】

    grpc grpc grpc-go grpc 无缝集成了 protobuf protobuf 习惯用 Json、XML 数据存储格式的你们,相信大多都没听过 Protocol Buffer。 Protocol Buffer 其实是 Google 出品的一种轻量 & 高效的结构化数据存储格式,性能比 Json、XML 真的强!太!多! protobuf…

    个人 2025年11月25日
    15600
  • Go工程师体系课 001【学习笔记】

    转型 想在短时间系统转到Go工程理由 提高CRUD,无自研框架经验 拔高技术深度,做专、做精需求的同学 进阶工程化,拥有良好开发规范和管理能力的 工程化的重要性 高级开的期望 良好的代码规范 深入底层原理 熟悉架构 熟悉k8s的基础架构 扩展知识广度,知识的深度,规范的开发体系 四个大的阶段 go语言基础 微服务开发的(电商项目实战) 自研微服务 自研然后重…

    个人 2025年11月25日
    24500
  • 【开篇】

    我是Walker,生于八十年代初,代码与生活的旅者。全栈开发工程师,游走于前端与后端的边界,执着于技术与艺术的交汇点。 代码,是我编织梦想的语言;项目,是我刻画未来的画布。在键盘的敲击声中,我探索技术的无尽可能,让灵感在代码里永恒绽放。 深度咖啡爱好者,迷恋每一杯手冲的诗意与仪式感。在咖啡的醇香与苦涩中,寻找专注与灵感,亦如在开发的世界中追求极致与平衡。 骑…

    2025年2月6日 个人
    2.1K00
  • 深入理解ES6 003【学习笔记】

    函数 参数默认值,以及一些关于arguments对象,如何使用表达式作为参数、参数的临时死区。 以前设置默认值总是利用在含有逻辑或操作符的表达式中,前一个值是false时,总是返回后面一个的值,但如果我们给参数传0时,就有些麻烦。需要去验证一下类型 function makeRequest(url,timeout,callback){ timeout = t…

    个人 2025年3月8日
    1.2K00
  • 深入理解ES6 004【学习笔记】

    扩展对象的功能 普通对象 具有js对象所有默认内部行为的 特异对象 具有某些与默认行为不符的内部行为 标准对象 es6规范中定义的对象,Array/Date等 内建对象 脚本开始执行时存在于javascript执行环境的对象,所有标准对象都是内建对象 对象字面量语法扩展 属性初始值的简写,当一个对象的属性与本地变量同名时,不必再写冒号和值 对象方法的简写语法…

    个人 2025年3月8日
    1.2K00

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信
欢迎🌹 Coding never stops, keep learning! 💡💻 光临🌹