微服务开源框架TARS之有哪些基础组件

本篇内容介绍了“微服务开源框架TARS之有哪些基础组件”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

创新互联公司专注于企业成都全网营销、网站重做改版、兴安网站定制设计、自适应品牌网站建设、H5响应式网站成都做商城网站、集团公司官网建设、外贸网站建设、高端网站制作、响应式网页设计等建站业务,价格优惠性价比高,为兴安等各大城市提供网站开发制作服务。

线程安全队列: TC_ThreadQueue

先看下框架对TC_ThreadQueue类的使用如下:

typedef TC_ThreadQueue > recv_queue; // 接收队列
typedef TC_ThreadQueue > send_queue; // 发送队列

TC_ThreadQueue 的实现比较简单,在TARS的网络层实现中可以发现这个类比较重要,因为从框架中收到的网络包都会加入到这个缓存队列里面,然后多业务线程 ServantHandle 会调用 waitForRecvQueue 从该队列里面取网络数据包,然后调用 dispatch 调用协议消息对应的处理函数,先看下框架对 TC_ThreadQueue 的实现:

/**
 * @brief 线程安全队列
 */
template >
class TC_ThreadQueue
{
public:
    TC_ThreadQueue():_size(0){};

public:

    typedef D queue_type;

	/**
	 * @brief 从头部获取数据, 没有数据抛异常
	 *
	 * @param t
	 * @return bool: true, 获取了数据, false, 无数据
	 */
	T front();

    /**
     * @brief 从头部获取数据, 没有数据则等待.
     *
     * @param t 
	 * @param millsecond(wait = true时才生效)  阻塞等待时间(ms)
	 *                    0 表示不阻塞 
     * 					 -1 永久等待
     * @param wait, 是否wait
     * @return bool: true, 获取了数据, false, 无数据
     */
    bool pop_front(T& t, size_t millsecond = 0, bool wait = true);
    ...
    ...
}

TC_ThreadQueue使用了C++11标准库中的用于实现线程锁和 wait,如下,看下队列的成员函数:push_front 在队列前面加入数据,

template void TC_ThreadQueue::push_front(const T& t, bool notify)
{
    if(notify) {
        std::unique_lock lock(_mutex);

        _cond.notify_one();

        _queue.push_front(t);

        ++_size;
    }
    else
    {
        std::lock_guard lock (_mutex);

        _queue.push_front(t);

        ++_size;
    }
}

如上图调用push_front函数的时候调用 std::unique_lock lock(_mutex)加锁 ,避免网络层接收数据和业务层取同一队列的数据冲突,_cond.notify_one() 通知等待在该锁上某一个线程醒过来,调用该函数之前必须加锁,因为有数据过来了,例如网络层有线程需要取包并进行分发处理。

再看一个成员函数pop_front,从头部获取数据,没有数据则等待。millisecond 阻塞等待时间(ms)

  • 0 表示不阻塞

  • -1 永久等待

template bool TC_ThreadQueue::pop_front(T& t, size_t millsecond, bool wait)
{
    if(wait) {

        std::unique_lock lock(_mutex);

        if (_queue.empty()) {
            if (millsecond == 0) {
                return false;
            }
            if (millsecond == (size_t) -1) {
                _cond.wait(lock);
            }
            else {
                //超时了
                if (_cond.wait_for(lock, std::chrono::milliseconds(millsecond)) == std::cv_status::timeout) {
                    return false;
                }
            }
        }

        if (_queue.empty()) {
            return false;
        }

        t = _queue.front();
        _queue.pop_front();
        assert(_size > 0);
        --_size;

        return true;
    }
    else
    {
        std::lock_guard lock (_mutex);
        if (_queue.empty())
        {
            return false;
        }

        t = _queue.front();
        _queue.pop_front();
        assert(_size > 0);
        --_size;

        return true;
    }
}

BindAdapter::waitForRecvQueue的函数就是调用了pop_front函数,用于等待接收队列,函数原型如下:

bool TC_EpollServer::BindAdapter::waitForRecvQueue(uint32_t handleIndex, shared_ptr &data)
{
	bool bRet = getRecvQueue(handleIndex).pop_front(data);

    if (!bRet)
    {
        return bRet;
    }

    --_iRecvBufferSize;

    return bRet;
}

这里BindAdapter::waitForRecvQueue用于业务线程在等待服务器监听的适配器收到网络包后进行业务包的处理,这里传入的handleIndex表示接收队列索引,获取对应的_rbuffer

普通线程锁: TC_ThreadLock

TC_ThreadLock 类的定义如下

typedef TC_Monitor TC_ThreadLock;

TC_Monitor 线程锁监控模板类。通常线程锁,都通过该类来使用,而不是直接用TC_ThreadMutexTC_ThreadRecMutex

类的定义template class TC_Monitor 需要传入两个模板参数,TC_Monitor 包括以下成员变量:

mutable int     _nnotify;   // 上锁的次数
mutable P       _cond;      // 条件变量
T               _mutex;     // 互斥锁
/**
 * @brief 定义锁控制对象
 */
typedef TC_LockT > Lock;
typedef TC_TryLockT > TryLock;

第一个参数 TC_ThreadMutex 代表线程锁:同一个线程不可以重复加锁 ,包含成员变量

mutable std::mutex _mutex

延伸阅读,这里 tc_thread_mutex.h 还包括另外一个循环锁类 TC_ThreadRecMutex,即一个线程可以加多次锁,定义如下:

// 定义于tc_monitor.h中
typedef TC_Monitor TC_ThreadRecLock;

第二个参数 TC_ThreadCond 代表线程信号条件类:所有锁可以在上面等待信号发生,包含线程条件成员变量:

mutable std::condition_variable_any _cond

结合实际的使用场景,TC_Monitor::timedWait() 会调用 TC_ThreadCond 对象的 timedWait 函数,下一步调用 chrono 库的 millisecondsTC_ThreadCond::signal() 实现发送信号,等待在该条件上的一个线程会醒。

TC_LockT类定义: template class TC_LockT锁模板类,与其他具体锁配合使用,构造时候加锁,析够的时候解锁。

TC_LockT 构造函数,传入互斥量初始化成员变量 _mutexTC_LockT构造函数实现:

TC_LockT(const T& mutex) : _mutex(mutex) {
        _mutex.lock();
        _acquired = true;
}

到这里就可以看出 TC_Monitor 定义的 typedef TC_LockT > Lock,这里 Lock 类型的模板参数用的是 TC_Monitor 类。

实际使用场景如下:

Lock lock(*this);

TC_LockT 的构造函数,传入参数 thisTC_Monitor 的子类对象,TC_LockT 的构造函数调用_mutex.lock();实际就是调用了 TC_Monitor 对象的 lock 函数,TC_Monitorlock 函数实现:

void lock() const
{
    _mutex.lock();
    _nnotify = 0;
}

这里 _mutexTC_ThreadMutex 对象,进一步调用了 TC_ThreadRecMutex::lock() 成员函数,实现如下:

void TC_ThreadMutex::lock() const
{
    _mutex.lock();
}

然后上面定义的lock栈变量退出函数的时候调用 TC_LockT 的析构函数:实现如下:

virtual ~TC_LockT()
{
    if (_acquired)
    {
        _mutex.unlock(); //这里会调用TC_Monitor的unlock函数
    }
}

TC_Monitorunlock 函数实现:

void unlock() const
{
    notifyImpl(_nnotify);
    _mutex.unlock(); //这里会调用C++标准库中的unlock
}

这里调用 notifyImpl 函数是因为 TC_Monitor 类不只可以实现简单的互斥锁功能,还可以实现条件变量Condition功能,其中 notifyImpl 的实现为

void notifyImpl(int nnotify) const
{
    if(nnotify != 0)
    {
        if(nnotify == -1)
        {
            _cond.broadcast();
            return;
        }
        else
        {
            while(nnotify > 0)
            {
                _cond.signal();
                --nnotify;
            }
        }
    }
}

线程基类: TC_Thread

还是老样子,先看下项目实际对线程基类的使用。实际项目使用中,我们对 TC_Thread 又封装了一下,实现了一个BasicThread 类,下面看下 BasicThread 的定义:

class BasicThread : public tars::TC_Thread, public tars::TC_ThreadLock
{

   ...

    void terminate()
    {
        _bTerm = true;
        {
            Lock lock(*this);
            notifyAll();
        }
        getThreadControl().join();
    }
}

BasicThread 类,继承了 TC_ThreadTC_ThreadLock ,其中 TC_ThreadLock 第二点已经说明过了,所以这里重点看下 TC_Thread 类的使用,TC_Thread 的定义

class TC_Thread : public TC_Runable
{

    ...
    
    /**
     * 使用了C++11标准线程库std::thread, 构造函数传参数threadEntry线程函数,
     * 返回 TC_ThreadControl(_th),其中_th为std::thread对象
     */
    TC_ThreadControl start();

    static void threadEntry(TC_Thread *pThread); //静态函数, 线程入口

    virtual void run() = 0;

    ...
}

下一步看下线程控制类 TC_ThreadControl 的定义:

class TC_ThreadControl 
{
...
explicit TC_ThreadControl(std::thread *th); // 构造,传入std::thread对象

void join(); // 调用std::thread的join()阻塞当前的线程,直到另外一个线程运行结束

static void sleep(); // 调用std::this_thread::sleep函数线程将暂停执行
...
}

下一步看下 TC_Runable 的定义:

class TC_Runable
{
public:
    virtual ~TC_Runable(){};
    virtual void run() = 0; //定义了run纯虚函数
};

最后看下实际项目中对线程类的使用

class AntiSdkSyncThread : public BasicThread //这里等于多继承了TC_Thread和TC_ThreadLock两个类
{
    void run()  //实现基类的纯虚函数
    {
        Lock lock(*this);

        timedWait(10 * 1000); (间隔执行时间,实现了线程的定时执行功能)

            if(NULL != g_busi_interf)
            {
                Int32 ret = g_busi_interf->proc_();  //需要定期执行的函数
            }
    }
}

定义好了 AntiSdkSyncThread g_antiSdkSyncThread; 类,那么需要启动线程的时候执行g_antiSdkSyncThread.start(); 就会自然创建线程,并且 threadEntry 线程函数会调用 pThread->run() 多态函数,进程退出的时候调用 g_antiSdkSyncThread.terminate();

智能指针类: TC_AutoPtr

这里的智能指针可以放在容器中,且线程安全的智能指针,CPP11标准库的auto_ptr是不能放在容器中的,貌似已经被淘汰了,目前多数使用CPP11标准库的shared_ptr,不过需要编译器支持CPP11。

TC_HandleBase智能指针基类的定义如下,所有需要智能指针的类都需要从该对象继承,其中使用了C++11标准库中的进行原子计数。

class UTIL_DLL_API TC_HandleBase
{
public:

    /**
     * @brief 复制
     *
     * @return TC_HandleBase&
     */
    TC_HandleBase& operator=(const TC_HandleBase&)
    {
        return *this;
    }

    /**
     * @brief 增加计数
     */
    void incRef() { ++_atomic; }

    /**
     * @brief 减少计数
     */
    void decRef()
    {
        if((--_atomic) == 0 && !_bNoDelete)
        {
            _bNoDelete = true;
            delete this;
        }
    }

    /**
     * @brief 获取计数.
     *
     * @return int 计数值
     */
    int getRef() const        { return _atomic; }

    /**
     * @brief 设置不自动释放. 
	 *  
     * @param b 是否自动删除,true or false
     */
    void setNoDelete(bool b)  { _bNoDelete = b; }

protected:

    /**
     * @brief 构造函数    
     */
    TC_HandleBase() : _atomic(0), _bNoDelete(false)
    {
    }

    /**
     * @brief 拷贝构造
     */
    TC_HandleBase(const TC_HandleBase&) : _atomic(0), _bNoDelete(false)
    {
    }

    /**
     * @brief 析构
     */
    virtual ~TC_HandleBase()
    {
    }

protected:

    std::atomic	_atomic;    // 引用计数
    bool                _bNoDelete; // 是否自动删除
};

下一步看 TC_AutoPtr 智能指针模板类,可以放在容器中,且线程安全的智能指针,该智能指针通过引用计数实现,其构造函数和析构函数定义如下:

template 
class TC_AutoPtr
{ 
    TC_AutoPtr(T* p = 0)
    {
        _ptr = p;

        if(_ptr)
        {
            _ptr->incRef(); //构造函数 引用计算加1
        }
    }

    ...

  ~TC_AutoPtr()
    {
        if(_ptr)
        {
            _ptr->decRef(); //析构函数 引用计算减1
        }
    }
}
例子:实战项目使用
struct ConnStruct : public TC_HandleBase{...}

typedef TC_AutoPtr ConnStructPtr;

TC_AutoPtr 拷贝构造调用 _ptr->incRef(); 这里 ptrConnStructConnStruct继承于TC_HandleBase,等于调用了TC_HandleBaseT::incRef() {++_atomic;}

引用计数原子操作加1、析构引用计数原子操作减1,当引用计数减少到0时根据设置的开关是否要进行删除来决定是否触发delete。

例子:这是TARS使用异步rpc回调的典型例子,这里回调类使用了智能指针
// 定义回调函数智能指针,其中SessionCallback父类继承于TC_HandleBase
typedef TC_AutoPtr SessionCallbackPtr;  

//创建回调类SessionCallbackPtr,并传入初始化参数uin gameid等;
SessionCallbackPtr cb = new SessionCallback(iUin, iGameId, iSeqID, iCmd,sSessionID, theServant, current, cs, this);
//异步调用sessionserver远程接口
getSessionPrx()->async_getSession(cb, iUin, iGameId);

接口返回完成,回调SessionCallback::callback_getSession(tars::Int32 ret, const MGComm::SessionValue& retValue)函数,接收sessionserver接口的返回的SessionValue结构。

因为 SessionCallbackPtr 使用了智能指针,所以业务不需要去手动释放前面 new 出来的 SessionCallbackPtr ,还是比较方便的。

MySQL操作类: TC_Mysql

TC_Mysql封装好的mysql操作类,非线程安全,对于 insert/update 可以有更好的函数封装,防止SQL注入

使用方式:

TC_Mysql mysql;
//初始化mysql,init时不链接,请求时自动建立链接;
//数据库可以为空;
//端口默认为3306
mysql.init("192.168.1.2", "pc", "pc@sn", "db_tars_demo");

通常用:void init(const TC_DBConf& tcDBConf); 直接初始化数据库。例如:stDirectMysql.init(_stZoneDirectDBConf);

看下TC_DBConf的定义

struct TC_DBConf
{
    string _host;
    string _user;
    string _password;
    string _database;
    string _charset;
    int _port;
    int _flag;   //客户端标识

    TC_DBConf()
        : _port(0)
        , _flag(0)
    {}

    /**
    * @brief 读取数据库配置. 
    * 
    * @param mpParam 存放数据库配置的map 
    *        dbhost: 主机地址
    *        dbuser:用户名
    *        dbpass:密码
    *        dbname:数据库名称
    *        dbport:端口
    */
    void loadFromMap(const map &mpParam)
    {
        map mpTmp = mpParam;

        _host        = mpTmp["dbhost"];
        _user        = mpTmp["dbuser"];
        _password    = mpTmp["dbpass"];
        _database    = mpTmp["dbname"];
        _charset     = mpTmp["charset"];
        _port        = atoi(mpTmp["dbport"].c_str());
        _flag        = 0;

        if(mpTmp["dbport"] == "")
        {
            _port = 3306;
        }
    }
};
//进一步看下获取数据的使用
TC_Mysql::MysqlData data;
data = mysql.queryRecord("select * from t_app_users");
for(size_t i = 0; i < data.size(); i++)
{
    //如果不存在ID字段,则抛出异常
    cout << data[i]["ID"] << endl;
}

查询出来的mysql数据用MysqlData封装

class MysqlData
{  ...
    vector >& data();      
   ...
}
//插入数据,指定数据的类型:数值 或 字符串,对于字符串会自动转义
map > m;
m["ID"]     = make_pair(TC_Mysql::DB_INT, "2334");
m["USERID"] = make_pair(TC_Mysql::DB_STR, "abcttt");
m["APP"]    = make_pair(TC_Mysql::DB_STR, "abcapbbp");
m["LASTTIME"]    = make_pair(TC_Mysql::DB_INT, "now()");

mysql.replaceRecord("t_user_logs", m);

网络组件

整个TARS核心就提供一个很完善的网络框架,包括RPC功能,这里只介绍几个常用的网络组件。

TC_Socket : 封装了socket的基本方法

提供socket的操作类;支持tcp/udp socket;支持本地域套接字。

再下一层TARS封装了TC_TCPClientTC_UDPClient两个类用于实际操作tcp和udp应用。

使用方式:

例如:tcp客户端

TC_TCPClient stRouterClient;

stRouterClient.init(sIP, iPort, iTimeOut); // 这里传入ip和端口然后调用sendRecv进行消息的收发

Int32 ret = stRouterClient.sendRecv(request.c_str(), request.length(), recvBuf, iRecvLen);

注意多线程使用的时候,不能多线程同时send/recv,小心串包。

TC_Epoller

提供网络epoll的操作类,默认是ET模式,当状态发生变化的时候才获得通知,提供add、mod、del、wait等基础操作。

TC_ClientSocket : 客户端socket相关操作基类

提供关键成员函数init(const string &sIp, int iPort, int iTimeout),传入 IP 端口 和 超时时间

TC_TCPClient 继承于 TC_ClientSocket 提供成员函数:

  • sendRecv(发送到服务器, 从服务器返回不超过iRecvLen的字节)

  • sendRecvBySep( 发送倒服务器, 并等待服务器直到结尾字符, 包含结尾字符)

例子:
stRouterClient.init(sIP, iPort, iTimeOut);

size_t iRecvLen = sizeof(recvBuf)-1;
Int32 ret = stRouterClient.sendRecv(request.c_str(), request.length(), recvBuf, iRecvLen);

同理还有TC_UDPClient实现UDP客户端。

命令解析类: TC_Option

  1. 命令解析类;

  2. 通常用于解析命令行参数;

  3. 只支持双—的参数形式

  4. 分析main的输入参数,支持以下形式的参数:

./main.exe --name=value --param1 param2 param3
TC_Option op;
//解析命令行
op.decode(argc, argv);
//获取成对的参数,即获取 - - 表示的所有参数对
map mp = op.getMulti();
//表示非 – 的参数:即 param2, param3
vector d = op.getSingle();

如果value,param有空格或者 -- ,用引号括起来就可以了。

配置文件类: TC_Config

  1. 配置文件解析类(兼容wbl模式);

  2. 支持从string中解析配置文件;

  3. 支持生成配置文件;

  4. 解析出错抛出异常;

  5. 采用[]获取配置,如果无配置则抛出异常;

  6. 采用get获取配置,不存在则返回空;

  7. 读取配置文件是线程安全的,insert域等函数非线程安全

例子:
TC_Config config;
config.parseFile(ServerConfig::BasePath + ServerConfig::ServerName + ".conf");
stTmpGameServerConfig.iGameId = TC_Common::strto(config["/Main/"]);

配置文件样例

    GameId = 3001     ZoneId = 102     AsyncThreadCheckInterval = 1000     ...

使用get方法例子:如果读不到该配置,则返回默认值 sDefault,即下面例子中的 20000000

stTmpGameServerConfig.iMaxRegNum = TC_Common::strto(config.get("/Main/", "20000000"));

通用仿函数类: TC_Functor

TC_Functor 参考loki库的设计

  1. 仿函数对象调用方式, 即对上述的几种方式都可以在右侧添加一对圆括号,并在括号内部放一组合适的参数来调用,例如 a(p1,p2);

  2. 把整个调用(包括参数)封装一个函数对象, 调用对象建立时就传入了参数,调用的时候不用传入参数,例如 A a(p1, p2); a();

简单又好用的封装,具体见下面使用例子自然明白:

C函数调用

void TestFunction3(const string &s, int i){
    cout << "TestFunction3('">

C函数调用用wrapper封装:

//调用封装,构造的时候传入参数

TC_Functor::Result>::wrapper_type fwrapper3(cmd3, s3, 10);
fwrapper3();  //参数已经在构造的时候传入,调用的时候不用传参数了

说明:

  • void : 函数的返回值

  • TL::TLMaker::Result : 代表参数类型

对于调用的封装,注意对于传引用类型,具体的调用时候要保证引用的对象存在。

C++指向类成员函数的调用

struct TestMember
{
    void mem3(const string &s, int i)
    {
        cout << "TestMember::mem3(" << s << "," << i << ") called" << endl;
    }
}
TC_Functor::Result > cmd3(&tm, &TestMember::mem3);
cmd3("a", 33);

指向类成员函数的调用用wrapper封装:

TC_Functor::Result >::wrapper_type fwrapper3(cmd3, "a", 10);
fwrapper3();

实际例子:注册协议解析器

服务初始化initialize的时候,一般会调用

addServantProtocol(sRouterObj, AppProtocol::parseStream<0, uint16_t, false>,iHeaderLen);

这里设置BindAdapter的协议解析函数 protocol_functor _pfparseStream 函数,如下:

/**
 * @param T
 * @param offset
 * @param netorder
 * @param in
 * @param out
 * @return int
 */
template
static TC_NetWorkBuffer::PACKET_TYPE parseStream(TC_NetWorkBuffer& in,vector& out)
{
    size_t len = offset + sizeof(T);

    if (in.getBufferLength() < len)
    {
        return TC_NetWorkBuffer::PACKET_LESS;
    }

    string header;
    in.getHeader(len, header);

    assert(header.size() == len);

    T iHeaderLen = 0;

    ::memcpy(&iHeaderLen, header.c_str() + offset, sizeof(T));

    if (netorder)
    {
        iHeaderLen = net2host(iHeaderLen);
    }

    //长度保护一下
    if (iHeaderLen < (T)(len) || (uint32_t)iHeaderLen > TARS_NET_MAX_PACKAGE_SIZE)
    {
        return TC_NetWorkBuffer::PACKET_ERR;
    }

    if (in.getBufferLength() < (uint32_t)iHeaderLen)
    {
        return TC_NetWorkBuffer::PACKET_LESS;
    }

    in.getHeader(iHeaderLen, out);

    assert(out.size() == iHeaderLen);

    in.moveHeader(iHeaderLen);

    return TC_NetWorkBuffer::PACKET_FULL;
}

注册好解析函数之后,网络层收包调用parseProtocol函数

int TC_EpollServer::Connection::parseProtocol(TC_NetWorkBuffer &rbuf)
{
    ...
    TC_NetWorkBuffer::PACKET_TYPE b = _pBindAdapter->getProtocol()(rbuf, ro); //这里回调前面设置好的协议解析函数,从而实现协议解析
    ...
}

hash算法

util/tc_hash_fun.h中包含了对hash算法的实现,使用 hash_new ,可以对输入的字节流进行hash得到相当均匀的hash值,使用方式如下

#include "util/tc_hash_fun.h"
#include 
#include 
#include 

using namespace tars;
using namespace std;

int main(int argc, char* *argv[])
{
    unsigned int i = tars::hash_new()("abcd");
    cout << i << endl;
    return 0;
}

异常类: TC_Exception

class TC_Exception : public exception
{  
    /**
     * @brief 构造函数,提供了一个可以传入errno的构造函数, 
     *        异常抛出时直接获取的错误信息
     *  
     * @param buffer 异常的告警信息 
     * @param err    错误码, 可用strerror获取错误信息
     */
    TC_Exception(const string &buffer, int err);  
}

“微服务开源框架TARS之有哪些基础组件”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注创新互联网站,小编将为大家输出更多高质量的实用文章!


网站栏目:微服务开源框架TARS之有哪些基础组件
浏览路径:http://csdahua.cn/article/ieoghi.html
扫二维码与项目经理沟通

我们在微信上24小时期待你的声音

解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流