千万级日志回放引擎设计稿

现在压测系统一直用的方案是goreplay进行二次开发完成的。因为整体是Java技术栈的,使用goreplay有存在两方面问题:一是兼容性,语言和开发框架上,增加了用例创建执行的复杂度;二是维护成本,goreplay二次开发方案已经无法满足现在的性能测试需求。如果维护两套压测引擎会带来更多工作量。

所以为了尽可能解决这两方面问题,接到了一个活儿,调研一下Java实现日志回放功能。主要就是读了goreplay的源码以及它设计思路,用Java重现实现一遍。

这里用到了前两天分享的Disruptor高性能队列常用API演示高性能队列Disruptor在测试中应用,有兴趣的可以再翻一翻。另视频版还在制作中,年后会和大家相见。

思路

总体设计思路如下:

千万级日志回放设计

PS:流量递增和动态增减尚未实现,还在研究goreplay的源码。

日志拉取和解析

日志的拉取和初步解析依旧采取原来项目中的逻辑,通过SQL语句网关日志中拉取日志,并对日志内容进行初步解析,放入云OSS中,并将链接存入数据库(此步骤放在录制流量成功之后)。

PS:目前日志解析保留的有用信息只有URL

日志格式如下:

/v1/level,funtester.com,-,token,-,1622611469,-
/v1/level,funtester.com,-,token,-,1622611469,-
/v1/level,funtester.com,-,token,-,1622611469,-
/v1/level,funtester.com,-,token,-,1622611469,-
/v1/level,funtester.com,-,token,-,1622611469,-
/v1/level,funtester.com,-,token,-,1622611469,-
/v1/level,funtester.com,-,token,-,1622611469,-

实现步骤

  • 首先将日志中有用信息(URL)以及token放到内存中
  • 通过配置host,读取URL,以及响应header(token,压测标识,常用header,模拟盘标识)组装HTTP请求。
  • 创建Disruptor对象,使用异步创建生产者
  • 通过消费者消费(发出请求)消息(HTTP请求对象),达到HTTP接口日志流量回复功能。

性能指标

  • 本机6C16G配置测试数据

  • 实测1千万URL读取速度约为9s ~ 13s,内存无压力,如果后续更大日志量需求,可以通过stream方式异步读取日志,实测日志读取速度在80万/s以上,满足目前需求。

  • 单生产者速度25万QPS

  • 单机测试QPS 8.8万,CPU跑满,触及物理极限,此数据与之前工具对比压测差异不大。

风险

  • 消费者异步对消息进行存储,超过一定数量将会丢弃消息。这个问题在消费者速度小于生产者速度时会触发。
  • 消费者数量需要在启动前设定,如果参数设置不合理,会导致消费者压力瓶颈,无法动态增加消费者。

PS:这些风险后续会逐个解决。

代码实现

生产者Demo:

def ft = {
    output("创建线程")
    fun {
        int i = 0
        while (key) {
            def url = logs.get(i % logs.size())
            def get = getHttpGet(HOST + url)
            get.addHeader("token", tokens.get(i % tokens.size()))
            get.addHeader(HttpClientConstant.USER_AGENT)
            ringBuffer.publishEvent {e, s ->
                e.setRequest(get)
            }
            i++
        }
    }
}
ft()

读取文件代码

/**
 * 通过闭包传入方法读取超大文件部分内容
 *
 * @param filePath
 * @param function
 * @return
 */
public static List<String> readByLine(String filePath, Function<String, String> function) {
    if (StringUtils.isEmpty(filePath) || !new File(filePath).exists() || new File(filePath).isDirectory())
        ParamException.fail("文件信息错误!" + filePath);
    logger.debug("读取文件名:{}", filePath);
    List<String> lines = new ArrayList<>();
    File file = new File(filePath);
    if (file.isFile() && file.exists()) { // 判断文件是否存在
        try (FileInputStream fileInputStream = new FileInputStream(file);
             InputStreamReader read = new InputStreamReader(fileInputStream, DEFAULT_CHARSET);
             BufferedReader bufferedReader = new BufferedReader(read, 3 * 1024 * 1024);) {
            String line = null;
            while ((line = bufferedReader.readLine()) != null) {
                String apply = function.apply(line);
                if (StringUtils.isNotBlank(apply)) lines.add(apply);
            }
        } catch (Exception e) {
            logger.warn("读取文件内容出错", e);
        }
    } else {
        logger.warn("找不到指定的文件:{}", filePath);
    }
    return lines;
}

演示Demo

package com.funtest.groovytest

import com.funtester.base.constaint.FixedThread
import com.funtester.config.HttpClientConstant
import com.funtester.frame.execute.Concurrent
import com.funtester.frame.execute.ThreadPoolUtil
import com.funtester.httpclient.ClientManage
import com.funtester.httpclient.FunLibrary
import com.funtester.utils.ArgsUtil
import com.funtester.utils.RWUtil
import com.lmax.disruptor.EventHandler
import com.lmax.disruptor.RingBuffer
import com.lmax.disruptor.WorkHandler
import com.lmax.disruptor.YieldingWaitStrategy
import com.lmax.disruptor.dsl.Disruptor
import com.lmax.disruptor.dsl.ProducerType
import org.apache.http.client.methods.HttpGet
import org.apache.http.client.methods.HttpRequestBase
import org.junit.platform.commons.util.StringUtils

import java.util.concurrent.LinkedBlockingDeque
import java.util.function.Function

class ReplayTest extends FunLibrary {

    static String url = "http://localhost:12345/test";

    static HttpGet httpGet = getHttpGet(url);

    //    static LinkedBlockingQueue<HttpRequestBase> requests = new LinkedBlockingQueue<>()

    static def HOST = "http://localhost:12345"

    static def key = true

    static Disruptor<RequestEvent> disruptor

    public static void main(String[] args) {
        def logfile = "/Users/oker/Desktop/log.csv"
        //        def logfile = "/Users/oker/Desktop/fun.csv"
        //1千万日志
        def tokenfile = "/Users/oker/Desktop/token.csv"
        //2万用户token
        List<String> logs = RWUtil.readByLine(logfile, new Function<String, String>() {

            @Override
            String apply(String s) {
                return StringUtils.isNotBlank(s) && s.startsWith("/") ? s.split(COMMA)[0] : null
            }
        });
        List<String> tokens = RWUtil.readByLine(tokenfile, new Function<String, String>() {

            @Override
            String apply(String s) {
                return StringUtils.isNotBlank(s) ? s.split(COMMA)[4] : null
            }
        });

        output("总计 ${formatLong(logs.size())} 条日志")
        disruptor = new Disruptor<RequestEvent>(
                RequestEvent::new,
                512 * 512,
                ThreadPoolUtil.getFactory(),
                ProducerType.MULTI,
                new YieldingWaitStrategy()
        );
        RingBuffer<RequestEvent> ringBuffer = disruptor.getRingBuffer();

        def ft = {
            output("创建线程")
            fun {
                int i = 0
                while (key) {
                    def url = logs.get(i % logs.size())
                    def get = getHttpGet(HOST + url)
                    get.addHeader("token", tokens.get(i % tokens.size()))
                    get.addHeader(HttpClientConstant.USER_AGENT)
                    ringBuffer.publishEvent {e, s ->
                        e.setRequest(get)
                    }
                    i++
                }
            }
        }
        ft()
        disruptor.handleEventsWith(new FunTester(10))
        //        5.times {ft()}

        //下面开始测试
        ClientManage.init(10, 5, 0, "", 0)
        def util = new ArgsUtil(args)
        def thread = util.getIntOrdefault(0, 20)
        def times = util.getIntOrdefault(1, 60000)
        RUNUP_TIME = util.getIntOrdefault(2, 0)
        def tasks = []
        thread.times {
            def tester = new FunTester(times)
            disruptor.handleEventsWith(tester);
            tasks << tester
        }
        disruptor.start();
        new Concurrent(tasks, "这是千万级日志回放演示Demo").start()

    }


    private static class FunTester extends FixedThread implements EventHandler<RequestEvent>, WorkHandler<RequestEvent> {

        LinkedBlockingDeque<HttpRequestBase> reqs = new LinkedBlockingDeque<HttpRequestBase>()

        FunTester(int limit) {
            super(null, limit, true)
        }

        @Override
        protected void doing() throws Exception {
            FunLibrary.executeOnly(reqs.take())
        }

        @Override
        FixedThread clone() {
            return new FunTester(limit)
        }

        @Override
        protected void after() {
            super.after()
            key = false
            disruptor.shutdown()
        }

        @Override
        void onEvent(RequestEvent event, long sequence, boolean endOfBatch) throws Exception {
            if (reqs.size() < 100000) reqs.add(event.getRequest())
        }

        @Override
        void onEvent(RequestEvent event) throws Exception {
            if (reqs.size() < 100000) reqs.add(event.getRequest())
        }
    }


    private static class RequestEvent {

        HttpRequestBase request;

        public HttpRequestBase getRequest() {
            return request;
        }

        public void setRequest(HttpRequestBase request) {
            this.request = request;
        }

    }


}

PS:这里用到了多个group,原因在设计稿中标记了。

Have Fun ~ Tester !