diff --git a/README.md b/README.md index bd30708..7af3c41 100644 --- a/README.md +++ b/README.md @@ -1 +1,9 @@ -# ELASTIC STREAM \ No newline at end of file +# ELASTIC STREAM + +1. Graceful shutdown of Workers. It has start() so it should have stop() also. +2. Channel buffer should be configurable or not? Is it same as batchSize? +3. Each data(record) should have its position. +4. Worker has *Client in it. Is this OK? +5. Polling(ticker)? +6. Context use. +7. Teardown() and Ack(). \ No newline at end of file diff --git a/cmd/dummy.go b/cmd/dummy.go new file mode 100644 index 0000000..9e41d2f --- /dev/null +++ b/cmd/dummy.go @@ -0,0 +1,80 @@ +package main + +import ( + "context" + "log" + + "11-11.dev/goexamples/elasticstream" +) + +func main() { + ctx := context.Background() + + // STEP 1: Create a new client + client := &elasticstream.Client{} + + // STEP 2: Configure() the client + config := &elasticstream.Config{ + Host: "http://test.urantiacloud.com:9200", + Indexes: []string{"index-a", "index-b", "index-c"}, + BatchSize: 10, + DBPath: "./index.db", + } + err := client.Configure(ctx, config) + if err != nil { + log.Println("client.Configure() err:", err) + return + } + + // ------------ IF RUN FOR THE FIRST TIME ---------------- + // STEP 2.5: LifecycleOnCreated() + err = client.LifecycleOnCreated(ctx, config) + if err != nil { + log.Println("client.LifecycleOnCreated() err:", err) + return + } + // ------------ IF RUN FOR THE FIRST TIME ---------------- + + // ------------ IF CONFIG CHANGED ------------------------ + // STEP 2.5: LifecycleOnUpdated() + err = client.LifecycleOnUpdated(ctx, config) + if err != nil { + log.Println("client.LifecycleOnUpdated() err:", err) + return + } + // ------------ IF CONFIG CHANGED ------------------------ + + // STEP 3: Open() the client + err = client.Open(ctx, []Position{}) + if err != nil { + log.Println("client.Open() err:", err) + return + } + + // STEP 4: Read() using client + // for untill context is cancelled + for { + data, err := client.Read(ctx) + if err != nil { + log.Println("client.Read() err:", err) + continue + } + + fmt.Println("data:", data) + } + + // STEP 5: Ack() + err = client.Ack(ctx, Position{}) + if err != nil { + log.Println("client.Ack() err:", err) + return + } + + // STEP 6: Teardown() + err = client.Teardown(ctx) + if err != nil { + log.Println("client.Teardown() err:", err) + return + } + +} diff --git a/data.go b/data.go index 9d23549..7c615f5 100644 --- a/data.go +++ b/data.go @@ -1,8 +1,12 @@ package elasticstream +type Header struct { + ID string + Index string + Position int +} + type Data struct { - // we can store index name - Header map[string]string - // actual hit from elastic search + Header Header Payload map[string]interface{} } diff --git a/interface.go b/interface.go new file mode 100644 index 0000000..b606c48 --- /dev/null +++ b/interface.go @@ -0,0 +1,38 @@ +package elasticstream + +type Position struct { + ID string + Index string + Pos int +} + +type Source interface { + Configure(context.Context, Config) error + Open(context.Context, []Position) error + Read(context.Context) (*Data, error) + Ack(context.Context, Position) error + Teardown(context.Context) error + + // -- Lifecycle events ----------------------------------------------------- + + // LifecycleOnCreated is called after Configure and before Open when the + // connector is run for the first time. This call will be skipped if the + // connector was already started before. This method can be used to do some + // initialization that needs to happen only once in the lifetime of a + // connector (e.g. create a logical replication slot). Anything that the + // connector creates in this method is considered to be owned by this + // connector and should be cleaned up in LifecycleOnDeleted. + LifecycleOnCreated(ctx context.Context, config config.Config) error + // LifecycleOnUpdated is called after Configure and before Open when the + // connector configuration has changed since the last run. This call will be + // skipped if the connector configuration did not change. It can be used to + // update anything that was initialized in LifecycleOnCreated, in case the + // configuration change affects it. + LifecycleOnUpdated(ctx context.Context, configBefore, configAfter config.Config) error + // LifecycleOnDeleted is called when the connector was deleted. It will be + // the only method that is called in that case. This method can be used to + // clean up anything that was initialized in LifecycleOnCreated. + LifecycleOnDeleted(ctx context.Context, config config.Config) error + + mustEmbedUnimplementedSource() +} diff --git a/worker.go b/worker.go index 7f890b0..90d1612 100644 --- a/worker.go +++ b/worker.go @@ -27,7 +27,7 @@ func NewWorker(client *Client, index string, offset, size int) { func (w *Worker) start() { for { - // log.Printf("worker index=%s offset=%d size=%d\n", w.index, w.offset, w.size) + log.Printf("worker index=%s offset=%d size=%d\n", w.index, w.offset, w.size) dataArray, err := search(w.client.es, w.index, &w.offset, &w.size) if err != nil {