From 57aa4f8a00e2e545c9341af79913aadcadaa3964 Mon Sep 17 00:00:00 2001 From: xeals Date: Thu, 13 Mar 2025 12:19:30 +1100 Subject: [PATCH] Port rulegen.py to py3 and logging --- rulegen.py | 210 ++++++++++++++++++++++++++--------------------------- 1 file changed, 105 insertions(+), 105 deletions(-) diff --git a/rulegen.py b/rulegen.py index 492f31b..970f600 100755 --- a/rulegen.py +++ b/rulegen.py @@ -12,19 +12,23 @@ # # Please see the attached LICENSE file for additional licensing information. -import sys -import re -import time -import operator -import enchant - -from optparse import OptionParser, OptionGroup - from collections import Counter - -import subprocess - +from optparse import OptionParser, OptionGroup +import enchant import multiprocessing +import operator +import re +import subprocess +import sys +import time +import logging + +logging.VERBOSE = 15 +logging.addLevelName(logging.VERBOSE, "VERBOSE") +logging.Logger.verbose = lambda self, msg, *args, **kwargs: \ + self.log(logging.VERBOSE, msg, *args, **kwargs) + +LOGGER = logging.getLogger("rulegen") VERSION = "0.0.4" @@ -61,8 +65,6 @@ class RuleGen: self.brute_rules = False # Debugging options - self.verbose = False - self.debug = False self.word = None # Custom word to use. self.quiet = False @@ -75,10 +77,10 @@ class RuleGen: ######################################################################## # Preanalysis Password Patterns self.password_pattern = dict() - self.password_pattern["insertion"] = re.compile('^[^a-z]*(?P.+?)[^a-z]*$', re.IGNORECASE) - self.password_pattern["email"] = re.compile('^(?P.+?)@[A-Z0-9.-]+\.[A-Z]{2,4}', re.IGNORECASE) - self.password_pattern["alldigits"] = re.compile('^(\d+)$', re.IGNORECASE) - self.password_pattern["allspecial"]= re.compile('^([^a-z0-9]+)$', re.IGNORECASE) + self.password_pattern["insertion"] = re.compile(r'^[^a-z]*(?P.+?)[^a-z]*$', re.IGNORECASE) + self.password_pattern["email"] = re.compile(r'^(?P.+?)@[A-Z0-9.-]+\.[A-Z]{2,4}', re.IGNORECASE) + self.password_pattern["alldigits"] = re.compile(r'^(\d+)$', re.IGNORECASE) + self.password_pattern["allspecial"]= re.compile(r'^([^a-z0-9]+)$', re.IGNORECASE) ######################################################################## # Hashcat Rules Engine @@ -172,9 +174,9 @@ class RuleGen: matrix = [] # Generate and populate the initial matrix - for i in xrange(len(password) + 1): + for i in range(len(password) + 1): matrix.append([]) - for j in xrange(len(word) + 1): + for j in range(len(word) + 1): if i == 0: matrix[i].append(j) elif j == 0: @@ -183,8 +185,8 @@ class RuleGen: matrix[i].append(0) # Calculate edit distance for each substring - for i in xrange(1,len(password) + 1): - for j in xrange(1,len(word) + 1): + for i in range(1,len(password) + 1): + for j in range(1,len(word) + 1): if password[i-1] == word[j-1]: matrix[i][j] = matrix[i-1][j-1] else: @@ -205,7 +207,7 @@ class RuleGen: if not s1: return len(s2) - previous_row = xrange(len(s2) + 1) + previous_row = range(len(s2) + 1) for i, c1 in enumerate(s1): current_row = [i + 1] for j, c2 in enumerate(s2): @@ -219,11 +221,11 @@ class RuleGen: def levenshtein_print(self,matrix,word,password): """ Print word X password matrix """ - print " %s" % " ".join(list(word)) + print(" %s" % " ".join(list(word))) for i,row in enumerate(matrix): - if i == 0: print " ", - else: print password[i-1], - print " ".join("%2d" % col for col in row) + if i == 0: print(" "), + else: print(password[i-1]), + print(" ".join("%2d" % col for col in row)) def generate_levenshtein_rules(self, word, password): """ Generates levenshtein rules. Returns a list of lists of levenshtein rules. """ @@ -254,7 +256,7 @@ class RuleGen: cost = matrix[i][j] # Calculate minimum cost of each operation - cost_delete = cost_insert = cost_equal_or_replace = sys.maxint + cost_delete = cost_insert = cost_equal_or_replace = sys.maxsize if i > 0: cost_insert = matrix[i-1][j] if j > 0: cost_delete = matrix[i][j-1] if i > 0 and j > 0: cost_equal_or_replace = matrix[i-1][j-1] @@ -285,7 +287,7 @@ class RuleGen: def generate_words(self,password): """ Generate source word candidates.""" - if self.debug: print "[*] Generating source words for %s" % password + LOGGER.debug("[*] Generating source words for %s" % password) words = list() words_collection = list() @@ -319,8 +321,8 @@ class RuleGen: suggestions.append(suggestion) if len(suggestions) != len(set(suggestions)): - print sorted(suggestions) - print sorted(set(suggestions)) + print(sorted(suggestions)) + print(sorted(set(suggestions))) for suggestion in suggestions: @@ -346,23 +348,20 @@ class RuleGen: best_found_distance = word["distance"] elif word["distance"] > best_found_distance: - if self.verbose: - print "[-] %s => {edit distance suboptimal: %d (%d)} => %s" % \ - (word["suggestion"], word["distance"], best_found_distance, word["password"]) + LOGGER.verbose("[-] %s => {edit distance suboptimal: %d (%d)} => %s" % \ + (word["suggestion"], word["distance"], best_found_distance, word["password"])) break # Filter words with too big edit distance if word["distance"] <= self.max_word_dist: - if self.debug: - print "[+] %s => {edit distance: %d (%d)} = > %s" % \ - (word["suggestion"], word["distance"],best_found_distance, word["password"]) + LOGGER.debug("[+] %s => {edit distance: %d (%d)} = > %s" % \ + (word["suggestion"], word["distance"],best_found_distance, word["password"])) words_collection.append(word) else: - if self.verbose: - print "[-] %s => {max distance exceeded: %d (%d)} => %s" % \ - (word["suggestion"], word["distance"], self.max_word_dist, word["password"]) + LOGGER.verbose("[-] %s => {max distance exceeded: %d (%d)} => %s" % \ + (word["suggestion"], word["distance"], self.max_word_dist, word["password"])) if self.max_words: words_collection = words_collection[:self.max_words] @@ -399,7 +398,7 @@ class RuleGen: else: preanalysis_password += c password = preanalysis_password - if self.debug: "[*] Preanalysis Password: %s" % password + LOGGER.debug("[*] Preanalysis Password: %s" % password) return self.enchant.suggest(password) @@ -433,9 +432,9 @@ class RuleGen: hashcat_rule = self.generate_advanced_hashcat_rules(suggestion, lev_rule, password) if hashcat_rule == None: - print "[!] Processing FAILED: %s => ;( => %s" % (suggestion,password) - print " Sorry about that, please report this failure to" - print " the developer: iphelix [at] thesprawl.org" + LOGGER.warning("[!] Processing FAILED: %s => ;( => %s" % (suggestion,password)) + LOGGER.warning(" Sorry about that, please report this failure to") + LOGGER.warning(" the developer: iphelix [at] thesprawl.org") else: hashcat_rules.append(hashcat_rule) @@ -453,9 +452,8 @@ class RuleGen: best_found_rule_length = rule_length elif rule_length > best_found_rule_length: - if self.verbose: - print "[-] %s => {best rule length exceeded: %d (%d)} => %s" % \ - (suggestion, rule_length, best_found_rule_length, password) + LOGGER.verbose("[-] %s => {best rule length exceeded: %d (%d)} => %s" % \ + (suggestion, rule_length, best_found_rule_length, password)) break if rule_length <= self.max_rule_len: @@ -467,7 +465,7 @@ class RuleGen: """ Generate basic hashcat rules using only basic insert,delete,replace rules. """ hashcat_rules = [] - if self.debug: print "[*] Simple Processing %s => %s" % (word,password) + LOGGER.debug("[*] Simple Processing %s => %s" % (word,password)) # Dynamically apply rules to the source word # NOTE: Special case were word == password this would work as well. @@ -475,7 +473,7 @@ class RuleGen: for (op,p,w) in rules: - if self.debug: print "\t[*] Simple Processing Started: %s - %s" % (word_rules, " ".join(hashcat_rules)) + LOGGER.debug("\t[*] Simple Processing Started: %s - %s" % (word_rules, " ".join(hashcat_rules))) if op == 'insert': hashcat_rules.append("i%s%s" % (self.int_to_hashcat(p),password[p])) @@ -489,20 +487,20 @@ class RuleGen: hashcat_rules.append("o%s%s" % (self.int_to_hashcat(p),password[p])) word_rules = self.hashcat_rule['o'](word_rules,p,password[p]) - if self.debug: print "\t[*] Simple Processing Ended: %s => %s => %s" % (word_rules, " ".join(hashcat_rules),password) + LOGGER.debug("\t[*] Simple Processing Ended: %s => %s => %s" % (word_rules, " ".join(hashcat_rules),password)) # Check if rules result in the correct password if word_rules == password: return hashcat_rules else: - if self.debug: print "[!] Simple Processing FAILED: %s => %s => %s (%s)" % (word," ".join(hashcat_rules),password,word_rules) + LOGGER.debug("[!] Simple Processing FAILED: %s => %s => %s (%s)" % (word," ".join(hashcat_rules),password,word_rules)) return None def generate_advanced_hashcat_rules(self,word,rules,password): """ Generate advanced hashcat rules using full range of available rules. """ hashcat_rules = [] - if self.debug: print "[*] Advanced Processing %s => %s" % (word,password) + LOGGER.debug("[*] Advanced Processing %s => %s" % (word,password)) # Dynamically apply and store rules in word_rules variable. # NOTE: Special case where word == password this would work as well. @@ -514,7 +512,7 @@ class RuleGen: for i,(op,p,w) in enumerate(rules): - if self.debug: print "\t[*] Advanced Processing Started: %s - %s" % (word_rules, " ".join(hashcat_rules)) + LOGGER.debug("\t[*] Advanced Processing Started: %s - %s" % (word_rules, " ".join(hashcat_rules))) if op == 'insert': hashcat_rules.append("i%s%s" % (self.int_to_hashcat(p),password[p])) @@ -538,7 +536,7 @@ class RuleGen: # This rule was made obsolete by a prior global replacement if word_rules[p] == password[p]: - if self.debug: print "\t[*] Advanced Processing Obsolete Rule: %s - %s" % (word_rules, " ".join(hashcat_rules)) + LOGGER.debug("\t[*] Advanced Processing Obsolete Rule: %s - %s" % (word_rules, " ".join(hashcat_rules))) # Swapping rules elif p < len(password)-1 and p < len(word_rules)-1 and word_rules[p] == password[p+1] and word_rules[p+1] == password[p]: @@ -649,7 +647,7 @@ class RuleGen: hashcat_rules.append("o%s%s" % (self.int_to_hashcat(p),password[p])) word_rules = self.hashcat_rule['o'](word_rules,p,password[p]) - if self.debug: print "\t[*] Advanced Processing Ended: %s %s" % (word_rules, " ".join(hashcat_rules)) + LOGGER.debug("\t[*] Advanced Processing Ended: %s %s" % (word_rules, " ".join(hashcat_rules))) ######################################################################## # Prefix rules @@ -718,7 +716,7 @@ class RuleGen: if word_rules == password: return hashcat_rules else: - if self.debug: print "[!] Advanced Processing FAILED: %s => %s => %s (%s)" % (word," ".join(hashcat_rules),password,word_rules) + LOGGER.debug("[!] Advanced Processing FAILED: %s => %s => %s (%s)" % (word," ".join(hashcat_rules),password,word_rules)) return None @@ -727,21 +725,21 @@ class RuleGen: # Skip all numeric passwords if password.isdigit(): - if self.verbose and not self.quiet: print "[!] %s => {skipping numeric} => %s" % (password,password) + if not self.quiet: LOGGER.verbose("[!] %s => {skipping numeric} => %s" % (password,password)) self.numeric_stats_total += 1 return False # Skip passwords with less than 25% of alpha character # TODO: Make random word detection more reliable based on word entropy. elif len([c for c in password if c.isalpha()]) < len(password)/4: - if self.verbose and not self.quiet:print "[!] %s => {skipping alpha less than 25%%} => %s" % (password,password) + if not self.quiet: LOGGER.verbose("[!] %s => {skipping alpha less than 25%%} => %s" % (password,password)) self.special_stats_total += 1 return False # Only check english ascii passwords for now # TODO: Add support for more languages. elif [c for c in password if ord(c) < 32 or ord(c) > 126]: - if self.verbose and not self.quiet: print "[!] %s => {skipping non ascii english} => %s" % (password,password) + if not self.quiet: LOGGER.verbose("[!] %s => {skipping non ascii english} => %s" % (password,password)) self.foreign_stats_total += 1 return False @@ -751,7 +749,7 @@ class RuleGen: def analyze_password(self,password, rules_queue=multiprocessing.Queue(), words_queue=multiprocessing.Queue()): """ Analyze a single password. """ - if self.verbose: print "[*] Analyzing password: %s" % password + LOGGER.verbose("[*] Analyzing password: %s" % password) words = [] @@ -799,21 +797,20 @@ class RuleGen: best_found_rule_length = rule_length elif rule_length > best_found_rule_length: - if self.verbose: - print "[-] %s => {best rule length exceeded: %d (%d)} => %s" % \ - (word["suggestion"], rule_length, best_found_rule_length, password) + LOGGER.verbose("[-] %s => {best rule length exceeded: %d (%d)} => %s" % \ + (word["suggestion"], rule_length, best_found_rule_length, password)) break if rule_length <= self.max_rule_len: hashcat_rule_str = " ".join(hashcat_rule + word["pre_rule"] or [':']) - if self.verbose: print "[+] %s => %s => %s" % (word["suggestion"], hashcat_rule_str, password) + LOGGER.verbose("[+] %s => %s => %s" % (word["suggestion"], hashcat_rule_str, password)) rules_queue.put(hashcat_rule_str) def password_worker(self,i, passwords_queue, rules_queue, words_queue): - if self.debug: print "[*] Password analysis worker [%d] started." % i + LOGGER.debug("[*] Password analysis worker [%d] started." % i) try: while True: password = passwords_queue.get() @@ -823,16 +820,16 @@ class RuleGen: self.analyze_password(password, rules_queue, words_queue) except (KeyboardInterrupt, SystemExit): - if self.debug: print "[*] Password analysis worker [%d] terminated." % i + LOGGER.debug("[*] Password analysis worker [%d] terminated." % i) - if self.debug: print "[*] Password analysis worker [%d] stopped." % i + LOGGER.debug("[*] Password analysis worker [%d] stopped." % i) def rule_worker(self, rules_queue, output_rules_filename): """ Worker to store generated rules. """ - print "[*] Saving rules to %s" % output_rules_filename + LOGGER.info("[*] Saving rules to %s" % output_rules_filename) f = open(output_rules_filename, 'w') - if self.debug: print "[*] Rule worker started." + LOGGER.debug("[*] Rule worker started.") try: while True: rule = rules_queue.get() @@ -844,17 +841,17 @@ class RuleGen: f.flush() except (KeyboardInterrupt, SystemExit): - if self.debug: print "[*] Rule worker terminated." + LOGGER.debug("[*] Rule worker terminated.") f.close() - if self.debug: print "[*] Rule worker stopped." + LOGGER.debug("[*] Rule worker stopped.") def word_worker(self, words_queue, output_words_filename): """ Worker to store generated rules. """ - print "[*] Saving words to %s" % output_words_filename + LOGGER.info("[*] Saving words to %s" % output_words_filename) f = open(output_words_filename, 'w') - if self.debug: print "[*] Word worker started." + LOGGER.debug("[*] Word worker started.") try: while True: word = words_queue.get() @@ -866,17 +863,17 @@ class RuleGen: f.flush() except (KeyboardInterrupt, SystemExit): - if self.debug: print "[*] Word worker terminated." + LOGGER.debug("[*] Word worker terminated.") f.close() - if self.debug: print "[*] Word worker stopped." + LOGGER.debug("[*] Word worker stopped.") # Analyze passwords file - def analyze_passwords_file(self,passwords_file): + def analyze_passwords_file(self,passwords_file, encoding): """ Analyze provided passwords file. """ - print "[*] Analyzing passwords file: %s:" % passwords_file - print "[*] Press Ctrl-C to end execution and generate statistical analysis." + LOGGER.info("[*] Analyzing passwords file: %s:" % passwords_file) + LOGGER.info("[*] Press Ctrl-C to end execution and generate statistical analysis.") # Setup queues passwords_queue = multiprocessing.Queue(self.threads) @@ -891,7 +888,7 @@ class RuleGen: # Continue with the main thread - f = open(passwords_file,'r') + f = open(passwords_file,'r', encoding=encoding) password_count = 0 analysis_start = time.time() @@ -904,8 +901,8 @@ class RuleGen: # Provide analysis time feedback to the user if not self.quiet and password_count != 0 and password_count % 5000 == 0: segment_time = time.time() - segment_start - print "[*] Processed %d passwords in %.2f seconds at the rate of %.2f p/sec" % \ - (password_count, segment_start - analysis_start, 5000/segment_time ) + LOGGER.info("[*] Processed %d passwords in %.2f seconds at the rate of %.2f p/sec" % \ + (password_count, segment_start - analysis_start, 5000/segment_time )) segment_start = time.time() password_count += 1 @@ -915,7 +912,7 @@ class RuleGen: passwords_queue.put(password) except (KeyboardInterrupt, SystemExit): - print "\n[!] Rulegen was interrupted." + LOGGER.warning("\n[!] Rulegen was interrupted.") else: # Signal workers to stop. @@ -933,15 +930,15 @@ class RuleGen: f.close() analysis_time = time.time() - analysis_start - print "[*] Finished processing %d passwords in %.2f seconds at the rate of %.2f p/sec" % (password_count, analysis_time, float(password_count)/analysis_time ) + LOGGER.info("[*] Finished processing %d passwords in %.2f seconds at the rate of %.2f p/sec" % (password_count, analysis_time, float(password_count)/analysis_time )) - print "[*] Generating statistics for [%s] rules and words." % self.basename - print "[-] Skipped %d all numeric passwords (%0.2f%%)" % \ - (self.numeric_stats_total, float(self.numeric_stats_total)*100.0/float(password_count)) - print "[-] Skipped %d passwords with less than 25%% alpha characters (%0.2f%%)" % \ - (self.special_stats_total, float(self.special_stats_total)*100.0/float(password_count)) - print "[-] Skipped %d passwords with non ascii characters (%0.2f%%)" % \ - (self.foreign_stats_total, float(self.foreign_stats_total)*100.0/float(password_count)) + LOGGER.info("[*] Generating statistics for [%s] rules and words." % self.basename) + LOGGER.info("[-] Skipped %d all numeric passwords (%0.2f%%)" % \ + (self.numeric_stats_total, float(self.numeric_stats_total)*100.0/float(password_count))) + LOGGER.info("[-] Skipped %d passwords with less than 25%% alpha characters (%0.2f%%)" % \ + (self.special_stats_total, float(self.special_stats_total)*100.0/float(password_count))) + LOGGER.info("[-] Skipped %d passwords with non ascii characters (%0.2f%%)" % \ + (self.foreign_stats_total, float(self.foreign_stats_total)*100.0/float(password_count))) # TODO: Counter breaks on large files. uniq -c | sort -rn is still the most # optimal way. @@ -950,11 +947,11 @@ class RuleGen: rules_counter = Counter(rules_file) rule_counter_total = sum(rules_counter.values()) - print "\n[*] Top 10 rules" + LOGGER.info("[*] Top 10 rules") rules_i = 0 for (rule, count) in rules_counter.most_common(): rules_sorted_file.write(rule) - if rules_i < 10: print "[+] %s - %d (%0.2f%%)" % (rule.rstrip('\r\n'), count, count*100/rule_counter_total) + if rules_i < 10: LOGGER.info("[+] %s - %d (%0.2f%%)" % (rule.rstrip('\r\n'), count, count*100/rule_counter_total)) rules_i += 1 rules_file.close() @@ -966,11 +963,11 @@ class RuleGen: words_counter = Counter(words_file) word_counter_total = sum(rules_counter.values()) - print "\n[*] Top 10 words" + LOGGER.info("[*] Top 10 words") words_i = 0 for (word, count) in words_counter.most_common(): words_sorted_file.write(word) - if words_i < 10: print "[+] %s - %d (%0.2f%%)" % (word.rstrip('\r\n'), count, count*100/word_counter_total) + if words_i < 10: LOGGER.info("[+] %s - %d (%0.2f%%)" % (word.rstrip('\r\n'), count, count*100/word_counter_total)) words_i += 1 words_file.close() @@ -993,19 +990,19 @@ class RuleGen: if out == password: hashcat_rules_str = " ".join(rules or [':']) - if self.verbose: print "[+] %s => %s => %s" % (word, hashcat_rules_str, password) + LOGGER.verbose("[+] %s => %s => %s" % (word, hashcat_rules_str, password)) else: - print "[!] Hashcat Verification FAILED: %s => %s => %s (%s)" % (word," ".join(rules or [':']),password,out) + LOGGER.error("[!] Hashcat Verification FAILED: %s => %s => %s (%s)" % (word," ".join(rules or [':']),password,out)) if __name__ == "__main__": header = " _ \n" header += " RuleGen %s | |\n" % VERSION header += " _ __ __ _ ___| | _\n" - header += " | '_ \ / _` |/ __| |/ /\n" + header += " | '_ \\ / _` |/ __| |/ /\n" header += " | |_) | (_| | (__| < \n" - header += " | .__/ \__,_|\___|_|\_\\\n" + header += " | .__/ \\__,_|\\___|_|\\_\\\n" header += " | | \n" header += " |_| iphelix@thesprawl.org\n" header += "\n" @@ -1017,6 +1014,7 @@ if __name__ == "__main__": parser.add_option("-w","--wordlist", help="Use a custom wordlist for rule analysis.", metavar="wiki.dict") parser.add_option("-q", "--quiet", action="store_true", dest="quiet", default=False, help="Don't show headers.") parser.add_option("--threads", type="int", default=multiprocessing.cpu_count(), help="Parallel threads to use for processing.") + parser.add_option("-e", "--encoding", help="Input file encoding.", default="utf-8") wordtune = OptionGroup(parser, "Fine tune source word generation:") wordtune.add_option("--maxworddist", help="Maximum word edit distance (Levenshtein)", type="int", default=10, metavar="10") @@ -1037,7 +1035,7 @@ if __name__ == "__main__": spelltune.add_option("--providers", help="Comma-separated list of provider engines", default="aspell,myspell", metavar="aspell,myspell") parser.add_option_group(spelltune) - debug = OptionGroup(parser, "Debuggin options:") + debug = OptionGroup(parser, "Debugging options:") debug.add_option("-v","--verbose", help="Show verbose information.", action="store_true", default=False) debug.add_option("-d","--debug", help="Debug rules.", action="store_true", default=False) debug.add_option("--password", help="Process the last argument as a password not a file.", action="store_true", default=False) @@ -1049,7 +1047,7 @@ if __name__ == "__main__": # Print program header if not options.quiet: - print header + print (header) if len(args) < 1: parser.error("no passwords file specified") @@ -1069,23 +1067,25 @@ if __name__ == "__main__": rulegen.more_rules=options.morerules rulegen.simple_rules=options.simplerules rulegen.brute_rules=options.bruterules - if rulegen.brute_rules: print "[!] Bruteforcing reversal and rotation rules. (slower)" + if rulegen.brute_rules: LOGGER.info("[!] Bruteforcing reversal and rotation rules. (slower)") # Debugging options rulegen.word = options.word - rulegen.verbose=options.verbose - rulegen.debug = options.debug rulegen.hashcat = options.hashcat rulegen.quiet = options.quiet + logging.basicConfig(level=logging.DEBUG if options.debug + else logging.VERBOSE if options.verbose + else logging.INFO) + # Custom wordlist if not options.word: if options.wordlist: rulegen.load_custom_wordlist(options.wordlist) - print "[*] Using Enchant '%s' module. For best results please install" % rulegen.enchant.provider.name - print " '%s' module language dictionaries." % rulegen.enchant.provider.name + LOGGER.info("[*] Using Enchant '%s' module. For best results please install" % rulegen.enchant.provider.name) + LOGGER.info(" '%s' module language dictionaries." % rulegen.enchant.provider.name) # Analyze a single password or several passwords in a file if options.password: rulegen.analyze_password(args[0]) else: - rulegen.analyze_passwords_file(args[0]) + rulegen.analyze_passwords_file(args[0], options.encoding)