elasticstream/source/elastic/search.go

62 lines
1.3 KiB
Go
Raw Permalink Normal View History

2024-10-07 16:26:41 +05:30
package elastic
2024-10-03 12:57:23 +05:30
import (
"context"
"encoding/json"
"fmt"
"strings"
2024-10-07 22:41:59 +05:30
"time"
2024-10-07 16:26:41 +05:30
2024-10-03 12:57:23 +05:30
"github.com/elastic/go-elasticsearch/esapi"
"github.com/elastic/go-elasticsearch/v8"
)
2024-10-07 22:41:59 +05:30
type SearchResponse struct {
Hits struct {
Total struct {
Value int `json:"value"`
} `json:"total"`
Hits []struct {
Index string `json:"_index"`
ID string `json:"_id"`
Source map[string]interface{} `json:"_source"`
} `json:"hits"`
} `json:"hits"`
}
2024-10-03 12:57:23 +05:30
// search is calling Elastic Search search API
2024-10-07 22:41:59 +05:30
func search(client *elasticsearch.Client, index string, offset, size *int) (*SearchResponse, error) {
2024-10-03 12:57:23 +05:30
query := fmt.Sprintf(`{
"query": {
"match_all": {}
}
}`)
// Create the search request
req := esapi.SearchRequest{
Index: []string{index},
Body: strings.NewReader(query),
2024-10-07 09:50:00 +05:30
From: offset,
Size: size,
2024-10-03 12:57:23 +05:30
}
// Perform the request
2024-10-07 22:41:59 +05:30
ctx, _ := context.WithTimeout(context.TODO(), 5*time.Second)
res, err := req.Do(ctx, client)
2024-10-03 12:57:23 +05:30
if err != nil {
return nil, fmt.Errorf("error getting response: %s", err)
}
defer res.Body.Close()
if res.IsError() {
2024-10-07 09:50:00 +05:30
return nil, fmt.Errorf("res.IsError() error: %s", res.String())
2024-10-03 12:57:23 +05:30
}
2024-10-07 22:41:59 +05:30
result := &SearchResponse{}
if err := json.NewDecoder(res.Body).Decode(result); err != nil {
2024-10-03 12:57:23 +05:30
return nil, fmt.Errorf("error parsing the response body: %s", err)
}
2024-10-07 22:41:59 +05:30
return result, nil
2024-10-03 12:57:23 +05:30
}