在Dapr元件有一種叫做Binding的元件(component)讓你的app跟外部系統做一個連結的, 這元件可分為兩類:

  1. Input BIndings: 用來接受外部事件的觸發,像是Webhook, 從Queue來的events, 甚至是人家新發的Tweets, 應該都可以歸為這一類
  2. Ouput Bindings: 呼叫外部系統的動作,命名為Outpiut其實會讓人誤以為是資料的輸出,但其實,他不只可以用在資料輸出,呼叫外部系統的動作都可以包含在內, 舉個例子, GraphQL 的Output binding定義了兩個操作(Operations), 一個是QueryOperation, 一個是MutationOperation, 熟悉GraphQL的應該知道,MutationOperation一般才是應用在資料操作,而Query感覺就跟輸出比較無關了

一開始我也有點搞不清楚這個模式目的在做啥的, 要接受事件觸發,我們有pub sub了,而state store本身就用在資料輸出, 感覺的確有點重複,但由上述兩點來看,其實Binding定義的範圍廣泛多了,它並不特定限制在Queue或是資料庫

但有個東西同時支援了Pub sub, input binding, output binding, 一開始我是看Kafka這應用,才讓我覺得有點錯亂,前一篇有講過了怎實作Subscriber, 這邊來比較一下, 利用Input binding的話,會有什麼不一樣?

建立Binding元件

要用Kafka來觸發我們服務(如上圖),跟寫Subscriber一樣,我們需要在~/.dapr/components裡先建立好元件, 假設我們新增一個kafka-binding.yaml, 內容如下:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka-binding
spec:
  type: bindings.kafka
  version: v1
  metadata:
  - name: brokers
    value: localhost:9092
  - name: topics
    value: mytopic
  - name: consumerGroup
    value: group1
  - name: publishTopic
    value: mytopic
  - name: authRequired
    value: "false"

上面這個其實已經是定義好input和output binding了,topics定義的是input binding要聽取事件的topic, 而publishTopic定義的則是Output binding要輸出資料的目標

實作Input binding

跟實做subscriber差不多, Input binding也是實作一個webhook讓Dapr打進來而已, 這邊假設Kafka會收到的資料會是一個數字

package main

import (
    "log"
    "net/http"
    "github.com/gin-gonic/gin"
)

func dataBinding(ctx *gin.Context) {
    var data int
    if err := ctx.Bind(&data); err != nil {
        ctx.AbortWithStatus(500)
        return
    }
  
    log.Println(data)
    ctx.Status(200)
}

func main() {
    r := gin.Default()
    r.POST("/kafka-binding", dataBinding)
    r.OPTIONS("/kafka-binding", func(ctx *gin.Context) {
        ctx.Status(200)
    })

    http.ListenAndServe(":6003", r)
}

這邊幾個重點:

  1. endpoint path跟你的binding名稱一樣, 當然這可以在元件設定那邊改
  2. OPTIONS有點像是讓Dapr確認你有沒支援這個binding的health endpoint, 在程式一開始跑就會被call, 這邊只要回OK, 其實都好
  3. 跟pub sub不一樣的是, 這邊會收到的格式不一定會是cloudevent, 除非publisher那邊過來的就是cloudevent, 因此, tracing應該是追蹤不到才是

實做Output binding

那如何實作跟剛剛的Input binding匹配的Output binding呢? 範例如下:

package main

import (
    "context"
    "log"
    "math/rand"
    "strconv"
    "time"
    
    dapr "github.com/dapr/go-sdk/client"
)

func main() {
    BINDING_NAME := "kafka-binding"
    BINDING_OPERATION := "create"

    for i := 0; i < 10; i++ {
        time.Sleep(5000)
        rand.Seed(time.Now().UnixMicro())
        dataId := rand.Intn(1000-1) + 1
        client, err := dapr.NewClient()
        if err != nil {
            panic(err)
        }

        defer client.Close()
        ctx := context.Background()

        in := &dapr.InvokeBindingRequest{Name: BINDING_NAME, Operation: BINDING_OPERATION, Data: []byte(strconv.Itoa(dataId))}

        client.InvokeOutputBinding(ctx, in)
        log.Println("Sending message: " + strconv.Itoa(dataId))
    }
}

這邊重點在於:

in := &dapr.InvokeBindingRequest{Name: BINDING_NAME, Operation: BINDING_OPERATION, Data: []byte(strconv.Itoa(dataId))}

client.InvokeOutputBinding(ctx, in)

雖說是"Output" binding, 但這邊用的名字是"Invoke", 跟Output沒啥相關, Operation則是元件訂的, Kafka binding只定義一個"create", 就是讓你送訊息用的, Data則是要傳送的資料, 以Byte array表示

用subscriber接收output binding來的事件

這邊就不用多作解釋了, 從前面不難發現, 它接的就是raw payload, 這部分可以參考 前一篇

那啥時該用哪一種呢? 以Kafka這範例來說, 我是認為如果publisher跟subscriber都是自己實做的話, 應該是要選用pub sub, 用cloud events的話, 可以享受到distributed tracing帶來的好處, 如果不是, 差異應該不大, 都蠻簡單實作的

在發表前一篇時, Evan 跟我說, “能不能用slug把url改好看點?”, 從開始寫blog來, 其實我也沒在意有沒人看, 所以SEO相關的也沒太在意, 但既然有人講了, 我就來弄一下吧

Prama links的設定

首先先來看看pramalinks設定的部分:

[permalinks]
    post = "/:slug/"

這邊就是用來設定你URL長相的地方, 我的原文都放在post目錄內, 所以我一開始的設定就是以slug當URL沒錯, 那怎還是有中文呢?

Archetypes, 初始文章的設定

當你用hugo new filename, 他會拿themes/[theme_name]/archetypes/default.md當範本來建立初始文章, 像我原本的設定是:

---
date: {{ .Date }}
title: "{{ replace .Name "-" " " | title }}"
images: 
- "https://og.jln.co/jlns1/{{ replace .Name "-" " " | title | base64Encode | replaceRE "=+$" "" | replaceRE "\\+" "-" | replaceRE "/" "_"}}"
draft: true
---

slug是沒設定的, 不過它似乎應該會用title去算slug, 所以在Paramlinks那邊沒啥影響, 但其實你也可以加一行變成:

---
date: {{ .Date }}
title: "{{ replace .Name "-" " " | title }}"
slug: "{{ anchorize .Name | title }}"
images: 
- "https://og.jln.co/jlns1/{{ replace .Name "-" " " | title | base64Encode | replaceRE "=+$" "" | replaceRE "\\+" "-" | replaceRE "/" "_"}}"
draft: true
---

anchorize是可以把"This is a cat"轉成"this-is-a-cat", 其實這兩段效果差不多, 問題在, 不支援中文, 因此像是"這是中文 Chinese", 其實翻成的是"這是中文-chinese", 如果再把這段放到url, unicode url並不是很好看

找了半天, 並不是很好用, 本來想寫個wrapper script來做新建文章好了, 然後自動加入比較好看的slug, 但轉念一想, 何不直接改Hugo?何不直接改Hugo?何不直接改Hugo? (回音持續)

支援中文的slug轉換的go module

推薦這個github.com/gosimple/slug, 這不只支援中文, 很多語言都有!用法也很簡單:

package main

import (
	"fmt"
	"github.com/gosimple/slug"
)

func main() {
	someText := slug.Make("影師")
	fmt.Println(someText) // Will print: "ying-shi"
}

(以上範例來自它github)它很貼心的幫你把中文字轉成拼音了, 這用來做url感覺還蠻適合的呀!

修改Hugo

那我要加在Hugo哪裡呢?前面說到anchorize, 那其實仿效它做一個slugify不就好了, 那也容易知道要改哪, 抄anchorize就對了

要改的只有兩個檔:

  • tpl/urls/init.go
  • tpl/urls/urls.go

tpl/urls/urls.go加入:

func (ns *Namespace) Slugify(s any) (string, error) {
	ss, err := cast.ToStringE(s)
	if err != nil {
		return "", nil
	}
	return slug.Make(ss), nil
}

然後在tpl/urls/init.goinit()內加入:

ns.AddMethodMapping(ctx.Slugify,
    []string{"slugify"},
    [][2]string{
        {`{{ "This is a title" | slugify }}`, `this-is-a-title`},
    },
)

重新建置安裝好後, 就可以把Archetypes改成:

---
date: {{ .Date }}
title: "{{ replace .Name "-" " " | title }}"
slug: "{{ slugify .Name | title }}"
images: 
- "https://og.jln.co/jlns1/{{ replace .Name "-" " " | title | base64Encode | replaceRE "=+$" "" | replaceRE "\\+" "-" | replaceRE "/" "_"}}"
draft: true
---

搞定! 結果寫這些code五分鐘, 但卻花了五十分鐘找方法呀, 果然Open source還是自己改來用最快

本來沒預計寫這篇的, 不過後來想想, 本來想寫的篇幅太大, 先寫這篇幫後面內容暖身, 後續相關內容會再更新到下面連結:

  1. 使用Dapr Input/Output Binding連接Kafka

這篇並不是要寫怎用go實做Dapr的pubsub, 不完全是, 實做pubsub部分請參考官方文件, 基本的Dapr的publisher跟subscriber是用所謂CloudEvent的格式在傳遞, 用CloudEvent的好處是, 由於CloudEvent會幫忙夾帶一些metadata, 因此也就可以實現分散式追蹤(Tracing)的功能, 但缺點就是無法支援一些原本寫好的legacy publisher或subscriber, 所幸Dapr的pubsub還是支援raw payload可以讓你自組你的訊息格式

在開始之前, 為了測試實做, 我這邊採用了Kafka, 但由於Dapr把實做封裝得不錯, 所以其實也不一定要用Kafka, 不過支援了Kraft之後的Kafka, 由於可以去掉對zoo keeper的依賴, 所以算蠻簡單裝的

安裝Kafka

使用docker跑Kafka, 應該是最簡單的方式, 只要執行

docker run -it --name kafka-zkless -p 9092:9092 -e LOG_DIR=/tmp/logs quay.io/strimzi/kafka:latest-kafka-2.8.1-amd64 /bin/sh -c 'export CLUSTER_ID=$(bin/kafka-storage.sh random-uuid) && bin/kafka-storage.sh format -t $CLUSTER_ID -c config/kraft/server.properties && bin/kafka-server-start.sh config/kraft/server.properties'

這樣Kafka就可以順利活起來了, 完全不需要跑zoo keeper…喔耶…

建議也可以順便跑一下Kafka map, 這樣待會可以直接發event來測試

新增Kafka component

有了Kafka後, 我們需要在Dapr新增這個component 才可以讓Dapr應用程式使用, 在~/.dape/components底下加一個檔案(可叫做kafka.yaml),內容是:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka-pubsub
spec:
  type: pubsub.kafka
  version: v1
  metadata:
  - name: brokers # Required. Kafka broker connection setting
    value: "localhost:9092"
  - name: consumerGroup # Optional. Used for input bindings.
    value: "group1"
  - name: authType # Required.
    value: "none"
  - name: disableTls # Optional. Disable TLS. This is not safe for production!! You should read the `Mutual TLS` section for how to use TLS.
    value: "true"

內容就不多做解釋了, 官方文件會有更清楚的說明, 這邊先說明, 這新增上去後, 我們會多一個pubsub component叫做kafka-pubsub, 這名字寫程式會用到囉

寫個subscriber來接收事件(event)吧

官方文件其實有寫如何寫一個接收raw payload的subscriber, 但不像其他文件一樣有多種語言範例, 只有Python, PHP兩種

但其實, 如上圖, Dapr用sidecar的作法, 簡化了寫pubsub的複雜度, 而且減低了對語言的依賴, 也不像是Istio是從系統的角度設計, 算是有點有趣的作法, 你不用理解pubsub, 也不用特別知道你是用Kafka, NATS, 或者是RabbitMQ, 寫法都一樣, 不囉嗦, 直接看code

package main

import (
	"encoding/base64"
	"encoding/json"
	"log"

	"github.com/gin-gonic/gin"
)

type Subscription struct {
	PubsubName string            `json:"pubsubname"`
	Topic      string            `json:"topic"`
	Route      string            `json:"route,omitempty"`
	Metadata   map[string]string `json:"metadata,omitempty"`
}

type Event struct {
	Topic string `json:"topic"`
	Data  string `json:"data_base64"`
}

var sub = &Subscription{
	PubsubName: "kafka-pubsub",
	Topic:      "myevent",
	Route:      "create",
	Metadata: map[string]string{
		"rawPayload": "true",
	},
}

func main() {
	r := gin.Default()
	r.GET("/dapr/subscribe", func(ctx *gin.Context) {
		ctx.JSON(200, []*Subscription{sub})
	})

	r.POST("/create", func(ctx *gin.Context) {
		var event Event
		err := json.NewDecoder(ctx.Request.Body).Decode(&event)

		if err == nil {
			decoded, _ := base64.RawStdEncoding.DecodeString(event.Data)
			log.Println(string(decoded))
		}

		ctx.JSON(200, map[string]bool{
			"success": true,
		})
	})

	r.Run(":6002")
}

咦, 這不像是在寫subscriber呀, 倒像是一個web service, 沒錯, 實際上的subscribe的部分被封裝在Dapr內了, Dapr等於收到Event後會打給我們的程式

那他怎知道要收到哪個queue哪個topic要打到哪個endpoint? 很簡單, 你只要有一個叫做/dapr/subscribe的endpoint, 在開始執行後, Dapr會自行打這endpoint了解你希望幫忙它收哪些event, 這邊我們希望收的 PubsubName (這邊是我們剛剛加的kafka-pubsub), 另外我們希望收myevent這個topic, 然後我們會希望收到event後打/create這個endpoint, 這有個好處, 你換成另一個完全不一樣的方案, 比如說Redis, 是不需要重新改code的

那我們在/create會收到甚麼呢?基本上就是包裝成CloudEvent的資料結構, 不對, 我們不是要收raw payload嗎?別急, 它只是收到後幫你包裝, 你的raw payload是被base64編碼好好地放在欄位data_base64

這邊我特別沒用任何Dapr SDK, 然後也用gin來寫(Dapr的sdk裡用的是Gorilla),主要是為了展示, 這簡單到不用SDK呀(其實sdk也還沒支援raw payload subscriber相關的呀 XD)

執行

dapr run --app-id subs --app-port 6002 --dapr-http-port 3601 --dapr-grpc-port 60001 --log-level debug go run main.go

指令如上, 可以設定log level把debug訊息打開, 這邊有一點需要注意的, 這浪費我半天的青春, app port一定要設對, 我們程式內用6002那麼這邊的app port就要是6002, 不然Dapr不但會不知道要打事件給你, 連一開始的設定都拿不到(就是打dapr/subscribe)

測試

測試方式很簡單, 如果你剛剛有裝Kafka map, 去那個topic發送一個訊息(按Produce message), 看有沒收到一樣的訊息就可以了

這邊的與世隔絕當然不是真的與世隔絕啦! 我指的是無法連上外面的docker registry的環境, 例如docker hub或是quay.io, 開發環境指的是你要在local可以用來開發測試的standalone模式

首先, 你要先裝好docker(或podman)

如果有private registry

有幾個images會需要放到registry裡面的, 像是

  • dapr
  • 3rdparty/redis
  • 3rdparty/zipkin
  • placement
  • daprd

列表可以在 這邊 找到 (這邊的是放在github上的)

接著安裝dapr cli, 如果可以直接連上internet, 那就用官方文件的做法:

wget -q https://raw.githubusercontent.com/dapr/cli/master/install/install.sh -O - | DAPR_INSTALL_DIR="$HOME/dapr" /bin/bash

但如果不行呢? 那就想辦法到github上下載cli回來安裝

因為要從private history來安裝dapr, 所以在dapr init要多下一個參數, 像是:

dapr init --image-registry MY_REGISTRY_URL

這樣就會從你private registry抓回來裝了

但如果連private registry都沒辦法放上相關的image呢?

不依靠docker registry安裝

Dapr很貼心呀! 還有install bundle, 可以想辦法先去前一個連結裝bundle回來

這個bundle裡面已經有個dapr cli了, 所以解開後, 把dapr複製到你要的目錄, 例如:

sudo cp ./dapr /usr/local/bin

現在我們就有cli可以用了, 但images怎辦? bundle裡面其實就包含有相關的image的tar檔了, 所以把init方式改成:

dapr init --from-dir . -s

這樣就會用local image安裝了, 這邊多加了個-s表示是slim mode, 沒有redis, 沒zipkin的, 因為local images沒有放, 但如果自己需要(比如說要寫state store需要redis), 那就要自己去加component

以redis當state store為例: 假設我們需要一個redis來當state store, 且這redis我們預先跑在本機, port為為6379, 此store名稱為mystore, 我們可以在~/.dapr/components/這目錄加上一個mystore.yaml,

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: mystore
spec:
  type: state.redis
  version: v1
  metadata:
  - name: redisHost
    value: localhost:6379
  - name: redisPassword
    value: ""
  - name: actorStateStore
    value: "true"

這樣我們就有一個名叫mystore的state store了, 不只state store, 其他也可以如法炮製

那如果想用podman取代docker呢?

dapr init --from-dir . -s --container-runtime podman

剛好想說要用Varish來做一下Minio(S3)的cache, 研究了一下順便做個紀錄

先在Ubuntu上裝來試試, 可以用apt 來安裝:

apt install varnish

在Ubuntu 20.04上(WSL用的版本)是6.2.1的版本, 最新版應該是7.2, 不過沒差, 做法都一樣

Varnish預設的設定在/etc/varnish/default.vcl, 打開這檔案你就可以看到像這樣的內容:

vcl 4.0;

# Default backend definition. Set this to point to your content server.
backend default {
    .host = "127.0.0.1";
}
sub vcl_recv {
    # Happens before we check if we have this in cache already.
    #
    # Typically you clean up the request here, removing cookies you don't need,
    # rewriting the request, etc.
}

sub vcl_backend_response {
    # Happens after we have read the response headers from the backend.
    #
    # Here you clean the response headers, removing silly Set-Cookie headers
    # and other mistakes your backend does.
}

sub vcl_deliver {
    # Happens when we have all the pieces we need, and are about to send the
    # response to the client.
    #
    # You can do accounting or modifying the final object here.
}

Varnish的設定檔用的是一種叫做vcl的語言, 它會被Varnish先compile過後才會被使用, 所以改好這檔案後, 如果你跑 sudo system start varnish (這是WSL2上用的, 其他地方可能就是systemctl), 如果你寫錯了, 一開始跑就可以發現出錯了

以上面那個例子來說, 它會預設快取你local上的web server

但如果是要連接Minio (S3)是不夠的, 因為如果單純把backend設成 Minio server, 那client還是會需要access key和secret key才可以存取, 如果你希望讓它跟存取靜態網站一樣, 那你可以能會希望把這兩個設定放在後端

Vanish出場是沒支援可以call S3 API的, 這時候就要透過一個VMOD - AWSRest, 這VMOD是可以在你去backend (Minio/S3) 拿資料前先幫你用你的access key, secret key算好簽章(signature), 所以我們要先安裝這個VMOD

安裝VMOD你會先需要libvarnishapi-dev, 可以用apt install libvarnishapi-dev來安裝, 另外AWSRest還會需要mhash, 你還會需要安裝apt-get install libmhash-dev

裝好後, 從 https://github.com/xcir/libvmod-awsrest 抓取最新的source code, 進入目錄後執行

./autogen.sh
./configure
make
sudo make install

沒意外的話就可以完成安裝, 要確認是不是已經安裝好了, 我們可以在default.vcl加上

vcl 4.0;
import awsrest;

# ....

重啟varnish有成功, 表示應該是沒啥問題才對

我在我本地端電腦跑了個Minio, port為9000, 有一個bucket叫做mmmbux, 裡面有個檔案, key為20220101/a.c, access key為TGhYs2FYBGMYueAz, secrect key為IM2SgF7LxIlZVbeo3Vv7OdQzA7pnZFB1, Varnish則是跑在port 6081上

首先我們來看看怎讓client/browser在不用提供access key/secret key的狀況下可以存取物件

vcl 4.0;
import awsrest;

backend default {
    .host = "127.0.0.1";
    .port = "9000";
}

sub vcl_recv {
    set req.http.host = "127.0.0.1";
    awsrest.v4_generic(
        service           = "s3",
        region            = "ap-northeast-1",
        access_key        = "TGhYs2FYBGMYueAz",
        secret_key        = "IM2SgF7LxIlZVbeo3Vv7OdQzA7pnZFB1",
        signed_headers    = "host;",
        canonical_headers = "host:" + req.http.host + awsrest.lf()
    );
}

Ok, 其實就很簡單的在vcl_recv上加上那幾行就好, 這時候你就可以用 http://localhost:6081/mmmbux/20220101/a.c 來存取 mmmbux 這bucket上 2022/0101/a.c 這個檔案了

那, 如果我不想把bucket name當作url的一部分呢?

sub vcl_recv {
    set req.http.host = "127.0.0.1";
    set req.url = "mmmbux/" + req.url
    awsrest.v4_generic(
        service           = "s3",
        region            = "ap-northeast-1",
        access_key        = "TGhYs2FYBGMYueAz",
        secret_key        = "IM2SgF7LxIlZVbeo3Vv7OdQzA7pnZFB1",
        signed_headers    = "host;",
        canonical_headers = "host:" + req.http.host + awsrest.lf()
    );
}

上面這段就是把你進來的url加上mmmbux/當新的url, 這樣做的話, 你的新url就會是 http://localhost:6081/20220101/a.c

那如果我想進一步, 把它變成 http://localhost:6081/files/20220101/a.c 呢?

sub vcl_recv {
    set req.http.host = "127.0.0.1";
    
    if (req.url ~ "^/files/") {
        set req.url = regsub(req.url, "^/files/", "/mmmbux/");
        awsrest.v4_generic(
          service           = "s3",
          region            = "ap-northeast-1",
          access_key        = "TGhYs2FYBGMYueAz",
          secret_key        = "IM2SgF7LxIlZVbeo3Vv7OdQzA7pnZFB1",
          signed_headers    = "host;",
          canonical_headers = "host:" + req.http.host + awsrest.lf()
        );
    } else {
        return(synth(404));
    }
}

上面這段就是把/files/後面的都到 mmmbux這bucket去抓, 然後其他目錄都回傳 404 Not found

那如果我想要用docker跑也要有這VMod呢? 我把這Dockerfile的範例放在https://github.com/julianshen/varnish-awsrest-docker, Varnish 官方的docker image有提供 install-vmod 這script讓你安裝vmod, 所以只需要給它awsrest的tarball: https://github.com/xcir/libvmod-awsrest/archive/refs/tags/v70.12.tar.gz 即可, 我做了一個現成的image放在quay.io/jlnshen/varnish-awsrest