百度360必应搜狗淘宝本站头条
当前位置:网站首页 > IT技术 > 正文

用C实现协程库

wptr33 2025-04-27 16:40 22 浏览

协程这个东西有一段时间非常火热,特别是Go出来以后,大家都觉得这个用户态线程技术解决了很多问题,甚至用它可以支撑8亿用户,于是大家纷纷写了C/C++的协程库。实际上,我觉得协程库和支撑多少用户关系不大,甚至不用协程还可以支撑更多的用户(减少了协程的开销),协程只是提供一种编程模式,让服务器程序写起来感觉轻松一些。

我们这个协程库,首先它只是一个玩具,我也没有把它用在生产环境中(如果要用我会直接用Go),写这个协程库纯粹是为了学习。

其次,这个库脱胎于云风的协程库,不过云风的协程库更像一个玩具,如果你想知道协程应该怎么实现,看看这个入门是很不错的,代码非常简洁。但这个库也有这些缺点:

  • 功能不大完整,只能支持主协程和协程的切换,无法在协程里面创建协程并启动它。
  • 使用的是共享栈的方式,即所有协程只使用一个栈,协程暂停时,需要把用到的栈内存暂时保存起来,等要运行,再把保存的栈内存拷贝回协程执行的栈。这种方式在resume和yield时,会不断的拷贝内存,效率上会有问题。
  • 环境的切换使用ucontext,因为是系统调用,可能在性能上会有一点点影响,这个没有具体测试过不好下绝对的定论。

我fork过来的修改版在这里,代码改得比较多,这份实现逻辑上更加接近于lua的协程库:

  • 首先是支持协程里启动协程,比如A resume B => B resume C => C yield 返回 B => B yeild 返回 A。
  • 协程的状态也和Lua保持一致:
    • CO_STATUS_SUSPEND 协程创建后未resume,或yield后处的状态
    • CO_STATUS_RUNNING 协程当前正在运行
    • CO_STATUS_NORMAL 当前协程resume了其他协程后处于这个状态
    • CO_STATUS_DEAD 协程执行结束

  • 没有使用共享栈的方式,我的考虑是这样的:
    • 在实际经验中,栈的内存使用其实不多,如果我们默认分配每个栈128K内存,8000个协程才需要1G的虚拟内存,实际的物理内存肯定是更少的。不共享栈,减少了栈内存拷贝的开销,效率会有很明显的提升,也就是典型的空间换时间。
    • 即使要优化,也很容易实现,即对协程分组,每组协程共享一个栈,算是时间和空间上一个权衡,但实际效果究竟如何,有兴趣的人自行测试吧。
  • 协程创建出来后,即使执行完毕,也不释放,只给他标记一个CO_STATUS_DEAD状态,后面创建的协程可以重用,这样减少频繁创建协程的开销。
  • 执行环境的切换,使用的仍然是ucontext,因为我不确定使用ucontext带来的开销到底有多少,但ucontext的好处是支持很多硬件;如果要自己写,通常也只能支持i386和x86_x64两种架构,真的在生产环境中遇到瓶颈再换实现也不迟。

代码量不多,我直接贴在这时,也可以到github上去取:

// coroutine.h
#ifndef C_COROUTINE_H
#define C_COROUTINE_H

// 协程执行结束
#define CO_STATUS_DEAD 0
// 协程创建后未resume,或yield后处的状态
#define CO_STATUS_SUSPEND 1
// 协程当前正在运行
#define CO_STATUS_RUNNING 2
// 当前协程resume了其他协程,此时处于这个状态
#define CO_STATUS_NORMAL 3

// 类型声明
struct schedule;
typedef struct schedule schedule_t;
typedef void (*co_func)(schedule_t *, void *ud);

// 打开一个调度器,每个线程一个:stsize为栈大小,传0为默认
schedule_t * co_open(int stsize);
// 关闭调度器
void co_close(schedule_t *);
// 新建协程
int co_new(schedule_t *, co_func, void *ud);
// 启动协程
int co_resume(schedule_t *, int id);
// 取协程状态
int co_status(schedule_t *, int id);
// 取当前正在运行的协程ID
int co_running(schedule_t *);
// 调用yield让出执行权
int co_yield(schedule_t *);

#endif

实现

// coroutine.c
#include "coroutine.h"
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <stddef.h>
#include <string.h>
#include <stdint.h> 
#if __APPLE__ && __MACH__
    #include <sys/ucontext.h>
#else 
    #include <ucontext.h>
#endif

#define MIN_STACK_SIZE (128*1024)
#define MAX_STACK_SIZE (1024*1024)
#define DEFAULT_COROUTINE 128
#define MAIN_CO_ID 0

#define MIN(a, b) ((a) > (b) ? (b) : (a))
#define MAX(a, b) ((a) < (b) ? (b) : (a))

struct coroutine;

// 每个线程的调度器
typedef struct schedule {
    int stsize;             // 栈大小
    int nco;                // 当前有几个协程
    int cap;                // 协程数组容量
    int running;            // 当前正在运行的协程ID
    struct coroutine **co;  // 协程数组
} schedule_t;

// 协程数据
typedef struct coroutine {
    co_func func;           // 协程回调函数
    void *ud;               // 用户数据
    int pco;                // 前一个协程,即resume这个协程的那个协程
    ucontext_t ctx;         // 协程的执行环境
    schedule_t * sch;       // 调度器
    int status;             // 当前状态:CO_STATUS_RUNNING...
    char *stack;            // 栈内存
} coroutine_t;

schedule_t *co_open(int stsize) {
    schedule_t *S = malloc(sizeof(*S));
    S->nco = 0;
    S->stsize = MIN(MAX(stsize, MIN_STACK_SIZE), MAX_STACK_SIZE);
    S->cap = DEFAULT_COROUTINE;
    S->co = malloc(sizeof(coroutine_t *) * S->cap);
    memset(S->co, 0, sizeof(coroutine_t *) * S->cap);

    // 创建主协程
    int id = co_new(S, NULL, NULL);
    assert(id == MAIN_CO_ID);
    // 主协程为运行状态
    coroutine_t *co = S->co[MAIN_CO_ID];
    co->status = CO_STATUS_RUNNING;
    S->running = id;
    return S;
}

void co_close(schedule_t *S) {
    assert(S->running == MAIN_CO_ID);
    int i;
    for (i=0;i<S->cap;i++) {
        coroutine_t * co = S->co[i];
        if (co) {
            free(co->stack);
            free(co);
        }
    }
    free(S->co);
    S->co = NULL;
    free(S);
}

static void cofunc(uint32_t low32, uint32_t hi32) {
    uintptr_t ptr = (uintptr_t)low32 | ((uintptr_t)hi32 << 32);
    schedule_t *S = (schedule_t *)ptr;
    int id = S->running;
    coroutine_t *co = S->co[id];
    co->func(S, co->ud);
    // 标记协程为死亡
    co->status = CO_STATUS_DEAD;
    --S->nco;
    // 恢复前一个协程
    coroutine_t *pco = S->co[co->pco];
    pco->status = CO_STATUS_RUNNING;
    S->running = co->pco;
    ucontext_t dummy;
    swapcontext(&dummy, &pco->ctx);
}

int co_new(schedule_t *S, co_func func, void *ud) {
    int cid = -1;
    if (S->nco >= S->cap) {
        cid = S->cap;
        S->co = realloc(S->co, S->cap * 2 * sizeof(coroutine_t *));
        memset(S->co + S->cap , 0 , sizeof(coroutine_t *) * S->cap);
        S->cap *= 2;
    } else {
        int i;
        for (i=0;i<S->cap;i++) {
            int id = (i+S->nco) % S->cap;
            if (S->co[id] == NULL) {
                cid = id;
                break;
            } 
            else if (S->co[id]->status == CO_STATUS_DEAD) {
                // printf("reuse dead coroutine: %d\n", id);
                cid = id;
                break;
            }
        }
    }

    if (cid >= 0) {
        coroutine_t *co;
        if (S->co[cid])
            co = S->co[cid];
        else {
            co = malloc(sizeof(*co));
            co->pco = 0;
            co->stack = cid != MAIN_CO_ID ? malloc(S->stsize) : 0;
            S->co[cid] = co;
        }
        ++S->nco;

        co->func = func;
        co->ud = ud;
        co->sch = S;
        co->status = CO_STATUS_SUSPEND;

        if (func) {
            coroutine_t *curco = S->co[S->running];
            assert(curco);
            getcontext(&co->ctx);
            co->ctx.uc_stack.ss_sp = co->stack;
            co->ctx.uc_stack.ss_size = S->stsize;
            co->ctx.uc_link = &curco->ctx;
            uintptr_t ptr = (uintptr_t)S;
            makecontext(&co->ctx, (void (*)(void))cofunc, 2, (uint32_t)ptr, (uint32_t)(ptr>>32));
        }
    }

    return cid;
}

int co_resume(schedule_t * S, int id) {
    assert(id >=0 && id < S->cap);
    coroutine_t *co = S->co[id];
    coroutine_t *curco = S->co[S->running];
    if (co == NULL || curco == NULL)
        return -1;
    int status = co->status;
    switch(status) {
    case CO_STATUS_SUSPEND:
        curco->status = CO_STATUS_NORMAL;
        co->pco = S->running;
        co->status = CO_STATUS_RUNNING;
        S->running = id;
        swapcontext(&curco->ctx, &co->ctx);
        return 0;
    default:
        return -1;
    }
}

int co_yield(schedule_t * S) {
    int id = S->running;
    // 主协程不能yield
    if (id == MAIN_CO_ID)
        return -1;
    // 恢复当前协程环境
    assert(id >= 0);
    coroutine_t * co = S->co[id];
    coroutine_t *pco = S->co[co->pco];
    co->status = CO_STATUS_SUSPEND;
    pco->status = CO_STATUS_RUNNING;
    S->running = co->pco;
    swapcontext(&co->ctx ,&pco->ctx);
    return 0;
}

int co_status(schedule_t * S, int id) {
    assert(id>=0 && id < S->cap);
    if (S->co[id] == NULL) {
        return CO_STATUS_DEAD;
    }
    return S->co[id]->status;
}

int co_running(schedule_t * S) {
    return S->running;
}

使用方法参考main.c,如果你会用Lua,应该很容易上手,测试代码中有一段是测试创建协程和切换协程的开销的:

int stop = 0;
static void foo_5(schedule_t *S, void *ud) {
    while (!stop) {
        co_yield(S);
    }
}

static void test5(schedule_t *S) {
    printf("test5 start===============\n");
    struct timeval begin;
    struct timeval end;
    int i;
    int count = 10000;

    gettimeofday(&begin, NULL);
    for (i = 0; i < count; ++i) {
        co_new(S, foo_5, NULL);
    }
    gettimeofday(&end, NULL);
    printf("create time=%f\n", timediff(&begin, &end));

    gettimeofday(&begin, NULL);
    for (i =0; i < 1000000; ++i) {
        int co = (i % count) + 1;
        co_resume(S, co);
    }
    gettimeofday(&end, NULL);
    printf("swap time=%f\n", timediff(&begin, &end));

    // 先释放掉原来的
    stop = 1;
    for (i = 0; i < count; ++i) {
        int co = (i % count) + 1;
        co_resume(S, co);
    }
    gettimeofday(&begin, NULL);
    for (i = 0; i < count; ++i) {
        co_new(S, foo_5, NULL);
    }
    gettimeofday(&end, NULL);
    printf("create time2=%f\n", timediff(&begin, &end));
    printf("test5 end===============\n");
}

结果如下,我的虚拟机CPU是双核Intel(R) Xeon(R) CPU E5-2680 v3 @ 2.50GHz

# 第一次创建10000个协程
create time=0.053979s
# 切换200W次协程
swap time=0.883039s
# 第二次创建10000个协程
create time2=0.005390s

这样的性能表现是否能满足要求呢。

再一次声明这个协程库不保证没有BUG,虽然我写了几个测试函数验证过,如果要用在生产环境中,请仔细阅读代码。

推荐一个协程的训练营,之前一直有关注,讲的很不错,内容比较丰富,点这链接有很大的优惠!


纯C语言|实现协程框架,底层原理与性能分析,面试利刃纯C语言|实现协程框架,底层原理与性能分析,面试利刃-学习视频教程-腾讯课堂

相关推荐

MySQL进阶五之自动读写分离mysql-proxy

自动读写分离目前,大量现网用户的业务场景中存在读多写少、业务负载无法预测等情况,在有大量读请求的应用场景下,单个实例可能无法承受读取压力,甚至会对业务产生影响。为了实现读取能力的弹性扩展,分担数据库压...

Postgres vs MySQL_vs2022连接mysql数据库

...

3分钟短文 | Laravel SQL筛选两个日期之间的记录,怎么写?

引言今天说一个细分的需求,在模型中,或者使用laravel提供的EloquentORM功能,构造查询语句时,返回位于两个指定的日期之间的条目。应该怎么写?本文通过几个例子,为大家梳理一下。学习时...

一文由浅入深带你完全掌握MySQL的锁机制原理与应用

本文将跟大家聊聊InnoDB的锁。本文比较长,包括一条SQL是如何加锁的,一些加锁规则、如何分析和解决死锁问题等内容,建议耐心读完,肯定对大家有帮助的。为什么需要加锁呢?...

验证Mysql中联合索引的最左匹配原则

后端面试中一定是必问mysql的,在以往的面试中好几个面试官都反馈我Mysql基础不行,今天来着重复习一下自己的弱点知识。在Mysql调优中索引优化又是非常重要的方法,不管公司的大小只要后端项目中用到...

MySQL索引解析(联合索引/最左前缀/覆盖索引/索引下推)

目录1.索引基础...

你会看 MySQL 的执行计划(EXPLAIN)吗?

SQL执行太慢怎么办?我们通常会使用EXPLAIN命令来查看SQL的执行计划,然后根据执行计划找出问题所在并进行优化。用法简介...

MySQL 从入门到精通(四)之索引结构

索引概述索引(index),是帮助MySQL高效获取数据的数据结构(有序),在数据之外,数据库系统还维护者满足特定查询算法的数据结构,这些数据结构以某种方式引用(指向)数据,这样就可以在这些数据结构...

mysql总结——面试中最常问到的知识点

mysql作为开源数据库中的榜一大哥,一直是面试官们考察的重中之重。今天,我们来总结一下mysql的知识点,供大家复习参照,看完这些知识点,再加上一些边角细节,基本上能够应付大多mysql相关面试了(...

mysql总结——面试中最常问到的知识点(2)

首先我们回顾一下上篇内容,主要复习了索引,事务,锁,以及SQL优化的工具。本篇文章接着写后面的内容。性能优化索引优化,SQL中索引的相关优化主要有以下几个方面:最好是全匹配。如果是联合索引的话,遵循最...

MySQL基础全知全解!超详细无废话!轻松上手~

本期内容提醒:全篇2300+字,篇幅较长,可搭配饭菜一同“食”用,全篇无废话(除了这句),干货满满,可收藏供后期反复观看。注:MySQL中语法不区分大小写,本篇中...

深入剖析 MySQL 中的锁机制原理_mysql 锁详解

在互联网软件开发领域,MySQL作为一款广泛应用的关系型数据库管理系统,其锁机制在保障数据一致性和实现并发控制方面扮演着举足轻重的角色。对于互联网软件开发人员而言,深入理解MySQL的锁机制原理...

Java 与 MySQL 性能优化:MySQL分区表设计与性能优化全解析

引言在数据库管理领域,随着数据量的不断增长,如何高效地管理和操作数据成为了一个关键问题。MySQL分区表作为一种有效的数据管理技术,能够将大型表划分为多个更小、更易管理的分区,从而提升数据库的性能和可...

MySQL基础篇:DQL数据查询操作_mysql 查

一、基础查询DQL基础查询语法SELECT字段列表FROM表名列表WHERE条件列表GROUPBY分组字段列表HAVING分组后条件列表ORDERBY排序字段列表LIMIT...

MySql:索引的基本使用_mysql索引的使用和原理

一、索引基础概念1.什么是索引?索引是数据库表的特殊数据结构(通常是B+树),用于...