#!/usr/bin/env python

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import csv
import errno
import getpass
import os
import platform
import re
import subprocess
import sys
import time

from re import split
from time import sleep

# Sub-directory names relative to the Atlas home directory.
BIN = "bin"
LIB = "lib"
CONF = "conf"
LOG = "logs"
WEBAPP = "server" + os.sep + "webapp"
CONFIG_SETS_CONF = "server" + os.sep + "solr" + os.sep + "configsets" + os.sep + "basic_configs" + os.sep + "conf"
DATA = "data"

# Names of the environment variables that override the defaults below.
ATLAS_CONF = "ATLAS_CONF"
ATLAS_LOG = "ATLAS_LOG_DIR"
ATLAS_PID = "ATLAS_PID_DIR"
ATLAS_WEBAPP = "ATLAS_EXPANDED_WEBAPP_DIR"
ATLAS_SERVER_OPTS = "ATLAS_SERVER_OPTS"
ATLAS_OPTS = "ATLAS_OPTS"
ATLAS_SERVER_HEAP = "ATLAS_SERVER_HEAP"
ATLAS_DATA = "ATLAS_DATA_DIR"
ATLAS_HOME = "ATLAS_HOME_DIR"
HBASE_CONF_DIR = "HBASE_CONF_DIR"
MANAGE_LOCAL_HBASE = "MANAGE_LOCAL_HBASE"
MANAGE_LOCAL_SOLR = "MANAGE_LOCAL_SOLR"
SOLR_BIN = "SOLR_BIN"
SOLR_CONF = "SOLR_CONF"
SOLR_PORT = "SOLR_PORT"
DEFAULT_SOLR_PORT = "9838"
SOLR_SHARDS = "SOLR_SHARDS"
DEFAULT_SOLR_SHARDS = "1"
SOLR_REPLICATION_FACTOR = "SOLR_REPLICATION_FACTOR"
DEFAULT_SOLR_REPLICATION_FACTOR = "1"

# Only these variables are copied into os.environ by executeEnvSh().
ENV_KEYS = ["JAVA_HOME", ATLAS_OPTS, ATLAS_SERVER_OPTS, ATLAS_SERVER_HEAP, ATLAS_LOG, ATLAS_PID, ATLAS_CONF,
            "ATLASCPPATH", ATLAS_DATA, ATLAS_HOME, ATLAS_WEBAPP, HBASE_CONF_DIR, SOLR_PORT]

IS_WINDOWS = platform.system() == "Windows"
ON_POSIX = 'posix' in sys.builtin_module_names

CONF_FILE = "atlas-application.properties"

# Regular expressions matched (anchored at line start) against lines of
# CONF_FILE.  Raw strings keep the "\s" escapes free of invalid-escape warnings.
HBASE_STORAGE_CONF_ENTRY = r"atlas.graph.storage.backend\s*=\s*hbase"
HBASE_STORAGE_LOCAL_CONF_ENTRY = r"atlas.graph.storage.hostname\s*=\s*localhost"
SOLR_INDEX_CONF_ENTRY = r"atlas.graph.index.search.backend\s*=\s*solr5"
SOLR_INDEX_LOCAL_CONF_ENTRY = r"atlas.graph.index.search.solr.zookeeper-url\s*=\s*localhost"
SOLR_INDEX_ZK_URL = "atlas.graph.index.search.solr.zookeeper-url"
TOPICS_TO_CREATE = "atlas.notification.topics"

DEBUG = False

# Shared flags read by print_output() and read_input().  They were previously
# referenced without ever being defined, which raised NameError on first use.
# `finished` stops the relay loops; `needPassword` is raised by print_output()
# when it sees a password prompt and cleared by read_input().
finished = False
needPassword = False


def scriptDir():
    """
    get the script path
    """
    return os.path.dirname(os.path.realpath(__file__))


def atlasDir():
    """Return ATLAS_HOME_DIR if set, otherwise the parent of the script directory."""
    home = os.path.dirname(scriptDir())
    return os.environ.get(ATLAS_HOME, home)


def libDir(dir):
    """Return the `lib` directory under `dir`."""
    return os.path.join(dir, LIB)


def confDir(dir):
    """Return ATLAS_CONF if set, otherwise the `conf` directory under `dir`."""
    localconf = os.path.join(dir, CONF)
    return os.environ.get(ATLAS_CONF, localconf)


def hbaseBinDir(dir):
    """Return the `hbase/bin` directory under `dir`."""
    return os.path.join(dir, "hbase", BIN)


def hbaseConfDir(dir):
    """Return HBASE_CONF_DIR if set, otherwise `hbase/conf` under `dir`."""
    return os.environ.get(HBASE_CONF_DIR, os.path.join(dir, "hbase", CONF))


def solrBinDir(dir):
    """Return SOLR_BIN if set, otherwise `solr/bin` under `dir`."""
    return os.environ.get(SOLR_BIN, os.path.join(dir, "solr", BIN))


def solrConfDir(dir):
    """Return SOLR_CONF if set, otherwise the basic_configs conf directory under `dir`/solr."""
    return os.environ.get(SOLR_CONF, os.path.join(dir, "solr", CONFIG_SETS_CONF))


def solrPort():
    """Return SOLR_PORT from the environment, or DEFAULT_SOLR_PORT (a string)."""
    return os.environ.get(SOLR_PORT, DEFAULT_SOLR_PORT)


def solrShards():
    """Return SOLR_SHARDS from the environment, or DEFAULT_SOLR_SHARDS (a string)."""
    return os.environ.get(SOLR_SHARDS, DEFAULT_SOLR_SHARDS)


def solrReplicationFactor():
    """Return SOLR_REPLICATION_FACTOR from the environment, or its default (a string)."""
    return os.environ.get(SOLR_REPLICATION_FACTOR, DEFAULT_SOLR_REPLICATION_FACTOR)


def logDir(dir):
    """Return ATLAS_LOG_DIR if set, otherwise the `logs` directory under `dir`."""
    localLog = os.path.join(dir, LOG)
    return os.environ.get(ATLAS_LOG, localLog)


def pidFile(dir):
    """Return the path of `atlas.pid` inside ATLAS_PID_DIR, or inside `dir`/logs if unset."""
    localPid = os.path.join(dir, LOG)
    return os.path.join(os.environ.get(ATLAS_PID, localPid), 'atlas.pid')


def dataDir(dir):
    """Return ATLAS_DATA_DIR if set, otherwise the `data` directory under `dir`."""
    data = os.path.join(dir, DATA)
    return os.environ.get(ATLAS_DATA, data)


def webAppDir(dir):
    """Return ATLAS_EXPANDED_WEBAPP_DIR if set, otherwise `server/webapp` under `dir`."""
    webapp = os.path.join(dir, WEBAPP)
    return os.environ.get(ATLAS_WEBAPP, webapp)


def kafkaTopicSetupDir(homeDir):
    """Return the `hook/kafka-topic-setup` directory under `homeDir`."""
    return os.path.join(homeDir, "hook", "kafka-topic-setup")


def expandWebApp(dir):
    """
    Extract atlas.war into <webapp dir>/atlas unless it already has a WEB-INF.

    NOTE: this changes the process working directory to the extraction
    directory and does not restore it, because `jar -xf` extracts into the
    current directory.

    :param dir: Atlas home used to resolve the webapp directory
    """
    webappDir = webAppDir(dir)
    webAppMetadataDir = os.path.join(webappDir, "atlas")
    if os.path.exists(os.path.join(webAppMetadataDir, "WEB-INF")):
        return

    try:
        os.makedirs(webAppMetadataDir)
    except OSError as e:
        # An already existing directory is fine; anything else is a real error.
        if e.errno != errno.EEXIST:
            raise

    atlasWarPath = os.path.join(atlasDir(), "server", "webapp", "atlas.war")
    if isCygwin():
        atlasWarPath = convertCygwinPath(atlasWarPath)
    os.chdir(webAppMetadataDir)
    jar(atlasWarPath)


def dirMustExist(dirname):
    """
    Make sure `dirname` exists, creating it (and missing parents) if needed.

    :param dirname: directory path
    :return: `dirname`, unchanged
    """
    if not os.path.exists(dirname):
        try:
            os.makedirs(dirname)
        except OSError as e:
            # Another process may have created it between the check and here.
            if e.errno != errno.EEXIST:
                raise
    return dirname


def executeEnvSh(confDir):
    """
    Source <confDir>/atlas-env.sh in bash and copy the variables listed in
    ENV_KEYS from the resulting environment into os.environ.

    Does nothing on Windows or when the script does not exist.

    :param confDir: configuration directory containing atlas-env.sh
    """
    envscript = '%s/atlas-env.sh' % confDir
    if IS_WINDOWS or not os.path.exists(envscript):
        return

    # The script path is passed as a positional argument rather than being
    # interpolated into the shell string, so spaces or shell metacharacters
    # in the path cannot break or alter the command.
    command = ['bash', '-c', 'source "$1" && env', 'bash', envscript]
    # universal_newlines=True yields text (not bytes) on Python 3 as well.
    proc = subprocess.Popen(command, stdout=subprocess.PIPE, universal_newlines=True)
    output = proc.communicate()[0]
    for line in output.split("\n"):
        (key, _, value) = line.strip().partition("=")
        if key in ENV_KEYS:
            os.environ[key] = value


def _findJavaTool(name):
    """Locate a JDK tool: $JAVA_HOME/bin/<name> if JAVA_HOME is set, else search PATH."""
    java_home = os.environ.get("JAVA_HOME", None)
    if java_home:
        return os.path.join(java_home, "bin", name)
    prg = which(name)
    if prg is None:
        raise EnvironmentError('The %s binary could not be found in your path or JAVA_HOME' % name)
    return prg


def java(classname, args, classpath, jvm_opts_list, logdir=None):
    """
    Launch a Java class in a new process without waiting for it.

    :param classname: main class to run
    :param args: list of program arguments
    :param classpath: value for -classpath
    :param jvm_opts_list: list of JVM options placed before -classpath
    :param logdir: if set, stdout/stderr are redirected to files in this directory
    :return: the subprocess.Popen object of the started JVM
    :raises EnvironmentError: if no java binary can be located
    """
    commandline = [_findJavaTool("java")]
    commandline.extend(jvm_opts_list)
    commandline.append("-classpath")
    commandline.append(classpath)
    commandline.append(classname)
    commandline.extend(args)
    return runProcess(commandline, logdir)


def jar(path):
    """
    Extract the archive at `path` into the current directory and wait for completion.

    :param path: archive to extract with `jar -xf`
    :raises EnvironmentError: if no jar binary can be located
    """
    commandline = [_findJavaTool("jar"), "-xf", path]
    process = runProcess(commandline)
    process.wait()


def is_exe(fpath):
    """Return True if `fpath` is a regular file the current user may execute."""
    return os.path.isfile(fpath) and os.access(fpath, os.X_OK)


def which(program):
    """
    Locate an executable, like the shell `which`.

    If `program` contains a directory part it is checked directly; otherwise
    every PATH entry is searched.  On Windows the PATHEXT extensions are
    tried as well, so which("java") can find java.exe.

    :param program: program name or path
    :return: path of the executable, or None if not found
    """
    candidates = [program]
    if IS_WINDOWS:
        pathext = os.environ.get("PATHEXT", ".COM;.EXE;.BAT;.CMD")
        candidates.extend(program + ext for ext in pathext.split(os.pathsep) if ext)

    fpath, fname = os.path.split(program)
    if fpath:
        for candidate in candidates:
            if is_exe(candidate):
                return candidate
        return None

    # A missing PATH means "nothing found" rather than a KeyError.
    for path in os.environ.get("PATH", "").split(os.pathsep):
        path = path.strip('"')
        for candidate in candidates:
            exe_file = os.path.join(path, candidate)
            if is_exe(exe_file):
                return exe_file
    return None


def runProcess(commandline, logdir=None, shell=False, wait=False):
    """
    Run a process

    :param commandline: command line (argument list, or string when shell=True)
    :param logdir: if set, stdout and stderr go to atlas.<timestamp>.out / .err
        files created in this directory; otherwise they are inherited
    :param shell: passed through to subprocess.Popen
    :param wait: if True, block until the process has exited
    :return: the subprocess.Popen object (not the return code)
    """
    debug("Executing : %s" % str(commandline))
    timestr = time.strftime("atlas.%Y%m%d-%H%M%S")
    stdoutFile = None
    stderrFile = None
    try:
        if logdir:
            stdoutFile = open(os.path.join(logdir, timestr + ".out"), "w")
            stderrFile = open(os.path.join(logdir, timestr + ".err"), "w")
        p = subprocess.Popen(commandline, stdout=stdoutFile, stderr=stderrFile, shell=shell)
    finally:
        # The child holds its own copies of the descriptors, so the parent's
        # file objects can be closed immediately instead of being leaked.
        for logFile in (stdoutFile, stderrFile):
            if logFile is not None:
                logFile.close()

    if wait:
        p.communicate()

    return p


def _readChar(pipe):
    """
    Read one character from `pipe`; return "" at end of stream.

    For byte streams the UTF-8 lead byte determines how many continuation
    bytes to read, so multi-byte characters are decoded whole instead of
    failing on a partial sequence.  Undecodable input becomes U+FFFD.
    """
    first = pipe.read(1)
    if not first:
        return ""
    if not isinstance(first, bytes):
        # Text-mode stream: already a character.
        return first
    lead = ord(first)
    if lead >= 0xF0:
        extra = 3
    elif lead >= 0xE0:
        extra = 2
    elif lead >= 0xC0:
        extra = 1
    else:
        extra = 0
    if extra:
        first += pipe.read(extra)
    return first.decode('utf-8', 'replace')


def print_output(name, src, toStdErr):
    """
    Relay the output stream to stdout line by line

    Runs until the module-level `finished` flag becomes true, then drains
    and closes `src`.  Sets `needPassword` when a completed line contains
    "Enter password for".

    :param name: label used in the debug message
    :param src: source stream
    :param toStdErr: flag set if stderr is to be the dest
    :return:
    """
    global needPassword
    debug("starting printer for %s" % name)
    line = ""
    while not finished:
        (line, done) = read(src, line)
        if done:
            out(toStdErr, line + "\n")
            flush(toStdErr)
            if line.find("Enter password for") >= 0:
                needPassword = True
            line = ""
    out(toStdErr, line)
    # closedown: read remainder of stream
    c = _readChar(src)
    # Truthiness test: b"" != "" is always true on Python 3 and would loop forever.
    while c:
        out(toStdErr, c)
        if c == "\n":
            flush(toStdErr)
        c = _readChar(src)
    flush(toStdErr)
    src.close()


def read_input(name, exe):
    """
    Read input from stdin and send to process

    Runs until the module-level `finished` flag becomes true.  Whenever
    `needPassword` is set, one credential is read (via getpass on a tty,
    otherwise one line of stdin) and written to the process' stdin.

    :param name: label used in the debug message
    :param exe: process to send input to
    :return:
    """
    global needPassword
    debug("starting reader for %s" % name)
    while not finished:
        if needPassword:
            needPassword = False
            if sys.stdin.isatty():
                cred = getpass.getpass()
            else:
                cred = sys.stdin.readline().rstrip()
            payload = cred + "\n"
            try:
                exe.stdin.write(payload)
            except TypeError:
                # Binary pipe on Python 3 requires bytes.
                exe.stdin.write(payload.encode('utf-8'))
            exe.stdin.flush()
        else:
            # Avoid spinning at 100% CPU while waiting for a prompt.
            sleep(0.1)


def debug(text):
    """Print `text` with a [DEBUG] prefix when the module-level DEBUG flag is set."""
    if DEBUG:
        print('[DEBUG] ' + text)


def error(text):
    """Print `text` with an [ERROR] prefix to stdout and flush."""
    print('[ERROR] ' + text)
    sys.stdout.flush()


def info(text):
    """Print `text` to stdout and flush."""
    print(text)
    sys.stdout.flush()


def out(toStdErr, text):
    """
    Write to one of the system output channels.
    This action does not add newlines. If you want that: write them yourself
    :param toStdErr: flag set if stderr is to be the dest
    :param text: text to write.
    :return:
    """
    if toStdErr:
        sys.stderr.write(text)
    else:
        sys.stdout.write(text)


def flush(toStdErr):
    """
    Flush the output stream
    :param toStdErr: flag set if stderr is to be the dest
    :return:
    """
    if toStdErr:
        sys.stderr.flush()
    else:
        sys.stdout.flush()


def read(pipe, line):
    """
    read a char, append to the listing if there is a char that is not \\n
    :param pipe: pipe to read from
    :param line: line being built up
    :return: (the potentially updated line, flag indicating newline reached);
        at end of stream the line is returned unchanged with a False flag
    """
    o = _readChar(pipe)
    if not o:
        return line, False
    if o == '\n':
        return line, True
    return line + o, False


def writePid(atlas_pid_file, process):
    """
    Write the pid of `process` to `atlas_pid_file`, replacing any previous content.

    :param atlas_pid_file: path of the pid file
    :param process: object exposing a `pid` attribute (e.g. subprocess.Popen)
    """
    with open(atlas_pid_file, 'w') as f:
        f.write(str(process.pid))


def _tasklistHasPid(output, pid):
    """Return True if CSV-format `tasklist` output has a row whose PID column equals `pid`."""
    pidStr = str(pid)
    for row in csv.reader(output.splitlines()):
        # Columns: image name, PID, session name, session number, memory usage.
        if len(row) > 1 and row[1].strip() == pidStr:
            return True
    return False


def exist_pid(pid):
    """
    Report whether a process with the given pid currently exists.

    :param pid: process id
    :return: True if the process exists; on platforms that are neither POSIX
        nor Windows the check is unsupported and True is always returned
    """
    if ON_POSIX:
        # check if process id exist in the current process table
        # See man 2 kill - Linux man page for info about the kill(pid,0) system function
        try:
            os.kill(pid, 0)
        except OSError as e:
            # EPERM means the process exists but belongs to someone else.
            return e.errno == errno.EPERM
        else:
            return True
    elif IS_WINDOWS:
        # The os.kill approach does not work on Windows with python 2.7.
        # tasklist is run once, in header-less CSV format, and the PID column
        # is compared exactly; a substring search over the whole output would
        # also match longer pids or the memory column.
        command = ['tasklist', '/fi', 'PID eq %s' % pid, '/fo', 'csv', '/nh']
        output = subprocess.check_output(command, universal_newlines=True)
        return _tasklistHasPid(output, pid)
    # os other than nt or posix - not supported - need to delete the file to restart server if pid no longer exist
    return True


def wait_for_shutdown(pid, msg, wait):
    """
    Print `msg` followed by one dot per second while process `pid` exists.

    :param pid: process id to watch
    :param msg: text written before the dots
    :param wait: stop polling once the iteration counter exceeds this value
    """
    count = 0
    sys.stdout.write(msg)
    while exist_pid(pid):
        sys.stdout.write('.')
        sys.stdout.flush()
        sleep(1)
        if count > wait:
            break
        count = count + 1

    sys.stdout.write('\n')


def is_hbase(confdir):
    """Return True if CONF_FILE in `confdir` selects hbase as the graph storage backend."""
    confdir = os.path.join(confdir, CONF_FILE)
    return grep(confdir, HBASE_STORAGE_CONF_ENTRY) is not None


def is_hbase_local(confdir):
    """
    Return True if this script should manage a local HBase.

    Requires MANAGE_LOCAL_HBASE to be set to something other than "false"
    (case-insensitive) and CONF_FILE to configure hbase on localhost.
    """
    if os.environ.get(MANAGE_LOCAL_HBASE, "False").lower() == 'false':
        return False

    confdir = os.path.join(confdir, CONF_FILE)
    return grep(confdir, HBASE_STORAGE_CONF_ENTRY) is not None and grep(confdir, HBASE_STORAGE_LOCAL_CONF_ENTRY) is not None


def run_hbase_action(dir, action, hbase_conf_dir=None, logdir=None, wait=True):
    """
    Start or stop HBase through the scripts in `dir`.

    On Windows `start-hbase.cmd` is used for action 'start' and
    `stop-hbase.cmd` for anything else; elsewhere `hbase-daemon.sh` is
    invoked with `<action> master`.

    :param dir: directory containing the hbase scripts
    :param action: action name, e.g. 'start'
    :param hbase_conf_dir: if not None, passed to the script via --config
    :param logdir: passed to runProcess
    :param wait: passed to runProcess
    :return: the subprocess.Popen object
    """
    if IS_WINDOWS:
        hbaseScript = 'start-hbase.cmd' if action == 'start' else 'stop-hbase.cmd'
        trailing = []
    else:
        hbaseScript = 'hbase-daemon.sh'
        trailing = [action, 'master']

    cmd = [os.path.join(dir, hbaseScript)]
    if hbase_conf_dir is not None:
        cmd.extend(['--config', hbase_conf_dir])
    cmd.extend(trailing)

    return runProcess(cmd, logdir, False, wait)


def is_solr(confdir):
    """Return True if CONF_FILE in `confdir` selects solr5 as the index search backend."""
    confdir = os.path.join(confdir, CONF_FILE)
    return grep(confdir, SOLR_INDEX_CONF_ENTRY) is not None


def is_solr_local(confdir):
    """
    Return True if this script should manage a local Solr.

    Requires MANAGE_LOCAL_SOLR to be set to something other than "false"
    (case-insensitive) and CONF_FILE to configure solr5 with a localhost
    zookeeper url.
    """
    if os.environ.get(MANAGE_LOCAL_SOLR, "False").lower() == 'false':
        return False

    confdir = os.path.join(confdir, CONF_FILE)
    return grep(confdir, SOLR_INDEX_CONF_ENTRY) is not None and grep(confdir, SOLR_INDEX_LOCAL_CONF_ENTRY) is not None


def get_solr_zk_url(confdir):
    """Return the configured Solr zookeeper url from CONF_FILE in `confdir`, or None."""
    confdir = os.path.join(confdir, CONF_FILE)
    return getConfig(confdir, SOLR_INDEX_ZK_URL)


def get_topics_to_create(confdir):
    """
    Return the list of notification topics named in CONF_FILE.

    The value is comma separated; surrounding whitespace is stripped and
    empty entries are dropped.  When the property is absent the defaults
    ["ATLAS_HOOK", "ATLAS_ENTITIES"] are returned.
    """
    confdir = os.path.join(confdir, CONF_FILE)
    topic_list = getConfig(confdir, TOPICS_TO_CREATE)
    if topic_list is None:
        return ["ATLAS_HOOK", "ATLAS_ENTITIES"]
    return [topic.strip() for topic in topic_list.split(",") if topic.strip()]


def _solrScript(dir):
    """Return the path of the platform-specific solr launcher inside `dir`."""
    return os.path.join(dir, "solr.cmd" if IS_WINDOWS else "solr")


def run_solr(dir, action, zk_url=None, port=None, logdir=None, wait=True):
    """
    Run the solr launcher with the given action.

    :param dir: directory containing the solr script
    :param action: solr action, e.g. 'start' or 'stop'
    :param zk_url: if not None, passed via -z
    :param port: if not None, passed via -p (converted to str, so ints are accepted)
    :param logdir: passed to runProcess
    :param wait: passed to runProcess
    :return: the subprocess.Popen object
    """
    cmd = [_solrScript(dir), action]
    if zk_url is not None:
        cmd.extend(['-z', zk_url])
    if port is not None:
        # Always stringify: Popen rejects non-string arguments.
        cmd.extend(['-p', str(port)])

    return runProcess(cmd, logdir, False, wait)


def create_solr_collection(dir, confdir, index, logdir=None, wait=True):
    """
    Create a Solr collection using the shard and replication settings from the environment.

    :param dir: directory containing the solr script
    :param confdir: configuration directory passed via -d
    :param index: collection name passed via -c
    :param logdir: passed to runProcess
    :param wait: passed to runProcess
    :return: the subprocess.Popen object
    """
    cmd = [_solrScript(dir), 'create', '-c', index, '-d', confdir,
           '-shards', solrShards(), '-replicationFactor', solrReplicationFactor()]

    return runProcess(cmd, logdir, False, wait)


def configure_hbase(dir):
    """
    Generate hbase/conf/hbase-site.xml from conf/hbase/hbase-site.xml.template.

    Only done when HBASE_CONF_DIR is unset or points at `dir`/hbase/conf, and
    only when the template exists.  The placeholders ${hbase_home},
    ${atlas_data} and ${url_prefix} are substituted and the template is
    removed afterwards.

    :param dir: Atlas home directory
    """
    env_conf_dir = os.environ.get(HBASE_CONF_DIR)
    conf_dir = os.path.join(dir, "hbase", CONF)
    tmpl_dir = os.path.join(dir, CONF, "hbase")
    data_dir = dataDir(atlasDir())

    if env_conf_dir is not None and env_conf_dir != conf_dir:
        return

    hbase_conf_file = "hbase-site.xml"
    tmpl_file = os.path.join(tmpl_dir, hbase_conf_file + ".template")
    url_prefix = "file:///" if IS_WINDOWS else "file://"
    conf_file = os.path.join(conf_dir, hbase_conf_file)
    if not os.path.exists(tmpl_file):
        return

    debug("Configuring " + tmpl_file + " to " + conf_file)

    with open(tmpl_file, 'r') as f:
        template = f.read()

    config = template.replace("${hbase_home}", dir)
    config = config.replace("${atlas_data}", data_dir)
    config = config.replace("${url_prefix}", url_prefix)

    with open(conf_file, 'w') as f:
        f.write(config)
    os.remove(tmpl_file)


def server_already_running(pid):
    """Report that the server is already running under `pid` and exit the interpreter."""
    print("Atlas server is already running under process %s" % pid)
    sys.exit()


def server_pid_not_running(pid):
    """Report that no server is running under `pid` any more."""
    print("The Server is no longer running with pid %s" % pid)


def grep(file, value):
    """
    Return the first line of `file` that matches the regular expression
    `value` at the start of the line, or None.

    :param file: path of the file to scan
    :param value: regular expression, matched with re.match
    :return: the matching line including its line terminator, or None
    """
    with open(file) as f:
        for line in f:
            if re.match(value, line):
                return line
    return None


def getConfig(file, key):
    """
    Return the value of the first `key = value` line in `file`, or None.

    :param file: path of the properties file
    :param key: property name; used as a regular expression prefix anchored
        at the start of the line
    :return: the text after the first '=' with surrounding whitespace
        stripped (further '=' characters are kept as part of the value)
    """
    key = key + r"\s*="
    with open(file) as f:
        for line in f:
            if re.match(key, line):
                return line.split('=', 1)[1].strip()
    return None


def isCygwin():
    """Return True when platform.system() reports a CYGWIN platform."""
    return platform.system().startswith("CYGWIN")


def convertCygwinPath(path, isClasspath=False):
    """
    Convert the specified cygwin-style pathname to Windows format,
    using the cygpath utility.  By default, path is assumed
    to be a file system pathname.  If isClasspath is True,
    then path is treated as a Java classpath string.

    :return: the converted path as text, with surrounding whitespace stripped
    """
    if isClasspath:
        cygpathArgs = ["cygpath", "-w", "-p", path]
    else:
        cygpathArgs = ["cygpath", "-w", path]
    # universal_newlines=True returns text rather than bytes on Python 3.
    windowsPath = subprocess.Popen(cygpathArgs, stdout=subprocess.PIPE,
                                   universal_newlines=True).communicate()[0]
    windowsPath = windowsPath.strip()
    return windowsPath