From ab182ae9afa6cbbec49b726ba738d8978ac501b7 Mon Sep 17 00:00:00 2001 From: Sangeet Kumar Date: Mon, 7 Oct 2024 15:25:13 +0530 Subject: [PATCH] refactoring --- Makefile | 2 +- client.go | 100 -------------------------- cmd/dummy.go | 80 --------------------- cmd/main.go | 39 ---------- config.go => config/config.go | 3 +- controller/ack.go | 22 ++++++ controller/open.go | 34 +++++++++ controller/read.go | 19 +++++ controller/teardown.go | 20 ++++++ go.mod | 32 ++++++++- go.sum | 79 +++++++++++++++++++- interface.go | 38 ---------- main.go | 20 ++++++ data.go => opencdc/data.go | 0 source/elastic/ack.go | 11 +++ source/elastic/client.go | 24 +++++++ source/elastic/configure.go | 9 +++ db.go => source/elastic/db.go | 0 source/elastic/open.go | 41 +++++++++++ source/elastic/read.go | 32 +++++++++ search.go => source/elastic/search.go | 2 +- source/elastic/teardown.go | 10 +++ worker.go => source/elastic/worker.go | 0 source/interface.go | 19 +++++ {cmd => utils}/insert.go | 0 {cmd => utils}/search.go | 0 26 files changed, 370 insertions(+), 266 deletions(-) delete mode 100644 client.go delete mode 100644 cmd/dummy.go delete mode 100644 cmd/main.go rename config.go => config/config.go (55%) create mode 100644 controller/ack.go create mode 100644 controller/open.go create mode 100644 controller/read.go create mode 100644 controller/teardown.go delete mode 100644 interface.go create mode 100644 main.go rename data.go => opencdc/data.go (100%) create mode 100644 source/elastic/ack.go create mode 100644 source/elastic/client.go create mode 100644 source/elastic/configure.go rename db.go => source/elastic/db.go (100%) create mode 100644 source/elastic/open.go create mode 100644 source/elastic/read.go rename search.go => source/elastic/search.go (97%) create mode 100644 source/elastic/teardown.go rename worker.go => source/elastic/worker.go (100%) create mode 100644 source/interface.go rename {cmd => utils}/insert.go (100%) rename {cmd => utils}/search.go (100%) diff --git a/Makefile b/Makefile index 9a95485..c5cb43b 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,5 @@ all: - go build -o elasticstream cmd/main.go + go build -o elasticstream main.go clean: rm -f elasticstream diff --git a/client.go b/client.go deleted file mode 100644 index 846dd3b..0000000 --- a/client.go +++ /dev/null @@ -1,100 +0,0 @@ -package elasticstream - -import ( - "fmt" - "log" - - "github.com/boltdb/bolt" - "github.com/elastic/go-elasticsearch/v8" -) - -type Client struct { - es *elasticsearch.Client - config *Config - ch chan Data - db *bolt.DB - offsets map[string]int -} - -func NewClient(config *Config) (*Client, error) { - client := &Client{ - config: config, - offsets: make(map[string]int), - } - return client, nil -} - -func (c *Client) Open() error { - // Open a connection with ElasticSearch - cfg := elasticsearch.Config{ - Addresses: []string{ - c.config.Host, - }, - } - - var err error - c.es, err = elasticsearch.NewClient(cfg) - if err != nil { - return err - } - - // create a buffer channel - c.ch = make(chan Data, 25) - - // open bolt db - c.db, err = bolt.Open(c.config.DBPath, 0666, nil) - if err != nil { - return err - } - // defer db.Close() - - for _, index := range c.config.Indexes { - offset, err := getOffset(c.db, index) - if err != nil { - log.Printf("getOffset(%s) err:%s\n", index, err) - continue - } - - c.offsets[index] = offset - - NewWorker(c, index, offset, c.config.BatchSize) - } - - return nil -} - -func (c *Client) Read() (*Data, error) { - - data, ok := <-c.ch - if !ok { - return nil, fmt.Errorf("error reading data from channel") - } - - index, ok := data.Header["index"] - if !ok { - return nil, fmt.Errorf("error index not found in data header") - } - - offset, ok := c.offsets[index] - if !ok { - return nil, fmt.Errorf("error offset index not found") - } - - c.offsets[index] = offset + 1 - - err := setOffset(c.db, index, c.offsets[index]) - if err != nil { - return nil, err - } - - return &data, nil -} - -// func (c *Client) Ack(ctx context.Context, position int) error { -// return nil -// } - -// close the client -func (c *Client) Teardown() error { - return nil -} diff --git a/cmd/dummy.go b/cmd/dummy.go deleted file mode 100644 index 9e41d2f..0000000 --- a/cmd/dummy.go +++ /dev/null @@ -1,80 +0,0 @@ -package main - -import ( - "context" - "log" - - "11-11.dev/goexamples/elasticstream" -) - -func main() { - ctx := context.Background() - - // STEP 1: Create a new client - client := &elasticstream.Client{} - - // STEP 2: Configure() the client - config := &elasticstream.Config{ - Host: "http://test.urantiacloud.com:9200", - Indexes: []string{"index-a", "index-b", "index-c"}, - BatchSize: 10, - DBPath: "./index.db", - } - err := client.Configure(ctx, config) - if err != nil { - log.Println("client.Configure() err:", err) - return - } - - // ------------ IF RUN FOR THE FIRST TIME ---------------- - // STEP 2.5: LifecycleOnCreated() - err = client.LifecycleOnCreated(ctx, config) - if err != nil { - log.Println("client.LifecycleOnCreated() err:", err) - return - } - // ------------ IF RUN FOR THE FIRST TIME ---------------- - - // ------------ IF CONFIG CHANGED ------------------------ - // STEP 2.5: LifecycleOnUpdated() - err = client.LifecycleOnUpdated(ctx, config) - if err != nil { - log.Println("client.LifecycleOnUpdated() err:", err) - return - } - // ------------ IF CONFIG CHANGED ------------------------ - - // STEP 3: Open() the client - err = client.Open(ctx, []Position{}) - if err != nil { - log.Println("client.Open() err:", err) - return - } - - // STEP 4: Read() using client - // for untill context is cancelled - for { - data, err := client.Read(ctx) - if err != nil { - log.Println("client.Read() err:", err) - continue - } - - fmt.Println("data:", data) - } - - // STEP 5: Ack() - err = client.Ack(ctx, Position{}) - if err != nil { - log.Println("client.Ack() err:", err) - return - } - - // STEP 6: Teardown() - err = client.Teardown(ctx) - if err != nil { - log.Println("client.Teardown() err:", err) - return - } - -} diff --git a/cmd/main.go b/cmd/main.go deleted file mode 100644 index e58d716..0000000 --- a/cmd/main.go +++ /dev/null @@ -1,39 +0,0 @@ -package main - -import ( - "fmt" - "log" - - "11-11.dev/goexamples/elasticstream" -) - -func main() { - config := &elasticstream.Config{ - Host: "http://test.urantiacloud.com:9200", - Indexes: []string{"index-a", "index-b", "index-c"}, - BatchSize: 10, - DBPath: "./index.db", - } - - client, err := elasticstream.NewClient(config) - if err != nil { - log.Println("elasticstream.NewClient() err:", err) - return - } - - err = client.Open() - if err != nil { - log.Println("client.Open() err:", err) - return - } - - for { - data, err := client.Read() - if err != nil { - log.Println("eclient.Read() err:", err) - continue - } - - fmt.Println(data) - } -} diff --git a/config.go b/config/config.go similarity index 55% rename from config.go rename to config/config.go index 8a6838c..a2a7a54 100644 --- a/config.go +++ b/config/config.go @@ -1,8 +1,7 @@ package elasticstream type Config struct { - Host string - // map of index name and position from where data is to be read. + Host string Indexes []string BatchSize int DBPath string diff --git a/controller/ack.go b/controller/ack.go new file mode 100644 index 0000000..58c6828 --- /dev/null +++ b/controller/ack.go @@ -0,0 +1,22 @@ +package controller + +import ( + "elasticstream/source/elastic" + + "github.com/elastic/go-elasticsearch/v8" + "github.com/gin-gonic/gin" +) + +func Ack(c *gin.Context) { + + var req Position + + c.BindJSON(c, &req) + + err := client.Ack(c, req) + if err != nil { + c.Status(http.StatusInternalServerError) + return + } + c.Status(http.StatusOK) +} diff --git a/controller/open.go b/controller/open.go new file mode 100644 index 0000000..3332334 --- /dev/null +++ b/controller/open.go @@ -0,0 +1,34 @@ +package controller + +import ( + "elasticstream/source/elastic" + + "github.com/elastic/go-elasticsearch/v8" + "github.com/gin-gonic/gin" +) + +var client *elastic.Client + +func Open(c *gin.Context) { + + client = elastic.NewClient() + + err := client.Configure(&Config{ + Host: "http://test.urantiacloud.com:9200", + Indexes: []string{"index-a", "index-b", "index-c"}, + BatchSize: 100, + DBPath: "index.db", + }) + if err != nil { + c.Status(http.StatusInternalServerError) + return + } + + // Open connection with Elastic Search + err = client.Open() + if err != nil { + c.Status(http.StatusInternalServerError) + return + } + c.Status(http.StatusOK) +} diff --git a/controller/read.go b/controller/read.go new file mode 100644 index 0000000..d359ef6 --- /dev/null +++ b/controller/read.go @@ -0,0 +1,19 @@ +package controller + +import ( + "elasticstream/source/elastic" + + "github.com/elastic/go-elasticsearch/v8" + "github.com/gin-gonic/gin" +) + +func Read(c *gin.Context) { + + // Read data from the client + data, err := client.Read() + if err != nil { + c.Status(http.StatusInternalServerError) + return + } + c.JSON(http.StatusOK, data) +} diff --git a/controller/teardown.go b/controller/teardown.go new file mode 100644 index 0000000..2091805 --- /dev/null +++ b/controller/teardown.go @@ -0,0 +1,20 @@ +package controller + +import ( + "elasticstream/source/elastic" + + "github.com/elastic/go-elasticsearch/v8" + "github.com/gin-gonic/gin" +) + +func Teardown(c *gin.Context) { + + // Close connection + err := client.Teardown() + if err != nil { + c.Status(http.StatusInternalServerError) + return + } + c.Status(http.StatusOK) + +} diff --git a/go.mod b/go.mod index 8f554a4..ca11036 100644 --- a/go.mod +++ b/go.mod @@ -1,19 +1,45 @@ -module 11-11.dev/goexamples/elasticstream +module elasticstream go 1.22.4 require ( + github.com/boltdb/bolt v1.3.1 github.com/elastic/go-elasticsearch v0.0.0 github.com/elastic/go-elasticsearch/v8 v8.15.0 + github.com/gin-gonic/gin v1.10.0 ) require ( - github.com/boltdb/bolt v1.3.1 // indirect + github.com/bytedance/sonic v1.11.6 // indirect + github.com/bytedance/sonic/loader v0.1.1 // indirect + github.com/cloudwego/base64x v0.1.4 // indirect + github.com/cloudwego/iasm v0.2.0 // indirect github.com/elastic/elastic-transport-go/v8 v8.6.0 // indirect + github.com/gabriel-vasile/mimetype v1.4.3 // indirect + github.com/gin-contrib/sse v0.1.0 // indirect github.com/go-logr/logr v1.4.1 // indirect github.com/go-logr/stdr v1.2.2 // indirect + github.com/go-playground/locales v0.14.1 // indirect + github.com/go-playground/universal-translator v0.18.1 // indirect + github.com/go-playground/validator/v10 v10.20.0 // indirect + github.com/goccy/go-json v0.10.2 // indirect + github.com/json-iterator/go v1.1.12 // indirect + github.com/klauspost/cpuid/v2 v2.2.7 // indirect + github.com/leodido/go-urn v1.4.0 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/pelletier/go-toml/v2 v2.2.2 // indirect + github.com/twitchyliquid64/golang-asm v0.15.1 // indirect + github.com/ugorji/go/codec v1.2.12 // indirect go.opentelemetry.io/otel v1.24.0 // indirect go.opentelemetry.io/otel/metric v1.24.0 // indirect go.opentelemetry.io/otel/trace v1.24.0 // indirect - golang.org/x/sys v0.19.0 // indirect + golang.org/x/arch v0.8.0 // indirect + golang.org/x/crypto v0.23.0 // indirect + golang.org/x/net v0.25.0 // indirect + golang.org/x/sys v0.20.0 // indirect + golang.org/x/text v0.15.0 // indirect + google.golang.org/protobuf v1.34.1 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 1ed9043..682b83d 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,14 @@ github.com/boltdb/bolt v1.3.1 h1:JQmyP4ZBrce+ZQu0dY660FMfatumYDLun9hBCUVIkF4= github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps= +github.com/bytedance/sonic v1.11.6 h1:oUp34TzMlL+OY1OUWxHqsdkgC/Zfc85zGqw9siXjrc0= +github.com/bytedance/sonic v1.11.6/go.mod h1:LysEHSvpvDySVdC2f87zGWf6CIKJcAvqab1ZaiQtds4= +github.com/bytedance/sonic/loader v0.1.1 h1:c+e5Pt1k/cy5wMveRDyk2X4B9hF4g7an8N3zCYjJFNM= +github.com/bytedance/sonic/loader v0.1.1/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU= +github.com/cloudwego/base64x v0.1.4 h1:jwCgWpFanWmN8xoIUHa2rtzmkd5J2plF/dnLS6Xd/0Y= +github.com/cloudwego/base64x v0.1.4/go.mod h1:0zlkT4Wn5C6NdauXdJRhSKRlJvmclQ1hhJgA0rcu/8w= +github.com/cloudwego/iasm v0.2.0 h1:1KNIy1I1H9hNNFEEH3DVnI4UujN+1zjpuk6gwHLTssg= +github.com/cloudwego/iasm v0.2.0/go.mod h1:8rXZaNYT2n95jn+zTI1sDr+IgcD2GVs0nlbbQPiEFhY= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/elastic/elastic-transport-go/v8 v8.6.0 h1:Y2S/FBjx1LlCv5m6pWAF2kDJAHoSjSRSJCApolgfthA= @@ -8,17 +17,65 @@ github.com/elastic/go-elasticsearch v0.0.0 h1:Pd5fqOuBxKxv83b0+xOAJDAkziWYwFinWn github.com/elastic/go-elasticsearch v0.0.0/go.mod h1:TkBSJBuTyFdBnrNqoPc54FN0vKf5c04IdM4zuStJ7xg= github.com/elastic/go-elasticsearch/v8 v8.15.0 h1:IZyJhe7t7WI3NEFdcHnf6IJXqpRf+8S8QWLtZYYyBYk= github.com/elastic/go-elasticsearch/v8 v8.15.0/go.mod h1:HCON3zj4btpqs2N1jjsAy4a/fiAul+YBP00mBH4xik8= +github.com/gabriel-vasile/mimetype v1.4.3 h1:in2uUcidCuFcDKtdcBxlR0rJ1+fsokWf+uqxgUFjbI0= +github.com/gabriel-vasile/mimetype v1.4.3/go.mod h1:d8uq/6HKRL6CGdk+aubisF/M5GcPfT7nKyLpA0lbSSk= +github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE= +github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI= +github.com/gin-gonic/gin v1.10.0 h1:nTuyha1TYqgedzytsKYqna+DfLos46nTv2ygFy86HFU= +github.com/gin-gonic/gin v1.10.0/go.mod h1:4PMNQiOhvDRa013RKVbsiNwoyezlm2rm0uX/T7kzp5Y= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/go-playground/assert/v2 v2.2.0 h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s= +github.com/go-playground/assert/v2 v2.2.0/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= +github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA= +github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY= +github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY= +github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY= +github.com/go-playground/validator/v10 v10.20.0 h1:K9ISHbSaI0lyB2eWMPJo+kOS/FBExVwjEviJTixqxL8= +github.com/go-playground/validator/v10 v10.20.0/go.mod h1:dbuPbCMFw/DrkbEynArYaCwl3amGuJotoKCe95atGMM= +github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= +github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= +github.com/klauspost/cpuid/v2 v2.2.7 h1:ZWSB3igEs+d0qvnxR/ZBzXVmxkgt8DdzP6m9pfuVLDM= +github.com/klauspost/cpuid/v2 v2.2.7/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= +github.com/knz/go-libedit v1.10.1/go.mod h1:MZTVkCWyz0oBc7JOWP3wNAzd002ZbM/5hgShxwh4x8M= +github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ= +github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= +github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI= +github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08= +github.com/ugorji/go/codec v1.2.12 h1:9LC83zGrHhuUA9l16C9AHXAqEV/2wBQ4nkvumAE65EE= +github.com/ugorji/go/codec v1.2.12/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg= go.opentelemetry.io/otel v1.24.0 h1:0LAOdjNmQeSTzGBzduGe/rU4tZhMwL5rWgtp9Ku5Jfo= go.opentelemetry.io/otel v1.24.0/go.mod h1:W7b9Ozg4nkF5tWI5zsXkaKKDjdVjpD4oAt9Qi/MArHo= go.opentelemetry.io/otel/metric v1.24.0 h1:6EhoGWWK28x1fbpA4tYTOWBkPefTDQnb8WSGXlc88kI= @@ -27,7 +84,25 @@ go.opentelemetry.io/otel/sdk v1.21.0 h1:FTt8qirL1EysG6sTQRZ5TokkU8d0ugCj8htOgThZ go.opentelemetry.io/otel/sdk v1.21.0/go.mod h1:Nna6Yv7PWTdgJHVRD9hIYywQBRx7pbox6nwBnZIxl/E= go.opentelemetry.io/otel/trace v1.24.0 h1:CsKnnL4dUAr/0llH9FKuc698G04IrpWV0MQA/Y1YELI= go.opentelemetry.io/otel/trace v1.24.0/go.mod h1:HPc3Xr/cOApsBI154IU0OI0HJexz+aw5uPdbs3UCjNU= -golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o= -golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8= +golang.org/x/arch v0.8.0 h1:3wRIsP3pM4yUptoR96otTUOXI367OS0+c9eeRi9doIc= +golang.org/x/arch v0.8.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys= +golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI= +golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8= +golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac= +golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= +golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk= +golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg= +google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +nullprogram.com/x/optparse v1.0.0/go.mod h1:KdyPE+Igbe0jQUrVfMqDMeJQIJZEuyV7pjYmp6pbG50= +rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4= diff --git a/interface.go b/interface.go deleted file mode 100644 index b606c48..0000000 --- a/interface.go +++ /dev/null @@ -1,38 +0,0 @@ -package elasticstream - -type Position struct { - ID string - Index string - Pos int -} - -type Source interface { - Configure(context.Context, Config) error - Open(context.Context, []Position) error - Read(context.Context) (*Data, error) - Ack(context.Context, Position) error - Teardown(context.Context) error - - // -- Lifecycle events ----------------------------------------------------- - - // LifecycleOnCreated is called after Configure and before Open when the - // connector is run for the first time. This call will be skipped if the - // connector was already started before. This method can be used to do some - // initialization that needs to happen only once in the lifetime of a - // connector (e.g. create a logical replication slot). Anything that the - // connector creates in this method is considered to be owned by this - // connector and should be cleaned up in LifecycleOnDeleted. - LifecycleOnCreated(ctx context.Context, config config.Config) error - // LifecycleOnUpdated is called after Configure and before Open when the - // connector configuration has changed since the last run. This call will be - // skipped if the connector configuration did not change. It can be used to - // update anything that was initialized in LifecycleOnCreated, in case the - // configuration change affects it. - LifecycleOnUpdated(ctx context.Context, configBefore, configAfter config.Config) error - // LifecycleOnDeleted is called when the connector was deleted. It will be - // the only method that is called in that case. This method can be used to - // clean up anything that was initialized in LifecycleOnCreated. - LifecycleOnDeleted(ctx context.Context, config config.Config) error - - mustEmbedUnimplementedSource() -} diff --git a/main.go b/main.go new file mode 100644 index 0000000..8fe2ee5 --- /dev/null +++ b/main.go @@ -0,0 +1,20 @@ +package main + +import ( + "elasticstream/controller" + + "github.com/gin-gonic/gin" +) + +func main() { + + r := gin.Default() + + r.GET("/open", controller.Open) + r.GET("/read", controller.Read) + r.GET("/ack", controller.Ack) + r.DELETE("/teardown", controller.Teardown) + + r.Run(":8080") + +} diff --git a/data.go b/opencdc/data.go similarity index 100% rename from data.go rename to opencdc/data.go diff --git a/source/elastic/ack.go b/source/elastic/ack.go new file mode 100644 index 0000000..f4d6a3d --- /dev/null +++ b/source/elastic/ack.go @@ -0,0 +1,11 @@ +package elastic + +import ( + "context" +) + +func (c *Client) Teardown(context.Context) error { + + return nil + +} diff --git a/source/elastic/client.go b/source/elastic/client.go new file mode 100644 index 0000000..d1e60bd --- /dev/null +++ b/source/elastic/client.go @@ -0,0 +1,24 @@ +package elastic + +import ( + "fmt" + "log" + + "github.com/boltdb/bolt" + "github.com/elastic/go-elasticsearch/v8" +) + +type Client struct { + cfg *config.Config + es *elasticsearch.Client + db *bolt.DB + offset map[string]int +} + +func NewClient(config *Config) *Client { + client := &Client{ + config: config, + offsets: make(map[string]int), + } + return client +} diff --git a/source/elastic/configure.go b/source/elastic/configure.go new file mode 100644 index 0000000..2dd7f37 --- /dev/null +++ b/source/elastic/configure.go @@ -0,0 +1,9 @@ +package elastic + +import ( + "context" +) + +func (c *Client) Configure(context.Context, Config) error { + return nil +} diff --git a/db.go b/source/elastic/db.go similarity index 100% rename from db.go rename to source/elastic/db.go diff --git a/source/elastic/open.go b/source/elastic/open.go new file mode 100644 index 0000000..7e35692 --- /dev/null +++ b/source/elastic/open.go @@ -0,0 +1,41 @@ +package elastic + +import ( + "context" +) + +func Open(context.Context, []Position) error { + + // Open a connection with ElasticSearch + cfg := elasticsearch.Config{ + Addresses: []string{ + c.cfg.Host, + }, + } + + var err error + c.es, err = elasticsearch.NewClient(cfg) + if err != nil { + return err + } + + // create a buffer channel + c.ch = make(chan Data, c.cfg.BatchSize) + + // open bolt db + c.db, err = bolt.Open(c.cfg.DBPath, 0644, nil) + if err != nil { + return err + } + + for _, index := range c.config.Indexes { + offset, err := getOffset(c.db, index) + if err != nil { + return err + } + c.offsets[index] = offset + NewWorker(c.es, index, offset, c.config.BatchSize) + } + + return nil +} diff --git a/source/elastic/read.go b/source/elastic/read.go new file mode 100644 index 0000000..d2d2b7f --- /dev/null +++ b/source/elastic/read.go @@ -0,0 +1,32 @@ +package elastic + +import ( + "context" +) + +func (c *Client) Read(context.Context) (*Data, error) { + + data, ok := <-c.ch + if !ok { + return nil, fmt.Errorf("error reading data from channel") + } + + index := data.Header.Index + if !ok { + return nil, fmt.Errorf("error index not found in data header") + } + + offset, ok := c.offsets[index] + if !ok { + return nil, fmt.Errorf("error offset index not found") + } + + c.offsets[index] = offset + 1 + + err := setOffset(c.db, index, c.offsets[index]) + if err != nil { + return nil, err + } + + return &data, nil +} diff --git a/search.go b/source/elastic/search.go similarity index 97% rename from search.go rename to source/elastic/search.go index f7ebb81..1d0fb30 100644 --- a/search.go +++ b/source/elastic/search.go @@ -56,7 +56,7 @@ func search(client *elasticsearch.Client, index string, offset, size *int) ([]Da newRecords[i] = hit.Source } - header := map[string]string{"index": index} + header := Header{Index: index} var records []Data for _, v := range newRecords { diff --git a/source/elastic/teardown.go b/source/elastic/teardown.go new file mode 100644 index 0000000..ebee6fe --- /dev/null +++ b/source/elastic/teardown.go @@ -0,0 +1,10 @@ +package elastic + +import ( + "context" +) + +// close the client +func (c *Client) Teardown(context.Context) error { + return nil +} diff --git a/worker.go b/source/elastic/worker.go similarity index 100% rename from worker.go rename to source/elastic/worker.go diff --git a/source/interface.go b/source/interface.go new file mode 100644 index 0000000..97087de --- /dev/null +++ b/source/interface.go @@ -0,0 +1,19 @@ +package elasticstream + +import ( + "context" +) + +type Position struct { + ID string + Index string + Pos int +} + +type Source interface { + Configure(context.Context, Config) error + Open(context.Context, []Position) error + Read(context.Context) (*Data, error) + Ack(context.Context, Position) error + Teardown(context.Context) error +} diff --git a/cmd/insert.go b/utils/insert.go similarity index 100% rename from cmd/insert.go rename to utils/insert.go diff --git a/cmd/search.go b/utils/search.go similarity index 100% rename from cmd/search.go rename to utils/search.go