This commit is contained in:
Parikshit Gothwal 2024-10-07 09:50:00 +05:30
parent 2f9392955b
commit 26fb21902a
10 changed files with 130 additions and 16 deletions

3
.gitignore vendored
View File

@ -1,3 +1,4 @@
insert
main
elasticstream
elasticstream
index.db

View File

@ -2,19 +2,24 @@ package elasticstream
import (
"fmt"
"log"
"github.com/boltdb/bolt"
"github.com/elastic/go-elasticsearch/v8"
)
type Client struct {
es *elasticsearch.Client
config *Config
ch chan Data
es *elasticsearch.Client
config *Config
ch chan Data
db *bolt.DB
offsets map[string]int
}
func NewClient(config *Config) (*Client, error) {
client := &Client{
config: config,
config: config,
offsets: make(map[string]int),
}
return client, nil
}
@ -34,10 +39,25 @@ func (c *Client) Open() error {
}
// create a buffer channel
c.ch = make(chan Data, 1)
c.ch = make(chan Data, 25)
// open bolt db
c.db, err = bolt.Open(c.config.DBPath, 0666, nil)
if err != nil {
return err
}
// defer db.Close()
for _, index := range c.config.Indexes {
NewWorker(c, index)
offset, err := getOffset(c.db, index)
if err != nil {
log.Printf("getOffset(%s) err:%s\n", index, err)
continue
}
c.offsets[index] = offset
NewWorker(c, index, offset, c.config.BatchSize)
}
return nil
@ -49,6 +69,24 @@ func (c *Client) Read() (*Data, error) {
if !ok {
return nil, fmt.Errorf("error reading data from channel")
}
index, ok := data.Header["index"]
if !ok {
return nil, fmt.Errorf("error index not found in data header")
}
offset, ok := c.offsets[index]
if !ok {
return nil, fmt.Errorf("error offset index not found")
}
c.offsets[index] = offset + 1
err := setOffset(c.db, index, c.offsets[index])
if err != nil {
return nil, err
}
return &data, nil
}

View File

@ -58,7 +58,13 @@ func main() {
fmt.Println("index:", index)
fmt.Println("host:", host)
client, err := elasticsearch.NewDefaultClient()
cfg := elasticsearch.Config{
Addresses: []string{
host,
},
}
client, err := elasticsearch.NewClient(cfg)
if err != nil {
log.Println("elasticsearch.NewDefaultClient() err:", err)
return

View File

@ -10,8 +10,9 @@ import (
func main() {
config := &elasticstream.Config{
Host: "http://test.urantiacloud.com:9200",
Indexes: []string{"IndexA", "IndexB", "IndexC"},
Indexes: []string{"index-a", "index-b", "index-c"},
BatchSize: 10,
DBPath: "./index.db",
}
client, err := elasticstream.NewClient(config)

0
cmd/search.go Normal file
View File

40
db.go Normal file
View File

@ -0,0 +1,40 @@
package elasticstream
import (
"fmt"
"log"
"strconv"
"github.com/boltdb/bolt"
)
// getOffset returns the read offset persisted for index in the "position"
// bucket of db, creating that bucket if it does not exist yet.
//
// An index with no stored value yields offset 0 and a nil error. A stored
// value that is not a decimal integer is reported as an error instead of
// being silently treated as 0. On any error the returned offset is 0.
func getOffset(db *bolt.DB, index string) (int, error) {
	offset := 0
	// Update rather than View: the bucket may have to be created on first use.
	err := db.Update(func(tx *bolt.Tx) error {
		b, err := tx.CreateBucketIfNotExists([]byte("position"))
		if err != nil {
			return fmt.Errorf("creating position bucket: %w", err)
		}
		v := b.Get([]byte(index))
		if len(v) == 0 {
			// Nothing stored for this index yet; start from 0.
			return nil
		}
		n, err := strconv.Atoi(string(v))
		if err != nil {
			return fmt.Errorf("parsing stored offset for index %q: %w", index, err)
		}
		offset = n
		return nil
	})
	if err != nil {
		return 0, err
	}
	return offset, nil
}
// setOffset persists offset as the current read position for index in the
// "position" bucket of db, creating the bucket if it does not exist.
//
// The offset is stored as its decimal string representation. Any failure is
// logged and returned to the caller; the process is never terminated here.
func setOffset(db *bolt.DB, index string, offset int) error {
	// log.Printf("setOffset() index=%s offset=%d", index, offset)
	err := db.Update(func(tx *bolt.Tx) error {
		// Do not assume the bucket exists: tx.Bucket returns nil for a
		// missing bucket, and calling Put on that would panic.
		b, err := tx.CreateBucketIfNotExists([]byte("position"))
		if err != nil {
			return err
		}
		return b.Put([]byte(index), []byte(strconv.Itoa(offset)))
	})
	if err != nil {
		log.Printf("setOffset(%s) err:%s\n", index, err)
		return fmt.Errorf("storing offset for index %q: %w", index, err)
	}
	return nil
}

2
go.mod
View File

@ -8,10 +8,12 @@ require (
)
require (
github.com/boltdb/bolt v1.3.1 // indirect
github.com/elastic/elastic-transport-go/v8 v8.6.0 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
go.opentelemetry.io/otel v1.24.0 // indirect
go.opentelemetry.io/otel/metric v1.24.0 // indirect
go.opentelemetry.io/otel/trace v1.24.0 // indirect
golang.org/x/sys v0.19.0 // indirect
)

2
go.sum
View File

@ -1,3 +1,5 @@
github.com/boltdb/bolt v1.3.1 h1:JQmyP4ZBrce+ZQu0dY660FMfatumYDLun9hBCUVIkF4=
github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/elastic/elastic-transport-go/v8 v8.6.0 h1:Y2S/FBjx1LlCv5m6pWAF2kDJAHoSjSRSJCApolgfthA=

View File

@ -11,7 +11,7 @@ import (
)
// search is calling Elastic Search search API
func search(client *elasticsearch.Client, from, size int, index string) ([]Data, error) {
func search(client *elasticsearch.Client, index string, offset, size *int) ([]Data, error) {
query := fmt.Sprintf(`{
"query": {
"match_all": {}
@ -22,8 +22,8 @@ func search(client *elasticsearch.Client, from, size int, index string) ([]Data,
req := esapi.SearchRequest{
Index: []string{index},
Body: strings.NewReader(query),
// From: from,
// Size: size,
From: offset,
Size: size,
}
// Perform the request
@ -34,7 +34,7 @@ func search(client *elasticsearch.Client, from, size int, index string) ([]Data,
defer res.Body.Close()
if res.IsError() {
return nil, fmt.Errorf("error: %s", res.String())
return nil, fmt.Errorf("res.IsError() error: %s", res.String())
}
// Parse the response
@ -67,5 +67,7 @@ func search(client *elasticsearch.Client, from, size int, index string) ([]Data,
records = append(records, data)
}
// log.Println("records:", records)
return records, nil
}

View File

@ -1,18 +1,24 @@
package elasticstream
import (
// "github.com/elastic/go-elasticsearch/v8"
"log"
"time"
// "github.com/elastic/go-elasticsearch/v8"
)
type Worker struct {
client *Client
index string // name of the indexes worker pulls data from
index string
offset int
size int
}
func NewWorker(client *Client, index string) {
func NewWorker(client *Client, index string, offset, size int) {
w := &Worker{
client: client,
index: index,
offset: offset,
size: size,
}
go w.start()
@ -20,4 +26,20 @@ func NewWorker(client *Client, index string) {
// start runs the worker's polling loop. Each iteration calls search for
// w.index with the worker's current offset and batch size, sends every
// returned record to the client's channel, and advances w.offset by one per
// record sent.
//
// The loop never returns. When search fails, or returns no records, the
// worker waits one second before trying again so it does not busy-poll the
// server while there is nothing new to read.
func (w *Worker) start() {
	const retryDelay = 1 * time.Second

	for {
		// log.Printf("worker index=%s offset=%d size=%d\n", w.index, w.offset, w.size)
		dataArray, err := search(w.client.es, w.index, &w.offset, &w.size)
		if err != nil {
			log.Println("search() err:", err)
			time.Sleep(retryDelay)
			continue
		}

		if len(dataArray) == 0 {
			// Nothing new at this offset; back off instead of re-querying immediately.
			time.Sleep(retryDelay)
			continue
		}

		for _, data := range dataArray {
			// The send waits until the channel can accept the record.
			w.client.ch <- data
			w.offset++
		}
	}
}