diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..dda1605
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,3 @@
+insert
+main
+elasticstream
\ No newline at end of file
diff --git a/README.md b/README.md
index e69de29..bd30708 100644
--- a/README.md
+++ b/README.md
@@ -0,0 +1 @@
+# ELASTIC STREAM
\ No newline at end of file
diff --git a/client.go b/client.go
new file mode 100644
index 0000000..5456107
--- /dev/null
+++ b/client.go
@@ -0,0 +1,63 @@
+package elasticstream
+
+import (
+	"errors"
+
+	"github.com/elastic/go-elasticsearch/v8"
+)
+
+// ErrNotConfigured is returned by Open when the Client holds no Config.
+var ErrNotConfigured = errors.New("elasticstream: client is not configured")
+
+// Client starts one worker per configured index on top of an Elasticsearch
+// client.
+type Client struct {
+	es     *elasticsearch.Client
+	config *Config
+}
+
+// NewClient returns a Client that wraps the default Elasticsearch client.
+func NewClient() (*Client, error) {
+	es, err := elasticsearch.NewDefaultClient()
+	if err != nil {
+		return nil, err
+	}
+	return &Client{es: es}, nil
+}
+
+// Configure stores config for later use by Open. It always returns nil.
+func (c *Client) Configure(config *Config) error {
+	c.config = config
+	return nil
+}
+
+// Open starts one worker for every entry of the configured Indexes map,
+// handing each worker the index name, its start position and the batch size.
+// It returns ErrNotConfigured when no Config has been set.
+func (c *Client) Open() error {
+	// Without this guard the field accesses below panic on a nil config.
+	if c.config == nil {
+		return ErrNotConfigured
+	}
+
+	// Buffered channel handed to every worker.
+	// NOTE(review): the channel is not kept on the Client, so Read has no
+	// way to receive from it yet.
+	ch := make(chan Data, 1)
+
+	for index, from := range c.config.Indexes {
+		NewWorker(c.es, index, from, c.config.BatchSize, ch)
+	}
+
+	return nil
+}
+
+// Read currently returns an empty Data and a nil error.
+func (c *Client) Read() (Data, error) {
+	return Data{}, nil
+}
+
+// Teardown closes the client. It currently does nothing and returns nil.
+func (c *Client) Teardown() error {
+	return nil
+}
diff --git a/cmd/insert.go b/cmd/insert.go
new file mode 100644
index 0000000..2dc803d
--- /dev/null
+++ b/cmd/insert.go
@@ -0,0 +1,98 @@
+package main
+
+import (
+	"bytes"
+	"context"
+	"encoding/json"
+	"flag"
+	"fmt"
+	"log"
+	"time"
+
+	// NOTE(review): esapi is imported from the unversioned module while the
+	// client comes from v8; confirm that this pairing is intended.
+	"github.com/elastic/go-elasticsearch/esapi"
+	"github.com/elastic/go-elasticsearch/v8"
+)
+
+// insert indexes record as one document into index, with Refresh set to
+// "true", and prints the response. It returns an error when the request
+// fails or the response reports an error.
+func insert(client *elasticsearch.Client, index string, record []byte) error {
+	req := esapi.IndexRequest{
+		Index: index,
+		// DocumentID: fmt.Sprintf("%d", u.ID),
+		Body:    bytes.NewReader(record),
+		Refresh: "true",
+	}
+
+	res, err := req.Do(context.Background(), client)
+	if err != nil {
+		return err
+	}
+	defer res.Body.Close()
+
+	if res.IsError() {
+		return fmt.Errorf("error indexing document: %s", res.String())
+	}
+
+	fmt.Printf("Document created: %s\n", res.String())
+	return nil
+}
+
+// Record is the sample document generated and indexed by this command.
+type Record struct {
+	ID        int       `json:"id"`
+	Name      string    `json:"name"`
+	Price     float64   `json:"price"`
+	Timestamp time.Time `json:"timestamp"`
+}
+
+// main parses the -count, -index and -host flags, then generates count
+// records and indexes them one by one. A record that cannot be encoded or
+// indexed is logged and skipped.
+func main() {
+	var count int
+	flag.IntVar(&count, "count", 10, "number of records to generate")
+
+	var index string
+	flag.StringVar(&index, "index", "default", "index name")
+
+	var host string
+	flag.StringVar(&host, "host", "http://localhost:9200", "host")
+
+	flag.Parse()
+
+	fmt.Println("count:", count)
+	fmt.Println("index:", index)
+	fmt.Println("host:", host)
+
+	// Connect to the address given by -host so that the flag takes effect.
+	client, err := elasticsearch.NewClient(elasticsearch.Config{
+		Addresses: []string{host},
+	})
+	if err != nil {
+		log.Println("elasticsearch.NewClient() err:", err)
+		return
+	}
+
+	for i := 0; i < count; i++ {
+		record := Record{
+			ID:        i,
+			Price:     float64(i * 10),
+			Name:      fmt.Sprintf("%s_%d", index, i),
+			Timestamp: time.Now(),
+		}
+
+		// The local is called body so that the bytes package stays visible.
+		body, err := json.Marshal(record)
+		if err != nil {
+			log.Println("json.Marshal() err:", err)
+			continue
+		}
+
+		if err := insert(client, index, body); err != nil {
+			log.Println("insert() err:", err)
+		}
+	}
+}
diff --git a/cmd/main.go b/cmd/main.go
new file mode 100644
index 0000000..6f2ac12
--- /dev/null
+++ b/cmd/main.go
@@ -0,0 +1,44 @@
+package main
+
+import (
+	"fmt"
+	"log"
+
+	"11-11.dev/goexamples/elasticstream"
+)
+
+// main configures an elasticstream client for three indexes, opens it and
+// then prints every value returned by Read in an endless loop.
+func main() {
+	client, err := elasticstream.NewClient()
+	if err != nil {
+		log.Println("elasticstream.NewClient() err:", err)
+		return
+	}
+
+	config := &elasticstream.Config{
+		Host:      "http://localhost:9200",
+		Indexes:   map[string]int{"users": 0, "students": 0, "teachers": 0},
+		BatchSize: 10,
+	}
+
+	if err := client.Configure(config); err != nil {
+		log.Println("client.Configure() err:", err)
+		return
+	}
+
+	if err := client.Open(); err != nil {
+		log.Println("client.Open() err:", err)
+		return
+	}
+
+	for {
+		data, err := client.Read()
+		if err != nil {
+			log.Println("client.Read() err:", err)
+			continue
+		}
+
+		fmt.Println(data)
+	}
+}
diff --git a/config.go b/config.go
new file mode 100644
index 0000000..15d6892
--- /dev/null
+++ b/config.go
@@ -0,0 +1,13 @@
+package elasticstream
+
+// Config holds the settings a Client is configured with.
+type Config struct {
+	// Host is the address of the Elasticsearch server.
+	// NOTE(review): NewClient builds the default client and never reads Host.
+	Host string
+	// Indexes maps an index name to the position from which its data is to
+	// be read.
+	Indexes map[string]int
+	// BatchSize is passed to every worker as its batch size.
+	BatchSize int
+}
diff --git a/data.go b/data.go
new file mode 100644
index 0000000..9d23549
--- /dev/null
+++ b/data.go
@@ -0,0 +1,9 @@
+package elasticstream
+
+// Data is a single record read from Elasticsearch.
+type Data struct {
+	// Header carries metadata about the record, such as the index name.
+	Header map[string]string
+	// Payload is the actual hit from Elasticsearch.
+	Payload map[string]any
+}
diff --git a/go.mod b/go.mod
new file mode 100644
index 0000000..6416876
--- /dev/null
+++ b/go.mod
@@ -0,0 +1,17 @@
+module 11-11.dev/goexamples/elasticstream
+
+go 1.22.4
+
+require (
+	github.com/elastic/go-elasticsearch v0.0.0
+	github.com/elastic/go-elasticsearch/v8 v8.15.0
+)
+
+require (
+	github.com/elastic/elastic-transport-go/v8 v8.6.0 // indirect
+	github.com/go-logr/logr v1.4.1 // indirect
+	github.com/go-logr/stdr v1.2.2 // indirect
+	go.opentelemetry.io/otel v1.24.0 // indirect
+	go.opentelemetry.io/otel/metric v1.24.0 // indirect
+	go.opentelemetry.io/otel/trace v1.24.0 // indirect
+)
diff --git a/go.sum b/go.sum
new file mode 100644
index 0000000..986603e
--- /dev/null
+++ b/go.sum
@@ -0,0 +1,31 @@
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/elastic/elastic-transport-go/v8 v8.6.0 h1:Y2S/FBjx1LlCv5m6pWAF2kDJAHoSjSRSJCApolgfthA=
+github.com/elastic/elastic-transport-go/v8 v8.6.0/go.mod h1:YLHer5cj0csTzNFXoNQ8qhtGY1GTvSqPnKWKaqQE3Hk=
+github.com/elastic/go-elasticsearch v0.0.0 h1:Pd5fqOuBxKxv83b0+xOAJDAkziWYwFinWnBO0y+TZaA=
+github.com/elastic/go-elasticsearch v0.0.0/go.mod h1:TkBSJBuTyFdBnrNqoPc54FN0vKf5c04IdM4zuStJ7xg=
+github.com/elastic/go-elasticsearch/v8 v8.15.0 h1:IZyJhe7t7WI3NEFdcHnf6IJXqpRf+8S8QWLtZYYyBYk=
+github.com/elastic/go-elasticsearch/v8 v8.15.0/go.mod h1:HCON3zj4btpqs2N1jjsAy4a/fiAul+YBP00mBH4xik8=
+github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
+github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ=
+github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
+github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
+github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
+github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
+github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
+github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
+go.opentelemetry.io/otel v1.24.0 h1:0LAOdjNmQeSTzGBzduGe/rU4tZhMwL5rWgtp9Ku5Jfo=
+go.opentelemetry.io/otel v1.24.0/go.mod h1:W7b9Ozg4nkF5tWI5zsXkaKKDjdVjpD4oAt9Qi/MArHo=
+go.opentelemetry.io/otel/metric v1.24.0 h1:6EhoGWWK28x1fbpA4tYTOWBkPefTDQnb8WSGXlc88kI=
+go.opentelemetry.io/otel/metric v1.24.0/go.mod h1:VYhLe1rFfxuTXLgj4CBiyz+9WYBA8pNGJgDcSFRKBco=
+go.opentelemetry.io/otel/sdk v1.21.0 h1:FTt8qirL1EysG6sTQRZ5TokkU8d0ugCj8htOgThZXQ8=
+go.opentelemetry.io/otel/sdk v1.21.0/go.mod h1:Nna6Yv7PWTdgJHVRD9hIYywQBRx7pbox6nwBnZIxl/E=
+go.opentelemetry.io/otel/trace v1.24.0 h1:CsKnnL4dUAr/0llH9FKuc698G04IrpWV0MQA/Y1YELI=
+go.opentelemetry.io/otel/trace v1.24.0/go.mod h1:HPc3Xr/cOApsBI154IU0OI0HJexz+aw5uPdbs3UCjNU=
+golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o=
+golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
+gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
+gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
diff --git a/worker.go b/worker.go
new file mode 100644
index 0000000..0b7e3e0
--- /dev/null
+++ b/worker.go
@@ -0,0 +1,100 @@
+package elasticstream
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"strings"
+
+	// NOTE(review): esapi is imported from the unversioned module while the
+	// client comes from v8; confirm that this pairing is intended.
+	"github.com/elastic/go-elasticsearch/esapi"
+	"github.com/elastic/go-elasticsearch/v8"
+)
+
+// Worker holds what is needed to pull data from one index: the client, the
+// index name, the read position, the batch size and an output channel.
+type Worker struct {
+	client *elasticsearch.Client
+	index  string // name of the index the worker pulls data from
+	from   int    // position from which to start reading data
+	size   int    // batch size
+	buffer chan Data
+}
+
+// NewWorker builds a Worker from its arguments, runs its start method in a
+// new goroutine and returns the Worker.
+func NewWorker(client *elasticsearch.Client, index string, from, size int, buffer chan Data) *Worker {
+	w := &Worker{
+		client: client,
+		index:  index,
+		from:   from,
+		size:   size,
+		buffer: buffer,
+	}
+
+	go w.start()
+
+	return w
+}
+
+// start is the body of the worker goroutine. It is currently empty, so the
+// goroutine returns immediately.
+func (w *Worker) start() {
+}
+
+// search calls the Elasticsearch search API with a match_all query on index
+// and returns up to size hits starting at offset from. Every returned Data
+// carries the hit's _source as Payload and a Header with the index name.
+// It returns nil when the response contains no hits.
+func search(client *elasticsearch.Client, from, size int, index string) ([]Data, error) {
+	// from and size travel in the request body, which the search API accepts.
+	// NOTE(review): esapi.SearchRequest in go-elasticsearch v8 declares From
+	// and Size as *int; using the body avoids depending on the field types of
+	// the esapi version imported here.
+	query := fmt.Sprintf(`{"from": %d, "size": %d, "query": {"match_all": {}}}`, from, size)
+
+	req := esapi.SearchRequest{
+		Index: []string{index},
+		Body:  strings.NewReader(query),
+	}
+
+	res, err := req.Do(context.Background(), client)
+	if err != nil {
+		return nil, fmt.Errorf("error getting response: %w", err)
+	}
+	defer res.Body.Close()
+
+	if res.IsError() {
+		return nil, fmt.Errorf("error: %s", res.String())
+	}
+
+	// Only the _source of every hit is decoded.
+	var result struct {
+		Hits struct {
+			Hits []struct {
+				Source map[string]any `json:"_source"`
+			} `json:"hits"`
+		} `json:"hits"`
+	}
+
+	if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
+		return nil, fmt.Errorf("error parsing the response body: %w", err)
+	}
+
+	hits := result.Hits.Hits
+	if len(hits) == 0 {
+		return nil, nil
+	}
+
+	records := make([]Data, 0, len(hits))
+	for _, hit := range hits {
+		records = append(records, Data{
+			// A fresh map per record keeps the headers independent.
+			Header:  map[string]string{"index": index},
+			Payload: hit.Source,
+		})
+	}
+
+	return records, nil
+}