import com.datagaps.core.engine.dao.CommonDAO
import com.datagaps.core.engine.utils.FileHelper.fileSeparator
import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession}
import com.datagaps.core.engine.security.DecryptUtils
import com.datagaps.core.engine.service.DataNerveUtils
import com.datagaps.core.engine.utils.CodeUtils
import org.apache.commons.lang.StringUtils
import java.io.{BufferedReader, File, FileInputStream, InputStreamReader}
import java.util.Properties
import jcifs.config.PropertyConfiguration
import jcifs.context.BaseContext
import org.apache.commons.io.FileUtils
import jcifs.smb.{NtlmPasswordAuthenticator, SmbFile, SmbFileInputStream}

// Reads an "API.csv" manifest from a shared-folder data source and, for each data
// row, copies the referenced CSV file to the local filesystem, loads it with Spark
// and writes it to the JDBC target named in the row.
//
// Manifest row layout (comma separated) — inferred from the index usage below,
// TODO confirm against the file producer:
//   0: source data-source name   1: file name           2: delimiter ("" -> ",")
//   3: header flag               4: JDBC data-source    5: schema   6: table

// The jcifs SMB2 client configuration is identical everywhere it is needed, so it
// is built once here instead of duplicating the property block (it appeared twice).
def newSmbContext(): BaseContext = {
  val jcifsProperties = new Properties()
  jcifsProperties.setProperty("jcifs.smb.client.enableSMB2", "true")
  jcifsProperties.setProperty("jcifs.smb.client.useSMB2Negotiation", "true")
  jcifsProperties.setProperty("jcifs.smb.client.dfs.disabled", "false")
  jcifsProperties.setProperty("jcifs.smb.client.disableSMB1", "true")
  jcifsProperties.setProperty("jcifs.traceResources", "true")
  jcifsProperties.setProperty("jcifs.smb.client.ipcSigningEnforced", "false")
  new BaseContext(new PropertyConfiguration(jcifsProperties))
}

val connection = CommonDAO.getDataSourceByName("CSVwithdomain71_2") // local csv data source name
var tempFilePath = ""

// Credentials are optional on the data source; default to "" when absent.
// (The original `if (...) x = ... else ""` pattern discarded the else value.)
val userName = connection.options.getOrElse("userName", "")
val password =
  if (connection.options.contains("password"))
    CodeUtils.getDecryptedPassword(connection.options("password"))
  else ""

val filePath = connection.options("path") //+ component.options.get("path").replace("\r", "")
var line = ""

if (connection.sourceType.contains("Shared Folder")) {
  if (StringUtils.equalsIgnoreCase(connection.options("impersonate"), "false")) {
    // Share is mounted locally: read the manifest's first line straight from disk.
    // (Removed a dead, leaked `new FileInputStream(new File(filePath.replace("smb:","")))`
    // whose result was never used.)
    tempFilePath = filePath + "/API.csv"
    val source = scala.io.Source.fromFile(tempFilePath)
    try {
      line = source.bufferedReader().readLine()
    } finally {
      source.close() // previously leaked
    }
  } else {
    // Impersonation path: authenticate and stream the manifest over SMB.
    val baseCxt = newSmbContext()
    val auth = baseCxt.withCredentials(new NtlmPasswordAuthenticator("", userName, password))
    tempFilePath = filePath + "/API.csv"
    val smbFile = new SmbFile(tempFilePath, auth)
    val bufRdr = new BufferedReader(new InputStreamReader(new SmbFileInputStream(smbFile)))
    try {
      var count = 0
      line = bufRdr.readLine()
      while (line != null) {
        if (count > 0) { // count == 0 is the header row; skip it
          val record = line.split(',')
          val datasource = CommonDAO.getDataSourceByName(record(0))
          val jdbcDatasource = CommonDAO.getDataSourceByName(record(4))
          //val location = datasource.options.get("path").get+fileSeparator+record(1)
          if (StringUtils.equalsIgnoreCase(datasource.options("impersonate"), "false")) {
            // NOTE(review): this branch re-reads the manifest itself and overwrites the
            // loop variable `line`, which derails the surrounding while-loop. Preserved
            // as-is because the intended behavior (presumably: copy record(1) from the
            // local mount like the else-branch does over SMB) is not recoverable from
            // this code — confirm with the author before changing it.
            tempFilePath = filePath + "/API.csv"
            val source = scala.io.Source.fromFile(tempFilePath)
            line = source.bufferedReader().readLine()
          } else {
            // Per-row credentials come from the row's own data source, not the manifest's.
            val rowAuth = baseCxt.withCredentials(new NtlmPasswordAuthenticator(
              "",
              datasource.options("userName"),
              CodeUtils.getDecryptedPassword(datasource.options("password"))))
            val sourcePath = datasource.options("path") + fileSeparator + record(1)
            val smbFile1 = new SmbFile(sourcePath, rowAuth)
            val targetFile = new File("/home/datagaps/files/" + record(1))
            // FileUtils.copyInputStreamToFile closes the supplied input stream itself.
            FileUtils.copyInputStreamToFile(new SmbFileInputStream(smbFile1), targetFile)
            smbFile1.close()

            val delimiter = if (record(2) == "") "," else record(2)
            val dataframe = spark.read.format(datasource.format)
              .option("delimiter", delimiter)
              .option("header", record(3))
              .load(targetFile.getPath())

            // Renamed from `connection`/`password` to avoid shadowing the script-level vals.
            val jdbcOptions = jdbcDatasource.options
            val jdbcPassword = CodeUtils.getDecryptedPassword(jdbcOptions("password"))
            dataframe.write.format("jdbc").mode(SaveMode.Overwrite)
              .option("url", jdbcOptions("url"))
              .option("driver", jdbcOptions("driver"))
              .option("user", jdbcOptions("userName"))
              .option("password", jdbcPassword)
              .option("dbtable", record(5) + "." + record(6))
              .save()
          }
        }
        line = bufRdr.readLine()
        count = count + 1
      }
    } finally {
      bufRdr.close()  // previously leaked
      smbFile.close() // original closed this BEFORE reading from its stream; moved after
    }
  }
}