some refactoring
This commit is contained in:
parent
10526d8be1
commit
b4c9386577
21
client.go
21
client.go
|
@ -9,17 +9,24 @@ type Client struct {
|
||||||
config *Config
|
config *Config
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewClient() (*Client, error) {
|
func NewClient(config *Config) (*Client, error) {
|
||||||
es, err := elasticsearch.NewDefaultClient()
|
client := &Client{
|
||||||
|
config: config,
|
||||||
|
}
|
||||||
|
|
||||||
|
cfg := elasticsearch.Config{
|
||||||
|
Addresses: []string{
|
||||||
|
config.Host,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
es, err := elasticsearch.NewClient(cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return &Client{es: es}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Client) Configure(config *Config) error {
|
client.es = es
|
||||||
c.config = config
|
return client, nil
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) Open() error {
|
func (c *Client) Open() error {
|
||||||
|
|
10
cmd/main.go
10
cmd/main.go
|
@ -8,21 +8,15 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
client, err := elasticstream.NewClient()
|
|
||||||
if err != nil {
|
|
||||||
log.Println("elasticstream.NewClient() err:", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
config := &elasticstream.Config{
|
config := &elasticstream.Config{
|
||||||
Host: "http://localhost:9200",
|
Host: "http://localhost:9200",
|
||||||
Indexes: map[string]int{"users": 0, "students": 0, "teachers": 0},
|
Indexes: map[string]int{"users": 0, "students": 0, "teachers": 0},
|
||||||
BatchSize: 10,
|
BatchSize: 10,
|
||||||
}
|
}
|
||||||
|
|
||||||
err = client.Configure(config)
|
client, err := elasticstream.NewClient(config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Println("client.Configure() err:", err)
|
log.Println("elasticstream.NewClient() err:", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,71 @@
|
||||||
|
package elasticstream
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/elastic/go-elasticsearch/esapi"
|
||||||
|
"github.com/elastic/go-elasticsearch/v8"
|
||||||
|
)
|
||||||
|
|
||||||
|
// search is calling Elastic Search search API
|
||||||
|
func search(client *elasticsearch.Client, from, size int, index string) ([]Data, error) {
|
||||||
|
query := fmt.Sprintf(`{
|
||||||
|
"query": {
|
||||||
|
"match_all": {}
|
||||||
|
}
|
||||||
|
}`)
|
||||||
|
|
||||||
|
// Create the search request
|
||||||
|
req := esapi.SearchRequest{
|
||||||
|
Index: []string{index},
|
||||||
|
Body: strings.NewReader(query),
|
||||||
|
// From: from,
|
||||||
|
// Size: size,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Perform the request
|
||||||
|
res, err := req.Do(context.Background(), client)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("error getting response: %s", err)
|
||||||
|
}
|
||||||
|
defer res.Body.Close()
|
||||||
|
|
||||||
|
if res.IsError() {
|
||||||
|
return nil, fmt.Errorf("error: %s", res.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Parse the response
|
||||||
|
var result struct {
|
||||||
|
Hits struct {
|
||||||
|
Hits []struct {
|
||||||
|
Source map[string]interface{} `json:"_source"`
|
||||||
|
} `json:"hits"`
|
||||||
|
} `json:"hits"`
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
|
||||||
|
return nil, fmt.Errorf("error parsing the response body: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Collect the records
|
||||||
|
newRecords := make([]map[string]interface{}, len(result.Hits.Hits))
|
||||||
|
for i, hit := range result.Hits.Hits {
|
||||||
|
newRecords[i] = hit.Source
|
||||||
|
}
|
||||||
|
|
||||||
|
header := map[string]string{"index": index}
|
||||||
|
|
||||||
|
var records []Data
|
||||||
|
for _, v := range newRecords {
|
||||||
|
data := Data{
|
||||||
|
Header: header,
|
||||||
|
Payload: v,
|
||||||
|
}
|
||||||
|
records = append(records, data)
|
||||||
|
}
|
||||||
|
|
||||||
|
return records, nil
|
||||||
|
}
|
66
worker.go
66
worker.go
|
@ -1,12 +1,6 @@
|
||||||
package elasticstream
|
package elasticstream
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
|
||||||
"strings"
|
|
||||||
|
|
||||||
"github.com/elastic/go-elasticsearch/esapi"
|
|
||||||
"github.com/elastic/go-elasticsearch/v8"
|
"github.com/elastic/go-elasticsearch/v8"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -35,63 +29,3 @@ func NewWorker(client *elasticsearch.Client, index string, from, size int, buffe
|
||||||
func (w *Worker) start() {
|
func (w *Worker) start() {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// search is calling Elastic Search search API
|
|
||||||
func search(client *elasticsearch.Client, from, size int, index string) ([]Data, error) {
|
|
||||||
query := fmt.Sprintf(`{
|
|
||||||
"query": {
|
|
||||||
"match_all": {}
|
|
||||||
}
|
|
||||||
}`)
|
|
||||||
|
|
||||||
// Create the search request
|
|
||||||
req := esapi.SearchRequest{
|
|
||||||
Index: []string{index},
|
|
||||||
Body: strings.NewReader(query),
|
|
||||||
From: from,
|
|
||||||
Size: size,
|
|
||||||
}
|
|
||||||
|
|
||||||
// Perform the request
|
|
||||||
res, err := req.Do(context.Background(), client)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("error getting response: %s", err)
|
|
||||||
}
|
|
||||||
defer res.Body.Close()
|
|
||||||
|
|
||||||
if res.IsError() {
|
|
||||||
return nil, fmt.Errorf("error: %s", res.String())
|
|
||||||
}
|
|
||||||
|
|
||||||
// Parse the response
|
|
||||||
var result struct {
|
|
||||||
Hits struct {
|
|
||||||
Hits []struct {
|
|
||||||
Source map[string]interface{} `json:"_source"`
|
|
||||||
} `json:"hits"`
|
|
||||||
} `json:"hits"`
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
|
|
||||||
return nil, fmt.Errorf("error parsing the response body: %s", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Collect the records
|
|
||||||
newRecords := make([]map[string]interface{}, len(result.Hits.Hits))
|
|
||||||
for i, hit := range result.Hits.Hits {
|
|
||||||
newRecords[i] = hit.Source
|
|
||||||
}
|
|
||||||
|
|
||||||
header := map[string]string{"index": index}
|
|
||||||
|
|
||||||
var records []Data
|
|
||||||
for _, v := range newRecords {
|
|
||||||
data := Data{
|
|
||||||
Header: header,
|
|
||||||
Payload: v,
|
|
||||||
}
|
|
||||||
records = append(records, data)
|
|
||||||
}
|
|
||||||
|
|
||||||
return records, nil
|
|
||||||
}
|
|
||||||
|
|
Loading…
Reference in New Issue