This commit is contained in:
Parikshit Gothwal 2024-10-07 16:26:41 +05:30
parent ab182ae9af
commit 36412c7601
17 changed files with 77 additions and 61 deletions

View File

@ -1,4 +1,4 @@
package elasticstream
package config
type Config struct {
Host string

View File

@ -1,19 +1,23 @@
package controller
import (
"elasticstream/source/elastic"
"net/http"
"elasticstream/source"
"github.com/elastic/go-elasticsearch/v8"
"github.com/gin-gonic/gin"
)
func Ack(c *gin.Context) {
var req source.Position
var req Position
err := c.BindJSON(&req)
if err != nil {
c.Status(http.StatusBadRequest)
return
}
c.BindJSON(c, &req)
err := client.Ack(c, req)
err = client.Ack(c, req)
if err != nil {
c.Status(http.StatusInternalServerError)
return

View File

@ -1,9 +1,12 @@
package controller
import (
"net/http"
"elasticstream/config"
"elasticstream/source"
"elasticstream/source/elastic"
"github.com/elastic/go-elasticsearch/v8"
"github.com/gin-gonic/gin"
)
@ -13,7 +16,7 @@ func Open(c *gin.Context) {
client = elastic.NewClient()
err := client.Configure(&Config{
err := client.Configure(c, config.Config{
Host: "http://test.urantiacloud.com:9200",
Indexes: []string{"index-a", "index-b", "index-c"},
BatchSize: 100,
@ -24,8 +27,10 @@ func Open(c *gin.Context) {
return
}
// Open connection with Elastic Search
err = client.Open()
// get position from boltdb
var positions []source.Position
err = client.Open(c, positions)
if err != nil {
c.Status(http.StatusInternalServerError)
return

View File

@ -1,16 +1,15 @@
package controller
import (
"elasticstream/source/elastic"
"net/http"
"github.com/elastic/go-elasticsearch/v8"
"github.com/gin-gonic/gin"
)
func Read(c *gin.Context) {
// Read data from the client
data, err := client.Read()
data, err := client.Read(c)
if err != nil {
c.Status(http.StatusInternalServerError)
return

View File

@ -1,16 +1,15 @@
package controller
import (
"elasticstream/source/elastic"
"net/http"
"github.com/elastic/go-elasticsearch/v8"
"github.com/gin-gonic/gin"
)
func Teardown(c *gin.Context) {
// Close connection
err := client.Teardown()
err := client.Teardown(c)
if err != nil {
c.Status(http.StatusInternalServerError)
return

View File

@ -16,5 +16,4 @@ func main() {
r.DELETE("/teardown", controller.Teardown)
r.Run(":8080")
}

View File

@ -1,4 +1,4 @@
package elasticstream
package opencdc
type Header struct {
ID string

View File

@ -2,10 +2,11 @@ package elastic
import (
"context"
"elasticstream/source"
)
func (c *Client) Teardown(context.Context) error {
func (c *Client) Ack(ctx context.Context, position source.Position) error {
return nil
}

View File

@ -1,23 +1,23 @@
package elastic
import (
"fmt"
"log"
"elasticstream/config"
"elasticstream/opencdc"
"github.com/boltdb/bolt"
"github.com/elastic/go-elasticsearch/v8"
)
type Client struct {
cfg *config.Config
es *elasticsearch.Client
db *bolt.DB
offset map[string]int
cfg *config.Config
es *elasticsearch.Client
db *bolt.DB
offsets map[string]int
ch chan opencdc.Data
}
func NewClient(config *Config) *Client {
func NewClient() *Client {
client := &Client{
config: config,
offsets: make(map[string]int),
}
return client

View File

@ -2,8 +2,10 @@ package elastic
import (
"context"
"elasticstream/config"
)
func (c *Client) Configure(context.Context, Config) error {
func (c *Client) Configure(ctx context.Context, cfg config.Config) error {
return nil
}

View File

@ -1,4 +1,4 @@
package elasticstream
package elastic
import (
"fmt"

View File

@ -2,9 +2,14 @@ package elastic
import (
"context"
"elasticstream/opencdc"
"elasticstream/source"
"github.com/elastic/go-elasticsearch/v8"
)
func Open(context.Context, []Position) error {
func (c *Client) Open(ctx context.Context, positions []source.Position) error {
// Open a connection with ElasticSearch
cfg := elasticsearch.Config{
@ -20,21 +25,18 @@ func Open(context.Context, []Position) error {
}
// create a buffer channel
c.ch = make(chan Data, c.cfg.BatchSize)
c.ch = make(chan opencdc.Data, c.cfg.BatchSize)
// open bolt db
c.db, err = bolt.Open(c.cfg.DBPath, 0644, nil)
if err != nil {
return err
}
for _, index := range c.config.Indexes {
offset, err := getOffset(c.db, index)
if err != nil {
return err
for _, index := range c.cfg.Indexes {
offset := 0
for _, position := range positions {
if index == position.Index {
offset = position.Pos
}
}
c.offsets[index] = offset
NewWorker(c.es, index, offset, c.config.BatchSize)
NewWorker(c, index, offset)
}
return nil

View File

@ -2,9 +2,12 @@ package elastic
import (
"context"
"fmt"
"elasticstream/opencdc"
)
func (c *Client) Read(context.Context) (*Data, error) {
func (c *Client) Read(context.Context) (*opencdc.Data, error) {
data, ok := <-c.ch
if !ok {

View File

@ -1,4 +1,4 @@
package elasticstream
package elastic
import (
"context"
@ -6,12 +6,14 @@ import (
"fmt"
"strings"
"elasticstream/opencdc"
"github.com/elastic/go-elasticsearch/esapi"
"github.com/elastic/go-elasticsearch/v8"
)
// search is calling Elastic Search search API
func search(client *elasticsearch.Client, index string, offset, size *int) ([]Data, error) {
func search(client *elasticsearch.Client, index string, offset, size *int) ([]opencdc.Data, error) {
query := fmt.Sprintf(`{
"query": {
"match_all": {}
@ -56,11 +58,11 @@ func search(client *elasticsearch.Client, index string, offset, size *int) ([]Da
newRecords[i] = hit.Source
}
header := Header{Index: index}
header := opencdc.Header{Index: index}
var records []Data
var records []opencdc.Data
for _, v := range newRecords {
data := Data{
data := opencdc.Data{
Header: header,
Payload: v,
}

View File

@ -5,6 +5,6 @@ import (
)
// close the client
func (c *Client) Teardown(context.Context) error {
func (c *Client) Teardown(ctx context.Context) error {
return nil
}

View File

@ -1,24 +1,21 @@
package elasticstream
package elastic
import (
"log"
"time"
// "github.com/elastic/go-elasticsearch/v8"
)
type Worker struct {
client *Client
index string
offset int
size int
}
func NewWorker(client *Client, index string, offset, size int) {
func NewWorker(client *Client, index string, offset int) {
w := &Worker{
client: client,
index: index,
offset: offset,
size: size,
}
go w.start()
@ -27,9 +24,9 @@ func NewWorker(client *Client, index string, offset, size int) {
func (w *Worker) start() {
for {
log.Printf("worker index=%s offset=%d size=%d\n", w.index, w.offset, w.size)
log.Printf("worker index=%s offset=%d size=%d\n", w.index, w.offset, w.client.cfg.BatchSize)
dataArray, err := search(w.client.es, w.index, &w.offset, &w.size)
dataArray, err := search(w.client.es, w.index, &w.offset, &w.client.cfg.BatchSize)
if err != nil {
log.Println("search() err:", err)
time.Sleep(1 * time.Second)

View File

@ -1,7 +1,10 @@
package elasticstream
package source
import (
"context"
"elasticstream/config"
"elasticstream/opencdc"
)
type Position struct {
@ -11,9 +14,9 @@ type Position struct {
}
type Source interface {
Configure(context.Context, Config) error
Configure(context.Context, config.Config) error
Open(context.Context, []Position) error
Read(context.Context) (*Data, error)
Read(context.Context) (*opencdc.Data, error)
Ack(context.Context, Position) error
Teardown(context.Context) error
}