Как заставить ThreadPoolExecutor увеличить потоки до максимума перед очередью?
Я был расстроен в течение некоторого времени с поведением по умолчанию ThreadPoolExecutor который поддерживает ExecutorService пулы потоков, которые так много из нас используют. Цитата из Javadocs:
если есть больше, чем corePoolSize, но меньше, чем maximumpoolsize потоков работает, новый поток будет создан только если очередь полна.
это означает, что если вы определяете пул потоков со следующим кодом, он будет никогда запустите 2-й поток, потому что LinkedBlockingQueue неограниченна.
ExecutorService threadPool =
new ThreadPoolExecutor(1 /*core*/, 50 /*max*/, 60 /*timeout*/,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(/* unlimited queue */));
только если у тебя ограниченном очереди является полное все Потоки над номером ядра запущены. Я подозреваю, что большое количество младших Java многопоточных программистов не знают об этом поведении ThreadPoolExecutor.
теперь у меня есть конкретный случай использования, когда это не является оптимальным. Я ищу способы, не написав свой собственный класс TPE, работать вокруг него.
мои требования для веб-службы, которая делает обратные вызовы к возможно ненадежной третьей стороне.
- я не хочу делать обратный вызов синхронно с веб-запросом, поэтому я хочу использовать пул потоков.
- я обычно получаю пару таких в минуту, поэтому я не хочу иметь
newFixedThreadPool(...)С большим количеством потоков, которые в основном находятся в состоянии покоя. - каждый так часто я получаю всплеск этого трафика, и я хочу, чтобы масштабировать количество потоков до некоторого максимального значения (скажем 50).
- мне нужно сделать лучшие попытка сделать все обратные вызовы, поэтому я хочу поставить в очередь любые дополнительные выше 50. Я не хочу перегружать остальную часть моего веб-сервера с помощью
newCachedThreadPool().
как я могу обойти это ограничение в ThreadPoolExecutor где очередь должна быть ограниченной и полной до будет запущено больше потоков? Как я могу заставить его начать больше потоков до задачи по очереди?
Edit:
@Flavio делает хороший момент об использовании ThreadPoolExecutor.allowCoreThreadTimeOut(true) чтобы иметь тайм-аут основных потоков и выход. Я подумал об этом, но мне все еще нужна функция core-threads. Я не хотел, чтобы количество потоков в пуле опускалось ниже размера ядра, если это возможно.
8 ответов:
как я могу обойти это ограничение в
ThreadPoolExecutorгде очередь должна быть ограничена и заполнена до того, как будет запущено больше потоков.я считаю, что наконец-то нашел несколько элегантное (может быть, немного хаки) решение этого ограничения с
ThreadPoolExecutor. Она включает в себя расширениеLinkedBlockingQueue, чтобы он вернетсяfalseнаqueue.offer(...)когда уже есть некоторые задачи в очереди. Если текущие потоки не справляются с поставленными в очередь задачами, TPE добавит дополнительный поток. Если пул уже находится на максимальных потоках, тоRejectedExecutionHandlerбудет называться. Это обработчик, который затем делаетput(...)в очередь.конечно странно писать очередь где
offer(...)может возвратитьfalseиput()никогда не блокирует, так что это хак часть. Но это хорошо работает с использованием очереди TPE, поэтому я не вижу никаких проблем с этим.вот код:
// extend LinkedBlockingQueue to force offer() to return false conditionally BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() { private static final long serialVersionUID = -6903933921423432194L; @Override public boolean offer(Runnable e) { /* * Offer it to the queue if there is 0 items already queued, else * return false so the TPE will add another thread. If we return false * and max threads have been reached then the RejectedExecutionHandler * will be called which will do the put into the queue. */ if (size() == 0) { return super.offer(e); } else { return false; } } }; ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1 /*core*/, 50 /*max*/, 60 /*secs*/, TimeUnit.SECONDS, queue); threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { try { /* * This does the actual put into the queue. Once the max threads * have been reached, the tasks will then queue up. */ executor.getQueue().put(r); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return; } } });С этим механизмом, когда я отправить задачи в очередь,
ThreadPoolExecutorбудет:
- первоначально масштабируйте количество потоков до размера ядра (здесь 1).
- предложите его в очередь. Если очередь пуста, она будет помещена в очередь для обработки существующими потоками.
- если в очереди уже есть 1 или более элементов, то
offer(...)возвращает false.- если возвращается false, увеличьте количество потоков в пуле до достижения максимального числа (здесь 50).
- если на максимуме, то он вызывает
RejectedExecutionHandler- The
RejectedExecutionHandlerзатем помещает задачу в очередь для обработки первым доступным потоком в порядке FIFO.хотя в моем примере кода выше, очередь не ограничена, вы также можете определить его как ограниченную очередь. Например, если вы добавляете емкость 1000 к
LinkedBlockingQueueтогда это:
- масштабирование потоков до max
- затем очередь до тех пор, пока это не будет полный с 1000 задач
- затем заблокируйте вызывающего абонента, пока пространство не станет доступным для очереди.
кроме того, если вам действительно нужно использовать
offer(...)вRejectedExecutionHandlerзатем вы могли бы использоватьoffer(E, long, TimeUnit)вместо сLong.MAX_VALUEкак тайм-аут.Edit:
я подправил свой
offer(...)переопределение метода в обратной связи @Ralf. Это приведет только к увеличению количества потоков в пуле, если они не будут идти в ногу с нагрузка.Edit:
еще одна настройка для этого ответа может заключаться в том, чтобы фактически спросить TPE, есть ли свободные потоки и только поставить элемент в очередь, если это так. Вы должны были бы сделать истинный класс для этого и добавить
ourQueue.setThreadPoolExecutor(tpe);метод.затем ваш
offer(...)метод может выглядеть примерно так:
- проверьте, если
tpe.getPoolSize() == tpe.getMaximumPoolSize()в этом случае просто позвонитеsuper.offer(...).- то если звоните
super.offer(...)так как там, кажется, простоя потоков.- в противном случае вернуть
falseчтобы разветвить другую нить.может, так:
int poolSize = tpe.getPoolSize(); int maximumPoolSize = tpe.getMaximumPoolSize(); if (poolSize >= maximumPoolSize || poolSize > tpe.getActiveCount()) { return super.offer(e); } else { return false; }обратите внимание, что методы get на TPE являются дорогими, так как они обращаются
volatileполя или (в случаеgetActiveCount()) заблокируйте TPE и пройдитесь по списку потоков. Кроме того, здесь есть условия гонки, которые могут привести к тому, что задача будет поставлена в очередь неправильно или другой поток раздвоится, когда был простой поток.
установите размер ядра и максимальный размер в одно и то же значение и разрешите удаление основных потоков из пула с помощью
allowCoreThreadTimeOut(true).
у меня уже есть два других ответа на этот вопрос, но я подозреваю, что это один из лучших.
он основан на технике в настоящее время принимаются ответ, а именно:
- переопределить очередь
offer()метод для (иногда) возврата false,- что вызывает
ThreadPoolExecutorчтобы либо создать новый поток, либо отклонить задачу, и- установить
RejectedExecutionHandlerдо на самом деле поставить задачу в очередь отказ.проблема в том, когда
offer()должна возвращать false. В настоящее время принятый ответ возвращает false, когда в очереди есть несколько задач, но, как я указал в своем комментарии, это вызывает нежелательные эффекты. Кроме того, если вы всегда возвращаете false, вы будете продолжать создавать новые потоки, даже если у вас есть потоки, ожидающие очереди.решение заключается в использовании Java 7
LinkedTransferQueueиoffer()вызовtryTransfer(). Когда существует ожидающий поток потребителя, задача будет просто передана этому потоку. В противном случае,offer()вернет false иThreadPoolExecutorсоздаст новый поток.BlockingQueue<Runnable> queue = new LinkedTransferQueue<Runnable>() { @Override public boolean offer(Runnable e) { return tryTransfer(e); } }; ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 50, 60, TimeUnit.SECONDS, queue); threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { try { executor.getQueue().put(r); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } });
Примечание: теперь я предпочитаю и рекомендую мой другой ответ.
вот версия, которая кажется мне гораздо более простой: увеличьте corePoolSize (до предела maximumPoolSize) всякий раз, когда выполняется новая задача, а затем уменьшите corePoolSize (до предела указанного пользователем "размера основного пула") всякий раз, когда задача завершается.
другими словами, следите за количеством запущенных или поставленных в очередь задач и убедитесь, что corePoolSize равен равно числу задач, если оно находится между указанным пользователем "размером основного пула" и максимальным размером пула.
public class GrowBeforeQueueThreadPoolExecutor extends ThreadPoolExecutor { private int userSpecifiedCorePoolSize; private int taskCount; public GrowBeforeQueueThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); userSpecifiedCorePoolSize = corePoolSize; } @Override public void execute(Runnable runnable) { synchronized (this) { taskCount++; setCorePoolSizeToTaskCountWithinBounds(); } super.execute(runnable); } @Override protected void afterExecute(Runnable runnable, Throwable throwable) { super.afterExecute(runnable, throwable); synchronized (this) { taskCount--; setCorePoolSizeToTaskCountWithinBounds(); } } private void setCorePoolSizeToTaskCountWithinBounds() { int threads = taskCount; if (threads < userSpecifiedCorePoolSize) threads = userSpecifiedCorePoolSize; if (threads > getMaximumPoolSize()) threads = getMaximumPoolSize(); setCorePoolSize(threads); } }как написано, класс не поддерживает изменение указанного пользователем corePoolSize или maximumPoolSize после построения и не поддерживает управление рабочей очередью напрямую или через
remove()илиpurge().
у нас есть подкласс
ThreadPoolExecutorэто требует дополнительногоcreationThresholdи переопределяетexecute.public void execute(Runnable command) { super.execute(command); final int poolSize = getPoolSize(); if (poolSize < getMaximumPoolSize()) { if (getQueue().size() > creationThreshold) { synchronized (this) { setCorePoolSize(poolSize + 1); setCorePoolSize(poolSize); } } } }может быть, это тоже помогает, Но ваш выглядит более вычурно, конечно...
рекомендуемый ответ решает только одну (1) проблему с пулом потоков JDK:
пулы потоков JDK смещены в сторону очереди. Поэтому вместо того, чтобы порождать новый поток, они будут ставить задачу в очередь. Только если очередь достигнет своего предела, пул потоков создаст новый поток.
выход на пенсию потока не происходит, когда нагрузка облегчается. Например, если у нас есть всплеск заданий, попадающих в пул, который заставляет пул перейти к max, а затем при небольшой нагрузке до 2 задач одновременно пул будет использовать все потоки для обслуживания легкой нагрузки, предотвращающей выход потока на пенсию. (только 2 потока будут необходимы...)
недовольный поведением выше, я пошел вперед и реализовал пул для преодоления недостатков выше.
для решения 2) Использование планирования Lifo решает проблему. Эта идея была представлена Беном Маурером на конференции ACM applicative 2015: системы @ Facebook scale
Так родилась новая реализация:
до сих пор эта реализация улучшает производительность асинхронного выполнения для зел.
реализация spin способна уменьшить накладные расходы на переключение контекста, что обеспечивает превосходную производительность для определенных случаев использования.
надеюсь, что это помогает...
PS: JDK Fork Join Pool реализует ExecutorService и работает как" обычный " пул потоков, Реализация является производительной, она использует планирование потоков LIFO, однако нет контроля над внутренним размером очереди, таймаутом выхода на пенсию...
Примечание: теперь я предпочитаю и рекомендую мой другой ответ.
у меня есть еще одно предложение, следуя первоначальной идее изменения очереди, чтобы вернуть false. В этом случае все задачи могут войти в очередь, но всякий раз, когда задача ставится в очередь после
execute(), мы следуем за ним с помощью задачи sentinel no-op, которую очередь отклоняет, вызывая появление нового потока, который будет выполнять no-op сразу же после чего-то из очереди.потому что рабочие потоки могут будьте опрашивать
LinkedBlockingQueueдля новой задачи можно поставить задачу в очередь, даже если есть доступный поток. Чтобы избежать появления новых потоков даже при наличии доступных потоков, мы должны отслеживать, сколько потоков ожидает новых задач в очереди, и только порождать новый поток, когда в очереди больше задач, чем ожидающих потоков.final Runnable SENTINEL_NO_OP = new Runnable() { public void run() { } }; final AtomicInteger waitingThreads = new AtomicInteger(0); BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() { @Override public boolean offer(Runnable e) { // offer returning false will cause the executor to spawn a new thread if (e == SENTINEL_NO_OP) return size() <= waitingThreads.get(); else return super.offer(e); } @Override public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException { try { waitingThreads.incrementAndGet(); return super.poll(timeout, unit); } finally { waitingThreads.decrementAndGet(); } } @Override public Runnable take() throws InterruptedException { try { waitingThreads.incrementAndGet(); return super.take(); } finally { waitingThreads.decrementAndGet(); } } }; ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 50, 60, TimeUnit.SECONDS, queue) { @Override public void execute(Runnable command) { super.execute(command); if (getQueue().size() > waitingThreads.get()) super.execute(SENTINEL_NO_OP); } }; threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { if (r == SENTINEL_NO_OP) return; else throw new RejectedExecutionException(); } });
лучшее решение, которое я могу придумать, это продлить.
ThreadPoolExecutorпредлагает несколько методов крюк:beforeExecuteиafterExecute. В вашем расширении вы можете использовать ограниченную очередь для подачи задач и вторую неограниченную очередь для обработки переполнения. Когда кто-то звонитsubmit, вы можете попытаться поместить запрос в ограниченную очередь. Если вы столкнулись с исключением, вы просто вставляете задачу в свою очередь переполнения. Затем вы можете использоватьafterExecuteкрюк, чтобы увидеть, если есть все, что находится в очереди переполнения после завершения задачи. Таким образом, исполнитель сначала позаботится о том, что находится в ограниченной очереди, и автоматически вытащит из этой неограниченной очереди, как только позволит время.кажется, что это больше работы, чем ваше решение, но по крайней мере это не связано с неожиданным поведением очередей. Я также полагаю, что есть лучший способ проверить состояние очереди и потоков, а не полагаться на исключения, которые довольно медленно выбрасываются.
Comments