fixed erors

This commit is contained in:
Sangeet Kumar 2024-10-07 16:49:36 +05:30
parent 36412c7601
commit a6da23819b
8 changed files with 35 additions and 49 deletions

View File

@ -6,7 +6,7 @@ import (
"github.com/gin-gonic/gin"
)
func Teardown(c *gin.Context) {
func Close(c *gin.Context) {
// Close connection
err := client.Teardown(c)

View File

@ -1,6 +1,7 @@
package controller
import (
"log"
"net/http"
"elasticstream/config"
@ -13,10 +14,12 @@ import (
var client *elastic.Client
func Open(c *gin.Context) {
log.Println(">>>> controller.Open()")
defer log.Println("<<<< controller.Open()")
client = elastic.NewClient()
err := client.Configure(c, config.Config{
err := client.Configure(c, &config.Config{
Host: "http://test.urantiacloud.com:9200",
Indexes: []string{"index-a", "index-b", "index-c"},
BatchSize: 100,
@ -30,6 +33,8 @@ func Open(c *gin.Context) {
// get position from boltdb
var positions []source.Position
log.Printf("client: %#v\n", client)
err = client.Open(c, positions)
if err != nil {
c.Status(http.StatusInternalServerError)

View File

@ -1,6 +1,8 @@
package main
import (
"log"
"elasticstream/controller"
"github.com/gin-gonic/gin"
@ -8,12 +10,14 @@ import (
func main() {
log.SetFlags(log.LstdFlags | log.Lshortfile)
r := gin.Default()
r.GET("/open", controller.Open)
r.GET("/read", controller.Read)
r.GET("/ack", controller.Ack)
r.DELETE("/teardown", controller.Teardown)
r.GET("/close", controller.Close)
r.Run(":8080")
}

View File

@ -2,10 +2,17 @@ package elastic
import (
"context"
"fmt"
"elasticstream/config"
)
func (c *Client) Configure(ctx context.Context, cfg config.Config) error {
func (c *Client) Configure(ctx context.Context, cfg *config.Config) error {
if c == nil || c.ch == nil {
return fmt.Errorf("error source not opened for reading")
}
c.cfg = cfg
return nil
}

View File

@ -1,40 +0,0 @@
package elastic
import (
"fmt"
"log"
"strconv"
"github.com/boltdb/bolt"
)
func getOffset(db *bolt.DB, index string) (int, error) {
offset := 0
if err := db.Update(func(tx *bolt.Tx) error {
b, err := tx.CreateBucketIfNotExists([]byte("position"))
if err != nil {
return err
}
v := b.Get([]byte(index))
offset, _ = strconv.Atoi(string(v))
return nil
}); err != nil {
return offset, err
}
return offset, nil
}
func setOffset(db *bolt.DB, index string, offset int) error {
// log.Printf("setOffset() index=%s offset=%d", index, offset)
if err := db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte("position"))
if err := b.Put([]byte(index), []byte(fmt.Sprintf("%d", offset))); err != nil {
return err
}
return nil
}); err != nil {
log.Fatal(err)
}
return nil
}

View File

@ -2,6 +2,7 @@ package elastic
import (
"context"
"log"
"elasticstream/opencdc"
"elasticstream/source"
@ -10,6 +11,8 @@ import (
)
func (c *Client) Open(ctx context.Context, positions []source.Position) error {
log.Println(">>>> elastic.Open()")
defer log.Println("<<<< elastic.Open()")
// Open a connection with ElasticSearch
cfg := elasticsearch.Config{

View File

@ -3,11 +3,18 @@ package elastic
import (
"context"
"fmt"
"log"
"elasticstream/opencdc"
)
func (c *Client) Read(context.Context) (*opencdc.Data, error) {
log.Println(">>>> elastic.Read()")
defer log.Println("<<<< elastic.Read()")
if c == nil || c.ch == nil {
return nil, fmt.Errorf("error source not opened for reading")
}
data, ok := <-c.ch
if !ok {
@ -26,10 +33,5 @@ func (c *Client) Read(context.Context) (*opencdc.Data, error) {
c.offsets[index] = offset + 1
err := setOffset(c.db, index, c.offsets[index])
if err != nil {
return nil, err
}
return &data, nil
}

View File

@ -2,9 +2,14 @@ package elastic
import (
"context"
"fmt"
)
// close the client
func (c *Client) Teardown(ctx context.Context) error {
if c == nil || c.ch == nil {
return fmt.Errorf("error source not opened for reading")
}
return nil
}