首页 > *nix应用编程, *nix技术, 多核优化 > 双缓存的性能优化

双缓存的性能优化

2022年8月13日 发表评论 阅读评论 852 次浏览

一,问题描述
最近遇到个c程序优化问题,这个c程序(下称:程序c)的业务功能很简单,主要就是接收客户端的状态信息,并且更新到mysql数据库。
当前程序c的实现是单进程/单线程的select监听一个udp socket服务端,当接收到客户端的udp数据包时,就将数据包里的状态信息更新到mysql。

状态请求:
client –状态–> 程序c –更新–> mysql

client伪代码:

main
    while true
        new socket
        sendto socket data
        sleep 5
    end while
end main

即:单个客户端都是间隔5秒,就向服务端发送状态data

server伪代码:

main
    while true
        select sockfd
        data = recvfrom sockfd
        update_to_mysql data
    end while
end main

即:服务端单进程/单线程select等待,一旦收到客户端的data,就update更新到mysql

对于服务端的这种简单模型,如果客户端数量不多,那么服务端不会存在性能瓶颈。
一旦客户端数量过多,服务端就会存在问题,特别是在程序c里,更新数据到mysql的逻辑并不是简单的一条sql语句,而是各种查询,判断,更新,删除等操作。
所以,上面的server伪代码里,update_to_mysql函数就成为了性能瓶颈。
当客户端数量过多,而程序c又陷在update_to_mysql函数里没出来,就会出现select/recvfrom不及时,导致内核socket buffer满而丢包,也就会出现客户端的的后续请求丢失。
如果程序c只处理客户端的状态更新请求,那么业务异常现象可能就是某些终端的状态长时间得不到更新。
更严重的问题是,程序c还要处理其他一些控制请求,比如接收控制程序d的命令信息(比如对某个终端进行关机操作),并执行相应的动作。

命令请求:
程序d –命令–> 程序c –控制,比如关机–> client
这种情况下,client也有一个daemon程序,用来接收控制命令。

server伪代码:

main
    while true
        select sockfd
        if clent
            data = recvfrom sockfd
            update_to_mysql data
        if control
            data = recvfrom sockfd
            control_func data
    end while
end main

程序c被客户端状态更新请求占满,导致控制程序d的命令请求被丢失,业务异常现象就是业务失效,比如对终端的关机操作失败。

二,问题关键点
可以看到,在客户端数量过多的情况下,update_to_mysql是性能瓶颈,该函数里面调用了大量的safe_mysql_query、mysql_store_result等接口操作,而且从实际运行情况来看,mysqld服务程序的CPU占用也过高了。

三,优化方案
1,优化mysql。这个是基础,不论对程序c如何优化,优化mysql都有帮助。
2,优化客户端,将客户端的间隔5秒改为间隔10秒,轻轻松松,性能翻倍。负面影响:业务及时性下降
3,改程序c为多进程(类似nginx,多工作进程),比如单进程改双进程,性能翻倍;单进程改4进程,性能翻四倍。负面影响:性能翻倍/四倍是纯理论上的数据,并且瓶颈在mysql,多进程操作mysql性能整体增长到底多大存疑;如果业务逻辑有全局竞争(比如写配置文件),可能需要加锁。如果
4,改程序c为2线程,一个线程A用来接收socket和处理命令信息,一个线程B专门用来处理状态信息。线程A将接收的状态信息直接放到state_msg_buf,线程B从state_msg_buf中逐一获取状态信息,并调用update_to_mysql。

经过对比考虑,包括方案优缺点、改动大小和影响,选择1和4

四,方案4详细描述
方案4中的关键点是线程A如何将状态信息交到线程B,很容易看出,这就是单生产者-单消费者模型。这种模型的数据交换有两种性能较好的方式:
1,环形缓冲区/环形队列
2,双缓存
这两种方式都不用加锁或极小的临界区,对性能影响极小。

考虑状态上报请求这种业务逻辑的特性,环形缓冲区不太适合,因为如果线程B处理不过来,环形缓冲区就会满,此时线程A接收到的状态信息就得主动丢掉。而针对状态信息,很明显后面的状态信息才更有价值,要丢掉的应该是之前收到的存在环形缓冲区里的状态信息。而单靠环形缓冲区做不到这样,需要另加其他元素,而加了其他元素,就需要加锁等操作,也就脱离环形缓冲区的原本价值。

双缓存的设计会更灵活,就可以满足线程A丢掉之前收到的状态信息的要求。所以,方案4选用双缓存方式,实现线程A和线程B之间的状态信息交付。

缓存设计:
#define NUM_L 10000
#define NUM_W 5
typedef struct msg_pool
{
	uint32 valid_len __attribute__((__aligned__(64)));        // 有效数据长度,做cacheline对齐,确保该变量的单次读/写是原子的
	uint32 key[NUM_L * NUM_W];                      // hash key,由MSG里的某个字段(比如client唯一id)为key
	MSG data[NUM_L][NUM_W];                         // msg状态信息,是一个hash数组,长度为NUM_L,冲突域长NUM_W
} MSGPOOl;

MSGPOOl *g_msg_pool;
volatile MSGPOOl *g_msg_pool_w;       // 写缓存,volatile确保指针交换后,全局可见,可加可不加,因为我们会用mfence进行统一刷新
volatile MSGPOOl *g_msg_pool_r;       // 读缓存,volatile确保指针交换后,全局可见,可加可不加,因为我们会用mfence进行统一刷新

程序主逻辑:
// 需要互斥锁和条件变量
// 互斥锁是因为条件变量需要,其他逻辑是其实是无锁的
pthread_mutex_t g_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t g_cond = PTHREAD_COND_INITIALIZER;

main {
    // 创建读写缓存
    g_msg_pool = (MSGPOOl *)malloc(sizeof(MSGPOOl) * 2);
    memset(g_msg_pool, 0, sizeof(MSGPOOl) * 2);
	g_msg_pool_w = &g_msg_pool[0];
	g_msg_pool_r = &g_msg_pool[1];
    //读缓存的初始值,这个是必要的,解决初始竞态问题
    //保证生产者线程交换指针时,消费者线程一定处在条件变量等待状态
    g_msg_pool_r->valid_len = -1;

    // 创建消费线程
    pthread_create(user_status_consumer)

    // 生产者线程,直接利用程序c进程的主线程
    user_status_producer()
}

接下来是两个关键函数:
消费线程
void* user_status_consumer(void* arg)
{
	while(1)
	{
        // 全局刷新数据读可见性
        asm volatile("lfence" ::: "memory");

        // 处理读缓存池里的有效数据
        if (g_msg_pool_r->valid_len > 0)
            msg_pool_foreach(g_msg_pool_r);

        // 表示读缓存已处理完毕
        g_msg_pool_r->valid_len = 0;

        // 全局刷新数据写可见性
        asm volatile("sfence" ::: "memory");

        pthread_mutex_lock(&g_mutex);
        // 等待条件变量
        pthread_cond_wait(&g_cond, &g_mutex);
        pthread_mutex_unlock(&g_mutex);
	}

	exit(-1);
    return NULL;
}

生产线程
void user_status_producer()
{
    msg = recvfrom 客户端

    // 将状态信息添加到写缓存池,如果池内已有对应的msg,则进行更新(也就是实现了丢弃之前收到的旧的状态信息)
    msg_pool_add(g_msg_pool_w, msg)
    
    // 读写指针交换
    msg_pool_exchange_rw()
}


关键函数,读写指针交换
void msg_pool_exchange_rw()
{
    // 全局刷新数据读可见性
    asm volatile("lfence" ::: "memory");

    // 判断读缓存池是否已处理完,如果没有,则直接返回
    if (g_msg_pool_r->valid_len != 0) {
        return;
    }

    // 写缓存没有数据,无需交换指针,直接返回
    // 从外面的逻辑看,到这里,写缓存肯定是有数据的,但就这个函数内部还是要判断
    if (g_msg_pool_w->valid_len <= 0) {
        return;
    }

    // 最短间隔时间,5秒
    static time_t last_chktm = 0;
    time_t curt_chktm = time(NULL);
    if (last_chktm == 0)
    {
        last_chktm = curt_chktm;
    } 
    else if (last_chktm + 5 > curt_chktm)
    {
        return;
    }
    last_chktm = curt_chktm;
    
    // 读写指针交换
    MSGPOOl *tmp = g_msg_pool_w;
    g_msg_pool_w = g_msg_pool_r;
    g_msg_pool_r = tmp;

    // 全局刷新数据写可见性
    asm volatile("sfence" ::: "memory");

    pthread_mutex_lock(&g_mutex);
    // 唤醒条件变量
    pthread_cond_signal(&g_cond);
    pthread_mutex_unlock(&g_mutex);

}

除了条件变量,另外一个值得关注的就是:
asm volatile(“lfence” ::: “memory”);
asm volatile(“sfence” ::: “memory”);
其中lfence表示该语句之后的代码读到的数据都是最新的(比如cache已失效,则需要先从内存读取),包括其他cpu上的线程更新的全局变量数据。
sfence则表示该语句之前的代码改写的全局变量数据都主动通知到其他CPU,告诉其他cpu上的线程这个数据cache已失效,需要重新读取内存。

整体上,这种改法影响较小,只需新增一个初始化函数,申请下缓存池,新建个线程,然后将原程序调用的update_to_mysql函数改为写缓存区,总体感觉应该还可以~~

=================================================================
20220903更新:
突然想提一下,这个问题最终也没有采用前面提到的方案,直接采用春哥的openresty+lua把服务端的相关逻辑全部重写了。一个是之前的代码实在太烂了,修修补补还不知道有没有其他问题。二则是用lua重写也快,花了三两天就编完了,何乐而不为~~
普通的短连接,参考官网的示例,都是短连接:

https://github.com/openresty/stream-lua-nginx-module

如果需要用长连接,那么在lua里用个while死循环接收/发送客户端的数据即可,注意要处理好接收/发送函数的返回错误。
可以参考如下连接:

https://forum.openresty.us/d/6397-tcp/2

https://groups.google.com/g/openresty/c/1G3ACRCQ4js

https://forum.openresty.us/d/3275-ce17a99f27650960c9adf5436378d459

https://forum.openresty.us/?q=%E9%95%BF%E8%BF%9E%E6%8E%A5

https://blog.csdn.net/lishenglong666/article/details/120960332

https://catbro666.github.io/posts/30b81f82/

https://www.key-iot.com/news/297.html

最简单的参考demo代码:
nginx.conf

    server {
        listen 4321;
		lua_socket_keepalive_timeout 60s;

        content_by_lua_block {
            local sock = assert(ngx.req.socket(true))

            while true do

                local data = sock:receive()
                if data == nil then
                    ngx.log(ngx.ERR, "data is nil")
                    break
                end

                ngx.log(ngx.ERR, "client data:"..data)
                if data == "thunder!" then
                    ngx.say("flash!")  -- output data
                else
                    ngx.say("boom!")
                end

            end
        }
    }

socket.test.lua

local socket = require("socket")
local host = "127.0.0.1"
local port = 4321

local sock = assert(socket.connect(host, port))
local count = 0

while true do 
    sock:send("test"..count.."\n")
    count = count + 1

    local chunk, status, partial = sock:receive()
    print(chunk or partial)

    os.execute("sleep 1")
end

sock:close()

socket.test.sh

#!/bin/bash

j=$1
for ((i=1; i<=j; i++))
do

/usr/bin/lua socket.test.lua &

done

转载请保留地址:http://www.lenky.info/archives/2022/08/3178http://lenky.info/?p=3178


备注:如无特殊说明,文章内容均出自Lenky个人的真实理解而并非存心妄自揣测来故意愚人耳目。由于个人水平有限,虽力求内容正确无误,但仍然难免出错,请勿见怪,如果可以则请留言告之,并欢迎来讨论。另外值得说明的是,Lenky的部分文章以及部分内容参考借鉴了网络上各位网友的热心分享,特别是一些带有完全参考的文章,其后附带的链接内容也许更直接、更丰富,而我只是做了一下归纳&转述,在此也一并表示感谢。关于本站的所有技术文章,欢迎转载,但请遵从CC创作共享协议,而一些私人性质较强的心情随笔,建议不要转载。

法律:根据最新颁布的《信息网络传播权保护条例》,如果您认为本文章的任何内容侵犯了您的权利,请以Email或书面等方式告知,本站将及时删除相关内容或链接。

  1. 本文目前尚无任何评论.
您必须在 登录 后才能发布评论.