在使用很多API服務像是Google的API或是Facebook的API, 常常會需要拿簽署APK的Key的signature登錄, 取得這sigature的方法有很多種, 剛學到一種是直接可以從Android studio的Gardle選單內取得的, 方法如下:

Gradle->(Project name)->Tasks->singningReports

WebRTC是一個支援瀏覽器的即時影音對話的架構, 算是一個業界標(W3C,IETF), 最近由於想做一個有影音通話的應用, 就研究了一下這東西

如果只是想嘗試一下WebRTC, 是可以直接是可以直接試AppRTC這個Google的範例, 不過這個是Web的版本, 我想要做的是 手機的版本(Android, iOS), AppRTC其實也有Android的版本可搭配

為了熟悉一下整個用WebRTC建立video call的流程, 因此我就決定改一下這個Android版本, 原本Google的版本是透過Web Socket

至於流程與架構我會建議看這影片:

如果不想看太長, 就看這個:

把Web RTC那段換成Firebase(好, 其實我蠻後悔選Firebase來實作的)其實就是把Signaling這段給換掉, 而這段流程是(節錄自影片):

這部分其實就是交換兩邊的SDP和ICE candidate的過程, 詳細可以參考這邊:WebRTC 相關縮寫名詞簡介

結果的source code放在這邊 : apprtc-android-demo

Building WebRTC lib on Android

其實現在寫WebRTC的應用的話, 也不用從頭實作, Google老早就把它實作在Chromium裡面了, 也可以單獨build出library用

這邊有官方的如何建置出Android版本的Web RTC library, 不過, 不要照著這份文件做呀, 不然頭髮會白好幾根, 可能還build不太起來, 找了一堆網路上人家的建議也都是不要直接build, 直接用人家build好現成的, 不過, 現成的雖然有一些, 但大多是過時的, API跟現今的也不太一樣, 如果 要套用到現在的Android版本AppRTC的source code內, 大多都沒辦法用

所幸找到這個build script: pristineio/webrtc-build-scripts, 這個從下載最新的source code到build出library一律包辦, 用法也很簡單, 只要執行下面的:

source android/build.sh
install_dependencies
get_webrtc
build_apprtc

簡單明暸, 但…有幾個問題, 第一個是只能在Linux下build, 因此在Mac跟Windows下要透過Vagrant這類的工具, 而且對硬體需求也很高, 我的2012年中版的Macbook Pro retina實在是跑不動, 後來跑去Digital Ocean租了台VM來build, 本以為最便宜的可以勝任, 後來發現, 至少要4G RAM, 硬碟要20G以上的instance(哭哭, 浪費好多時間)

build出來後, 所需要的東西包含了libjingle_peerconnection.jar和libjingle_peerconnection_so.so, 把這幾個備份起來就是了, 待會build apk需要用

AppRTC 範例的Android source codes

Android的範例的source codes可以在這邊下載

不過這並不是Android studio的project格式, 因此需要用匯入的方式, 或是可以直接fork我的版本去改, 由於原本的版本使用了Web socket做singaling的管道, 因此需要Autobahn, 但你切記絕對不能用Autobahn官方最新的jar檔, 而是要用Google放在third_party裡面那個autobanh.jar(啊, 我到現在才發現名字有些許不同), 這邊的差異是, 原本Autobahn是沒有支援SSL的websocket的, 但AppRTC的websocket則是要透過SSL來連接

把jar跟so檔放到對應的目錄去後, 記得改一下app目錄下的build.gradle加入 (因為import產生的不會幫你加):

dependencies {
    compile fileTree(dir: 'libs', include: ['*.jar'])
}

加上firebase

除了原本的Webcoket和Direct connect兩種方式外, 為了跑一次他的流程我多加了Firebase的部分, 利用它的realtime database來做Signaling這部分, 至於怎樣開始開發firebase, 就參考一下他的官方文件

Signaling的實作

CallActivity

選擇哪種signaling的方式是在CallActivity裡面依據roomId來看使用哪一個signaling client, 程式碼如下:

    // Create connection client. Use DirectRTCClient if room name is an IP otherwise use the
    // standard WebSocketRTCClient.
    if("firebase".equals(roomId)) {
      Log.d(TAG, "firebase");
      appRtcClient = new FirebaseRTCClient(this);
    } else if (loopback || !DirectRTCClient.IP_PATTERN.matcher(roomId).matches()) {
      appRtcClient = new WebSocketRTCClient(this);
    } else {
      Log.i(TAG, "Using DirectRTCClient because room name looks like an IP.");
      appRtcClient = new DirectRTCClient(this);
    }

原本有WebSocketRTCClient和DirectRTCClient, 如果是IP的話就用DirectRTCClient,這邊我多加一個FirebaseRTCClient, 只要roomId是firebase就會使用這個(我偷懶)

FirebaseRTCClient

XXXRTCClient這部分實作了signaling的部分, 因此我參考了WebSocketRTCClient和DirectRTCClient的內容來寫FirebaseRTCClient

跟WebSocketRTCClinet一樣, 它必須實作AppRTCClient, AppRTCClient這個Interface定義如下:

/**
 * AppRTCClient is the interface representing an AppRTC client.
 */
public interface AppRTCClient {
  /**
   * Struct holding the connection parameters of an AppRTC room.
   */
  class RoomConnectionParameters {
    public final String roomUrl;
    public final String roomId;
    public final boolean loopback;
    public RoomConnectionParameters(String roomUrl, String roomId, boolean loopback) {
      this.roomUrl = roomUrl;
      this.roomId = roomId;
      this.loopback = loopback;
    }
  }

  /**
   * Asynchronously connect to an AppRTC room URL using supplied connection
   * parameters. Once connection is established onConnectedToRoom()
   * callback with room parameters is invoked.
   */
  void connectToRoom(RoomConnectionParameters connectionParameters);

  /**
   * Send offer SDP to the other participant.
   */
  void sendOfferSdp(final SessionDescription sdp);

  /**
   * Send answer SDP to the other participant.
   */
  void sendAnswerSdp(final SessionDescription sdp);

  /**
   * Send Ice candidate to the other participant.
   */
  void sendLocalIceCandidate(final IceCandidate candidate);

  /**
   * Send removed ICE candidates to the other participant.
   */
  void sendLocalIceCandidateRemovals(final IceCandidate[] candidates);

  /**
   * Disconnect from room.
   */
  void disconnectFromRoom();

  /**
   * Struct holding the signaling parameters of an AppRTC room.
   */
  class SignalingParameters {
    public final List<PeerConnection.IceServer> iceServers;
    public final boolean initiator;
    public final String clientId;
    public final String wssUrl;
    public final String wssPostUrl;
    public final SessionDescription offerSdp;
    public final List<IceCandidate> iceCandidates;

    public SignalingParameters(List<PeerConnection.IceServer> iceServers, boolean initiator,
        String clientId, String wssUrl, String wssPostUrl, SessionDescription offerSdp,
        List<IceCandidate> iceCandidates) {
      this.iceServers = iceServers;
      this.initiator = initiator;
      this.clientId = clientId;
      this.wssUrl = wssUrl;
      this.wssPostUrl = wssPostUrl;
      this.offerSdp = offerSdp;
      this.iceCandidates = iceCandidates;
    }
  }

  /**
   * Callback interface for messages delivered on signaling channel.
   *
   * <p>Methods are guaranteed to be invoked on the UI thread of |activity|.
   */
  interface SignalingEvents {
    /**
     * Callback fired once the room's signaling parameters
     * SignalingParameters are extracted.
     */
    void onConnectedToRoom(final SignalingParameters params);

    /**
     * Callback fired once remote SDP is received.
     */
    void onRemoteDescription(final SessionDescription sdp);

    /**
     * Callback fired once remote Ice candidate is received.
     */
    void onRemoteIceCandidate(final IceCandidate candidate);

    /**
     * Callback fired once remote Ice candidate removals are received.
     */
    void onRemoteIceCandidatesRemoved(final IceCandidate[] candidates);

    /**
     * Callback fired once channel is closed.
     */
    void onChannelClose();

    /**
     * Callback fired once channel error happened.
     */
    void onChannelError(final String description);
  }
}

主要就是定義了如何處理connect, disconnect, 還有怎麼去註冊SDP和ICE candidate, 在確定好連接成功後, AppRTCClient要負責呼叫onConnectedToRoom來通知 CallActivity已經可以準備建立video call的後續流程, 且要負責處理如果Signal server(這邊是firebase)有傳來遠端的SDP跟ICE candidate, 要負責呼叫SignalingEvents對應的處理 (這邊一樣會叫到CallActivity, 而CallActivity則會使用PeerConnectionClient來處理需要傳遞給PeerConnection相關的參數)

這邊用Firebase處理Signaling的方式是監聽某一個key的改變, 有新的裝置連接, 註冊SDP, ICE Candidate, 就寫到這下面去, 這其實不是一個很好的方式, 因為這下面只要有值的改變, 就會觸發, 不像是WebSocket那個版本是一來一往的API calls, 而且你不知道每次觸發被更動的是哪一部分, 一開始發生了好幾次PeerConnection重複註冊SDP才讓我發現因為這原因被重複呼叫的問題

TURN server

WebRTC是P2P的, 因此如果不具備穿牆能力的話, 在牆外就會被擋掉, 一開始我本來想說試驗P2P而不走TURN Server穿牆的(因為我一時也懶得架一台), 結果測試時老是連不上, 後來才發現我阿呆, 我的測試環境是一台實體手機, 另一台是電腦上跑模擬器, 本以為兩個(手機, 電腦)是同一個區網沒問題, 後來才想到模擬器是在另一個虛擬網路, 因此還是有需要TURN server

如果不想架一台, 要怎辦? 用Google免錢的, 他們做了這個demo, 一定有! 因此就偷看了一下WebRTCClient的code跟傳輸內容,發現它跟https://networktraversal.googleapis.com/v1alpha/iceconfig?key=AIzaSyAJdh2HkajseEIltlZ3SIXO02Tze9sO3NY 去要TURN server list, 所以基本上只要照copy下面這段就好:

   private LinkedList<PeerConnection.IceServer> requestTurnServers(String url)
            throws IOException, JSONException {
        LinkedList<PeerConnection.IceServer> turnServers = new LinkedList<PeerConnection.IceServer>();
        Log.d(TAG, "Request TURN from: " + url);
        HttpURLConnection connection = (HttpURLConnection) new URL(url).openConnection();
        connection.setDoOutput(true);
        connection.setRequestProperty("REFERER", "https://appr.tc");
        connection.setConnectTimeout(TURN_HTTP_TIMEOUT_MS);
        connection.setReadTimeout(TURN_HTTP_TIMEOUT_MS);
        int responseCode = connection.getResponseCode();
        if (responseCode != 200) {
            throw new IOException("Non-200 response when requesting TURN server from " + url + " : "
                    + connection.getHeaderField(null));
        }
        InputStream responseStream = connection.getInputStream();
        String response = drainStream(responseStream);
        connection.disconnect();
        Log.d(TAG, "TURN response: " + response);
        JSONObject responseJSON = new JSONObject(response);
        JSONArray iceServers = responseJSON.getJSONArray("iceServers");
        for (int i = 0; i < iceServers.length(); ++i) {
            JSONObject server = iceServers.getJSONObject(i);
            JSONArray turnUrls = server.getJSONArray("urls");
            String username = server.has("username") ? server.getString("username") : "";
            String credential = server.has("credential") ? server.getString("credential") : "";
            for (int j = 0; j < turnUrls.length(); j++) {
                String turnUrl = turnUrls.getString(j);
                turnServers.add(new PeerConnection.IceServer(turnUrl, username, credential));
            }
        }
        return turnServers;
    }

    // Return the contents of an InputStream as a String.
    private static String drainStream(InputStream in) {
        Scanner s = new Scanner(in).useDelimiter("\\A");
        return s.hasNext() ? s.next() : "";
    }

把這邊拿來的list當ICE candidate, 就可以成功透過Google的TURN server去穿牆了(長久之計還是自己架一台吧)

昨天趁等著去面試前稍微把這之前想要寫一下的這題目打包成一個Gin的middleware :

Rate limiting 通常在很多開放API的服務內會常看到, 像是Twitter, 像是Facebook或是新浪微博, 其目的就是希望API不要被特定節點頻繁存取以致於造成伺服器端的過載

rate limiter

一般的Rate limiting的設計大致上來說就是限制某一個特定的節點(或使用者或API Key等等)在一段特定的時間內的存取次數, 比如說, 限制一分鐘最多60次存取這樣的規則, 最直覺的方式我們是可以起一個timer和一個counter, counter大於60就限制存取, timer則每60秒重置counter一次, 看似這樣就好了, 但其實這有漏洞, 假設我在第59秒時瞬間存取了60次, 第61秒又瞬間存取了60次, 在這設計上是合法的, 因為counter在第60秒時就被重置了, 但實質上卻違反了一分鐘最多60次這限制, 因為他在兩秒內就存取了120次, 遠大於我們設計的限制, 當然我們也可以用Sliding time window來解決, 但那個實作上就稍稍複雜點

目前兩個主流比較常見的做法是Token BucketLeaky Bucket, 這兩個原理上大同小異

先來說說Token Bucket, 他的做法是, 假設你有個桶子, 裡面是拿來裝令牌(Token)的, 桶子不是Doraemon的四次元口袋, 所以他空間是有限的, 令牌(Token)的作用在於, 要執行命令的人, 如果沒從桶子內摸到令牌, 就不准執行, 然後我們一段時間內丟一些令牌進去, 如果桶子裡面已經裝滿就不丟, 以上個例子來說, 我們可以準備一個最多可以裝60個令牌的桶子, 每秒鐘丟一個進去, 如果消耗速度大於每秒一個, 自然桶子很快就乾了, 就沒牌子拿了

Leaky BucketToken Bucket很像, 不過就是反過來, 我們把每次的存取都當作一滴水滴入桶子中, 桶子滿了就會溢出(拒絕存取), 桶子底下打個洞, 讓水以固定速率流出去, 這樣一樣能達到類似的效果

Leaky Bucket

Go的rate limiter實作

Go官方的package內其實是有rate limiter的實作的:

照他的說法他是實作了Token Bucket, 創建一個Limiter, 要給的參數是Limit和b, 這個Limit指的是每秒鐘丟多少Token進桶字(? 我不知道有沒理解錯), 而b是桶子的大小

實際上去用了之後發現好像也不是那麼好用, 可能我理解有問題, 出現的並不是我想像的結果, 因此我換用了Juju’s ratelimit, 這個是在gokit這邊看到它有用, 所以應該不會差到哪去, 一樣也是Token Bucket,給的參數就是多久餵一次牌子, 跟桶子的大小, 這就簡單用了一點

包裝成Gin middleware

要套在web server上使用的話, 包裝成middleware是比較方便的, 因此我就花了點時間把Juju’s ratelimit包裝成這個:

使用範例如下:

    //Allow only 10 requests per minute per API-Key
	lm := limiter.NewRateLimiter(time.Minute, 10, func(ctx *gin.Context) (string, error) {
		key := ctx.Request.Header.Get("X-API-KEY")
		if key != "" {
			return key, nil
		}
		return "", errors.New("API key is missing")
	})
	//Apply only to /ping
	r.GET("/ping", lm.Middleware(), func(c *gin.Context) {
		c.JSON(200, gin.H{
			"message": "pong",
		})
	})

	//Allow only 5 requests per second per user
	lm2 := limiter.NewRateLimiter(time.Second, 5, func(ctx *gin.Context) (string, error) {
		key := ctx.Request.Header.Get("X-USER-TOKEN")
		if key != "" {
			return key, nil
		}
		return "", errors.New("User is not authorized")
	})

	//Apply to a group
	x := r.Group("/v2")
	x.Use(lm2.Middleware())
	{
		x.GET("/ping", func(c *gin.Context) {
			c.JSON(200, gin.H{
				"message": "pong",
			})
		})
		x.GET("/another_ping", func(c *gin.Context) {
			c.JSON(200, gin.H{
				"message": "pong pong",
			})
		})
	}

這邊本來想說為了簡單理解一點把參數設計成”每分鐘不能超過10次”這樣的描述, 然後後面再轉換成實際的fillInterval, 不過好像覺得怪怪的, 有點不太符合Token Bucket的特質, 寫成middleware後的彈性就較大一點, 可以全部都用一個limiter或是分不同的資源不同限制都可

這邊建構時要傳入一個用來產生key的函數, 這是考慮到每個人想限制的依據不同, 例如根據API key, 或是session, 或是不同來源之類的, 由這函數去 產生一個對應的key來找到limiter, 如果傳回error, 就是這個request不符合規則, 直接把他拒絕掉

跨server的rate limit

這方法只能針對單一server, 但現在通常都是多台server水平擴展, 因此也是會需要橫跨server的解決方案, 這部分的話, 用Redis來實作Token Bucket是可行的, 這等下次再來弄好了

在之前工作的時候, 做了一個專門用來產生thumbnail(縮圖)的服務, 當時這東西主要的目的是為了因應Zencircle會有不同尺寸的縮圖的需求, 而且每次client app改版又可能多新的尺寸, 因此當時寫了這個叫Minami的服務, 當時幾個簡單的需求是:

  1. 要能夠被CDN所cache (因此URL設定上不採用query string,而是簡單的URL)
  2. 能夠容易被deploy
  3. 能夠的簡單的被擴展 (加一台新的instance就可以)
  4. 不需要太多額外的dependencies

不過那時候寫的版本, 沒寫得很好, 這兩天花了點時間重寫了一個叫做Minami_t(本來Minami這名字就是來自於Minami Takahashi, 所以加個”t” XD), 新的這個重寫的版本採一樣的架構(使用了groupcache), 但多加了Peer discovery的功能(使用etcd), 但少了 臉部辨識跟色情照片偵測功能(原本在前公司的版本有, 新寫的這個我懶得加了)

我把這次重寫的版本放到github上: Minami_t

不過這算是一個sample project, 影像來源來自於Imgur, 如何使用或如何改成支援自己的Image host, 那就自行看source code吧, 這版本縮圖的部分用了我改過的VIPS, 當然原來版本的VIPS也是可用, 這版本只是我當初為了支援Face crop所改出來的

Groupcache

先來說說為什麼採用groupcache? 我不是很確定當時為何會看到groupcache這來, 但後來想想, 採用它的原因可能是看到這份投影片, 它是memchached的作者寫來用在dl.google.com上面的, 架構上剛好也適合thumbnail service, 可能剛好投影片又提到thumbnail(我腦波也太弱了吧), 所以當初採用它來實作這個service

架構上會像是這樣:

Groupcache有幾個特色

  1. Embedded, 不像memcached, redis需要額外的server, 它是嵌入在你原本的程式內的
  2. Shared, Cache是可以所有Peer共享的, 資料未必放在某特定的Peer上, 有可能在本機, 也可能在另一台, 當然如果剛好在本機時就會快一點
  3. LRU, Cache總量有上限限制的, 過久沒使用的資料有可能會被移出記憶體
  4. Immutable, key所對應的值不像memcached, redis可以修改, 而是當cache miss時, 他會再透過你實作的getter去抓真正的資料

要讓Groupcache可以在不同node間共享cache, 就必須開啟HTTPPool, 像下面

	ln, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
	if err != nil {
		return nil, err
	}

	if port == 0 {
		port = ln.Addr().(*net.TCPAddr).Port
	}

	_url := fmt.Sprintf("http://%s:%d", ip, port)
	pool := groupcache.NewHTTPPool(_url)

	go func() {
		log.Printf("Group cache served at port %s:%d\n", ip, port)
		if err := http.Serve(ln, http.HandlerFunc(pool.ServeHTTP)); err != nil {
			log.Printf("GROUPCACHE PORT %d, ERROR: %s\n", port, err.Error())
			os.Exit(-1)
		}
	}()

Groupcache 的getter範例:

func getter(ctx groupcache.Context, key string, dest groupcache.Sink) error {
	log.Println("Cache missed for " + key)

	params := strings.Split(key, ":")

	if len(params) != 3 {
		return ErrWrongKey
	}

	d := ctx.(*Downloader)
	fileName, err := d.Download("http://i.imgur.com/" + params[2])

	if err != nil {
		return err
	}

	//Should assume correct since it is checked at where it is from
	width, _ := strconv.Atoi(params[0])
	height, _ := strconv.Atoi(params[1])

	data, err := resize(fileName, width, height)

	if err != nil {
		return err
	}

	dest.SetBytes(data)
	return nil
}

etcd

我之前寫的版本有個問題是, 沒有自動的peer discovery的功能, 所以必須手動加peer, 這版本把etcd導入, etcd已經是coreos的核心之一了, 簡單, 又蠻好用的, 不過選它也是它直接有Go的client了

Peer discovery的部分, 參考了Go kitetcd實作, Go kit是一個蠻好的Go的微服務框架, 它裡面也有實作用etcd做service discovery, 這一部分正好是這邊需要的, 因此 參考並寫出了這邊這個版本

重點是要能夠在有新server加入後就新增到peer list去, 有server離開後要拿掉, 因此必須利用到etcd的watch功能

func (s *ServiceRegistry) Watch(watcher Watcher) {
	key := fmt.Sprintf("/%s/nodes", s.name)
	log.Println("watch " + key)
	w := s.etcd_client.Watcher(key, &etcd.WatcherOptions{AfterIndex: 0, Recursive: true})

	var retryInterval time.Duration = 1

	for {
		_, err := w.Next(s.ctx)

		if err != nil {
			log.Printf("Failed to connect to etcd. Will retry after %d sec \n", retryInterval)
			time.Sleep(retryInterval * time.Second)

			retryInterval = (retryInterval * 2) % 4096
		} else {
			if retryInterval > 1 {
				retryInterval = 1
			}

			list, err := s.GetNodes()
			if err == nil {
				watcher(list)
			} else {
				//skip first
			}
		}
	}
}

Watch可以用來監測某一個key有無改變, 因此我們只要一直監測server node的list就好(指定一個key來放), 因此流程是這樣的:

  1. Server開啟後, 自己到etcd註冊自己, 並把etcd上找到的nodes全加到peer list中
  2. 另一台由etcd發現有另一台出現後, 把它加到peer list中
  3. Server下線後, 要移除自己的註冊, 其他機器要從peer list把它移除

問題點在最後一點, Server下線有可能是被kill的, 也有可能按ctrl-c中斷的, 這時候就要監聽os的signal, 在程式被結束前, 可以先去移除註冊, 像這樣:

//Listening to exit signals for graceful leave
go func() {
	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
	<-c
	log.Println("I'm leaving")
	cm.Leave()
	os.Exit(0)
}()

這只是一個sample而已, 還有一些待改進的

這個語法蠻有趣的, 早上一直都在看這個想能拿來幹嘛? 結果早上有個phone interview就有點腦袋小小的轉不過來, 不過這不是重點, 先簡單的來講一下Generators

產生器? 顧名思義或許是這樣, 可以先看一下MDN上的文件, 這是一個從ES6開始才有的功能, 因此要在比較新的瀏覽器, 或是nodejs 6之後才有支援, 先來看看MDN上那個簡單的例子:

function* idMaker(){
  var index = 0;
  while(index < 3)
    yield index++;
}

var gen = idMaker();

console.log(gen.next().value); // 0
console.log(gen.next().value); // 1
console.log(gen.next().value); // 2
console.log(gen.next().value); // undefined

一個generator是以”function*“宣告的, 可以說它是一種特別的function, 跟一般的function一次跑到結束不同, 它是可以中途暫停的, 以上面這例子來說, 如果習慣傳統的寫法, 可能會有點小暈, while loop在idMaker裡面不是一次做到完嗎? 剛剛說generator是可被中途暫停的, 因此, 在第一輪的時候會先停在”yield”處, 當你呼叫next()才會到下一個yield位置停下來並回傳, 我的感覺是比較像一種有帶state的function之類的

有什麼用途? 從上面的例子, 當然最直覺想到的是ID產生器或是計數器之類的東西, 網路上應該可以找到一些不同的用法, 比如說搭配Promise, 有興趣可以自己找找, 是不只可以用在產生器, 拿我早上interview被問到的實作strstr, 不是很難的東西, 我原本拿go寫,出了點小槌, 而且也只能找第一個發生的字串, 後來用generator改了這版本:

以這個來說, 第一次呼叫會找出第一個發生的點, 可以持續呼叫到所有的都找出來為止, generator是可以被iterate的, 因此這邊可以用

for(var i of gen) {
    console.log(i);
}

不需要一直call next(), next()回傳回來的會是{value:…, done:…}, 因此可以看done知道是否已經結束

下面這範例則是一個質數產生器, 一直call next就可以產生下一個質數: