| | |
| | | import org.apache.commons.lang3.StringUtils; |
| | | import org.springframework.stereotype.Service; |
| | | |
| | | import java.util.Locale; |
| | | import java.util.Map; |
| | | import java.util.Set; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | |
| | | |
| | | /** |
| | | * 查询 |
| | | * |
| | | * @param sourceDbe |
| | | * @param sourceTable |
| | | * @param filter |
| | | * @param params |
| | | * @param uniqueField 主键,不仅会用于识别还会用于排序防止oracle分页获取到重复数据 |
| | | * @param uniqueField 主键,不仅会用于识别还会用于排序防止oracle分页获取到重复数据 |
| | | * @param minID |
| | | */ |
| | | public void query(DataBaseEntity sourceDbe, String sourceTable, String filter, Object[] params, String uniqueField, String minID) { |
| | |
| | | String tempPartMinID = minID; |
| | | int partSize = partCount * QUERY_PAGE_SIZE; |
| | | int count = ceilPage(totalCount, partSize); |
| | | for (int i = 1; i <= count; i++) { |
| | | for (int i = 1; i <= count; i++) { |
| | | rangeSql.setLength(0); |
| | | if (!StringUtils.isEmpty(filter)) { |
| | | rangeSql.append(filter).append(" and "); |
| | |
| | | |
| | | /** |
| | | * 从队列中获取 |
| | | * |
| | | * @param tableName |
| | | * @return |
| | | */ |
| | |
| | | |
| | | /** |
| | | * 判定是否查询完毕 |
| | | * |
| | | * @param tableName |
| | | * @return |
| | | */ |
| | |
| | | |
| | | /** |
| | | * 判定插入队列(查询完成后放入的队列)是否为空 |
| | | * |
| | | * @param tableName |
| | | * @return |
| | | */ |
| | |
| | | |
| | | /** |
| | | * 关闭查询线程 |
| | | * |
| | | * @param tableName |
| | | */ |
| | | public void shutdownQueryThread(String tableName) { |
| | |
| | | |
| | | /** |
| | | * 清理 |
| | | * |
| | | * @param tableName |
| | | */ |
| | | public void clear(String tableName) { |
| | |
| | | |
| | | /** |
| | | * 提取错误日志,只能提取一次,提取后会直接清空 |
| | | * |
| | | * @param tableName |
| | | * @return |
| | | */ |
| | |
| | | |
| | | /** |
| | | * 获取分段的数量,最多16,最小为0,为0表示用不上所有的线程 |
| | | * |
| | | * @param totalCount |
| | | * @return |
| | | */ |
| | |
| | | |
| | | /** |
| | | * 获取sql:查询范围内的最大id值 |
| | | * |
| | | * @param uniqueField |
| | | * @param sourceTable |
| | | * @param filter |
| | |
| | | |
| | | /** |
| | | * 放入队列 |
| | | * |
| | | * @param tableName |
| | | * @param dte |
| | | */ |
| | |
| | | queryQueue = new LinkedBlockingQueue<>(); |
| | | queryMap.put(tableName, queryQueue); |
| | | } |
| | | if (tableName.endsWith("BAK20230823")) { |
| | | //检查dte中的source_info 和 pre_master_key 是否为空 |
| | | for (int i = 0; i < dte.getRows(); i++) { |
| | | String sourceInfo = dte.getString(i, "source_info"); |
| | | String preMasterKey = dte.getString(i, "pre_master_key"); |
| | | if (StringUtils.isEmpty(sourceInfo) || StringUtils.isEmpty(preMasterKey)) { |
| | | dte.setFieldValue(i, "source_info", "ch-kt"); |
| | | String idFiledName = tableName.toLowerCase(Locale.ROOT).startsWith("t_wip_detail") ? "wip_detail_id" : tableName.toLowerCase(Locale.ROOT).startsWith("t_wip_product_keyp") ? "pk_id" : "wip_id"; |
| | | dte.setFieldValue(i, "pre_master_key", dte.getString(i, idFiledName)); |
| | | } |
| | | } |
| | | } |
| | | |
| | | queryQueue.add(dte); |
| | | WriteUtil.append("DA-成功放入队列-" + tableName + "-当前剩余队列数:" + queryQueue.size()); |
| | | while (queryQueue.size() >= 10) { |
| | | SpringMVCContextHolder.getSystemLogger().error("DA-队列已满-" + tableName + "-当前剩余队列数:" + queryQueue.size()); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 查询开始,向存活map中添加1 |
| | | * |
| | | * @param tableName |
| | | */ |
| | | private void startQuery(String tableName, String threadInfo) { |
| | |
| | | |
| | | /** |
| | | * 查询结束,向存活map中减少1 |
| | | * |
| | | * @param tableName |
| | | */ |
| | | private void finalQuery(String tableName, String threadInfo) { |
| | |
| | | |
| | | /** |
| | | * 允许执行查询(避免队列中等待插入的太多,导致内存溢出) |
| | | * |
| | | * @param tableName |
| | | * @return |
| | | */ |
| | |
| | | |
| | | /** |
| | | * 插入错误日志 |
| | | * |
| | | * @param tableName |
| | | * @param error |
| | | */ |
| | |
| | | |
| | | /** |
| | | * 获取页数,向上取整 |
| | | * |
| | | * @param count |
| | | * @param size |
| | | * @return |