From 1e2975ad0fe6aaf20a58624fb5bd30c4f748918d Mon Sep 17 00:00:00 2001 From: Sangeet Kumar Date: Thu, 3 Oct 2024 14:04:17 +0530 Subject: [PATCH] some refactoring --- client.go | 12 ++++++++---- cmd/main.go | 4 ++-- config.go | 2 +- worker.go | 14 +++----------- 4 files changed, 14 insertions(+), 18 deletions(-) diff --git a/client.go b/client.go index 07b4456..8ec61ca 100644 --- a/client.go +++ b/client.go @@ -7,6 +7,7 @@ import ( type Client struct { es *elasticsearch.Client config *Config + ch chan Data } func NewClient(config *Config) (*Client, error) { @@ -30,11 +31,14 @@ func NewClient(config *Config) (*Client, error) { } func (c *Client) Open() error { - // create a buffer channel - ch := make(chan Data, 1) - for index, from := range c.config.Indexes { - NewWorker(c.es, index, from, c.config.BatchSize, ch) + // Open a connection with ElasticSearch + + // create a buffer channel + c.ch = make(chan Data, 1) + + for _, index := range c.config.Indexes { + NewWorker(c, index) } return nil diff --git a/cmd/main.go b/cmd/main.go index c756b38..3e45586 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -9,8 +9,8 @@ import ( func main() { config := &elasticstream.Config{ - Host: "http://localhost:9200", - Indexes: map[string]int{"users": 0, "students": 0, "teachers": 0}, + Host: "http://test.urantiacloud.com:9200", + Indexes: []string{"IndexA", "IndexB", "IndexC"}, BatchSize: 10, } diff --git a/config.go b/config.go index 15d6892..54b177e 100644 --- a/config.go +++ b/config.go @@ -3,6 +3,6 @@ package elasticstream type Config struct { Host string // map of index name and position from where data is to be read. - Indexes map[string]int + Indexes []string BatchSize int } diff --git a/worker.go b/worker.go index 1cd4784..3cdceb2 100644 --- a/worker.go +++ b/worker.go @@ -1,29 +1,21 @@ package elasticstream import ( - "github.com/elastic/go-elasticsearch/v8" +// "github.com/elastic/go-elasticsearch/v8" ) type Worker struct { - client *elasticsearch.Client + client *Client index string // name of the indexes worker pulls data from - from int // from where to start read data - size int // batch size - buffer chan Data } -func NewWorker(client *elasticsearch.Client, index string, from, size int, buffer chan Data) *Worker { +func NewWorker(client *Client, index string) { w := &Worker{ client: client, index: index, - from: from, - size: size, - buffer: buffer, } go w.start() - - return w } func (w *Worker) start() {