Compare commits

..

4 Commits
main ... master

Author SHA1 Message Date
Parikshit Gothwal 7b46b3b0e8 solution 2024-10-07 22:41:59 +05:30
Sangeet Kumar a6da23819b fixed erors 2024-10-07 16:49:36 +05:30
Parikshit Gothwal 36412c7601 build 2024-10-07 16:26:41 +05:30
Sangeet Kumar ab182ae9af refactoring 2024-10-07 15:25:13 +05:30
28 changed files with 548 additions and 391 deletions

View File

@ -1,5 +1,5 @@
all: all:
go build -o elasticstream cmd/main.go go build -o elasticstream main.go
clean: clean:
rm -f elasticstream rm -f elasticstream

100
client.go
View File

@ -1,100 +0,0 @@
package elasticstream
import (
"fmt"
"log"
"github.com/boltdb/bolt"
"github.com/elastic/go-elasticsearch/v8"
)
type Client struct {
es *elasticsearch.Client
config *Config
ch chan Data
db *bolt.DB
offsets map[string]int
}
func NewClient(config *Config) (*Client, error) {
client := &Client{
config: config,
offsets: make(map[string]int),
}
return client, nil
}
func (c *Client) Open() error {
// Open a connection with ElasticSearch
cfg := elasticsearch.Config{
Addresses: []string{
c.config.Host,
},
}
var err error
c.es, err = elasticsearch.NewClient(cfg)
if err != nil {
return err
}
// create a buffer channel
c.ch = make(chan Data, 25)
// open bolt db
c.db, err = bolt.Open(c.config.DBPath, 0666, nil)
if err != nil {
return err
}
// defer db.Close()
for _, index := range c.config.Indexes {
offset, err := getOffset(c.db, index)
if err != nil {
log.Printf("getOffset(%s) err:%s\n", index, err)
continue
}
c.offsets[index] = offset
NewWorker(c, index, offset, c.config.BatchSize)
}
return nil
}
func (c *Client) Read() (*Data, error) {
data, ok := <-c.ch
if !ok {
return nil, fmt.Errorf("error reading data from channel")
}
index, ok := data.Header["index"]
if !ok {
return nil, fmt.Errorf("error index not found in data header")
}
offset, ok := c.offsets[index]
if !ok {
return nil, fmt.Errorf("error offset index not found")
}
c.offsets[index] = offset + 1
err := setOffset(c.db, index, c.offsets[index])
if err != nil {
return nil, err
}
return &data, nil
}
// func (c *Client) Ack(ctx context.Context, position int) error {
// return nil
// }
// close the client
func (c *Client) Teardown() error {
return nil
}

View File

@ -1,80 +0,0 @@
package main
import (
"context"
"log"
"11-11.dev/goexamples/elasticstream"
)
func main() {
ctx := context.Background()
// STEP 1: Create a new client
client := &elasticstream.Client{}
// STEP 2: Configure() the client
config := &elasticstream.Config{
Host: "http://test.urantiacloud.com:9200",
Indexes: []string{"index-a", "index-b", "index-c"},
BatchSize: 10,
DBPath: "./index.db",
}
err := client.Configure(ctx, config)
if err != nil {
log.Println("client.Configure() err:", err)
return
}
// ------------ IF RUN FOR THE FIRST TIME ----------------
// STEP 2.5: LifecycleOnCreated()
err = client.LifecycleOnCreated(ctx, config)
if err != nil {
log.Println("client.LifecycleOnCreated() err:", err)
return
}
// ------------ IF RUN FOR THE FIRST TIME ----------------
// ------------ IF CONFIG CHANGED ------------------------
// STEP 2.5: LifecycleOnUpdated()
err = client.LifecycleOnUpdated(ctx, config)
if err != nil {
log.Println("client.LifecycleOnUpdated() err:", err)
return
}
// ------------ IF CONFIG CHANGED ------------------------
// STEP 3: Open() the client
err = client.Open(ctx, []Position{})
if err != nil {
log.Println("client.Open() err:", err)
return
}
// STEP 4: Read() using client
// for untill context is cancelled
for {
data, err := client.Read(ctx)
if err != nil {
log.Println("client.Read() err:", err)
continue
}
fmt.Println("data:", data)
}
// STEP 5: Ack()
err = client.Ack(ctx, Position{})
if err != nil {
log.Println("client.Ack() err:", err)
return
}
// STEP 6: Teardown()
err = client.Teardown(ctx)
if err != nil {
log.Println("client.Teardown() err:", err)
return
}
}

View File

@ -1,39 +0,0 @@
package main
import (
"fmt"
"log"
"11-11.dev/goexamples/elasticstream"
)
func main() {
config := &elasticstream.Config{
Host: "http://test.urantiacloud.com:9200",
Indexes: []string{"index-a", "index-b", "index-c"},
BatchSize: 10,
DBPath: "./index.db",
}
client, err := elasticstream.NewClient(config)
if err != nil {
log.Println("elasticstream.NewClient() err:", err)
return
}
err = client.Open()
if err != nil {
log.Println("client.Open() err:", err)
return
}
for {
data, err := client.Read()
if err != nil {
log.Println("eclient.Read() err:", err)
continue
}
fmt.Println(data)
}
}

View File

@ -1,9 +0,0 @@
package elasticstream
type Config struct {
Host string
// map of index name and position from where data is to be read.
Indexes []string
BatchSize int
DBPath string
}

8
config/config.go Normal file
View File

@ -0,0 +1,8 @@
package config
type Config struct {
Host string
Indexes []string
BatchSize int
DBPath string
}

26
controller/ack.go Normal file
View File

@ -0,0 +1,26 @@
package controller
import (
"net/http"
"elasticstream/source"
"github.com/gin-gonic/gin"
)
func Ack(c *gin.Context) {
var req source.Position
err := c.BindJSON(&req)
if err != nil {
c.Status(http.StatusBadRequest)
return
}
err = client.Ack(c, req)
if err != nil {
c.Status(http.StatusInternalServerError)
return
}
c.Status(http.StatusOK)
}

18
controller/close.go Normal file
View File

@ -0,0 +1,18 @@
package controller
import (
"net/http"
"github.com/gin-gonic/gin"
)
func Close(c *gin.Context) {
// Close connection
err := client.Teardown(c)
if err != nil {
c.Status(http.StatusInternalServerError)
return
}
c.Status(http.StatusOK)
}

45
controller/open.go Normal file
View File

@ -0,0 +1,45 @@
package controller
import (
"log"
"net/http"
"elasticstream/config"
"elasticstream/source"
"elasticstream/source/elastic"
"github.com/gin-gonic/gin"
)
var client *elastic.Client
func Open(c *gin.Context) {
log.Println(">>>> controller.Open()")
defer log.Println("<<<< controller.Open()")
client = elastic.NewClient()
err := client.Configure(c, &config.Config{
Host: "http://test.urantiacloud.com:9200",
Indexes: []string{"index-a", "index-b", "index-c"},
BatchSize: 100,
DBPath: "index.db",
})
if err != nil {
log.Println("client.Configure() err:", err)
c.Status(http.StatusInternalServerError)
return
}
// get position from boltdb
var positions []source.Position
log.Printf("client: %#v\n", client)
err = client.Open(c, positions)
if err != nil {
c.Status(http.StatusInternalServerError)
return
}
c.Status(http.StatusOK)
}

18
controller/read.go Normal file
View File

@ -0,0 +1,18 @@
package controller
import (
"net/http"
"github.com/gin-gonic/gin"
)
func Read(c *gin.Context) {
// Read data from the client
data, err := client.Read(c)
if err != nil {
c.Status(http.StatusInternalServerError)
return
}
c.JSON(http.StatusOK, data)
}

40
db.go
View File

@ -1,40 +0,0 @@
package elasticstream
import (
"fmt"
"log"
"strconv"
"github.com/boltdb/bolt"
)
func getOffset(db *bolt.DB, index string) (int, error) {
offset := 0
if err := db.Update(func(tx *bolt.Tx) error {
b, err := tx.CreateBucketIfNotExists([]byte("position"))
if err != nil {
return err
}
v := b.Get([]byte(index))
offset, _ = strconv.Atoi(string(v))
return nil
}); err != nil {
return offset, err
}
return offset, nil
}
func setOffset(db *bolt.DB, index string, offset int) error {
// log.Printf("setOffset() index=%s offset=%d", index, offset)
if err := db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte("position"))
if err := b.Put([]byte(index), []byte(fmt.Sprintf("%d", offset))); err != nil {
return err
}
return nil
}); err != nil {
log.Fatal(err)
}
return nil
}

32
go.mod
View File

@ -1,19 +1,45 @@
module 11-11.dev/goexamples/elasticstream module elasticstream
go 1.22.4 go 1.22.4
require ( require (
github.com/boltdb/bolt v1.3.1
github.com/elastic/go-elasticsearch v0.0.0 github.com/elastic/go-elasticsearch v0.0.0
github.com/elastic/go-elasticsearch/v8 v8.15.0 github.com/elastic/go-elasticsearch/v8 v8.15.0
github.com/gin-gonic/gin v1.10.0
) )
require ( require (
github.com/boltdb/bolt v1.3.1 // indirect github.com/bytedance/sonic v1.11.6 // indirect
github.com/bytedance/sonic/loader v0.1.1 // indirect
github.com/cloudwego/base64x v0.1.4 // indirect
github.com/cloudwego/iasm v0.2.0 // indirect
github.com/elastic/elastic-transport-go/v8 v8.6.0 // indirect github.com/elastic/elastic-transport-go/v8 v8.6.0 // indirect
github.com/gabriel-vasile/mimetype v1.4.3 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
github.com/go-logr/logr v1.4.1 // indirect github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-playground/locales v0.14.1 // indirect
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/go-playground/validator/v10 v10.20.0 // indirect
github.com/goccy/go-json v0.10.2 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/cpuid/v2 v2.2.7 // indirect
github.com/leodido/go-urn v1.4.0 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pelletier/go-toml/v2 v2.2.2 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/ugorji/go/codec v1.2.12 // indirect
go.opentelemetry.io/otel v1.24.0 // indirect go.opentelemetry.io/otel v1.24.0 // indirect
go.opentelemetry.io/otel/metric v1.24.0 // indirect go.opentelemetry.io/otel/metric v1.24.0 // indirect
go.opentelemetry.io/otel/trace v1.24.0 // indirect go.opentelemetry.io/otel/trace v1.24.0 // indirect
golang.org/x/sys v0.19.0 // indirect golang.org/x/arch v0.8.0 // indirect
golang.org/x/crypto v0.23.0 // indirect
golang.org/x/net v0.25.0 // indirect
golang.org/x/sys v0.20.0 // indirect
golang.org/x/text v0.15.0 // indirect
google.golang.org/protobuf v1.34.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
) )

79
go.sum
View File

@ -1,5 +1,14 @@
github.com/boltdb/bolt v1.3.1 h1:JQmyP4ZBrce+ZQu0dY660FMfatumYDLun9hBCUVIkF4= github.com/boltdb/bolt v1.3.1 h1:JQmyP4ZBrce+ZQu0dY660FMfatumYDLun9hBCUVIkF4=
github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps= github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps=
github.com/bytedance/sonic v1.11.6 h1:oUp34TzMlL+OY1OUWxHqsdkgC/Zfc85zGqw9siXjrc0=
github.com/bytedance/sonic v1.11.6/go.mod h1:LysEHSvpvDySVdC2f87zGWf6CIKJcAvqab1ZaiQtds4=
github.com/bytedance/sonic/loader v0.1.1 h1:c+e5Pt1k/cy5wMveRDyk2X4B9hF4g7an8N3zCYjJFNM=
github.com/bytedance/sonic/loader v0.1.1/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU=
github.com/cloudwego/base64x v0.1.4 h1:jwCgWpFanWmN8xoIUHa2rtzmkd5J2plF/dnLS6Xd/0Y=
github.com/cloudwego/base64x v0.1.4/go.mod h1:0zlkT4Wn5C6NdauXdJRhSKRlJvmclQ1hhJgA0rcu/8w=
github.com/cloudwego/iasm v0.2.0 h1:1KNIy1I1H9hNNFEEH3DVnI4UujN+1zjpuk6gwHLTssg=
github.com/cloudwego/iasm v0.2.0/go.mod h1:8rXZaNYT2n95jn+zTI1sDr+IgcD2GVs0nlbbQPiEFhY=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/elastic/elastic-transport-go/v8 v8.6.0 h1:Y2S/FBjx1LlCv5m6pWAF2kDJAHoSjSRSJCApolgfthA= github.com/elastic/elastic-transport-go/v8 v8.6.0 h1:Y2S/FBjx1LlCv5m6pWAF2kDJAHoSjSRSJCApolgfthA=
@ -8,17 +17,65 @@ github.com/elastic/go-elasticsearch v0.0.0 h1:Pd5fqOuBxKxv83b0+xOAJDAkziWYwFinWn
github.com/elastic/go-elasticsearch v0.0.0/go.mod h1:TkBSJBuTyFdBnrNqoPc54FN0vKf5c04IdM4zuStJ7xg= github.com/elastic/go-elasticsearch v0.0.0/go.mod h1:TkBSJBuTyFdBnrNqoPc54FN0vKf5c04IdM4zuStJ7xg=
github.com/elastic/go-elasticsearch/v8 v8.15.0 h1:IZyJhe7t7WI3NEFdcHnf6IJXqpRf+8S8QWLtZYYyBYk= github.com/elastic/go-elasticsearch/v8 v8.15.0 h1:IZyJhe7t7WI3NEFdcHnf6IJXqpRf+8S8QWLtZYYyBYk=
github.com/elastic/go-elasticsearch/v8 v8.15.0/go.mod h1:HCON3zj4btpqs2N1jjsAy4a/fiAul+YBP00mBH4xik8= github.com/elastic/go-elasticsearch/v8 v8.15.0/go.mod h1:HCON3zj4btpqs2N1jjsAy4a/fiAul+YBP00mBH4xik8=
github.com/gabriel-vasile/mimetype v1.4.3 h1:in2uUcidCuFcDKtdcBxlR0rJ1+fsokWf+uqxgUFjbI0=
github.com/gabriel-vasile/mimetype v1.4.3/go.mod h1:d8uq/6HKRL6CGdk+aubisF/M5GcPfT7nKyLpA0lbSSk=
github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE=
github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI=
github.com/gin-gonic/gin v1.10.0 h1:nTuyha1TYqgedzytsKYqna+DfLos46nTv2ygFy86HFU=
github.com/gin-gonic/gin v1.10.0/go.mod h1:4PMNQiOhvDRa013RKVbsiNwoyezlm2rm0uX/T7kzp5Y=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ=
github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/go-playground/assert/v2 v2.2.0 h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s=
github.com/go-playground/assert/v2 v2.2.0/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4=
github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA=
github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY=
github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY=
github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY=
github.com/go-playground/validator/v10 v10.20.0 h1:K9ISHbSaI0lyB2eWMPJo+kOS/FBExVwjEviJTixqxL8=
github.com/go-playground/validator/v10 v10.20.0/go.mod h1:dbuPbCMFw/DrkbEynArYaCwl3amGuJotoKCe95atGMM=
github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.2.7 h1:ZWSB3igEs+d0qvnxR/ZBzXVmxkgt8DdzP6m9pfuVLDM=
github.com/klauspost/cpuid/v2 v2.2.7/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
github.com/knz/go-libedit v1.10.1/go.mod h1:MZTVkCWyz0oBc7JOWP3wNAzd002ZbM/5hgShxwh4x8M=
github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ=
github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI=
github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08=
github.com/ugorji/go/codec v1.2.12 h1:9LC83zGrHhuUA9l16C9AHXAqEV/2wBQ4nkvumAE65EE=
github.com/ugorji/go/codec v1.2.12/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg=
go.opentelemetry.io/otel v1.24.0 h1:0LAOdjNmQeSTzGBzduGe/rU4tZhMwL5rWgtp9Ku5Jfo= go.opentelemetry.io/otel v1.24.0 h1:0LAOdjNmQeSTzGBzduGe/rU4tZhMwL5rWgtp9Ku5Jfo=
go.opentelemetry.io/otel v1.24.0/go.mod h1:W7b9Ozg4nkF5tWI5zsXkaKKDjdVjpD4oAt9Qi/MArHo= go.opentelemetry.io/otel v1.24.0/go.mod h1:W7b9Ozg4nkF5tWI5zsXkaKKDjdVjpD4oAt9Qi/MArHo=
go.opentelemetry.io/otel/metric v1.24.0 h1:6EhoGWWK28x1fbpA4tYTOWBkPefTDQnb8WSGXlc88kI= go.opentelemetry.io/otel/metric v1.24.0 h1:6EhoGWWK28x1fbpA4tYTOWBkPefTDQnb8WSGXlc88kI=
@ -27,7 +84,25 @@ go.opentelemetry.io/otel/sdk v1.21.0 h1:FTt8qirL1EysG6sTQRZ5TokkU8d0ugCj8htOgThZ
go.opentelemetry.io/otel/sdk v1.21.0/go.mod h1:Nna6Yv7PWTdgJHVRD9hIYywQBRx7pbox6nwBnZIxl/E= go.opentelemetry.io/otel/sdk v1.21.0/go.mod h1:Nna6Yv7PWTdgJHVRD9hIYywQBRx7pbox6nwBnZIxl/E=
go.opentelemetry.io/otel/trace v1.24.0 h1:CsKnnL4dUAr/0llH9FKuc698G04IrpWV0MQA/Y1YELI= go.opentelemetry.io/otel/trace v1.24.0 h1:CsKnnL4dUAr/0llH9FKuc698G04IrpWV0MQA/Y1YELI=
go.opentelemetry.io/otel/trace v1.24.0/go.mod h1:HPc3Xr/cOApsBI154IU0OI0HJexz+aw5uPdbs3UCjNU= go.opentelemetry.io/otel/trace v1.24.0/go.mod h1:HPc3Xr/cOApsBI154IU0OI0HJexz+aw5uPdbs3UCjNU=
golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o= golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8=
golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/arch v0.8.0 h1:3wRIsP3pM4yUptoR96otTUOXI367OS0+c9eeRi9doIc=
golang.org/x/arch v0.8.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys=
golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI=
golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8=
golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac=
golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y=
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk=
golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg=
google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
nullprogram.com/x/optparse v1.0.0/go.mod h1:KdyPE+Igbe0jQUrVfMqDMeJQIJZEuyV7pjYmp6pbG50=
rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4=

View File

@ -1,38 +0,0 @@
package elasticstream
type Position struct {
ID string
Index string
Pos int
}
type Source interface {
Configure(context.Context, Config) error
Open(context.Context, []Position) error
Read(context.Context) (*Data, error)
Ack(context.Context, Position) error
Teardown(context.Context) error
// -- Lifecycle events -----------------------------------------------------
// LifecycleOnCreated is called after Configure and before Open when the
// connector is run for the first time. This call will be skipped if the
// connector was already started before. This method can be used to do some
// initialization that needs to happen only once in the lifetime of a
// connector (e.g. create a logical replication slot). Anything that the
// connector creates in this method is considered to be owned by this
// connector and should be cleaned up in LifecycleOnDeleted.
LifecycleOnCreated(ctx context.Context, config config.Config) error
// LifecycleOnUpdated is called after Configure and before Open when the
// connector configuration has changed since the last run. This call will be
// skipped if the connector configuration did not change. It can be used to
// update anything that was initialized in LifecycleOnCreated, in case the
// configuration change affects it.
LifecycleOnUpdated(ctx context.Context, configBefore, configAfter config.Config) error
// LifecycleOnDeleted is called when the connector was deleted. It will be
// the only method that is called in that case. This method can be used to
// clean up anything that was initialized in LifecycleOnCreated.
LifecycleOnDeleted(ctx context.Context, config config.Config) error
mustEmbedUnimplementedSource()
}

23
main.go Normal file
View File

@ -0,0 +1,23 @@
package main
import (
"log"
"elasticstream/controller"
"github.com/gin-gonic/gin"
)
func main() {
log.SetFlags(log.LstdFlags | log.Lshortfile)
r := gin.Default()
r.GET("/open", controller.Open)
r.GET("/read", controller.Read)
r.GET("/ack", controller.Ack)
r.GET("/close", controller.Close)
r.Run(":8080")
}

View File

@ -1,4 +1,4 @@
package elasticstream package opencdc
type Header struct { type Header struct {
ID string ID string

29
source/elastic/ack.go Normal file
View File

@ -0,0 +1,29 @@
package elastic
import (
"context"
"fmt"
"elasticstream/source"
)
func (c *Client) Ack(ctx context.Context, position source.Position) error {
curr := c.offsets[position.Index]
fmt.Println("curr:", curr)
fmt.Println("asked:", position.Pos)
for _, p := range c.positions {
if p.Index == position.Index {
fmt.Println("initial:", p.Pos)
if p.Pos > position.Pos {
return fmt.Errorf("not acknowledged pos less than initial position")
}
}
}
if curr < position.Pos {
return fmt.Errorf("not acknowledged pos more than current position")
}
return nil
}

29
source/elastic/client.go Normal file
View File

@ -0,0 +1,29 @@
package elastic
import (
"sync"
"elasticstream/config"
"elasticstream/opencdc"
"elasticstream/source"
"github.com/elastic/go-elasticsearch/v8"
)
type Client struct {
cfg *config.Config
es *elasticsearch.Client
offsets map[string]int
positions []source.Position
ch chan opencdc.Data
shutdown chan struct{}
wg *sync.WaitGroup
}
func NewClient() *Client {
client := &Client{
offsets: make(map[string]int),
wg: &sync.WaitGroup{},
}
return client
}

View File

@ -0,0 +1,19 @@
package elastic
import (
"context"
"fmt"
"elasticstream/config"
)
func (c *Client) Configure(ctx context.Context, cfg *config.Config) error {
// if c == nil || c.ch == nil {
if c == nil {
return fmt.Errorf("error source not opened for reading")
}
c.cfg = cfg
return nil
}

51
source/elastic/open.go Normal file
View File

@ -0,0 +1,51 @@
package elastic
import (
"context"
"log"
"elasticstream/opencdc"
"elasticstream/source"
"github.com/elastic/go-elasticsearch/v8"
)
func (c *Client) Open(ctx context.Context, positions []source.Position) error {
log.Println(">>>> elastic.Open()")
defer log.Println("<<<< elastic.Open()")
// Open a connection with ElasticSearch
cfg := elasticsearch.Config{
Addresses: []string{
c.cfg.Host,
},
}
var err error
c.es, err = elasticsearch.NewClient(cfg)
if err != nil {
return err
}
// create a buffer channel
c.ch = make(chan opencdc.Data, c.cfg.BatchSize)
c.shutdown = make(chan struct{})
for _, index := range c.cfg.Indexes {
c.wg.Add(1)
offset := 0
for _, position := range positions {
if index == position.Index {
offset = position.Pos
}
}
c.offsets[index] = offset
NewWorker(c, index, offset)
}
c.positions = positions
return nil
}

37
source/elastic/read.go Normal file
View File

@ -0,0 +1,37 @@
package elastic
import (
"context"
"fmt"
"log"
"elasticstream/opencdc"
)
func (c *Client) Read(context.Context) (*opencdc.Data, error) {
log.Println(">>>> elastic.Read()")
defer log.Println("<<<< elastic.Read()")
if c == nil || c.ch == nil {
return nil, fmt.Errorf("error source not opened for reading")
}
data, ok := <-c.ch
if !ok {
return nil, fmt.Errorf("error reading data from channel")
}
index := data.Header.Index
if !ok {
return nil, fmt.Errorf("error index not found in data header")
}
offset, ok := c.offsets[index]
if !ok {
return nil, fmt.Errorf("error offset index not found")
}
c.offsets[index] = offset + 1
return &data, nil
}

View File

@ -1,17 +1,31 @@
package elasticstream package elastic
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"strings" "strings"
"time"
"github.com/elastic/go-elasticsearch/esapi" "github.com/elastic/go-elasticsearch/esapi"
"github.com/elastic/go-elasticsearch/v8" "github.com/elastic/go-elasticsearch/v8"
) )
type SearchResponse struct {
Hits struct {
Total struct {
Value int `json:"value"`
} `json:"total"`
Hits []struct {
Index string `json:"_index"`
ID string `json:"_id"`
Source map[string]interface{} `json:"_source"`
} `json:"hits"`
} `json:"hits"`
}
// search is calling Elastic Search search API // search is calling Elastic Search search API
func search(client *elasticsearch.Client, index string, offset, size *int) ([]Data, error) { func search(client *elasticsearch.Client, index string, offset, size *int) (*SearchResponse, error) {
query := fmt.Sprintf(`{ query := fmt.Sprintf(`{
"query": { "query": {
"match_all": {} "match_all": {}
@ -27,7 +41,8 @@ func search(client *elasticsearch.Client, index string, offset, size *int) ([]Da
} }
// Perform the request // Perform the request
res, err := req.Do(context.Background(), client) ctx, _ := context.WithTimeout(context.TODO(), 5*time.Second)
res, err := req.Do(ctx, client)
if err != nil { if err != nil {
return nil, fmt.Errorf("error getting response: %s", err) return nil, fmt.Errorf("error getting response: %s", err)
} }
@ -37,37 +52,10 @@ func search(client *elasticsearch.Client, index string, offset, size *int) ([]Da
return nil, fmt.Errorf("res.IsError() error: %s", res.String()) return nil, fmt.Errorf("res.IsError() error: %s", res.String())
} }
// Parse the response result := &SearchResponse{}
var result struct { if err := json.NewDecoder(res.Body).Decode(result); err != nil {
Hits struct {
Hits []struct {
Source map[string]interface{} `json:"_source"`
} `json:"hits"`
} `json:"hits"`
}
if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
return nil, fmt.Errorf("error parsing the response body: %s", err) return nil, fmt.Errorf("error parsing the response body: %s", err)
} }
// Collect the records return result, nil
newRecords := make([]map[string]interface{}, len(result.Hits.Hits))
for i, hit := range result.Hits.Hits {
newRecords[i] = hit.Source
}
header := map[string]string{"index": index}
var records []Data
for _, v := range newRecords {
data := Data{
Header: header,
Payload: v,
}
records = append(records, data)
}
// log.Println("records:", records)
return records, nil
} }

View File

@ -0,0 +1,28 @@
package elastic
import (
"context"
"fmt"
"log"
)
// close the client
func (c *Client) Teardown(ctx context.Context) error {
log.Println(">>>> elastic.Teardown()")
defer log.Println("<<<< elastic.Teardown()")
if c == nil || c.ch == nil {
return fmt.Errorf("error source not opened for reading")
}
close(c.shutdown)
c.wg.Wait()
close(c.ch)
c.ch = nil
// c.es.Close()
return nil
}

66
source/elastic/worker.go Normal file
View File

@ -0,0 +1,66 @@
package elastic
import (
"fmt"
"log"
"time"
"elasticstream/opencdc"
)
type Worker struct {
client *Client
index string
offset int
}
func NewWorker(client *Client, index string, offset int) {
w := &Worker{
client: client,
index: index,
offset: offset,
}
go w.start()
}
func (w *Worker) start() {
defer w.client.wg.Done()
for {
log.Printf("worker index=%s offset=%d size=%d\n", w.index, w.offset, w.client.cfg.BatchSize)
searchResponse, err := search(w.client.es, w.index, &w.offset, &w.client.cfg.BatchSize)
if err != nil || len(searchResponse.Hits.Hits) == 0 {
// log.Println("search() err:", err)
select {
case <-w.client.shutdown:
fmt.Println("shuting donw..")
return
case <-time.After(time.Second):
continue
}
}
for _, hit := range searchResponse.Hits.Hits {
data := opencdc.Data{
Header: opencdc.Header{
ID: hit.ID,
Index: hit.Index,
Position: w.offset + 1,
},
Payload: hit.Source,
}
select {
case w.client.ch <- data:
w.offset++
case <-w.client.shutdown:
fmt.Println("Stopping worker...")
return
}
}
}
}

22
source/interface.go Normal file
View File

@ -0,0 +1,22 @@
package source
import (
"context"
"elasticstream/config"
"elasticstream/opencdc"
)
type Position struct {
ID string `json:"id"`
Index string `json:"index"`
Pos int `json:"pos"`
}
type Source interface {
Configure(context.Context, config.Config) error
Open(context.Context, []Position) error
Read(context.Context) (*opencdc.Data, error)
Ack(context.Context, Position) error
Teardown(context.Context) error
}

View File

@ -1,45 +0,0 @@
package elasticstream
import (
"log"
"time"
// "github.com/elastic/go-elasticsearch/v8"
)
type Worker struct {
client *Client
index string
offset int
size int
}
func NewWorker(client *Client, index string, offset, size int) {
w := &Worker{
client: client,
index: index,
offset: offset,
size: size,
}
go w.start()
}
func (w *Worker) start() {
for {
log.Printf("worker index=%s offset=%d size=%d\n", w.index, w.offset, w.size)
dataArray, err := search(w.client.es, w.index, &w.offset, &w.size)
if err != nil {
log.Println("search() err:", err)
time.Sleep(1 * time.Second)
continue
}
for _, data := range dataArray {
w.client.ch <- data
w.offset++
}
}
}