最后的并行查询加载模块BatchQueryLoader直接就是调用上面的异步并行查询执行器BatchQueryExecutor,完成不同数据源的数据并行异步加载,代码如下

发布时间:2026/7/2 2:34:54
最后的并行查询加载模块BatchQueryLoader直接就是调用上面的异步并行查询执行器BatchQueryExecutor,完成不同数据源的数据并行异步加载,代码如下 ** * filename:BatchQueryLoader.java * * Newland Co. Ltd. All rights reserved. * * Description:并行查询加载模块 * author tangjie * version 1.0 * */ package newlandframework.batchtask.parallel; import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Iterator; import java.util.List; public class BatchQueryLoader { private final CollectionStatementWrapper statements new ArrayListStatementWrapper(); public void attachLoadEnv(final String sql, final Statement statement, final Connection con) { statements.add(new StatementWrapper(sql, statement, con)); } public CollectionStatementWrapper getStatements() { return statements; } public void close() throws SQLException { IteratorStatementWrapper iter statements.iterator(); while (iter.hasNext()) { iter.next().getCon().close(); } } public ListResultSet executeQuery() throws SQLException { ListResultSet result; if (1 statements.size()) { StatementWrapper entity statements.iterator().next(); result Arrays.asList(entity.getStatement().executeQuery( entity.getSql())); return result; } else { BatchQueryExecutor query new BatchQueryExecutor(); result query.executeQuery(statements, new BatchQueryStatementWrapper, ResultSet() { Override public ResultSet query(final StatementWrapper input) throws Exception { return input.getStatement().executeQuery( input.getSql()); } }); return result; } } }批量处理线程池运行参数配置加载BatchTaskConfigurationLoader模块主要从负责从batchtask-configuration.xml中加载线程池的运行参数。BatchTaskConfiguration批处理线程池运行参数对应的JavaBean结构/** * filename:BatchTaskConfiguration.java * * Newland Co. Ltd. All rights reserved. * * Description:批处理线程池参数配置 * author tangjie * version 1.0 * */ package newlandframework.batchtask.parallel; import org.apache.commons.lang.builder.EqualsBuilder; import org.apache.commons.lang.builder.HashCodeBuilder; import org.apache.commons.lang.builder.ToStringBuilder; import org.apache.commons.lang.builder.ToStringStyle; public class BatchTaskConfiguration { private String name; private int corePoolSize; private int maxPoolSize; private int keepAliveTime; private int workQueueSize; public void setName(String name) { this.name name; } public String getName() { return this.name; } public int getCorePoolSize() { return corePoolSize; } public void setCorePoolSize(int corePoolSize) { this.corePoolSize corePoolSize; } public int getMaxPoolSize() { return maxPoolSize; } public void setMaxPoolSize(int maxPoolSize) { this.maxPoolSize maxPoolSize; } public int getKeepAliveTime() { return keepAliveTime; } public void setKeepAliveTime(int keepAliveTime) { this.keepAliveTime keepAliveTime; } public int getWorkQueueSize() { return workQueueSize; } public void setWorkQueueSize(int workQueueSize) { this.workQueueSize workQueueSize; } public int hashCode() { return new HashCodeBuilder(1, 31).append(name).toHashCode(); } public String toString() { return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) .append(name, name).append(corePoolSize, corePoolSize) .append(maxPoolSize, maxPoolSize) .append(keepAliveTime, keepAliveTime) .append(workQueueSize, workQueueSize).toString(); } public boolean equals(Object o) { boolean res false; if (o ! null BatchTaskConfiguration.class.isAssignableFrom(o.getClass())) { BatchTaskConfiguration s (BatchTaskConfiguration) o; res new EqualsBuilder().append(name, s.getName()).isEquals(); } return res; } }当然了你进行参数配置的时候还可以指定多个线程池于是要设计一个批处理线程池工厂类BatchTaskThreadFactoryConfiguration来依次循环保存若干个线程池的参数配置/** * filename:BatchTaskThreadFactoryConfiguration.java * * Newland Co. Ltd. All rights reserved. * * Description:线程池参数配置工厂 * author tangjie * version 1.0 * */ package newlandframework.batchtask.parallel; import java.util.Map; import java.util.HashMap; public class BatchTaskThreadFactoryConfiguration { // 批处理线程池参数配置 private MapString, BatchTaskConfiguration batchTaskMap new HashMapString, BatchTaskConfiguration(); public BatchTaskThreadFactoryConfiguration() { } public void joinBatchTaskConfiguration(BatchTaskConfiguration batchTaskConfiguration) { if (batchTaskMap.containsKey(batchTaskConfiguration.getName())) { return; }else{ batchTaskMap.put(batchTaskConfiguration.getName(), batchTaskConfiguration); } } public MapString, BatchTaskConfiguration getBatchTaskMap() { return batchTaskMap; } }剩下的是加载运行时参数配置模块BatchTaskConfigurationLoader/** * filename:BatchTaskConfigurationLoader.java * * Newland Co. Ltd. All rights reserved. * * Description:线程池参数配置加载 * author tangjie * version 1.0 * */ package newlandframework.batchtask.parallel; import java.io.InputStream; import org.apache.commons.digester.Digester; public final class BatchTaskConfigurationLoader { private static final String BATCHTASK_THREADPOOL_CONFIG ./newlandframework/batchtask/parallel/batchtask-configuration.xml; private static BatchTaskThreadFactoryConfiguration config null; private BatchTaskConfigurationLoader() { } // 单例模式为了控制并发要进行同步控制 public static BatchTaskThreadFactoryConfiguration getConfig() { if (config null) { synchronized (BATCHTASK_THREADPOOL_CONFIG) { if (config null) { try { InputStream is getInputStream(); config (BatchTaskThreadFactoryConfiguration) getDigester().parse(getInputStream()); } catch (Exception e) { e.printStackTrace(); } } } } return config; } private static InputStream getInputStream() { return BatchTaskConfigurationLoader.class.getClassLoader() .getResourceAsStream(BATCHTASK_THREADPOOL_CONFIG); } private static Digester getDigester() { Digester digester new Digester(); digester.setValidating(false); digester.addObjectCreate(batchtask, BatchTaskThreadFactoryConfiguration.class); // 加载批处理异步批处理线程池参数配置 digester.addObjectCreate(*/jobpool, BatchTaskConfiguration.class); digester.addSetProperties(*/jobpool); digester.addSetProperty(*/jobpool/attribute, name, value); digester.addSetNext(*/jobpool, joinBatchTaskConfiguration); return digester; } }上面的这些模块主要是针对线程池的运行参数可以调整而设计准备的。并行异步批处理模块BatchTaskReactor主要类图结构如下BatchTaskRunner这个接口主要定义了批处理框架要初始化和回收资源的动作。/** * filename:BatchTaskRunner.java * * Newland Co. Ltd. All rights reserved. * * Description:批处理资源管理定义接口 * author tangjie * version 1.0 * */ package newlandframework.batchtask.parallel; import java.io.Closeable; public interface BatchTaskRunner extends Closeable { public void initialize(); public void close(); }我们还要重新实现一个线程工厂类BatchTaskThreadFactory用来管理我们线程池当中的线程。我们可以把线程池当中的线程放到线程组里面进行统一管理。比如线程池中的线程它的运行状态监控等等处理你可以通过重新生成一个监控线程来运行、跟踪线程组里面线程的运行情况。当然你还可以重新封装一个JMXJava Management Extensions的MBean对象通过JMX方式对线程池进行监控处理本文的后面有给出运用JMX技术进行批处理线程池任务完成情况监控的实现实现线程池中线程运行状态的监控可以参考一下。这里就不具体给出线程池线程状态监控的JMX模块代码了。言归正传线程工厂类BatchTaskThreadFactory的实现如下/** * filename:BatchTaskThreadFactory.java * * Newland Co. Ltd. All rights reserved. * * Description:线程池工厂 * author tangjie * version 1.0 * */ package newlandframework.batchtask.parallel; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.ThreadFactory; public class BatchTaskThreadFactory implements ThreadFactory { final private static String BATCHTASKFACTORYNAME batchtask-pool; final private String name; final private ThreadGroup threadGroup; final private AtomicInteger threadNumber new AtomicInteger(0); public BatchTaskThreadFactory() { this(BATCHTASKFACTORYNAME); } public BatchTaskThreadFactory(String name) { this.name name; SecurityManager security System.getSecurityManager(); threadGroup (security ! null) ? security.getThreadGroup() : Thread.currentThread().getThreadGroup(); } Override public Thread newThread(Runnable runnable) { Thread thread new Thread(threadGroup, runnable); thread.setName(String.format(BatchTask[%s-%d], threadGroup.getName(), threadNumber.incrementAndGet())); System.out.println(String.format(BatchTask[%s-%d], threadGroup.getName(), threadNumber.incrementAndGet())); if (thread.isDaemon()) { thread.setDaemon(false); } if (thread.getPriority() ! Thread.NORM_PRIORITY) { thread.setPriority(Thread.NORM_PRIORITY); } return thread; } }下面是关键模块并行异步批处理模块BatchTaskReactor的实现代码主要还是对ThreadPoolExecutor进行地封装考虑使用有界的数组阻塞队列ArrayBlockingQueue还是为了防止生产者无休止的请求服务导致内存崩溃最终做到内存使用可控采取的措施。/** * filename:BatchTaskReactor.java * * Newland Co. Ltd. All rights reserved. * * Description:批处理并行异步线程池处理模块 * author tangjie * version 1.0 * */ package newlandframework.batchtask.parallel; import java.util.Set; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public final class BatchTaskReactor implements BatchTaskRunner { private MapString, ExecutorService threadPools new ConcurrentHashMapString, ExecutorService(); private static BatchTaskReactor context; private static Lock REACTORLOCK new ReentrantLock(); public static final String BATCHTASK_THREADPOOL_NAME newlandframework_batchtask; private BatchTaskReactor() { initialize(); } // 防止并发重复创建批处理反应器对象 public static BatchTaskReactor getReactor() { if (context null) { try { REACTORLOCK.lock(); if (context null) { context new BatchTaskReactor(); } } finally { REACTORLOCK.unlock(); } } return context; } public ExecutorService getBatchTaskThreadPoolName() { return getBatchTaskThreadPool(BATCHTASK_THREADPOOL_NAME); } public ExecutorService getBatchTaskThreadPool(String poolName) { if (!threadPools.containsKey(poolName)) { throw new IllegalArgumentException(String.format( 批处理线程池名称:[%s]参数配置不存在, poolName)); } return threadPools.get(poolName); } public SetString getBatchTaskThreadPoolNames() { return threadPools.keySet(); } // 关闭线程池,同时等待异步执行的任务返回执行结果 public void close() { for (EntryString, ExecutorService entry : threadPools.entrySet()) { entry.getValue().shutdown(); System.out.println(String.format(关闭批处理线程池:[%s]成功, entry.getKey())); } threadPools.clear(); } // 初始化批处理线程池 public void initialize() { BatchTaskThreadFactoryConfiguration poolFactoryConfig BatchTaskConfigurationLoader.getConfig(); if (poolFactoryConfig ! null) { initThreadPool(poolFactoryConfig); } } private void initThreadPool(BatchTaskThreadFactoryConfiguration poolFactoryConfig) { for (EntryString, BatchTaskConfiguration entry : poolFactoryConfig.getBatchTaskMap().entrySet()) { BatchTaskConfiguration config entry.getValue(); // 使用有界的阻塞队列,考虑为了防止生产者无休止的请求服务,导致内存崩溃,最终做到内存使用可控 BlockingQueueRunnable queue new ArrayBlockingQueueRunnable(config.getWorkQueueSize()); ThreadPoolExecutor threadPool new ThreadPoolExecutor( config.getCorePoolSize(), config.getMaxPoolSize(), config.getKeepAliveTime(), TimeUnit.SECONDS, queue, new BatchTaskThreadFactory(entry.getKey()),new ThreadPoolExecutor.CallerRunsPolicy()); threadPools.put(entry.getKey(), threadPool); System.out.println(String.format(批处理线程池:[%s]创建成功,config.toString())); } } }