Go 中集成 Elasticsearch
1. 客户端库选择
1.1 主流 Go ES 客户端
- olivere/elastic:功能最全面,API 设计优雅,支持 ES 7.x/8.x
- elastic/go-elasticsearch:官方客户端,轻量级,更接近原生 REST API
- go-elasticsearch/elasticsearch:社区维护的官方客户端分支
1.2 推荐选择
olivere/elastic 是生产环境首选,原因:
- 类型安全的查询构建器
- 完善的错误处理
- 支持所有 ES 功能(聚合、批量操作、索引管理等)
- 活跃维护,版本更新及时
2. olivere/elastic 快速入门
2.1 安装依赖
go mod init your-project
go get github.com/olivere/elastic/v7
2.2 基础连接
package main
import (
"context"
"fmt"
"log"
"github.com/olivere/elastic/v7"
)
func main() {
// 创建客户端
client, err := elastic.NewClient(
elastic.SetURL("http://localhost:9200"),
elastic.SetSniff(false), // 单节点环境关闭嗅探
elastic.SetHealthcheck(false), // 关闭健康检查
)
if err != nil {
log.Fatal(err)
}
defer client.Stop()
// 检查连接
info, code, err := client.Ping("http://localhost:9200").Do(context.Background())
if err != nil {
log.Fatal(err)
}
fmt.Printf("ES 版本: %s, 状态码: %d\n", info.Version.Number, code)
}
2.3 连接配置选项
client, err := elastic.NewClient(
elastic.SetURL("http://localhost:9200", "http://localhost:9201"), // 多节点
elastic.SetBasicAuth("username", "password"), // 认证
elastic.SetSniff(true), // 自动发现节点
elastic.SetHealthcheckInterval(10*time.Second), // 健康检查间隔
elastic.SetMaxRetries(3), // 最大重试次数
elastic.SetRetryStatusCodes(502, 503, 504), // 重试状态码
elastic.SetGzip(true), // 启用压缩
elastic.SetErrorLog(log.New(os.Stderr, "ES ", log.LstdFlags)), // 错误日志
elastic.SetInfoLog(log.New(os.Stdout, "ES ", log.LstdFlags)), // 信息日志
)
3. 索引管理
3.1 创建索引
// 创建索引
createIndex, err := client.CreateIndex("products").
BodyString(`{
"settings": {
"number_of_shards": 1,
"number_of_replicas": 1
},
"mappings": {
"properties": {
"title": {
"type": "text",
"analyzer": "ik_smart",
"fields": {
"keyword": {
"type": "keyword"
}
}
},
"price": {"type": "double"},
"status": {"type": "keyword"},
"created_at": {"type": "date"}
}
}
}`).
Do(context.Background())
if err != nil {
log.Fatal(err)
}
fmt.Printf("索引创建结果: %v\n", createIndex.Acknowledged)
3.2 检查索引是否存在
exists, err := client.IndexExists("products").Do(context.Background())
if err != nil {
log.Fatal(err)
}
fmt.Printf("索引存在: %v\n", exists)
3.3 删除索引
deleteIndex, err := client.DeleteIndex("products").Do(context.Background())
if err != nil {
log.Fatal(err)
}
fmt.Printf("索引删除结果: %v\n", deleteIndex.Acknowledged)
4. 文档操作
4.1 定义文档结构
type Product struct {
ID string `json:"id"`
Title string `json:"title"`
Price float64 `json:"price"`
Status string `json:"status"`
CreatedAt time.Time `json:"created_at"`
Tags []string `json:"tags"`
}
4.2 添加文档
// 添加单个文档
product := Product{
ID: "1",
Title: "iPhone 15 Pro",
Price: 7999.0,
Status: "active",
CreatedAt: time.Now(),
Tags: []string{"phone", "apple", "premium"},
}
put1, err := client.Index().
Index("products").
Id("1").
BodyJson(product).
Do(context.Background())
if err != nil {
log.Fatal(err)
}
fmt.Printf("文档索引结果: %s\n", put1.Result)
4.3 获取文档
// 根据 ID 获取文档
get1, err := client.Get().
Index("products").
Id("1").
Do(context.Background())
if err != nil {
log.Fatal(err)
}
if get1.Found {
var product Product
err = json.Unmarshal(get1.Source, &product)
if err != nil {
log.Fatal(err)
}
fmt.Printf("获取到文档: %+v\n", product)
}
4.4 更新文档
// 部分更新
update, err := client.Update().
Index("products").
Id("1").
Doc(map[string]interface{}{
"price": 7599.0,
}).
Do(context.Background())
if err != nil {
log.Fatal(err)
}
fmt.Printf("更新结果: %s\n", update.Result)
4.5 删除文档
delete, err := client.Delete().
Index("products").
Id("1").
Do(context.Background())
if err != nil {
log.Fatal(err)
}
fmt.Printf("删除结果: %s\n", delete.Result)
5. 批量操作
5.1 批量索引
bulkRequest := client.Bulk()
products := []Product{
{ID: "2", Title: "Samsung Galaxy S24", Price: 5999.0, Status: "active", CreatedAt: time.Now()},
{ID: "3", Title: "MacBook Pro M3", Price: 12999.0, Status: "active", CreatedAt: time.Now()},
{ID: "4", Title: "iPad Air", Price: 3999.0, Status: "active", CreatedAt: time.Now()},
}
for _, product := range products {
req := elastic.NewBulkIndexRequest().
Index("products").
Id(product.ID).
Doc(product)
bulkRequest = bulkRequest.Add(req)
}
bulkResponse, err := bulkRequest.Do(context.Background())
if err != nil {
log.Fatal(err)
}
fmt.Printf("批量索引完成,处理了 %d 个请求\n", len(bulkResponse.Items))
5.2 批量更新
bulkRequest := client.Bulk()
// 批量更新价格
updates := map[string]float64{
"2": 5799.0,
"3": 11999.0,
"4": 3799.0,
}
for id, price := range updates {
req := elastic.NewBulkUpdateRequest().
Index("products").
Id(id).
Doc(map[string]interface{}{"price": price})
bulkRequest = bulkRequest.Add(req)
}
bulkResponse, err := bulkRequest.Do(context.Background())
if err != nil {
log.Fatal(err)
}
6. 搜索查询
6.1 简单搜索
// 匹配所有文档
searchResult, err := client.Search().
Index("products").
Query(elastic.NewMatchAllQuery()).
Size(10).
Do(context.Background())
if err != nil {
log.Fatal(err)
}
fmt.Printf("找到 %d 个文档\n", searchResult.TotalHits())
6.2 匹配查询
// 文本匹配查询
searchResult, err := client.Search().
Index("products").
Query(elastic.NewMatchQuery("title", "iPhone")).
Size(10).
Do(context.Background())
if err != nil {
log.Fatal(err)
}
for _, hit := range searchResult.Hits.Hits {
var product Product
err := json.Unmarshal(hit.Source, &product)
if err != nil {
continue
}
fmt.Printf("文档 ID: %s, 标题: %s, 分数: %f\n",
hit.Id, product.Title, *hit.Score)
}
6.3 复合查询
// 布尔查询
boolQuery := elastic.NewBoolQuery().
Must(elastic.NewMatchQuery("title", "iPhone")).
Filter(elastic.NewTermQuery("status", "active")).
Filter(elastic.NewRangeQuery("price").Gte(1000).Lte(10000))
searchResult, err := client.Search().
Index("products").
Query(boolQuery).
Sort("price", true). // 按价格升序
Size(10).
Do(context.Background())
if err != nil {
log.Fatal(err)
}
6.4 多字段搜索
// 多字段匹配
multiMatchQuery := elastic.NewMultiMatchQuery("苹果手机", "title", "tags").
Type("best_fields").
FieldWithBoost("title", 3.0)
searchResult, err := client.Search().
Index("products").
Query(multiMatchQuery).
Highlight(elastic.NewHighlight().Field("title")).
Size(10).
Do(context.Background())
if err != nil {
log.Fatal(err)
}
7. 聚合分析
7.1 基础聚合
// 按状态分组统计
searchResult, err := client.Search().
Index("products").
Query(elastic.NewMatchAllQuery()).
Aggregation("status_count", elastic.NewTermsAggregation().Field("status")).
Size(0). // 不返回文档,只要聚合结果
Do(context.Background())
if err != nil {
log.Fatal(err)
}
statusAgg, found := searchResult.Aggregations.Terms("status_count")
if found {
for _, bucket := range statusAgg.Buckets {
fmt.Printf("状态: %s, 数量: %d\n", bucket.Key, bucket.DocCount)
}
}
7.2 数值统计聚合
// 价格统计
searchResult, err := client.Search().
Index("products").
Query(elastic.NewMatchAllQuery()).
Aggregation("price_stats", elastic.NewStatsAggregation().Field("price")).
Size(0).
Do(context.Background())
if err != nil {
log.Fatal(err)
}
statsAgg, found := searchResult.Aggregations.Stats("price_stats")
if found {
fmt.Printf("价格统计 - 最小值: %.2f, 最大值: %.2f, 平均值: %.2f, 总数: %.2f\n",
statsAgg.Min, statsAgg.Max, statsAgg.Avg, statsAgg.Sum)
}
7.3 复合聚合
// 按状态分组,每组内统计价格
searchResult, err := client.Search().
Index("products").
Query(elastic.NewMatchAllQuery()).
Aggregation("status_groups",
elastic.NewTermsAggregation().Field("status").
SubAggregation("price_stats", elastic.NewStatsAggregation().Field("price"))).
Size(0).
Do(context.Background())
if err != nil {
log.Fatal(err)
}
statusAgg, found := searchResult.Aggregations.Terms("status_groups")
if found {
for _, bucket := range statusAgg.Buckets {
priceStats, found := bucket.Stats("price_stats")
if found {
fmt.Printf("状态: %s, 平均价格: %.2f, 数量: %d\n",
bucket.Key, priceStats.Avg, bucket.DocCount)
}
}
}
8. 分页与滚动
8.1 基础分页
// 分页查询
page := 1
size := 10
from := (page - 1) * size
searchResult, err := client.Search().
Index("products").
Query(elastic.NewMatchAllQuery()).
From(from).
Size(size).
Do(context.Background())
if err != nil {
log.Fatal(err)
}
fmt.Printf("第 %d 页,共 %d 条记录\n", page, searchResult.TotalHits())
8.2 滚动查询(大数据量)
// 滚动查询,适合大数据量导出
scroll := client.Scroll("products").Size(100).KeepAlive("1m")
for {
searchResult, err := scroll.Do(context.Background())
if err == io.EOF {
break // 数据读取完毕
}
if err != nil {
log.Fatal(err)
}
for _, hit := range searchResult.Hits.Hits {
var product Product
err := json.Unmarshal(hit.Source, &product)
if err != nil {
continue
}
fmt.Printf("处理文档: %s\n", product.Title)
}
}
9. 错误处理
9.1 常见错误处理
func handleESError(err error) {
if err == nil {
return
}
// 检查是否是 ES 错误
if esErr, ok := err.(*elastic.Error); ok {
switch esErr.Status {
case 404:
fmt.Println("文档或索引不存在")
case 409:
fmt.Println("版本冲突")
case 400:
fmt.Printf("请求错误: %s\n", esErr.Details)
default:
fmt.Printf("ES 错误: %d - %s\n", esErr.Status, esErr.Details)
}
return
}
// 网络或其他错误
fmt.Printf("其他错误: %v\n", err)
}
10. 最佳实践
10.1 连接池配置
client, err := elastic.NewClient(
elastic.SetURL("http://localhost:9200"),
elastic.SetMaxRetries(3),
elastic.SetRetryBackoff(func(i int) time.Duration {
return time.Duration(i) * 100 * time.Millisecond
}),
elastic.SetHealthcheckInterval(30*time.Second),
elastic.SetSniff(false), // 生产环境建议关闭
)
10.2 上下文管理
// 使用带超时的上下文
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
searchResult, err := client.Search().
Index("products").
Query(elastic.NewMatchAllQuery()).
Do(ctx)
10.3 批量操作优化
// 批量大小控制
const batchSize = 1000
func bulkIndexProducts(products []Product) error {
for i := 0; i < len(products); i += batchSize {
end := i + batchSize
if end > len(products) {
end = len(products)
}
bulkRequest := client.Bulk()
for j := i; j < end; j++ {
req := elastic.NewBulkIndexRequest().
Index("products").
Id(products[j].ID).
Doc(products[j])
bulkRequest = bulkRequest.Add(req)
}
_, err := bulkRequest.Do(context.Background())
if err != nil {
return err
}
}
return nil
}
11. 完整示例项目结构
project/
├── go.mod
├── go.sum
├── main.go
├── config/
│ └── es.go # ES 配置
├── models/
│ └── product.go # 数据模型
├── services/
│ └── es_service.go # ES 服务层
└── handlers/
└── product.go # 业务处理
11.1 配置管理
// config/es.go
package config
import (
"github.com/olivere/elastic/v7"
)
type ESConfig struct {
URLs []string
Username string
Password string
Sniff bool
}
func NewESClient(config ESConfig) (*elastic.Client, error) {
options := []elastic.ClientOptionFunc{
elastic.SetURL(config.URLs...),
elastic.SetSniff(config.Sniff),
}
if config.Username != "" && config.Password != "" {
options = append(options, elastic.SetBasicAuth(config.Username, config.Password))
}
return elastic.NewClient(options...)
}
11.2 服务层封装
// services/es_service.go
package services
import (
"context"
"encoding/json"
"github.com/olivere/elastic/v7"
)
type ProductService struct {
client *elastic.Client
index string
}
func NewProductService(client *elastic.Client) *ProductService {
return &ProductService{
client: client,
index: "products",
}
}
func (s *ProductService) SearchProducts(query string, from, size int) ([]Product, int64, error) {
searchResult, err := s.client.Search().
Index(s.index).
Query(elastic.NewMultiMatchQuery(query, "title", "tags")).
From(from).
Size(size).
Do(context.Background())
if err != nil {
return nil, 0, err
}
var products []Product
for _, hit := range searchResult.Hits.Hits {
var product Product
err := json.Unmarshal(hit.Source, &product)
if err != nil {
continue
}
products = append(products, product)
}
return products, searchResult.TotalHits(), nil
}
12. 总结
olivere/elastic 是 Go 语言中最成熟的 ES 客户端,提供了:
- 类型安全:编译时检查查询语法
- 功能完整:支持所有 ES 功能
- 性能优化:连接池、批量操作、重试机制
- 易于使用:链式 API 设计,代码可读性强
通过本文的示例,您可以快速在 Go 项目中集成 Elasticsearch,实现高效的搜索和分析功能。
主题测试文章,只做测试使用。发布者:Walker,转转请注明出处:https://joyjs.cn/archives/4785