多线程+队列做大数据量批量插入

前言

批量写入大数据在我们平时的项目中或有遇到,一般我们能想到的提高速度的方式就是使用多线程。比如我们要入10w条数据,那么创建10个线程,每个线程承担入1w条数据。从效率上来说,这比单线程场景高10倍。本人曾经想试图封装这个工具类出来,但是借鉴了网上很多封装的例子,最后还是失败了。
最近浏览群里的一位大佬的帖子,发现了他也封装了这种批量提交的工具类,我体验了一下速度很快,所以就想拿出来给大家分享一下,在此也十分感谢茶佬的支持。

代码

添加必要依赖
因为有用到hutool的异常工具类,所以要添加,如果你项目没有且没有添加的条件,改成手动抛出则可

 <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- 阿里 druid 数据源 -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.1.10</version>
        </dependency>

        <!-- mybatis+springboot的jar-->
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>2.1.2</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.47</version>
            <scope>runtime</scope>
        </dependency>

        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.7.16</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

工具类

import cn.hutool.core.exceptions.ExceptionUtil;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/**
 * @Author: 朝花不迟暮
 * @Date: 2021/11/30 22:48
 * @Description:
 */
public class BatchInsertProcessor<T> {
    /**
     * 线程名称前缀,可自定义
     */
    private String threadNamePrefix = "reportInsert-";


    // 是否已经开始处理
    private boolean started;
    // 用于等待线程处理结束后的收尾处理
    private CountDownLatch cdl;
    // 是否还会产生数据: 用于配合 queue.size() 判断线程是否该结束
    private volatile boolean isProduceData = true;
    // 实体数据容器队列,队列满,则限制生产方的生产速度
    private ArrayBlockingQueue<T> queue;

    // 能存储数据时,就调用该方法给使用方,使用方可以调用存储接口存储
    private StorageConsumer<T> consumer;
    // 批量插入时,每次最多插入多少条
    private int maxItemCount;
    private List<WorkThread> workThreadList;


    public BatchInsertProcessor() {
        this(1000);
    }

    /**
     * <pre>
     *    capacity :利用队列的阻塞 put,来调节生产速度和消费速度的差别
     *      当生产速度明显大于插入速度时,该参数用来限制生产的速度,达到该上限时,生成方就会阻塞,知道有新的容量空闲出来
     * </pre>
     *
     * @param capacity 队列能接收的最大容量
     */
    public BatchInsertProcessor(int capacity) {
        queue = new ArrayBlockingQueue<>(capacity);
    }

    /**
     * 配置线程名称前缀
     *
     * @param threadNamePrefix
     */
    public synchronized void setThreadNamePrefix(String threadNamePrefix) {
        if (started) {
            throw new RuntimeException("已经开始处理,不能再线程名称前缀");
        }
        this.threadNamePrefix = threadNamePrefix;
    }

    public void start(StorageConsumer<T> consumer) {
        this.start(consumer, 4);
    }

    public void start(StorageConsumer<T> consumer, int workThreadCount) {
        this.start(consumer, workThreadCount, 0);
    }


    /**
     * @param consumer        每次保存实体时,调用该方法,由该方法处理保存逻辑
     * @param workThreadCount 并发数量:用几个多线程同时处理数据
     * @param maxItemCount    批量插入数量,每次插入最多多少条:将单个实体缓存起来,达到该数量再入库,如果使用批量插入功能,则必须大于 1
     */
    public synchronized void start(StorageConsumer<T> consumer, int workThreadCount, int maxItemCount) {
        if (started) {
            throw new RuntimeException("处理中");
        }
        started = true;
        this.consumer = consumer;
        this.maxItemCount = maxItemCount;
        this.cdl = new CountDownLatch(workThreadCount);
        workThreadList = IntStream.range(0, workThreadCount)
                .mapToObj(i -> {
                    final WorkThread workThread = new WorkThread(threadNamePrefix + i, maxItemCount);
                    workThread.start();
                    return workThread;
                })
                .collect(Collectors.toList());
    }

    /**
     * 将实体交给处理器,处理器的线程会消费该实体;
     * <pre>
     *  当容器队列已满时,则会阻塞,以此达到生产方暂停生产的目的;可以防止生产速度过快(消费速度过慢),导致占用过多内存
     * </pre>
     *
     * @param entity
     */
    public void put(T entity) {
        try {
            queue.put(entity);
        } catch (InterruptedException e) {
            ExceptionUtil.wrapAndThrow(e);
        }
    }

    /**
     * 等待,处理器处理完成;此方法会阻塞
     */
    public void await() {
        if (!started) {
            throw new RuntimeException("还未运行");
        }
        try {
            isProduceData = false;
            cdl.await();
            for (WorkThread workThread : workThreadList) {
                workThread.clearEntity();
            }
        } catch (InterruptedException e) {
            ExceptionUtil.wrapAndThrow(e);
        }
    }

    /**
     * 立即停止,只适合在生产方不生产数据时,调用
     */
    public void stop() {
        if (!started) {
            throw new RuntimeException("还未运行");
        }
        isProduceData = false;
        queue.clear();
    }

    private class WorkThread extends Thread {
        // 批量插入时,用于缓存实体的容器
        private List<T> batchCacheContainer;
        private int maxItemCount;

        public WorkThread(String name, int maxItemCount) {
            super(name);
            this.maxItemCount = maxItemCount;
            if (maxItemCount > 0) {
                batchCacheContainer = new ArrayList<>(maxItemCount);
            }
        }

        @Override
        public void run() {
            while (true) {
                // 如果不产生数据了,队列也会空,则退出线程
                if (!isProduceData && queue.size() == 0) {
                    break;
                }
                final T entity;
                try {
                    entity = queue.poll(500, TimeUnit.MILLISECONDS);
                    if (entity == null) {
                        continue;
                    }
                    if (maxItemCount > 0) {
                        batchCacheContainer.add(entity);
                        if (batchCacheContainer.size() >= maxItemCount) {
                            consumer.accept(null, batchCacheContainer);
                            batchCacheContainer.clear();
                        }
                    } else {
                        consumer.accept(entity, null);
                    }
                } catch (InterruptedException e) {
                    cdl.countDown();
                    ExceptionUtil.wrapAndThrow(e);
                }
            }
            cdl.countDown();
        }

        public void clearEntity() {
            if (maxItemCount > 0 && batchCacheContainer.size() > 0) {
                consumer.accept(null, batchCacheContainer);
                batchCacheContainer.clear();
            }
        }
    }

    public interface StorageConsumer<T> {
        /**
         * 需要使用方存储数据时,会调用该方法
         *
         * @param t
         * @param ts
         */
        void accept(T t, List<T> ts);
    }
}

封装实体类,这个我自己写了个测试实体

/**
 * @Author: 朝花不迟暮
 * @Date: 2021/11/30 22:52
 * @Description:
 */
public class DemoEntity {
    private int id;
    private String name;

    public DemoEntity(int id, String name) {
        this.id = id;
        this.name = name;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "DemoEntity{" +
                "id=" + id +
                ", name='" + name + ''' +
                '}';
    }
}

创建表sql

CREATE TABLE `batch_save` (
  `id` int(11) NOT NULL,
  `name` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

mapper.xml

    <insert id="saveList">
        insert into batch_save(id, name) values
        <foreach collection="list" item="item" separator="," index="index">
            (#{item.id}, #{item.name})
        </foreach>
    </insert>

测试类

package com.zhbcm.save;

import com.zhbcm.save.dao.BatchSaveDao;
import com.zhbcm.save.entity.DemoEntity;
import com.zhbcm.save.util.BatchInsertProcessor;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;

@SpringBootTest
class BatchInsertApplicationTests {
    @Resource
    private BatchSaveDao batchSaveDao;

    /**
     * 多线程批量插入测试
     */
    @Test
    public void batchInsert() {
        final BatchInsertProcessor<DemoEntity> work = new BatchInsertProcessor<>();
        work.start((t, ts) -> {
        //单条数据t就有数据,多条数据ts就会有数据
        //这里我是插了2w条数据,所以用ts
            batchSaveDao.saveList(ts);
        }, 10, 2000);
        // 模拟生产数据
        try {
            for (int i = 0; i < 20000; i++) {
                work.put(new DemoEntity(i, i + " name"));
            }
            // 等待入库完成
            work.await();
        } catch (Exception e) {
            // 如果生产过程中有异常,立即停止掉处理器,不再入库
            work.stop();
        }
    }

实际效果:2w条数据1s,我这里开了10个线程,每个线程最大承担量为2000
在这里插入图片描述
码云传送门
原文茶佬笔记地址:https://www.yuque.com/mrcode.cn/note-combat/qd8bo3

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
THE END
分享
二维码
)">
< <上一篇
下一篇>>