在Dapr元件有一種叫做Binding的元件(component)讓你的app跟外部系統做一個連結的, 這元件可分為兩類:
- Input BIndings: 用來接受外部事件的觸發,像是Webhook, 從Queue來的events, 甚至是人家新發的Tweets, 應該都可以歸為這一類
- Ouput Bindings: 呼叫外部系統的動作,命名為Outpiut其實會讓人誤以為是資料的輸出,但其實,他不只可以用在資料輸出,呼叫外部系統的動作都可以包含在內, 舉個例子, GraphQL 的Output binding定義了兩個操作(Operations), 一個是QueryOperation, 一個是MutationOperation, 熟悉GraphQL的應該知道,MutationOperation一般才是應用在資料操作,而Query感覺就跟輸出比較無關了
一開始我也有點搞不清楚這個模式目的在做啥的, 要接受事件觸發,我們有pub sub了,而state store本身就用在資料輸出, 感覺的確有點重複,但由上述兩點來看,其實Binding定義的範圍廣泛多了,它並不特定限制在Queue或是資料庫
但有個東西同時支援了Pub sub, input binding, output binding, 一開始我是看Kafka這應用,才讓我覺得有點錯亂,前一篇有講過了怎實作Subscriber, 這邊來比較一下, 利用Input binding的話,會有什麼不一樣?
建立Binding元件
要用Kafka來觸發我們服務(如上圖),跟寫Subscriber一樣,我們需要在~/.dapr/components
裡先建立好元件, 假設我們新增一個kafka-binding.yaml
, 內容如下:
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: kafka-binding
spec:
type: bindings.kafka
version: v1
metadata:
- name: brokers
value: localhost:9092
- name: topics
value: mytopic
- name: consumerGroup
value: group1
- name: publishTopic
value: mytopic
- name: authRequired
value: "false"
上面這個其實已經是定義好input和output binding了,topics定義的是input binding要聽取事件的topic, 而publishTopic定義的則是Output binding要輸出資料的目標
實作Input binding
跟實做subscriber差不多, Input binding也是實作一個webhook讓Dapr打進來而已, 這邊假設Kafka會收到的資料會是一個數字
package main
import (
"log"
"net/http"
"github.com/gin-gonic/gin"
)
func dataBinding(ctx *gin.Context) {
var data int
if err := ctx.Bind(&data); err != nil {
ctx.AbortWithStatus(500)
return
}
log.Println(data)
ctx.Status(200)
}
func main() {
r := gin.Default()
r.POST("/kafka-binding", dataBinding)
r.OPTIONS("/kafka-binding", func(ctx *gin.Context) {
ctx.Status(200)
})
http.ListenAndServe(":6003", r)
}
這邊幾個重點:
- endpoint path跟你的binding名稱一樣, 當然這可以在元件設定那邊改
- OPTIONS有點像是讓Dapr確認你有沒支援這個binding的health endpoint, 在程式一開始跑就會被call, 這邊只要回OK, 其實都好
- 跟pub sub不一樣的是, 這邊會收到的格式不一定會是cloudevent, 除非publisher那邊過來的就是cloudevent, 因此, tracing應該是追蹤不到才是
實做Output binding
那如何實作跟剛剛的Input binding匹配的Output binding呢? 範例如下:
package main
import (
"context"
"log"
"math/rand"
"strconv"
"time"
dapr "github.com/dapr/go-sdk/client"
)
func main() {
BINDING_NAME := "kafka-binding"
BINDING_OPERATION := "create"
for i := 0; i < 10; i++ {
time.Sleep(5000)
rand.Seed(time.Now().UnixMicro())
dataId := rand.Intn(1000-1) + 1
client, err := dapr.NewClient()
if err != nil {
panic(err)
}
defer client.Close()
ctx := context.Background()
in := &dapr.InvokeBindingRequest{Name: BINDING_NAME, Operation: BINDING_OPERATION, Data: []byte(strconv.Itoa(dataId))}
client.InvokeOutputBinding(ctx, in)
log.Println("Sending message: " + strconv.Itoa(dataId))
}
}
這邊重點在於:
in := &dapr.InvokeBindingRequest{Name: BINDING_NAME, Operation: BINDING_OPERATION, Data: []byte(strconv.Itoa(dataId))}
client.InvokeOutputBinding(ctx, in)
雖說是"Output" binding, 但這邊用的名字是"Invoke", 跟Output沒啥相關, Operation則是元件訂的, Kafka binding只定義一個"create", 就是讓你送訊息用的, Data則是要傳送的資料, 以Byte array表示
用subscriber接收output binding來的事件
這邊就不用多作解釋了, 從前面不難發現, 它接的就是raw payload, 這部分可以參考 前一篇
那啥時該用哪一種呢? 以Kafka這範例來說, 我是認為如果publisher跟subscriber都是自己實做的話, 應該是要選用pub sub, 用cloud events的話, 可以享受到distributed tracing帶來的好處, 如果不是, 差異應該不大, 都蠻簡單實作的