From a6da23819b68b649e59b1cc45becec0bc36805dc Mon Sep 17 00:00:00 2001 From: Sangeet Kumar Date: Mon, 7 Oct 2024 16:49:36 +0530 Subject: [PATCH] fixed erors --- controller/{teardown.go => close.go} | 2 +- controller/open.go | 7 ++++- main.go | 6 ++++- source/elastic/configure.go | 9 ++++++- source/elastic/db.go | 40 ---------------------------- source/elastic/open.go | 3 +++ source/elastic/read.go | 12 +++++---- source/elastic/teardown.go | 5 ++++ 8 files changed, 35 insertions(+), 49 deletions(-) rename controller/{teardown.go => close.go} (87%) delete mode 100644 source/elastic/db.go diff --git a/controller/teardown.go b/controller/close.go similarity index 87% rename from controller/teardown.go rename to controller/close.go index b39931d..039961a 100644 --- a/controller/teardown.go +++ b/controller/close.go @@ -6,7 +6,7 @@ import ( "github.com/gin-gonic/gin" ) -func Teardown(c *gin.Context) { +func Close(c *gin.Context) { // Close connection err := client.Teardown(c) diff --git a/controller/open.go b/controller/open.go index 1e933d5..836edf1 100644 --- a/controller/open.go +++ b/controller/open.go @@ -1,6 +1,7 @@ package controller import ( + "log" "net/http" "elasticstream/config" @@ -13,10 +14,12 @@ import ( var client *elastic.Client func Open(c *gin.Context) { + log.Println(">>>> controller.Open()") + defer log.Println("<<<< controller.Open()") client = elastic.NewClient() - err := client.Configure(c, config.Config{ + err := client.Configure(c, &config.Config{ Host: "http://test.urantiacloud.com:9200", Indexes: []string{"index-a", "index-b", "index-c"}, BatchSize: 100, @@ -30,6 +33,8 @@ func Open(c *gin.Context) { // get position from boltdb var positions []source.Position + log.Printf("client: %#v\n", client) + err = client.Open(c, positions) if err != nil { c.Status(http.StatusInternalServerError) diff --git a/main.go b/main.go index e5456ba..71770d7 100644 --- a/main.go +++ b/main.go @@ -1,6 +1,8 @@ package main import ( + "log" + "elasticstream/controller" "github.com/gin-gonic/gin" @@ -8,12 +10,14 @@ import ( func main() { + log.SetFlags(log.LstdFlags | log.Lshortfile) + r := gin.Default() r.GET("/open", controller.Open) r.GET("/read", controller.Read) r.GET("/ack", controller.Ack) - r.DELETE("/teardown", controller.Teardown) + r.GET("/close", controller.Close) r.Run(":8080") } diff --git a/source/elastic/configure.go b/source/elastic/configure.go index 03e065c..64d4b29 100644 --- a/source/elastic/configure.go +++ b/source/elastic/configure.go @@ -2,10 +2,17 @@ package elastic import ( "context" + "fmt" "elasticstream/config" ) -func (c *Client) Configure(ctx context.Context, cfg config.Config) error { +func (c *Client) Configure(ctx context.Context, cfg *config.Config) error { + + if c == nil || c.ch == nil { + return fmt.Errorf("error source not opened for reading") + } + + c.cfg = cfg return nil } diff --git a/source/elastic/db.go b/source/elastic/db.go deleted file mode 100644 index 252e851..0000000 --- a/source/elastic/db.go +++ /dev/null @@ -1,40 +0,0 @@ -package elastic - -import ( - "fmt" - "log" - "strconv" - - "github.com/boltdb/bolt" -) - -func getOffset(db *bolt.DB, index string) (int, error) { - offset := 0 - if err := db.Update(func(tx *bolt.Tx) error { - b, err := tx.CreateBucketIfNotExists([]byte("position")) - if err != nil { - return err - } - v := b.Get([]byte(index)) - offset, _ = strconv.Atoi(string(v)) - return nil - }); err != nil { - return offset, err - } - return offset, nil -} - -func setOffset(db *bolt.DB, index string, offset int) error { - // log.Printf("setOffset() index=%s offset=%d", index, offset) - if err := db.Update(func(tx *bolt.Tx) error { - b := tx.Bucket([]byte("position")) - - if err := b.Put([]byte(index), []byte(fmt.Sprintf("%d", offset))); err != nil { - return err - } - return nil - }); err != nil { - log.Fatal(err) - } - return nil -} diff --git a/source/elastic/open.go b/source/elastic/open.go index 61ad0ef..2d07439 100644 --- a/source/elastic/open.go +++ b/source/elastic/open.go @@ -2,6 +2,7 @@ package elastic import ( "context" + "log" "elasticstream/opencdc" "elasticstream/source" @@ -10,6 +11,8 @@ import ( ) func (c *Client) Open(ctx context.Context, positions []source.Position) error { + log.Println(">>>> elastic.Open()") + defer log.Println("<<<< elastic.Open()") // Open a connection with ElasticSearch cfg := elasticsearch.Config{ diff --git a/source/elastic/read.go b/source/elastic/read.go index 3312f0b..3e4f6af 100644 --- a/source/elastic/read.go +++ b/source/elastic/read.go @@ -3,11 +3,18 @@ package elastic import ( "context" "fmt" + "log" "elasticstream/opencdc" ) func (c *Client) Read(context.Context) (*opencdc.Data, error) { + log.Println(">>>> elastic.Read()") + defer log.Println("<<<< elastic.Read()") + + if c == nil || c.ch == nil { + return nil, fmt.Errorf("error source not opened for reading") + } data, ok := <-c.ch if !ok { @@ -26,10 +33,5 @@ func (c *Client) Read(context.Context) (*opencdc.Data, error) { c.offsets[index] = offset + 1 - err := setOffset(c.db, index, c.offsets[index]) - if err != nil { - return nil, err - } - return &data, nil } diff --git a/source/elastic/teardown.go b/source/elastic/teardown.go index 94389c0..3a41497 100644 --- a/source/elastic/teardown.go +++ b/source/elastic/teardown.go @@ -2,9 +2,14 @@ package elastic import ( "context" + "fmt" ) // close the client func (c *Client) Teardown(ctx context.Context) error { + if c == nil || c.ch == nil { + return fmt.Errorf("error source not opened for reading") + } + return nil }