package com.product.data.sync.service; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.google.common.collect.Lists; import com.product.common.lang.StringUtils; import com.product.core.config.Global; import com.product.core.connection.ConnectionManager; import com.product.core.dao.BaseDao; import com.product.core.entity.DataTableEntity; import com.product.core.entity.FieldSetEntity; import com.product.core.exception.BaseException; import com.product.core.permission.PermissionService; import com.product.core.service.support.AbstractBaseService; import com.product.core.service.support.QueryFilterService; import com.product.core.spring.context.SpringMVCContextHolder; import com.product.core.transfer.Transactional; import com.product.data.sync.config.CmnConst; import com.product.data.sync.config.SystemCode; import com.product.data.sync.service.ide.IConnectionConfigurationService; import com.product.data.sync.util.DataManipulationUtils; import com.product.util.BaseUtil; import org.springframework.stereotype.Service; import org.springframework.beans.factory.annotation.Autowired; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.Date; import java.util.List; /** * Copyright LX * * @Title: ConnectionConfigurationService * @Project: product-server * @date: 2021-08-11 16:35 * @author: luoxin * @Description: 数据库同步配置 */ @Service public class ConnectionConfigurationService extends AbstractBaseService implements IConnectionConfigurationService { @Autowired public BaseDao baseDao; @Autowired PermissionService permissionService; @Autowired QueryFilterService queryFilterService; /** * 数据同步连接配置 --本地同步 * * @param fse * @throws BaseException */ private void currentConnectionConfigTables(FieldSetEntity fse, String condition) throws BaseException { BaseUtil.createCreatorAndCreationTime(fse); baseDao.saveFieldSetEntity(fse); String uuid = fse.getUUID(); DataTableEntity dt = baseDao.listTable("SELECT table_name,? as database_config_uuid from product_sys_datamodel_table_field_v where length(table_name)>0 " + condition + " GROUP BY table_name ", new Object[]{uuid}); dt.getMeta().setTableName(new Object[]{CmnConst.PRODUCT_SYS_DATABASE_TABLENAME_FIELD}); add(dt); } /** * 同步本库字段名 * * @param fse * @param tableName 表名 * @throws BaseException */ private void currentConnectionConfigFields(FieldSetEntity fse, String tableName) throws BaseException { BaseUtil.createCreatorAndCreationTime(fse); if (!StringUtils.isEmpty(fse.getUUID())) { // 删除已存在的配置 重新插入最新配置 baseDao.delete(CmnConst.PRODUCT_SYS_DATABASE_TABLENAME_FIELD, "database_config_uuid=?", new Object[]{fse.getUUID()}); } baseDao.saveFieldSetEntity(fse); String uuid = fse.getUUID(); DataTableEntity dt = baseDao.listTable("SELECT table_name,field_name,? as database_config_uuid from product_sys_datamodel_table_field_v where table_name = ? ", new Object[]{uuid, tableName}); dt.getMeta().setTableName(new Object[]{CmnConst.PRODUCT_SYS_DATABASE_TABLENAME_FIELD}); add(dt); } @Override @Transactional public String saveConnectionConfiguration(FieldSetEntity fs) throws BaseException, SQLException, ClassNotFoundException { String uuid = fs.getUUID(); Connection conn = this.runConnectionConfiguration(fs); String sourceType = Global.getSystemConfig("data.source.type", ""); if (BaseUtil.strIsNull(uuid)) { //数据库类型相同 if (sourceType != null && sourceType.equalsIgnoreCase(fs.getString(CmnConst.DATABASE_TYPE))) { if ("/".equals(fs.getString(CmnConst.IP_ADDRESS))) { // 修改了 product_sys_database_tablename_field 的表结构需要修改 此方法 this.currentConnectionConfigTables(fs, ""); return fs.getUUID(); } } fs.setValue("org_level_uuid", SpringMVCContextHolder.getCurrentUser().getOrg_level_uuid()); fs.setValue("created_by", SpringMVCContextHolder.getCurrentUser().getUser_id()); fs.setValue("created_utc_datetime", new Date()); uuid = baseDao.add(fs); if (conn != null) { this.saveTableName(conn, fs, null); } else { throw new BaseException(SystemCode.GET_JDBC_CONNECT_FAIL.getValue(), SystemCode.GET_JDBC_CONNECT_FAIL.getText()); } } else { DataTableEntity sync_config = baseDao.listTable(CmnConst.PRODUCT_SYS_DATABASE_SYNC_CONFIG, " database_config_uuid = ? ", new String[]{uuid}); if (BaseUtil.dataTableIsEmpty(sync_config)) { //直接删除表 从新保存表名 baseDao.delete(CmnConst.PRODUCT_SYS_DATABASE_TABLENAME_FIELD, " database_config_uuid = ? ", new String[]{uuid}); //数据库类型相同 if (sourceType != null && sourceType.equalsIgnoreCase(fs.getString(CmnConst.DATABASE_TYPE))) { if ("/".equals(fs.getString(CmnConst.IP_ADDRESS))) { // 修改了 product_sys_database_tablename_field 的表结构需要修改 此方法 this.currentConnectionConfigTables(fs, ""); return fs.getUUID(); } } if (conn != null) { this.saveTableName(conn, fs, null); } else { throw new BaseException(SystemCode.GET_JDBC_CONNECT_FAIL.getValue(), SystemCode.GET_JDBC_CONNECT_FAIL.getText()); } fs.setValue("updated_by", SpringMVCContextHolder.getCurrentUser().getUser_id()); fs.setValue("updated_utc_datetime", new Date()); baseDao.update(fs); try { conn.close(); } catch (SQLException e) { throw new BaseException(e.toString(), ""); } } else { //已创建同步的表不更新 List originName = Lists.newArrayList(); for (int i = 0; i < sync_config.getRows(); i++) { FieldSetEntity syncFse = sync_config.getFieldSetEntity(i); originName.add(syncFse.getString(CmnConst.DATA_ORIGIN_NAME)); } String and = BaseUtil.buildQuestionMarkFilter("", originName.toArray(), false); //先删除其他表 baseDao.delete(CmnConst.PRODUCT_SYS_DATABASE_TABLENAME_FIELD, " database_config_uuid = ? AND table_name " + and, new String[]{uuid}); //数据库类型相同 if (sourceType != null && sourceType.equalsIgnoreCase(fs.getString(CmnConst.DATABASE_TYPE))) { if ("/".equals(fs.getString(CmnConst.IP_ADDRESS))) { // 修改了 product_sys_database_tablename_field 的表结构需要修改 此方法 this.currentConnectionConfigTables(fs, " and table_name " + and); return fs.getUUID(); } } if (conn != null) { this.saveTableName(conn, fs, and); } else { throw new BaseException(SystemCode.GET_JDBC_CONNECT_FAIL.getValue(), SystemCode.GET_JDBC_CONNECT_FAIL.getText()); } } baseDao.update(fs); } return uuid; } //获取表名和字段名 public void saveTableNameAndFieldName(Connection con, FieldSetEntity fs) throws BaseException { String uuid = fs.getString(CmnConst.UUID); String databaseType = fs.getString(CmnConst.DATABASE_TYPE);//数据库类型 String sql; if ("mysql".equals(databaseType)) { //数据库名称 sql = "select TABLE_NAME,COLUMN_NAME from information_schema.COLUMNS WHERE TABLE_SCHEMA = '" + fs.getString(CmnConst.DATABASE_NAME) + "'"; } else if ("oracle".equals(databaseType)) { //用户名 分大小写 sql = "select TABLE_NAME,COLUMN_NAME from all_tab_columns"; } else if ("sqlserver".equals(databaseType)) { //数据库名称 sql = "SELECT a.name TABLE_NAME,b.name COLUMN_NAME FROM " + fs.getString(CmnConst.DATABASE_NAME) + "..SysObjects a " + "LEFT JOIN " + fs.getString(CmnConst.DATABASE_NAME) + "..SysColumns b ON a.id = b.id Where a.XType='U' " + "ORDER BY a.name "; } else if ("informix".equals(databaseType)) { //数据库名称 sql = "SELECT t.tabname TABLE_NAME,c.colname COLUMN_NAME" + " FROM \"informix\".systables AS t\n" + " JOIN \"informix\".syscolumns AS c ON t.tabid = c.tabid\n" + " WHERE t.tabtype = 'T'\n" + " AND t.tabid >= 100\n" + " ORDER BY t.tabname,c.colno;"; } else { throw new BaseException(SystemCode.SYSTEM_UNKNOWN_DATABASE_TYPE.getValue(), SystemCode.SYSTEM_UNKNOWN_DATABASE_TYPE.getText()); } try { PreparedStatement ps = con.prepareStatement(sql); ResultSet resultSet = ps.executeQuery(); while (resultSet.next()) { FieldSetEntity field = new FieldSetEntity(); field.setTableName(CmnConst.PRODUCT_SYS_DATABASE_TABLENAME_FIELD); field.setValue(CmnConst.DATABASE_CONFIG_UUID, uuid); //表名 field.setValue(CmnConst.TABLE_NAME, resultSet.getString("TABLE_NAME")); //字段名 field.setValue(CmnConst.FIELD_NAME, resultSet.getString("COLUMN_NAME")); baseDao.add(field); } } catch (SQLException e) { try { con.close(); } catch (SQLException ex) { ex.printStackTrace(); throw new BaseException(SystemCode.GET_DATA_SOURCE_FAIL.getValue(), SystemCode.GET_DATA_SOURCE_FAIL.getText() + ex.getMessage()); } throw new BaseException(SystemCode.GET_DATA_SOURCE_FAIL.getValue(), SystemCode.GET_DATA_SOURCE_FAIL.getText() + e.getMessage()); } } //获取并保存表名 public void saveTableName(Connection con, FieldSetEntity fs, String condition) throws BaseException { String uuid = fs.getString(CmnConst.UUID); String databaseType = fs.getString(CmnConst.DATABASE_TYPE);//数据库类型 String sql; if ("mysql".equals(databaseType)) { //数据库名称 if (!BaseUtil.strIsNull(condition)) { condition = " AND TABLE_NAME " + condition; } sql = "select TABLE_NAME from information_schema.COLUMNS WHERE TABLE_SCHEMA = '" + fs.getString(CmnConst.DATABASE_NAME) + "' " + condition + " GROUP BY TABLE_NAME"; } else if ("oracle".equals(databaseType)) { //用户名 分大小写 if (!BaseUtil.strIsNull(condition)) { condition = " AND TABLE_NAME " + condition; } sql = "select TABLE_NAME from all_tab_columns WHERE OWNER = '" + fs.getString(CmnConst.USER_NAME) + "' " + condition + " GROUP BY TABLE_NAME "; } else if ("sqlserver".equals(databaseType)) { //数据库名称 if (!BaseUtil.strIsNull(condition)) { condition = " AND name" + condition; } sql = "SELECT name TABLE_NAME FROM " + fs.getString(CmnConst.DATABASE_NAME) + "..SysObjects Where XType='U' " + condition; } else if ("informix".equals(databaseType)) { //数据库名称 if (!BaseUtil.strIsNull(condition)) { condition = " AND tabname " + condition; } sql = "SELECT tabname TABLE_NAME FROM \"informix\".systables WHERE tabtype = 'T' AND tabid >= 100 " + condition; } else { throw new BaseException(SystemCode.SYSTEM_UNKNOWN_DATABASE_TYPE.getValue(), SystemCode.SYSTEM_UNKNOWN_DATABASE_TYPE.getText()); } try { PreparedStatement ps = con.prepareStatement(sql); SpringMVCContextHolder.getSystemLogger().info("执行sql:\n\t" + sql); ResultSet resultSet = ps.executeQuery(); while (resultSet.next()) { FieldSetEntity field = new FieldSetEntity(); field.setTableName(CmnConst.PRODUCT_SYS_DATABASE_TABLENAME_FIELD); field.setValue(CmnConst.DATABASE_CONFIG_UUID, uuid); //只取表名 field.setValue(CmnConst.TABLE_NAME, resultSet.getString("TABLE_NAME")); //字段名 // field.setValue(CmnConst.FIELD_NAME,resultSet.getString("COLUMN_NAME")); baseDao.add(field); } } catch (SQLException e) { SpringMVCContextHolder.getSystemLogger().error(e); SpringMVCContextHolder.getSystemLogger().info("执行sql错误:\n\t" + sql); try { con.close(); } catch (SQLException ex) { ex.printStackTrace(); throw new BaseException(SystemCode.GET_DATA_SOURCE_FAIL.getValue(), SystemCode.GET_DATA_SOURCE_FAIL.getText() + ex.getMessage()); } throw new BaseException(SystemCode.GET_DATA_SOURCE_FAIL.getValue(), SystemCode.GET_DATA_SOURCE_FAIL.getText() + e.getMessage()); } } /** * 查询数据源库 获取并保存字段名 * * @param fs * @return * @throws BaseException */ @Transactional @Override public Boolean saveField(FieldSetEntity fs) throws BaseException, SQLException, ClassNotFoundException { //表名 String tableName = fs.getString(CmnConst.TABLE_NAME); //数据库连接uuid String databaseConfigUuid = fs.getString(CmnConst.DATABASE_CONFIG_UUID); //获取连接配置 FieldSetEntity configField = baseDao.getFieldSetEntity(CmnConst.PRODUCT_SYS_DATABASE_CONNECTION_CONFIG, databaseConfigUuid, false); //数据库类型 String databaseType = configField.getString(CmnConst.DATABASE_TYPE); //获取数据源连接 Connection con = this.runConnectionConfiguration(configField); String sourceType = Global.getSystemConfig("data.source.type", ""); if (sourceType != null && sourceType.equalsIgnoreCase(configField.getString(CmnConst.DATABASE_TYPE))) { if ("/".equals(configField.getString(CmnConst.IP_ADDRESS))) { // 修改了 product_sys_database_tablename_field 的表结构需要修改 此方法 this.currentConnectionConfigFields(configField, tableName); return true; } } String sql; if ("mysql".equals(databaseType)) { //数据库名称 sql = "select COLUMN_NAME from information_schema.COLUMNS WHERE TABLE_SCHEMA = '" + configField.getString(CmnConst.DATABASE_NAME) + "' AND TABLE_NAME = '" + tableName + "'"; } else if ("oracle".equals(databaseType)) { //用户名 分大小写 sql = "select COLUMN_NAME from all_tab_columns WHERE OWNER = '" + configField.getString(CmnConst.USER_NAME) + "' AND TABLE_NAME = '" + tableName + "'"; } else if ("sqlserver".equals(databaseType)) { //数据库名称 sql = "SELECT name TABLE_NAME FROM " + fs.getString(CmnConst.DATABASE_NAME) + "..SysColumns WHERE id = ( " + " SELECT id FROM " + fs.getString(CmnConst.DATABASE_NAME) + "..SysObjects WHERE XType='U' and name = '" + tableName + "')"; } else if ("informix".equals(databaseType)) { //数据库名称 sql = " SELECT colname COLUMN_NAME from \"informix\".syscolumns where tabid = ( " + " SELECT tabid FROM \"informix\".systables WHERE tabname = '" + tableName + "'); "; } else { throw new BaseException(SystemCode.SYSTEM_UNKNOWN_DATABASE_TYPE.getValue(), SystemCode.SYSTEM_UNKNOWN_DATABASE_TYPE.getText()); } try { PreparedStatement ps = con.prepareStatement(sql); ResultSet resultSet = ps.executeQuery(); //判断改表名是否需要添加字段 // if(resultSet.getRow() > 0) { //先删除之前的数据 baseDao.delete(CmnConst.PRODUCT_SYS_DATABASE_TABLENAME_FIELD, " database_config_uuid = ? and table_name = ? ", new String[]{databaseConfigUuid, tableName}); while (resultSet.next()) { // String fieldName = fs.getString(CmnConst.FIELD_NAME); // if (BaseUtil.strIsNull(fieldName)) { // fs.setValue(CmnConst.FIELD_NAME, resultSet.getString("COLUMN_NAME")); // baseDao.update(fs); // continue; // } FieldSetEntity field = new FieldSetEntity(); field.setTableName(CmnConst.PRODUCT_SYS_DATABASE_TABLENAME_FIELD); field.setValue(CmnConst.DATABASE_CONFIG_UUID, databaseConfigUuid); //表名 field.setValue(CmnConst.TABLE_NAME, tableName); //字段名 field.setValue(CmnConst.FIELD_NAME, resultSet.getString("COLUMN_NAME")); //再添加新的数据 baseDao.add(field); } // } } catch (SQLException e) { try { con.close(); } catch (SQLException ex) { ex.printStackTrace(); throw new BaseException(SystemCode.GET_DATA_SOURCE_FAIL.getValue(), SystemCode.GET_DATA_SOURCE_FAIL.getText() + ex.getMessage()); } throw new BaseException(SystemCode.GET_DATA_SOURCE_FAIL.getValue(), SystemCode.GET_DATA_SOURCE_FAIL.getText() + e.getMessage()); } return true; } @Override public DataTableEntity listConnectionConfiguration(FieldSetEntity fs) throws BaseException { String queryFilter; if (BaseUtil.dataTableIsEmpty(fs.getSubDataTable("systemSeniorQueryString"))) { queryFilter = ""; } else { queryFilter = queryFilterService.getQueryFilter(fs); } DataTableEntity dt = baseDao.listTable(CmnConst.PRODUCT_SYS_DATABASE_CONNECTION_CONFIG, queryFilter, null, null, null, fs.getInteger(CmnConst.PAGESIZE), fs.getInteger(CmnConst.CPAGE)); return dt; } @Override public FieldSetEntity getConnectionConfiguration(FieldSetEntity fs) throws BaseException { return baseDao.getFieldSetEntity(CmnConst.PRODUCT_SYS_DATABASE_CONNECTION_CONFIG, fs.getUUID(), false); } @Override @Transactional public boolean delConnectionConfiguration(FieldSetEntity fs) throws BaseException { String uuid = fs.getUUID(); String[] uuids = uuid.split(","); DataTableEntity dataTableEntity = baseDao.listTable(CmnConst.PRODUCT_SYS_DATABASE_SYNC_CONFIG, BaseUtil.buildQuestionMarkFilter(CmnConst.DATABASE_CONFIG_UUID, uuids.length, true), uuids); if (BaseUtil.dataTableIsEmpty(dataTableEntity)) { //删除该连接对应的表名跟字段名 baseDao.delete(CmnConst.PRODUCT_SYS_DATABASE_TABLENAME_FIELD, BaseUtil.buildQuestionMarkFilter(CmnConst.DATABASE_CONFIG_UUID, uuids.length, true), uuids); //删除连接配置 return baseDao.delete(CmnConst.PRODUCT_SYS_DATABASE_CONNECTION_CONFIG, uuids); } else { throw new BaseException(SystemCode.SYNC_EXISTING_CONFIGURATION_FAIL.getValue(), SystemCode.SYNC_EXISTING_CONFIGURATION_FAIL.getText()); } } public Connection runConnectionConfiguration(FieldSetEntity fs) throws BaseException, SQLException, ClassNotFoundException { String databaseType = fs.getString(CmnConst.DATABASE_TYPE);//数据库类型 String ipAddress = fs.getString(CmnConst.IP_ADDRESS);//ip地址 String databaseName = fs.getString(CmnConst.DATABASE_NAME);//数据库名称 String portNumber = fs.getString(CmnConst.PORT_NUMBER);//端口号 String userName = fs.getString(CmnConst.USER_NAME);//用户名 String userPassword = fs.getString(CmnConst.USER_PASSWORD);//密码 String instantiation = fs.getString(CmnConst.INSTANTIATION);//实例名 String url; String diver; String sourceType = Global.getSystemConfig("data.source.type", ""); if (sourceType != null && sourceType.equalsIgnoreCase(fs.getString(CmnConst.DATABASE_TYPE))) { if ("/".equals(fs.getString(CmnConst.IP_ADDRESS))) { return ConnectionManager.getConnection(); } } // if("null".equals(databaseType) && !BaseUtil.strIsNull(jdbcConnect)){ // url = jdbcConnect; // if(jdbcConnect.contains("mysql")){ // diver = "com.mysql.cj.jdbc.Driver"; // }else if(jdbcConnect.contains("oracle")){ // diver = "oracle.jdbc.driver.OracleDriver"; // }else if(jdbcConnect.contains("sqlserver")){ // diver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"; // }else{ // throw new BaseException("", ""); // } // }else if ("mysql".equals(databaseType)) { diver = "com.mysql.cj.jdbc.Driver"; url = "jdbc:mysql://" + ipAddress + ":" + portNumber + "/" + databaseName + "?useSSL=false&serverTimezone=UTC"; } else if ("oracle".equals(databaseType)) { diver = "oracle.jdbc.driver.OracleDriver"; url = "jdbc:oracle:thin:@" + ipAddress + ":" + portNumber + ":orcl"; } else if ("sqlserver".equals(databaseType)) { diver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"; url = "jdbc:sqlserver://" + ipAddress + ":" + portNumber + ";DataBaseName=" + databaseName; } else if ("informix".equals(databaseType)) { diver = "com.informix.jdbc.IfxDriver"; url = "jdbc:informix-sqli://" + ipAddress + ":" + portNumber + "/" + databaseName + ":informixserver=" + instantiation; } else { throw new BaseException(SystemCode.SYSTEM_UNKNOWN_DATABASE_TYPE.getValue(), SystemCode.SYSTEM_UNKNOWN_DATABASE_TYPE.getText()); } //获取jdbc连接 return DataManipulationUtils.getConnection(diver, url, userName, userPassword); } public JSONArray getSyncTree() throws BaseException { DataTableEntity dataTableEntity = baseDao.listTable(CmnConst.PRODUCT_SYS_DATABASE_CONNECTION_CONFIG); JSONArray linkArray = new JSONArray(); for (int i = 0; i < dataTableEntity.getRows(); i++) { JSONObject configObject = new JSONObject(); FieldSetEntity fieldSetEntity = dataTableEntity.getFieldSetEntity(i); String uuid = fieldSetEntity.getUUID(); configObject.put("uuid", uuid);//uuid configObject.put("label", fieldSetEntity.getString("connect_name"));//连接名称 JSONArray syncArray = new JSONArray(); DataTableEntity syncData = baseDao.listTable(CmnConst.PRODUCT_SYS_DATABASE_SYNC_CONFIG, " database_config_uuid = ? ", new String[]{uuid}); for (int j = 0; j < syncData.getRows(); j++) { JSONObject jsonObject = new JSONObject(); FieldSetEntity syncField = syncData.getFieldSetEntity(j); jsonObject.put("uuid", syncField.getString("uuid")); jsonObject.put("label", syncField.getString("system_table_name")); syncArray.add(jsonObject); } configObject.put("children", syncArray); linkArray.add(configObject); } return linkArray; } }