initial setup

This commit is contained in:
Parikshit Gothwal 2024-10-03 12:40:46 +05:30
parent 7cab6999a3
commit 10526d8be1
10 changed files with 335 additions and 0 deletions

3
.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
insert
main
elasticstream

View File

@ -0,0 +1 @@
# ELASTIC STREAM

43
client.go Normal file
View File

@ -0,0 +1,43 @@
package elasticstream
import (
"github.com/elastic/go-elasticsearch/v8"
)
type Client struct {
es *elasticsearch.Client
config *Config
}
func NewClient() (*Client, error) {
es, err := elasticsearch.NewDefaultClient()
if err != nil {
return nil, err
}
return &Client{es: es}, nil
}
func (c *Client) Configure(config *Config) error {
c.config = config
return nil
}
func (c *Client) Open() error {
// create a buffer channel
ch := make(chan Data, 1)
for index, from := range c.config.Indexes {
NewWorker(c.es, index, from, c.config.BatchSize, ch)
}
return nil
}
func (c *Client) Read() (Data, error) {
return Data{}, nil
}
// close the client
func (c *Client) Teardown() error {
return nil
}

83
cmd/insert.go Normal file
View File

@ -0,0 +1,83 @@
package main
import (
"bytes"
"context"
"encoding/json"
"flag"
"fmt"
"log"
"time"
"github.com/elastic/go-elasticsearch/esapi"
"github.com/elastic/go-elasticsearch/v8"
)
func insert(client *elasticsearch.Client, index string, record []byte) error {
req := esapi.IndexRequest{
Index: index,
// DocumentID: fmt.Sprintf("%d", u.ID),
Body: bytes.NewReader(record),
Refresh: "true",
}
res, err := req.Do(context.Background(), client)
if err != nil {
return err
}
defer res.Body.Close()
if res.IsError() {
return fmt.Errorf("error indexing document: %s", res.String())
}
fmt.Printf("Document created: %s\n", res.String())
return nil
}
type Record struct {
ID int `json:"id"`
Name string `json:"name"`
Price float64 `json:"price"`
Timestamp time.Time `json:"timestamp"`
}
func main() {
var count int
flag.IntVar(&count, "count", 10, "number of records to generate")
var index string
flag.StringVar(&index, "index", "default", "index name")
var host string
flag.StringVar(&host, "host", "http://localhost:9200", "host")
flag.Parse()
fmt.Println("count:", count)
fmt.Println("index:", index)
fmt.Println("host:", host)
client, err := elasticsearch.NewDefaultClient()
if err != nil {
log.Println("elasticsearch.NewDefaultClient() err:", err)
return
}
for i := 0; i < count; i++ {
record := Record{
ID: i,
Price: float64(i * 10),
Name: fmt.Sprintf("%s_%d", index, i),
Timestamp: time.Now(),
}
bytes, _ := json.Marshal(record)
err := insert(client, index, bytes)
if err != nil {
log.Println("insert() err:", err)
continue
}
}
}

44
cmd/main.go Normal file
View File

@ -0,0 +1,44 @@
package main
import (
"fmt"
"log"
"11-11.dev/goexamples/elasticstream"
)
func main() {
client, err := elasticstream.NewClient()
if err != nil {
log.Println("elasticstream.NewClient() err:", err)
return
}
config := &elasticstream.Config{
Host: "http://localhost:9200",
Indexes: map[string]int{"users": 0, "students": 0, "teachers": 0},
BatchSize: 10,
}
err = client.Configure(config)
if err != nil {
log.Println("client.Configure() err:", err)
return
}
err = client.Open()
if err != nil {
log.Println("client.Open() err:", err)
return
}
for {
data, err := client.Read()
if err != nil {
log.Println("eclient.Read() err:", err)
continue
}
fmt.Println(data)
}
}

8
config.go Normal file
View File

@ -0,0 +1,8 @@
package elasticstream
type Config struct {
Host string
// map of index name and position from where data is to be read.
Indexes map[string]int
BatchSize int
}

8
data.go Normal file
View File

@ -0,0 +1,8 @@
package elasticstream
type Data struct {
// we can store index name
Header map[string]string
// actual hit from elastic search
Payload map[string]interface{}
}

17
go.mod Normal file
View File

@ -0,0 +1,17 @@
module 11-11.dev/goexamples/elasticstream
go 1.22.4
require (
github.com/elastic/go-elasticsearch v0.0.0
github.com/elastic/go-elasticsearch/v8 v8.15.0
)
require (
github.com/elastic/elastic-transport-go/v8 v8.6.0 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
go.opentelemetry.io/otel v1.24.0 // indirect
go.opentelemetry.io/otel/metric v1.24.0 // indirect
go.opentelemetry.io/otel/trace v1.24.0 // indirect
)

31
go.sum Normal file
View File

@ -0,0 +1,31 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/elastic/elastic-transport-go/v8 v8.6.0 h1:Y2S/FBjx1LlCv5m6pWAF2kDJAHoSjSRSJCApolgfthA=
github.com/elastic/elastic-transport-go/v8 v8.6.0/go.mod h1:YLHer5cj0csTzNFXoNQ8qhtGY1GTvSqPnKWKaqQE3Hk=
github.com/elastic/go-elasticsearch v0.0.0 h1:Pd5fqOuBxKxv83b0+xOAJDAkziWYwFinWnBO0y+TZaA=
github.com/elastic/go-elasticsearch v0.0.0/go.mod h1:TkBSJBuTyFdBnrNqoPc54FN0vKf5c04IdM4zuStJ7xg=
github.com/elastic/go-elasticsearch/v8 v8.15.0 h1:IZyJhe7t7WI3NEFdcHnf6IJXqpRf+8S8QWLtZYYyBYk=
github.com/elastic/go-elasticsearch/v8 v8.15.0/go.mod h1:HCON3zj4btpqs2N1jjsAy4a/fiAul+YBP00mBH4xik8=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ=
github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
go.opentelemetry.io/otel v1.24.0 h1:0LAOdjNmQeSTzGBzduGe/rU4tZhMwL5rWgtp9Ku5Jfo=
go.opentelemetry.io/otel v1.24.0/go.mod h1:W7b9Ozg4nkF5tWI5zsXkaKKDjdVjpD4oAt9Qi/MArHo=
go.opentelemetry.io/otel/metric v1.24.0 h1:6EhoGWWK28x1fbpA4tYTOWBkPefTDQnb8WSGXlc88kI=
go.opentelemetry.io/otel/metric v1.24.0/go.mod h1:VYhLe1rFfxuTXLgj4CBiyz+9WYBA8pNGJgDcSFRKBco=
go.opentelemetry.io/otel/sdk v1.21.0 h1:FTt8qirL1EysG6sTQRZ5TokkU8d0ugCj8htOgThZXQ8=
go.opentelemetry.io/otel/sdk v1.21.0/go.mod h1:Nna6Yv7PWTdgJHVRD9hIYywQBRx7pbox6nwBnZIxl/E=
go.opentelemetry.io/otel/trace v1.24.0 h1:CsKnnL4dUAr/0llH9FKuc698G04IrpWV0MQA/Y1YELI=
go.opentelemetry.io/otel/trace v1.24.0/go.mod h1:HPc3Xr/cOApsBI154IU0OI0HJexz+aw5uPdbs3UCjNU=
golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o=
golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

97
worker.go Normal file
View File

@ -0,0 +1,97 @@
package elasticstream
import (
"context"
"encoding/json"
"fmt"
"strings"
"github.com/elastic/go-elasticsearch/esapi"
"github.com/elastic/go-elasticsearch/v8"
)
type Worker struct {
client *elasticsearch.Client
index string // name of the indexes worker pulls data from
from int // from where to start read data
size int // batch size
buffer chan Data
}
func NewWorker(client *elasticsearch.Client, index string, from, size int, buffer chan Data) *Worker {
w := &Worker{
client: client,
index: index,
from: from,
size: size,
buffer: buffer,
}
go w.start()
return w
}
func (w *Worker) start() {
}
// search is calling Elastic Search search API
func search(client *elasticsearch.Client, from, size int, index string) ([]Data, error) {
query := fmt.Sprintf(`{
"query": {
"match_all": {}
}
}`)
// Create the search request
req := esapi.SearchRequest{
Index: []string{index},
Body: strings.NewReader(query),
From: from,
Size: size,
}
// Perform the request
res, err := req.Do(context.Background(), client)
if err != nil {
return nil, fmt.Errorf("error getting response: %s", err)
}
defer res.Body.Close()
if res.IsError() {
return nil, fmt.Errorf("error: %s", res.String())
}
// Parse the response
var result struct {
Hits struct {
Hits []struct {
Source map[string]interface{} `json:"_source"`
} `json:"hits"`
} `json:"hits"`
}
if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
return nil, fmt.Errorf("error parsing the response body: %s", err)
}
// Collect the records
newRecords := make([]map[string]interface{}, len(result.Hits.Hits))
for i, hit := range result.Hits.Hits {
newRecords[i] = hit.Source
}
header := map[string]string{"index": index}
var records []Data
for _, v := range newRecords {
data := Data{
Header: header,
Payload: v,
}
records = append(records, data)
}
return records, nil
}