造轮子之MemorySafeLinkedBlockingQueue-LinkBlockingQueue改进

发布一下 0 0

LinkBlockingQueue改进

问题背景

https://github.com/apache/dubbo/pull/9722/files
使用线程池的同学对于标题中的队列想必都有过使用,但上述队列使用不当时则会造成程序OOM,那怎么来控制呢?

使用ArrayBlockingQueue?如何来评估长度?

是否有一个完美的解决方案呢,MemorySafeLinkedBlockingQueue则通过对内存的限制判断尽面控制队列的容量,完成解决了可能存在的OOM问题。

获取内存大小(注:单位大B;支持准实时更新):

Runtime.getRuntime().freeMemory()//JVM中已经申请到的堆内存中还未使用的大小Runtime.getRuntime().maxMemory()// JVM可从操作系统申请到的最大内存值 -XxmRuntime.getRuntime().totalMemory()// JVM已从操作系统申请到的内存大小 —Xxs可设置该值大小-初始堆的大小

线程池在excute任务时,放队列,放不进去,使用新线程运行任务。这个放不进行,是使用的offer??非阻塞方法吗?

参考:https://blog.csdn.net/weixin_43108539/article/details/125190023

 public void execute(Runnable command) {        if (command == null)            throw new NullPointerException();     	//拿到32位的int        int c = ctl.get();     	//工作线程数<核心线程数        if (workerCountOf(c) < corePoolSize) {            //进入if,代表可以创建 核心 线程数            if (addWorker(command, true))                return;            //如果没进入if,代表创建核心线程数失败,重新获取 ctl            c = ctl.get();        }        //判断线程池为Running状态,将任务添加入阻塞队列,使用offer        if (isRunning(c) && workQueue.offer(command)) {            int recheck = ctl.get();            //再次判断是否为Running状态,若不是Running状态,remove任务            if (! isRunning(recheck) && remove(command))                reject(command);            //如果线程池在Running状态,线程池数量为0            else if (workerCountOf(recheck) == 0)                //阻塞队列有任务,但是没有工作线程,添加一个任务为空的工作线程处理阻塞队列中的任务                addWorker(null, false);        }        //阻塞队列已满,创建非核心线程,拒绝策略-addWorker中有判断核心线程数是否超过最大线程数        else if (!addWorker(command, false))            reject(command);    }

空闲内存计算

package com.zte.sdn.oscp.queue;import cn.hutool.core.thread.NamedThreadFactory;import java.util.concurrent.Executors;import java.util.concurrent.ScheduledExecutorService;import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.AtomicBoolean;public class MemoryLimitCalculator {    private static volatile long maxAvailable;    private static final AtomicBoolean refreshStarted = new AtomicBoolean(false);    private static void refresh() {        maxAvailable = Runtime.getRuntime().freeMemory();    }    private static void checkAndScheduleRefresh() {        if (!refreshStarted.get()) {            // immediately refresh when first call to prevent maxAvailable from being 0            // to ensure that being refreshed before refreshStarted being set as true            // notice: refresh may be called for more than once because there is no lock            refresh();            if (refreshStarted.compareAndSet(false, true)) {                ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("Dubbo-Memory-Calculator"));                // check every 50 ms to improve performance                scheduledExecutorService.scheduleWithFixedDelay(MemoryLimitCalculator::refresh, 50, 50, TimeUnit.MILLISECONDS);                Runtime.getRuntime().addShutdownHook(new Thread(() -> {                    refreshStarted.set(false);                    scheduledExecutorService.shutdown();                }));            }        }    }    /**     * Get the maximum available memory of the current JVM.     *     * @return maximum available memory     */    public static long maxAvailable() {        checkAndScheduleRefresh();        return maxAvailable;    }    /**     * Take the current JVM's maximum available memory     * as a percentage of the result as the limit.     *     * @param percentage percentage     * @return available memory     */    public static long calculate(final float percentage) {        if (percentage <= 0 || percentage > 1) {            throw new IllegalArgumentException();        }        checkAndScheduleRefresh();        return (long) (maxAvailable() * percentage);    }    /**     * By default, it takes 80% of the maximum available memory of the current JVM.     *     * @return available memory     */    public static long defaultLimit() {        checkAndScheduleRefresh();        return (long) (maxAvailable() * 0.8);    }}

内存安全队列

package com.zte.sdn.oscp.queue;import java.util.Collection;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.TimeUnit;public class MemorySafeLinkedBlockingQueue<E> extends LinkedBlockingQueue<E> {    private static final long serialVersionUID = 8032578371739960142L;    public static int THE_256_MB = 256 * 1024 * 1024;    private int maxFreeMemory;    private Rejector<E> rejector;    public MemorySafeLinkedBlockingQueue() {        this(THE_256_MB);    }    public MemorySafeLinkedBlockingQueue(final int maxFreeMemory) {        super(Integer.MAX_VALUE);        this.maxFreeMemory = maxFreeMemory;        //default as DiscardPolicy to ensure compatibility with the old version        this.rejector = new DiscardPolicy<>();    }    public MemorySafeLinkedBlockingQueue(final Collection<? extends E> c,                                         final int maxFreeMemory) {        super(c);        this.maxFreeMemory = maxFreeMemory;        //default as DiscardPolicy to ensure compatibility with the old version        this.rejector = new DiscardPolicy<>();    }    /**     * set the max free memory.     *     * @param maxFreeMemory the max free memory     */    public void setMaxFreeMemory(final int maxFreeMemory) {        this.maxFreeMemory = maxFreeMemory;    }    /**     * get the max free memory.     *     * @return the max free memory limit     */    public int getMaxFreeMemory() {        return maxFreeMemory;    }    /**     * set the rejector.     *     * @param rejector the rejector     */    public void setRejector(final Rejector<E> rejector) {        this.rejector = rejector;    }    /**     * determine if there is any remaining free memory.     *     * @return true if has free memory     */    public boolean hasRemainedMemory() {        return MemoryLimitCalculator.maxAvailable() > maxFreeMemory;    }    @Override    public void put(final E e) throws InterruptedException {        if (hasRemainedMemory()) {            super.put(e);        } else {            rejector.reject(e, this);        }    }    @Override    public boolean offer(final E e, final long timeout, final TimeUnit unit) throws InterruptedException {        if (!hasRemainedMemory()) {            rejector.reject(e, this);            return false;        }        return super.offer(e, timeout, unit);    }    @Override    public boolean offer(final E e) {        if (!hasRemainedMemory()) {            rejector.reject(e, this);            return false;        }        return super.offer(e);    }}

拒绝策略

注意其中的rejector是拒绝策略,默认的DiscardPolicy什么也不处理;

而DiscardOldPolicy的处理逻辑很简单

public class DiscardOldestPolicy<E> implements Rejector<E> {    @Override    public void reject(final E e, final Queue<E> queue) {        queue.poll();        queue.offer(e);    }}

AbortPolicy则直接抛出异常

public class AbortPolicy<E> implements Rejector<E> {    @Override    public void reject(final E e, final Queue<E> queue) {        throw new RejectException("no more memory can be used !");    }}

个人建议增加日志打印即可。

版权声明:内容来源于互联网和用户投稿 如有侵权请联系删除

本文地址:http://0561fc.cn/154548.html