Commit 3137bb21 by lina.li, committed by Sarath Subramanian

ATLAS-3183: Read Impala lineage record for creating view and send to Atlas

parent d5437734
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>apache-atlas</artifactId>
<groupId>org.apache.atlas</groupId>
<version>2.0.0-SNAPSHOT</version>
<relativePath>../../</relativePath>
</parent>
<artifactId>impala-bridge</artifactId>
<description>Apache Atlas Impala Bridge Module</description>
<name>Apache Atlas Impala Bridge</name>
<packaging>jar</packaging>
<properties>
<!-- log4j 2.9 and later are multi-release jars for Java 9, which our Jetty version does not
support. Therefore, we have to use log4j 2.8 in the integration tests. -->
<log4j.version>2.8</log4j.version>
</properties>
<dependencies>
<!-- Logging -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-notification</artifactId>
</dependency>
<!-- to bring up atlas server for integration tests -->
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-client-v2</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>hdfs-model</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-server</artifactId>
<version>${jersey.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>hive-bridge</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-webapp</artifactId>
<type>war</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<exclusions>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-annotations</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>${commons-io.version}</version>
</dependency>
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>${commons-cli.version}</version>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>${commons-lang.version}</version>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-graphdb-impls</artifactId>
<type>pom</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-intg</artifactId>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-repository</artifactId>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>${hive.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>${hive.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>javax.ws.rs</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-cli</artifactId>
<version>${hive.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty.aggregate</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<profiles>
<profile>
<id>dist</id>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy-hook</id>
<phase>package</phase>
<goals>
<goal>copy</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/dependency/hook/impala/atlas-impala-plugin-impl</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
<artifactItems>
<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>${project.artifactId}</artifactId>
<version>${project.version}</version>
</artifactItem>
<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>atlas-client-common</artifactId>
<version>${project.version}</version>
</artifactItem>
<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>atlas-client-v1</artifactId>
<version>${project.version}</version>
</artifactItem>
<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>atlas-client-v2</artifactId>
<version>${project.version}</version>
</artifactItem>
<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>atlas-intg</artifactId>
<version>${project.version}</version>
</artifactItem>
<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>atlas-notification</artifactId>
<version>${project.version}</version>
</artifactItem>
<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>hdfs-model</artifactId>
<version>${project.version}</version>
</artifactItem>
<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>atlas-common</artifactId>
<version>${project.version}</version>
</artifactItem>
<artifactItem>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${kafka.scala.binary.version}</artifactId>
<version>${kafka.version}</version>
</artifactItem>
<artifactItem>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</artifactItem>
<artifactItem>
<groupId>com.sun.jersey.contribs</groupId>
<artifactId>jersey-multipart</artifactId>
<version>${jersey.version}</version>
</artifactItem>
<artifactItem>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</artifactItem>
<artifactItem>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${jackson.version}</version>
</artifactItem>
<artifactItem>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>${jackson.version}</version>
</artifactItem>
<artifactItem>
<groupId>commons-configuration</groupId>
<artifactId>commons-configuration</artifactId>
<version>${commons-conf.version}</version>
</artifactItem>
<artifactItem>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<version>${hbase.version}</version>
</artifactItem>
<artifactItem>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase.version}</version>
</artifactItem>
<artifactItem>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-json</artifactId>
<version>${jersey.version}</version>
</artifactItem>
<artifactItem>
<groupId>javax.ws.rs</groupId>
<artifactId>jsr311-api</artifactId>
<version>${jsr.version}</version>
</artifactItem>
</artifactItems>
</configuration>
</execution>
<execution>
<id>copy-hook-shim</id>
<phase>package</phase>
<goals>
<goal>copy</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/dependency/hook/impala</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
<artifactItems>
<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>impala-bridge-shim</artifactId>
<version>${project.version}</version>
</artifactItem>
<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>atlas-plugin-classloader</artifactId>
<version>${project.version}</version>
</artifactItem>
</artifactItems>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
<build>
<plugins>
<plugin>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-maven-plugin</artifactId>
<configuration>
<skip>${skipTests}</skip>
<!-- only skip integration tests -->
<httpConnector>
<port>31000</port>
<idleTimeout>60000</idleTimeout>
</httpConnector>
<war>../../webapp/target/atlas-webapp-${project.version}.war</war>
<daemon>true</daemon>
<webAppSourceDirectory>../../webapp/src/test/webapp</webAppSourceDirectory>
<webApp>
<contextPath>/</contextPath>
<descriptor>${project.basedir}/../../webapp/src/test/webapp/WEB-INF/web.xml</descriptor>
</webApp>
<useTestScope>true</useTestScope>
<systemProperties>
<force>true</force>
<systemProperty>
<name>atlas.home</name>
<value>${project.build.directory}</value>
</systemProperty>
<systemProperty>
<key>atlas.conf</key>
<value>${project.build.directory}/test-classes</value>
</systemProperty>
<systemProperty>
<name>atlas.data</name>
<value>${project.build.directory}/data</value>
</systemProperty>
<systemProperty>
<name>atlas.log.dir</name>
<value>${project.build.directory}/logs</value>
</systemProperty>
<systemProperty>
<name>atlas.log.file</name>
<value>application.log</value>
</systemProperty>
<systemProperty>
<name>log4j.configuration</name>
<value>file:///${project.build.directory}/../../../distro/src/conf/atlas-log4j.xml</value>
</systemProperty>
<systemProperty>
<name>atlas.graphdb.backend</name>
<value>${graphdb.backend.impl}</value>
</systemProperty>
<systemProperty>
<key>embedded.solr.directory</key>
<value>${project.build.directory}</value>
</systemProperty>
</systemProperties>
<stopKey>atlas-stop</stopKey>
<stopPort>31001</stopPort>
<stopWait>${jetty-maven-plugin.stopWait}</stopWait>
<daemon>${debug.jetty.daemon}</daemon>
<testClassesDirectory>${project.build.testOutputDirectory}</testClassesDirectory>
<useTestClasspath>true</useTestClasspath>
</configuration>
<dependencies>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
</dependency>
</dependencies>
<executions>
<execution>
<id>start-jetty</id>
<phase>pre-integration-test</phase>
<goals>
<goal>deploy-war</goal>
</goals>
</execution>
<execution>
<id>stop-jetty</id>
<phase>post-integration-test</phase>
<goals>
<goal>stop</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-site-plugin</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.maven.doxia</groupId>
<artifactId>doxia-module-twiki</artifactId>
<version>${doxia.version}</version>
</dependency>
<dependency>
<groupId>org.apache.maven.doxia</groupId>
<artifactId>doxia-core</artifactId>
<version>${doxia.version}</version>
</dependency>
</dependencies>
<executions>
<execution>
<goals>
<goal>site</goal>
</goals>
<phase>prepare-package</phase>
</execution>
</executions>
<configuration>
<generateProjectInfo>false</generateProjectInfo>
<generateReports>false</generateReports>
</configuration>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.2.1</version>
<inherited>false</inherited>
<executions>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<executions>
<execution>
<id>copy-resources</id>
<phase>validate</phase>
<goals>
<goal>copy-resources</goal>
</goals>
<configuration>
<outputDirectory>${basedir}/target/models</outputDirectory>
<resources>
<resource>
<directory>${basedir}/../models</directory>
<includes>
<include>0000-Area0/0010-base_model.json</include>
<include>1000-Hadoop/**</include>
</includes>
</resource>
</resources>
</configuration>
</execution>
<execution>
<id>copy-solr-resources</id>
<phase>validate</phase>
<goals>
<goal>copy-resources</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/solr</outputDirectory>
<resources>
<resource>
<directory>${basedir}/../../test-tools/src/main/resources/solr</directory>
</resource>
</resources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.impala;
import org.apache.atlas.impala.hook.ImpalaLineageHook;
import java.io.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOCase;
import org.apache.commons.io.comparator.LastModifiedFileComparator;
import org.apache.commons.io.filefilter.PrefixFileFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Entry point of the Impala lineage tool. It reads the lineage records from the Impala
* lineage log, then calls an instance of ImpalaLineageHook to convert the records into
* lineage notifications and send them to Atlas.
*/
public class ImpalaLineageTool {
private static final Logger LOG = LoggerFactory.getLogger(ImpalaLineageTool.class);
private static final String WAL_FILE_EXTENSION = ".wal";
private static final String WAL_FILE_PREFIX = "WAL";
private String directoryName;
private String prefix;
public ImpalaLineageTool(String[] args) {
try {
Options options = new Options();
options.addOption("d", "directory", true, "the lineage files' folder");
options.addOption("p", "prefix", true, "the prefix of the lineage files");
CommandLine cmd = new DefaultParser().parse(options, args);
directoryName = cmd.getOptionValue("d");
prefix = cmd.getOptionValue("p");
} catch(ParseException e) {
LOG.warn("Failed to parse command arguments. Error: ", e.getMessage());
printUsage();
throw new RuntimeException(e);
}
}
public void run() {
ImpalaLineageHook impalaLineageHook = new ImpalaLineageHook();
File[] currentFiles = getCurrentFiles();
int fileNum = currentFiles.length;
for(int i = 0; i < fileNum; i++) {
String filename = currentFiles[i].getAbsolutePath();
String walFilename = directoryName + WAL_FILE_PREFIX + currentFiles[i].getName() + WAL_FILE_EXTENSION;
LOG.info("Importing: {}", filename);
importHImpalaEntities(impalaLineageHook, filename, walFilename);
if(i != fileNum - 1) {
deleteLineageAndWal(currentFiles[i], walFilename);
}
}
LOG.info("Impala bridge processing: Done! ");
}
public static void main(String[] args) {
if (args == null || args.length != 4) {
// The lineage file directory and prefix must be passed as parameters
System.out.println("Impala bridge: wrong number of arguments. Please try again");
printUsage();
return;
}
ImpalaLineageTool instance = new ImpalaLineageTool(args);
instance.run();
}
/**
* Delete the processed lineage file and its WAL file
* @param currentFile the lineage file to delete
* @param wal the path of the WAL file to delete
*/
public static void deleteLineageAndWal(File currentFile, String wal) {
if(currentFile.exists() && currentFile.delete()) {
LOG.info("Lineage file {} is deleted successfully", currentFile.getPath());
} else {
LOG.info("Failed to delete the lineage file {}", currentFile.getPath());
}
File file = new File(wal);
if(file.exists() && file.delete()) {
LOG.info("Wal file {} deleted successfully", wal);
} else {
LOG.info("Failed to delete the wal file {}", wal);
}
}
private static void printUsage() {
System.out.println();
System.out.println();
System.out.println("Usage: import-impala.sh [-d <directory>] [-p <prefix>]" );
System.out.println(" Imports specified lineage files by given directory and file prefix.");
System.out.println();
}
/**
* Finds the lineage files to process in the given directory with the given prefix,
* sorted by last-modified time (oldest first).
* @return the matching lineage files, or an empty array if none are found
*/
public File[] getCurrentFiles() {
try {
LOG.info("Scanning: " + directoryName);
File folder = new File(directoryName);
File[] listOfFiles = folder.listFiles((FileFilter) new PrefixFileFilter(prefix, IOCase.SENSITIVE));
if ((listOfFiles == null) || (listOfFiles.length == 0)) {
LOG.info("Found no lineage files.");
return new File[0];
}
if(listOfFiles.length > 1) {
Arrays.sort(listOfFiles, LastModifiedFileComparator.LASTMODIFIED_COMPARATOR);
}
LOG.info("Found {} lineage files" + listOfFiles.length);
return listOfFiles;
} catch(Exception e) {
LOG.error("Import lineage file failed.", e);
}
return new File[0];
}
private boolean processImpalaLineageHook(ImpalaLineageHook impalaLineageHook, List<String> lineageList) {
boolean allSucceed = true;
// returns true only if all lineage records are successfully sent to Atlas
for (String lineageRecord : lineageList) {
try {
impalaLineageHook.process(lineageRecord);
} catch (Exception ex) {
String errorMessage = String.format("Exception at query %s%n", lineageRecord);
LOG.error(errorMessage, ex);
allSucceed = false;
}
}
return allSucceed;
}
/**
* Import the lineage records in the given lineage file, tracking progress in the WAL file
* @param impalaLineageHook the hook used to convert and send the lineage records
* @param name the lineage file to import
* @param walfile the WAL file recording how many bytes of the lineage file have been processed
*/
public void importHImpalaEntities(ImpalaLineageHook impalaLineageHook, String name, String walfile) {
List<String> lineageList = new ArrayList<>();
try {
File lineageFile = new File(name); // the current file length minus the recorded offset gives the unread bytes
File walFile = new File(walfile);
// if the WAL file does not exist, create one recording that 0 bytes have been read
if(!walFile.exists()) {
BufferedWriter writer = new BufferedWriter(new FileWriter(walfile));
writer.write("0, " + name);
writer.close();
}
LOG.debug("Reading: " + name);
String lineageRecord = FileUtils.readFileToString(lineageFile, "UTF-8");
lineageList.add(lineageRecord);
// call the ImpalaLineageHook instance to process the list of Impala lineage records
if(processImpalaLineageHook(impalaLineageHook, lineageList)) {
// record the current length of the lineage file in the WAL file
FileWriter newWalFile = new FileWriter(walfile, true);
BufferedWriter newWalFileBuf = new BufferedWriter(newWalFile);
newWalFileBuf.newLine();
newWalFileBuf.write(String.valueOf(lineageFile.length()) + "," + name);
newWalFileBuf.close();
newWalFile.close();
} else {
LOG.error("Error sending some of impala lineage records to ImpalaHook");
}
} catch (Exception e) {
LOG.error("Error in processing lineage records. Exception: " + e.getMessage());
}
}
}
\ No newline at end of file
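For reference, a minimal sketch of driving the tool programmatically. The directory and prefix below are hypothetical stand-ins for the impalad lineage log location and file prefix (the same values the -d and -p options carry on the command line):

package org.apache.atlas.impala.examples;

/**
 * A sketch (not part of the patch) of invoking the lineage tool. The paths are
 * hypothetical; point them at the impalad lineage log directory and prefix in use.
 */
public class ImpalaLineageToolExample {
    public static void main(String[] args) {
        // equivalent to: import-impala.sh -d /var/log/impalad/lineage/ -p impala_lineage_log_1.0
        org.apache.atlas.impala.ImpalaLineageTool.main(new String[] {
                "-d", "/var/log/impalad/lineage/",
                "-p", "impala_lineage_log_1.0"
        });
    }
}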
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.impala.hook;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.atlas.impala.model.ImpalaOperationType;
import org.apache.atlas.impala.model.ImpalaQuery;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.commons.lang.StringUtils;
/**
* Contains the info related to a lineage record from Impala
*/
public class AtlasImpalaHookContext {
public static final char QNAME_SEP_CLUSTER_NAME = '@';
public static final char QNAME_SEP_ENTITY_NAME = '.';
public static final char QNAME_SEP_PROCESS = ':';
private final ImpalaLineageHook hook;
private final ImpalaOperationType impalaOperation;
private final ImpalaQuery lineageQuery;
private final Map<String, AtlasEntity> qNameEntityMap = new HashMap<>();
public AtlasImpalaHookContext(ImpalaLineageHook hook, ImpalaOperationType operationType,
ImpalaQuery lineageQuery) throws Exception {
this.hook = hook;
this.impalaOperation = operationType;
this.lineageQuery = lineageQuery;
}
public ImpalaQuery getLineageQuery() {
return lineageQuery;
}
public String getQueryStr() { return lineageQuery.getQueryText(); }
public ImpalaOperationType getImpalaOperationType() {
return impalaOperation;
}
public void putEntity(String qualifiedName, AtlasEntity entity) {
qNameEntityMap.put(qualifiedName, entity);
}
public AtlasEntity getEntity(String qualifiedName) {
return qNameEntityMap.get(qualifiedName);
}
public Collection<AtlasEntity> getEntities() { return qNameEntityMap.values(); }
public String getClusterName() {
return hook.getClusterName();
}
public boolean isConvertHdfsPathToLowerCase() {
return hook.isConvertHdfsPathToLowerCase();
}
public String getQualifiedNameForDb(String dbName) {
return (dbName + QNAME_SEP_CLUSTER_NAME).toLowerCase() + getClusterName();
}
public String getQualifiedNameForTable(String fullTableName) throws IllegalArgumentException {
if (fullTableName == null) {
throw new IllegalArgumentException("fullTableName is null");
}
int sepPos = fullTableName.lastIndexOf(QNAME_SEP_ENTITY_NAME);
if (!isSeparatorIndexValid(sepPos)) {
throw new IllegalArgumentException(fullTableName + " does not contain database name");
}
return getQualifiedNameForTable(fullTableName.substring(0, sepPos), fullTableName.substring(sepPos+1));
}
public String getQualifiedNameForTable(String dbName, String tableName) {
return (dbName + QNAME_SEP_ENTITY_NAME + tableName + QNAME_SEP_CLUSTER_NAME).toLowerCase() +
getClusterName();
}
public String getQualifiedNameForColumn(String fullColumnName) throws IllegalArgumentException {
if (fullColumnName == null) {
throw new IllegalArgumentException("fullColumnName is null");
}
int sepPosFirst = fullColumnName.indexOf(QNAME_SEP_ENTITY_NAME);
int sepPosLast = fullColumnName.lastIndexOf(QNAME_SEP_ENTITY_NAME);
if (!isSeparatorIndexValid(sepPosFirst) || !isSeparatorIndexValid(sepPosLast) ||
sepPosFirst == sepPosLast) {
throw new IllegalArgumentException(
String.format("fullColumnName {} does not contain database name or table name",
fullColumnName));
}
return getQualifiedNameForColumn(
fullColumnName.substring(0, sepPosFirst),
fullColumnName.substring(sepPosFirst+1, sepPosLast),
fullColumnName.substring(sepPosLast+1));
}
public String getColumnNameOnly(String fullColumnName) throws IllegalArgumentException {
if (fullColumnName == null) {
throw new IllegalArgumentException("fullColumnName is null");
}
int sepPosLast = fullColumnName.lastIndexOf(QNAME_SEP_ENTITY_NAME);
if (!isSeparatorIndexValid(sepPosLast)) {
return fullColumnName;
}
return fullColumnName.substring(sepPosLast+1);
}
public String getQualifiedNameForColumn(String dbName, String tableName, String columnName) {
return
(dbName + QNAME_SEP_ENTITY_NAME + tableName + QNAME_SEP_ENTITY_NAME +
columnName + QNAME_SEP_CLUSTER_NAME).toLowerCase() + getClusterName();
}
public String getUserName() { return lineageQuery.getUser(); }
public String getDatabaseNameFromTable(String fullTableName) {
int sepPos = fullTableName.lastIndexOf(QNAME_SEP_ENTITY_NAME);
if (isSeparatorIndexValid(sepPos)) {
return fullTableName.substring(0, sepPos);
}
return null;
}
public String getTableNameFromColumn(String columnName) {
int sepPos = columnName.lastIndexOf(QNAME_SEP_ENTITY_NAME);
if (!isSeparatorIndexValid(sepPos)) {
return null;
}
String tableName = columnName.substring(0, sepPos);
if (!ImpalaIdentifierParser.isTableNameValid(tableName)) {
return null;
}
return tableName;
}
public boolean isSeparatorIndexValid(int index) {
return index > 0;
}
}
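For illustration, the qualified-name formats the context above produces, assuming a hypothetical cluster name "cl1" (read from atlas.cluster.name via the hook):

// getQualifiedNameForDb("Db1")               -> "db1@cl1"
// getQualifiedNameForTable("Db1.Tbl1")       -> "db1.tbl1@cl1"
// getQualifiedNameForColumn("Db1.Tbl1.Col1") -> "db1.tbl1.col1@cl1"
// getColumnNameOnly("Db1.Tbl1.Col1")         -> "Col1"
// getDatabaseNameFromTable("Db1.Tbl1")       -> "Db1"
// Names are lower-cased before the cluster name is appended after the '@' separator.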
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.impala.hook;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang.StringUtils;
/**
* Check if a string is a valid Impala table identifier.
* It could be <dbName>.<tableName> or <tableName>
*/
public class ImpalaIdentifierParser {
// http://www.cloudera.com/content/www/en-us/documentation/enterprise/latest/topics/impala_identifiers.html
// https://github.com/apache/impala/blob/64e6719870db5602a6fa85014bc6c264080b9414/tests/common/patterns.py
// VALID_IMPALA_IDENTIFIER_REGEX = re.compile(r'^[a-zA-Z][a-zA-Z0-9_]{,127}$')
// add "." to allow <dbName>.<tableName>
public static final String VALID_IMPALA_IDENTIFIER_REGEX = "^[a-zA-Z][a-zA-Z0-9_.]{0,127}$";
public static boolean isTableNameValid(String inTableName) {
if (StringUtils.isEmpty(inTableName)) {
return false;
}
if (!inTableName.matches(VALID_IMPALA_IDENTIFIER_REGEX)) {
return false;
}
String[] tokens = inTableName.split("\\."); // split takes a regex, so the dot must be escaped
if (tokens.length > 2) {
// valid value should be <dbName>.<tableName> or <tableName>
return false;
}
for (String token : tokens) {
if (isReserved(token)) {
return false;
}
}
return true;
}
// The following is extracted from Impala code, mainly from
// https://github.com/apache/impala/blob/master/fe/src/main/jflex/sql-scanner.flex
// In Impala this is a map from keyword string to token id, kept in a linked hash map
// because insertion order matters for error reporting (for example, "and" comes after
// "&&" so that "and" is used as the display name rather than "&&"). Here only
// membership is needed, so a HashSet of the keyword strings is sufficient.
// Please keep the adds sorted alphabetically by keyword.
static HashSet<String> keywordMap;
// set of token descriptions (in Impala, a map from token id to description)
static HashSet<String> tokenIdMap;
// Reserved words are words that cannot be used as identifiers. It is a superset of
// keywords.
static Set<String> reservedWords;
public static void init() {
// initialize keywords
keywordMap = new HashSet<>();
keywordMap.add("&&");
keywordMap.add("add");
keywordMap.add("aggregate");
keywordMap.add("all");
keywordMap.add("alter");
keywordMap.add("analytic");
keywordMap.add("and");
keywordMap.add("anti");
keywordMap.add("api_version");
keywordMap.add("array");
keywordMap.add("as");
keywordMap.add("asc");
keywordMap.add("authorization");
keywordMap.add("avro");
keywordMap.add("between");
keywordMap.add("bigint");
keywordMap.add("binary");
keywordMap.add("block_size");
keywordMap.add("boolean");
keywordMap.add("by");
keywordMap.add("cached");
keywordMap.add("case");
keywordMap.add("cascade");
keywordMap.add("cast");
keywordMap.add("change");
keywordMap.add("char");
keywordMap.add("class");
keywordMap.add("close_fn");
keywordMap.add("column");
keywordMap.add("columns");
keywordMap.add("comment");
keywordMap.add("compression");
keywordMap.add("compute");
keywordMap.add("copy");
keywordMap.add("create");
keywordMap.add("cross");
keywordMap.add("current");
keywordMap.add("data");
keywordMap.add("database");
keywordMap.add("databases");
keywordMap.add("date");
keywordMap.add("datetime");
keywordMap.add("decimal");
//keywordMap.add("default"); "default" can be database or table name
keywordMap.add("delete");
keywordMap.add("delimited");
keywordMap.add("desc");
keywordMap.add("describe");
keywordMap.add("distinct");
keywordMap.add("div");
keywordMap.add("double");
keywordMap.add("drop");
keywordMap.add("else");
keywordMap.add("encoding");
keywordMap.add("end");
keywordMap.add("escaped");
keywordMap.add("exists");
keywordMap.add("explain");
keywordMap.add("extended");
keywordMap.add("external");
keywordMap.add("false");
keywordMap.add("fields");
keywordMap.add("fileformat");
keywordMap.add("files");
keywordMap.add("finalize_fn");
keywordMap.add("first");
keywordMap.add("float");
keywordMap.add("following");
keywordMap.add("for");
keywordMap.add("format");
keywordMap.add("formatted");
keywordMap.add("from");
keywordMap.add("full");
keywordMap.add("function");
keywordMap.add("functions");
keywordMap.add("grant");
keywordMap.add("group");
keywordMap.add("hash");
keywordMap.add("having");
keywordMap.add("if");
keywordMap.add("ilike");
keywordMap.add("ignore");
keywordMap.add("in");
keywordMap.add("incremental");
keywordMap.add("init_fn");
keywordMap.add("inner");
keywordMap.add("inpath");
keywordMap.add("insert");
keywordMap.add("int");
keywordMap.add("integer");
keywordMap.add("intermediate");
keywordMap.add("interval");
keywordMap.add("into");
keywordMap.add("invalidate");
keywordMap.add("iregexp");
keywordMap.add("is");
keywordMap.add("join");
keywordMap.add("kudu");
keywordMap.add("last");
keywordMap.add("left");
keywordMap.add("like");
keywordMap.add("limit");
keywordMap.add("lines");
keywordMap.add("load");
keywordMap.add("location");
keywordMap.add("map");
keywordMap.add("merge_fn");
keywordMap.add("metadata");
keywordMap.add("not");
keywordMap.add("null");
keywordMap.add("nulls");
keywordMap.add("offset");
keywordMap.add("on");
keywordMap.add("||");
keywordMap.add("or");
keywordMap.add("orc");
keywordMap.add("order");
keywordMap.add("outer");
keywordMap.add("over");
keywordMap.add("overwrite");
keywordMap.add("parquet");
keywordMap.add("parquetfile");
keywordMap.add("partition");
keywordMap.add("partitioned");
keywordMap.add("partitions");
keywordMap.add("preceding");
keywordMap.add("prepare_fn");
keywordMap.add("primary");
keywordMap.add("produced");
keywordMap.add("purge");
keywordMap.add("range");
keywordMap.add("rcfile");
keywordMap.add("real");
keywordMap.add("recover");
keywordMap.add("refresh");
keywordMap.add("regexp");
keywordMap.add("rename");
keywordMap.add("repeatable");
keywordMap.add("replace");
keywordMap.add("replication");
keywordMap.add("restrict");
keywordMap.add("returns");
keywordMap.add("revoke");
keywordMap.add("right");
keywordMap.add("rlike");
keywordMap.add("role");
keywordMap.add("roles");
keywordMap.add("row");
keywordMap.add("rows");
keywordMap.add("schema");
keywordMap.add("schemas");
keywordMap.add("select");
keywordMap.add("semi");
keywordMap.add("sequencefile");
keywordMap.add("serdeproperties");
keywordMap.add("serialize_fn");
keywordMap.add("set");
keywordMap.add("show");
keywordMap.add("smallint");
keywordMap.add("sort");
keywordMap.add("stats");
keywordMap.add("stored");
keywordMap.add("straight_join");
keywordMap.add("string");
keywordMap.add("struct");
keywordMap.add("symbol");
keywordMap.add("table");
keywordMap.add("tables");
keywordMap.add("tablesample");
keywordMap.add("tblproperties");
keywordMap.add("terminated");
keywordMap.add("textfile");
keywordMap.add("then");
keywordMap.add("timestamp");
keywordMap.add("tinyint");
keywordMap.add("to");
keywordMap.add("true");
keywordMap.add("truncate");
keywordMap.add("unbounded");
keywordMap.add("uncached");
keywordMap.add("union");
keywordMap.add("unknown");
keywordMap.add("update");
keywordMap.add("update_fn");
keywordMap.add("upsert");
keywordMap.add("use");
keywordMap.add("using");
keywordMap.add("values");
keywordMap.add("varchar");
keywordMap.add("view");
keywordMap.add("when");
keywordMap.add("where");
keywordMap.add("with");
// Initialize tokenIdMap for error reporting
tokenIdMap = new HashSet<>(keywordMap);
// add non-keyword tokens. Please keep this in the same order as they are used in this
// file.
tokenIdMap.add("EOF");
tokenIdMap.add("...");
tokenIdMap.add(":");
tokenIdMap.add(";");
tokenIdMap.add("COMMA");
tokenIdMap.add(".");
tokenIdMap.add("*");
tokenIdMap.add("(");
tokenIdMap.add(")");
tokenIdMap.add("[");
tokenIdMap.add("]");
tokenIdMap.add("/");
tokenIdMap.add("%");
tokenIdMap.add("+");
tokenIdMap.add("-");
tokenIdMap.add("&");
tokenIdMap.add("|");
tokenIdMap.add("^");
tokenIdMap.add("~");
tokenIdMap.add("=");
tokenIdMap.add("!");
tokenIdMap.add("<");
tokenIdMap.add(">");
tokenIdMap.add("UNMATCHED STRING LITERAL");
tokenIdMap.add("!=");
tokenIdMap.add("INTEGER LITERAL");
tokenIdMap.add("NUMERIC OVERFLOW");
tokenIdMap.add("DECIMAL LITERAL");
tokenIdMap.add("EMPTY IDENTIFIER");
tokenIdMap.add("IDENTIFIER");
tokenIdMap.add("STRING LITERAL");
tokenIdMap.add("COMMENTED_PLAN_HINT_START");
tokenIdMap.add("COMMENTED_PLAN_HINT_END");
tokenIdMap.add("Unexpected character");
// For impala 3.0, reserved words = keywords + sql16ReservedWords - builtinFunctions
// - whitelist
// unused reserved words = reserved words - keywords. These words are reserved for
// forward compatibility purposes.
reservedWords = new HashSet<>(keywordMap);
// Add SQL:2016 reserved words
reservedWords.addAll(Arrays.asList(new String[] {
"abs", "acos", "allocate", "any", "are", "array_agg", "array_max_cardinality",
"asensitive", "asin", "asymmetric", "at", "atan", "atomic", "avg", "begin",
"begin_frame", "begin_partition", "blob", "both", "call", "called", "cardinality",
"cascaded", "ceil", "ceiling", "char_length", "character", "character_length",
"check", "classifier", "clob", "close", "coalesce", "collate", "collect",
"commit", "condition", "connect", "constraint", "contains", "convert", "copy",
"corr", "corresponding", "cos", "cosh", "count", "covar_pop", "covar_samp",
"cube", "cume_dist", "current_catalog", "current_date",
"current_default_transform_group", "current_path", "current_path", "current_role",
"current_role", "current_row", "current_schema", "current_time",
"current_timestamp", "current_transform_group_for_type", "current_user", "cursor",
"cycle", "day", "deallocate", "dec", "decfloat", "declare", "define",
"dense_rank", "deref", "deterministic", "disconnect", "dynamic", "each",
"element", "empty", "end-exec", "end_frame", "end_partition", "equals", "escape",
"every", "except", "exec", "execute", "exp", "extract", "fetch", "filter",
"first_value", "floor", "foreign", "frame_row", "free", "fusion", "get", "global",
"grouping", "groups", "hold", "hour", "identity", "indicator", "initial", "inout",
"insensitive", "integer", "intersect", "intersection", "json_array",
"json_arrayagg", "json_exists", "json_object", "json_objectagg", "json_query",
"json_table", "json_table_primitive", "json_value", "lag", "language", "large",
"last_value", "lateral", "lead", "leading", "like_regex", "listagg", "ln",
"local", "localtime", "localtimestamp", "log", "log10 ", "lower", "match",
"match_number", "match_recognize", "matches", "max", "member", "merge", "method",
"min", "minute", "mod", "modifies", "module", "month", "multiset", "national",
"natural", "nchar", "nclob", "new", "no", "none", "normalize", "nth_value",
"ntile", "nullif", "numeric", "occurrences_regex", "octet_length", "of", "old",
"omit", "one", "only", "open", "out", "overlaps", "overlay", "parameter",
"pattern", "per", "percent", "percent_rank", "percentile_cont", "percentile_disc",
"period", "portion", "position", "position_regex", "power", "precedes",
"precision", "prepare", "procedure", "ptf", "rank", "reads", "real", "recursive",
"ref", "references", "referencing", "regr_avgx", "regr_avgy", "regr_count",
"regr_intercept", "regr_r2", "regr_slope", "regr_sxx", "regr_sxy", "regr_syy",
"release", "result", "return", "rollback", "rollup", "row_number", "running",
"savepoint", "scope", "scroll", "search", "second", "seek", "sensitive",
"session_user", "similar", "sin", "sinh", "skip", "some", "specific",
"specifictype", "sql", "sqlexception", "sqlstate", "sqlwarning", "sqrt", "start",
"static", "stddev_pop", "stddev_samp", "submultiset", "subset", "substring",
"substring_regex", "succeeds", "sum", "symmetric", "system", "system_time",
"system_user", "tan", "tanh", "time", "timezone_hour", "timezone_minute",
"trailing", "translate", "translate_regex", "translation", "treat", "trigger",
"trim", "trim_array", "uescape", "unique", "unknown", "unnest", "update ",
"upper", "user", "value", "value_of", "var_pop", "var_samp", "varbinary",
"varying", "versioning", "whenever", "width_bucket", "window", "within",
"without", "year"}));
// TODO: Remove impala builtin function names. Need to find content of
// BuiltinsDb.getInstance().getAllFunctions()
//reservedWords.removeAll(BuiltinsDb.getInstance().getAllFunctions().keySet());
// Remove whitelist words. These words might be heavily used in production, and
// impala is unlikely to implement SQL features around these words in the near future.
reservedWords.removeAll(Arrays.asList(new String[] {
// time units
"year", "month", "day", "hour", "minute", "second",
"begin", "call", "check", "classifier", "close", "identity", "language",
"localtime", "member", "module", "new", "nullif", "old", "open", "parameter",
"period", "result", "return", "sql", "start", "system", "time", "user", "value"
}));
}
static {
init();
}
static boolean isReserved(String token) {
return token != null && reservedWords.contains(token.toLowerCase());
}
}
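A few illustrative inputs for isTableNameValid, following directly from the regex, the dot-split, and the reserved-word check above:

// isTableNameValid("db1.table1") -> true   (matches <dbName>.<tableName>)
// isTableNameValid("table1")     -> true   (a bare table name is allowed)
// isTableNameValid("db1.t1.c1")  -> false  (more than two dot-separated tokens)
// isTableNameValid("select")     -> false  (reserved word)
// isTableNameValid("1table")     -> false  (identifiers must start with a letter)
// isTableNameValid(null)         -> false  (null or empty input is rejected)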
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.impala.hook;
import static org.apache.atlas.AtlasConstants.DEFAULT_CLUSTER_NAME;
import com.google.common.collect.Sets;
import java.io.IOException;
import org.apache.atlas.hook.AtlasHook;
import org.apache.atlas.impala.hook.events.BaseImpalaEvent;
import org.apache.atlas.impala.hook.events.CreateImpalaProcess;
import org.apache.atlas.impala.model.IImpalaLineageHook;
import org.apache.atlas.impala.model.ImpalaOperationType;
import org.apache.atlas.impala.model.ImpalaQuery;
import org.apache.atlas.type.AtlasType;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.security.auth.Subject;
import javax.security.auth.kerberos.KerberosPrincipal;
import java.util.HashSet;
public class ImpalaLineageHook extends AtlasHook implements IImpalaLineageHook {
private static final Logger LOG = LoggerFactory.getLogger(ImpalaLineageHook.class);
public static final String ATLAS_ENDPOINT = "atlas.rest.address";
public static final String REALM_SEPARATOR = "@";
public static final String CONF_PREFIX = "atlas.hook.impala.";
public static final String CONF_CLUSTER_NAME = "atlas.cluster.name";
public static final String CONF_REALM_NAME = "atlas.realm.name";
public static final String HDFS_PATH_CONVERT_TO_LOWER_CASE = CONF_PREFIX + "hdfs_path.convert_to_lowercase";
private ImpalaOperationParser parser = new ImpalaOperationParser();
private static final String clusterName;
private static final String realm;
private static final boolean convertHdfsPathToLowerCase;
static {
clusterName = atlasProperties.getString(CONF_CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
realm = atlasProperties.getString(CONF_REALM_NAME, DEFAULT_CLUSTER_NAME); // what should default be ??
convertHdfsPathToLowerCase = atlasProperties.getBoolean(HDFS_PATH_CONVERT_TO_LOWER_CASE, false);
}
public ImpalaLineageHook() {
}
public void process(String impalaQueryString) throws Exception {
ImpalaQuery lineageQuery = AtlasType.fromJson(impalaQueryString, ImpalaQuery.class);
process(lineageQuery);
}
public void process(ImpalaQuery lineageQuery) throws Exception {
if (StringUtils.isEmpty(lineageQuery.getQueryText())) {
LOG.warn("==> ImpalaLineageHook.process skips because the query text is empty <==");
return;
}
if (LOG.isDebugEnabled()) {
LOG.debug("==> ImpalaLineageHook.process({})", lineageQuery.getQueryText());
}
try {
ImpalaOperationType operationType = parser.getImpalaOperationType(lineageQuery.getQueryText());
AtlasImpalaHookContext context =
new AtlasImpalaHookContext(this, operationType, lineageQuery);
BaseImpalaEvent event = null;
switch (operationType) {
case CREATEVIEW:
event = new CreateImpalaProcess(context);
break;
default:
if (LOG.isDebugEnabled()) {
LOG.debug("HiveHook.run({}): operation ignored", lineageQuery.getQueryText());
}
break;
}
if (event != null) {
LOG.debug("Processing event: " + lineageQuery.getQueryText());
final UserGroupInformation ugi = getUgiFromUserName(lineageQuery.getUser());
super.notifyEntities(event.getNotificationMessages(), ugi);
}
} catch (Throwable t) {
LOG.error("ImpalaLineageHook.process(): failed to process query {}",
lineageQuery.getQueryText(), t);
}
if (LOG.isDebugEnabled()) {
LOG.debug("<== ImpalaLineageHook.process({})", lineageQuery.getQueryText());
}
}
private UserGroupInformation getUgiFromUserName(String userName) throws IOException {
String userPrincipal = userName.contains(REALM_SEPARATOR)? userName : userName + "@" + getRealm();
Subject userSubject = new Subject(false, Sets.newHashSet(
new KerberosPrincipal(userPrincipal)), new HashSet<Object>(),new HashSet<Object>());
return UserGroupInformation.getUGIFromSubject(userSubject);
}
public String getClusterName() {
return clusterName;
}
public String getRealm() {
return realm;
}
public boolean isConvertHdfsPathToLowerCase() {
return convertHdfsPathToLowerCase;
}
}
\ No newline at end of file
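A sketch of feeding one lineage record to the hook. The JSON below is hypothetical: only the fields this hook actually reads (queryText, queryId, user, timestamp) are shown, and the exact record layout is whatever impalad writes to its lineage log:

package org.apache.atlas.impala.examples;

import org.apache.atlas.impala.hook.ImpalaLineageHook;

/** A sketch (not part of the patch) of sending one lineage record through the hook. */
public class ImpalaLineageHookExample {
    public static void main(String[] args) throws Exception {
        ImpalaLineageHook hook = new ImpalaLineageHook();
        // Hypothetical record: only the fields read by this hook are shown here.
        String lineageRecord = "{"
                + "\"queryText\": \"create view db1.v1 as select count, id from db1.t1\","
                + "\"queryId\": \"3a441d0c130962f8:7f634aec00000000\","
                + "\"user\": \"impala_user\","
                + "\"timestamp\": 1554750072"
                + "}";
        // Parses the JSON into an ImpalaQuery and, for CREATE VIEW,
        // sends notification messages to Atlas as the given user.
        hook.process(lineageRecord);
    }
}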
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.impala.hook;
import org.apache.atlas.impala.model.ImpalaOperationType;
/**
* Parses Impala query text and determines the Impala operation type
*/
public class ImpalaOperationParser {
public ImpalaOperationParser() {
}
public ImpalaOperationType getImpalaOperationType(String queryText) {
// TODO: more Impala commands will be handled in ATLAS-3184
if (queryText.toLowerCase().startsWith("create view")) {
return ImpalaOperationType.CREATEVIEW;
}
return ImpalaOperationType.UNKNOWN;
}
}
\ No newline at end of file
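In brief, the parser currently recognizes only CREATE VIEW (more commands come with ATLAS-3184):

// getImpalaOperationType("CREATE VIEW v1 AS SELECT * FROM t1")  -> CREATEVIEW
// getImpalaOperationType("create view v2 as select id from t1") -> CREATEVIEW
// getImpalaOperationType("INSERT INTO t1 SELECT * FROM t2")     -> UNKNOWN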
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.impala.hook.events;
import static org.apache.atlas.impala.hook.AtlasImpalaHookContext.QNAME_SEP_PROCESS;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.atlas.impala.hook.AtlasImpalaHookContext;
import org.apache.atlas.impala.model.ImpalaDataType;
import org.apache.atlas.impala.model.ImpalaNode;
import org.apache.atlas.impala.model.ImpalaOperationType;
import org.apache.atlas.impala.model.ImpalaVertexType;
import org.apache.atlas.impala.model.LineageVertex;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityExtInfo;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.notification.HookNotification;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The base class for generating notification events to the Atlas server.
* Most code is copied from BaseHiveEvent to avoid depending on org.apache.atlas.hive.hook
*/
public abstract class BaseImpalaEvent {
private static final Logger LOG = LoggerFactory.getLogger(BaseImpalaEvent.class);
// Impala should re-use the same entity types as Hive, so that Hive and Impala can
// operate on the same database or table
public static final String HIVE_TYPE_DB = "hive_db";
public static final String HIVE_TYPE_TABLE = "hive_table";
public static final String HIVE_TYPE_COLUMN = "hive_column";
public static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
public static final String ATTRIBUTE_NAME = "name";
public static final String ATTRIBUTE_OWNER = "owner";
public static final String ATTRIBUTE_CLUSTER_NAME = "clusterName";
public static final String ATTRIBUTE_CREATE_TIME = "createTime";
public static final String ATTRIBUTE_LAST_ACCESS_TIME = "lastAccessTime";
public static final String ATTRIBUTE_DB = "db";
public static final String ATTRIBUTE_COLUMNS = "columns";
public static final String ATTRIBUTE_TABLE = "table";
public static final String ATTRIBUTE_INPUTS = "inputs";
public static final String ATTRIBUTE_OUTPUTS = "outputs";
public static final String ATTRIBUTE_OPERATION_TYPE = "operationType";
public static final String ATTRIBUTE_START_TIME = "startTime";
public static final String ATTRIBUTE_USER_NAME = "userName";
public static final String ATTRIBUTE_QUERY_TEXT = "queryText";
public static final String ATTRIBUTE_QUERY_ID = "queryId";
public static final String ATTRIBUTE_QUERY_PLAN = "queryPlan";
public static final String ATTRIBUTE_END_TIME = "endTime";
public static final String ATTRIBUTE_RECENT_QUERIES = "recentQueries";
public static final String ATTRIBUTE_QUERY = "query";
public static final String ATTRIBUTE_DEPENDENCY_TYPE = "dependencyType";
public static final long MILLIS_CONVERT_FACTOR = 1000;
public static final Map<Integer, String> OWNER_TYPE_TO_ENUM_VALUE = new HashMap<>();
static {
OWNER_TYPE_TO_ENUM_VALUE.put(1, "USER");
OWNER_TYPE_TO_ENUM_VALUE.put(2, "ROLE");
OWNER_TYPE_TO_ENUM_VALUE.put(3, "GROUP");
}
protected final AtlasImpalaHookContext context;
protected final Map<String, ImpalaNode> vertexNameMap;
protected final Map<Long, LineageVertex> verticesMap;
public BaseImpalaEvent(AtlasImpalaHookContext context) {
this.context = context;
vertexNameMap = new HashMap<>();
verticesMap = new HashMap<>();
}
public AtlasImpalaHookContext getContext() {
return context;
}
public abstract List<HookNotification> getNotificationMessages() throws Exception;
public String getUserName() { return context.getUserName(); }
public String getTableNameFromColumn(String columnName) {
return context.getTableNameFromColumn(columnName);
}
public String getQualifiedName(ImpalaNode node) throws IllegalArgumentException {
return getQualifiedName(node.getOwnVertex());
}
public String getQualifiedName(LineageVertex node) throws IllegalArgumentException {
if (node == null) {
throw new IllegalArgumentException("node is null");
}
ImpalaVertexType nodeType = node.getVertexType();
if (nodeType == null) {
if (node.getVertexId() != null) {
LOG.warn("null qualified name for type: null and name: {}", node.getVertexId());
}
return null;
}
if (node.getVertexId() == null) {
LOG.warn("null qualified name for type: {} and name: null", nodeType);
return null;
}
switch (nodeType) {
case DATABASE:
return context.getQualifiedNameForDb(node.getVertexId());
case TABLE:
return context.getQualifiedNameForTable(node.getVertexId());
case COLUMN:
return context.getQualifiedNameForColumn(node.getVertexId());
default:
LOG.warn("null qualified name for type: {} and name: {}", nodeType, node.getVertexId());
return null;
}
}
static final class AtlasEntityComparator implements Comparator<AtlasEntity> {
@Override
public int compare(AtlasEntity entity1, AtlasEntity entity2) {
String name1 = (String)entity1.getAttribute(ATTRIBUTE_QUALIFIED_NAME);
String name2 = (String)entity2.getAttribute(ATTRIBUTE_QUALIFIED_NAME);
if (name1 == null) {
return -1;
}
if (name2 == null) {
return 1;
}
return name1.toLowerCase().compareTo(name2.toLowerCase());
}
}
static final Comparator<AtlasEntity> entityComparator = new AtlasEntityComparator();
protected String getQualifiedName(List<AtlasEntity> inputs, List<AtlasEntity> outputs) throws Exception {
ImpalaOperationType operation = context.getImpalaOperationType();
// TODO: add more operation type here
if (operation == ImpalaOperationType.CREATEVIEW) {
List<? extends AtlasEntity> sortedEntities = new ArrayList<>(outputs);
Collections.sort(sortedEntities, entityComparator);
for (AtlasEntity entity : sortedEntities) {
if (entity.getTypeName().equalsIgnoreCase(HIVE_TYPE_TABLE)) {
Long createTime = (Long)entity.getAttribute(ATTRIBUTE_CREATE_TIME);
return (String)entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME) + QNAME_SEP_PROCESS + createTime;
}
}
}
// TODO: add code for name construction for HDFS path
return null;
}
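// For example (hypothetical values): a CREATE VIEW whose output table entity has
// qualifiedName "db1.v1@cl1" and createTime 1554750072000 yields the process
// qualifiedName "db1.v1@cl1:1554750072000".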
protected AtlasEntity getInputOutputEntity(ImpalaNode node, AtlasEntityExtInfo entityExtInfo) throws Exception {
AtlasEntity ret = null;
switch(node.getNodeType()) {
case TABLE:
case PARTITION:
case DFS_DIR: {
ret = toAtlasEntity(node, entityExtInfo);
}
break;
}
return ret;
}
protected AtlasEntity toAtlasEntity(ImpalaNode node, AtlasEntityExtInfo entityExtInfo) throws Exception {
AtlasEntity ret = null;
switch (node.getNodeType()) {
case DATABASE:
ret = toDbEntity(node);
break;
case TABLE:
case PARTITION:
ret = toTableEntity(node, entityExtInfo);
break;
default:
break;
}
return ret;
}
protected AtlasEntity toDbEntity(ImpalaNode db) throws Exception {
return toDbEntity(db.getNodeName());
}
protected AtlasEntity toDbEntity(String dbName) throws Exception {
String dbQualifiedName = context.getQualifiedNameForDb(dbName);
AtlasEntity ret = context.getEntity(dbQualifiedName);
if (ret == null) {
ret = new AtlasEntity(HIVE_TYPE_DB);
// The Impala hook should not send metadata entities. Setting 'guid' to null will:
// - result in this entity not being included in 'referredEntities'
// - cause the Atlas server to resolve the entity by its qualifiedName
ret.setGuid(null);
ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, dbQualifiedName);
ret.setAttribute(ATTRIBUTE_NAME, dbName.toLowerCase());
ret.setAttribute(ATTRIBUTE_CLUSTER_NAME, context.getClusterName());
context.putEntity(dbQualifiedName, ret);
}
return ret;
}
protected AtlasEntityWithExtInfo toTableEntity(ImpalaNode table) throws Exception {
AtlasEntityWithExtInfo ret = new AtlasEntityWithExtInfo();
AtlasEntity entity = toTableEntity(table, ret);
if (entity != null) {
ret.setEntity(entity);
} else {
ret = null;
}
return ret;
}
protected AtlasEntity toTableEntity(ImpalaNode table, AtlasEntitiesWithExtInfo entities) throws Exception {
AtlasEntity ret = toTableEntity(table, (AtlasEntityExtInfo) entities);
if (ret != null) {
entities.addEntity(ret);
}
return ret;
}
protected AtlasEntity toTableEntity(ImpalaNode table, AtlasEntityExtInfo entityExtInfo) throws Exception {
if ((table == null) || (table.getNodeName() == null)) {
throw new IllegalArgumentException("table is null or its name is null");
}
String dbName = context.getDatabaseNameFromTable(table.getNodeName());
if (dbName == null) {
throw new IllegalArgumentException(String.format("db name is null for table: {}", table.getNodeName()));
}
AtlasEntity dbEntity = toDbEntity(dbName);
if (entityExtInfo != null) {
if (dbEntity != null) {
entityExtInfo.addReferredEntity(dbEntity);
}
}
AtlasEntity ret = toTableEntity(getObjectId(dbEntity), table, entityExtInfo);
return ret;
}
protected AtlasEntity toTableEntity(AtlasObjectId dbId, ImpalaNode table, AtlasEntityExtInfo entityExtInfo) throws Exception {
String tblQualifiedName = getQualifiedName(table);
AtlasEntity ret = context.getEntity(tblQualifiedName);
if (ret != null) {
return ret;
}
// a table created in Impala still uses HIVE_TYPE_TABLE so that both Impala and Hive
// can operate on the same table
ret = new AtlasEntity(HIVE_TYPE_TABLE);
// The Impala hook should not send metadata entities to Atlas. Setting 'guid' to null will:
// - result in this entity not being included in 'referredEntities'
// - cause the Atlas server to resolve the entity by its qualifiedName
// TODO: enable this once HMS hook is in. Disable this before that.
ret.setGuid(null);
long createTime = getTableCreateTime(table);
long lastAccessTime = createTime;
ret.setAttribute(ATTRIBUTE_DB, dbId);
ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, tblQualifiedName);
ret.setAttribute(ATTRIBUTE_NAME, table.getNodeName().toLowerCase());
// placeholder value; this attribute should not be sent to Atlas once the HMS hook is in
ret.setAttribute(ATTRIBUTE_OWNER, getUserName());
ret.setAttribute(ATTRIBUTE_CREATE_TIME, createTime);
ret.setAttribute(ATTRIBUTE_LAST_ACCESS_TIME, lastAccessTime);
AtlasObjectId tableId = getObjectId(ret);
List<AtlasEntity> columns = getColumnEntities(tableId, table);
        if (entityExtInfo != null && columns != null) {
            for (AtlasEntity column : columns) {
                entityExtInfo.addReferredEntity(column);
            }
        }
ret.setAttribute(ATTRIBUTE_COLUMNS, getObjectIds(columns));
context.putEntity(tblQualifiedName, ret);
return ret;
}
public static AtlasObjectId getObjectId(AtlasEntity entity) {
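        // build an object-id that references the entity by typeName plus its unique attribute;
        // when the entity's guid is null, the Atlas server resolves the reference by qualifiedName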
String qualifiedName = (String) entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME);
AtlasObjectId ret = new AtlasObjectId(entity.getGuid(), entity.getTypeName(), Collections
.singletonMap(ATTRIBUTE_QUALIFIED_NAME, qualifiedName));
return ret;
}
public static List<AtlasObjectId> getObjectIds(List<AtlasEntity> entities) {
final List<AtlasObjectId> ret;
if (CollectionUtils.isNotEmpty(entities)) {
ret = new ArrayList<>(entities.size());
for (AtlasEntity entity : entities) {
ret.add(getObjectId(entity));
}
} else {
ret = Collections.emptyList();
}
return ret;
}
public static long getTableCreateTime(ImpalaNode table) {
return getTableCreateTime(table.getOwnVertex());
}
public static long getTableCreateTime(LineageVertex tableVertex) {
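        // lineage records report createTime in seconds since the Unix epoch, while Atlas
        // attributes expect milliseconds; MILLIS_CONVERT_FACTOR converts between the two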
Long createTime = tableVertex.getCreateTime();
if (createTime != null) {
return createTime.longValue() * MILLIS_CONVERT_FACTOR;
} else {
return System.currentTimeMillis();
}
}
protected List<AtlasEntity> getColumnEntities(AtlasObjectId tableId, ImpalaNode table) {
List<AtlasEntity> ret = new ArrayList<>();
for (ImpalaNode childNode : table.getChildren().values()) {
String colQualifiedName = getQualifiedName(childNode);
AtlasEntity column = context.getEntity(colQualifiedName);
if (column == null) {
column = new AtlasEntity(HIVE_TYPE_COLUMN);
                // if the column's table was sent in an earlier notification, set 'guid' to null, which will:
                //  - result in this entity not being included in 'referredEntities'
                //  - cause the Atlas server to resolve the entity by its qualifiedName
// TODO: enable this once HMS hook is in. Disable this before that.
column.setGuid(null);
column.setAttribute(ATTRIBUTE_TABLE, tableId);
column.setAttribute(ATTRIBUTE_QUALIFIED_NAME, colQualifiedName);
column.setAttribute(ATTRIBUTE_NAME, context.getColumnNameOnly(childNode.getNodeName()));
                // placeholder value; this attribute should not be sent to Atlas once the HMS hook is in place
column.setAttribute(ATTRIBUTE_OWNER, getUserName());
context.putEntity(colQualifiedName, column);
}
ret.add(column);
}
return ret;
}
protected AtlasEntity getImpalaProcessEntity(List<AtlasEntity> inputs, List<AtlasEntity> outputs) throws Exception {
AtlasEntity ret = new AtlasEntity(ImpalaDataType.IMPALA_PROCESS.getName());
String queryStr = context.getQueryStr();
if (queryStr != null) {
queryStr = queryStr.toLowerCase().trim();
}
ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getQualifiedName(inputs, outputs));
ret.setAttribute(ATTRIBUTE_INPUTS, getObjectIds(inputs));
ret.setAttribute(ATTRIBUTE_OUTPUTS, getObjectIds(outputs));
ret.setAttribute(ATTRIBUTE_NAME, queryStr);
ret.setAttribute(ATTRIBUTE_OPERATION_TYPE, context.getImpalaOperationType());
ret.setAttribute(ATTRIBUTE_START_TIME, context.getLineageQuery().getTimestamp());
ret.setAttribute(ATTRIBUTE_END_TIME, System.currentTimeMillis());
ret.setAttribute(ATTRIBUTE_USER_NAME, getUserName());
ret.setAttribute(ATTRIBUTE_QUERY_TEXT, queryStr);
ret.setAttribute(ATTRIBUTE_QUERY_ID, context.getLineageQuery().getQueryId());
ret.setAttribute(ATTRIBUTE_QUERY_PLAN, "Not Supported");
ret.setAttribute(ATTRIBUTE_RECENT_QUERIES, Collections.singletonList(queryStr));
return ret;
}
protected void addProcessedEntities(AtlasEntitiesWithExtInfo entitiesWithExtInfo) {
for (AtlasEntity entity : context.getEntities()) {
entitiesWithExtInfo.addReferredEntity(entity);
}
entitiesWithExtInfo.compact();
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.impala.hook.events;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.atlas.impala.hook.AtlasImpalaHookContext;
import org.apache.atlas.impala.model.ImpalaDataType;
import org.apache.atlas.impala.model.ImpalaDependencyType;
import org.apache.atlas.impala.model.ImpalaNode;
import org.apache.atlas.impala.model.ImpalaVertexType;
import org.apache.atlas.impala.model.LineageEdge;
import org.apache.atlas.impala.model.ImpalaQuery;
import org.apache.atlas.impala.model.LineageVertex;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.notification.HookNotification;
import org.apache.atlas.model.notification.HookNotification.EntityCreateRequestV2;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class CreateImpalaProcess extends BaseImpalaEvent {
private static final Logger LOG = LoggerFactory.getLogger(CreateImpalaProcess.class);
public CreateImpalaProcess(AtlasImpalaHookContext context) {
super(context);
}
public List<HookNotification> getNotificationMessages() throws Exception {
List<HookNotification> ret = null;
AtlasEntitiesWithExtInfo entities = getEntities();
if (entities != null && CollectionUtils.isNotEmpty(entities.getEntities())) {
ret = Collections.singletonList(new EntityCreateRequestV2(getUserName(), entities));
}
return ret;
}
public AtlasEntitiesWithExtInfo getEntities() throws Exception {
AtlasEntitiesWithExtInfo ret = null;
List<ImpalaNode> inputNodes = new ArrayList<>();
List<ImpalaNode> outputNodes = new ArrayList<>();
List<AtlasEntity> inputs = new ArrayList<>();
List<AtlasEntity> outputs = new ArrayList<>();
Set<String> processedNames = new HashSet<>();
getInputOutList(context.getLineageQuery(), inputNodes, outputNodes);
if (skipProcess(inputNodes, outputNodes)) {
return ret;
}
ret = new AtlasEntitiesWithExtInfo();
if (!inputNodes.isEmpty()) {
for (ImpalaNode input : inputNodes) {
String qualifiedName = getQualifiedName(input);
if (qualifiedName == null || !processedNames.add(qualifiedName)) {
continue;
}
AtlasEntity entity = getInputOutputEntity(input, ret);
if (entity != null) {
inputs.add(entity);
}
}
}
        if (!outputNodes.isEmpty()) {
for (ImpalaNode output : outputNodes) {
String qualifiedName = getQualifiedName(output);
if (qualifiedName == null || !processedNames.add(qualifiedName)) {
continue;
}
AtlasEntity entity = getInputOutputEntity(output, ret);
if (entity != null) {
outputs.add(entity);
}
}
}
if (!inputs.isEmpty() || !outputs.isEmpty()) {
AtlasEntity process = getImpalaProcessEntity(inputs, outputs);
            if (process != null && LOG.isDebugEnabled()) {
LOG.debug("get process entity with qualifiedName: {}", process.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
}
ret.addEntity(process);
processColumnLineage(process, ret);
addProcessedEntities(ret);
} else {
ret = null;
}
return ret;
}
private void processColumnLineage(AtlasEntity impalaProcess, AtlasEntitiesWithExtInfo entities) {
List<LineageEdge> edges = context.getLineageQuery().getEdges();
if (CollectionUtils.isEmpty(edges)) {
return;
}
final List<AtlasEntity> columnLineages = new ArrayList<>();
final Set<String> processedOutputCols = new HashSet<>();
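        // each projection edge becomes one impala_column_lineage entity that links the
        // edge's source columns (inputs) to its target columns (outputs)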
for (LineageEdge edge : edges) {
if (!edge.getEdgeType().equals(ImpalaDependencyType.PROJECTION)) {
// Impala dependency type can only be predicate or projection.
// Impala predicate dependency: This is a dependency between a set of target
// columns (or exprs) and a set of source columns (base table columns). It
// indicates that the source columns restrict the values of their targets (e.g.
// by participating in WHERE clause predicates). It should not be part of lineage
continue;
}
List<AtlasEntity> outputColumns = new ArrayList<>();
for (Long targetId : edge.getTargets()) {
LineageVertex columnVertex = verticesMap.get(targetId);
String outputColName = getQualifiedName(columnVertex);
AtlasEntity outputColumn = context.getEntity(outputColName);
LOG.debug("processColumnLineage(): target id = {}, target column name = {}",
targetId, outputColName);
if (outputColumn == null) {
LOG.warn("column-lineage: non-existing output-column {}", outputColName);
continue;
}
if (processedOutputCols.contains(outputColName)) {
LOG.warn("column-lineage: duplicate for output-column {}", outputColName);
continue;
} else {
processedOutputCols.add(outputColName);
}
outputColumns.add(outputColumn);
}
List<AtlasEntity> inputColumns = new ArrayList<>();
for (Long sourceId : edge.getSources()) {
LineageVertex columnVertex = verticesMap.get(sourceId);
String inputColName = getQualifiedName(columnVertex);
AtlasEntity inputColumn = context.getEntity(inputColName);
if (inputColumn == null) {
LOG.warn("column-lineage: non-existing input-column {} with id ={}", inputColName, sourceId);
continue;
}
inputColumns.add(inputColumn);
}
if (inputColumns.isEmpty()) {
continue;
}
AtlasEntity columnLineageProcess = new AtlasEntity(ImpalaDataType.IMPALA_COLUMN_LINEAGE.getName());
            // TODO: when there are multiple target IDs, should we use the first column name or all of their names?
String columnQualifiedName = (String)impalaProcess.getAttribute(ATTRIBUTE_QUALIFIED_NAME) +
AtlasImpalaHookContext.QNAME_SEP_PROCESS + outputColumns.get(0).getAttribute(ATTRIBUTE_NAME);
columnLineageProcess.setAttribute(ATTRIBUTE_NAME, columnQualifiedName);
columnLineageProcess.setAttribute(ATTRIBUTE_QUALIFIED_NAME, columnQualifiedName);
columnLineageProcess.setAttribute(ATTRIBUTE_INPUTS, getObjectIds(inputColumns));
columnLineageProcess.setAttribute(ATTRIBUTE_OUTPUTS, getObjectIds(outputColumns));
columnLineageProcess.setAttribute(ATTRIBUTE_QUERY, getObjectId(impalaProcess));
// based on https://github.com/apache/impala/blob/master/fe/src/main/java/org/apache/impala/analysis/ColumnLineageGraph.java#L267
// There are two types of dependencies that are represented as edges in the column
// lineage graph:
// a) Projection dependency: This is a dependency between a set of source
// columns (base table columns) and a single target (result expr or table column).
// This dependency indicates that values of the target depend on the values of the source
// columns.
// b) Predicate dependency: This is a dependency between a set of target
// columns (or exprs) and a set of source columns (base table columns). It indicates that
// the source columns restrict the values of their targets (e.g. by participating in
// WHERE clause predicates).
columnLineageProcess.setAttribute(ATTRIBUTE_DEPENDENCY_TYPE, ImpalaDependencyType.PROJECTION.getName());
columnLineages.add(columnLineageProcess);
}
for (AtlasEntity columnLineage : columnLineages) {
String columnQualifiedName = (String)columnLineage.getAttribute(ATTRIBUTE_QUALIFIED_NAME);
LOG.debug("get column lineage entity with qualifiedName: {}", columnQualifiedName);
entities.addEntity(columnLineage);
}
}
    // Process the Impala query: classify the vertices as input or output based on the lineage edges,
    // then organize the vertices into a hierarchical structure by putting all column vertices of a
    // table as children of the ImpalaNode representing that table.
private void getInputOutList(ImpalaQuery lineageQuery, List<ImpalaNode> inputNodes,
List<ImpalaNode> outputNodes) {
        // populate verticesMap, keyed by vertex id, and
        // vertexNameMap, keyed by the vertex's vertexId (its name)
for (LineageVertex vertex : lineageQuery.getVertices()) {
verticesMap.put(vertex.getId(), vertex);
vertexNameMap.put(vertex.getVertexId(), new ImpalaNode(vertex));
}
        // collect the set of source ids and the set of target ids from projection edges
Set<Long> sourceIds = new HashSet<>();
Set<Long> targetIds = new HashSet<>();
for (LineageEdge edge : lineageQuery.getEdges()) {
if (ImpalaDependencyType.PROJECTION.equals(edge.getEdgeType())) {
sourceIds.addAll(edge.getSources());
targetIds.addAll(edge.getTargets());
}
}
Map<String, ImpalaNode> inputMap = buildInputOutputList(sourceIds, verticesMap, vertexNameMap);
Map<String, ImpalaNode> outputMap = buildInputOutputList(targetIds, verticesMap, vertexNameMap);
inputNodes.addAll(inputMap.values());
outputNodes.addAll(outputMap.values());
}
/**
* From the list of Ids and Id to Vertices map, generate the Table name to ImpalaNode map.
* @param idSet the list of Ids. They are from lineage edges
* @param vertexMap the Id to Vertex map
* @param vertexNameMap the vertexId to ImpalaNode map.
* @return the table name to ImpalaNode map, whose table node contains its columns
*/
private Map<String, ImpalaNode> buildInputOutputList(Set<Long> idSet, Map<Long, LineageVertex> vertexMap,
Map<String, ImpalaNode> vertexNameMap) {
Map<String, ImpalaNode> returnTableMap = new HashMap<>();
for (Long id : idSet) {
LineageVertex vertex = vertexMap.get(id);
if (vertex == null) {
LOG.warn("cannot find vertex with id: {}", id);
continue;
}
if (ImpalaVertexType.COLUMN.equals(vertex.getVertexType())) {
// add column to its table node
String tableName = getTableNameFromColumn(vertex.getVertexId());
if (tableName == null) {
LOG.warn("cannot find tableName for vertex with id: {}, column name : {}",
id, vertex.getVertexId() == null? "null" : vertex.getVertexId());
continue;
}
ImpalaNode tableNode = returnTableMap.get(tableName);
if (tableNode == null) {
tableNode = vertexNameMap.get(tableName);
if (tableNode == null) {
LOG.warn("cannot find table node for vertex with id: {}, column name : {}",
id, vertex.getVertexId());
continue;
}
returnTableMap.put(tableName, tableNode);
}
tableNode.addChild(vertex);
}
}
return returnTableMap;
}
    private boolean skipProcess(List<ImpalaNode> inputNodes, List<ImpalaNode> outputNodes) {
        return inputNodes.isEmpty() || outputNodes.isEmpty();
    }
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.impala.model;
/**
 * Defines the interface for processing an Impala lineage record.
*/
public interface IImpalaLineageHook {
// The input is a serialized string of an Impala lineage record
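    // A minimal illustrative record (values taken from the integration tests; real records
    // may carry more fields) looks like:
    // {
    //   "queryText":"create view db_1.view_1 as select count, id from db_1.table_1",
    //   "queryId":"3a441d0c130962f8:7f634aec00000000",
    //   "hash":"64ff0425ccdfaada53e3f2fd76f566f7",
    //   "user":"admin",
    //   "timestamp":1554750072,
    //   "endTime":1554750554,
    //   "edges":[{"sources":[1],"targets":[0],"edgeType":"PROJECTION"}],
    //   "vertices":[
    //     {"id":0,"vertexType":"COLUMN","vertexId":"db_1.view_1.count"},
    //     {"id":1,"vertexType":"COLUMN","vertexId":"db_1.table_1.count"}
    //   ]
    // }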
void process(String impalaQueryString) throws Exception;
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.impala.model;
/**
* Data types used for Impala bridge
*/
public enum ImpalaDataType {
IMPALA_PROCESS,
IMPALA_PROCESS_EXECUTION,
IMPALA_COLUMN_LINEAGE;
public String getName() {
return name().toLowerCase();
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.impala.model;
public enum ImpalaDependencyType {
PROJECTION("PROJECTION"),
PREDICATE("PREDICATE");
private final String name;
ImpalaDependencyType(String name) {
this.name = name;
}
public String getName() {
return name.toUpperCase();
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.impala.model;
import java.util.HashMap;
import java.util.Map;
/**
 * Contains the vertex info of this node and its children. Used only internally.
*/
public class ImpalaNode {
LineageVertex ownVertex;
Map<Long, ImpalaNode> children;
public ImpalaNode(LineageVertex ownVertex) {
this.ownVertex = ownVertex;
children = new HashMap<>();
}
public String getNodeName() { return ownVertex.getVertexId(); }
public ImpalaVertexType getNodeType() { return ownVertex.getVertexType(); }
public LineageVertex getOwnVertex() { return ownVertex; }
public Map<Long, ImpalaNode> getChildren() { return children; }
    /**
     * Add a child to this node
     * @param child the child vertex to add
     * @return the node corresponding to the input child vertex
     */
    public ImpalaNode addChild(LineageVertex child) {
        ImpalaNode existingChild = children.get(child.getId());
        if (existingChild != null) {
            return existingChild;
        }
        ImpalaNode newChild = new ImpalaNode(child);
        // Map.put returns the previous mapping (null for a new key), so return the new child explicitly
        children.put(child.getId(), newChild);
        return newChild;
    }
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.impala.model;
public enum ImpalaOperationType {
CREATEVIEW ("CREATEVIEW"),
UNKNOWN ("UNKNOWN");
private final String name;
ImpalaOperationType(String s) {
name = s;
}
public boolean equalsName(String otherName) {
return name.equals(otherName);
}
    @Override
    public String toString() {
        return this.name;
    }
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.impala.model;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import java.util.List;
/**
 * Represents an Impala lineage record in the lineage log.
*/
@JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE)
@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown=true)
public class ImpalaQuery {
private String queryText;
private String queryId;
private String hash;
private String user;
    // the timestamp is in seconds since the Unix epoch - the number of seconds that have
    // elapsed since January 1, 1970 (midnight UTC/GMT), not counting leap seconds
private Long timestamp;
private Long endTime;
private List<LineageEdge> edges;
private List<LineageVertex> vertices;
public List<LineageEdge> getEdges() {
return edges;
}
public List<LineageVertex> getVertices() {
return vertices;
}
public Long getEndTime() {
return endTime;
}
public String getHash() {
return hash;
}
public String getQueryId() {
return queryId;
}
public String getQueryText() {
return queryText;
}
public Long getTimestamp() {
return timestamp;
}
public String getUser() {
return user;
}
public void setEdges(List<LineageEdge> edges) {
this.edges = edges;
}
public void setEndTime(Long endTime) {
this.endTime = endTime;
}
public void setHash(String hash) {
this.hash = hash;
}
public void setQueryId(String queryId) {
this.queryId = queryId;
}
public void setQueryText(String queryText) {
this.queryText = queryText;
}
    public void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }
public void setUser(String user) {
this.user = user;
}
public void setVertices(List<LineageVertex> vertices) {
this.vertices = vertices;
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.impala.model;
public enum ImpalaVertexType {
DFS_DIR("DFS_DIR"),
PARTITION("PARTITION"),
COLUMN("COLUMN"),
TABLE("TABLE"),
DATABASE("DATABASE");
private final String name;
ImpalaVertexType(String name) {
this.name = name;
}
public String getName() {
return name.toUpperCase();
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.impala.model;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import java.util.List;
/**
 * This represents an edge in Impala's lineage record that connects two entities.
*/
@JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE)
@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown=true)
public class LineageEdge {
private List<Long> sources;
private List<Long> targets;
private ImpalaDependencyType edgeType;
public List<Long> getSources() {
return sources;
}
public List<Long> getTargets() {
return targets;
}
public ImpalaDependencyType getEdgeType() {
return edgeType;
}
public void setSources(List<Long> sources) {
this.sources = sources;
}
public void setTargets(List<Long> targets) {
this.targets = targets;
}
public void setEdgeType(ImpalaDependencyType edgeType) {
this.edgeType = edgeType;
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.impala.model;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
/**
* This represents an entity in Impala's lineage record.
*/
@JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE)
@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown=true)
public class LineageVertex {
// id is used to reference this entity. It is used in LineageEdge to specify source and target
// https://github.com/apache/impala/blob/master/be/src/util/lineage-util.h#L40
// Impala id is int64. Therefore, define this field as Long
private Long id;
    // specifies the type of the entity, e.g. "TABLE" or "COLUMN"
    private ImpalaVertexType vertexType;
    // specifies the name of the entity
    private String vertexId;
    // optional and may be null; it is only set when the vertexType is "TABLE"
    private Long createTime;
public Long getId() { return id; }
public ImpalaVertexType getVertexType() {
return vertexType;
}
public String getVertexId() {
return vertexId;
}
public Long getCreateTime() { return createTime; }
public void setId(Long id) {
this.id = id;
}
public void setVertexType(ImpalaVertexType vertexType) {
this.vertexType = vertexType;
}
public void setVertexId(String vertexId) {
this.vertexId = vertexId;
}
public void setCreateTime(Long createTime) { this.createTime = createTime; }
}
<?xml version="1.0" encoding="UTF-8" ?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
<appender name="FILE" class="org.apache.log4j.RollingFileAppender">
<param name="File" value="/var/log/hive/impala-bridge.log"/>
<param name="Append" value="true"/>
<param name="maxFileSize" value="100MB" />
<param name="maxBackupIndex" value="20" />
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%d %-5p - [%t:%x] ~ %m (%C{1}:%L)%n"/>
</layout>
</appender>
<logger name="org.apache.atlas.impala.ImpalaLineageTool" additivity="false">
<level value="info"/>
<appender-ref ref="FILE"/>
</logger>
<root>
<priority value="warn"/>
<appender-ref ref="FILE"/>
</root>
</log4j:configuration>
#!/bin/bash
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License. See accompanying LICENSE file.
#
# resolve links - $0 may be a softlink
PRG="${0}"
[[ `uname -s` == *"CYGWIN"* ]] && CYGWIN=true
while [ -h "${PRG}" ]; do
ls=`ls -ld "${PRG}"`
link=`expr "$ls" : '.*-> \(.*\)$'`
if expr "$link" : '/.*' > /dev/null; then
PRG="$link"
else
PRG=`dirname "${PRG}"`/"$link"
fi
done
BASEDIR=`dirname ${PRG}`
if test -z "${JAVA_HOME}"
then
JAVA_BIN=`which java`
JAR_BIN=`which jar`
else
JAVA_BIN="${JAVA_HOME}/bin/java"
JAR_BIN="${JAVA_HOME}/bin/jar"
fi
export JAVA_BIN
if [ ! -e "${JAVA_BIN}" ] || [ ! -e "${JAR_BIN}" ]; then
echo "$JAVA_BIN and/or $JAR_BIN not found on the system. Please make sure java and jar commands are available."
exit 1
fi
# Construct ATLAS_CONF, the directory where the atlas properties reside.
# Assume hive-server2 is installed and its conf directory contains the Atlas configuration;
# otherwise, set up the required Atlas properties and libraries before running this tool.
if [ ! -z "$HIVE_CONF_DIR" ]; then
HIVE_CONF=$HIVE_CONF_DIR
elif [ ! -z "$HIVE_HOME" ]; then
HIVE_CONF="$HIVE_HOME/conf"
elif [ -e /etc/hive/conf ]; then
HIVE_CONF="/etc/hive/conf"
else
echo "Could not find a valid HIVE configuration for ATLAS"
exit 1
fi
if [ -z "$ATLAS_CONF" ]; then
export ATLAS_CONF=$HIVE_CONF
fi
# log dir for applications
ATLAS_LOG_DIR="/var/log/atlas"
ATLAS_LOG_FILE="impala-bridge.log"
LOG_CONFIG="${BASEDIR}/atlas-log4j.xml"
# Locate the Atlas server webapp lib directory.
DIR=$PWD
PARENT="$(dirname "$DIR")"
GRANDPARENT="$(dirname "$PARENT")"
LIB_PATH="$GRANDPARENT/server/webapp/atlas/WEB-INF/lib"
echo "$LIB_PATH"
# Construct Atlas classpath.
for i in "$LIB_PATH/"*.jar; do
ATLASCPPATH="${ATLASCPPATH}:$i"
done
for i in "${BASEDIR}/"*.jar; do
ATLASCPPATH="${ATLASCPPATH}:$i"
done
echo "Logging: ${ATLAS_LOG_DIR}/${ATLAS_LOG_FILE}"
echo "Log config: ${LOG_CONFIG}"
TIME=`date +%Y%m%d%H%M%S`
CP="${ATLASCPPATH}:${ATLAS_CONF}"
# If running in cygwin, convert pathnames and classpath to Windows format.
if [ "${CYGWIN}" == "true" ]
then
ATLAS_LOG_DIR=`cygpath -w ${ATLAS_LOG_DIR}`
ATLAS_LOG_FILE=`cygpath -w ${ATLAS_LOG_FILE}`
CP=`cygpath -w -p ${CP}`
fi
JAVA_PROPERTIES="$ATLAS_OPTS -Datlas.log.dir=$ATLAS_LOG_DIR -Datlas.log.file=$ATLAS_LOG_FILE -Dlog4j.configuration=file://$LOG_CONFIG"
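# All command-line arguments are forwarded unchanged to ImpalaLineageTool
# (e.g. "-d <lineage log directory> -p <lineage file prefix>", as used by the integration tests)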
IMPORT_ARGS=$@
JVM_ARGS=
JAVA_PROPERTIES="${JAVA_PROPERTIES} ${JVM_ARGS}"
"${JAVA_BIN}" ${JAVA_PROPERTIES} -cp "${CP}" org.apache.atlas.impala.ImpalaLineageTool $IMPORT_ARGS
RETVAL=$?
[ $RETVAL -eq 0 ] && echo Done!
[ $RETVAL -ne 0 ] && echo Failed!
exit $RETVAL
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.impala;
import static org.apache.atlas.impala.hook.events.BaseImpalaEvent.ATTRIBUTE_QUALIFIED_NAME;
import static org.apache.atlas.impala.hook.events.BaseImpalaEvent.ATTRIBUTE_RECENT_QUERIES;
import static org.apache.atlas.impala.hook.events.BaseImpalaEvent.HIVE_TYPE_DB;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.fail;
import java.util.Collections;
import java.util.List;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClientV2;
import org.apache.atlas.impala.hook.AtlasImpalaHookContext;
import org.apache.atlas.impala.hook.ImpalaLineageHook;
import org.apache.atlas.impala.model.ImpalaDataType;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.utils.AuthenticationUtil;
import org.apache.atlas.utils.ParamChecker;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.testng.annotations.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
public class ImpalaLineageITBase {
private static final Logger LOG = LoggerFactory.getLogger(ImpalaLineageITBase.class);
public static final String DEFAULT_DB = "default";
    public static final String SEP = ":";
    public static final String IO_SEP = "->";
protected static final String DGI_URL = "http://localhost:21000/";
protected static final String CLUSTER_NAME = "primary";
protected static final String PART_FILE = "2015-01-01";
protected static final String INPUTS = "inputs";
protected static final String OUTPUTS = "outputs";
protected static AtlasClientV2 atlasClientV2;
private static final String REFERENCEABLE_ATTRIBUTE_NAME = "qualifiedName";
private static final String ATTR_NAME = "name";
    // to push entity creation/update to HMS, so the HMS hook can push the metadata notification
    // to Atlas; only then can the lineage notification from this tool be processed by Atlas
protected static Driver driverWithoutContext;
protected static SessionState ss;
protected static HiveConf conf;
@BeforeClass
public void setUp() throws Exception {
        // set up the hive session; the original local declarations shadowed the static
        // 'conf' and 'ss' fields and left them unassigned, so assign the fields directly
        conf = new HiveConf();
        conf.setClassLoader(Thread.currentThread().getContextClassLoader());
        ss = new SessionState(conf);
        ss = SessionState.start(ss);
        SessionState.setCurrentSessionState(ss);
        driverWithoutContext = new Driver(conf);
Configuration configuration = ApplicationProperties.get();
String[] atlasEndPoint = configuration.getStringArray(ImpalaLineageHook.ATLAS_ENDPOINT);
if (atlasEndPoint == null || atlasEndPoint.length == 0) {
atlasEndPoint = new String[]{DGI_URL};
}
if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) {
atlasClientV2 = new AtlasClientV2(atlasEndPoint, new String[]{"admin", "admin"});
} else {
atlasClientV2 = new AtlasClientV2(atlasEndPoint);
}
}
protected String assertEntityIsRegistered(final String typeName, final String property, final String value,
final AssertPredicate assertPredicate) throws Exception {
waitFor(80000, new Predicate() {
@Override
public void evaluate() throws Exception {
AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo = atlasClientV2.getEntityByAttribute(typeName, Collections
.singletonMap(property,value));
AtlasEntity entity = atlasEntityWithExtInfo.getEntity();
assertNotNull(entity);
if (assertPredicate != null) {
assertPredicate.assertOnEntity(entity);
}
}
});
AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo = atlasClientV2.getEntityByAttribute(typeName, Collections.singletonMap(property,value));
AtlasEntity entity = atlasEntityWithExtInfo.getEntity();
return (String) entity.getGuid();
}
protected String assertProcessIsRegistered(String processQFName, String queryString) throws Exception {
try {
LOG.debug("Searching for process with query {}", processQFName);
return assertEntityIsRegistered(ImpalaDataType.IMPALA_PROCESS.getName(), ATTRIBUTE_QUALIFIED_NAME, processQFName, new AssertPredicate() {
@Override
public void assertOnEntity(final AtlasEntity entity) throws Exception {
List<String> recentQueries = (List<String>) entity.getAttribute(ATTRIBUTE_RECENT_QUERIES);
Assert.assertEquals(recentQueries.get(0), lower(queryString));
}
});
} catch(Exception e) {
LOG.error("Exception : ", e);
throw e;
}
}
protected String assertDatabaseIsRegistered(String dbName) throws Exception {
return assertDatabaseIsRegistered(dbName, null);
}
protected String assertDatabaseIsRegistered(String dbName, AssertPredicate assertPredicate) throws Exception {
LOG.debug("Searching for database: {}", dbName);
String dbQualifiedName = dbName + AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME +
CLUSTER_NAME;
dbQualifiedName = dbQualifiedName.toLowerCase();
return assertEntityIsRegistered(HIVE_TYPE_DB, REFERENCEABLE_ATTRIBUTE_NAME, dbQualifiedName, assertPredicate);
}
protected String createDatabase() throws Exception {
String dbName = dbName();
return createDatabase(dbName);
}
protected String createDatabase(String dbName) throws Exception {
runCommand("CREATE DATABASE IF NOT EXISTS " + dbName);
return dbName;
}
protected String createTable(String dbName, String columnsString) throws Exception {
return createTable(dbName, columnsString, false);
}
protected String createTable(String dbName, String columnsString, boolean isPartitioned) throws Exception {
String tableName = tableName();
return createTable(dbName, tableName, columnsString, isPartitioned);
}
protected String createTable(String dbName, String tableName, String columnsString, boolean isPartitioned) throws Exception {
runCommand("CREATE TABLE IF NOT EXISTS " + dbName + "." + tableName + " " + columnsString + " comment 'table comment' " + (isPartitioned ? " partitioned by(dt string)" : ""));
return dbName + "." + tableName;
}
public interface AssertPredicate {
void assertOnEntity(AtlasEntity entity) throws Exception;
}
public interface Predicate {
/**
* Perform a predicate evaluation.
*
         * @throws Exception thrown if the predicate evaluation fails.
*/
void evaluate() throws Exception;
}
/**
     * Wait for a condition, expressed via a {@link Predicate}, to become true.
     *
     * @param timeout   maximum time in milliseconds to wait for the predicate to become true.
     * @param predicate the predicate to wait on.
*/
protected void waitFor(int timeout, Predicate predicate) throws Exception {
ParamChecker.notNull(predicate, "predicate");
long mustEnd = System.currentTimeMillis() + timeout;
while (true) {
try {
predicate.evaluate();
return;
} catch(Error | Exception e) {
if (System.currentTimeMillis() >= mustEnd) {
fail("Assertions failed. Failing after waiting for timeout " + timeout + " msecs", e);
}
LOG.debug("Waiting up to {} msec as assertion failed", mustEnd - System.currentTimeMillis(), e);
Thread.sleep(5000);
}
}
}
public static String lower(String str) {
if (StringUtils.isEmpty(str)) {
return null;
}
return str.toLowerCase().trim();
}
protected void runCommand(String cmd) throws Exception {
runCommandWithDelay(cmd, 0);
}
protected void runCommandWithDelay(String cmd, int sleepMs) throws Exception {
runCommandWithDelay(driverWithoutContext, cmd, sleepMs);
}
protected void runCommandWithDelay(Driver driver, String cmd, int sleepMs) throws Exception {
LOG.debug("Running command '{}'", cmd);
CommandProcessorResponse response = driver.run(cmd);
assertEquals(response.getResponseCode(), 0);
if (sleepMs != 0) {
Thread.sleep(sleepMs);
}
}
protected String random() {
return RandomStringUtils.randomAlphanumeric(10);
}
protected String tableName() {
return "table_" + random();
}
    protected String dbName() {
        return "db_" + random();
    }
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.impala;
import java.util.ArrayList;
import java.util.List;
import org.apache.atlas.impala.hook.AtlasImpalaHookContext;
import org.apache.atlas.impala.hook.ImpalaLineageHook;
import org.apache.atlas.impala.model.ImpalaQuery;
import org.testng.Assert;
import org.testng.annotations.Test;
public class ImpalaLineageToolIT extends ImpalaLineageITBase {
private static String dir = System.getProperty("user.dir") + "/src/test/resources/";
private static String IMPALA = dir + "impala3.json";
private static String IMPALA_WAL = dir + "WALimpala.wal";
    /**
     * This tests that
     * 1) ImpalaLineageTool can parse a lineage file containing the lineage of a "create view" command
     * 2) the lineage is sent to Atlas
     * 3) the lineage can be retrieved from Atlas
     */
@Test
public void testCreateViewFromFile() {
List<ImpalaQuery> lineageList = new ArrayList<>();
ImpalaLineageHook impalaLineageHook = new ImpalaLineageHook();
try {
// create database and tables to simulate Impala behavior that Impala updates metadata
// to HMS and HMSHook sends the metadata to Atlas, which has to happen before
// Atlas can handle lineage notification
String dbName = "db_1";
createDatabase(dbName);
String sourceTableName = "table_1";
createTable(dbName, sourceTableName,"(id string, count int)", false);
String targetTableName = "view_1";
createTable(dbName, targetTableName,"(count int, id string)", false);
// process lineage record, and send corresponding notification to Atlas
String[] args = new String[]{"-d", "./", "-p", "impala"};
ImpalaLineageTool toolInstance = new ImpalaLineageTool(args);
toolInstance.importHImpalaEntities(impalaLineageHook, IMPALA, IMPALA_WAL);
// verify the process is saved in Atlas
            // the expected values come from the lineage record in impala3.json
            String createTime = String.valueOf(1554750072L * 1000);
String processQFName =
"db_1.view_1" + AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME +
CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS + createTime;
processQFName = processQFName.toLowerCase();
assertProcessIsRegistered(processQFName,
"create view db_1.view_1 as select count, id from db_1.table_1");
        } catch (Exception e) {
            Assert.fail("Processing the lineage file failed: " + e.getMessage());
        }
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.impala.hook;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.atlas.impala.ImpalaLineageITBase;
import org.apache.atlas.impala.hook.events.BaseImpalaEvent;
import org.apache.atlas.impala.model.ImpalaDependencyType;
import org.apache.atlas.impala.model.ImpalaVertexType;
import org.apache.atlas.impala.model.LineageEdge;
import org.apache.atlas.impala.model.ImpalaQuery;
import org.apache.atlas.impala.model.LineageVertex;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.AfterClass;
import org.testng.annotations.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.testng.Assert.fail;
public class ImpalaLineageHookIT extends ImpalaLineageITBase {
private static final Logger LOG = LoggerFactory.getLogger(ImpalaLineageHookIT.class);
private static ImpalaLineageHook impalaHook;
@BeforeClass
public void setUp() throws Exception {
super.setUp();
impalaHook = new ImpalaLineageHook();
}
@AfterClass
public void testClean() {
impalaHook = null;
}
@Test
public void testCreateView() throws Exception {
// first trigger HMS hook to create related entities
String dbName = createDatabase();
assertDatabaseIsRegistered(dbName);
String tableName = createTable(dbName, "(id string, count int)");
String viewName = createTable(dbName, "(count int, id string)");
// then process lineage record to push lineage to Atlas
ImpalaQuery queryObj = new ImpalaQuery();
List<LineageEdge> edges = new ArrayList<>();
List<LineageVertex> vertices = new ArrayList<>();
queryObj.setQueryText("create view " + viewName + " as select count, id from " + tableName);
queryObj.setQueryId("3a441d0c130962f8:7f634aec00000000");
queryObj.setHash("64ff0425ccdfaada53e3f2fd76f566f7");
queryObj.setUser("admin");
queryObj.setTimestamp((long)1554750072);
queryObj.setEndTime((long)1554750554);
LineageEdge edge1 = new LineageEdge();
edge1.setSources( Arrays.asList((long)1));
edge1.setTargets( Arrays.asList((long)0));
edge1.setEdgeType(ImpalaDependencyType.PROJECTION);
edges.add(edge1);
LineageEdge edge2 = new LineageEdge();
edge2.setSources( Arrays.asList((long)3));
edge2.setTargets( Arrays.asList((long)2));
edge2.setEdgeType(ImpalaDependencyType.PROJECTION);
edges.add(edge2);
queryObj.setEdges(edges);
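        // vertices 0-3 are the column vertices referenced by the two projection edges above;
        // vertices 4 and 5 are the table-level vertices for the view and the source table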
LineageVertex vertex1 = new LineageVertex();
vertex1.setId((long)0);
vertex1.setVertexType(ImpalaVertexType.COLUMN);
vertex1.setVertexId(viewName + ".count");
vertices.add(vertex1);
LineageVertex vertex2 = new LineageVertex();
vertex2.setId((long)1);
vertex2.setVertexType(ImpalaVertexType.COLUMN);
vertex2.setVertexId(tableName + ".count");
vertices.add(vertex2);
LineageVertex vertex3 = new LineageVertex();
vertex3.setId((long)2);
vertex3.setVertexType(ImpalaVertexType.COLUMN);
vertex3.setVertexId(viewName + ".id");
vertices.add(vertex3);
LineageVertex vertex4 = new LineageVertex();
vertex4.setId((long)3);
vertex4.setVertexType(ImpalaVertexType.COLUMN);
vertex4.setVertexId(tableName + ".id");
vertices.add(vertex4);
LineageVertex vertex5 = new LineageVertex();
vertex5.setId((long)4);
vertex5.setVertexType(ImpalaVertexType.TABLE);
vertex5.setVertexId(viewName);
vertex5.setCreateTime(System.currentTimeMillis() / 1000);
vertices.add(vertex5);
LineageVertex vertex6 = new LineageVertex();
vertex6.setId((long)5);
vertex6.setVertexType(ImpalaVertexType.TABLE);
vertex6.setVertexId(tableName);
vertex6.setCreateTime(System.currentTimeMillis() / 1000);
vertices.add(vertex6);
queryObj.setVertices(vertices);
try {
impalaHook.process(queryObj);
            String createTime = String.valueOf(BaseImpalaEvent.getTableCreateTime(vertex5));
String processQFName =
vertex5.getVertexId() + AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME +
CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS + createTime;
processQFName = processQFName.toLowerCase();
assertProcessIsRegistered(processQFName, queryObj.getQueryText());
        } catch (Exception ex) {
            LOG.error("process create_view failed: ", ex);
            fail("process create_view failed: " + ex.getMessage());
        }
}
}
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
######### Atlas Server Configs #########
atlas.rest.address=http://localhost:31000
######### Graph Database Configs #########
# Graph database implementation. Value inserted by maven.
atlas.graphdb.backend=org.apache.atlas.repository.graphdb.janus.AtlasJanusGraphDatabase
# Graph Storage
atlas.graph.storage.backend=berkeleyje
# Entity repository implementation
atlas.EntityAuditRepository.impl=org.apache.atlas.repository.audit.InMemoryEntityAuditRepository
# Graph Search Index Backend
atlas.graph.index.search.backend=solr
#Berkeley storage directory
atlas.graph.storage.directory=${sys:atlas.data}/berkley
#hbase
#For standalone mode, specify localhost
#for distributed mode, specify the zookeeper quorum here
atlas.graph.storage.hostname=${graph.storage.hostname}
atlas.graph.storage.hbase.regions-per-server=1
atlas.graph.storage.lock.wait-time=10000
#ElasticSearch
atlas.graph.index.search.directory=${sys:atlas.data}/es
atlas.graph.index.search.elasticsearch.client-only=false
atlas.graph.index.search.elasticsearch.local-mode=true
atlas.graph.index.search.elasticsearch.create.sleep=2000
# Solr cloud mode properties
atlas.graph.index.search.solr.mode=cloud
atlas.graph.index.search.solr.zookeeper-url=${solr.zk.address}
atlas.graph.index.search.solr.embedded=true
atlas.graph.index.search.max-result-set-size=150
######### Notification Configs #########
atlas.notification.embedded=true
atlas.kafka.zookeeper.connect=localhost:19026
atlas.kafka.bootstrap.servers=localhost:19027
atlas.kafka.data=${sys:atlas.data}/kafka
atlas.kafka.zookeeper.session.timeout.ms=4000
atlas.kafka.zookeeper.sync.time.ms=20
atlas.kafka.consumer.timeout.ms=4000
atlas.kafka.auto.commit.interval.ms=100
atlas.kafka.hook.group.id=atlas
atlas.kafka.entities.group.id=atlas_entities
#atlas.kafka.auto.commit.enable=false
atlas.kafka.enable.auto.commit=false
atlas.kafka.auto.offset.reset=earliest
atlas.kafka.session.timeout.ms=30000
atlas.kafka.offsets.topic.replication.factor=1
######### Entity Audit Configs #########
atlas.audit.hbase.tablename=ATLAS_ENTITY_AUDIT_EVENTS
atlas.audit.zookeeper.session.timeout.ms=1000
atlas.audit.hbase.zookeeper.quorum=localhost
atlas.audit.hbase.zookeeper.property.clientPort=19026
######### Security Properties #########
# SSL config
atlas.enableTLS=false
atlas.server.https.port=31443
######### Security Properties #########
hbase.security.authentication=simple
atlas.hook.falcon.synchronous=true
######### JAAS Configuration ########
atlas.jaas.KafkaClient.loginModuleName = com.sun.security.auth.module.Krb5LoginModule
atlas.jaas.KafkaClient.loginModuleControlFlag = required
atlas.jaas.KafkaClient.option.useKeyTab = true
atlas.jaas.KafkaClient.option.storeKey = true
atlas.jaas.KafkaClient.option.serviceName = kafka
atlas.jaas.KafkaClient.option.keyTab = /etc/security/keytabs/atlas.service.keytab
atlas.jaas.KafkaClient.option.principal = atlas/_HOST@EXAMPLE.COM
######### High Availability Configuration ########
atlas.server.ha.enabled=false
#atlas.server.ids=id1
#atlas.server.address.id1=localhost:21000
######### Atlas Authorization #########
atlas.authorizer.impl=none
# atlas.authorizer.impl=simple
# atlas.authorizer.simple.authz.policy.file=atlas-simple-authz-policy.json
######### Atlas Authentication #########
atlas.authentication.method.file=true
atlas.authentication.method.ldap.type=none
atlas.authentication.method.kerberos=false
# atlas.authentication.method.file.filename=users-credentials.properties
<?xml version="1.0" encoding="UTF-8" ?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
<appender name="console" class="org.apache.log4j.ConsoleAppender">
<param name="Target" value="System.out"/>
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%d %-5p - [%t:%x] ~ %m (%C{1}:%L)%n"/>
</layout>
</appender>
<appender name="FILE" class="org.apache.log4j.RollingFileAppender">
<param name="File" value="${atlas.log.dir}/${atlas.log.file}"/>
<param name="Append" value="true"/>
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%d %-5p - [%t:%x] ~ %m (%C{1}:%L)%n"/>
<param name="maxFileSize" value="100MB" />
<param name="maxBackupIndex" value="20" />
</layout>
</appender>
<appender name="AUDIT" class="org.apache.log4j.RollingFileAppender">
<param name="File" value="${atlas.log.dir}/audit.log"/>
<param name="Append" value="true"/>
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%d %x %m%n"/>
<param name="maxFileSize" value="100MB" />
<param name="maxBackupIndex" value="20" />
</layout>
</appender>
<appender name="METRICS" class="org.apache.log4j.RollingFileAppender">
<param name="File" value="${atlas.log.dir}/metric.log"/>
<param name="Append" value="true"/>
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%d %x %m%n"/>
<param name="maxFileSize" value="100MB" />
</layout>
</appender>
<appender name="FAILED" class="org.apache.log4j.RollingFileAppender">
<param name="File" value="${atlas.log.dir}/failed.log"/>
<param name="Append" value="true"/>
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%d %m"/>
<param name="maxFileSize" value="100MB" />
<param name="maxBackupIndex" value="20" />
</layout>
</appender>
<logger name="org.apache.atlas" additivity="false">
<level value="info"/>
<appender-ref ref="FILE"/>
</logger>
<logger name="org.apache.atlas.impala.ImpalaLineageTool" additivity="false">
<level value="debug"/>
<appender-ref ref="FILE"/>
</logger>
<logger name="org.apache.atlas.impala.hook.ImpalaLineageHook" additivity="false">
<level value="debug"/>
<appender-ref ref="FILE"/>
</logger>
<logger name="org.janusgraph" additivity="false">
<level value="warn"/>
<appender-ref ref="FILE"/>
</logger>
<logger name="org.springframework" additivity="false">
<level value="warn"/>
<appender-ref ref="console"/>
</logger>
<logger name="org.eclipse" additivity="false">
<level value="warn"/>
<appender-ref ref="console"/>
</logger>
<logger name="com.sun.jersey" additivity="false">
<level value="warn"/>
<appender-ref ref="console"/>
</logger>
<!-- suppress noisy Kafka warnings such as "The configuration log.flush.interval.messages = 1 was supplied but isn't a known config" -->
<logger name="org.apache.kafka.common.config.AbstractConfig" additivity="false">
<level value="error"/>
<appender-ref ref="FILE"/>
</logger>
<logger name="AUDIT" additivity="false">
<level value="info"/>
<appender-ref ref="AUDIT"/>
</logger>
<logger name="METRICS" additivity="false">
<level value="debug"/>
<appender-ref ref="METRICS"/>
</logger>
<logger name="FAILED" additivity="false">
<level value="info"/>
<appender-ref ref="AUDIT"/>
</logger>
<root>
<priority value="warn"/>
<appender-ref ref="FILE"/>
</root>
</log4j:configuration>
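Note: the two debug-level loggers above are keyed to fully qualified class names. A minimal sketch of how such a class typically obtains its logger via slf4j (the process method here is illustrative, not the module's actual code):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ImpalaLineageHook {
    // The logger name (the fully qualified class name) must match the
    // <logger name="org.apache.atlas.impala.hook.ImpalaLineageHook"> entry above
    // for the debug level to apply; slf4j routes to log4j via slf4j-log4j12.
    private static final Logger LOG = LoggerFactory.getLogger(ImpalaLineageHook.class);

    void process(String lineageRecord) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("processing lineage record: {}", lineageRecord);
        }
    }
}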
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<configuration>
<property>
<name>hive.exec.submit.local.task.via.child</name>
<value>false</value>
</property>
<property>
<name>mapreduce.framework.name</name>
<value>local</value>
</property>
<property>
<name>fs.default.name</name>
<value>file:///</value>
</property>
<property>
<name>hive.metastore.event.listeners</name>
<value>org.apache.atlas.hive.hook.HiveMetastoreHookImpl</value>
</property>
<property>
<name>hive.support.concurrency</name>
<value>false</value>
</property>
<property>
<name>hive.metastore.warehouse.dir</name>
<value>${project.basedir}/target/metastore</value>
</property>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:derby:;databaseName=${project.basedir}/target/metastore_db;create=true</value>
</property>
<property>
<name>atlas.hook.hive.synchronous</name>
<value>true</value>
</property>
<property>
<name>fs.pfile.impl</name>
<value>org.apache.hadoop.fs.ProxyLocalFileSystem</value>
</property>
<property>
<name>hive.in.test</name>
<value>true</value>
</property>
<property>
<name>hive.zookeeper.quorum</name>
<value>localhost:19026</value>
</property>
<property>
<name>hive.metastore.schema.verification</name>
<value>false</value>
</property>
<property>
<name>hive.metastore.disallow.incompatible.col.type.changes</name>
<value>false</value>
</property>
<property>
<name>datanucleus.schema.autoCreateAll</name>
<value>true</value>
</property>
<property>
<name>hive.exec.scratchdir</name>
<value>${project.basedir}/target/scratchdir</value>
</property>
</configuration>
\ No newline at end of file
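Note: this hive-site.xml drives an embedded, local-mode metastore for the integration tests. A minimal sketch, assuming the Hive libraries are on the test classpath, to confirm the values resolve from the classpath copy:

import org.apache.hadoop.hive.conf.HiveConf;

public class HiveSiteCheck {
    public static void main(String[] args) {
        // HiveConf picks up hive-site.xml from the classpath automatically
        HiveConf conf = new HiveConf();
        System.out.println(conf.get("hive.metastore.warehouse.dir"));
        System.out.println(conf.get("hive.metastore.event.listeners"));
    }
}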
{
"queryText": "INSERT INTO TABLE db_wlwspnwgfp.tbl_wlwspnwgfp_7 VALUES (1, 'foo', 'foo', 'foo', 'foo', 1), (2, 'foo', 'foo', 'foo', 'foo', 2)",
"queryId": "4c4cb5dc22194e20:c440f5fe00000000",
"hash": "d936d531989bd0f46d636bd05fc2540c",
"user": "impala@GCE.ABCDEFGH.COM",
"timestamp": 1553528501,
"endTime": 1553528505,
"edges": [
{
"sources": [],
"targets": [
0
],
"edgeType": "PROJECTION"
},
{
"sources": [],
"targets": [
1
],
"edgeType": "PROJECTION"
},
{
"sources": [],
"targets": [
2
],
"edgeType": "PROJECTION"
},
{
"sources": [],
"targets": [
3
],
"edgeType": "PROJECTION"
},
{
"sources": [],
"targets": [
4
],
"edgeType": "PROJECTION"
},
{
"sources": [],
"targets": [
5
],
"edgeType": "PROJECTION"
}
],
"vertices": [
{
"id": 0,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_7.col_tbl_wlwspnwgfp_7_id"
},
{
"id": 1,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_7.col_tbl_wlwspnwgfp_7_name"
},
{
"id": 2,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_7.col_tbl_wlwspnwgfp_7_street"
},
{
"id": 3,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_7.col_tbl_wlwspnwgfp_7_city"
},
{
"id": 4,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_7.col_tbl_wlwspnwgfp_7_state"
},
{
"id": 5,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_7.col_tbl_wlwspnwgfp_7_zipcode"
}
]
}
\ No newline at end of file
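Note: the fixture above (and the two that follow) is an Impala lineage record: a flat list of vertices (columns and tables) plus edges wiring source vertex ids to target vertex ids. Hypothetical Jackson POJOs, named for illustration only and not the module's actual classes, sketching how such a record could be deserialized:

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.File;
import java.util.List;

@JsonIgnoreProperties(ignoreUnknown = true)
public class LineageRecord {
    public String queryText;
    public String queryId;
    public String hash;
    public String user;
    public long timestamp;   // query start, seconds since the epoch
    public long endTime;     // query end, seconds since the epoch
    public List<Edge> edges;
    public List<Vertex> vertices;

    public static void main(String[] args) throws Exception {
        LineageRecord r = new ObjectMapper().readValue(new File(args[0]), LineageRecord.class);
        System.out.println(r.queryId + ": " + r.edges.size() + " edges, " + r.vertices.size() + " vertices");
    }
}

@JsonIgnoreProperties(ignoreUnknown = true)
class Edge {
    public List<Integer> sources;  // ids of the vertices feeding the targets
    public List<Integer> targets;
    public String edgeType;        // PROJECTION in the fixtures here
}

@JsonIgnoreProperties(ignoreUnknown = true)
class Vertex {
    public int id;
    public String vertexType;  // COLUMN or TABLE
    public String vertexId;    // qualified name, e.g. db.table.column
    public Long createTime;    // present only on TABLE vertices
}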
{
"queryText": "with a AS (SELECT * FROM db_wlwspnwgfp.tbl_wlwspnwgfp_1), b AS (SELECT * FROM db_wlwspnwgfp.tbl_wlwspnwgfp_2), c AS (SELECT * FROM db_wlwspnwgfp.tbl_wlwspnwgfp_3), d AS (SELECT * FROM db_wlwspnwgfp.tbl_wlwspnwgfp_4), e AS (SELECT * FROM db_wlwspnwgfp.tbl_wlwspnwgfp_5), f AS (SELECT * FROM db_wlwspnwgfp.tbl_wlwspnwgfp_6) INSERT INTO table db_wlwspnwgfp.tbl_wlwspnwgfp_7 SELECT * FROM a UNION SELECT * FROM b UNION SELECT * FROM c UNION SELECT * FROM d UNION SELECT * FROM e UNION SELECT * FROM f",
"queryId": "b9423a1de88a33c3:997879c000000000",
"hash": "a6ff7959c66c23499346eef791c66439",
"user": "impala@GCE.ABCDEFGH.COM",
"timestamp": 1553528521,
"endTime": 1553528525,
"edges": [
{
"sources": [
1,
2,
3,
4,
5,
6
],
"targets": [
0
],
"edgeType": "PROJECTION"
},
{
"sources": [
8,
9,
10,
11,
12,
13
],
"targets": [
7
],
"edgeType": "PROJECTION"
},
{
"sources": [
15,
16,
17,
18,
19,
20
],
"targets": [
14
],
"edgeType": "PROJECTION"
},
{
"sources": [
22,
23,
24,
25,
26,
27
],
"targets": [
21
],
"edgeType": "PROJECTION"
},
{
"sources": [
29,
30,
31,
32,
33,
34
],
"targets": [
28
],
"edgeType": "PROJECTION"
},
{
"sources": [
36,
37,
38,
39,
40,
41
],
"targets": [
35
],
"edgeType": "PROJECTION"
}
],
"vertices": [
{
"id": 0,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_7.col_tbl_wlwspnwgfp_7_id"
},
{
"id": 1,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_1.col_tbl_wlwspnwgfp_1_id"
},
{
"id": 2,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_2.col_tbl_wlwspnwgfp_2_id"
},
{
"id": 3,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_3.col_tbl_wlwspnwgfp_3_id"
},
{
"id": 4,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_4.col_tbl_wlwspnwgfp_4_id"
},
{
"id": 5,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_5.col_tbl_wlwspnwgfp_5_id"
},
{
"id": 6,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_6.col_tbl_wlwspnwgfp_6_id"
},
{
"id": 7,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_7.col_tbl_wlwspnwgfp_7_name"
},
{
"id": 8,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_1.col_tbl_wlwspnwgfp_1_name"
},
{
"id": 9,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_2.col_tbl_wlwspnwgfp_2_name"
},
{
"id": 10,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_3.col_tbl_wlwspnwgfp_3_name"
},
{
"id": 11,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_4.col_tbl_wlwspnwgfp_4_name"
},
{
"id": 12,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_5.col_tbl_wlwspnwgfp_5_name"
},
{
"id": 13,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_6.col_tbl_wlwspnwgfp_6_name"
},
{
"id": 14,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_7.col_tbl_wlwspnwgfp_7_street"
},
{
"id": 15,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_1.col_tbl_wlwspnwgfp_1_street"
},
{
"id": 16,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_2.col_tbl_wlwspnwgfp_2_street"
},
{
"id": 17,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_3.col_tbl_wlwspnwgfp_3_street"
},
{
"id": 18,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_4.col_tbl_wlwspnwgfp_4_street"
},
{
"id": 19,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_5.col_tbl_wlwspnwgfp_5_street"
},
{
"id": 20,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_6.col_tbl_wlwspnwgfp_6_street"
},
{
"id": 21,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_7.col_tbl_wlwspnwgfp_7_city"
},
{
"id": 22,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_1.col_tbl_wlwspnwgfp_1_city"
},
{
"id": 23,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_2.col_tbl_wlwspnwgfp_2_city"
},
{
"id": 24,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_3.col_tbl_wlwspnwgfp_3_city"
},
{
"id": 25,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_4.col_tbl_wlwspnwgfp_4_city"
},
{
"id": 26,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_5.col_tbl_wlwspnwgfp_5_city"
},
{
"id": 27,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_6.col_tbl_wlwspnwgfp_6_city"
},
{
"id": 28,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_7.col_tbl_wlwspnwgfp_7_state"
},
{
"id": 29,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_1.col_tbl_wlwspnwgfp_1_state"
},
{
"id": 30,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_2.col_tbl_wlwspnwgfp_2_state"
},
{
"id": 31,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_3.col_tbl_wlwspnwgfp_3_state"
},
{
"id": 32,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_4.col_tbl_wlwspnwgfp_4_state"
},
{
"id": 33,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_5.col_tbl_wlwspnwgfp_5_state"
},
{
"id": 34,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_6.col_tbl_wlwspnwgfp_6_state"
},
{
"id": 35,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_7.col_tbl_wlwspnwgfp_7_zipcode"
},
{
"id": 36,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_1.col_tbl_wlwspnwgfp_1_zipcode"
},
{
"id": 37,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_2.col_tbl_wlwspnwgfp_2_zipcode"
},
{
"id": 38,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_3.col_tbl_wlwspnwgfp_3_zipcode"
},
{
"id": 39,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_4.col_tbl_wlwspnwgfp_4_zipcode"
},
{
"id": 40,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_5.col_tbl_wlwspnwgfp_5_zipcode"
},
{
"id": 41,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_6.col_tbl_wlwspnwgfp_6_zipcode"
}
]
}
\ No newline at end of file
{
"queryText":"create view db_1.view_1 as select count, id from db_1.table_1",
"queryId":"3a441d0c130962f8:7f634aec00000000",
"hash":"64ff0425ccdfaada53e3f2fd76f566f7",
"user":"admin",
"timestamp":1554750072,
"endTime":1554750554,
"edges":[
{
"sources":[
1
],
"targets":[
0
],
"edgeType":"PROJECTION"
},
{
"sources":[
3
],
"targets":[
2
],
"edgeType":"PROJECTION"
}
],
"vertices":[
{
"id":0,
"vertexType":"COLUMN",
"vertexId":"db_1.view_1.count"
},
{
"id":1,
"vertexType":"COLUMN",
"vertexId":"db_1.table_1.count"
},
{
"id":2,
"vertexType":"COLUMN",
"vertexId":"db_1.view_1.id"
},
{
"id":3,
"vertexType":"COLUMN",
"vertexId":"db_1.table_1.id"
},
{
"id":4,
"vertexType":"TABLE",
"vertexId":"db_1.table_1",
"createTime":1554750070
},
{
"id":5,
"vertexType":"TABLE",
"vertexId":"db_1.view_1",
"createTime":1554750072
}
]
}
\ No newline at end of file
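Note: continuing the hypothetical POJOs sketched earlier, the edges of the create-view record above resolve into readable column lineage pairs:

import java.util.HashMap;
import java.util.Map;

class EdgePrinter {
    // Prints "source -> target" for each edge; for the record above this yields
    // db_1.table_1.count -> db_1.view_1.count and db_1.table_1.id -> db_1.view_1.id.
    static void printLineage(LineageRecord record) {
        Map<Integer, String> vertexById = new HashMap<>();
        for (Vertex v : record.vertices) {
            vertexById.put(v.id, v.vertexId);
        }
        for (Edge e : record.edges) {
            for (int source : e.sources) {
                for (int target : e.targets) {
                    System.out.println(vertexById.get(source) + " -> " + vertexById.get(target));
                }
            }
        }
    }
}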
#username=group::sha256-password
admin=ADMIN::8c6976e5b5410415bde908bd4dee15dfb167a9c873fc4bb8a81f6f2ab448a918
rangertagsync=RANGER_TAG_SYNC::e3f67240f5117d1753c940dae9eea772d36ed5fe9bd9c94a300e40413f1afb9d
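Note: each line above follows username=GROUP::sha256(password); the admin row is the SHA-256 of the string "admin". A minimal sketch for generating the hash column:

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

public class PasswordHash {
    public static void main(String[] args) throws Exception {
        MessageDigest md = MessageDigest.getInstance("SHA-256");
        byte[] digest = md.digest(args[0].getBytes(StandardCharsets.UTF_8));
        StringBuilder hex = new StringBuilder();
        for (byte b : digest) {
            hex.append(String.format("%02x", b));
        }
        System.out.println(hex); // "admin" -> 8c6976e5...b448a918
    }
}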
{
"enumDefs": [],
"structDefs": [],
"classificationDefs": [],
"entityDefs": [
{
"name": "impala_process",
"superTypes": [
"Process"
],
"serviceType": "impala",
"typeVersion": "1.0",
"attributeDefs": [
{
"name": "startTime",
"typeName": "date",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": false,
"isUnique": false
},
{
"name": "endTime",
"typeName": "date",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": false,
"isUnique": false
},
{
"name": "userName",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": true,
"isOptional": false,
"isUnique": false
},
{
"name": "operationType",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": true,
"isOptional": false,
"isUnique": false
},
{
"name": "queryText",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": false,
"isUnique": false
},
{
"name": "queryPlan",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": false,
"isUnique": false
},
{
"name": "queryId",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": false,
"isUnique": false
},
{
"name": "recentQueries",
"typeName": "array<string>",
"cardinality": "LIST",
"isIndexable": false,
"isOptional": true,
"isUnique": false
},
{
"name": "clusterName",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": true,
"includeInNotification": true,
"isUnique": false
},
{
"name": "queryGraph",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": true,
"isUnique": false
}
]
},
{
"name" : "impala_column_lineage",
"superTypes" : [
"Process"
],
"serviceType": "impala",
"typeVersion" : "1.0",
"attributeDefs" : [
{
"name": "dependencyType",
"typeName": "string",
"cardinality" : "SINGLE",
"isIndexable": false,
"isOptional": false,
"isUnique": false
}
]
},
{
"name" : "impala_process_execution",
"superTypes" : [
"ProcessExecution"
],
"serviceType": "impala",
"typeVersion" : "1.0",
"attributeDefs" : [
{
"name": "startTime",
"typeName": "date",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": false,
"isUnique": false
},
{
"name": "endTime",
"typeName": "date",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": false,
"isUnique": false
},
{
"name": "userName",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": true,
"isOptional": false,
"isUnique": false
},
{
"name": "queryText",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": false,
"isUnique": false
},
{
"name": "queryGraph",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": true,
"isUnique": false
},
{
"name": "queryId",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": false,
"isUnique": false
},
{
"name": "queryPlan",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": false,
"isUnique": false
},
{
"name": "hostName",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": true,
"isOptional": false,
"isUnique": false
}
]
}
],
"relationshipDefs": [
{
"name": "impala_process_column_lineage",
"serviceType": "impala",
"typeVersion": "1.0",
"relationshipCategory": "COMPOSITION",
"endDef1": {
"type": "impala_column_lineage",
"name": "query",
"isContainer": false,
"cardinality": "SINGLE"
},
"endDef2": {
"type": "impala_process",
"name": "columnLineages",
"isContainer": true,
"cardinality": "SET"
},
"propagateTags": "NONE"
},
{
"name": "impala_process_process_executions",
"serviceType": "impala",
"typeVersion": "1.0",
"relationshipCategory": "COMPOSITION",
"endDef1": {
"type": "impala_process",
"name": "processExecutions",
"cardinality": "SET",
"isContainer": true
},
"endDef2": {
"type": "impala_process_execution",
"name": "process",
"cardinality": "SINGLE"
},
"propagateTags": "NONE"
}
]
}
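Note: a hedged sketch for loading the model above into a running Atlas server; /api/atlas/v2/types/typedefs is Atlas's standard v2 typedefs endpoint, while the host, port, and admin:admin credentials are assumptions matching the local defaults seen earlier in this change:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Base64;

public class RegisterImpalaTypes {
    public static void main(String[] args) throws Exception {
        String body = Files.readString(Path.of(args[0])); // the typedef JSON above
        String auth = Base64.getEncoder().encodeToString("admin:admin".getBytes());
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:21000/api/atlas/v2/types/typedefs"))
                .header("Content-Type", "application/json")
                .header("Authorization", "Basic " + auth)
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode());
    }
}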
@@ -784,6 +784,7 @@
<module>addons/hbase-testing-util</module>
<module>addons/kafka-bridge</module>
<module>tools/classification-updater</module>
<module>addons/impala-bridge</module>
<module>distro</module>
</modules>