流量录制原理
录制内容
实现思路
-
首先,调用 accept 获得一个调用方的连接; -
第二步,在这个连接上通过调用 recv 读取请求数据,解析请求; -
第三步,目标服务开始执行业务逻辑,过程中可能需要调用一个或多个依赖服务,对于每一次依赖服务调用,目标服务需要通过 connect 与依赖服务建立连接,然后在这个连接上通过 send 发送请求数据,通过 recv 接收依赖服务响应; -
最后,目标服务通过 send 给调用方返回响应数据。
区分不同的请求
2、为了提高处理速度,可能创建子线程并发调用依赖服务。
实际上,子线程也可能再创建子线程,形成下图所示的线程关系:
对于这种涉及子线程的场景,我们只要把子线程的数据合并到请求处理线程即可。每个请求都会对应一个请求处理线程和一系列的子线程,最终我们可以根据线程 ID 来区分出不同请求。
区分数据类型
流量录制实现
-
录制agent:与目标进程部署在相同容器中,根据进程名找到要录制的目标进程 pid,(1) 控制录制 server 开启/关闭录制;(7) 从录制 server 接收原始数据,解析成完整流量,(8) 保存到日志文件中。
-
录制server:部署在宿主机上,负责 (2, 3) 加载/挂载 eBPF程序、(6) 从 eBPF Map 中读取原始数据。
-
eBPF 程序:负责在目标进程 (4) 发送和接收数据时,(5) 从挂载的函数中读取原始数据并写入 eBPF Map 中。
选择插桩点
-
accept 和 connect 用于区分 socket 类型。
-
send 和 recv 用于捕获发送和接收的数据。
-
close 用于识别调用的结束。
-
inet_accept
-
inet_stream_connect
-
inet_sendmsg
-
inet_recvmsg
-
inet_release
开发eBPF程序
开发eBPF程序
int inet_sendmsg(struct socket *sock, struct msghdr *msg, size_t size)
-
sock socket 指针
-
msg 要发送的数据
-
size 要发送数据的长度
-
成功时返回发送的数据长度,失败时返回错误码。
-
在函数入口处记录函数参数和上下文
-
在函数返回时记录实际发送的数据内容
函数入口 eBPF 程序:
SEC("kprobe/inet_sendmsg")
int BPF_KPROBE(inet_sendmsg_entry, struct socket *sock, struct msghdr *msg)
{
struct probe_ctx pctx = {
.bpf_ctx = ctx,
.version = EVENT_VERSION,
.source = EVENT_SOURCE_SOCKET,
.type = EVENT_SOCK_SENDMSG,
.sr.sock = sock,
};
int err;
// 过滤掉不需要录制的进程
if (pid_filter(&pctx)) {
return 0;
}
// 读取 socket 类型信息
err = read_socket_info(&pctx, &pctx.sr.sockinfo, sock);
if (err) {
tm_err2(&pctx, ERROR_READ_SOCKET_INFO, __LINE__, err);
return 0;
}
// 记录 msg 中的数据信息
err = bpf_probe_read(&pctx.sr.iter, sizeof(pctx.sr.iter), &msg->msg_iter);
if (err) {
tm_err2(&pctx, ERROR_BPF_PROBE_READ, __LINE__, err);
return 0;
}
// 将相关上下文信息保存到 map 中
pctx.id = bpf_ktime_get_ns();
err = save_context(pctx.pid, &pctx);
if (err) {
tm_err2(&pctx, ERROR_SAVE_CONTEXT, __LINE__, err);
}
return 0;
}
函数返回 eBPF 程序:
SEC("kretprobe/inet_sendmsg")
int BPF_KRETPROBE(inet_sendmsg_exit, int retval)
{
struct probe_ctx pctx = {
.bpf_ctx = ctx,
.version = EVENT_VERSION,
.source = EVENT_SOURCE_SOCKET,
.type = EVENT_SOCK_SENDMSG,
};
struct sock_send_recv_event event = {};
int err;
// 过滤掉不需要录制的进程
if (pid_filter(&pctx)) {
return 0;
}
// 如果发送失败, 跳过录制数据
if (retval <= 0) {
goto out;
}
// 从 map 中读取提前保存的上下文信息
err = read_context(pctx.pid, &pctx);
if (err) {
tm_err2(&pctx, ERROR_READ_CONTEXT, __LINE__, err);
goto out;
}
// 构造 sendmsg 报文
event.version = pctx.version;
event.source = pctx.source;
event.type = pctx.type;
event.tgid = pctx.tgid;
event.pid = pctx.pid;
event.id = pctx.id;
event.sock = (u64)pctx.sr.s;
event.sock_family = pctx.sr.sockinfo.sock_family;
event.sock_type = pctx.sr.sockinfo.sock_type;
// 从 msg 中读取数据填充到 event 报文, 并通过 map 传递到用户空间
sock_data_output(&pctx, &event, &pctx.sr.iter);
out:
// 清理上下文信息
err = delete_context(pctx.pid);
if (err) {
tm_err2(&pctx, ERROR_DELETE_CONTEXT, __LINE__, err);
}
return 0;
}
获取goid
获取goid
getg 函数:
// getg returns the pointer to the current g.
// The compiler rewrites calls to this function into instructions
// that fetch the g directly (from TLS or from the dedicated register).
func getg() *g
根据函数注释,当前 g 的指针是放在线程本地存储(TLS)中的,调用 getg() 的代码由编译器进行重写。为了找到 getg() 的实现方式,我们看到 runtime.newg 函数中调用了 getg,对它进行反汇编,发现 g 的指针保存在 fs 寄存器 -8 的内存地址上:
接下来,我们找到 struct g 中的 goid 字段(位于 runtime/runtime2.go):
type g struct {
.... 此处省略大量字段
goid int64
.... 此处省略大量字段
}
拿到 g 的指针后,只要加上 goid 字段的偏移量即可获取到 goid。同时,考虑到不同的 go 版本之间,goid 偏移量可能不同,最终在 eBPF 程序中我们可以这样获取当前 goid:
static __always_inline
u64 get_goid()
{
struct task_struct *task = (struct task_struct *)bpf_get_current_task();
unsigned long fsbase = 0;
void *g = NULL;
u64 goid = 0;
bpf_probe_read(&fsbase, sizeof(fsbase), &task->thread.fsbase);
bpf_probe_read(&g, sizeof(g), (void*)fsbase-8);
bpf_probe_read(&goid, sizeof(goid), (void*)g+GOID_OFFSET);
return goid;
}
遇到的问题
eBPF 程序虽然可以使用 C 语言开发,但是与普通 C 语言开发过程有较大的差别,增加了很多限制。
-
不允许使用全局变量、常量字符串或数组,可以保存到 map 中。
-
不支持函数调用,可以通过 inline 内联解决。
-
栈空间不能超过512字节,必要时可通过 array 类型的 map 做缓冲区。
-
不能直接访问用户态和内核态内存,要通过 bpf-helper 的相关函数。
-
单个程序指令条数不能超过 1000000,尽量保持 eBPF 程序逻辑简单,复杂的处理放在用户态程序完成。
-
循环必须有明确的次数上限,不能只靠运行时判断。
-
结构体成员要内存对齐,否则可能导致部分内存未初始化,引发 verifier 报错。
-
代码经过编译器优化后 verifier 可能误报内存访问越界问题,可以在代码中增加 if 判断帮助 verifer 识别,必要时可通过内联汇编的方式解决。
-
….
随着 clang 和内核对 ebpf 支持的逐渐完善,很多问题也在逐步得到解决,后续的开发体验也会变得更顺畅。
安全机制
总结
总结
END
作者及部门介绍
招聘信息
团队后端、测试需求招聘中,欢迎有兴趣的小伙伴加入,可以扫描下方二维码简历直投,期待你的加入!
研发工程师
测试开发工程师
本篇文章来源于微信公众号:滴滴技术
本文来自投稿,不代表TakinTalks稳定性技术交流平台立场,如若转载,请联系原作者。