diff --git a/client.go b/client.go index 5456107..07b4456 100644 --- a/client.go +++ b/client.go @@ -9,17 +9,24 @@ type Client struct { config *Config } -func NewClient() (*Client, error) { - es, err := elasticsearch.NewDefaultClient() +func NewClient(config *Config) (*Client, error) { + client := &Client{ + config: config, + } + + cfg := elasticsearch.Config{ + Addresses: []string{ + config.Host, + }, + } + + es, err := elasticsearch.NewClient(cfg) if err != nil { return nil, err } - return &Client{es: es}, nil -} -func (c *Client) Configure(config *Config) error { - c.config = config - return nil + client.es = es + return client, nil } func (c *Client) Open() error { diff --git a/cmd/main.go b/cmd/main.go index 6f2ac12..c756b38 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -8,21 +8,15 @@ import ( ) func main() { - client, err := elasticstream.NewClient() - if err != nil { - log.Println("elasticstream.NewClient() err:", err) - return - } - config := &elasticstream.Config{ Host: "http://localhost:9200", Indexes: map[string]int{"users": 0, "students": 0, "teachers": 0}, BatchSize: 10, } - err = client.Configure(config) + client, err := elasticstream.NewClient(config) if err != nil { - log.Println("client.Configure() err:", err) + log.Println("elasticstream.NewClient() err:", err) return } diff --git a/search.go b/search.go new file mode 100644 index 0000000..4e07a31 --- /dev/null +++ b/search.go @@ -0,0 +1,71 @@ +package elasticstream + +import ( + "context" + "encoding/json" + "fmt" + "strings" + + "github.com/elastic/go-elasticsearch/esapi" + "github.com/elastic/go-elasticsearch/v8" +) + +// search is calling Elastic Search search API +func search(client *elasticsearch.Client, from, size int, index string) ([]Data, error) { + query := fmt.Sprintf(`{ + "query": { + "match_all": {} + } + }`) + + // Create the search request + req := esapi.SearchRequest{ + Index: []string{index}, + Body: strings.NewReader(query), + // From: from, + // Size: size, + } + + // Perform the request + res, err := req.Do(context.Background(), client) + if err != nil { + return nil, fmt.Errorf("error getting response: %s", err) + } + defer res.Body.Close() + + if res.IsError() { + return nil, fmt.Errorf("error: %s", res.String()) + } + + // Parse the response + var result struct { + Hits struct { + Hits []struct { + Source map[string]interface{} `json:"_source"` + } `json:"hits"` + } `json:"hits"` + } + + if err := json.NewDecoder(res.Body).Decode(&result); err != nil { + return nil, fmt.Errorf("error parsing the response body: %s", err) + } + + // Collect the records + newRecords := make([]map[string]interface{}, len(result.Hits.Hits)) + for i, hit := range result.Hits.Hits { + newRecords[i] = hit.Source + } + + header := map[string]string{"index": index} + + var records []Data + for _, v := range newRecords { + data := Data{ + Header: header, + Payload: v, + } + records = append(records, data) + } + + return records, nil +} diff --git a/worker.go b/worker.go index 0b7e3e0..1cd4784 100644 --- a/worker.go +++ b/worker.go @@ -1,12 +1,6 @@ package elasticstream import ( - "context" - "encoding/json" - "fmt" - "strings" - - "github.com/elastic/go-elasticsearch/esapi" "github.com/elastic/go-elasticsearch/v8" ) @@ -35,63 +29,3 @@ func NewWorker(client *elasticsearch.Client, index string, from, size int, buffe func (w *Worker) start() { } - -// search is calling Elastic Search search API -func search(client *elasticsearch.Client, from, size int, index string) ([]Data, error) { - query := fmt.Sprintf(`{ - "query": { - "match_all": {} - } - }`) - - // Create the search request - req := esapi.SearchRequest{ - Index: []string{index}, - Body: strings.NewReader(query), - From: from, - Size: size, - } - - // Perform the request - res, err := req.Do(context.Background(), client) - if err != nil { - return nil, fmt.Errorf("error getting response: %s", err) - } - defer res.Body.Close() - - if res.IsError() { - return nil, fmt.Errorf("error: %s", res.String()) - } - - // Parse the response - var result struct { - Hits struct { - Hits []struct { - Source map[string]interface{} `json:"_source"` - } `json:"hits"` - } `json:"hits"` - } - - if err := json.NewDecoder(res.Body).Decode(&result); err != nil { - return nil, fmt.Errorf("error parsing the response body: %s", err) - } - - // Collect the records - newRecords := make([]map[string]interface{}, len(result.Hits.Hits)) - for i, hit := range result.Hits.Hits { - newRecords[i] = hit.Source - } - - header := map[string]string{"index": index} - - var records []Data - for _, v := range newRecords { - data := Data{ - Header: header, - Payload: v, - } - records = append(records, data) - } - - return records, nil -}