博客 / 詳情

返回

千億級IM獨立開發指南!全球即時通訊全套代碼4小時速成(四)

本文篇幅較長,共計26667字(包括代碼),預計閲讀時長30-60min

這是《千億級IM獨立開發指南!全球即時通訊全套代碼4小時速成》的第四篇:《服務端搭建與總結》

系列文章可參考:
《千億級IM獨立開發指南!全球即時通訊全套代碼4小時速成(一)》:Demo演示與IM設計
《千億級IM獨立開發指南!全球即時通訊全套代碼4小時速成(二)》:UI設計與搭建
《千億級IM獨立開發指南!全球即時通訊全套代碼4小時速成(三)》:APP 內部流程與邏輯

四、服務端搭建與總結

這篇終於進入了最後的部分:服務端。
隨着這部分的完成,一個完整的,可制支撐千億級消息的IM便白嫖完成!所以,我們加一把勁,來看一下這最後的服務端部分。

1. 服務端選型

服務端的開發,首當其衝,且也是最重要的,便是服務端的選型。根據前兩篇的需求分析和功能設計,我們有三個突出的核心需求:

  1. 能與 RTM 服務端交互:這意味着雲上曲率官方必須提供對應語言的Server 端 SDK
  2. 支持 HTTP/HTTPS 以 GET 和 POST 的方式訪問
  3. 能以簡單且快速的方式,開發我們所需求的業務

雲上曲率 RTM 其實提供了很多不同的SDK用於客户與RTM,官網上面直接列出的有 Go、PHP、C++、Python、Java、C# 六種。其實還存在其他的隱藏款,比如 Node.js。

在這些SDK中,從簡單和方便程度上而言,快速開發有以下四個選項:
● C++
● Go
● Python
● Node.js

首先,因為Node.js SDK計劃會有重大更新,當前 GitHub 上是較老版本,所以我們暫不選擇。這也是Node.js SDK 成為隱藏款的核心原因。

而 Python 和 Go 對於 HTTP/HTTPS 來説,則算是經典候選。直接啓動 HTTP 服務器,接入 RTM 對應語言的SDK,實現相關的業務代碼即可。而我們這裏選擇C++。
我們選擇C++的原因有兩點:一是在FPNN框架的加持下,C++開發者將不再需要去處理任何 HTTP/HTTPS 相關的支持,HTTP/HTTPS 的支持可以被完全透明化。第二點就是,整個RTM服務系統就是基於C++ FPNN的框架進行開發的,所以我們可以更好地與RTM服務器進行交互。

雖然RTM服務端的C++ SDK是用 FPNN C++ SDK進行開發,但 FPNN C++ SDK 是FPNN 框架的特化子集,大部分情況下幾乎無需改動便可從FPNN C++ SDK改為由FPNN框架提供基礎支持。這樣我們便可輕鬆實現開發需求。
但接下來,我們會採用更加簡單的操作!

2. FPNN 框架的配置

首先從GitHub上面下載FPNN框架的最新發行版,目前是 1.1.3 版本。
注意:FPNN 框架目前僅支持 CentOS、Ubuntu 和 MacOS 三個操作系統,WIndows 僅有 C++ Widnows SDK。C++ SDK 不含服務器和HTTP/HTTPS等功能支持。

按照“FPNN安裝與集成”進行環境配置和框架編譯。
注意:如果編譯和運行環境不是亞馬遜AWS,而是阿里雲、騰訊雲等,或者自己的內網虛擬機,切記根據“FPNN注意事項”進行修改和配置。否則服務可能無響應。因為在默認情況下,適配的是亞馬遜AWS的運行環境。

3. 服務器框架搭建

3.1. 服務器框架搭建

採用FPNN框架開發的服務,其實必須的就3個文件:一個C++代碼文件,一個運行配置文件,一個Makefile
在這裏,我們把業務代碼和框架代碼分離:將負責具體業務請求處理的 QuestProcessor 類的實現和服務框架分開,一共產生5個文件:Makefile、IMDemoServer.cpp、QuestProcessor.h、QuestProcessor.cpp、im.conf。這也是採用FPNN框架進行開發的推薦做法。

首先是服務框架 IMDemoServer.cpp,如果我們不修改業務請求處理類的名稱的話,以下代碼無需修改,這也算是FPNN框架開發的標準代碼:

IMDemoServer.cpp:

#include <iostream>
#include "TCPEpollServer.h"
#include "QuestProcessor.h"
#include "Setting.h"

using namespace fpnn;

int main(int argc, char* argv[])
{
    if (argc != 2)
    {
        std::cout<<"Usage: "<<argv[0]<<" config"<<std::endl;
        return 0;
    }
    if(!Setting::load(argv[1])){
        std::cout<<"Config file error:"<< argv[1]<<std::endl;
        return 1;
    }

    ServerPtr server = TCPEpollServer::create();
    server->setQuestProcessor(std::make_shared<QuestProcessor>());
    if (server->startup())
        server->run();

    return 0;
}

對,整個服務器框架就算是完成了。當然,如果不採用HTTP或者TCP,而改為UDP,則將代碼中的 TCPEpollServer 直接換成 UDPEpollServer,整個服務就從TCP或者HTTP/HTTPS服務,變成了UDP服務。

然後是業務框架:

QuestProcessor.h:

#ifndef QuestProcessor_H
#define QuestProcessor_H

#include "IQuestProcessor.h"

using namespace fpnn;

class QuestProcessor: public IQuestProcessor
{
    QuestProcessorClassPrivateFields(QuestProcessor)
    
public:
    virtual ~QuestProcessor() {}

    QuestProcessorClassBasicPublicFuncs
};

#endif

QuestProcessor.cpp:

`#include "QuestProcessor.h"`

嗯,FPNN空的業務框架這樣就完成了!
鑑於我們需要處理 userLogin、userRegister、createGroup、joinGroup、createRoom、lookup 六個請求,所以我們修改業務框架代碼如下:

QuestProcessor.h:

#ifndef QuestProcessor_H
#define QuestProcessor_H

#include "IQuestProcessor.h"

using namespace fpnn;

class QuestProcessor: public IQuestProcessor
{
    QuestProcessorClassPrivateFields(QuestProcessor)
    
public:
    FPAnswerPtr userLogin(const FPReaderPtr args, const FPQuestPtr quest, const ConnectionInfo& ci);
    FPAnswerPtr userRegister(const FPReaderPtr args, const FPQuestPtr quest, const ConnectionInfo& ci);
    FPAnswerPtr createGroup(const FPReaderPtr args, const FPQuestPtr quest, const ConnectionInfo& ci);
    FPAnswerPtr joinGroup(const FPReaderPtr args, const FPQuestPtr quest, const ConnectionInfo& ci);
    FPAnswerPtr createRoom(const FPReaderPtr args, const FPQuestPtr quest, const ConnectionInfo& ci);
    FPAnswerPtr lookup(const FPReaderPtr args, const FPQuestPtr quest, const ConnectionInfo& ci);

    QuestProcessor();
    virtual ~QuestProcessor() {}

    QuestProcessorClassBasicPublicFuncs
};

#endif

QuestProcessor.cpp:

#include "FPLog.h"
#include "QuestProcessor.h"

QuestProcessor::QuestProcessor()
{
    registerMethod("userLogin", &QuestProcessor::userLogin);
    registerMethod("userRegister", &QuestProcessor::userRegister);
    registerMethod("createGroup", &QuestProcessor::createGroup);
    registerMethod("joinGroup", &QuestProcessor::joinGroup);
    registerMethod("createRoom", &QuestProcessor::createRoom);
    registerMethod("lookup", &QuestProcessor::lookup);
}

FPAnswerPtr QuestProcessor::userLogin(const FPReaderPtr args, const FPQuestPtr quest, const ConnectionInfo& ci)
{
    //-- TODO
    return nullptr;
}

FPAnswerPtr QuestProcessor::userRegister(const FPReaderPtr args, const FPQuestPtr quest, const ConnectionInfo& ci)
{
    //-- TODO
    return nullptr;
}

FPAnswerPtr QuestProcessor::createGroup(const FPReaderPtr args, const FPQuestPtr quest, const ConnectionInfo& ci)
{
    //-- TODO
    return nullptr;
}

FPAnswerPtr QuestProcessor::joinGroup(const FPReaderPtr args, const FPQuestPtr quest, const ConnectionInfo& ci)
{
    //-- TODO
    return nullptr;
}

FPAnswerPtr QuestProcessor::createRoom(const FPReaderPtr args, const FPQuestPtr quest, const ConnectionInfo& ci)
{
    //-- TODO
    return nullptr;
}

FPAnswerPtr QuestProcessor::lookup(const FPReaderPtr args, const FPQuestPtr quest, const ConnectionInfo& ci)
{
    //-- TODO
    return nullptr;
}

到此,業務框架也就搭建完成。

然後是 FPNN的標準配置文件:

im.conf:

FPNN.server.listening.ip = 
FPNN.server.listening.port = 13601
FPNN.server.name = IMDemoServer

FPNN.server.log.level = WARN
FPNN.server.log.endpoint = std::cout
FPNN.server.log.route = IMDemoServer

配置中,監聽IP為空表示我們在本機所有IP地址,或者網絡接口上進行監聽。
監聽端口為 13601,監聽IPv4,暫不啓動IPv6的監聽。
然後日誌輸出等級為 WARN,即僅輸出警告,及警告以上級別的日誌。警告以下級別的日誌將被忽略。
日誌向標準輸出輸出。

因為FPNN框架不是專門的HTTP/HTTPS服務框架,而只是帶了HTTP/HTTPS協議支持,所以這裏我們要增加一行配置,以便打開對 HTTP 的支持:

im.conf:

# FPNN 服務支持 HTTP/HTTPS 訪問
FPNN.server.http.supported = true

因為我們沒有HTTPS證書,所以暫時僅啓動HTTP支持。如果需要HTTPS支持,則在獲取證書後,按照“FPNN HTTP/HTTPS & webSocket (ws/wss) 支持”一文進行配置即可。

最後,是我們的Makfile,這也是從FPNN 框架開發的標準模版修改而來:

Makefile:

EXES_SERVER = IMDemoServer

FPNN_DIR = ../../../infra-fpnn
CFLAGS +=
CXXFLAGS +=
CPPFLAGS += -I$(FPNN_DIR)/extends -I$(FPNN_DIR)/core -I$(FPNN_DIR)/proto -I$(FPNN_DIR)/base -I$(FPNN_DIR)/proto/msgpack -I$(FPNN_DIR)/proto/rapidjson
LIBS += -L$(FPNN_DIR)/core -L$(FPNN_DIR)/proto -L$(FPNN_DIR)/extends -L$(FPNN_DIR)/base -lfpnn

OBJS_SERVER = IMDemoServer.o QuestProcessor.o

all: $(EXES_SERVER)

clean:
    $(RM) *.o $(EXES_SERVER)
include $(FPNN_DIR)/def.mk

其中,FPNN_DIR 指明瞭FPNN框架的路徑;OBJS_SERVER 指明瞭相關的cpp代碼文件對應的目標文件。
其餘一般情況下,直接照抄即可。

3.2. RTM Server C++ SDK 接入

一般情況下,我們直接引入 RTM C++ Server SDK 即可。但鑑於RTM本身就是用FPNN框架進行開發,且我們所需要和RTM打交道的接口很少,而且FPNN體系開發又很容易,所以我們這次不引入 RTM C++ Server SDK,而是通過FPNN協議直接訪問 RTM 服務。
但訪問RTM需要服務端密鑰簽名,所以我們直接從 RTM C++ Server SDK 中摘出兩個文件 RTMMidGenerator.h 和 RTMMidGenerator.cpp 加入我們的服務代碼中。兩個文件相當簡單,總代碼量不到60行,這裏也就不再具體分析。
在 QuestProcessor.cpp 中引入 RTMMidGenerator.h,並初始化 RTMMidGenerator 類:

QuestProcessor.cpp:

... ...
#include "RTMMidGenerator.h"
... ...

QuestProcessor::QuestProcessor()
{
    ... ...
    RTMMidGenerator::init();
}

然後修改我們的 Makefile,將 OBJS_SERVER 添加一個參數 RTMMidGenerator.o 即可:

Makefile:

OBJS_SERVER = IMDemoServer.o RTMMidGenerator.o QuestProcessor.o

4. 功能的開發

4.1. 與 RTM 服務器的交互

首先,我們需要登錄雲上曲率的用户控制枱,在控制枱左側,“控制枱概覽”中,選擇“實時信令”條目:

進入實時信令的項目選擇頁(需註冊雲上曲率官網賬號)選擇對應的項目,進入項目控制枱。
在項目控制枱左側列表中,選擇“服務配置”:

然後在右側,便可看到服務器項目需配置的數據信息:

我們記錄下項目編號、服務端SDK接入點,以及密鑰,將其配置入 im.conf

RTM.config.endpoint = rtm-nx-back.ilivedata.com:13315
RTM.config.pid = 80000253
RTM.config.secret = xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx

之後,我們來加入與RTM服務器通信的模塊。

既然RTM服務器由FPNN框架開發,各個平台和語言的SDK又都由對應的FPNN SDK開發,那我們直接用FPNN Client 和RTM 服務器連接即可。編輯代碼,在代碼中加入 TCPClient 的使用,以及與RTM服務器通訊需要的信息:

QuestProcessor.h:

... ...
#include "TCPClient.h"
... ...
    
class QuestProcessor: public IQuestProcessor
{
    ... ...
    
    int _pid;
    std::string _secret;
    
    TCPClientPtr _rtmServerClient;
    
    ... ...
}

QuestProcessor.cpp:

... ...  
#include "Setting.h"  
... ...
    
QuestProcessor::QuestProcessor()
{
    ... ...
    std::string endpoint = Setting::getString("RTM.config.endpoint");
    _rtmServerClient = TCPClient::createClient(endpoint);
    _rtmServerClient->keepAlive();
    _rtmServerClient->setQuestTimeout(10);
    
    _secret = Setting::getString("RTM.config.secret");
    _pid = Setting::getInt("RTM.config.pid");
    
    ... ...
}

其中頭文件 Setting.h 中包含的靜態類 calss Setting 負責從配置文件中提取信息。
我們通過接入點創建了 TCPClient 之後,開啓了 FPNN 的鏈接包活功能,並修改了默認的超時時間。
並從配置文件,加載了項目編號,以及訪問密鑰。

4.2. 數據簽名

從RTM公開的SDK我們發現,服務端訪問RTM服務,需要對接口和數據進行簽名,於是加入輔助函數 makeSignAndSalt():

QuestProcessor.h:

class QuestProcessor: public IQuestProcessor
{
    ... ...
    
    void makeSignAndSalt(int32_t ts, const std::string& cmd, std::string& sign, int64_t& salt);
    
    ... ...
}

QuestProcessor.cpp:

... ...

#include "hex.h"
#include "md5.h"

... ...
    
void QuestProcessor::makeSignAndSalt(int32_t ts, const std::string& cmd, std::string& sign, int64_t& salt)
{
    salt = RTMMidGenerator::genMid();
    std::string content = std::to_string(_pid) + ":" + _secret + ":" + std::to_string(salt) + ":" + cmd + ":" + std::to_string(ts);
    
    unsigned char digest[16];
    md5_checksum(digest, content.c_str(), content.size());
    char hexstr[32 + 1];
    Hexlify(hexstr, digest, sizeof(digest));

    sign.assign(hexstr); 
}

... ...

4.3. 數據存儲功能

我們開發這個後端服務的核心目的,便是對用户信息的管理。於是我們需要保存、查找和增改相關數據。
為了簡化起見,我們採用json格式進行本地存儲。雖然FPNN框架帶有RapidJSON的最新版本,但直接用RapidJSON進行操作,還是太過麻煩和不便。於是我們使用FPNN框架自帶的另外一個Json庫FPJson。採用FPJson,不僅簡單方便,不論多少層級的數據訪問,或者多麼複雜的STL組合容器數據打包,FPJson一律一行代碼直接搞定。而且對於我們目前的需求,甚至都可以直接作為容器使用,而無需以任何形式處理json格式。

因此,我們決定採用 FPJson,進行數據的管理和存儲。此外,我們希望當服務器啓動的時候,加載本地存儲的數據;當服務器停止的時候,保存相關的數據到磁盤。

編輯配置文件 im.conf,增加對數據保存路徑的配置:

# 用户賬號信息保存路徑
IMDemoServer.Store.path = ./im.users.json

編輯 QuestProcessor 類,加入對 FPJson 的引用,以及對服務器啓動和停止事件的處理。

QuestProcessor.h:

... ...
    
#include "FPJson.h"
    
... ...
    
class QuestProcessor: public IQuestProcessor
{
    ... ...
    
    std::mutex _mutex;
    JsonPtr _root;
    
    ... ...
    
public:
    virtual void start();
    virtual void serverStopped();
    
    ... ...
}

QuestProcessor.cpp:

#include "FileSystemUtil.h"

... ...
    
void QuestProcessor::start()
{
    std::string userFile = Setting::getString("IMDemoServer.Store.path");
    std::string userData;
    FileSystemUtil::readFileContent(userFile, userData);
    if (userData.size() > 0)
        _root = Json::parse(userData.c_str());
        
    if (!_root)
    {
        _root.reset(new Json());
        (*_root)["nextUid"] = 1;
        (*_root)["nextGid"] = 1;
        (*_root)["nextRid"] = 1;
    }
}

void QuestProcessor::serverStopped()
{
    std::string userData = _root->str();
    std::string userFile = Setting::getString("BizServer.Store.path");
    FileSystemUtil::saveFileContent(userFile, userData);
}

至此,數據的加載、保存和基礎管理,開發完畢。
同時 im.conf 後續也不再修改。

4.4. userLogin

我們規定 userLogin 接口需要兩個參數:username 和 pwd。兩者皆為字符串形式。
返回三個參數:pid、uid和token。其中pid為項目編號,整型;uid為用户唯一數字ID,整型;token為從RTM服務器獲取的登陸密鑰,字符串類型。

雖然我們在iOS篇中,規定是通過HTTP/HTTPS 的GET方式訪問,但FPNN框架本身支持HTTP、HTTPS、FPNN協議(TCP & UDP)、FPNN 加密協議(TCP & UDP)、FPNN over TLS、WebSocket(ws)、安全的WebSocket(wss) 幾種方式訪問,且後續的其他版本的App可能會以不同的形式訪問,所以我們在這裏需要兼容以上幾種協議傳輸的參數獲取。好在對於FPNN框架,僅有 HTTP/GET和其他方式參數獲取不同,所以 userLogin 接口及相關代碼如下:

QuestProcessor.h:

class QuestProcessor: public IQuestProcessor
{
    ... ...
    
    FPQuestPtr genTokenQuest(int64_t uid);
    void getToken(int64_t uid, std::shared_ptr<IAsyncAnswer> async);
    
    ... ...

public:

    ... ...
    
    FPAnswerPtr userLogin(const FPReaderPtr args, const FPQuestPtr quest, const ConnectionInfo& ci);
    
    ... ...
}

QuestProcessor.cpp:

FPQuestPtr QuestProcessor::genTokenQuest(int64_t uid)
{
    int32_t ts = slack_real_sec();
    std::string sign;
    int64_t salt;
    makeSignAndSalt(ts, "gettoken", sign, salt);

    FPQWriter qw(5, "gettoken");
    qw.param("pid", _pid);
    qw.param("sign", sign);
    qw.param("salt", salt);
    qw.param("ts", ts);
    qw.param("uid", uid);
    return qw.take();
}

void QuestProcessor::getToken(int64_t uid, std::shared_ptr<IAsyncAnswer> async)
{
    int pid = _pid;
    FPQuestPtr tokenQuest = genTokenQuest(uid);
    bool launchAsync = _rtmServerClient->sendQuest(tokenQuest, [uid, pid, async](FPAnswerPtr answer, int errorCode){

        if (errorCode == FPNN_EC_OK)
        {
            FPAReader ar(answer);
            FPAWriter aw(3, async->getQuest());
            aw.param("pid", pid);
            aw.param("uid", uid);
            aw.param("token", ar.getString("token"));

            async->sendAnswer(aw.take());
        }
        else
            async->sendErrorAnswer(errorCode, "BizServer error.");
    });

    if (launchAsync == false)
        async->sendErrorAnswer(FPNN_EC_CORE_UNKNOWN_ERROR, "BizServer error. You can retry.");
}

FPAnswerPtr QuestProcessor::userLogin(const FPReaderPtr args, const FPQuestPtr quest, const ConnectionInfo& ci)
{
    std::string username, password;
    if (quest->isHTTP())
    {
        //-- HTTP/HTTPS GET 訪問
        username = quest->http_uri("username");
        password = quest->http_uri("pwd");

        //-- 如果是 POST 訪問,而不是 GET 訪問
        if (username.empty())
            username = args->wantString("username");

        if (password.empty())
            password = args->wantString("pwd");
    }
    else
    {
        //-- FPNN/WebSocket/HTTP POST/HTTPS POST 訪問
        username = args->wantString("username");
        password = args->wantString("pwd");
    }

    int64_t uid = 0;
    bool passwordMached = true;
    {
        std::unique_lock<std::mutex> lck(_mutex);
        if ((*_root)["account"].exist(username))
        {
            if ((std::string)((*_root)["account"][username]["pwd"]) == password)
                uid = (*_root)["account"][username]["uid"];
            else
                passwordMached = false;
        }
    }

    if (passwordMached == false)
        return FpnnErrorAnswer(quest, FPNN_EC_CORE_UNKNOWN_ERROR, "Password is wrong!");

    if (uid == 0)
        return FpnnErrorAnswer(quest, FPNN_EC_CORE_UNKNOWN_ERROR, "User is unregistered!");

    getToken(uid, genAsyncAnswer(quest));

    return nullptr;
}

其中,函數 genTokenQuest() 生成向 RTM 服務集羣請求用户登陸 token 的請求數據;函數 getToken() 則調用 TCPClient 向 RTM集羣發送獲取用户登陸 token 的請求,並在異步回調中生成對 App 的返回數據。

而在 userLogin() 函數中,則展示瞭如何從FPNN框架所支持的各種訪問形式中,獲取對應的輸入參數。
然後通過 FPJson 檢查用户是否存在。當用户不存在時,返回對應的錯誤;當用户存在時,產生一個異步應答對象,交給函數 getToken(),以便當用户登陸 touken 請求返回時,能對 App 的請求進行異步應答。而期間如果有任何錯誤,導致流程中斷,FPNN的異步應答對象將自動對App的請求進行迴應。

4.5. userRegister

用户註冊流程與用户登錄流程高度類似。
當 FPJson 確認註冊的用户不存在,提交的用户名可以註冊後,記錄用户的註冊信息,然後通過 getToken() 函數,向RTM集羣請求用户的登錄token。具體代碼如下:

QuestProcessor.cpp:

FPAnswerPtr QuestProcessor::userRegister(const FPReaderPtr args, const FPQuestPtr quest, const ConnectionInfo& ci)
{
    std::string username, password;
    if (quest->isHTTP())
    {
        //-- HTTP/HTTPS GET 訪問
        username = quest->http_uri("username");
        password = quest->http_uri("pwd");

        //-- 如果是 POST 訪問,而不是 GET 訪問
        if (username.empty())
            username = args->wantString("username");

        if (password.empty())
            password = args->wantString("pwd");
    }
    else
    {
        //-- FPNN/WebSocket/HTTP POST/HTTPS POST 訪問
        username = args->wantString("username");
        password = args->wantString("pwd");
    }

    int64_t uid = 0;
    {
        std::unique_lock<std::mutex> lck(_mutex);
        if ((*_root)["account"].exist(username) == false)
        {
            (*_root)["account"][username]["pwd"] = password;

            uid = (*_root)["nextUid"];

            (*_root)["nextUid"] = uid + 1;
            (*_root)["account"][username]["uid"] = uid;
        }
    }

    if (uid == 0)
        return FpnnErrorAnswer(quest, FPNN_EC_CORE_UNKNOWN_ERROR, "Username is existed!");

    getToken(uid, genAsyncAnswer(quest));

    return nullptr;
}

4.6. createGroup

從創建羣組開始,為了後續方便,我們摘出兩個通用函數 sendAsyncQuest() 以及 extraParams()。
因為在後續的接口中,我們需要返回給App的應答其實已經準備好了,但是還需要RTM做進一步的操作才能返回,所以我們提取出了異步應答函數 sendAsyncQuest():

QuestProcessor.h:

class QuestProcessor: public IQuestProcessor
{
    ... ...
    
    void sendAsyncQuest(FPQuestPtr quest, std::shared_ptr<IAsyncAnswer> async, FPAnswerPtr realAnswer = nullptr);
    
    ... ...
}

QuestProcessor.cpp:

void QuestProcessor::sendAsyncQuest(FPQuestPtr quest, std::shared_ptr<IAsyncAnswer> async, FPAnswerPtr realAnswer)
{
    bool launchAsync = _rtmServerClient->sendQuest(quest, [async, realAnswer](FPAnswerPtr answer, int errorCode){

        if (errorCode == FPNN_EC_OK)
        {
            if (realAnswer)
                async->sendAnswer(realAnswer);
            else
                async->sendEmptyAnswer();
        }
        else
            async->sendErrorAnswer(errorCode, "BizServer error.");
    });

    if (launchAsync == false)
        async->sendErrorAnswer(FPNN_EC_CORE_UNKNOWN_ERROR, "BizServer error. You can retry.");
}

而後續的接口,大部分輸入參數高度一致,所以我們有了統一的參數提取函數 extraParams():

QuestProcessor.h:

class QuestProcessor: public IQuestProcessor
{
    ... ...
    
    FPAnswerPtr extraParams(const FPReaderPtr args, const FPQuestPtr quest, const char* xidKey, const char* xnameKey,
        int64_t &xid, std::string& xname);
    
    ... ...
}

QuestProcessor.cpp:

FPAnswerPtr QuestProcessor::extraParams(const FPReaderPtr args, const FPQuestPtr quest,
            const char* xidKey, const char* xnameKey, int64_t &xid, std::string& xname)
{
    if (quest->isHTTP())
    {
        //-- HTTP/HTTPS GET 訪問
        std::string xidString = quest->http_uri(xidKey);
        if (xidString.empty())
            xid = 0;
        else
            xid = std::stoll(xidString);

        xname = quest->http_uri(xnameKey);

        //-- 如果是 POST 訪問,而不是 GET 訪問
        if (xid == 0)
            xid = args->wantInt(xidKey);

        if (xname.empty())
            xname = args->wantString(xnameKey);
    }
    else
    {
        //-- FPNN/WebSocket/HTTP POST/HTTPS POST 訪問
        xid = args->wantInt(xidKey);
        xname = args->wantString(xnameKey);
    }

    if (xid == 0)
        return FpnnErrorAnswer(quest, FPNN_EC_CORE_UNKNOWN_ERROR, std::string("Invalid ").append(xidKey).append("!").c_str());

    if (xname.empty())
        return FpnnErrorAnswer(quest, FPNN_EC_CORE_UNKNOWN_ERROR, std::string("Invalid ").append(xnameKey).append("!").c_str());

    return nullptr;
}

在以上基礎上,我們再來實現創建羣組的功能:

QuestProcessor.cpp:

FPAnswerPtr QuestProcessor::createGroup(const FPReaderPtr args, const FPQuestPtr quest, const ConnectionInfo& ci)
{
    int64_t uid = 0;
    std::string groupname;

    FPAnswerPtr answer = extraParams(args, quest, "uid", "group", uid, groupname);
    if (answer)
        return answer;

    int64_t gid = 0;

    {
        std::unique_lock<std::mutex> lck(_mutex);
        if ((*_root)["group"].exist(groupname) == false)
        {
            gid = (*_root)["nextGid"];

            (*_root)["nextGid"] = gid + 1;
            (*_root)["group"][groupname] = gid;
        }
    }

    if (gid == 0)
        return FpnnErrorAnswer(quest, FPNN_EC_CORE_UNKNOWN_ERROR, "Group is existed!");

    int32_t ts = slack_real_sec();
    std::string sign;
    int64_t salt;
    makeSignAndSalt(ts, "addgroupmembers", sign, salt);

    FPQWriter qw(6, "addgroupmembers");
    qw.param("pid", _pid);
    qw.param("sign", sign);
    qw.param("salt", salt);
    qw.param("ts", ts);
    qw.param("gid", gid);
    qw.param("uids", std::set<int64_t>{uid});

    FPAWriter aw(1, quest);
    aw.param("gid", gid);

    sendAsyncQuest(qw.take(), genAsyncAnswer(quest), aw.take());

    return nullptr;
}

在 createGroup 中,我們先驗證需要創建的羣組唯一名稱並不存在,然後給羣組分配唯一數字ID,記錄羣組唯一名稱和ID信息,然後向 RTM服務集羣提交 addgroupmembers 請求。當 RTM 服務集羣對 addgroupmembers 請求通過後,再返回給 App 對應的應答數據。

4.7. joinGroup

加入羣組與創建羣組類似。
當確認羣組存在後,我們的服務器便向 RTM服務集羣提交 addgroupmembers 請求。當 RTM 服務集羣對 addgroupmembers 請求通過後,再返回給 App 對應的應答數據。

QuestProcessor.cpp:

FPAnswerPtr QuestProcessor::joinGroup(const FPReaderPtr args, const FPQuestPtr quest, const ConnectionInfo& ci)
{
    int64_t uid = 0;
    std::string groupname;

    FPAnswerPtr answer = extraParams(args, quest, "uid", "group", uid, groupname);
    if (answer)
        return answer;

    int64_t gid = 0;

    {
        std::unique_lock<std::mutex> lck(_mutex);
        if ((*_root)["group"].exist(groupname))
            gid = (*_root)["group"][groupname];
        else
            return FpnnErrorAnswer(quest, FPNN_EC_CORE_UNKNOWN_ERROR, "Group is not existed!");        
    }

    int32_t ts = slack_real_sec();
    std::string sign;
    int64_t salt;
    makeSignAndSalt(ts, "addgroupmembers", sign, salt);

    FPQWriter qw(6, "addgroupmembers");
    qw.param("pid", _pid);
    qw.param("sign", sign);
    qw.param("salt", salt);
    qw.param("ts", ts);
    qw.param("gid", gid);
    qw.param("uids", std::set<int64_t>{uid});

    FPAWriter aw(1, quest);
    aw.param("gid", gid);

    sendAsyncQuest(qw.take(), genAsyncAnswer(quest), aw.take());

    return nullptr;
}

4.8. createRoom

創建房間與創建羣組高度相似。區別只是一個是羣組,一個是房間。而且房間App端可以直接加入,因此不再有對 RTM 集羣的請求操作。

QuestProcessor.cpp:

FPAnswerPtr QuestProcessor::createRoom(const FPReaderPtr args, const FPQuestPtr quest, const ConnectionInfo& ci)
{
    std::string roomname;

    if (quest->isHTTP())
    {
        //-- HTTP/HTTPS GET 訪問
        roomname = quest->http_uri("room");

        //-- 如果是 POST 訪問,而不是 GET 訪問
        if (roomname.empty())
            roomname = args->wantString("room");
    }
    else
    {
        //-- FPNN/WebSocket/HTTP POST/HTTPS POST 訪問
        roomname = args->wantString("room");
    }

    int64_t rid = 0;

    {
        std::unique_lock<std::mutex> lck(_mutex);
        if ((*_root)["room"].exist(roomname) == false)
        {
            rid = (*_root)["nextRid"];

            (*_root)["nextRid"] = rid + 1;
            (*_root)["room"][roomname] = rid;
        }
        else
            rid = (*_root)["room"][roomname];
    }

    FPAWriter aw(1, quest);
    aw.param("rid", rid);

    return aw.take();
}

4.9. lookup
最後,是查詢功能。
查詢其實就是在Json的數據字典中,進行查找或遍歷。為了簡單易於理解,這裏沒有做任何的優化和技巧處理。流程也很簡單,不再做詳細説明。相關代碼如下:

QuestProcessor.cpp:

FPAnswerPtr QuestProcessor::lookup(const FPReaderPtr args, const FPQuestPtr quest, const ConnectionInfo& ci)
{
    std::set<int64_t> uids, gids, rids;
    std::set<std::string> users, groups, rooms;

    uids = args->get("uids", uids);
    gids = args->get("gids", gids);
    rids = args->get("rids", rids);

    users = args->get("users", users);
    groups = args->get("groups", groups);
    rooms = args->get("rooms", rooms);

    std::map<std::string, int64_t> userResult, groupResult, roomResult;

    //====================================//
    {
        std::unique_lock<std::mutex> lck(_mutex);

        for (auto& username: users)
        {
            if ((*_root)["account"].exist(username))
                userResult[username] = (*_root)["account"][username]["uid"];
        }

        for (auto& groupname: groups)
        {
            if ((*_root)["group"].exist(groupname))
                groupResult[groupname] = (*_root)["group"][groupname];
        }

        for (auto& roomname: rooms)
        {
            if ((*_root)["room"].exist(roomname))
                roomResult[roomname] = (*_root)["room"][roomname];
        }

        if (uids.size() > 0)
        {
            const std::map<std::string, JsonPtr> * accountDict = _root->getDict("account");
            for (auto& node: *accountDict)
            {
                int64_t uid = (int64_t)(node.second->wantInt("uid"));
                if (uids.find(uid) != uids.end())
                {
                    userResult[node.first] = uid;
                    uids.erase(uid);

                    if (uids.empty())
                        break;
                }
            }
        }
            
        if (gids.size() > 0)
        {
            const std::map<std::string, JsonPtr> * groupDict = _root->getDict("group");
            for (auto& node: *groupDict)
            {
                int64_t gid = *(node.second);
                if (gids.find(gid) != gids.end())
                {
                    groupResult[node.first] = gid;
                    gids.erase(gid);

                    if (gids.empty())
                        break;
                }
            }
        }

        if (rids.size() > 0)
        {
            const std::map<std::string, JsonPtr> * roomDict = _root->getDict("room");
            for (auto& node: *roomDict)
            {
                int64_t rid = *(node.second);
                if (rids.find(rid) != rids.end())
                {
                    roomResult[node.first] = rid;
                    rids.erase(rid);

                    if (rids.empty())
                        break;
                }
            }
        }
    }
    //====================================//

    FPAWriter aw(3, quest);
    aw.param("users", userResult);
    aw.param("groups", groupResult);
    aw.param("rooms", roomResult);

    return aw.take();
}

至此,整個 IMDemo 服務端邊開發完成。
Server 端完整代碼請參見:https://github.com/highras/rt...

5. 編譯 & 運行

編譯好 FPNN框架後,進入 IMDemoServer 的代碼目錄,直接 make 就行。

[swxlion@ip-10-65-5-131 demoServer]$ make
g++ -c  -std=c++11 -DHOST_PLATFORM_AWS -I../../../infra-fpnn/extends -I../../../infra-fpnn/core -I../../../infra-fpnn/proto -I../../../infra-fpnn/base -I../../../infra-fpnn/proto/msgpack -I../../../infra-fpnn/proto/rapidjson -g -Wall -Werror -fPIC -O2  -o BizServer.o BizServer.cpp
g++ -c  -std=c++11 -DHOST_PLATFORM_AWS -I../../../infra-fpnn/extends -I../../../infra-fpnn/core -I../../../infra-fpnn/proto -I../../../infra-fpnn/base -I../../../infra-fpnn/proto/msgpack -I../../../infra-fpnn/proto/rapidjson -g -Wall -Werror -fPIC -O2  -o RTMMidGenerator.o RTMMidGenerator.cpp
g++ -c  -std=c++11 -DHOST_PLATFORM_AWS -I../../../infra-fpnn/extends -I../../../infra-fpnn/core -I../../../infra-fpnn/proto -I../../../infra-fpnn/base -I../../../infra-fpnn/proto/msgpack -I../../../infra-fpnn/proto/rapidjson -g -Wall -Werror -fPIC -O2  -o QuestProcessor.o QuestProcessor.cpp
g++  -std=c++11 -DHOST_PLATFORM_AWS -I../../../infra-fpnn/extends -I../../../infra-fpnn/core -I../../../infra-fpnn/proto -I../../../infra-fpnn/base -I../../../infra-fpnn/proto/msgpack -I../../../infra-fpnn/proto/rapidjson -g -Wall -Werror -fPIC -O2  -o BizServer BizServer.o RTMMidGenerator.o QuestProcessor.o -L../../../infra-fpnn/core -L../../../infra-fpnn/proto -L../../../infra-fpnn/extends -L../../../infra-fpnn/base -lfpnn -O2 -rdynamic -lstdc++ -lfpnn -lfpproto -lextends -lfpbase -lpthread -lz -lssl -lcrypto -lcurl -ltcmalloc
[swxlion@ip-10-65-5-131 demoServer]$ 

運行:

[swxlion@ip-10-65-5-131 demoServer]$ ./IMDemoServer im.conf

6. 項目後記

到此,IMDemo服務端開發完畢,整個IMDemo項目已經處於完全可用的狀態。
整個項目的運行效果可參見下面的動畫演示:

https://www.bilibili.com/vide...

完整的項目代碼請參見:https://github.com/highras/rt...
出於篇幅的原因,我們沒有介紹文件的發送、離線語音、實時音視頻,以及多語言翻譯、語音識別、文本審核、圖像審核、音視頻審核等功能。這些將後續以本Demo擴展的方式,或者另起新篇進行介紹。如果有感興趣的同學,可以先行瀏覽雲上曲率官網進行了解。

安全性警告

本服務器代碼僅做演示和講解之用,沒有做進一步的安全措施。
此外,對於用户,僅做了用户名的衝突檢測,沒有做進一步的安全管控。
實際項目中,請避免在公網執行非加密傳輸,或者未驗證操作。

使用雲上曲率RTM的知名企業與項目

FunPlus,趣加集團。中國出海品牌榜第19名,遊戲領域SLG品類全球第一。
其中,
《阿瓦隆之王》(King of Avalon):曾連續半年以上進入Apple AppStore 遊戲收入榜前五的SLG類型遊戲。全面使用雲上曲率RTM作為信令傳輸和IM聊天消息渠道。單項目消息量日峯值超越240億條/天。
《火槍紀元》(Guns of Glory):同樣曾連續半年以上進入Apple AppStore 遊戲收入榜前五的SLG類型遊戲,同樣全面使用雲上曲率RTM作為信令傳輸和IM聊天消息渠道。

Century Games:點點互動/世紀創遊。旗下多款知名遊戲的聊天系統採用雲上曲率RTM。

其它相關的SDK

雲上曲率基於RTM,面向遊戲和社交娛樂行業,還分別有不帶UI界面的IM Lib SDK,以及帶有可自定義界面的 IM Kit SDK。
相對於 RTM SDK 而言,IM Lib 和 IM Kit 更加上層,封裝更加面相IM和社交娛樂行業。接如何使用更加簡便。我們後續會推出基於IM Lib 和 IM Kit 的文章和教程。

IM Kit 效果參考:

7. 其他的版本與方案

本篇計劃後續推出Android/Kotlin版本,以及Go的服務端版本。
RTC部分正在考慮合適的demo形式。
此外,RTM是信令層面的SDK,我們後續也將推出IM Lib和IM Kit 層面的demo和文章。
相關後續文檔,請關注雲上曲率官方賬號。

user avatar
0 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.