From 36412c76017e03db5f5724509e1c9117d7d2c77b Mon Sep 17 00:00:00 2001 From: Parikshit Gothwal Date: Mon, 7 Oct 2024 16:26:41 +0530 Subject: [PATCH] build --- config/config.go | 2 +- controller/ack.go | 16 ++++++++++------ controller/open.go | 13 +++++++++---- controller/read.go | 5 ++--- controller/teardown.go | 5 ++--- main.go | 1 - opencdc/data.go | 2 +- source/elastic/ack.go | 5 +++-- source/elastic/client.go | 16 ++++++++-------- source/elastic/configure.go | 4 +++- source/elastic/db.go | 2 +- source/elastic/open.go | 28 +++++++++++++++------------- source/elastic/read.go | 5 ++++- source/elastic/search.go | 12 +++++++----- source/elastic/teardown.go | 2 +- source/elastic/worker.go | 11 ++++------- source/interface.go | 9 ++++++--- 17 files changed, 77 insertions(+), 61 deletions(-) diff --git a/config/config.go b/config/config.go index a2a7a54..9c58ce9 100644 --- a/config/config.go +++ b/config/config.go @@ -1,4 +1,4 @@ -package elasticstream +package config type Config struct { Host string diff --git a/controller/ack.go b/controller/ack.go index 58c6828..078d33c 100644 --- a/controller/ack.go +++ b/controller/ack.go @@ -1,19 +1,23 @@ package controller import ( - "elasticstream/source/elastic" + "net/http" + + "elasticstream/source" - "github.com/elastic/go-elasticsearch/v8" "github.com/gin-gonic/gin" ) func Ack(c *gin.Context) { + var req source.Position - var req Position + err := c.BindJSON(&req) + if err != nil { + c.Status(http.StatusBadRequest) + return + } - c.BindJSON(c, &req) - - err := client.Ack(c, req) + err = client.Ack(c, req) if err != nil { c.Status(http.StatusInternalServerError) return diff --git a/controller/open.go b/controller/open.go index 3332334..1e933d5 100644 --- a/controller/open.go +++ b/controller/open.go @@ -1,9 +1,12 @@ package controller import ( + "net/http" + + "elasticstream/config" + "elasticstream/source" "elasticstream/source/elastic" - "github.com/elastic/go-elasticsearch/v8" "github.com/gin-gonic/gin" ) @@ -13,7 +16,7 @@ func Open(c *gin.Context) { client = elastic.NewClient() - err := client.Configure(&Config{ + err := client.Configure(c, config.Config{ Host: "http://test.urantiacloud.com:9200", Indexes: []string{"index-a", "index-b", "index-c"}, BatchSize: 100, @@ -24,8 +27,10 @@ func Open(c *gin.Context) { return } - // Open connection with Elastic Search - err = client.Open() + // get position from boltdb + var positions []source.Position + + err = client.Open(c, positions) if err != nil { c.Status(http.StatusInternalServerError) return diff --git a/controller/read.go b/controller/read.go index d359ef6..365279d 100644 --- a/controller/read.go +++ b/controller/read.go @@ -1,16 +1,15 @@ package controller import ( - "elasticstream/source/elastic" + "net/http" - "github.com/elastic/go-elasticsearch/v8" "github.com/gin-gonic/gin" ) func Read(c *gin.Context) { // Read data from the client - data, err := client.Read() + data, err := client.Read(c) if err != nil { c.Status(http.StatusInternalServerError) return diff --git a/controller/teardown.go b/controller/teardown.go index 2091805..b39931d 100644 --- a/controller/teardown.go +++ b/controller/teardown.go @@ -1,16 +1,15 @@ package controller import ( - "elasticstream/source/elastic" + "net/http" - "github.com/elastic/go-elasticsearch/v8" "github.com/gin-gonic/gin" ) func Teardown(c *gin.Context) { // Close connection - err := client.Teardown() + err := client.Teardown(c) if err != nil { c.Status(http.StatusInternalServerError) return diff --git a/main.go b/main.go index 8fe2ee5..e5456ba 100644 --- a/main.go +++ b/main.go @@ -16,5 +16,4 @@ func main() { r.DELETE("/teardown", controller.Teardown) r.Run(":8080") - } diff --git a/opencdc/data.go b/opencdc/data.go index 7c615f5..fe513ea 100644 --- a/opencdc/data.go +++ b/opencdc/data.go @@ -1,4 +1,4 @@ -package elasticstream +package opencdc type Header struct { ID string diff --git a/source/elastic/ack.go b/source/elastic/ack.go index f4d6a3d..dc6d0c5 100644 --- a/source/elastic/ack.go +++ b/source/elastic/ack.go @@ -2,10 +2,11 @@ package elastic import ( "context" + + "elasticstream/source" ) -func (c *Client) Teardown(context.Context) error { +func (c *Client) Ack(ctx context.Context, position source.Position) error { return nil - } diff --git a/source/elastic/client.go b/source/elastic/client.go index d1e60bd..4c78b09 100644 --- a/source/elastic/client.go +++ b/source/elastic/client.go @@ -1,23 +1,23 @@ package elastic import ( - "fmt" - "log" + "elasticstream/config" + "elasticstream/opencdc" "github.com/boltdb/bolt" "github.com/elastic/go-elasticsearch/v8" ) type Client struct { - cfg *config.Config - es *elasticsearch.Client - db *bolt.DB - offset map[string]int + cfg *config.Config + es *elasticsearch.Client + db *bolt.DB + offsets map[string]int + ch chan opencdc.Data } -func NewClient(config *Config) *Client { +func NewClient() *Client { client := &Client{ - config: config, offsets: make(map[string]int), } return client diff --git a/source/elastic/configure.go b/source/elastic/configure.go index 2dd7f37..03e065c 100644 --- a/source/elastic/configure.go +++ b/source/elastic/configure.go @@ -2,8 +2,10 @@ package elastic import ( "context" + + "elasticstream/config" ) -func (c *Client) Configure(context.Context, Config) error { +func (c *Client) Configure(ctx context.Context, cfg config.Config) error { return nil } diff --git a/source/elastic/db.go b/source/elastic/db.go index 22753a3..252e851 100644 --- a/source/elastic/db.go +++ b/source/elastic/db.go @@ -1,4 +1,4 @@ -package elasticstream +package elastic import ( "fmt" diff --git a/source/elastic/open.go b/source/elastic/open.go index 7e35692..61ad0ef 100644 --- a/source/elastic/open.go +++ b/source/elastic/open.go @@ -2,9 +2,14 @@ package elastic import ( "context" + + "elasticstream/opencdc" + "elasticstream/source" + + "github.com/elastic/go-elasticsearch/v8" ) -func Open(context.Context, []Position) error { +func (c *Client) Open(ctx context.Context, positions []source.Position) error { // Open a connection with ElasticSearch cfg := elasticsearch.Config{ @@ -20,21 +25,18 @@ func Open(context.Context, []Position) error { } // create a buffer channel - c.ch = make(chan Data, c.cfg.BatchSize) + c.ch = make(chan opencdc.Data, c.cfg.BatchSize) - // open bolt db - c.db, err = bolt.Open(c.cfg.DBPath, 0644, nil) - if err != nil { - return err - } - - for _, index := range c.config.Indexes { - offset, err := getOffset(c.db, index) - if err != nil { - return err + for _, index := range c.cfg.Indexes { + offset := 0 + for _, position := range positions { + if index == position.Index { + offset = position.Pos + } } + c.offsets[index] = offset - NewWorker(c.es, index, offset, c.config.BatchSize) + NewWorker(c, index, offset) } return nil diff --git a/source/elastic/read.go b/source/elastic/read.go index d2d2b7f..3312f0b 100644 --- a/source/elastic/read.go +++ b/source/elastic/read.go @@ -2,9 +2,12 @@ package elastic import ( "context" + "fmt" + + "elasticstream/opencdc" ) -func (c *Client) Read(context.Context) (*Data, error) { +func (c *Client) Read(context.Context) (*opencdc.Data, error) { data, ok := <-c.ch if !ok { diff --git a/source/elastic/search.go b/source/elastic/search.go index 1d0fb30..dd14d78 100644 --- a/source/elastic/search.go +++ b/source/elastic/search.go @@ -1,4 +1,4 @@ -package elasticstream +package elastic import ( "context" @@ -6,12 +6,14 @@ import ( "fmt" "strings" + "elasticstream/opencdc" + "github.com/elastic/go-elasticsearch/esapi" "github.com/elastic/go-elasticsearch/v8" ) // search is calling Elastic Search search API -func search(client *elasticsearch.Client, index string, offset, size *int) ([]Data, error) { +func search(client *elasticsearch.Client, index string, offset, size *int) ([]opencdc.Data, error) { query := fmt.Sprintf(`{ "query": { "match_all": {} @@ -56,11 +58,11 @@ func search(client *elasticsearch.Client, index string, offset, size *int) ([]Da newRecords[i] = hit.Source } - header := Header{Index: index} + header := opencdc.Header{Index: index} - var records []Data + var records []opencdc.Data for _, v := range newRecords { - data := Data{ + data := opencdc.Data{ Header: header, Payload: v, } diff --git a/source/elastic/teardown.go b/source/elastic/teardown.go index ebee6fe..94389c0 100644 --- a/source/elastic/teardown.go +++ b/source/elastic/teardown.go @@ -5,6 +5,6 @@ import ( ) // close the client -func (c *Client) Teardown(context.Context) error { +func (c *Client) Teardown(ctx context.Context) error { return nil } diff --git a/source/elastic/worker.go b/source/elastic/worker.go index 90d1612..bc8777e 100644 --- a/source/elastic/worker.go +++ b/source/elastic/worker.go @@ -1,24 +1,21 @@ -package elasticstream +package elastic import ( "log" "time" - // "github.com/elastic/go-elasticsearch/v8" ) type Worker struct { client *Client index string offset int - size int } -func NewWorker(client *Client, index string, offset, size int) { +func NewWorker(client *Client, index string, offset int) { w := &Worker{ client: client, index: index, offset: offset, - size: size, } go w.start() @@ -27,9 +24,9 @@ func NewWorker(client *Client, index string, offset, size int) { func (w *Worker) start() { for { - log.Printf("worker index=%s offset=%d size=%d\n", w.index, w.offset, w.size) + log.Printf("worker index=%s offset=%d size=%d\n", w.index, w.offset, w.client.cfg.BatchSize) - dataArray, err := search(w.client.es, w.index, &w.offset, &w.size) + dataArray, err := search(w.client.es, w.index, &w.offset, &w.client.cfg.BatchSize) if err != nil { log.Println("search() err:", err) time.Sleep(1 * time.Second) diff --git a/source/interface.go b/source/interface.go index 97087de..59a55d7 100644 --- a/source/interface.go +++ b/source/interface.go @@ -1,7 +1,10 @@ -package elasticstream +package source import ( "context" + + "elasticstream/config" + "elasticstream/opencdc" ) type Position struct { @@ -11,9 +14,9 @@ type Position struct { } type Source interface { - Configure(context.Context, Config) error + Configure(context.Context, config.Config) error Open(context.Context, []Position) error - Read(context.Context) (*Data, error) + Read(context.Context) (*opencdc.Data, error) Ack(context.Context, Position) error Teardown(context.Context) error }