#!python3
# Copyright 2007-2017 Gemr. All Rights Reserved.
# Licensed to MIT see LICENSE.txt
import os
import pathlib
import subprocess
import sys
import winreg
import psutil
import time
import warnings
import gdeps as GDeps
__author__ = 'Suryavarman (http://sourceforge.net/u/suryavarman/profile/)'
def call(inCommand, inCheckCall=True, inEnv=None):
    """Run a shell command, echo its output live and collect it line by line.

    The command is executed through the shell with stderr merged into stdout.
    Every output line is printed as soon as it is read and is also stored in
    the returned list. Banner lines are written around the command and its
    output (some on stdout, some on stderr).

    :param inCommand: The command line to execute through the shell.
    :type inCommand: str
    :param inCheckCall: Not used by this function; kept so that existing callers keep working.
    :type inCheckCall: bool
    :param inEnv: Environment of the child process. ``None`` or an empty mapping lets the child inherit the
                  environment of the current process as it is when the call is made.
    :type inEnv: dict or None
    :return: returncode, returnstdoutlines: the exit status of the command (-1 if it could not be started) and
             an array of the output lines.
    :rtype: int, array of str

    .. note::
        https://docs.python.org/3/library/subprocess.html#subprocess.check_output
        https://docs.python.org/3/library/subprocess.html
        https://stackoverflow.com/questions/2502833/store-output-of-subprocess-popen-call-in-a-string
        https://stackoverflow.com/questions/2715847/python-read-streaming-input-from-subprocess-communicate
        https://stackoverflow.com/questions/9960133/substract-values-two-dictionaries-python
        `How to create a log file in MS-DOS?<https://answers.yahoo.com/question/index?qid=20070907140737AApyEpH>`_
    """
    aSeparator = "——————————————————————————————————————————————————————————————————————"

    print(aSeparator)
    print("GDeps.call :", file=sys.stderr)
    print("command : ")
    print("{", file=sys.stderr)
    print(inCommand)
    print("}", file=sys.stderr)
    print("")
    print("Output : ")
    print("{", file=sys.stderr)

    returncode = -1
    returnstdoutlines = []

    try:
        # env=None makes Popen inherit the live environment of this process.
        # The default used to be a copy of os.environ taken once at import
        # time, which hid every later change of the environment.
        with subprocess.Popen(inCommand, env=inEnv if inEnv else None, stdout=subprocess.PIPE,
                              stderr=subprocess.STDOUT, bufsize=1, universal_newlines=True,
                              shell=True) as aProcess:
            for aOutput in aProcess.stdout:
                print(aOutput, end='')
                returnstdoutlines.extend(aOutput.splitlines())

            aProcess.wait()

            if aProcess.returncode != 0:
                print("returned non-zero exit status", aProcess.returncode, file=sys.stderr)

            returncode = aProcess.returncode
    except OSError as aError:
        print("Execution failed:", aError, file=sys.stderr)

    print("}", file=sys.stderr)
    print("")
    print(aSeparator)

    return returncode, returnstdoutlines
def callWithLogFile(inCommand, inLogFilePath='', inOverwriteLog=False, inEnv=None):
    """Run a command through :func:`call`, optionally redirecting its output to a log file.

    When ``inLogFilePath`` is set, ``> "<log>" 2>&1`` is appended to the command so the shell writes
    stdout and stderr into the log file. Unless ``inOverwriteLog`` is True, the log content that existed
    before the call is read through ``GDeps.getLogFileContent`` and handed back to
    ``GDeps.pushFrontLogFileContent`` after the command has run (presumably to keep the previous log
    entries alongside the new output; confirm against the GDeps implementation).

    :param inCommand: The command line to execute.
    :type inCommand: str
    :param inLogFilePath: If it isn't empty, the output is redirected to this file.
    :type inLogFilePath: str
    :param inOverwriteLog: If True the previous content of the log file is not preserved.
    :type inOverwriteLog: bool
    :param inEnv: Environment of the child process. ``None`` lets the child inherit the environment of the
                  current process as it is when the call is made.
    :type inEnv: dict or None
    :return: The GDeps.call result
    :rtype: int, array of str
    """
    aKeepPreviousLog = bool(inLogFilePath) and not inOverwriteLog
    aLogContent = ''

    if inLogFilePath:
        if aKeepPreviousLog:
            if not os.path.exists(inLogFilePath):
                # Create an empty log first so that reading it back cannot fail.
                with open(inLogFilePath, 'w'):
                    pass

            aLogContent = GDeps.getLogFileContent(inLogFilePath)

        inCommand += ' > ' + '"' + inLogFilePath + '" 2>&1'

    # The environment is forwarded as given. It used to default to a copy of
    # os.environ taken at import time, which ignored later changes.
    aResult = call(inCommand, False, inEnv)

    if aKeepPreviousLog:
        GDeps.pushFrontLogFileContent(inLogFilePath, aLogContent)

    return aResult
def getDirectories(inDirectory):
    """ Recursive function to return all the directories.

    The result starts with the normalized ``inDirectory`` itself, followed by every directory found below
    it (depth first, in ``os.listdir`` order).

    :param inDirectory: Path of the parent directory.
    :type inDirectory: str
    :return: the list of directories inside inDirectory, inDirectory included
    :rtype: array of str
    """
    aRoot = os.path.normpath(inDirectory)
    aDirectories = [aRoot]

    for aName in os.listdir(aRoot):
        # os.path.join uses the separator of the running platform instead of
        # a hard-coded backslash.
        aFullPath = os.path.normpath(os.path.join(inDirectory, aName))

        if os.path.isdir(aFullPath):
            aDirectories += getDirectories(aFullPath)

    return aDirectories
def getPaths(inDirectory):
    """Recursive function to list every file and directory inside inDirectory.

    The result starts with the normalized ``inDirectory`` itself. Sub-directories are listed just before
    their own content.

    :param inDirectory: Path of the directory to explore.
    :type inDirectory: str
    :return: List of the paths found, inDirectory included
    :rtype: array of str
    """
    aRoot = os.path.normpath(inDirectory)
    aPaths = [aRoot]

    for aName in os.listdir(aRoot):
        # os.path.join uses the separator of the running platform instead of
        # a hard-coded backslash.
        aPath = os.path.normpath(os.path.join(inDirectory, aName))

        if os.path.isdir(aPath):
            aPaths += getPaths(aPath)
        else:
            aPaths.append(aPath)

    return aPaths
def is32bitsOs():
    """Tell whether ``sys.maxsize`` is the largest signed 32 bits integer.

    NOTE(review): ``sys.maxsize`` describes the running Python interpreter, so this presumably reports the
    interpreter word size rather than the operating system one; confirm against the callers.

    :return: True if ``sys.maxsize`` equals 2**31 - 1.
    :rtype: bool
    """
    aMaxSigned32 = (1 << 31) - 1
    return sys.maxsize == aMaxSigned32
def is64bitsOs():
    """Tell whether ``sys.maxsize`` differs from the largest signed 32 bits integer.

    NOTE(review): ``sys.maxsize`` describes the running Python interpreter, so this presumably reports the
    interpreter word size rather than the operating system one; confirm against the callers.

    :return: True if ``sys.maxsize`` is not 2**31 - 1.
    :rtype: bool
    """
    aMaxSigned32 = (1 << 31) - 1
    return sys.maxsize != aMaxSigned32
def setRegisterApp(inAppName, inDirectory):
    """Register an application under the "App Paths" key of the current user.

    The function writes the full path of the application as the default value of
    ``HKEY_CURRENT_USER\\SOFTWARE\\Microsoft\\Windows\\CurrentVersion\\App Paths\\<inAppName>`` and
    ``inDirectory`` as its ``Path`` value. It then creates ``<name>.bat`` in the current working directory
    (only if it does not exist yet) and defines a ``doskey`` macro through :func:`call`.
    ``EnvironmentError`` raised on the way is reported on stderr and not propagated.

    :param inAppName: example 'cl.exe'
    :type inAppName: str
    :param inDirectory: example r'C:\\Program Files (x86)\\Microsoft Visual Studio 12.0\\VC\\bin'
    :type inDirectory: str
    :raises AssertionError: if the application is not an existing file inside inDirectory.

    .. note::
        `<https://docs.python.org/3.0/library/winreg.html>`_
        `<https://stackoverflow.com/questions/2383970/-winreg-createkey-problem-in-python>`_
        `<https://msdn.microsoft.com/en-us/library/windows/desktop/aa384129%28v=vs.85%29.aspx>`_
        `<https://docs.python.org/3/library/sys.html#sys.platform>`_
        `<http://www.gossamer-threads.com/lists/python/dev/235919>`_
        `<http://www.rhyous.com/2010/10/20/adding-an-alias-in-windows-7-or-making-ls-dir-in-a-command-prompt/>`_
        `<http://darkforge.blogspot.com/2010/08/permanent-windows-command-line-aliases.html>`_
    """
    try:
        aPath = os.path.normpath(os.path.join(inDirectory, inAppName))
        aKey = r"SOFTWARE\Microsoft\Windows\CurrentVersion\App Paths"
        # Strip whatever extension the name carries instead of assuming a
        # four characters long one such as ".exe".
        aCmdApp = os.path.splitext(inAppName)[0]

        assert os.path.isfile(aPath), "Doesn't exist or is not a file but a directory : " + aPath

        print(aPath)
        print(inAppName)
        print(aCmdApp)

        with winreg.CreateKey(winreg.HKEY_CURRENT_USER, aKey) as aPathKey:
            print(' Key "' + aKey + '" value : ' + str(aPathKey))

            with winreg.CreateKey(aPathKey, inAppName) as aAppKey:
                print(' Key "' + aKey + r'\\' + inAppName + '" value : ' + str(aAppKey))
                winreg.SetValue(aAppKey, '', winreg.REG_SZ, aPath)
                winreg.SetValueEx(aAppKey, 'Path', 0, winreg.REG_SZ, inDirectory)

        # Disabled alternative kept from the original code:
        # aKey = r"SOFTWARE\Microsoft\Command Processor"
        # with winreg.CreateKey(winreg.HKEY_CURRENT_USER, aKey) as aPathKey:
        #     print(' Key "' + aKey + '" value : ' + str(aPathKey))
        #     print( winreg.SetValueEx( aPathKey, aCmdApp, 0, winreg.REG_SZ, aPath ) )

        aBatFilePath = os.path.normpath(aCmdApp + '.bat')

        if not os.path.isfile(aBatFilePath):
            with open(aBatFilePath, 'w') as aBat:
                aBat.write('SET RealCMDPath="' + aPath + '"')

        # NOTE(review): assumed to run whether or not the bat file already
        # existed; confirm the intended nesting.
        call('doskey ' + aCmdApp + '= "' + aPath + '"')
    except EnvironmentError as aError:
        print("Execution failed:", aError, file=sys.stderr)
def _localCommandLineBatPath(inAppName, inDestDirectory):
    """Return the normalized path of the ``<inAppName>.bat`` file located in inDestDirectory."""
    return os.path.normpath(os.path.join(inDestDirectory, inAppName + '.bat'))


def createLocalCommandLine(inAppName, inAppFullPath, inDestDirectory):
    """Create ``<inAppName>.bat`` in inDestDirectory; the file forwards its arguments to inAppFullPath.

    If some application like nmake use the command cl to call C:\\Program Files (x86)\\Microsoft Visual Studio 12.0
    \\VC\\bin\\cl.exe. You can use this function to create cl.bat in the working directory.
    A bat file already present with the same name is removed first through
    :func:`deleteLocalCommandLine`.

    :param inAppName: The command line name : cl for the cl.exe
    :type inAppName: str
    :param inAppFullPath: The full path of the application : r'C:\\Program Files (x86)\\Microsoft Visual Studio 12.0
                          \\VC\\bin\\cl.exe'
    :type inAppFullPath: str
    :param inDestDirectory: The destination directory where inAppName.bat will be created.
    :type inDestDirectory: str
    :raises AssertionError: if the bat path already exists and is a directory.

    .. note::
        `Adding alias <http://www.rhyous.com/2010/10/20/adding-an-alias-in-windows-7-or-making-ls-dir-in-a-command-prompt/>`_
    """
    deleteLocalCommandLine(inAppName, inAppFullPath, inDestDirectory)

    with open(_localCommandLineBatPath(inAppName, inDestDirectory), 'w') as aBat:
        # End the line with a real newline. A raw string used to write the
        # two characters backslash and n right after %*.
        aBat.write('"' + inAppFullPath + '" %*\n')


def deleteLocalCommandLine(inAppName, inAppFullPath, inDestDirectory):
    """ Call this function to clean the bat file generated by createLocalCommandLine.

    Nothing happens when the bat file does not exist.

    :param inAppName: The command line name : cl for the cl.exe
    :type inAppName: str
    :param inAppFullPath: The full path of the application : r'C:\\Program Files (x86)\\Microsoft Visual Studio 12.0
                          \\VC\\bin\\cl.exe'. Not used by this function.
    :type inAppFullPath: str
    :param inDestDirectory: The directory that contains inAppName.bat.
    :type inDestDirectory: str
    :raises AssertionError: if the bat path is a directory, or if the file is still there after the removal.
    """
    aBatFilePath = _localCommandLineBatPath(inAppName, inDestDirectory)

    if os.path.exists(aBatFilePath):
        assert os.path.isfile(aBatFilePath), 'the path: ' + aBatFilePath + " already exist and it's a directory."
        os.remove(aBatFilePath)

    assert not os.path.isfile(aBatFilePath), 'the file: ' + aBatFilePath + ' already exist.'
def rmdir(inDir, inLogFilePath='', inWaitUnlock=False, inWaitTime=10):
    """ Use dos to remove the directory.

    Nothing is executed when ``inDir`` does not exist.

    :param inDir: The directory to be removed
    :type inDir: str
    :param inLogFilePath: If it isn't empty. The output will be redirected to the log file.
    :type inLogFilePath: str
    :param inWaitUnlock: if it's True we will wait up to inWaitTime seconds for inDir to be unlocked before
                         removing it. After this time we try to remove it in any case. The lock is tested
                         every second.
    :type inWaitUnlock: bool
    :param inWaitTime: While the inWaitTime timer has not expired we will check every second if the folder has
                       been unlocked. The minimum value is 1s
    :type inWaitTime: int [1: MAX_INT]
    :return: The GDeps.call result, or ``(0, [])`` when inDir does not exist.
    :rtype: int, array of str
    """
    if not os.path.exists(inDir):
        return 0, []

    if inWaitUnlock:
        waitWhilePathIsLock(inDir, inWaitTime)

    # The path is quoted, as rename and copy do, so that a directory whose
    # name contains spaces reaches the shell as a single argument.
    return callWithLogFile('rmdir /S /Q "' + inDir + '"', inLogFilePath)
def rename(inPath, inNewName, inLogFilePath='', inWaitUnlock=False, inWaitTime=10):
    """ Use dos to rename a file, a directory...

    :param inPath: The path of the file or directory to rename
    :type inPath: str
    :param inNewName: the new name
    :type inNewName: str
    :param inLogFilePath: If it isn't empty. The output will be redirected to the log file.
    :type inLogFilePath: str
    :param inWaitUnlock: if it's True we will wait up to inWaitTime seconds for inPath to be unlocked before
                         renaming. After this time we try to rename in any case. The lock is tested every
                         second.
    :type inWaitUnlock: bool
    :param inWaitTime: While the inWaitTime timer has not expired we will check every second if the folder has
                       been unlocked. The minimum value is 1s
    :type inWaitTime: int [1: MAX_INT]
    :return: The GDeps.call result
    :rtype: int, array of str

    .. note::
        You cannot specify a new drive or path for the destination of the renamed file.
        `RENAME <http://www.computerhope.com/renamehl.htm>`_
    """
    if inWaitUnlock:
        waitWhilePathIsLock(inPath, inWaitTime)

    # Both operands are wrapped in double quotes for the shell.
    aPieces = ('rename "', inPath, '" "', inNewName, '"')

    return callWithLogFile(''.join(aPieces), inLogFilePath)
def copy(inSource, inDestination, inLogFilePath='', inWaitUnlock=False, inWaitTime=10):
    """
    Use dos to copy a file or a directory.

    An existing file is copied with ``copy``; anything else is handed to ``xcopy``.

    :param inSource: The file or directory to copy
    :type inSource: str
    :param inDestination: the destination path
    :type inDestination: str
    :param inLogFilePath: If it isn't empty. The output will be redirected to the log file.
    :type inLogFilePath: str
    :param inWaitUnlock: if it's True we will wait up to inWaitTime seconds for inSource, then for
                         inDestination, to be unlocked before copying. After this time we try the copy in any
                         case. The lock is tested every second.
    :type inWaitUnlock: bool
    :param inWaitTime: While the inWaitTime timer has not expired we will check every second if the folder has
                       been unlocked. The minimum value is 1s
    :type inWaitTime: int [1: MAX_INT]
    :return: The GDeps.call result
    :rtype: int, array of str

    .. note::
        `XCOPY <http://www.computerhope.com/xcopyhlp.htm>`_
        `COPY <https://www.computerhope.com/copyhlp.htm>`_
    """
    if inWaitUnlock:
        for aWatchedPath in (inSource, inDestination):
            waitWhilePathIsLock(aWatchedPath, inWaitTime)

    # Pick the dos tool and its switches from the kind of source.
    if os.path.isfile(inSource):
        aTool, aSwitches = 'copy', ' /Y /V'
    else:
        aTool, aSwitches = 'xcopy', ' /H /K /Y /E /I'

    aCommand = aTool + ' "' + inSource + '" "' + inDestination + '"' + aSwitches

    return callWithLogFile(aCommand, inLogFilePath)
def getParentDir(inDirectory):
    """Return the absolute path of the parent of inDirectory.

    The last component is cut off the absolute path, so the result keeps its trailing separator
    (``/a/b`` gives ``/a/``). A path without a last component, such as a drive or filesystem root, is
    returned unchanged.

    :param inDirectory: The children directory
    :type inDirectory: str
    :return: The absolute path of the parent directory.
    :rtype: str
    """
    aChildAbsPath = os.path.abspath(inDirectory)
    aChildName = pathlib.Path(aChildAbsPath).name

    if not aChildName:
        # A root has an empty name; slicing with [:-0] would return ''.
        return aChildAbsPath

    return aChildAbsPath[:-len(aChildName)]
def fileIsLock(inPath):
    """ Test if the path is among the files reported as opened by ``psutil.Process()``.

    NOTE(review): ``psutil.Process()`` is built without a pid, which psutil documents as the current
    process, so files opened by other processes are presumably not seen; confirm the intent.

    :param inPath: the path of the file
    :type inPath: str
    :return: True if an opened file is inPath or is located under inPath, False otherwise.
    :rtype: bool
    """
    aTarget = os.path.normcase(os.path.abspath(inPath))
    aTargetAsParent = aTarget.rstrip(os.sep) + os.sep

    for aOpenFile in psutil.Process().open_files():
        aOpenPath = os.path.normcase(os.path.normpath(aOpenFile.path))

        # Whole paths are compared. A substring search in the printed list
        # made "a.txt" match "a.txt.bak".
        if aOpenPath == aTarget or aOpenPath.startswith(aTargetAsParent):
            return True

    return False
def folderHasFilesLock(inPath):
    """ Test if a file inside the folder is among the files reported as opened by ``psutil.Process()``.

    The folder is explored recursively with :func:`getPaths`.

    NOTE(review): ``psutil.Process()`` is built without a pid, which psutil documents as the current
    process, so files opened by other processes are presumably not seen; confirm the intent.

    :param inPath: the path of the folder
    :type inPath: str
    :return: True if at least one file of the folder is opened, False otherwise.
    :rtype: bool
    """
    aPaths = getPaths(inPath)

    # Whole normalized paths are compared. A substring search in the printed
    # list made "a.txt" match "a.txt.bak".
    aOpenPaths = {os.path.normcase(os.path.normpath(aOpenFile.path))
                  for aOpenFile in psutil.Process().open_files()}

    return any(os.path.isfile(aFilePath) and os.path.normcase(os.path.abspath(aFilePath)) in aOpenPaths
               for aFilePath in aPaths)
def waitWhilePathIsLock(inPath, inWaitTime):
    """ Wait, for inWaitTime seconds at most, until inPath is unlocked.

    A directory is tested with :func:`folderHasFilesLock` and a file with :func:`fileIsLock`. The test is
    repeated every second. A path that is neither an existing directory nor an existing file is
    considered unlocked at once.

    :param inPath: The directory or the file that should be tested.
    :type inPath: str
    :param inWaitTime: While the inWaitTime timer has not expired we will check every second if the path has
                       been unlocked. The minimum value is 1s
    :type inWaitTime: int [1: MAX_INT]
    :return: If the path is a folder : return False if the folder still has a locked file.
             If the path is a file : return False if the file is still locked.
             Otherwise True.
    :rtype: bool
    """
    if os.path.isdir(inPath):
        aIsLocked, aKind, aWaitMessage = folderHasFilesLock, 'directory', 'wait copy folder'
    elif os.path.isfile(inPath):
        aIsLocked, aKind, aWaitMessage = fileIsLock, 'file', 'wait copy file'
    else:
        # Nothing to test, so nothing can be locked. The original code also had
        # a warning for this case, but it could never be reached.
        return True

    aElapsed = 0

    while aElapsed < inWaitTime and aIsLocked(inPath):
        aElapsed += 1
        time.sleep(1)
        print(aWaitMessage)

    # Test once more: the loop may have stopped because the time ran out.
    aResult = not aIsLocked(inPath)

    if not aResult:
        warnings.warn('After ' + str(inWaitTime) + ' seconds the ' + aKind + ' stay blocked.')

    return aResult
class call_Report(GDeps.Reporting):
    """Reporting set up with the error pattern used for the output of ``call``.

    NOTE(review): ``m_ErrorRegexp`` is presumably read by ``GDeps.Reporting`` to spot error lines;
    confirm against that class.
    """

    def __init__(self):
        super().__init__()
        # Matches "... is not recognized ..." and "... Files was unexpected ...".
        self.m_ErrorRegexp = r"(.*is not recognized.*|.*Files was unexpected.*)"
class copy_Report(GDeps.Reporting):
    """Reporting set up with the error and warning patterns used for the output of ``copy``.

    NOTE(review): ``m_ErrorRegexp`` and ``m_WarningRegexp`` are presumably read by ``GDeps.Reporting`` to
    spot error and warning lines; confirm against that class.
    """

    def __init__(self):
        super().__init__()
        self.m_ErrorRegexp = r"(.*File not found.*)"
        self.m_WarningRegexp = r"(.*cannot access.*)"
class rename_Report(GDeps.Reporting):
    """Reporting set up with the error and warning patterns used for the output of ``rename``.

    NOTE(review): ``m_ErrorRegexp`` and ``m_WarningRegexp`` are presumably read by ``GDeps.Reporting`` to
    spot error and warning lines; confirm against that class.

    .. todo:: has to be tested
    """

    def __init__(self):
        super().__init__()
        self.m_ErrorRegexp = r"(.*File not found.*)"
        self.m_WarningRegexp = r"(.*cannot access.*)"
class rmdir_Report(GDeps.Reporting):
    """Reporting set up with the error pattern used for the output of ``rmdir``.

    NOTE(review): ``m_ErrorRegexp`` is presumably read by ``GDeps.Reporting`` to spot error lines;
    confirm against that class.
    """

    def __init__(self):
        super().__init__()
        self.m_ErrorRegexp = r"(.*The system cannot find the file specified.*)"