做了些實驗, 紀錄一下, 剛好寫這篇時巴西也被幹掉了, 也記錄一下 XD

這個問題其實要滿足以下條件才可能發生:

  • MongoDB版本在 4.4.14, 5.3.0, 5.0.7, 4.2.20 之前, 這算是一個Mongodb在2022一月才fix的一個bug, 所以比這幾版舊是有可能的
  • MongoDB instance在K8S上有設memory limit (且這limit要小於host node memory的一半?)
  • K8S所在的Host OS 的cgroup版本為V2, 可以參考這文件, Ubuntu 21.10, Fedora 31之後都開啟V2了, 不過如果你用的是WSL2, 由於WSL2的Kenel還是V1, 是試不出這問題的 (我是找了台Fedora來試)

問題是甚麼?

查這問題的起因當然是碰到Mongodb被OOM Kill, 後來發現好像這也算蠻常踩到的坑, 只是好像沒人寫出完整可能性

碰到被OOM Kill第一個會思考的是, 他為何要那麼多記憶體?要給他多少才夠? 另外一個是, 由於是發生在container, 跑在K8S上, 一個疑問是, 那MongoDB是否會遵守設給他的resource limit? 還是他會當node所有記憶體都是他可用的?

有沒有哪裡可疑的?

有哪些東西會吃記憶體? 連線會, index會, 但其實其中一個比較可疑的是給Wired Tiger的cache, 根據這份文件, Wired Tiger的cache會用掉

  • 50% 的(總記憶體 - 1GB) 或是
  • 256MB

也就是至少256MB, 然後如果你有64G記憶體, 他就會用掉最多 (64-1)/2, 到這聽起來好像沒啥問題, 只用一半還不至於有撐爆的問題, 會不會是其他的地方?

但另一個問題是, 直接裝在單機沒問題, 如果是跑在K8S上的容器, Memory limit我們是給在K8S上, MongoDB到底會以memory limit當總記憶體大小還是以整個node全部可用的記憶體計算?

其實根據文件的補充說明, 它是有考慮到的, 它會以hostInfo.system.memLimitMB 來計算

In some instances, such as when running in a container, the database can have memory constraints that are lower than the total system memory. In such instances, this memory limit, rather than the total system memory, is used as the maximum RAM available.

而它這資訊是透過cgroup去抓的, K8S也是用cgroup做資源管理的, 所以這值會等於你設定的limit

我第一次在WSL下測試, 把memory limit設為2Gi, hostInfo.system.memLimitMB 也的確是這個值(用mongo client下db.hostInfo()即可查詢)

那看來應該沒問題呀, 問題在哪?

後來查到一個bug : https://jira.mongodb.org/browse/SERVER-60412, 原來cgroup v1, v2抓這些資訊的位置是不同的, 所以導致舊版的會有抓不正確的狀況

看到這就來做個實驗, 找了台有開啟v2的fedora (with podman), 跑了k3s, 在這k3s上分別跑了4.4.13, 4.4.15兩個版本去做測試, memory limit都設為2Gi, 用db.hostInfo()查詢memLimitMB得到下面結果:

4.4.13

4.4.15

Bingo! 4.4.13果然抓到的memLimitMB是整個node的記憶體大小而非limit, 這樣如果node的記憶體大小遠大於limit, Wired Tiger cache是有可能用超過limit的

當然, 這只是其中一種可能性, 不見得一定都是這情形, 但碰到這類狀況, 這的確是可以考慮查的一個方向

在Dapr元件有一種叫做Binding的元件(component)讓你的app跟外部系統做一個連結的, 這元件可分為兩類:

  1. Input BIndings: 用來接受外部事件的觸發,像是Webhook, 從Queue來的events, 甚至是人家新發的Tweets, 應該都可以歸為這一類
  2. Ouput Bindings: 呼叫外部系統的動作,命名為Outpiut其實會讓人誤以為是資料的輸出,但其實,他不只可以用在資料輸出,呼叫外部系統的動作都可以包含在內, 舉個例子, GraphQL 的Output binding定義了兩個操作(Operations), 一個是QueryOperation, 一個是MutationOperation, 熟悉GraphQL的應該知道,MutationOperation一般才是應用在資料操作,而Query感覺就跟輸出比較無關了

一開始我也有點搞不清楚這個模式目的在做啥的, 要接受事件觸發,我們有pub sub了,而state store本身就用在資料輸出, 感覺的確有點重複,但由上述兩點來看,其實Binding定義的範圍廣泛多了,它並不特定限制在Queue或是資料庫

但有個東西同時支援了Pub sub, input binding, output binding, 一開始我是看Kafka這應用,才讓我覺得有點錯亂,前一篇有講過了怎實作Subscriber, 這邊來比較一下, 利用Input binding的話,會有什麼不一樣?

建立Binding元件

要用Kafka來觸發我們服務(如上圖),跟寫Subscriber一樣,我們需要在~/.dapr/components裡先建立好元件, 假設我們新增一個kafka-binding.yaml, 內容如下:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka-binding
spec:
  type: bindings.kafka
  version: v1
  metadata:
  - name: brokers
    value: localhost:9092
  - name: topics
    value: mytopic
  - name: consumerGroup
    value: group1
  - name: publishTopic
    value: mytopic
  - name: authRequired
    value: "false"

上面這個其實已經是定義好input和output binding了,topics定義的是input binding要聽取事件的topic, 而publishTopic定義的則是Output binding要輸出資料的目標

實作Input binding

跟實做subscriber差不多, Input binding也是實作一個webhook讓Dapr打進來而已, 這邊假設Kafka會收到的資料會是一個數字

package main

import (
    "log"
    "net/http"
    "github.com/gin-gonic/gin"
)

func dataBinding(ctx *gin.Context) {
    var data int
    if err := ctx.Bind(&data); err != nil {
        ctx.AbortWithStatus(500)
        return
    }
  
    log.Println(data)
    ctx.Status(200)
}

func main() {
    r := gin.Default()
    r.POST("/kafka-binding", dataBinding)
    r.OPTIONS("/kafka-binding", func(ctx *gin.Context) {
        ctx.Status(200)
    })

    http.ListenAndServe(":6003", r)
}

這邊幾個重點:

  1. endpoint path跟你的binding名稱一樣, 當然這可以在元件設定那邊改
  2. OPTIONS有點像是讓Dapr確認你有沒支援這個binding的health endpoint, 在程式一開始跑就會被call, 這邊只要回OK, 其實都好
  3. 跟pub sub不一樣的是, 這邊會收到的格式不一定會是cloudevent, 除非publisher那邊過來的就是cloudevent, 因此, tracing應該是追蹤不到才是

實做Output binding

那如何實作跟剛剛的Input binding匹配的Output binding呢? 範例如下:

package main

import (
    "context"
    "log"
    "math/rand"
    "strconv"
    "time"
    
    dapr "github.com/dapr/go-sdk/client"
)

func main() {
    BINDING_NAME := "kafka-binding"
    BINDING_OPERATION := "create"

    for i := 0; i < 10; i++ {
        time.Sleep(5000)
        rand.Seed(time.Now().UnixMicro())
        dataId := rand.Intn(1000-1) + 1
        client, err := dapr.NewClient()
        if err != nil {
            panic(err)
        }

        defer client.Close()
        ctx := context.Background()

        in := &dapr.InvokeBindingRequest{Name: BINDING_NAME, Operation: BINDING_OPERATION, Data: []byte(strconv.Itoa(dataId))}

        client.InvokeOutputBinding(ctx, in)
        log.Println("Sending message: " + strconv.Itoa(dataId))
    }
}

這邊重點在於:

in := &dapr.InvokeBindingRequest{Name: BINDING_NAME, Operation: BINDING_OPERATION, Data: []byte(strconv.Itoa(dataId))}

client.InvokeOutputBinding(ctx, in)

雖說是"Output" binding, 但這邊用的名字是"Invoke", 跟Output沒啥相關, Operation則是元件訂的, Kafka binding只定義一個"create", 就是讓你送訊息用的, Data則是要傳送的資料, 以Byte array表示

用subscriber接收output binding來的事件

這邊就不用多作解釋了, 從前面不難發現, 它接的就是raw payload, 這部分可以參考 前一篇

那啥時該用哪一種呢? 以Kafka這範例來說, 我是認為如果publisher跟subscriber都是自己實做的話, 應該是要選用pub sub, 用cloud events的話, 可以享受到distributed tracing帶來的好處, 如果不是, 差異應該不大, 都蠻簡單實作的

在發表前一篇時, Evan 跟我說, “能不能用slug把url改好看點?”, 從開始寫blog來, 其實我也沒在意有沒人看, 所以SEO相關的也沒太在意, 但既然有人講了, 我就來弄一下吧

Prama links的設定

首先先來看看pramalinks設定的部分:

[permalinks]
    post = "/:slug/"

這邊就是用來設定你URL長相的地方, 我的原文都放在post目錄內, 所以我一開始的設定就是以slug當URL沒錯, 那怎還是有中文呢?

Archetypes, 初始文章的設定

當你用hugo new filename, 他會拿themes/[theme_name]/archetypes/default.md當範本來建立初始文章, 像我原本的設定是:

---
date: {{ .Date }}
title: "{{ replace .Name "-" " " | title }}"
images: 
- "https://og.jln.co/jlns1/{{ replace .Name "-" " " | title | base64Encode | replaceRE "=+$" "" | replaceRE "\\+" "-" | replaceRE "/" "_"}}"
draft: true
---

slug是沒設定的, 不過它似乎應該會用title去算slug, 所以在Paramlinks那邊沒啥影響, 但其實你也可以加一行變成:

---
date: {{ .Date }}
title: "{{ replace .Name "-" " " | title }}"
slug: "{{ anchorize .Name | title }}"
images: 
- "https://og.jln.co/jlns1/{{ replace .Name "-" " " | title | base64Encode | replaceRE "=+$" "" | replaceRE "\\+" "-" | replaceRE "/" "_"}}"
draft: true
---

anchorize是可以把"This is a cat"轉成"this-is-a-cat", 其實這兩段效果差不多, 問題在, 不支援中文, 因此像是"這是中文 Chinese", 其實翻成的是"這是中文-chinese", 如果再把這段放到url, unicode url並不是很好看

找了半天, 並不是很好用, 本來想寫個wrapper script來做新建文章好了, 然後自動加入比較好看的slug, 但轉念一想, 何不直接改Hugo?何不直接改Hugo?何不直接改Hugo? (回音持續)

支援中文的slug轉換的go module

推薦這個github.com/gosimple/slug, 這不只支援中文, 很多語言都有!用法也很簡單:

package main

import (
	"fmt"
	"github.com/gosimple/slug"
)

func main() {
	someText := slug.Make("影師")
	fmt.Println(someText) // Will print: "ying-shi"
}

(以上範例來自它github)它很貼心的幫你把中文字轉成拼音了, 這用來做url感覺還蠻適合的呀!

修改Hugo

那我要加在Hugo哪裡呢?前面說到anchorize, 那其實仿效它做一個slugify不就好了, 那也容易知道要改哪, 抄anchorize就對了

要改的只有兩個檔:

  • tpl/urls/init.go
  • tpl/urls/urls.go

tpl/urls/urls.go加入:

func (ns *Namespace) Slugify(s any) (string, error) {
	ss, err := cast.ToStringE(s)
	if err != nil {
		return "", nil
	}
	return slug.Make(ss), nil
}

然後在tpl/urls/init.goinit()內加入:

ns.AddMethodMapping(ctx.Slugify,
    []string{"slugify"},
    [][2]string{
        {`{{ "This is a title" | slugify }}`, `this-is-a-title`},
    },
)

重新建置安裝好後, 就可以把Archetypes改成:

---
date: {{ .Date }}
title: "{{ replace .Name "-" " " | title }}"
slug: "{{ slugify .Name | title }}"
images: 
- "https://og.jln.co/jlns1/{{ replace .Name "-" " " | title | base64Encode | replaceRE "=+$" "" | replaceRE "\\+" "-" | replaceRE "/" "_"}}"
draft: true
---

搞定! 結果寫這些code五分鐘, 但卻花了五十分鐘找方法呀, 果然Open source還是自己改來用最快

本來沒預計寫這篇的, 不過後來想想, 本來想寫的篇幅太大, 先寫這篇幫後面內容暖身, 後續相關內容會再更新到下面連結:

  1. 使用Dapr Input/Output Binding連接Kafka

這篇並不是要寫怎用go實做Dapr的pubsub, 不完全是, 實做pubsub部分請參考官方文件, 基本的Dapr的publisher跟subscriber是用所謂CloudEvent的格式在傳遞, 用CloudEvent的好處是, 由於CloudEvent會幫忙夾帶一些metadata, 因此也就可以實現分散式追蹤(Tracing)的功能, 但缺點就是無法支援一些原本寫好的legacy publisher或subscriber, 所幸Dapr的pubsub還是支援raw payload可以讓你自組你的訊息格式

在開始之前, 為了測試實做, 我這邊採用了Kafka, 但由於Dapr把實做封裝得不錯, 所以其實也不一定要用Kafka, 不過支援了Kraft之後的Kafka, 由於可以去掉對zoo keeper的依賴, 所以算蠻簡單裝的

安裝Kafka

使用docker跑Kafka, 應該是最簡單的方式, 只要執行

docker run -it --name kafka-zkless -p 9092:9092 -e LOG_DIR=/tmp/logs quay.io/strimzi/kafka:latest-kafka-2.8.1-amd64 /bin/sh -c 'export CLUSTER_ID=$(bin/kafka-storage.sh random-uuid) && bin/kafka-storage.sh format -t $CLUSTER_ID -c config/kraft/server.properties && bin/kafka-server-start.sh config/kraft/server.properties'

這樣Kafka就可以順利活起來了, 完全不需要跑zoo keeper…喔耶…

建議也可以順便跑一下Kafka map, 這樣待會可以直接發event來測試

新增Kafka component

有了Kafka後, 我們需要在Dapr新增這個component 才可以讓Dapr應用程式使用, 在~/.dape/components底下加一個檔案(可叫做kafka.yaml),內容是:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka-pubsub
spec:
  type: pubsub.kafka
  version: v1
  metadata:
  - name: brokers # Required. Kafka broker connection setting
    value: "localhost:9092"
  - name: consumerGroup # Optional. Used for input bindings.
    value: "group1"
  - name: authType # Required.
    value: "none"
  - name: disableTls # Optional. Disable TLS. This is not safe for production!! You should read the `Mutual TLS` section for how to use TLS.
    value: "true"

內容就不多做解釋了, 官方文件會有更清楚的說明, 這邊先說明, 這新增上去後, 我們會多一個pubsub component叫做kafka-pubsub, 這名字寫程式會用到囉

寫個subscriber來接收事件(event)吧

官方文件其實有寫如何寫一個接收raw payload的subscriber, 但不像其他文件一樣有多種語言範例, 只有Python, PHP兩種

但其實, 如上圖, Dapr用sidecar的作法, 簡化了寫pubsub的複雜度, 而且減低了對語言的依賴, 也不像是Istio是從系統的角度設計, 算是有點有趣的作法, 你不用理解pubsub, 也不用特別知道你是用Kafka, NATS, 或者是RabbitMQ, 寫法都一樣, 不囉嗦, 直接看code

package main

import (
	"encoding/base64"
	"encoding/json"
	"log"

	"github.com/gin-gonic/gin"
)

type Subscription struct {
	PubsubName string            `json:"pubsubname"`
	Topic      string            `json:"topic"`
	Route      string            `json:"route,omitempty"`
	Metadata   map[string]string `json:"metadata,omitempty"`
}

type Event struct {
	Topic string `json:"topic"`
	Data  string `json:"data_base64"`
}

var sub = &Subscription{
	PubsubName: "kafka-pubsub",
	Topic:      "myevent",
	Route:      "create",
	Metadata: map[string]string{
		"rawPayload": "true",
	},
}

func main() {
	r := gin.Default()
	r.GET("/dapr/subscribe", func(ctx *gin.Context) {
		ctx.JSON(200, []*Subscription{sub})
	})

	r.POST("/create", func(ctx *gin.Context) {
		var event Event
		err := json.NewDecoder(ctx.Request.Body).Decode(&event)

		if err == nil {
			decoded, _ := base64.RawStdEncoding.DecodeString(event.Data)
			log.Println(string(decoded))
		}

		ctx.JSON(200, map[string]bool{
			"success": true,
		})
	})

	r.Run(":6002")
}

咦, 這不像是在寫subscriber呀, 倒像是一個web service, 沒錯, 實際上的subscribe的部分被封裝在Dapr內了, Dapr等於收到Event後會打給我們的程式

那他怎知道要收到哪個queue哪個topic要打到哪個endpoint? 很簡單, 你只要有一個叫做/dapr/subscribe的endpoint, 在開始執行後, Dapr會自行打這endpoint了解你希望幫忙它收哪些event, 這邊我們希望收的 PubsubName (這邊是我們剛剛加的kafka-pubsub), 另外我們希望收myevent這個topic, 然後我們會希望收到event後打/create這個endpoint, 這有個好處, 你換成另一個完全不一樣的方案, 比如說Redis, 是不需要重新改code的

那我們在/create會收到甚麼呢?基本上就是包裝成CloudEvent的資料結構, 不對, 我們不是要收raw payload嗎?別急, 它只是收到後幫你包裝, 你的raw payload是被base64編碼好好地放在欄位data_base64

這邊我特別沒用任何Dapr SDK, 然後也用gin來寫(Dapr的sdk裡用的是Gorilla),主要是為了展示, 這簡單到不用SDK呀(其實sdk也還沒支援raw payload subscriber相關的呀 XD)

執行

dapr run --app-id subs --app-port 6002 --dapr-http-port 3601 --dapr-grpc-port 60001 --log-level debug go run main.go

指令如上, 可以設定log level把debug訊息打開, 這邊有一點需要注意的, 這浪費我半天的青春, app port一定要設對, 我們程式內用6002那麼這邊的app port就要是6002, 不然Dapr不但會不知道要打事件給你, 連一開始的設定都拿不到(就是打dapr/subscribe)

測試

測試方式很簡單, 如果你剛剛有裝Kafka map, 去那個topic發送一個訊息(按Produce message), 看有沒收到一樣的訊息就可以了

這邊的與世隔絕當然不是真的與世隔絕啦! 我指的是無法連上外面的docker registry的環境, 例如docker hub或是quay.io, 開發環境指的是你要在local可以用來開發測試的standalone模式

首先, 你要先裝好docker(或podman)

如果有private registry

有幾個images會需要放到registry裡面的, 像是

  • dapr
  • 3rdparty/redis
  • 3rdparty/zipkin
  • placement
  • daprd

列表可以在 這邊 找到 (這邊的是放在github上的)

接著安裝dapr cli, 如果可以直接連上internet, 那就用官方文件的做法:

wget -q https://raw.githubusercontent.com/dapr/cli/master/install/install.sh -O - | DAPR_INSTALL_DIR="$HOME/dapr" /bin/bash

但如果不行呢? 那就想辦法到github上下載cli回來安裝

因為要從private history來安裝dapr, 所以在dapr init要多下一個參數, 像是:

dapr init --image-registry MY_REGISTRY_URL

這樣就會從你private registry抓回來裝了

但如果連private registry都沒辦法放上相關的image呢?

不依靠docker registry安裝

Dapr很貼心呀! 還有install bundle, 可以想辦法先去前一個連結裝bundle回來

這個bundle裡面已經有個dapr cli了, 所以解開後, 把dapr複製到你要的目錄, 例如:

sudo cp ./dapr /usr/local/bin

現在我們就有cli可以用了, 但images怎辦? bundle裡面其實就包含有相關的image的tar檔了, 所以把init方式改成:

dapr init --from-dir . -s

這樣就會用local image安裝了, 這邊多加了個-s表示是slim mode, 沒有redis, 沒zipkin的, 因為local images沒有放, 但如果自己需要(比如說要寫state store需要redis), 那就要自己去加component

以redis當state store為例: 假設我們需要一個redis來當state store, 且這redis我們預先跑在本機, port為為6379, 此store名稱為mystore, 我們可以在~/.dapr/components/這目錄加上一個mystore.yaml,

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: mystore
spec:
  type: state.redis
  version: v1
  metadata:
  - name: redisHost
    value: localhost:6379
  - name: redisPassword
    value: ""
  - name: actorStateStore
    value: "true"

這樣我們就有一個名叫mystore的state store了, 不只state store, 其他也可以如法炮製

那如果想用podman取代docker呢?

dapr init --from-dir . -s --container-runtime podman