Compare commits
4 Commits
Author | SHA1 | Date |
---|---|---|
Parikshit Gothwal | 7b46b3b0e8 | |
Sangeet Kumar | a6da23819b | |
Parikshit Gothwal | 36412c7601 | |
Sangeet Kumar | ab182ae9af |
2
Makefile
2
Makefile
|
@ -1,5 +1,5 @@
|
|||
all:
|
||||
go build -o elasticstream cmd/main.go
|
||||
go build -o elasticstream main.go
|
||||
|
||||
clean:
|
||||
rm -f elasticstream
|
||||
|
|
100
client.go
100
client.go
|
@ -1,100 +0,0 @@
|
|||
package elasticstream
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
|
||||
"github.com/boltdb/bolt"
|
||||
"github.com/elastic/go-elasticsearch/v8"
|
||||
)
|
||||
|
||||
type Client struct {
|
||||
es *elasticsearch.Client
|
||||
config *Config
|
||||
ch chan Data
|
||||
db *bolt.DB
|
||||
offsets map[string]int
|
||||
}
|
||||
|
||||
func NewClient(config *Config) (*Client, error) {
|
||||
client := &Client{
|
||||
config: config,
|
||||
offsets: make(map[string]int),
|
||||
}
|
||||
return client, nil
|
||||
}
|
||||
|
||||
func (c *Client) Open() error {
|
||||
// Open a connection with ElasticSearch
|
||||
cfg := elasticsearch.Config{
|
||||
Addresses: []string{
|
||||
c.config.Host,
|
||||
},
|
||||
}
|
||||
|
||||
var err error
|
||||
c.es, err = elasticsearch.NewClient(cfg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// create a buffer channel
|
||||
c.ch = make(chan Data, 25)
|
||||
|
||||
// open bolt db
|
||||
c.db, err = bolt.Open(c.config.DBPath, 0666, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// defer db.Close()
|
||||
|
||||
for _, index := range c.config.Indexes {
|
||||
offset, err := getOffset(c.db, index)
|
||||
if err != nil {
|
||||
log.Printf("getOffset(%s) err:%s\n", index, err)
|
||||
continue
|
||||
}
|
||||
|
||||
c.offsets[index] = offset
|
||||
|
||||
NewWorker(c, index, offset, c.config.BatchSize)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Client) Read() (*Data, error) {
|
||||
|
||||
data, ok := <-c.ch
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("error reading data from channel")
|
||||
}
|
||||
|
||||
index, ok := data.Header["index"]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("error index not found in data header")
|
||||
}
|
||||
|
||||
offset, ok := c.offsets[index]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("error offset index not found")
|
||||
}
|
||||
|
||||
c.offsets[index] = offset + 1
|
||||
|
||||
err := setOffset(c.db, index, c.offsets[index])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &data, nil
|
||||
}
|
||||
|
||||
// func (c *Client) Ack(ctx context.Context, position int) error {
|
||||
// return nil
|
||||
// }
|
||||
|
||||
// close the client
|
||||
func (c *Client) Teardown() error {
|
||||
return nil
|
||||
}
|
80
cmd/dummy.go
80
cmd/dummy.go
|
@ -1,80 +0,0 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
|
||||
"11-11.dev/goexamples/elasticstream"
|
||||
)
|
||||
|
||||
func main() {
|
||||
ctx := context.Background()
|
||||
|
||||
// STEP 1: Create a new client
|
||||
client := &elasticstream.Client{}
|
||||
|
||||
// STEP 2: Configure() the client
|
||||
config := &elasticstream.Config{
|
||||
Host: "http://test.urantiacloud.com:9200",
|
||||
Indexes: []string{"index-a", "index-b", "index-c"},
|
||||
BatchSize: 10,
|
||||
DBPath: "./index.db",
|
||||
}
|
||||
err := client.Configure(ctx, config)
|
||||
if err != nil {
|
||||
log.Println("client.Configure() err:", err)
|
||||
return
|
||||
}
|
||||
|
||||
// ------------ IF RUN FOR THE FIRST TIME ----------------
|
||||
// STEP 2.5: LifecycleOnCreated()
|
||||
err = client.LifecycleOnCreated(ctx, config)
|
||||
if err != nil {
|
||||
log.Println("client.LifecycleOnCreated() err:", err)
|
||||
return
|
||||
}
|
||||
// ------------ IF RUN FOR THE FIRST TIME ----------------
|
||||
|
||||
// ------------ IF CONFIG CHANGED ------------------------
|
||||
// STEP 2.5: LifecycleOnUpdated()
|
||||
err = client.LifecycleOnUpdated(ctx, config)
|
||||
if err != nil {
|
||||
log.Println("client.LifecycleOnUpdated() err:", err)
|
||||
return
|
||||
}
|
||||
// ------------ IF CONFIG CHANGED ------------------------
|
||||
|
||||
// STEP 3: Open() the client
|
||||
err = client.Open(ctx, []Position{})
|
||||
if err != nil {
|
||||
log.Println("client.Open() err:", err)
|
||||
return
|
||||
}
|
||||
|
||||
// STEP 4: Read() using client
|
||||
// for untill context is cancelled
|
||||
for {
|
||||
data, err := client.Read(ctx)
|
||||
if err != nil {
|
||||
log.Println("client.Read() err:", err)
|
||||
continue
|
||||
}
|
||||
|
||||
fmt.Println("data:", data)
|
||||
}
|
||||
|
||||
// STEP 5: Ack()
|
||||
err = client.Ack(ctx, Position{})
|
||||
if err != nil {
|
||||
log.Println("client.Ack() err:", err)
|
||||
return
|
||||
}
|
||||
|
||||
// STEP 6: Teardown()
|
||||
err = client.Teardown(ctx)
|
||||
if err != nil {
|
||||
log.Println("client.Teardown() err:", err)
|
||||
return
|
||||
}
|
||||
|
||||
}
|
39
cmd/main.go
39
cmd/main.go
|
@ -1,39 +0,0 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
|
||||
"11-11.dev/goexamples/elasticstream"
|
||||
)
|
||||
|
||||
func main() {
|
||||
config := &elasticstream.Config{
|
||||
Host: "http://test.urantiacloud.com:9200",
|
||||
Indexes: []string{"index-a", "index-b", "index-c"},
|
||||
BatchSize: 10,
|
||||
DBPath: "./index.db",
|
||||
}
|
||||
|
||||
client, err := elasticstream.NewClient(config)
|
||||
if err != nil {
|
||||
log.Println("elasticstream.NewClient() err:", err)
|
||||
return
|
||||
}
|
||||
|
||||
err = client.Open()
|
||||
if err != nil {
|
||||
log.Println("client.Open() err:", err)
|
||||
return
|
||||
}
|
||||
|
||||
for {
|
||||
data, err := client.Read()
|
||||
if err != nil {
|
||||
log.Println("eclient.Read() err:", err)
|
||||
continue
|
||||
}
|
||||
|
||||
fmt.Println(data)
|
||||
}
|
||||
}
|
|
@ -1,9 +0,0 @@
|
|||
package elasticstream
|
||||
|
||||
type Config struct {
|
||||
Host string
|
||||
// map of index name and position from where data is to be read.
|
||||
Indexes []string
|
||||
BatchSize int
|
||||
DBPath string
|
||||
}
|
|
@ -0,0 +1,8 @@
|
|||
package config
|
||||
|
||||
type Config struct {
|
||||
Host string
|
||||
Indexes []string
|
||||
BatchSize int
|
||||
DBPath string
|
||||
}
|
|
@ -0,0 +1,26 @@
|
|||
package controller
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"elasticstream/source"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
func Ack(c *gin.Context) {
|
||||
var req source.Position
|
||||
|
||||
err := c.BindJSON(&req)
|
||||
if err != nil {
|
||||
c.Status(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
err = client.Ack(c, req)
|
||||
if err != nil {
|
||||
c.Status(http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
c.Status(http.StatusOK)
|
||||
}
|
|
@ -0,0 +1,18 @@
|
|||
package controller
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
func Close(c *gin.Context) {
|
||||
|
||||
// Close connection
|
||||
err := client.Teardown(c)
|
||||
if err != nil {
|
||||
c.Status(http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
c.Status(http.StatusOK)
|
||||
}
|
|
@ -0,0 +1,45 @@
|
|||
package controller
|
||||
|
||||
import (
|
||||
"log"
|
||||
"net/http"
|
||||
|
||||
"elasticstream/config"
|
||||
"elasticstream/source"
|
||||
"elasticstream/source/elastic"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
var client *elastic.Client
|
||||
|
||||
func Open(c *gin.Context) {
|
||||
log.Println(">>>> controller.Open()")
|
||||
defer log.Println("<<<< controller.Open()")
|
||||
|
||||
client = elastic.NewClient()
|
||||
|
||||
err := client.Configure(c, &config.Config{
|
||||
Host: "http://test.urantiacloud.com:9200",
|
||||
Indexes: []string{"index-a", "index-b", "index-c"},
|
||||
BatchSize: 100,
|
||||
DBPath: "index.db",
|
||||
})
|
||||
if err != nil {
|
||||
log.Println("client.Configure() err:", err)
|
||||
c.Status(http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
// get position from boltdb
|
||||
var positions []source.Position
|
||||
|
||||
log.Printf("client: %#v\n", client)
|
||||
|
||||
err = client.Open(c, positions)
|
||||
if err != nil {
|
||||
c.Status(http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
c.Status(http.StatusOK)
|
||||
}
|
|
@ -0,0 +1,18 @@
|
|||
package controller
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
func Read(c *gin.Context) {
|
||||
|
||||
// Read data from the client
|
||||
data, err := client.Read(c)
|
||||
if err != nil {
|
||||
c.Status(http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
c.JSON(http.StatusOK, data)
|
||||
}
|
40
db.go
40
db.go
|
@ -1,40 +0,0 @@
|
|||
package elasticstream
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"strconv"
|
||||
|
||||
"github.com/boltdb/bolt"
|
||||
)
|
||||
|
||||
func getOffset(db *bolt.DB, index string) (int, error) {
|
||||
offset := 0
|
||||
if err := db.Update(func(tx *bolt.Tx) error {
|
||||
b, err := tx.CreateBucketIfNotExists([]byte("position"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
v := b.Get([]byte(index))
|
||||
offset, _ = strconv.Atoi(string(v))
|
||||
return nil
|
||||
}); err != nil {
|
||||
return offset, err
|
||||
}
|
||||
return offset, nil
|
||||
}
|
||||
|
||||
func setOffset(db *bolt.DB, index string, offset int) error {
|
||||
// log.Printf("setOffset() index=%s offset=%d", index, offset)
|
||||
if err := db.Update(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket([]byte("position"))
|
||||
|
||||
if err := b.Put([]byte(index), []byte(fmt.Sprintf("%d", offset))); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
return nil
|
||||
}
|
32
go.mod
32
go.mod
|
@ -1,19 +1,45 @@
|
|||
module 11-11.dev/goexamples/elasticstream
|
||||
module elasticstream
|
||||
|
||||
go 1.22.4
|
||||
|
||||
require (
|
||||
github.com/boltdb/bolt v1.3.1
|
||||
github.com/elastic/go-elasticsearch v0.0.0
|
||||
github.com/elastic/go-elasticsearch/v8 v8.15.0
|
||||
github.com/gin-gonic/gin v1.10.0
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/boltdb/bolt v1.3.1 // indirect
|
||||
github.com/bytedance/sonic v1.11.6 // indirect
|
||||
github.com/bytedance/sonic/loader v0.1.1 // indirect
|
||||
github.com/cloudwego/base64x v0.1.4 // indirect
|
||||
github.com/cloudwego/iasm v0.2.0 // indirect
|
||||
github.com/elastic/elastic-transport-go/v8 v8.6.0 // indirect
|
||||
github.com/gabriel-vasile/mimetype v1.4.3 // indirect
|
||||
github.com/gin-contrib/sse v0.1.0 // indirect
|
||||
github.com/go-logr/logr v1.4.1 // indirect
|
||||
github.com/go-logr/stdr v1.2.2 // indirect
|
||||
github.com/go-playground/locales v0.14.1 // indirect
|
||||
github.com/go-playground/universal-translator v0.18.1 // indirect
|
||||
github.com/go-playground/validator/v10 v10.20.0 // indirect
|
||||
github.com/goccy/go-json v0.10.2 // indirect
|
||||
github.com/json-iterator/go v1.1.12 // indirect
|
||||
github.com/klauspost/cpuid/v2 v2.2.7 // indirect
|
||||
github.com/leodido/go-urn v1.4.0 // indirect
|
||||
github.com/mattn/go-isatty v0.0.20 // indirect
|
||||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
|
||||
github.com/modern-go/reflect2 v1.0.2 // indirect
|
||||
github.com/pelletier/go-toml/v2 v2.2.2 // indirect
|
||||
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
|
||||
github.com/ugorji/go/codec v1.2.12 // indirect
|
||||
go.opentelemetry.io/otel v1.24.0 // indirect
|
||||
go.opentelemetry.io/otel/metric v1.24.0 // indirect
|
||||
go.opentelemetry.io/otel/trace v1.24.0 // indirect
|
||||
golang.org/x/sys v0.19.0 // indirect
|
||||
golang.org/x/arch v0.8.0 // indirect
|
||||
golang.org/x/crypto v0.23.0 // indirect
|
||||
golang.org/x/net v0.25.0 // indirect
|
||||
golang.org/x/sys v0.20.0 // indirect
|
||||
golang.org/x/text v0.15.0 // indirect
|
||||
google.golang.org/protobuf v1.34.1 // indirect
|
||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||
)
|
||||
|
|
79
go.sum
79
go.sum
|
@ -1,5 +1,14 @@
|
|||
github.com/boltdb/bolt v1.3.1 h1:JQmyP4ZBrce+ZQu0dY660FMfatumYDLun9hBCUVIkF4=
|
||||
github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps=
|
||||
github.com/bytedance/sonic v1.11.6 h1:oUp34TzMlL+OY1OUWxHqsdkgC/Zfc85zGqw9siXjrc0=
|
||||
github.com/bytedance/sonic v1.11.6/go.mod h1:LysEHSvpvDySVdC2f87zGWf6CIKJcAvqab1ZaiQtds4=
|
||||
github.com/bytedance/sonic/loader v0.1.1 h1:c+e5Pt1k/cy5wMveRDyk2X4B9hF4g7an8N3zCYjJFNM=
|
||||
github.com/bytedance/sonic/loader v0.1.1/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU=
|
||||
github.com/cloudwego/base64x v0.1.4 h1:jwCgWpFanWmN8xoIUHa2rtzmkd5J2plF/dnLS6Xd/0Y=
|
||||
github.com/cloudwego/base64x v0.1.4/go.mod h1:0zlkT4Wn5C6NdauXdJRhSKRlJvmclQ1hhJgA0rcu/8w=
|
||||
github.com/cloudwego/iasm v0.2.0 h1:1KNIy1I1H9hNNFEEH3DVnI4UujN+1zjpuk6gwHLTssg=
|
||||
github.com/cloudwego/iasm v0.2.0/go.mod h1:8rXZaNYT2n95jn+zTI1sDr+IgcD2GVs0nlbbQPiEFhY=
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/elastic/elastic-transport-go/v8 v8.6.0 h1:Y2S/FBjx1LlCv5m6pWAF2kDJAHoSjSRSJCApolgfthA=
|
||||
|
@ -8,17 +17,65 @@ github.com/elastic/go-elasticsearch v0.0.0 h1:Pd5fqOuBxKxv83b0+xOAJDAkziWYwFinWn
|
|||
github.com/elastic/go-elasticsearch v0.0.0/go.mod h1:TkBSJBuTyFdBnrNqoPc54FN0vKf5c04IdM4zuStJ7xg=
|
||||
github.com/elastic/go-elasticsearch/v8 v8.15.0 h1:IZyJhe7t7WI3NEFdcHnf6IJXqpRf+8S8QWLtZYYyBYk=
|
||||
github.com/elastic/go-elasticsearch/v8 v8.15.0/go.mod h1:HCON3zj4btpqs2N1jjsAy4a/fiAul+YBP00mBH4xik8=
|
||||
github.com/gabriel-vasile/mimetype v1.4.3 h1:in2uUcidCuFcDKtdcBxlR0rJ1+fsokWf+uqxgUFjbI0=
|
||||
github.com/gabriel-vasile/mimetype v1.4.3/go.mod h1:d8uq/6HKRL6CGdk+aubisF/M5GcPfT7nKyLpA0lbSSk=
|
||||
github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE=
|
||||
github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI=
|
||||
github.com/gin-gonic/gin v1.10.0 h1:nTuyha1TYqgedzytsKYqna+DfLos46nTv2ygFy86HFU=
|
||||
github.com/gin-gonic/gin v1.10.0/go.mod h1:4PMNQiOhvDRa013RKVbsiNwoyezlm2rm0uX/T7kzp5Y=
|
||||
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
|
||||
github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ=
|
||||
github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
|
||||
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
|
||||
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
|
||||
github.com/go-playground/assert/v2 v2.2.0 h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s=
|
||||
github.com/go-playground/assert/v2 v2.2.0/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4=
|
||||
github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA=
|
||||
github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY=
|
||||
github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY=
|
||||
github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY=
|
||||
github.com/go-playground/validator/v10 v10.20.0 h1:K9ISHbSaI0lyB2eWMPJo+kOS/FBExVwjEviJTixqxL8=
|
||||
github.com/go-playground/validator/v10 v10.20.0/go.mod h1:dbuPbCMFw/DrkbEynArYaCwl3amGuJotoKCe95atGMM=
|
||||
github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
|
||||
github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
|
||||
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
|
||||
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
||||
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
|
||||
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
|
||||
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
|
||||
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
|
||||
github.com/klauspost/cpuid/v2 v2.2.7 h1:ZWSB3igEs+d0qvnxR/ZBzXVmxkgt8DdzP6m9pfuVLDM=
|
||||
github.com/klauspost/cpuid/v2 v2.2.7/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
|
||||
github.com/knz/go-libedit v1.10.1/go.mod h1:MZTVkCWyz0oBc7JOWP3wNAzd002ZbM/5hgShxwh4x8M=
|
||||
github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ=
|
||||
github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI=
|
||||
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
|
||||
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
|
||||
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
|
||||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
|
||||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
|
||||
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
|
||||
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
|
||||
github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
|
||||
github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
|
||||
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
|
||||
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
|
||||
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
|
||||
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
|
||||
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
|
||||
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
|
||||
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
|
||||
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
|
||||
github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI=
|
||||
github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08=
|
||||
github.com/ugorji/go/codec v1.2.12 h1:9LC83zGrHhuUA9l16C9AHXAqEV/2wBQ4nkvumAE65EE=
|
||||
github.com/ugorji/go/codec v1.2.12/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg=
|
||||
go.opentelemetry.io/otel v1.24.0 h1:0LAOdjNmQeSTzGBzduGe/rU4tZhMwL5rWgtp9Ku5Jfo=
|
||||
go.opentelemetry.io/otel v1.24.0/go.mod h1:W7b9Ozg4nkF5tWI5zsXkaKKDjdVjpD4oAt9Qi/MArHo=
|
||||
go.opentelemetry.io/otel/metric v1.24.0 h1:6EhoGWWK28x1fbpA4tYTOWBkPefTDQnb8WSGXlc88kI=
|
||||
|
@ -27,7 +84,25 @@ go.opentelemetry.io/otel/sdk v1.21.0 h1:FTt8qirL1EysG6sTQRZ5TokkU8d0ugCj8htOgThZ
|
|||
go.opentelemetry.io/otel/sdk v1.21.0/go.mod h1:Nna6Yv7PWTdgJHVRD9hIYywQBRx7pbox6nwBnZIxl/E=
|
||||
go.opentelemetry.io/otel/trace v1.24.0 h1:CsKnnL4dUAr/0llH9FKuc698G04IrpWV0MQA/Y1YELI=
|
||||
go.opentelemetry.io/otel/trace v1.24.0/go.mod h1:HPc3Xr/cOApsBI154IU0OI0HJexz+aw5uPdbs3UCjNU=
|
||||
golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o=
|
||||
golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||
golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8=
|
||||
golang.org/x/arch v0.8.0 h1:3wRIsP3pM4yUptoR96otTUOXI367OS0+c9eeRi9doIc=
|
||||
golang.org/x/arch v0.8.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys=
|
||||
golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI=
|
||||
golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8=
|
||||
golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac=
|
||||
golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM=
|
||||
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y=
|
||||
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||
golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk=
|
||||
golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
|
||||
google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg=
|
||||
google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
nullprogram.com/x/optparse v1.0.0/go.mod h1:KdyPE+Igbe0jQUrVfMqDMeJQIJZEuyV7pjYmp6pbG50=
|
||||
rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4=
|
||||
|
|
38
interface.go
38
interface.go
|
@ -1,38 +0,0 @@
|
|||
package elasticstream
|
||||
|
||||
type Position struct {
|
||||
ID string
|
||||
Index string
|
||||
Pos int
|
||||
}
|
||||
|
||||
type Source interface {
|
||||
Configure(context.Context, Config) error
|
||||
Open(context.Context, []Position) error
|
||||
Read(context.Context) (*Data, error)
|
||||
Ack(context.Context, Position) error
|
||||
Teardown(context.Context) error
|
||||
|
||||
// -- Lifecycle events -----------------------------------------------------
|
||||
|
||||
// LifecycleOnCreated is called after Configure and before Open when the
|
||||
// connector is run for the first time. This call will be skipped if the
|
||||
// connector was already started before. This method can be used to do some
|
||||
// initialization that needs to happen only once in the lifetime of a
|
||||
// connector (e.g. create a logical replication slot). Anything that the
|
||||
// connector creates in this method is considered to be owned by this
|
||||
// connector and should be cleaned up in LifecycleOnDeleted.
|
||||
LifecycleOnCreated(ctx context.Context, config config.Config) error
|
||||
// LifecycleOnUpdated is called after Configure and before Open when the
|
||||
// connector configuration has changed since the last run. This call will be
|
||||
// skipped if the connector configuration did not change. It can be used to
|
||||
// update anything that was initialized in LifecycleOnCreated, in case the
|
||||
// configuration change affects it.
|
||||
LifecycleOnUpdated(ctx context.Context, configBefore, configAfter config.Config) error
|
||||
// LifecycleOnDeleted is called when the connector was deleted. It will be
|
||||
// the only method that is called in that case. This method can be used to
|
||||
// clean up anything that was initialized in LifecycleOnCreated.
|
||||
LifecycleOnDeleted(ctx context.Context, config config.Config) error
|
||||
|
||||
mustEmbedUnimplementedSource()
|
||||
}
|
|
@ -0,0 +1,23 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"log"
|
||||
|
||||
"elasticstream/controller"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
func main() {
|
||||
|
||||
log.SetFlags(log.LstdFlags | log.Lshortfile)
|
||||
|
||||
r := gin.Default()
|
||||
|
||||
r.GET("/open", controller.Open)
|
||||
r.GET("/read", controller.Read)
|
||||
r.GET("/ack", controller.Ack)
|
||||
r.GET("/close", controller.Close)
|
||||
|
||||
r.Run(":8080")
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
package elasticstream
|
||||
package opencdc
|
||||
|
||||
type Header struct {
|
||||
ID string
|
|
@ -0,0 +1,29 @@
|
|||
package elastic
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"elasticstream/source"
|
||||
)
|
||||
|
||||
func (c *Client) Ack(ctx context.Context, position source.Position) error {
|
||||
curr := c.offsets[position.Index]
|
||||
|
||||
fmt.Println("curr:", curr)
|
||||
fmt.Println("asked:", position.Pos)
|
||||
|
||||
for _, p := range c.positions {
|
||||
if p.Index == position.Index {
|
||||
fmt.Println("initial:", p.Pos)
|
||||
if p.Pos > position.Pos {
|
||||
return fmt.Errorf("not acknowledged pos less than initial position")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if curr < position.Pos {
|
||||
return fmt.Errorf("not acknowledged pos more than current position")
|
||||
}
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,29 @@
|
|||
package elastic
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"elasticstream/config"
|
||||
"elasticstream/opencdc"
|
||||
"elasticstream/source"
|
||||
|
||||
"github.com/elastic/go-elasticsearch/v8"
|
||||
)
|
||||
|
||||
type Client struct {
|
||||
cfg *config.Config
|
||||
es *elasticsearch.Client
|
||||
offsets map[string]int
|
||||
positions []source.Position
|
||||
ch chan opencdc.Data
|
||||
shutdown chan struct{}
|
||||
wg *sync.WaitGroup
|
||||
}
|
||||
|
||||
func NewClient() *Client {
|
||||
client := &Client{
|
||||
offsets: make(map[string]int),
|
||||
wg: &sync.WaitGroup{},
|
||||
}
|
||||
return client
|
||||
}
|
|
@ -0,0 +1,19 @@
|
|||
package elastic
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"elasticstream/config"
|
||||
)
|
||||
|
||||
func (c *Client) Configure(ctx context.Context, cfg *config.Config) error {
|
||||
|
||||
// if c == nil || c.ch == nil {
|
||||
if c == nil {
|
||||
return fmt.Errorf("error source not opened for reading")
|
||||
}
|
||||
|
||||
c.cfg = cfg
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,51 @@
|
|||
package elastic
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
|
||||
"elasticstream/opencdc"
|
||||
"elasticstream/source"
|
||||
|
||||
"github.com/elastic/go-elasticsearch/v8"
|
||||
)
|
||||
|
||||
func (c *Client) Open(ctx context.Context, positions []source.Position) error {
|
||||
log.Println(">>>> elastic.Open()")
|
||||
defer log.Println("<<<< elastic.Open()")
|
||||
|
||||
// Open a connection with ElasticSearch
|
||||
cfg := elasticsearch.Config{
|
||||
Addresses: []string{
|
||||
c.cfg.Host,
|
||||
},
|
||||
}
|
||||
|
||||
var err error
|
||||
c.es, err = elasticsearch.NewClient(cfg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// create a buffer channel
|
||||
c.ch = make(chan opencdc.Data, c.cfg.BatchSize)
|
||||
c.shutdown = make(chan struct{})
|
||||
|
||||
for _, index := range c.cfg.Indexes {
|
||||
c.wg.Add(1)
|
||||
|
||||
offset := 0
|
||||
for _, position := range positions {
|
||||
if index == position.Index {
|
||||
offset = position.Pos
|
||||
}
|
||||
}
|
||||
|
||||
c.offsets[index] = offset
|
||||
NewWorker(c, index, offset)
|
||||
}
|
||||
|
||||
c.positions = positions
|
||||
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,37 @@
|
|||
package elastic
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
|
||||
"elasticstream/opencdc"
|
||||
)
|
||||
|
||||
func (c *Client) Read(context.Context) (*opencdc.Data, error) {
|
||||
log.Println(">>>> elastic.Read()")
|
||||
defer log.Println("<<<< elastic.Read()")
|
||||
|
||||
if c == nil || c.ch == nil {
|
||||
return nil, fmt.Errorf("error source not opened for reading")
|
||||
}
|
||||
|
||||
data, ok := <-c.ch
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("error reading data from channel")
|
||||
}
|
||||
|
||||
index := data.Header.Index
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("error index not found in data header")
|
||||
}
|
||||
|
||||
offset, ok := c.offsets[index]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("error offset index not found")
|
||||
}
|
||||
|
||||
c.offsets[index] = offset + 1
|
||||
|
||||
return &data, nil
|
||||
}
|
|
@ -1,17 +1,31 @@
|
|||
package elasticstream
|
||||
package elastic
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/elastic/go-elasticsearch/esapi"
|
||||
"github.com/elastic/go-elasticsearch/v8"
|
||||
)
|
||||
|
||||
type SearchResponse struct {
|
||||
Hits struct {
|
||||
Total struct {
|
||||
Value int `json:"value"`
|
||||
} `json:"total"`
|
||||
Hits []struct {
|
||||
Index string `json:"_index"`
|
||||
ID string `json:"_id"`
|
||||
Source map[string]interface{} `json:"_source"`
|
||||
} `json:"hits"`
|
||||
} `json:"hits"`
|
||||
}
|
||||
|
||||
// search is calling Elastic Search search API
|
||||
func search(client *elasticsearch.Client, index string, offset, size *int) ([]Data, error) {
|
||||
func search(client *elasticsearch.Client, index string, offset, size *int) (*SearchResponse, error) {
|
||||
query := fmt.Sprintf(`{
|
||||
"query": {
|
||||
"match_all": {}
|
||||
|
@ -27,7 +41,8 @@ func search(client *elasticsearch.Client, index string, offset, size *int) ([]Da
|
|||
}
|
||||
|
||||
// Perform the request
|
||||
res, err := req.Do(context.Background(), client)
|
||||
ctx, _ := context.WithTimeout(context.TODO(), 5*time.Second)
|
||||
res, err := req.Do(ctx, client)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error getting response: %s", err)
|
||||
}
|
||||
|
@ -37,37 +52,10 @@ func search(client *elasticsearch.Client, index string, offset, size *int) ([]Da
|
|||
return nil, fmt.Errorf("res.IsError() error: %s", res.String())
|
||||
}
|
||||
|
||||
// Parse the response
|
||||
var result struct {
|
||||
Hits struct {
|
||||
Hits []struct {
|
||||
Source map[string]interface{} `json:"_source"`
|
||||
} `json:"hits"`
|
||||
} `json:"hits"`
|
||||
}
|
||||
|
||||
if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
|
||||
result := &SearchResponse{}
|
||||
if err := json.NewDecoder(res.Body).Decode(result); err != nil {
|
||||
return nil, fmt.Errorf("error parsing the response body: %s", err)
|
||||
}
|
||||
|
||||
// Collect the records
|
||||
newRecords := make([]map[string]interface{}, len(result.Hits.Hits))
|
||||
for i, hit := range result.Hits.Hits {
|
||||
newRecords[i] = hit.Source
|
||||
}
|
||||
|
||||
header := map[string]string{"index": index}
|
||||
|
||||
var records []Data
|
||||
for _, v := range newRecords {
|
||||
data := Data{
|
||||
Header: header,
|
||||
Payload: v,
|
||||
}
|
||||
records = append(records, data)
|
||||
}
|
||||
|
||||
// log.Println("records:", records)
|
||||
|
||||
return records, nil
|
||||
return result, nil
|
||||
}
|
|
@ -0,0 +1,28 @@
|
|||
package elastic
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
)
|
||||
|
||||
// close the client
|
||||
func (c *Client) Teardown(ctx context.Context) error {
|
||||
log.Println(">>>> elastic.Teardown()")
|
||||
defer log.Println("<<<< elastic.Teardown()")
|
||||
|
||||
if c == nil || c.ch == nil {
|
||||
return fmt.Errorf("error source not opened for reading")
|
||||
}
|
||||
|
||||
close(c.shutdown)
|
||||
|
||||
c.wg.Wait()
|
||||
|
||||
close(c.ch)
|
||||
|
||||
c.ch = nil
|
||||
// c.es.Close()
|
||||
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,66 @@
|
|||
package elastic
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"elasticstream/opencdc"
|
||||
)
|
||||
|
||||
type Worker struct {
|
||||
client *Client
|
||||
index string
|
||||
offset int
|
||||
}
|
||||
|
||||
func NewWorker(client *Client, index string, offset int) {
|
||||
w := &Worker{
|
||||
client: client,
|
||||
index: index,
|
||||
offset: offset,
|
||||
}
|
||||
|
||||
go w.start()
|
||||
}
|
||||
|
||||
func (w *Worker) start() {
|
||||
defer w.client.wg.Done()
|
||||
|
||||
for {
|
||||
log.Printf("worker index=%s offset=%d size=%d\n", w.index, w.offset, w.client.cfg.BatchSize)
|
||||
|
||||
searchResponse, err := search(w.client.es, w.index, &w.offset, &w.client.cfg.BatchSize)
|
||||
if err != nil || len(searchResponse.Hits.Hits) == 0 {
|
||||
// log.Println("search() err:", err)
|
||||
select {
|
||||
case <-w.client.shutdown:
|
||||
fmt.Println("shuting donw..")
|
||||
return
|
||||
|
||||
case <-time.After(time.Second):
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
for _, hit := range searchResponse.Hits.Hits {
|
||||
data := opencdc.Data{
|
||||
Header: opencdc.Header{
|
||||
ID: hit.ID,
|
||||
Index: hit.Index,
|
||||
Position: w.offset + 1,
|
||||
},
|
||||
Payload: hit.Source,
|
||||
}
|
||||
|
||||
select {
|
||||
case w.client.ch <- data:
|
||||
w.offset++
|
||||
|
||||
case <-w.client.shutdown:
|
||||
fmt.Println("Stopping worker...")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,22 @@
|
|||
package source
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"elasticstream/config"
|
||||
"elasticstream/opencdc"
|
||||
)
|
||||
|
||||
type Position struct {
|
||||
ID string `json:"id"`
|
||||
Index string `json:"index"`
|
||||
Pos int `json:"pos"`
|
||||
}
|
||||
|
||||
type Source interface {
|
||||
Configure(context.Context, config.Config) error
|
||||
Open(context.Context, []Position) error
|
||||
Read(context.Context) (*opencdc.Data, error)
|
||||
Ack(context.Context, Position) error
|
||||
Teardown(context.Context) error
|
||||
}
|
45
worker.go
45
worker.go
|
@ -1,45 +0,0 @@
|
|||
package elasticstream
|
||||
|
||||
import (
|
||||
"log"
|
||||
"time"
|
||||
// "github.com/elastic/go-elasticsearch/v8"
|
||||
)
|
||||
|
||||
type Worker struct {
|
||||
client *Client
|
||||
index string
|
||||
offset int
|
||||
size int
|
||||
}
|
||||
|
||||
func NewWorker(client *Client, index string, offset, size int) {
|
||||
w := &Worker{
|
||||
client: client,
|
||||
index: index,
|
||||
offset: offset,
|
||||
size: size,
|
||||
}
|
||||
|
||||
go w.start()
|
||||
}
|
||||
|
||||
func (w *Worker) start() {
|
||||
|
||||
for {
|
||||
log.Printf("worker index=%s offset=%d size=%d\n", w.index, w.offset, w.size)
|
||||
|
||||
dataArray, err := search(w.client.es, w.index, &w.offset, &w.size)
|
||||
if err != nil {
|
||||
log.Println("search() err:", err)
|
||||
time.Sleep(1 * time.Second)
|
||||
continue
|
||||
}
|
||||
|
||||
for _, data := range dataArray {
|
||||
w.client.ch <- data
|
||||
w.offset++
|
||||
}
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue