基于时间戳的日志回放引擎

之前写过一个日志回放引擎的第一代千万级日志回放引擎设计稿,当时理解的日志回放就是把日志记录的请求重新发出去,这就是回放线上用户的流量了。可是在我最近看goreplay的过程中,重新刷新了我的认知。

查阅了一些资料,终于算是了解了一些基于时间戳的方案和思路。大体如下:通过工具把线上某段时间的流量记录下来,其中包含时间戳等信息,然后通过回放引擎把流量回放出去。

解决思路

目前流量回放集中于HTTP流量,所以之前写过的引擎的发压部分还是可以继续使用。所以我也有了自己的解决思路:

  1. 日志清洗,其实就是把规范化的日志解析成引擎框架可以使用的对象,通常包含HTTP请求的组成部分。
  2. 按照时间戳排序,通常使用现成的工具这一步是可以省略,但是由于日志记录是已经存在的组件,这里需要做一些兼容性工作
  3. 日志回放,通过线程池和连接池两个池化技术可以解决性能方面的问题。再结合当前的分布式方案做一些兼容功能即可。

其中最最核心的应该就是队列的选择,这里我用看java的java.util.concurrent.DelayQueue,也没找到其他更好的框架了。其实在一开始我想复用自己写之前写的日志回放框架的队列,也尝试对集中常用队列进行了性能测试:

本来想是用多线程去读取日志的过程中,通过判断每一条日志是否到时间点,然后丢到一个线程安全的队列中,后面用线程池取队列中的对象,发送请求的。但是仔细想来太复杂了,流量过了好几手,不利于实现和拓展功能。

然后我重新对java.util.concurrent.DelayQueue进行了性能测试延迟队列DelayQueue性能测试,有了测试结果之后,就可以放心大胆地干了。关于延迟队列的基本使用可参考下单延迟10s撤单性能测试

实现

总体来说实现起来思路比较清晰,我分成三部分分享。

属性定义

  1. 我首先定义了一个com.funtester.frame.execute.ReplayConcurrent.ReplayLog日志对象,用于存储每一个请求日志
  2. 然后定义一个com.funtester.frame.execute.ReplayConcurrent#logs用来存储日志,这里旧事重提一下,千万级别的日志对象,存储在内存里面是OK的,所以我才会采用这种方式。为什么要从日志文件中转一手呢?因为日志是不按照时间戳排序的。
  3. 再定义com.funtester.frame.execute.ReplayConcurrent#logDelayQueue用来当作回放请求队列,也就是流量中转站,生产者从com.funtester.frame.execute.ReplayConcurrent#logs中取,clone之后丢到队列中;消费者从队列中取对象,丢给线程池。
  4. 定义com.funtester.frame.execute.ReplayConcurrent#handle当作是处理流量的方法,就是把流量对象包装成HttpRequestBase对象然后发送出去

生产者

  1. 确定使用异步线程完成,使用Java自定义异步功能实践
  2. 根据com.funtester.frame.execute.ReplayConcurrent#logDelayQueue性能测试数据,添加com.funtester.frame.execute.ReplayConcurrent#threadNum参数来控制。
  3. 多线程取com.funtester.frame.execute.ReplayConcurrent#logs对象,用到了几个线程安全类,用于保障多线程是顺序读取,避免了在延迟队列中进行排序操作。
  4. 使用了com.funtester.frame.execute.ReplayConcurrent#getMAX_LENGTH控制队列的长度。貌似没找到限制延迟队列长度的API。只能自己实现了,思路当添加日志数量超过最大值,存储当前队列长度。当长度大于最大长度,则在下一次添加对象前,休眠1s,然后在重置本地存储的队列长度。这样可以解决这个问题。当然最大值设置足够高,避免1s中内队列变成空。回放引擎设计50万QPS,所以我就先设置了80万的最大长度。后续可以根据实际情况调整。

消费者

  1. 依旧使用异步,生产者
  2. 使用API时java.util.concurrent.DelayQueue#poll(long, java.util.concurrent.TimeUnit),避免阻塞导致线程无法终止。
  3. 引入com.funtester.frame.execute.ReplayConcurrent#getMultiple控制流量回放的倍数。
  4. 使用com.funtester.frame.execute.ReplayConcurrent#getTotal记录回放的日志数量。
  5. 使用com.funtester.frame.execute.ReplayConcurrent#getHandle处理日志对象。

代码如下:

package com.funtester.frame.execute

import com.funtester.base.bean.AbstractBean
import com.funtester.frame.SourceCode
import com.funtester.utils.LogUtil
import com.funtester.utils.RWUtil
import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.Logger

import java.util.concurrent.DelayQueue
import java.util.concurrent.Delayed
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.LongAdder

/**
 * 回放功能执行类*/
class ReplayConcurrent extends SourceCode {

    private static Logger logger = LogManager.getLogger(ReplayConcurrent.class);

    static ThreadPoolExecutor executor

    static boolean key = true

    static int MAX_LENGTH = 800000

    int threadNum = 2

    String name

    String fileName

    int multiple

    Closure handle

    List<ReplayLog> logs

    DelayQueue<ReplayLog> logDelayQueue = new DelayQueue<ReplayLog>()

    LongAdder total = new LongAdder()

    ReplayConcurrent(String name, String fileName, int multiple, Closure handle) {
        this.name = name
        this.fileName = fileName
        this.multiple = multiple
        this.handle = handle

    }

    void start() {
        if (executor == null) executor = ThreadPoolUtil.createCachePool(THREADPOOL_MAX, "R")
        time({
            RWUtil.readFile(fileName, {
                def delay = new ReplayLog(it)
                if (delay.getTimestamp() != 0) logDelayQueue.add(delay)
            })
        }, 1, "读取日志$fileName")
        logs = logDelayQueue.toList()
        def timestamp = logs.get(0).getTimestamp()
        logDelayQueue.clear()
        AtomicInteger index = new AtomicInteger()
        AtomicInteger size = new AtomicInteger()
        def LogSize = logs.size()
        AtomicInteger diff = new AtomicInteger()
        threadNum.times {
            fun {
                while (key) {
                    if (index.get() % LogSize == 0) diff.set(getMark() - timestamp)
                    if (index.get() % MAX_LENGTH == 0) size.set(logDelayQueue.size())
                    if (size.get() > MAX_LENGTH) {
                        sleep(1.0)
                        size.set(logDelayQueue.size())
                    }
                    def replay = logs.get(index.getAndIncrement() % LogSize)
                    logDelayQueue.add(replay.clone(replay.timestamp + diff.get()))
                }
            }
        }
        threadNum.times {
            fun {
                while (key) {
                    def poll = logDelayQueue.poll(1, TimeUnit.SECONDS)
                    if (poll != null) {
                        executor.execute {
                            multiple.times {
                                handle(poll.getUrl())
                                total.add(1)
                            }
                        }

                    }
                }
            }
        }
        fun {
            while (key) {
                sleep(COUNT_INTERVAL as double)
                int real = total.sumThenReset() / COUNT_INTERVAL as int
                def active = executor.getActiveCount()
                def count = active == 0 ? 1 : active
                logger.info("{} ,实际QPS:{} 活跃线程数:{} 单线程效率:{}", name, real, active, real / count as int)
            }
        }

    }

    /**
     * 中止
     * @return
     */
    def stop() {
        key = false
        executor.shutdown()
        logger.info("replay压测关闭了!")
    }

    /**
     * 日志对象*/
    static class ReplayLog extends AbstractBean implements Delayed {

        int timestamp

        String url

        ReplayLog(String logLine) {
            def log = LogUtil.getLog(logLine)
            this.url = log.getUrl()
            this.timestamp = log.getTime()
        }

        ReplayLog(int timestamp, String url) {
            this.timestamp = timestamp
            this.url = url
        }

        @Override
        long getDelay(TimeUnit unit) {
            return this.timestamp - getMark()
        }

        @Override
        int compareTo(Delayed o) {
            return this.timestamp - o.timestamp
        }

        protected Object clone(int timestamp) {
            return new ReplayLog(timestamp, this.url)
        }
    }
}

自测

下面是我的测试用例:

package com.okcoin.hickwall.presses

import com.okcoin.hickwall.presses.funtester.frame.execute.ReplayConcurrent
import com.okcoin.hickwall.presses.funtester.httpclient.FunHttp

class RplayT extends FunHttp {

    static String HOST = "http://localhost:12345"

    public static void main(String[] args) {
        def fileName = "api.log"
        new ReplayConcurrent("测试回放功能", fileName, 1, {String url ->
            getHttpResponse(getHttpGet(HOST + url))
        }).start()

    }
}

测试结果如下:

22:45:43.510 main 
  ###### #     #  #    # ####### ######  #####  ####### ######  #####
  #      #     #  ##   #    #    #       #         #    #       #    #
  ####   #     #  # #  #    #    ####    #####     #    ####    #####
  #      #     #  #  # #    #    #            #    #    #       #   #
  #       #####   #    #    #    ######  #####     #    ######  #    #
  
10:56:18 F-5 测试回放功能, 实际QPS:23162 活跃线程数:0单线程效率:23162
10:56:23 F-5 测试回放功能, 实际QPS:36575 活跃线程数:6单线程效率:6095
10:56:28 F-5 测试回放功能, 实际QPS:38974 活跃线程数:21单线程效率:1855 
10:56:33 F-5 测试回放功能, 实际QPS:32798 活跃线程数:8单线程效率:4099
10:56:38 F-5 测试回放功能,实际QPS:35224 活跃线程数:4单线程效率:8806
10:56:43 F-5 测试回放功能,实际QPS:28426 活跃线程数:0单线程效率:28426
10:56:48 F-5 测试回放功能, 实际QPS:33607 活跃线程数:6单线程效率:5601
10:56:53 F-5 测试回放功能,实际QPS:34392 活跃线程数:0单线程效率:34392