diff --git a/.gitignore b/.gitignore index dda1605..5134a74 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ insert main -elasticstream \ No newline at end of file +elasticstream +index.db \ No newline at end of file diff --git a/client.go b/client.go index 001a38e..846dd3b 100644 --- a/client.go +++ b/client.go @@ -2,19 +2,24 @@ package elasticstream import ( "fmt" + "log" + "github.com/boltdb/bolt" "github.com/elastic/go-elasticsearch/v8" ) type Client struct { - es *elasticsearch.Client - config *Config - ch chan Data + es *elasticsearch.Client + config *Config + ch chan Data + db *bolt.DB + offsets map[string]int } func NewClient(config *Config) (*Client, error) { client := &Client{ - config: config, + config: config, + offsets: make(map[string]int), } return client, nil } @@ -34,10 +39,25 @@ func (c *Client) Open() error { } // create a buffer channel - c.ch = make(chan Data, 1) + c.ch = make(chan Data, 25) + + // open bolt db + c.db, err = bolt.Open(c.config.DBPath, 0666, nil) + if err != nil { + return err + } + // defer db.Close() for _, index := range c.config.Indexes { - NewWorker(c, index) + offset, err := getOffset(c.db, index) + if err != nil { + log.Printf("getOffset(%s) err:%s\n", index, err) + continue + } + + c.offsets[index] = offset + + NewWorker(c, index, offset, c.config.BatchSize) } return nil @@ -49,6 +69,24 @@ func (c *Client) Read() (*Data, error) { if !ok { return nil, fmt.Errorf("error reading data from channel") } + + index, ok := data.Header["index"] + if !ok { + return nil, fmt.Errorf("error index not found in data header") + } + + offset, ok := c.offsets[index] + if !ok { + return nil, fmt.Errorf("error offset index not found") + } + + c.offsets[index] = offset + 1 + + err := setOffset(c.db, index, c.offsets[index]) + if err != nil { + return nil, err + } + return &data, nil } diff --git a/cmd/insert.go b/cmd/insert.go index 2dc803d..d228361 100644 --- a/cmd/insert.go +++ b/cmd/insert.go @@ -58,7 +58,13 @@ func main() { fmt.Println("index:", index) fmt.Println("host:", host) - client, err := elasticsearch.NewDefaultClient() + cfg := elasticsearch.Config{ + Addresses: []string{ + host, + }, + } + + client, err := elasticsearch.NewClient(cfg) if err != nil { log.Println("elasticsearch.NewDefaultClient() err:", err) return diff --git a/cmd/main.go b/cmd/main.go index 3e45586..e58d716 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -10,8 +10,9 @@ import ( func main() { config := &elasticstream.Config{ Host: "http://test.urantiacloud.com:9200", - Indexes: []string{"IndexA", "IndexB", "IndexC"}, + Indexes: []string{"index-a", "index-b", "index-c"}, BatchSize: 10, + DBPath: "./index.db", } client, err := elasticstream.NewClient(config) diff --git a/cmd/search.go b/cmd/search.go new file mode 100644 index 0000000..e69de29 diff --git a/db.go b/db.go new file mode 100644 index 0000000..22753a3 --- /dev/null +++ b/db.go @@ -0,0 +1,40 @@ +package elasticstream + +import ( + "fmt" + "log" + "strconv" + + "github.com/boltdb/bolt" +) + +func getOffset(db *bolt.DB, index string) (int, error) { + offset := 0 + if err := db.Update(func(tx *bolt.Tx) error { + b, err := tx.CreateBucketIfNotExists([]byte("position")) + if err != nil { + return err + } + v := b.Get([]byte(index)) + offset, _ = strconv.Atoi(string(v)) + return nil + }); err != nil { + return offset, err + } + return offset, nil +} + +func setOffset(db *bolt.DB, index string, offset int) error { + // log.Printf("setOffset() index=%s offset=%d", index, offset) + if err := db.Update(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte("position")) + + if err := b.Put([]byte(index), []byte(fmt.Sprintf("%d", offset))); err != nil { + return err + } + return nil + }); err != nil { + log.Fatal(err) + } + return nil +} diff --git a/go.mod b/go.mod index 6416876..8f554a4 100644 --- a/go.mod +++ b/go.mod @@ -8,10 +8,12 @@ require ( ) require ( + github.com/boltdb/bolt v1.3.1 // indirect github.com/elastic/elastic-transport-go/v8 v8.6.0 // indirect github.com/go-logr/logr v1.4.1 // indirect github.com/go-logr/stdr v1.2.2 // indirect go.opentelemetry.io/otel v1.24.0 // indirect go.opentelemetry.io/otel/metric v1.24.0 // indirect go.opentelemetry.io/otel/trace v1.24.0 // indirect + golang.org/x/sys v0.19.0 // indirect ) diff --git a/go.sum b/go.sum index 986603e..1ed9043 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +github.com/boltdb/bolt v1.3.1 h1:JQmyP4ZBrce+ZQu0dY660FMfatumYDLun9hBCUVIkF4= +github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/elastic/elastic-transport-go/v8 v8.6.0 h1:Y2S/FBjx1LlCv5m6pWAF2kDJAHoSjSRSJCApolgfthA= diff --git a/search.go b/search.go index 4e07a31..f7ebb81 100644 --- a/search.go +++ b/search.go @@ -11,7 +11,7 @@ import ( ) // search is calling Elastic Search search API -func search(client *elasticsearch.Client, from, size int, index string) ([]Data, error) { +func search(client *elasticsearch.Client, index string, offset, size *int) ([]Data, error) { query := fmt.Sprintf(`{ "query": { "match_all": {} @@ -22,8 +22,8 @@ func search(client *elasticsearch.Client, from, size int, index string) ([]Data, req := esapi.SearchRequest{ Index: []string{index}, Body: strings.NewReader(query), - // From: from, - // Size: size, + From: offset, + Size: size, } // Perform the request @@ -34,7 +34,7 @@ func search(client *elasticsearch.Client, from, size int, index string) ([]Data, defer res.Body.Close() if res.IsError() { - return nil, fmt.Errorf("error: %s", res.String()) + return nil, fmt.Errorf("res.IsError() error: %s", res.String()) } // Parse the response @@ -67,5 +67,7 @@ func search(client *elasticsearch.Client, from, size int, index string) ([]Data, records = append(records, data) } + // log.Println("records:", records) + return records, nil } diff --git a/worker.go b/worker.go index 3cdceb2..7f890b0 100644 --- a/worker.go +++ b/worker.go @@ -1,18 +1,24 @@ package elasticstream import ( -// "github.com/elastic/go-elasticsearch/v8" + "log" + "time" + // "github.com/elastic/go-elasticsearch/v8" ) type Worker struct { client *Client - index string // name of the indexes worker pulls data from + index string + offset int + size int } -func NewWorker(client *Client, index string) { +func NewWorker(client *Client, index string, offset, size int) { w := &Worker{ client: client, index: index, + offset: offset, + size: size, } go w.start() @@ -20,4 +26,20 @@ func NewWorker(client *Client, index string) { func (w *Worker) start() { + for { + // log.Printf("worker index=%s offset=%d size=%d\n", w.index, w.offset, w.size) + + dataArray, err := search(w.client.es, w.index, &w.offset, &w.size) + if err != nil { + log.Println("search() err:", err) + time.Sleep(1 * time.Second) + continue + } + + for _, data := range dataArray { + w.client.ch <- data + w.offset++ + } + } + }