dummy code interface
This commit is contained in:
parent
26fb21902a
commit
ab05f55b09
|
@ -1 +1,9 @@
|
||||||
# ELASTIC STREAM
|
# ELASTIC STREAM
|
||||||
|
|
||||||
|
1. Graceful shutdown of Workers. It has start() so it should have stop() also.
|
||||||
|
2. Channel buffer should be configurable or not? Is it same as batchSize?
|
||||||
|
3. Each data(record) should have its position.
|
||||||
|
4. Worker has *Client in it. Is this OK?
|
||||||
|
5. Polling(ticker)?
|
||||||
|
6. Context use.
|
||||||
|
7. Teardown() and Ack().
|
|
@ -0,0 +1,80 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"log"
|
||||||
|
|
||||||
|
"11-11.dev/goexamples/elasticstream"
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
// STEP 1: Create a new client
|
||||||
|
client := &elasticstream.Client{}
|
||||||
|
|
||||||
|
// STEP 2: Configure() the client
|
||||||
|
config := &elasticstream.Config{
|
||||||
|
Host: "http://test.urantiacloud.com:9200",
|
||||||
|
Indexes: []string{"index-a", "index-b", "index-c"},
|
||||||
|
BatchSize: 10,
|
||||||
|
DBPath: "./index.db",
|
||||||
|
}
|
||||||
|
err := client.Configure(ctx, config)
|
||||||
|
if err != nil {
|
||||||
|
log.Println("client.Configure() err:", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// ------------ IF RUN FOR THE FIRST TIME ----------------
|
||||||
|
// STEP 2.5: LifecycleOnCreated()
|
||||||
|
err = client.LifecycleOnCreated(ctx, config)
|
||||||
|
if err != nil {
|
||||||
|
log.Println("client.LifecycleOnCreated() err:", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// ------------ IF RUN FOR THE FIRST TIME ----------------
|
||||||
|
|
||||||
|
// ------------ IF CONFIG CHANGED ------------------------
|
||||||
|
// STEP 2.5: LifecycleOnUpdated()
|
||||||
|
err = client.LifecycleOnUpdated(ctx, config)
|
||||||
|
if err != nil {
|
||||||
|
log.Println("client.LifecycleOnUpdated() err:", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// ------------ IF CONFIG CHANGED ------------------------
|
||||||
|
|
||||||
|
// STEP 3: Open() the client
|
||||||
|
err = client.Open(ctx, []Position{})
|
||||||
|
if err != nil {
|
||||||
|
log.Println("client.Open() err:", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// STEP 4: Read() using client
|
||||||
|
// for untill context is cancelled
|
||||||
|
for {
|
||||||
|
data, err := client.Read(ctx)
|
||||||
|
if err != nil {
|
||||||
|
log.Println("client.Read() err:", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Println("data:", data)
|
||||||
|
}
|
||||||
|
|
||||||
|
// STEP 5: Ack()
|
||||||
|
err = client.Ack(ctx, Position{})
|
||||||
|
if err != nil {
|
||||||
|
log.Println("client.Ack() err:", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// STEP 6: Teardown()
|
||||||
|
err = client.Teardown(ctx)
|
||||||
|
if err != nil {
|
||||||
|
log.Println("client.Teardown() err:", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
10
data.go
10
data.go
|
@ -1,8 +1,12 @@
|
||||||
package elasticstream
|
package elasticstream
|
||||||
|
|
||||||
|
type Header struct {
|
||||||
|
ID string
|
||||||
|
Index string
|
||||||
|
Position int
|
||||||
|
}
|
||||||
|
|
||||||
type Data struct {
|
type Data struct {
|
||||||
// we can store index name
|
Header Header
|
||||||
Header map[string]string
|
|
||||||
// actual hit from elastic search
|
|
||||||
Payload map[string]interface{}
|
Payload map[string]interface{}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,38 @@
|
||||||
|
package elasticstream
|
||||||
|
|
||||||
|
type Position struct {
|
||||||
|
ID string
|
||||||
|
Index string
|
||||||
|
Pos int
|
||||||
|
}
|
||||||
|
|
||||||
|
type Source interface {
|
||||||
|
Configure(context.Context, Config) error
|
||||||
|
Open(context.Context, []Position) error
|
||||||
|
Read(context.Context) (*Data, error)
|
||||||
|
Ack(context.Context, Position) error
|
||||||
|
Teardown(context.Context) error
|
||||||
|
|
||||||
|
// -- Lifecycle events -----------------------------------------------------
|
||||||
|
|
||||||
|
// LifecycleOnCreated is called after Configure and before Open when the
|
||||||
|
// connector is run for the first time. This call will be skipped if the
|
||||||
|
// connector was already started before. This method can be used to do some
|
||||||
|
// initialization that needs to happen only once in the lifetime of a
|
||||||
|
// connector (e.g. create a logical replication slot). Anything that the
|
||||||
|
// connector creates in this method is considered to be owned by this
|
||||||
|
// connector and should be cleaned up in LifecycleOnDeleted.
|
||||||
|
LifecycleOnCreated(ctx context.Context, config config.Config) error
|
||||||
|
// LifecycleOnUpdated is called after Configure and before Open when the
|
||||||
|
// connector configuration has changed since the last run. This call will be
|
||||||
|
// skipped if the connector configuration did not change. It can be used to
|
||||||
|
// update anything that was initialized in LifecycleOnCreated, in case the
|
||||||
|
// configuration change affects it.
|
||||||
|
LifecycleOnUpdated(ctx context.Context, configBefore, configAfter config.Config) error
|
||||||
|
// LifecycleOnDeleted is called when the connector was deleted. It will be
|
||||||
|
// the only method that is called in that case. This method can be used to
|
||||||
|
// clean up anything that was initialized in LifecycleOnCreated.
|
||||||
|
LifecycleOnDeleted(ctx context.Context, config config.Config) error
|
||||||
|
|
||||||
|
mustEmbedUnimplementedSource()
|
||||||
|
}
|
|
@ -27,7 +27,7 @@ func NewWorker(client *Client, index string, offset, size int) {
|
||||||
func (w *Worker) start() {
|
func (w *Worker) start() {
|
||||||
|
|
||||||
for {
|
for {
|
||||||
// log.Printf("worker index=%s offset=%d size=%d\n", w.index, w.offset, w.size)
|
log.Printf("worker index=%s offset=%d size=%d\n", w.index, w.offset, w.size)
|
||||||
|
|
||||||
dataArray, err := search(w.client.es, w.index, &w.offset, &w.size)
|
dataArray, err := search(w.client.es, w.index, &w.offset, &w.size)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
Loading…
Reference in New Issue