WebRTC是一個支援瀏覽器的即時影音對話的架構, 算是一個業界標(W3C,IETF), 最近由於想做一個有影音通話的應用, 就研究了一下這東西

如果只是想嘗試一下WebRTC, 是可以直接是可以直接試AppRTC這個Google的範例, 不過這個是Web的版本, 我想要做的是 手機的版本(Android, iOS), AppRTC其實也有Android的版本可搭配

為了熟悉一下整個用WebRTC建立video call的流程, 因此我就決定改一下這個Android版本, 原本Google的版本是透過Web Socket

至於流程與架構我會建議看這影片:

如果不想看太長, 就看這個:

把Web RTC那段換成Firebase(好, 其實我蠻後悔選Firebase來實作的)其實就是把Signaling這段給換掉, 而這段流程是(節錄自影片):

這部分其實就是交換兩邊的SDP和ICE candidate的過程, 詳細可以參考這邊:WebRTC 相關縮寫名詞簡介

結果的source code放在這邊 : apprtc-android-demo

Building WebRTC lib on Android

其實現在寫WebRTC的應用的話, 也不用從頭實作, Google老早就把它實作在Chromium裡面了, 也可以單獨build出library用

這邊有官方的如何建置出Android版本的Web RTC library, 不過, 不要照著這份文件做呀, 不然頭髮會白好幾根, 可能還build不太起來, 找了一堆網路上人家的建議也都是不要直接build, 直接用人家build好現成的, 不過, 現成的雖然有一些, 但大多是過時的, API跟現今的也不太一樣, 如果 要套用到現在的Android版本AppRTC的source code內, 大多都沒辦法用

所幸找到這個build script: pristineio/webrtc-build-scripts, 這個從下載最新的source code到build出library一律包辦, 用法也很簡單, 只要執行下面的:

source android/build.sh
install_dependencies
get_webrtc
build_apprtc

簡單明暸, 但…有幾個問題, 第一個是只能在Linux下build, 因此在Mac跟Windows下要透過Vagrant這類的工具, 而且對硬體需求也很高, 我的2012年中版的Macbook Pro retina實在是跑不動, 後來跑去Digital Ocean租了台VM來build, 本以為最便宜的可以勝任, 後來發現, 至少要4G RAM, 硬碟要20G以上的instance(哭哭, 浪費好多時間)

build出來後, 所需要的東西包含了libjingle_peerconnection.jar和libjingle_peerconnection_so.so, 把這幾個備份起來就是了, 待會build apk需要用

AppRTC 範例的Android source codes

Android的範例的source codes可以在這邊下載

不過這並不是Android studio的project格式, 因此需要用匯入的方式, 或是可以直接fork我的版本去改, 由於原本的版本使用了Web socket做singaling的管道, 因此需要Autobahn, 但你切記絕對不能用Autobahn官方最新的jar檔, 而是要用Google放在third_party裡面那個autobanh.jar(啊, 我到現在才發現名字有些許不同), 這邊的差異是, 原本Autobahn是沒有支援SSL的websocket的, 但AppRTC的websocket則是要透過SSL來連接

把jar跟so檔放到對應的目錄去後, 記得改一下app目錄下的build.gradle加入 (因為import產生的不會幫你加):

dependencies {
    compile fileTree(dir: 'libs', include: ['*.jar'])
}

加上firebase

除了原本的Webcoket和Direct connect兩種方式外, 為了跑一次他的流程我多加了Firebase的部分, 利用它的realtime database來做Signaling這部分, 至於怎樣開始開發firebase, 就參考一下他的官方文件

Signaling的實作

CallActivity

選擇哪種signaling的方式是在CallActivity裡面依據roomId來看使用哪一個signaling client, 程式碼如下:

    // Create connection client. Use DirectRTCClient if room name is an IP otherwise use the
    // standard WebSocketRTCClient.
    if("firebase".equals(roomId)) {
      Log.d(TAG, "firebase");
      appRtcClient = new FirebaseRTCClient(this);
    } else if (loopback || !DirectRTCClient.IP_PATTERN.matcher(roomId).matches()) {
      appRtcClient = new WebSocketRTCClient(this);
    } else {
      Log.i(TAG, "Using DirectRTCClient because room name looks like an IP.");
      appRtcClient = new DirectRTCClient(this);
    }

原本有WebSocketRTCClient和DirectRTCClient, 如果是IP的話就用DirectRTCClient,這邊我多加一個FirebaseRTCClient, 只要roomId是firebase就會使用這個(我偷懶)

FirebaseRTCClient

XXXRTCClient這部分實作了signaling的部分, 因此我參考了WebSocketRTCClient和DirectRTCClient的內容來寫FirebaseRTCClient

跟WebSocketRTCClinet一樣, 它必須實作AppRTCClient, AppRTCClient這個Interface定義如下:

/**
 * AppRTCClient is the interface representing an AppRTC client.
 */
public interface AppRTCClient {
  /**
   * Struct holding the connection parameters of an AppRTC room.
   */
  class RoomConnectionParameters {
    public final String roomUrl;
    public final String roomId;
    public final boolean loopback;
    public RoomConnectionParameters(String roomUrl, String roomId, boolean loopback) {
      this.roomUrl = roomUrl;
      this.roomId = roomId;
      this.loopback = loopback;
    }
  }

  /**
   * Asynchronously connect to an AppRTC room URL using supplied connection
   * parameters. Once connection is established onConnectedToRoom()
   * callback with room parameters is invoked.
   */
  void connectToRoom(RoomConnectionParameters connectionParameters);

  /**
   * Send offer SDP to the other participant.
   */
  void sendOfferSdp(final SessionDescription sdp);

  /**
   * Send answer SDP to the other participant.
   */
  void sendAnswerSdp(final SessionDescription sdp);

  /**
   * Send Ice candidate to the other participant.
   */
  void sendLocalIceCandidate(final IceCandidate candidate);

  /**
   * Send removed ICE candidates to the other participant.
   */
  void sendLocalIceCandidateRemovals(final IceCandidate[] candidates);

  /**
   * Disconnect from room.
   */
  void disconnectFromRoom();

  /**
   * Struct holding the signaling parameters of an AppRTC room.
   */
  class SignalingParameters {
    public final List<PeerConnection.IceServer> iceServers;
    public final boolean initiator;
    public final String clientId;
    public final String wssUrl;
    public final String wssPostUrl;
    public final SessionDescription offerSdp;
    public final List<IceCandidate> iceCandidates;

    public SignalingParameters(List<PeerConnection.IceServer> iceServers, boolean initiator,
        String clientId, String wssUrl, String wssPostUrl, SessionDescription offerSdp,
        List<IceCandidate> iceCandidates) {
      this.iceServers = iceServers;
      this.initiator = initiator;
      this.clientId = clientId;
      this.wssUrl = wssUrl;
      this.wssPostUrl = wssPostUrl;
      this.offerSdp = offerSdp;
      this.iceCandidates = iceCandidates;
    }
  }

  /**
   * Callback interface for messages delivered on signaling channel.
   *
   * <p>Methods are guaranteed to be invoked on the UI thread of |activity|.
   */
  interface SignalingEvents {
    /**
     * Callback fired once the room's signaling parameters
     * SignalingParameters are extracted.
     */
    void onConnectedToRoom(final SignalingParameters params);

    /**
     * Callback fired once remote SDP is received.
     */
    void onRemoteDescription(final SessionDescription sdp);

    /**
     * Callback fired once remote Ice candidate is received.
     */
    void onRemoteIceCandidate(final IceCandidate candidate);

    /**
     * Callback fired once remote Ice candidate removals are received.
     */
    void onRemoteIceCandidatesRemoved(final IceCandidate[] candidates);

    /**
     * Callback fired once channel is closed.
     */
    void onChannelClose();

    /**
     * Callback fired once channel error happened.
     */
    void onChannelError(final String description);
  }
}

主要就是定義了如何處理connect, disconnect, 還有怎麼去註冊SDP和ICE candidate, 在確定好連接成功後, AppRTCClient要負責呼叫onConnectedToRoom來通知 CallActivity已經可以準備建立video call的後續流程, 且要負責處理如果Signal server(這邊是firebase)有傳來遠端的SDP跟ICE candidate, 要負責呼叫SignalingEvents對應的處理 (這邊一樣會叫到CallActivity, 而CallActivity則會使用PeerConnectionClient來處理需要傳遞給PeerConnection相關的參數)

這邊用Firebase處理Signaling的方式是監聽某一個key的改變, 有新的裝置連接, 註冊SDP, ICE Candidate, 就寫到這下面去, 這其實不是一個很好的方式, 因為這下面只要有值的改變, 就會觸發, 不像是WebSocket那個版本是一來一往的API calls, 而且你不知道每次觸發被更動的是哪一部分, 一開始發生了好幾次PeerConnection重複註冊SDP才讓我發現因為這原因被重複呼叫的問題

TURN server

WebRTC是P2P的, 因此如果不具備穿牆能力的話, 在牆外就會被擋掉, 一開始我本來想說試驗P2P而不走TURN Server穿牆的(因為我一時也懶得架一台), 結果測試時老是連不上, 後來才發現我阿呆, 我的測試環境是一台實體手機, 另一台是電腦上跑模擬器, 本以為兩個(手機, 電腦)是同一個區網沒問題, 後來才想到模擬器是在另一個虛擬網路, 因此還是有需要TURN server

如果不想架一台, 要怎辦? 用Google免錢的, 他們做了這個demo, 一定有! 因此就偷看了一下WebRTCClient的code跟傳輸內容,發現它跟https://networktraversal.googleapis.com/v1alpha/iceconfig?key=AIzaSyAJdh2HkajseEIltlZ3SIXO02Tze9sO3NY 去要TURN server list, 所以基本上只要照copy下面這段就好:

   private LinkedList<PeerConnection.IceServer> requestTurnServers(String url)
            throws IOException, JSONException {
        LinkedList<PeerConnection.IceServer> turnServers = new LinkedList<PeerConnection.IceServer>();
        Log.d(TAG, "Request TURN from: " + url);
        HttpURLConnection connection = (HttpURLConnection) new URL(url).openConnection();
        connection.setDoOutput(true);
        connection.setRequestProperty("REFERER", "https://appr.tc");
        connection.setConnectTimeout(TURN_HTTP_TIMEOUT_MS);
        connection.setReadTimeout(TURN_HTTP_TIMEOUT_MS);
        int responseCode = connection.getResponseCode();
        if (responseCode != 200) {
            throw new IOException("Non-200 response when requesting TURN server from " + url + " : "
                    + connection.getHeaderField(null));
        }
        InputStream responseStream = connection.getInputStream();
        String response = drainStream(responseStream);
        connection.disconnect();
        Log.d(TAG, "TURN response: " + response);
        JSONObject responseJSON = new JSONObject(response);
        JSONArray iceServers = responseJSON.getJSONArray("iceServers");
        for (int i = 0; i < iceServers.length(); ++i) {
            JSONObject server = iceServers.getJSONObject(i);
            JSONArray turnUrls = server.getJSONArray("urls");
            String username = server.has("username") ? server.getString("username") : "";
            String credential = server.has("credential") ? server.getString("credential") : "";
            for (int j = 0; j < turnUrls.length(); j++) {
                String turnUrl = turnUrls.getString(j);
                turnServers.add(new PeerConnection.IceServer(turnUrl, username, credential));
            }
        }
        return turnServers;
    }

    // Return the contents of an InputStream as a String.
    private static String drainStream(InputStream in) {
        Scanner s = new Scanner(in).useDelimiter("\\A");
        return s.hasNext() ? s.next() : "";
    }

把這邊拿來的list當ICE candidate, 就可以成功透過Google的TURN server去穿牆了(長久之計還是自己架一台吧)

昨天趁等著去面試前稍微把這之前想要寫一下的這題目打包成一個Gin的middleware :

Rate limiting 通常在很多開放API的服務內會常看到, 像是Twitter, 像是Facebook或是新浪微博, 其目的就是希望API不要被特定節點頻繁存取以致於造成伺服器端的過載

rate limiter

一般的Rate limiting的設計大致上來說就是限制某一個特定的節點(或使用者或API Key等等)在一段特定的時間內的存取次數, 比如說, 限制一分鐘最多60次存取這樣的規則, 最直覺的方式我們是可以起一個timer和一個counter, counter大於60就限制存取, timer則每60秒重置counter一次, 看似這樣就好了, 但其實這有漏洞, 假設我在第59秒時瞬間存取了60次, 第61秒又瞬間存取了60次, 在這設計上是合法的, 因為counter在第60秒時就被重置了, 但實質上卻違反了一分鐘最多60次這限制, 因為他在兩秒內就存取了120次, 遠大於我們設計的限制, 當然我們也可以用Sliding time window來解決, 但那個實作上就稍稍複雜點

目前兩個主流比較常見的做法是Token BucketLeaky Bucket, 這兩個原理上大同小異

先來說說Token Bucket, 他的做法是, 假設你有個桶子, 裡面是拿來裝令牌(Token)的, 桶子不是Doraemon的四次元口袋, 所以他空間是有限的, 令牌(Token)的作用在於, 要執行命令的人, 如果沒從桶子內摸到令牌, 就不准執行, 然後我們一段時間內丟一些令牌進去, 如果桶子裡面已經裝滿就不丟, 以上個例子來說, 我們可以準備一個最多可以裝60個令牌的桶子, 每秒鐘丟一個進去, 如果消耗速度大於每秒一個, 自然桶子很快就乾了, 就沒牌子拿了

Leaky BucketToken Bucket很像, 不過就是反過來, 我們把每次的存取都當作一滴水滴入桶子中, 桶子滿了就會溢出(拒絕存取), 桶子底下打個洞, 讓水以固定速率流出去, 這樣一樣能達到類似的效果

Leaky Bucket

Go的rate limiter實作

Go官方的package內其實是有rate limiter的實作的:

照他的說法他是實作了Token Bucket, 創建一個Limiter, 要給的參數是Limit和b, 這個Limit指的是每秒鐘丟多少Token進桶字(? 我不知道有沒理解錯), 而b是桶子的大小

實際上去用了之後發現好像也不是那麼好用, 可能我理解有問題, 出現的並不是我想像的結果, 因此我換用了Juju’s ratelimit, 這個是在gokit這邊看到它有用, 所以應該不會差到哪去, 一樣也是Token Bucket,給的參數就是多久餵一次牌子, 跟桶子的大小, 這就簡單用了一點

包裝成Gin middleware

要套在web server上使用的話, 包裝成middleware是比較方便的, 因此我就花了點時間把Juju’s ratelimit包裝成這個:

使用範例如下:

    //Allow only 10 requests per minute per API-Key
	lm := limiter.NewRateLimiter(time.Minute, 10, func(ctx *gin.Context) (string, error) {
		key := ctx.Request.Header.Get("X-API-KEY")
		if key != "" {
			return key, nil
		}
		return "", errors.New("API key is missing")
	})
	//Apply only to /ping
	r.GET("/ping", lm.Middleware(), func(c *gin.Context) {
		c.JSON(200, gin.H{
			"message": "pong",
		})
	})

	//Allow only 5 requests per second per user
	lm2 := limiter.NewRateLimiter(time.Second, 5, func(ctx *gin.Context) (string, error) {
		key := ctx.Request.Header.Get("X-USER-TOKEN")
		if key != "" {
			return key, nil
		}
		return "", errors.New("User is not authorized")
	})

	//Apply to a group
	x := r.Group("/v2")
	x.Use(lm2.Middleware())
	{
		x.GET("/ping", func(c *gin.Context) {
			c.JSON(200, gin.H{
				"message": "pong",
			})
		})
		x.GET("/another_ping", func(c *gin.Context) {
			c.JSON(200, gin.H{
				"message": "pong pong",
			})
		})
	}

這邊本來想說為了簡單理解一點把參數設計成"每分鐘不能超過10次"這樣的描述, 然後後面再轉換成實際的fillInterval, 不過好像覺得怪怪的, 有點不太符合Token Bucket的特質, 寫成middleware後的彈性就較大一點, 可以全部都用一個limiter或是分不同的資源不同限制都可

這邊建構時要傳入一個用來產生key的函數, 這是考慮到每個人想限制的依據不同, 例如根據API key, 或是session, 或是不同來源之類的, 由這函數去 產生一個對應的key來找到limiter, 如果傳回error, 就是這個request不符合規則, 直接把他拒絕掉

跨server的rate limit

這方法只能針對單一server, 但現在通常都是多台server水平擴展, 因此也是會需要橫跨server的解決方案, 這部分的話, 用Redis來實作Token Bucket是可行的, 這等下次再來弄好了

在之前工作的時候, 做了一個專門用來產生thumbnail(縮圖)的服務, 當時這東西主要的目的是為了因應Zencircle會有不同尺寸的縮圖的需求, 而且每次client app改版又可能多新的尺寸, 因此當時寫了這個叫Minami的服務, 當時幾個簡單的需求是:

  1. 要能夠被CDN所cache (因此URL設定上不採用query string,而是簡單的URL)
  2. 能夠容易被deploy
  3. 能夠的簡單的被擴展 (加一台新的instance就可以)
  4. 不需要太多額外的dependencies

不過那時候寫的版本, 沒寫得很好, 這兩天花了點時間重寫了一個叫做Minami_t(本來Minami這名字就是來自於Minami Takahashi, 所以加個"t" XD), 新的這個重寫的版本採一樣的架構(使用了groupcache), 但多加了Peer discovery的功能(使用etcd), 但少了 臉部辨識跟色情照片偵測功能(原本在前公司的版本有, 新寫的這個我懶得加了)

我把這次重寫的版本放到github上: Minami_t

不過這算是一個sample project, 影像來源來自於Imgur, 如何使用或如何改成支援自己的Image host, 那就自行看source code吧, 這版本縮圖的部分用了我改過的VIPS, 當然原來版本的VIPS也是可用, 這版本只是我當初為了支援Face crop所改出來的

Groupcache

先來說說為什麼採用groupcache? 我不是很確定當時為何會看到groupcache這來, 但後來想想, 採用它的原因可能是看到這份投影片, 它是memchached的作者寫來用在dl.google.com上面的, 架構上剛好也適合thumbnail service, 可能剛好投影片又提到thumbnail(我腦波也太弱了吧), 所以當初採用它來實作這個service

架構上會像是這樣:

Groupcache有幾個特色

  1. Embedded, 不像memcached, redis需要額外的server, 它是嵌入在你原本的程式內的
  2. Shared, Cache是可以所有Peer共享的, 資料未必放在某特定的Peer上, 有可能在本機, 也可能在另一台, 當然如果剛好在本機時就會快一點
  3. LRU, Cache總量有上限限制的, 過久沒使用的資料有可能會被移出記憶體
  4. Immutable, key所對應的值不像memcached, redis可以修改, 而是當cache miss時, 他會再透過你實作的getter去抓真正的資料

要讓Groupcache可以在不同node間共享cache, 就必須開啟HTTPPool, 像下面

	ln, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
	if err != nil {
		return nil, err
	}

	if port == 0 {
		port = ln.Addr().(*net.TCPAddr).Port
	}

	_url := fmt.Sprintf("http://%s:%d", ip, port)
	pool := groupcache.NewHTTPPool(_url)

	go func() {
		log.Printf("Group cache served at port %s:%d\n", ip, port)
		if err := http.Serve(ln, http.HandlerFunc(pool.ServeHTTP)); err != nil {
			log.Printf("GROUPCACHE PORT %d, ERROR: %s\n", port, err.Error())
			os.Exit(-1)
		}
	}()

Groupcache 的getter範例:

func getter(ctx groupcache.Context, key string, dest groupcache.Sink) error {
	log.Println("Cache missed for " + key)

	params := strings.Split(key, ":")

	if len(params) != 3 {
		return ErrWrongKey
	}

	d := ctx.(*Downloader)
	fileName, err := d.Download("http://i.imgur.com/" + params[2])

	if err != nil {
		return err
	}

	//Should assume correct since it is checked at where it is from
	width, _ := strconv.Atoi(params[0])
	height, _ := strconv.Atoi(params[1])

	data, err := resize(fileName, width, height)

	if err != nil {
		return err
	}

	dest.SetBytes(data)
	return nil
}

etcd

我之前寫的版本有個問題是, 沒有自動的peer discovery的功能, 所以必須手動加peer, 這版本把etcd導入, etcd已經是coreos的核心之一了, 簡單, 又蠻好用的, 不過選它也是它直接有Go的client了

Peer discovery的部分, 參考了Go kitetcd實作, Go kit是一個蠻好的Go的微服務框架, 它裡面也有實作用etcd做service discovery, 這一部分正好是這邊需要的, 因此 參考並寫出了這邊這個版本

重點是要能夠在有新server加入後就新增到peer list去, 有server離開後要拿掉, 因此必須利用到etcd的watch功能

func (s *ServiceRegistry) Watch(watcher Watcher) {
	key := fmt.Sprintf("/%s/nodes", s.name)
	log.Println("watch " + key)
	w := s.etcd_client.Watcher(key, &etcd.WatcherOptions{AfterIndex: 0, Recursive: true})

	var retryInterval time.Duration = 1

	for {
		_, err := w.Next(s.ctx)

		if err != nil {
			log.Printf("Failed to connect to etcd. Will retry after %d sec \n", retryInterval)
			time.Sleep(retryInterval * time.Second)

			retryInterval = (retryInterval * 2) % 4096
		} else {
			if retryInterval > 1 {
				retryInterval = 1
			}

			list, err := s.GetNodes()
			if err == nil {
				watcher(list)
			} else {
				//skip first
			}
		}
	}
}

Watch可以用來監測某一個key有無改變, 因此我們只要一直監測server node的list就好(指定一個key來放), 因此流程是這樣的:

  1. Server開啟後, 自己到etcd註冊自己, 並把etcd上找到的nodes全加到peer list中
  2. 另一台由etcd發現有另一台出現後, 把它加到peer list中
  3. Server下線後, 要移除自己的註冊, 其他機器要從peer list把它移除

問題點在最後一點, Server下線有可能是被kill的, 也有可能按ctrl-c中斷的, 這時候就要監聽os的signal, 在程式被結束前, 可以先去移除註冊, 像這樣:

//Listening to exit signals for graceful leave
go func() {
	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
	<-c
	log.Println("I'm leaving")
	cm.Leave()
	os.Exit(0)
}()

這只是一個sample而已, 還有一些待改進的

這個語法蠻有趣的, 早上一直都在看這個想能拿來幹嘛? 結果早上有個phone interview就有點腦袋小小的轉不過來, 不過這不是重點, 先簡單的來講一下Generators

產生器? 顧名思義或許是這樣, 可以先看一下MDN上的文件, 這是一個從ES6開始才有的功能, 因此要在比較新的瀏覽器, 或是nodejs 6之後才有支援, 先來看看MDN上那個簡單的例子:

function* idMaker(){
  var index = 0;
  while(index < 3)
    yield index++;
}

var gen = idMaker();

console.log(gen.next().value); // 0
console.log(gen.next().value); // 1
console.log(gen.next().value); // 2
console.log(gen.next().value); // undefined

一個generator是以"function*“宣告的, 可以說它是一種特別的function, 跟一般的function一次跑到結束不同, 它是可以中途暫停的, 以上面這例子來說, 如果習慣傳統的寫法, 可能會有點小暈, while loop在idMaker裡面不是一次做到完嗎? 剛剛說generator是可被中途暫停的, 因此, 在第一輪的時候會先停在"yield"處, 當你呼叫next()才會到下一個yield位置停下來並回傳, 我的感覺是比較像一種有帶state的function之類的

有什麼用途? 從上面的例子, 當然最直覺想到的是ID產生器或是計數器之類的東西, 網路上應該可以找到一些不同的用法, 比如說搭配Promise, 有興趣可以自己找找, 是不只可以用在產生器, 拿我早上interview被問到的實作strstr, 不是很難的東西, 我原本拿go寫,出了點小槌, 而且也只能找第一個發生的字串, 後來用generator改了這版本:

以這個來說, 第一次呼叫會找出第一個發生的點, 可以持續呼叫到所有的都找出來為止, generator是可以被iterate的, 因此這邊可以用

for(var i of gen) {
    console.log(i);
}

不需要一直call next(), next()回傳回來的會是{value:…, done:…}, 因此可以看done知道是否已經結束

下面這範例則是一個質數產生器, 一直call next就可以產生下一個質數:

這應該是這系列最後一篇了吧, 雖然回頭看, 可能有些漏寫, 不過, 以後想到再補吧, 如果還沒看過前兩篇, 可以再複習一下: 概念篇,多服務彙整

這篇主要要著眼在如何做Timeline這東西(怎又突然把它叫Timeline了? 好吧, 複習了一下很多文章, 發現這還是比較通用的名詞)

先借用一下以前跟老闆報告時, 老闆問的

"做這很難嗎?"

這不是啥記恨啦(雖然我還蠻有印象這問題跟我的回答的), 而是開始做這個的時候, 我也覺得應該沒什麼難的, 不過, 實作這個功能本身的確不難, 倒是要讓它可以擴展(scale)這件事, 的確會比較麻煩, 也不止一個方法做, 先從基本來看一下

什麼構成一個Timeline (Social feed)

Twitter和Facebook主要把這東西分為兩類 - User timeline和Home timeline

User timeline主要就是單一使用者的近況更新, 也就是所有的內容都是由那個使用者產生, 並以時間排序(不然怎叫Timeline), 這部分倒沒什麼困難的, 因為它就是單一個人的流水帳(從軟體的角度也可以說成他活動的"log")

Home timeline包含的則不是單一個人的, 而是包含你關注的人所有的動態, 在Twitter就是你follow的人, 而Facebook則是你的朋友加上你關注的人(follow)和粉絲頁, 講"follow"其實蠻貼切的啦, 有點偷窺(光明正大吧?), 又有點跟蹤狂的感覺, 但現今的Home timeline大多(尤其是Facebook)不是用時間排序, 好像也不能真的叫Timeline (ㄠ回來了, 這樣我叫Social feed好像比較貼切 XD)

假設我follow了user1, user2, user3等三個人, 那我的Home timeline就會變成這樣:

Ok, 其實這有點像前一篇講的多服務彙整那種, 不同的是, 這些feeds是來自於同一個來源, 並不是多個不同的服務, 比較沒資料異質性問題

1 9 90 理論

在切入實作面之前, 先提一下這個理論, 這邊有Mr. Jamine對這個的解釋: “網路內容的 1/9/90 定律” (他用"定律", 但定律是比較恆常的, 但這比例並不是那麼的絕對, 所以我比較覺得用理論或假設比較適合)

這理論說的是, 大約有90%(甚或以上)的人是屬於讀者, 9%的人會參與進一步互動(比如說按讚或回文), 只有1%的創作者(你可以看一下你自己是屬於哪類的人), 根據我的經驗, 讀者可能會多於90%, 創作者甚至可能少於1%

那對於開發者來說, 知道這些有什麼用? 我個人是覺得一個開發者或是架構設計者, 必須要清楚暸解所做的東西所會產生的行為才能產出一個好的架構, 以這個來說, 如果要設計一個高度可擴展的架構的話, 我們可知道, 絕大部分的request其實都是讀取(read), 高併發(highly concurrent)的寫入機會並不大(反而比較容易發生在like, comment)

比較直覺的實作方式

好, 難免的, 我一開始也是選用這種方式 - 都交給資料庫(database), 這邊的資料庫, 不管SQL或No-SQL, 差不多原理啦, 雖然說針對Feed這種time squences看起來像比較適合No SQL, 但Facebook不也是用My SQL(雖然用的方式比較是key-value的方式)

依照前面的說法, 我們可以簡單的假設有兩種資料 - 使用者(User)和內容(Feed)

  1. 使用者可以跟隨(Follow)其他使用者(這邊引用Twitter的設定), 因此每個使用者會有n個"follower"
  2. 使用者可以發文(Feed/Post), 每則發文都有(只有)一個作者(author)
  3. 每個使用者的Homeline是由他跟隨的所有人的發文所組成
  4. 每次client來要求homeline最多給m則(比如說25則)

按照這樣的說法, 我們可以想像Query或許長得像這樣(No SQL版本自行想像):

SELECT * FROM FEED WHERE AUTHOR IN (SELECT FOLLOWERS FROM USER WHERE ID='myid') ORDER BY TIME LIMITS m

這邊暫時先省略掉query出來你還要再query出user大頭照跟人名的部分, 但加上這部分每次至少要有兩次queries

這樣不就搞定了? 有什麼問題? 有, 後面就會撐不住了! (切身之痛), 先來看看什麼問題

查詢效率

從上面的query來說, 它還包含了個sub query去取出所有的followers, 所以這整串在資料庫裡可能的做法是, 把所有相關使用者的feed取出, 在記憶體中排序, 取前m個, 前面有提到, Feed就像流水帳, 全部的人加起來可能不少, 這聽起來就像是耗時耗CPU的查詢

因此在兩個狀況下就慘了:

  1. 讀取高併發時
  2. 使用者follow了"一大堆"人!!!

關於第一點, 這很容易發生呀! 90%的人一天到晚窺探…ㄟ …關心…人家在幹嘛, 當一堆這些queries湧入, 資料庫會非常忙碌的, 因為沒有不同的兩個人會follow同樣的人, 根本無法cache

第二點其實更慘了, follow個幾個人還好, 幾十個人還搞得定, 偏偏這個社群網路時代的, 幾百人是標配, 上千人的也不少, 如果有上萬, 可能更跑不動了(Facebook限制你只能交5000個朋友, Twitter超過5000也是選擇性的讓你少量follow, 所以上萬目前應該還比較少見)

Materialized View

有些資料庫, 像是MySQL, Postgresql, Cassandra (3.0+)都有支援Materialized view, materialized view就像是一個query的snapshot, join的部分是發生在create或是refresh時, 因此用來解決讀取高併發可能是可行的, 因為讀取時只有單純的query, 直到有更新時再呼叫refresh

但對於更新比較頻繁, 比較熱門的social network service, 資料庫的負擔還是不算小

Sharding

如果以時間為基準來做sharding, 或許可以解決這兩個問題, 因為不是所有的人不時都在更新狀態, 所以在含有最新的shard裡面包含的可能只有少數人的feed, 這減少了遍訪所有人的feed的工, 而且不用排序所有的feed

但還是有幾個問題:

  1. 無法join, 如果根據feed的時間去做sharding, feed跟user就不見得在同一資料庫, 這樣就無法join了
  2. 邊界問題, 有可能你需要的資料剛好就在時間間界的附近, 導致一開始query不到足夠的資料

其實上面問題寫程式解決都不是問題啦, 這邊想說的只是, 沒辦法以一兩個簡單的queries就搞定了

大家都怎麼做

這邊講的"大家"就那些大咖囉, Facebook, Twitter, Pinterest, Tumblr …等等, 關於這個問題, 其實Yahoo曾經出過一篇論文:

“Feeding Frenzy: Selectively Materializing Users’ Event Feeds”

如果沒耐心看完論文(我也沒耐心), 這邊先簡單提一下兩種模式:

  1. Push model
  2. Pull model

Push model又被稱為Megafeed(根據某場Facebook的分享), 而Pull model則是Facebook使用的Multifeed, 其實不管哪種模式, 大多不是直接存取資料庫增加資料庫的負擔, 而是大量的應用快取(cache), 像是Memcached, Redis等等

簡單的來說, Push model在整合feed的時間發生在寫入, 而Pull model則是發生在讀取

Push model (Megafeed)

這是Twitter所採用的方式, 也是我以前採用過的做法, 我自己則是把它稱為Inbox model, 比較詳細的內容推薦可以參考Twiter的:

“Timelines at Scale”

先來看看他們這張架構圖

Home timeline的部分主要是中間的那個流程, 這做法比較像是E-mail一樣(所以我才稱它為inbox), 當使用者發表了一則新的動態後, 系統會根據有訂閱這則動態有那些人(也就是follow這個使用者的人), 然後把這則動態複製到各個訂閱者的Home timeline (Inbox)上

這方法的優點是, 對於讀取相當之快, 因為Home timeline已經在寫入期間就準備好了, 所以當使用者要讀取時, 不需要複雜的join就能取得, 在做pagination也相當簡單, 因為本來就是依時序排下去的

但缺點是, 很顯而易見的, 非常耗費空間, 因為每個timeline都要複製一份, 假設你被上千人follow, 就要上千份, 因此Twitter只有存ID和flag, 詳細的內容跟Meta data, 是後來才從cache去合併來的, 另外Twitter也只存了最近的八百則, 所以你不可能得無窮無盡的往前滑

另一點就是耗時, 這種寫入通常是非同步的, 使用者發布動態後, 他只知道他動態發布成功了, 但系統還需要在背後寫到各個Inbox中, 因此他不會知道別人其實可能還看不到的, 對於一個follower數量不多的不是問題, 但如果像是Lady gaga那種大人物, 有幾百萬粉絲, 那就是大問題了! 寫入幾百萬的timeline即使只寫入memcached也是相當耗時的事, 而且這會產生時間錯亂的問題, follower比較少得很快就做完了, 所以很容易看到比較熱門的人物的貼文比較晚出現

Twitter是把follower多的人另案處理, 也就是讀取時段再合併(那就是類似下面要講的multifeed了), 這樣可以省下一些空間跟時間, 另一種可行的做法(我們之前的做法), 就是不寫到所有人的timeline, 而是只cache最近有上線的人的timeline, 這樣就算Lady gaga有幾百萬粉絲, 實際上最近才有上線的可能才幾十萬或更少, 處理這部分就好了, 如果cache裡面並沒有現在上線這個人的timeline, 就在從資料庫讀取合成就好

不過總歸來說, 這方法讀取快, 但寫入慢, 耗費空間, 較適合讀比寫多上許多的應用

此外其實也有不同的變形, 像是Pinterest:

Building a smarter home feed

Pull model (Multifeed)

Facebook採用了一個完全不同的方式, 叫做Multifeed, 這方式從2009開始在Infoq就一直被提到:

  1. Facebook: Science and the Social Graph 2009, by Aditya Agarwal
  2. Scale at Facebook 2010, by Aditya Agarwal
  3. Facebook News Feed: Social Data at Scale 2012, by Serkan Piantino (Aditya Agarwal這時候應該跑到Dropbox去了)

這跟Push model有什麼不同? 其實說起來跟前面一開始用DB的方式比較像, 就是在讀取時, 才取得所跟隨的人的feed, 合併並排序, 但這樣不是讀取很沒效率嗎?先來看看圖:

  1. 在寫入時, feed資料只會寫入"一個"leaf server, 應該是根據user去分流的
  2. leaf server主要是memcached, 所以都是in memory的
  3. 在memory裡面不可能保存所有動態, 只會保存最近一段時間的 (所以不可能包含所有人所有的動態, 在做整合時就輕鬆多了)
  4. 前端跟Aggregator query後, Aggregator會去跟"所有"的leaf server問所有相關的人的feed再回來整合

因為資料存儲跟處理都在memory, 所以可以很快, 但還是要考慮到網路的部分, 因此leaf server跨區的話效率就不會高了, 自然空間需求會比Pull model來得少, 但home timeline的讀取時間就較長了(因為是read time aggregation的關係),也不會有名人問題, 不會因為follower多, 複製耗時耗空間 另一個優點是, 排序的方式控制在Aggregator, 因此很容易立刻更動規則, 不像pull model, 當home timeline組好後要去變動它就較麻煩

混搭風

當然沒有絕對的好壞, 兩種模式各有優缺, 所以也有人採用的是混合模式, 根據使用者使用頻率來決定, 這就跟穿衣服一樣, 每個人怎搭衣服都是不一樣的, 端看你要怎混搭

REST API的問題

在前面一篇多服務彙整裡有提到REST API都是輪詢(polling)的模式, 不管資料有沒更新, Client都是會常常來server查詢資料, 這對server可能會是夢靨, 因為只有1%在努力創作, 所以搞不好有很大量的查詢都是浪費的, 而這些查詢通常是造成系統多餘負擔的元兇

關於這問題, 我有兩個想法, 不過都還沒實際去實證過

  1. 增加HEAD的API, 大部分REST API是以GET直接抓取資料, 所以針對個別資源(Resource), 應可實作HEAD, 讓Client在實際去查詢資料前先確訂一下資源的更新時間, 資源的更新時間在資料更新時就可以放在cache內了, 相對的可以省傳輸的數據量跟處理時間
  2. 利用PUSH, 現在大部分的應用都在手機上, 也大多有實作PUSH, 當有資料更新且App在前景時, 利用PUSH通知有資料更新, Client收到後才會真的去抓取, 不過這比較起來感覺相對負擔較重

另外這篇也是值得去參考(只是這個還要帶入XMPP):

Beyond REST?Building data services with XMPP PubSub