架設Kafka一個稍稍討厭的點就是先架好Zookeeper, 在Kafka 2.8 (2021/4發布)之後, 支援了以自家的KRaft實現的Quorum controller, 這就可以不用再依賴zookeeper了, Confluent這篇文章有簡單的介紹一下Quorum controller是怎運作的

在我前面這篇有提到, 如何用docker跑無zookeeper的Kafka:

docker run -it --name kafka-zkless -p 9092:9092 -e LOG_DIR=/tmp/logs quay.io/strimzi/kafka:latest-kafka-2.8.1-amd64 /bin/sh -c 'export CLUSTER_ID=$(bin/kafka-storage.sh random-uuid) && bin/kafka-storage.sh format -t $CLUSTER_ID -c config/kraft/server.properties && bin/kafka-server-start.sh config/kraft/server.properties'

那如果要架設在K8S上, 可以怎麼做呢? 原本的Kafka需要依賴zookeeper, 加上Kafka的eco system其實蠻多東西的, 一般也不會光只用Kafka本身而已, Kafka Bridge, Kafka connect, schema registry, 進階一點就Kafka stream, KSQL, 規模大一點還需要用上Cruise control, Mirror Maker, 所以用Operator來架設可能會比單純寫manifest, helm chart來的好用, 而比較常見(有名的?)Kafka Operator大致上有這三個(就我知道的啦):

  1. Confluent Operator, 由Confluent這家公司發布的, 由於Confluent這家公司的背景(https://docs.confluent.io/5.5.1/installation/operator/index.html), Kafka雖是Open source但就是他們家的產品, 所以這個也算是官方出品的Operator, 但這個功能上比較起來稍弱, 而且並沒啥更新, 當然也就還沒看到KRaft相關的支援
  2. KOperator, 萬歲雲(Bonzai Cloud)出品, 由於Bonzai Cloud目前是Cisco的, 所以這個也可以算大公司出品(?), 我自己是還沒用過, 但看架構, 預設就會架起Cruise control跟Prometheus, 感覺架構上考量是比較完整的, 另外就是也考量到部屬到Istio mesh的部分, 用Envoy來做external LB, 以及用等等, 另外一個值得一提的是Kafka這種Stateful application, 它卻並不是採用Statefulset來部署(它的文件有提到All Kafka on Kubernetes operators use StatefulSet to create a Kafka Cluster, 但事實是後來Strimzi也採用一樣的策略了), 但一樣的, 也還沒有支援KRaft
  3. Strimzi Operator, 這應該算蠻廣泛被利用的一個Operator, 支援豐富, 更新迅速, 也是可以支援Cruise control (不一定要開), 基本該支援的, 應該也都差不多了, 而且從0.29就支援了KRaft, 不過這個Operator基本消費的記憶體就需要到300MB了

總和以上, 看起來如果要在K8S上玩KRaft的話, Strimzi是一個比較適合的選擇

安裝Strimzi operator

用以下指令安裝:

kubectl create namespace kafka
kubectl create -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka

這除了會建立strimzi-cluster-operator這個Deployment, 也會建立相關的ClusterRoles, ClusterRoleBindings, 和相關的CRD, 所以要先確定你有權限建立這些(尤其是Cluster level的), 其實相當簡單

另外, 用以下的manifest就會幫你建立好一個Kafka Cluster

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    version: 3.3.1
    replicas: 3
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      default.replication.factor: 3
      min.insync.replicas: 2
      inter.broker.protocol.version: "3.3"
    storage:
      type: ephemeral
  zookeeper:
    replicas: 3
    storage:
      type: ephemeral

這會建立一個replica數量為3的Kafka cluster,以及對應的Zookeeper, 這邊的Storage type為ephemeral, 這表示它會用emptyDir當Volume, 如果你有相對應的PVC, 也可以把這替換掉

這邊還是會幫你建立出zookeeper, 那如何能擺脫zookeeper呢?

打開實驗性功能

截至這邊文章寫的時間的版本(0.32), KRaft還是一個實驗性功能, 要以環境變數打開, 如下:

kubectl set env deployments/strimzi-cluster-operator STRIMZI_FEATURE_GATES=+UseKRaft -n kafka

strimzi是靠STRIMZI_FEATURE_GATES來當作feature toggle, 在0.32只有一個實驗性功能的開關, 那就是UseKRaft, 上面那行指令就可以把這功能打開

用上面一模一樣的Manfest(Zookeeper那段要留著, 雖然沒用, 但在這版本還是必須), 就可以開出一個不依賴zookeeper的kafka cluster了, 以下是整個操作過程:

asciicast

這邊你可能會發現, 我是用一個strimzipotset的資源來確認是否Kafka有沒正確被開成功, 你如果再去看Replica set, Stateful set, 你會發現找不到Kafka相關的, Strimzi其實就是靠自己的controller來管理Kafka的pods

你也可以用 kubectl get kafkas -n kafka來確認kafka這namespace下的kafka cluster的狀況

這個Manifest其實我拿掉了EntityOperator的部分, 是因為KRaft功能目前還沒支援TopicOperator, 沒拿掉會報錯

是該拋棄Zookeeper了嗎?

KRaft相對很新, 以三大有名的Kafka Operator來說, 目前也只有Strimzi有支援, 而且才剛開始, 實務上來說, 以功能, 穩定性或許應該還不是時候在production廣泛使用, 真要用, 還是多測試一下再說吧, 短時間還是跟zookeeper做做好朋友

做了些實驗, 紀錄一下, 剛好寫這篇時巴西也被幹掉了, 也記錄一下 XD

這個問題其實要滿足以下條件才可能發生:

  • MongoDB版本在 4.4.14, 5.3.0, 5.0.7, 4.2.20 之前, 這算是一個Mongodb在2022一月才fix的一個bug, 所以比這幾版舊是有可能的
  • MongoDB instance在K8S上有設memory limit (且這limit要小於host node memory的一半?)
  • K8S所在的Host OS 的cgroup版本為V2, 可以參考這文件, Ubuntu 21.10, Fedora 31之後都開啟V2了, 不過如果你用的是WSL2, 由於WSL2的Kenel還是V1, 是試不出這問題的 (我是找了台Fedora來試)

問題是甚麼?

查這問題的起因當然是碰到Mongodb被OOM Kill, 後來發現好像這也算蠻常踩到的坑, 只是好像沒人寫出完整可能性

碰到被OOM Kill第一個會思考的是, 他為何要那麼多記憶體?要給他多少才夠? 另外一個是, 由於是發生在container, 跑在K8S上, 一個疑問是, 那MongoDB是否會遵守設給他的resource limit? 還是他會當node所有記憶體都是他可用的?

有沒有哪裡可疑的?

有哪些東西會吃記憶體? 連線會, index會, 但其實其中一個比較可疑的是給Wired Tiger的cache, 根據這份文件, Wired Tiger的cache會用掉

  • 50% 的(總記憶體 - 1GB) 或是
  • 256MB

也就是至少256MB, 然後如果你有64G記憶體, 他就會用掉最多 (64-1)/2, 到這聽起來好像沒啥問題, 只用一半還不至於有撐爆的問題, 會不會是其他的地方?

但另一個問題是, 直接裝在單機沒問題, 如果是跑在K8S上的容器, Memory limit我們是給在K8S上, MongoDB到底會以memory limit當總記憶體大小還是以整個node全部可用的記憶體計算?

其實根據文件的補充說明, 它是有考慮到的, 它會以hostInfo.system.memLimitMB 來計算

In some instances, such as when running in a container, the database can have memory constraints that are lower than the total system memory. In such instances, this memory limit, rather than the total system memory, is used as the maximum RAM available.

而它這資訊是透過cgroup去抓的, K8S也是用cgroup做資源管理的, 所以這值會等於你設定的limit

我第一次在WSL下測試, 把memory limit設為2Gi, hostInfo.system.memLimitMB 也的確是這個值(用mongo client下db.hostInfo()即可查詢)

那看來應該沒問題呀, 問題在哪?

後來查到一個bug : https://jira.mongodb.org/browse/SERVER-60412, 原來cgroup v1, v2抓這些資訊的位置是不同的, 所以導致舊版的會有抓不正確的狀況

看到這就來做個實驗, 找了台有開啟v2的fedora (with podman), 跑了k3s, 在這k3s上分別跑了4.4.13, 4.4.15兩個版本去做測試, memory limit都設為2Gi, 用db.hostInfo()查詢memLimitMB得到下面結果:

4.4.13

4.4.15

Bingo! 4.4.13果然抓到的memLimitMB是整個node的記憶體大小而非limit, 這樣如果node的記憶體大小遠大於limit, Wired Tiger cache是有可能用超過limit的

當然, 這只是其中一種可能性, 不見得一定都是這情形, 但碰到這類狀況, 這的確是可以考慮查的一個方向

在Dapr元件有一種叫做Binding的元件(component)讓你的app跟外部系統做一個連結的, 這元件可分為兩類:

  1. Input BIndings: 用來接受外部事件的觸發,像是Webhook, 從Queue來的events, 甚至是人家新發的Tweets, 應該都可以歸為這一類
  2. Ouput Bindings: 呼叫外部系統的動作,命名為Outpiut其實會讓人誤以為是資料的輸出,但其實,他不只可以用在資料輸出,呼叫外部系統的動作都可以包含在內, 舉個例子, GraphQL 的Output binding定義了兩個操作(Operations), 一個是QueryOperation, 一個是MutationOperation, 熟悉GraphQL的應該知道,MutationOperation一般才是應用在資料操作,而Query感覺就跟輸出比較無關了

一開始我也有點搞不清楚這個模式目的在做啥的, 要接受事件觸發,我們有pub sub了,而state store本身就用在資料輸出, 感覺的確有點重複,但由上述兩點來看,其實Binding定義的範圍廣泛多了,它並不特定限制在Queue或是資料庫

但有個東西同時支援了Pub sub, input binding, output binding, 一開始我是看Kafka這應用,才讓我覺得有點錯亂,前一篇有講過了怎實作Subscriber, 這邊來比較一下, 利用Input binding的話,會有什麼不一樣?

建立Binding元件

要用Kafka來觸發我們服務(如上圖),跟寫Subscriber一樣,我們需要在~/.dapr/components裡先建立好元件, 假設我們新增一個kafka-binding.yaml, 內容如下:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka-binding
spec:
  type: bindings.kafka
  version: v1
  metadata:
  - name: brokers
    value: localhost:9092
  - name: topics
    value: mytopic
  - name: consumerGroup
    value: group1
  - name: publishTopic
    value: mytopic
  - name: authRequired
    value: "false"

上面這個其實已經是定義好input和output binding了,topics定義的是input binding要聽取事件的topic, 而publishTopic定義的則是Output binding要輸出資料的目標

實作Input binding

跟實做subscriber差不多, Input binding也是實作一個webhook讓Dapr打進來而已, 這邊假設Kafka會收到的資料會是一個數字

package main

import (
    "log"
    "net/http"
    "github.com/gin-gonic/gin"
)

func dataBinding(ctx *gin.Context) {
    var data int
    if err := ctx.Bind(&data); err != nil {
        ctx.AbortWithStatus(500)
        return
    }
  
    log.Println(data)
    ctx.Status(200)
}

func main() {
    r := gin.Default()
    r.POST("/kafka-binding", dataBinding)
    r.OPTIONS("/kafka-binding", func(ctx *gin.Context) {
        ctx.Status(200)
    })

    http.ListenAndServe(":6003", r)
}

這邊幾個重點:

  1. endpoint path跟你的binding名稱一樣, 當然這可以在元件設定那邊改
  2. OPTIONS有點像是讓Dapr確認你有沒支援這個binding的health endpoint, 在程式一開始跑就會被call, 這邊只要回OK, 其實都好
  3. 跟pub sub不一樣的是, 這邊會收到的格式不一定會是cloudevent, 除非publisher那邊過來的就是cloudevent, 因此, tracing應該是追蹤不到才是

實做Output binding

那如何實作跟剛剛的Input binding匹配的Output binding呢? 範例如下:

package main

import (
    "context"
    "log"
    "math/rand"
    "strconv"
    "time"
    
    dapr "github.com/dapr/go-sdk/client"
)

func main() {
    BINDING_NAME := "kafka-binding"
    BINDING_OPERATION := "create"

    for i := 0; i < 10; i++ {
        time.Sleep(5000)
        rand.Seed(time.Now().UnixMicro())
        dataId := rand.Intn(1000-1) + 1
        client, err := dapr.NewClient()
        if err != nil {
            panic(err)
        }

        defer client.Close()
        ctx := context.Background()

        in := &dapr.InvokeBindingRequest{Name: BINDING_NAME, Operation: BINDING_OPERATION, Data: []byte(strconv.Itoa(dataId))}

        client.InvokeOutputBinding(ctx, in)
        log.Println("Sending message: " + strconv.Itoa(dataId))
    }
}

這邊重點在於:

in := &dapr.InvokeBindingRequest{Name: BINDING_NAME, Operation: BINDING_OPERATION, Data: []byte(strconv.Itoa(dataId))}

client.InvokeOutputBinding(ctx, in)

雖說是"Output" binding, 但這邊用的名字是"Invoke", 跟Output沒啥相關, Operation則是元件訂的, Kafka binding只定義一個"create", 就是讓你送訊息用的, Data則是要傳送的資料, 以Byte array表示

用subscriber接收output binding來的事件

這邊就不用多作解釋了, 從前面不難發現, 它接的就是raw payload, 這部分可以參考 前一篇

那啥時該用哪一種呢? 以Kafka這範例來說, 我是認為如果publisher跟subscriber都是自己實做的話, 應該是要選用pub sub, 用cloud events的話, 可以享受到distributed tracing帶來的好處, 如果不是, 差異應該不大, 都蠻簡單實作的

在發表前一篇時, Evan 跟我說, “能不能用slug把url改好看點?”, 從開始寫blog來, 其實我也沒在意有沒人看, 所以SEO相關的也沒太在意, 但既然有人講了, 我就來弄一下吧

Prama links的設定

首先先來看看pramalinks設定的部分:

[permalinks]
    post = "/:slug/"

這邊就是用來設定你URL長相的地方, 我的原文都放在post目錄內, 所以我一開始的設定就是以slug當URL沒錯, 那怎還是有中文呢?

Archetypes, 初始文章的設定

當你用hugo new filename, 他會拿themes/[theme_name]/archetypes/default.md當範本來建立初始文章, 像我原本的設定是:

---
date: {{ .Date }}
title: "{{ replace .Name "-" " " | title }}"
images: 
- "https://og.jln.co/jlns1/{{ replace .Name "-" " " | title | base64Encode | replaceRE "=+$" "" | replaceRE "\\+" "-" | replaceRE "/" "_"}}"
draft: true
---

slug是沒設定的, 不過它似乎應該會用title去算slug, 所以在Paramlinks那邊沒啥影響, 但其實你也可以加一行變成:

---
date: {{ .Date }}
title: "{{ replace .Name "-" " " | title }}"
slug: "{{ anchorize .Name | title }}"
images: 
- "https://og.jln.co/jlns1/{{ replace .Name "-" " " | title | base64Encode | replaceRE "=+$" "" | replaceRE "\\+" "-" | replaceRE "/" "_"}}"
draft: true
---

anchorize是可以把"This is a cat"轉成"this-is-a-cat", 其實這兩段效果差不多, 問題在, 不支援中文, 因此像是"這是中文 Chinese", 其實翻成的是"這是中文-chinese", 如果再把這段放到url, unicode url並不是很好看

找了半天, 並不是很好用, 本來想寫個wrapper script來做新建文章好了, 然後自動加入比較好看的slug, 但轉念一想, 何不直接改Hugo?何不直接改Hugo?何不直接改Hugo? (回音持續)

支援中文的slug轉換的go module

推薦這個github.com/gosimple/slug, 這不只支援中文, 很多語言都有!用法也很簡單:

package main

import (
	"fmt"
	"github.com/gosimple/slug"
)

func main() {
	someText := slug.Make("影師")
	fmt.Println(someText) // Will print: "ying-shi"
}

(以上範例來自它github)它很貼心的幫你把中文字轉成拼音了, 這用來做url感覺還蠻適合的呀!

修改Hugo

那我要加在Hugo哪裡呢?前面說到anchorize, 那其實仿效它做一個slugify不就好了, 那也容易知道要改哪, 抄anchorize就對了

要改的只有兩個檔:

  • tpl/urls/init.go
  • tpl/urls/urls.go

tpl/urls/urls.go加入:

func (ns *Namespace) Slugify(s any) (string, error) {
	ss, err := cast.ToStringE(s)
	if err != nil {
		return "", nil
	}
	return slug.Make(ss), nil
}

然後在tpl/urls/init.goinit()內加入:

ns.AddMethodMapping(ctx.Slugify,
    []string{"slugify"},
    [][2]string{
        {`{{ "This is a title" | slugify }}`, `this-is-a-title`},
    },
)

重新建置安裝好後, 就可以把Archetypes改成:

---
date: {{ .Date }}
title: "{{ replace .Name "-" " " | title }}"
slug: "{{ slugify .Name | title }}"
images: 
- "https://og.jln.co/jlns1/{{ replace .Name "-" " " | title | base64Encode | replaceRE "=+$" "" | replaceRE "\\+" "-" | replaceRE "/" "_"}}"
draft: true
---

搞定! 結果寫這些code五分鐘, 但卻花了五十分鐘找方法呀, 果然Open source還是自己改來用最快

本來沒預計寫這篇的, 不過後來想想, 本來想寫的篇幅太大, 先寫這篇幫後面內容暖身, 後續相關內容會再更新到下面連結:

  1. 使用Dapr Input/Output Binding連接Kafka

這篇並不是要寫怎用go實做Dapr的pubsub, 不完全是, 實做pubsub部分請參考官方文件, 基本的Dapr的publisher跟subscriber是用所謂CloudEvent的格式在傳遞, 用CloudEvent的好處是, 由於CloudEvent會幫忙夾帶一些metadata, 因此也就可以實現分散式追蹤(Tracing)的功能, 但缺點就是無法支援一些原本寫好的legacy publisher或subscriber, 所幸Dapr的pubsub還是支援raw payload可以讓你自組你的訊息格式

在開始之前, 為了測試實做, 我這邊採用了Kafka, 但由於Dapr把實做封裝得不錯, 所以其實也不一定要用Kafka, 不過支援了Kraft之後的Kafka, 由於可以去掉對zoo keeper的依賴, 所以算蠻簡單裝的

安裝Kafka

使用docker跑Kafka, 應該是最簡單的方式, 只要執行

docker run -it --name kafka-zkless -p 9092:9092 -e LOG_DIR=/tmp/logs quay.io/strimzi/kafka:latest-kafka-2.8.1-amd64 /bin/sh -c 'export CLUSTER_ID=$(bin/kafka-storage.sh random-uuid) && bin/kafka-storage.sh format -t $CLUSTER_ID -c config/kraft/server.properties && bin/kafka-server-start.sh config/kraft/server.properties'

這樣Kafka就可以順利活起來了, 完全不需要跑zoo keeper…喔耶…

建議也可以順便跑一下Kafka map, 這樣待會可以直接發event來測試

新增Kafka component

有了Kafka後, 我們需要在Dapr新增這個component 才可以讓Dapr應用程式使用, 在~/.dape/components底下加一個檔案(可叫做kafka.yaml),內容是:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka-pubsub
spec:
  type: pubsub.kafka
  version: v1
  metadata:
  - name: brokers # Required. Kafka broker connection setting
    value: "localhost:9092"
  - name: consumerGroup # Optional. Used for input bindings.
    value: "group1"
  - name: authType # Required.
    value: "none"
  - name: disableTls # Optional. Disable TLS. This is not safe for production!! You should read the `Mutual TLS` section for how to use TLS.
    value: "true"

內容就不多做解釋了, 官方文件會有更清楚的說明, 這邊先說明, 這新增上去後, 我們會多一個pubsub component叫做kafka-pubsub, 這名字寫程式會用到囉

寫個subscriber來接收事件(event)吧

官方文件其實有寫如何寫一個接收raw payload的subscriber, 但不像其他文件一樣有多種語言範例, 只有Python, PHP兩種

但其實, 如上圖, Dapr用sidecar的作法, 簡化了寫pubsub的複雜度, 而且減低了對語言的依賴, 也不像是Istio是從系統的角度設計, 算是有點有趣的作法, 你不用理解pubsub, 也不用特別知道你是用Kafka, NATS, 或者是RabbitMQ, 寫法都一樣, 不囉嗦, 直接看code

package main

import (
	"encoding/base64"
	"encoding/json"
	"log"

	"github.com/gin-gonic/gin"
)

type Subscription struct {
	PubsubName string            `json:"pubsubname"`
	Topic      string            `json:"topic"`
	Route      string            `json:"route,omitempty"`
	Metadata   map[string]string `json:"metadata,omitempty"`
}

type Event struct {
	Topic string `json:"topic"`
	Data  string `json:"data_base64"`
}

var sub = &Subscription{
	PubsubName: "kafka-pubsub",
	Topic:      "myevent",
	Route:      "create",
	Metadata: map[string]string{
		"rawPayload": "true",
	},
}

func main() {
	r := gin.Default()
	r.GET("/dapr/subscribe", func(ctx *gin.Context) {
		ctx.JSON(200, []*Subscription{sub})
	})

	r.POST("/create", func(ctx *gin.Context) {
		var event Event
		err := json.NewDecoder(ctx.Request.Body).Decode(&event)

		if err == nil {
			decoded, _ := base64.RawStdEncoding.DecodeString(event.Data)
			log.Println(string(decoded))
		}

		ctx.JSON(200, map[string]bool{
			"success": true,
		})
	})

	r.Run(":6002")
}

咦, 這不像是在寫subscriber呀, 倒像是一個web service, 沒錯, 實際上的subscribe的部分被封裝在Dapr內了, Dapr等於收到Event後會打給我們的程式

那他怎知道要收到哪個queue哪個topic要打到哪個endpoint? 很簡單, 你只要有一個叫做/dapr/subscribe的endpoint, 在開始執行後, Dapr會自行打這endpoint了解你希望幫忙它收哪些event, 這邊我們希望收的 PubsubName (這邊是我們剛剛加的kafka-pubsub), 另外我們希望收myevent這個topic, 然後我們會希望收到event後打/create這個endpoint, 這有個好處, 你換成另一個完全不一樣的方案, 比如說Redis, 是不需要重新改code的

那我們在/create會收到甚麼呢?基本上就是包裝成CloudEvent的資料結構, 不對, 我們不是要收raw payload嗎?別急, 它只是收到後幫你包裝, 你的raw payload是被base64編碼好好地放在欄位data_base64

這邊我特別沒用任何Dapr SDK, 然後也用gin來寫(Dapr的sdk裡用的是Gorilla),主要是為了展示, 這簡單到不用SDK呀(其實sdk也還沒支援raw payload subscriber相關的呀 XD)

執行

dapr run --app-id subs --app-port 6002 --dapr-http-port 3601 --dapr-grpc-port 60001 --log-level debug go run main.go

指令如上, 可以設定log level把debug訊息打開, 這邊有一點需要注意的, 這浪費我半天的青春, app port一定要設對, 我們程式內用6002那麼這邊的app port就要是6002, 不然Dapr不但會不知道要打事件給你, 連一開始的設定都拿不到(就是打dapr/subscribe)

測試

測試方式很簡單, 如果你剛剛有裝Kafka map, 去那個topic發送一個訊息(按Produce message), 看有沒收到一樣的訊息就可以了