Heterogeneous Data Sync → Optimizing the DataX Synced-Row Count
2026/6/25 14:18:40

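Over late-night snacks with a good friend, I was complaining about how my blind dates were going
Me: For the past five years I have considered myself an upright person. I don't smoke or drink, and I don't flirt with girls; I keep regular hours, asleep at 21:00 and up at 06:00, and I have a good temperament too: quiet, honest, and very obedient
Finally I sighed: With so many good points, why do none of the girls I meet on blind dates like me?
Friend: You've only been out for half a year and you've already forgotten how we got put in? Back then you were broke and still dragged me along to PC, and I didn't even have the heart to call you out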

Buffer blocking

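In the earlier post "Heterogeneous Data Source Sync: Data Sync → Modifying DataX, Kind of Interesting" I mentioned that Runtime.getRuntime().exec can block; the root cause is a deadlock triggered by a full buffer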

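After a Runtime object calls exec(cmd), the JVM starts a child process, and that process is connected to the JVM process through three pipes: standard input (stdin), standard output (stdout), and standard error (stderr). Each of these pipes has a fixed-size buffer in the operating system. If the child process keeps writing to stdout or stderr while the parent process (the JVM) does not read promptly through Process.getInputStream() and Process.getErrorStream(), the buffer fills up. Once it is full, the child's write blocks and the child is suspended. If the parent has called process.waitFor() to wait for the child to exit, the result is a classic deadlock: the parent waits for the child to finish, the child waits for the parent to drain the buffer, and the program hangs forever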

We used two threads to read stdout and stderr separately

Process process = Runtime.getRuntime().exec(DATAX_COMMAND);
// Start a separate thread to read standard output
new Thread(() -> {
    try (BufferedReader reader = new BufferedReader(
            new InputStreamReader(process.getInputStream(), SYSTEM_ENCODING))) {
        String line;
        while ((line = reader.readLine()) != null) {
            System.out.println(line);
        }
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}).start();
// Start a separate thread to read standard error
new Thread(() -> {
    try (BufferedReader errorReader = new BufferedReader(
            new InputStreamReader(process.getErrorStream(), SYSTEM_ENCODING))) {
        String line;
        while ((line = errorReader.readLine()) != null) {
            System.out.println(line);
        }
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}).start();
// Wait for the command to finish
int i = process.waitFor();
if (i == 0) {
    System.out.println("job finished");
} else {
    System.out.println("job failed");
}

If we don't need to tell standard output and standard error apart, we can merge standard error into standard output

List<String> DATAX_COMMNDS = Arrays.asList(
        "java", "-server", "-Xms4g", "-Xmx4g",
        "-Dfile.encoding=GBK",
        "-Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener",
        "-Ddatax.home=E:\\git-project\\datax-home",
        "-Dlogback.configurationFile=E:\\git-project\\datax-home\\conf\\logback.xml",
        "-classpath", "E:\\git-project\\datax-home\\lib\\*",
        "com.alibaba.datax.core.Engine",
        "-mode", "standalone",
        "-job", "E:\\git-project\\datax-home\\job\\mysql2mysql.json");
ProcessBuilder pb = new ProcessBuilder(DATAX_COMMNDS);
// Merge the error stream into standard output
pb.redirectErrorStream(true);
Process process = pb.start();
// Start a separate thread to read standard output
new Thread(() -> {
    try (BufferedReader reader = new BufferedReader(
            new InputStreamReader(process.getInputStream(), SYSTEM_ENCODING))) {
        String line;
        while ((line = reader.readLine()) != null) {
            System.out.println(line);
        }
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}).start();
// Wait for the command to finish
int i = process.waitFor();
if (i == 0) {
    System.out.println("job finished");
} else {
    System.out.println("job failed");
}

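Creating a new thread to read the output stream every time a job runs is not reasonable; creating and destroying threads both cost resources, and a thread pool is the more sensible choice

How to size a thread pool properly depends on the workload (IO-bound or CPU-bound), so I won't go into it here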
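To make the thread-pool idea concrete, here is a minimal sketch of draining a stream on a pooled thread instead of a freshly created one. The class name `PooledStreamReader`, the `drain` method, and the fixed pool size are hypothetical and chosen only for illustration; they are not taken from the project.

```java
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.Charset;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

// Hypothetical helper: reads a stream line by line on a pooled thread.
final class PooledStreamReader {
    // Assumption: a small fixed pool; size it for the number of concurrent jobs.
    private final ExecutorService pool;

    PooledStreamReader(int threads) {
        this.pool = Executors.newFixedThreadPool(threads);
    }

    // Submits a task that passes every line to onLine and returns the line count.
    Future<Long> drain(InputStream in, Charset charset, Consumer<String> onLine) {
        return pool.submit(() -> {
            long count = 0;
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(in, charset))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    onLine.accept(line);
                    count++;
                }
            }
            return count;
        });
    }

    void shutdown() {
        pool.shutdown();
    }
}
```

With a real child process, the stream passed in would be `process.getInputStream()`, and the same `PooledStreamReader` instance would be shared across job executions rather than created per job.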

Synced record count

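Because we need to track DataX's synced record count in real time, we modified DataX's log output so that the number of records written to the target database on each write is printed to the log; we then read those counts from the log, accumulate them, and update the database in real time. For the details, see "Heterogeneous Data Sync → How to Get the Number of Rows DataX Has Synced?", which stresses that persisting to the database must be done with

update table_name set sync_rows = sync_rows + syncRows;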

The implementation looks roughly like this

private static final List<String> DATAX_COMMNDS = Arrays.asList(
        "java", "-server", "-Xms4g", "-Xmx4g",
        "-Dfile.encoding=GBK",
        "-Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener",
        "-Ddatax.home=E:\\git-project\\datax-home",
        "-Dlogback.configurationFile=E:\\git-project\\datax-home\\conf\\logback.xml",
        "-classpath", "E:\\git-project\\datax-home\\lib\\*",
        "com.alibaba.datax.core.Engine",
        "-mode", "standalone",
        "-job", "E:\\git-project\\datax-home\\job\\mysql2mysql.json");

@Test
public void test() {
    // Create the job log record first
    Long jobId = 1L;
    LocalDateTime now = LocalDateTime.now();
    int execStatus = -1;
    String msg = "";
    QslJobLog qslJobLog = new QslJobLog(jobId, execStatus, 0L, now, now);
    qslJobLogDao.insert(qslJobLog);
    try {
        ProcessBuilder pb = new ProcessBuilder(DATAX_COMMNDS);
        // Merge the error stream into standard output
        pb.redirectErrorStream(true);
        Process process = pb.start();
        // Read the standard stream asynchronously on a thread-pool thread
        Future<String> streamFuture = readStream(process, qslJobLog.getId());
        msg = streamFuture.get();
        // Wait for the command to finish
        int i = process.waitFor();
        if (i == 0) {
            execStatus = 1;
            LOGGER.info("job[{}] finished", jobId);
        } else {
            LOGGER.error("job[{}] failed", jobId);
            execStatus = 0;
        }
    } catch (Exception e) {
        execStatus = 0;
        msg = "Job execution error: " + e.getMessage();
        LOGGER.error("Job execution error:", e);
    }
    now = LocalDateTime.now();
    qslJobLogDao.update(new LambdaUpdateWrapper<QslJobLog>()
            .eq(QslJobLog::getId, qslJobLog.getId())
            .set(QslJobLog::getExecStatus, execStatus)
            .set(QslJobLog::getUpdateTime, now)
            .set(QslJobLog::getRemark, msg));
}

private Future<String> readStream(Process process, Long jobLogId) {
    return executorService.submit(() -> {
        String threadName = Thread.currentThread().getName();
        LOGGER.info("Thread [{}] started reading the job log", threadName);
        StringBuilder sb = new StringBuilder();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(process.getInputStream(), SYSTEM_ENCODING))) {
            String line;
            while ((line = reader.readLine()) != null) {
                if (line.contains("sync rows=")) {
                    // Accumulate in the database with an in-place increment
                    long syncRows = Long.parseLong(line.split("=")[1]);
                    qslJobLogDao.update(new LambdaUpdateWrapper<QslJobLog>()
                            .eq(QslJobLog::getId, jobLogId)
                            .setSql("sync_rows = sync_rows + " + syncRows));
                } else {
                    sb.append(line).append(StrPool.CRLF);
                }
                LOGGER.info(line);
            }
        } catch (IOException e) {
            LOGGER.error("Error while reading the log:", e);
        }
        LOGGER.info("Thread [{}] finished reading the job log", threadName);
        // Keep only the last 20000 characters
        return sb.length() > 20000 ? sb.substring(sb.length() - 20000, sb.length()) : sb.toString();
    });
}

The table tbl_qsl_job_log is defined as follows

CREATE TABLE `tbl_qsl_job_log` (
  `id` bigint NOT NULL COMMENT 'primary key id',
  `job_id` bigint NOT NULL COMMENT 'job id',
  `sync_rows` bigint NOT NULL DEFAULT '0' COMMENT 'number of synced rows',
  `exec_status` tinyint DEFAULT NULL COMMENT 'execution status, -2: waiting, -1: running, 0: failed, 1: succeeded',
  `remark` text COMMENT 'datax execution log',
  `create_time` datetime DEFAULT NULL COMMENT 'creation time',
  `update_time` datetime DEFAULT NULL COMMENT 'last modified time',
  PRIMARY KEY (`id`)
)

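A few more words on the readStream method: StringBuilder sb records DataX's log output (excluding the lines that contain sync rows=), and only the last 20000 characters are persisted, so that DataX's execution log can be viewed from the platform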
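The `sync rows=` handling inside readStream splits on `=` and takes the second piece, which would misbehave if a line ever contained another `=` before the marker. As a hedged sketch only, the parsing could be isolated into a small helper like the one below. The class name `SyncRowsParser` is hypothetical, and the only thing taken from the source is the `sync rows=` marker; the rest of the log line format is an assumption.

```java
import java.util.OptionalLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: extracts the count that follows the "sync rows=" marker.
final class SyncRowsParser {
    // Assumption: the count is a run of digits directly after the marker.
    private static final Pattern SYNC_ROWS = Pattern.compile("sync rows=\\s*(\\d+)");

    // Returns the parsed count, or empty when the line carries no valid count.
    static OptionalLong parse(String line) {
        if (line == null) {
            return OptionalLong.empty();
        }
        Matcher m = SYNC_ROWS.matcher(line);
        if (!m.find()) {
            return OptionalLong.empty();
        }
        try {
            return OptionalLong.of(Long.parseLong(m.group(1)));
        } catch (NumberFormatException e) {
            // Digits that overflow a long are treated as "no count"
            return OptionalLong.empty();
        }
    }
}
```

In readStream, the `line.contains(...)` check and the `split` call could then be replaced by a single `SyncRowsParser.parse(line)` call, with the database increment issued only when a value is present.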

What room for optimization do you see in the code above? Here are the adjustments I made

  1. Drop the logic that stores the DataX log in the database and work with the DataX job log file directly

    If the DataX log is no longer stored in the database, readStream changes as follows

    private Future<Long> readStream(Process process, Long jobLogId) {
        return executorService.submit(() -> {
            String threadName = Thread.currentThread().getName();
            long totalRows = 0;
            LOGGER.info("Thread [{}] started reading the job log, jobLogId={}", threadName, jobLogId);
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(process.getInputStream(), SYSTEM_ENCODING))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    if (line.contains("sync rows=")) {
                        long syncRows = Long.parseLong(line.split("=")[1]);
                        totalRows += syncRows;
                        qslJobLogDao.update(new LambdaUpdateWrapper<QslJobLog>()
                                .eq(QslJobLog::getId, jobLogId)
                                .setSql("sync_rows = sync_rows + " + syncRows));
                    }
                    LOGGER.info(line);
                }
            } catch (IOException e) {
                LOGGER.error("Error while reading the log:", e);
            }
            LOGGER.info("Thread [{}] finished reading the job log, jobLogId={}", threadName, jobLogId);
            return totalRows;
        });
    }

    Now that we rely on the DataX log file, that file matters more, and its layout should be managed more carefully; adjust DataX's logback.xml as follows

    <?xml version="1.0" encoding="UTF-8"?>
    <configuration>
        <property name="log.dir" value="${datax.home}/log/" />
        <property name="job.id" value="${job.id}" />
        <property name="job.log.id" value="${job.log.id}" />
        <property name="ymd" value="${current.day}"/>
        <property name="byMillionSecond" value="${current.time.millis}"/>

        <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
            <Encoding>UTF-8</Encoding>
            <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
                <pattern>%msg%n</pattern>
            </encoder>
        </appender>

        <appender name="FILE" class="ch.qos.logback.core.FileAppender">
            <charset>UTF-8</charset>
            <file>${log.dir}/${ymd}/${job.id}/${job.log.id}-${byMillionSecond}.log</file>
            <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
                <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{0} - %msg%n</pattern>
            </encoder>
        </appender>

        <root level="${loglevel:-INFO}">
            <appender-ref ref="STDOUT" />
            <appender-ref ref="FILE" />
        </root>
    </configuration>

    The file references 5 variables, which can be passed to DataX's logback as system properties

    private static final String DATAX_HOME_PATH = "E:\\git-project\\datax-home";

    @Test
    public void test() {
        // Create the job log record first
        Long jobId = 2L;
        LocalDateTime now = LocalDateTime.now();
        int execStatus = -1;
        long totalRows = 0;
        QslJobLog qslJobLog = new QslJobLog(jobId, execStatus, 0L, now, now);
        qslJobLogDao.insert(qslJobLog);
        String currentDay = now.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
        String currentTimeMillis = now.format(DateTimeFormatter.ofPattern("HH_mm_ss.SSS"));
        List<String> DATAX_COMMNDS = Arrays.asList(
                "java", "-server", "-Xms4g", "-Xmx4g",
                "-Dfile.encoding=GBK",
                "-Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener",
                "-Ddatax.home=" + DATAX_HOME_PATH,
                "-Dlogback.configurationFile=" + DATAX_HOME_PATH + "\\conf\\logback.xml",
                // Variables consumed by logback.xml
                "-Djob.id=" + jobId,
                "-Djob.log.id=" + qslJobLog.getId(),
                "-Dcurrent.day=" + currentDay,
                "-Dcurrent.time.millis=" + currentTimeMillis,
                "-classpath", DATAX_HOME_PATH + "\\lib\\*",
                "com.alibaba.datax.core.Engine",
                "-mode", "standalone",
                "-job", DATAX_HOME_PATH + "\\job\\mysql2mysql.json");
        String jobLogPath = DATAX_HOME_PATH + "\\log\\" + currentDay + "\\" + jobId + "\\"
                + qslJobLog.getId() + "-" + currentTimeMillis + ".log";
        try {
            ProcessBuilder pb = new ProcessBuilder(DATAX_COMMNDS);
            // Merge the error stream into standard output
            pb.redirectErrorStream(true);
            Process process = pb.start();
            // Read the standard stream asynchronously on a thread-pool thread
            Future<Long> streamFuture = readStream(process, qslJobLog.getId());
            totalRows = streamFuture.get();
            // Wait for the command to finish
            int i = process.waitFor();
            if (i == 0) {
                execStatus = 1;
                LOGGER.info("job[{}] finished, totalRows={}", jobId, totalRows);
            } else {
                LOGGER.error("job[{}] failed", jobId);
                execStatus = 0;
            }
        } catch (Exception e) {
            execStatus = 0;
            LOGGER.error("Job execution error:", e);
        }
        now = LocalDateTime.now();
        qslJobLogDao.update(new LambdaUpdateWrapper<QslJobLog>()
                .eq(QslJobLog::getId, qslJobLog.getId())
                .set(QslJobLog::getExecStatus, execStatus)
                .set(QslJobLog::getUpdateTime, now)
                .set(QslJobLog::getRemark, jobLogPath));
    }
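    • datax.home: DataX's home path; in the example it is E:\git-project\datax-home
    • job.id: the platform job id; 2 in the example
    • job.log.id: the log id of one platform job execution, generated by Mybatis Plus's snowflake algorithm each time the job runs
    • current.day: the date on which the platform job runs; format: yyyy-MM-dd
    • current.time.millis: the hour, minute, second, and millisecond at which the platform job runs; format: HH_mm_ss.SSS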
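The way these variables reach logback can be sketched as a small command builder: every entry becomes a `-Dkey=value` JVM option, and all of them must appear before the main class on the command line. The class `DataxCommand` and its `build` method are hypothetical names for illustration, and the memory and encoding flags from the example are left out for brevity.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical builder for the DataX launch command.
final class DataxCommand {
    // logbackProps holds the variables referenced in logback.xml, e.g. job.id.
    static List<String> build(String dataxHome, String jobFile, Map<String, String> logbackProps) {
        List<String> cmd = new ArrayList<>();
        cmd.add("java");
        cmd.add("-Ddatax.home=" + dataxHome);
        cmd.add("-Dlogback.configurationFile=" + dataxHome + File.separator + "conf" + File.separator + "logback.xml");
        // JVM options must come before the main class, otherwise they are
        // passed to the program as ordinary arguments.
        for (Map.Entry<String, String> e : logbackProps.entrySet()) {
            cmd.add("-D" + e.getKey() + "=" + e.getValue());
        }
        cmd.add("-classpath");
        cmd.add(dataxHome + File.separator + "lib" + File.separator + "*");
        cmd.add("com.alibaba.datax.core.Engine");
        cmd.add("-mode");
        cmd.add("standalone");
        cmd.add("-job");
        cmd.add(jobFile);
        return cmd;
    }
}
```

The returned list can be handed straight to `new ProcessBuilder(...)`, and passing a `LinkedHashMap` keeps the options in a predictable order.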

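    In the sample code, jobLogPath is written to the remark column of tbl_qsl_job_log. That is not recommended; a dedicated column (for example datax_log_path) should be added to hold it. Once the job finishes, the log path and file name follow the pattern set in logback.xml: ${datax.home}/log/${current.day}/${job.id}/${job.log.id}-${current.time.millis}.log

    This path is also stored in the table tbl_qsl_job_log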
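Since the same path has to be produced twice (once by logback from the system properties, once by the platform for storage), it can help to derive the formatted values and the path in one place. The following is a minimal sketch under that assumption; `JobLogPaths` and its method names are hypothetical, and only the two date patterns and the directory layout come from the example above.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper that mirrors the <file> pattern in logback.xml.
final class JobLogPaths {
    private static final DateTimeFormatter DAY = DateTimeFormatter.ofPattern("yyyy-MM-dd");
    private static final DateTimeFormatter TIME = DateTimeFormatter.ofPattern("HH_mm_ss.SSS");

    // Value for -Dcurrent.day
    static String currentDay(LocalDateTime t) {
        return t.format(DAY);
    }

    // Value for -Dcurrent.time.millis
    static String currentTimeMillis(LocalDateTime t) {
        return t.format(TIME);
    }

    // <dataxHome>/log/<day>/<jobId>/<jobLogId>-<time>.log
    static Path logFile(String dataxHome, long jobId, long jobLogId, LocalDateTime t) {
        return Paths.get(dataxHome, "log", currentDay(t), String.valueOf(jobId),
                jobLogId + "-" + currentTimeMillis(t) + ".log");
    }
}
```

The same `LocalDateTime` instance should be used for both the system properties and the stored path, otherwise the two can differ by a millisecond and point at different files.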

  2. Remove the asynchronous wait to shrink the gap between the end of the DataX job and the end of the platform job

    Let's take a close look at the following lines of code

    // Read the standard stream asynchronously on a thread-pool thread
