diff options
Diffstat (limited to 'python')
-rwxr-xr-x | python/rebuild_ipod_db.py | 595 |
1 files changed, 0 insertions, 595 deletions
diff --git a/python/rebuild_ipod_db.py b/python/rebuild_ipod_db.py deleted file mode 100755 index 20c5454..0000000 --- a/python/rebuild_ipod_db.py +++ /dev/null @@ -1,595 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -# -# LICENSE: -# --------------------------------------------------------------------------- -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -# --------------------------------------------------------------------------- -# -# Based on Matrin Fiedler's "rebuild_db.py" v1.0-rc1 (2006-04-26): -# http://shuffle-db.sourceforge.net/ -# - -from __future__ import print_function - - -__title__ = "iPod Shuffle Database Builder" -__author__ = "Aaron LI" -__version__ = "2.0.2" -__date__ = "2016-04-16" - - -import sys -import os -import operator -import array -import random -import fnmatch -import operator -import string -import argparse -import functools -import shutil -from collections import OrderedDict - - -domains = [] -total_count = 0 - - -class LogObj: - """ - Print and log the process information. - """ - def __init__(self, filename=None): - self.filename = filename - - def open(self): - if self.filename: - try: - self.logfile = open(self.filename, "w") - except IOError: - self.logfile = None - else: - self.logfile = None - - def log(self, line="", end="\n"): - value = line + end - if self.logfile: - self.logfile.write(value) - print(value, end="") - - def close(self): - if self.logfile: - self.logfile.close() - - -class Rule: - """ - A RuleSet for the way to handle the found playable files. - """ - SUPPORT_PROPS = ("filename", "size", "ignore", "type", - "shuffle", "reuse", "bookmark") - - def __init__(self, conditions=None, actions=None): - self.conditions = conditions - self.actions = actions - - @classmethod - def parse(cls, rule): - """ - Parse the whole line of a rule. - - Syntax: - condition1, condition2, ...: action1, action2, ... - - condition examples: - * filename ~ "*.mp3" - * size > 100000 - action examples: - * ignore = 1 - * shuffle = 1 - - Return: a object of this class with the parsed rule. - """ - conditions, actions = rule.split(":") - conditions = list(map(cls.parse_condition, conditions.split(","))) - actions = dict(map(cls.parse_action, actions.split(","))) - return cls(conditions, actions) - - @classmethod - def parse_condition(cls, cond): - sep_pos = min([ cond.find(sep) for sep in "~=<>" \ - if cond.find(sep)>0 ]) - prop = cond[:sep_pos].strip() - if prop not in cls.SUPPORT_PROPS: - raise ValueError("WARNING: unknown property '%s'" % prop) - return (prop, cond[sep_pos], - cls.parse_value(cond[sep_pos+1:].strip())) - - @classmethod - def parse_action(cls, action): - prop, value = map(str.strip, action.split("=", 1)) - if prop not in cls.SUPPORT_PROPS: - raise ValueError("WARNING: unknown property '%s'" % prop) - return (prop, cls.parse_value(value)) - - @staticmethod - def parse_value(value): - value = value.strip().strip('"').strip("'") - try: - return int(value) - except ValueError: - return value - - def match(self, props): - """ - Check whether the given props match all the conditions. - """ - def match_condition(props, cond): - """ - Check whether the given props match the given condition. - """ - try: - prop, op, ref = props[cond[0]], cond[1], cond[2] - except KeyError: - return False - if op == "~": - return fnmatch.fnmatchcase(prop.lower(), ref.lower()) - elif op == "=": - return prop == ref - elif op == ">": - return prop > ref - elif op == "<": - return prop < ref - else: - return False - # - return functools.reduce(operator.and_, - [ match_condition(props, cond) \ - for cond in self.conditions ], - True) - - -class Entries: - """ - Walk through the directory to find all files, and filter by the - extensions to get all the playable files. - """ - PLAYABLE_EXTS = (".mp3", ".m4a", ".m4b", ".m4p", ".aa", ".wav") - - def __init__(self, dirs=[], rename=True, recursive=True, ignore_dup=True): - self.entries = [] - self.add_dirs(dirs=dirs, rename=rename, recursive=recursive, - ignore_dup=ignore_dup) - - def add_dirs(self, dirs=[], rename=True, recursive=True, ignore_dup=True): - for dir in dirs: - self.add_dir(dir=dir, rename=rename, recursive=recursive, - ignore_dup=ignore_dup) - - def add_dir(self, dir, rename=True, recursive=True, ignore_dup=True): - global logobj - if recursive: - # Get all directories, and rename them if needed - dirs = [] - for dirName, subdirList, fileList in os.walk(dir): - dirs.append(dirName) - for dirName in dirs: - newDirName = self.get_newname(dirName) - if rename and newDirName != dirName: - logobj.log("Rename: '%s' -> '%s'" % (dirName, newDirName)) - shutil.move(dirName, newDirName) - # Get all files - files = [] - for dirName, subdirList, fileList in os.walk(dir): - files.extend([ os.path.join(dirName, f) for f in fileList ]) - else: - # rename the directory if needed - newDir = self.get_newname(dir) - if rename and newDir != dir: - logobj.log("Rename: '%s' -> '%s'" % (dir, newDir)) - shutil.move(dir, newDir) - files = [ os.path.join(newDir, f) for f in self.listfiles(newDir) ] - # - for fn in files: - # rename filename if needed - newfn = self.get_newname(fn) - if rename and newfn != fn: - logobj.log("Rename: '%s' -> '%s'" % (fn, newfn)) - shutil.move(fn, newfn) - fn = newfn - # filter by playable extensions - if os.path.splitext(fn)[1].lower() not in self.PLAYABLE_EXTS: - continue - if ignore_dup and (fn in self.entries): - continue - self.entries.append(fn) - print("Entry: %s" % fn) - - @staticmethod - def listfiles(path, ignore_hidden=True): - """ - List only files of a directory - """ - for f in os.listdir(path): - if os.path.isfile(os.path.join(path, f)): - if ignore_hidden and f[0] != ".": - yield f - else: - yield f - - @staticmethod - def get_newname(path): - def conv_char(ch): - safe_char = string.ascii_letters + string.digits + "-_" - if ch in safe_char: - return ch - return "_" - # - if path == ".": - return path - dirname, basename = os.path.split(path) - base, ext = os.path.splitext(basename) - newbase = "".join(map(conv_char, base)) - if basename == newbase+ext: - return os.path.join(dirname, basename) - if os.path.exists("%s/%s%s" % (dirname, newbase, ext)): - i = 0 - while os.path.exists("%s/%s_%d%s" % (dirname, newbase, i, ext)): - i += 1 - newbase += "_%d" % i - newname = "%s/%s%s" % (dirname, newbase, ext) - return newname - - def fix_and_sort(self): - """ - Fix the entries' pathes (should starts with "/"), and sort. - """ - self.entries = [ "/"+f.lstrip("./") for f in self.entries ] - self.entries.sort() - - def apply_rules(self, rules): - """ - Apply rules to the found entries. - The filtered/updated entries and properties are saved in: - 'self.entries_dict' - """ - self.entries_dict = OrderedDict() - - for fn in self.entries: - # set default properties - props = { - "filename": fn, - "size": os.stat(fn[1:]).st_size, - "ignore": 0, - "type": 1, - "shuffle": 1, - "bookmark": 0 - } - # check and apply rules - for rule in rules: - if rule.match(props): - props.update(rule.actions) - # - if props["ignore"]: - continue - # - self.entries_dict[fn] = props - - def get_entries(self): - return self.entries_dict.items() - - -class iTunesSD: - """ - Class to handle the iPod Shuffle main database - "iPod_Control/iTunes/iTunesSD" - """ - def __init__(self, dbfile="./iPod_Control/iTunes/iTunesSD"): - self.dbfile = dbfile - self.load() - - def load(self): - """ - Load original header and entries. - """ - self.old_entries = {} - self.header_main = array.array("B") # unsigned integer array - self.header_entry = array.array("B") # unsigned integer array - db = open(self.dbfile, "rb") - try: - self.header_main.fromfile(db, 18) - self.header_entry.fromfile(db, 33) - db.seek(18) - entry = db.read(558) - while len(entry) == 558: - filename = entry[33::2].split(b"\0", 1)[0] - self.old_entries[filename] = entry - entry = db.read(558) - except EOFError: - pass - db.close() - print("Loaded %d entries from existing database" % \ - len(self.old_entries)) - - def build_header(self, force=False): - global logobj - # rebuild database header - if force or len(self.header_main) != 18: - logobj.log("Rebuild iTunesSD main header ...") - del self.header_main[:] - self.header_main.fromlist([0,0,0,1,6,0,0,0,18] + [0]*9) - if force or len(self.header_entry) != 33: - logobj.log("Rebuild iTunesSD entry header ...") - del self.header_entry[:] - self.header_entry.fromlist([0,2,46,90,165,1] + [0]*20 + \ - [100,0,0,1,0,2,0]) - - def add_entries(self, entries, reuse=True): - """ - Prepare the entries for database - """ - self.entries = OrderedDict() - - for fn, props in entries.get_entries(): - if reuse and props.get("reuse") and (fn in self.old_entries): - # retrieve entry from old entries - entry = self.old_entries[fn] - else: - # build new entry - self.header_entry[29] = props["type"] - entry_data = "".join([ c+"\0" for c in fn[:261] ]) + \ - "\0"*(558 - len(self.header_entry) - 2*len(fn)) - entry = self.header_entry.tostring() + \ - entry_data.encode("utf-8") - # modify the shuffle and bookmark flags - entry = entry[:555] + chr(props["shuffle"]).encode("utf-8") + \ - chr(props["bookmark"]).encode("utf-8") + entry[557] - # - self.entries[fn] = entry - - def write(self, dbfile=None): - if dbfile is None: - dbfile = self.dbfile - # Make a backup - if os.path.exists(dbfile): - shutil.copy2(dbfile, dbfile+"_bak") - - # write main database file - with open(dbfile, "wb") as db: - self.header_main.tofile(db) - for entry in self.entries.values(): - db.write(entry) - # Update database header - num_entries = len(self.entries) - db.seek(0) - db.write(b"\0%c%c" % (num_entries>>8, num_entries&0xFF)) - - -class iTunesPState: - """ - iPod Shuffle playback state database: "iPod_Control/iTunes/iTunesPState" - """ - def __init__(self, dbfile="iPod_Control/iTunes/iTunesPState"): - self.dbfile = dbfile - self.load() - - def load(self): - with open(self.dbfile, "rb") as db: - a = array.array("B") - a.fromstring(db.read()) - self.PState = a.tolist() - - def update(self, volume=None): - if len(self.PState) != 21: - # volume 29, FW ver 1.0 - self.PState = self.listval(29) + [0]*15 + self.listval(1) - # track 0, shuffle mode, start of track - self.PState[3:15] = [0]*6 + [1] + [0]*5 - if volume is not None: - self.PState[:3] = self.listval(volume) - - def write(self, dbfile=None): - if dbfile is None: - dbfile = self.dbfile - # Make a backup - if os.path.exists(dbfile): - shutil.copy2(dbfile, dbfile+"_bak") - - with open(dbfile, "wb") as db: - array.array("B", self.PState).tofile(db) - - @staticmethod - def listval(i): - if i < 0: - i += 0x1000000 - return [i&0xFF, (i>>8)&0xFF, (i>>16)&0xFF] - - -class iTunesStats: - """ - iPod Shuffle statistics database: "iPod_Control/iTunes/iTunesStats" - """ - def __init__(self, dbfile="iPod_Control/iTunes/iTunesStats"): - self.dbfile = dbfile - - def write(self, count, dbfile=None): - if dbfile is None: - dbfile = self.dbfile - # Make a backup - if os.path.exists(dbfile): - shutil.copy2(dbfile, dbfile+"_bak") - - with open(dbfile, "wb") as db: - data = self.stringval(count) + "\0"*3 + \ - (self.stringval(18) + "\xff"*3 + "\0"*12) * count - db.write(data.encode("utf-8")) - - @staticmethod - def stringval(i): - if i < 0: - i += 0x1000000 - return "%c%c%c" % (i&0xFF, (i>>8)&0xFF, (i>>16)&0xFF) - - -class iTunesShuffle: - """ - iPod shuffle database: "iPod_Control/iTunes/iTunesShuffle" - """ - def __init__(self, dbfile="iPod_Control/iTunes/iTunesShuffle"): - self.dbfile = dbfile - - def shuffle(self, entries): - """ - Generate the shuffle sequences for the entries, and take care - of the "shuffle" property. - """ - shuffle_prop = [ props["shuffle"] - for fn, props in entries.get_entries() ] - shuffle_idx = [ idx for idx, s in enumerate(shuffle_prop) if s == 1 ] - shuffled = shuffle_idx.copy() - random.seed() - random.shuffle(shuffled) - shuffle_seq = list(range(len(shuffle_prop))) - for i, idx in enumerate(shuffle_idx): - shuffle_seq[idx] = shuffled[i] - self.shuffle_seq = shuffle_seq - - def write(self, dbfile=None): - if dbfile is None: - dbfile = self.dbfile - # Make a backup - if os.path.exists(dbfile): - shutil.copy2(dbfile, dbfile+"_bak") - - with open(dbfile, "wb") as db: - data = "".join(map(iTunesStats.stringval, self.shuffle_seq)) - db.write(data.encode("utf-8")) - - -def main(): - prog_basename = os.path.splitext(os.path.basename(sys.argv[0]))[0] - - # command line arguments - parser = argparse.ArgumentParser( - description="Rebuild iPod Shuffle Database", - epilog="Version: %s (%s)\n\n" % (__version__, __date__) + \ - "Only 1st and 2nd iPod Shuffle supported!\n\n" + \ - "The script must be placed under the iPod's root directory") - parser.add_argument("-f", "--force", dest="force", action="store_true", - help="always rebuild database entries, do NOT reuse old ones") - parser.add_argument("-M", "--no-rename", dest="norename", - action="store_false", default=True, - help="do NOT rename files") - parser.add_argument("-V", "--volume", dest="volume", type=int, - help="set playback volume (0 - 38)") - parser.add_argument("-r", "--rulesfile", dest="rulesfile", - default="%s.rules" % prog_basename, - help="additional rules filename") - parser.add_argument("-l", "--logfile", dest="logfile", - default="%s.log" % prog_basename, - help="log output filename") - parser.add_argument("dirs", nargs="*", - help="directories to be searched for playable files") - args = parser.parse_args() - - flag_reuse = not args.force - - # Start logging - global logobj - logobj = LogObj(args.logfile) - logobj.open() - - # Rules for how to handle the found playable files - rules = [] - # Add default rules - rules.append(Rule(conditions=[("filename", "~", "*.mp3")], - actions={"type":1, "shuffle":1, "bookmark":0})) - rules.append(Rule(conditions=[("filename", "~", "*.m4?")], - actions={"type":2, "shuffle":1, "bookmark":0})) - rules.append(Rule(conditions=[("filename", "~", "*.m4b")], - actions={"shuffle":0, "bookmark":1})) - rules.append(Rule(conditions=[("filename", "~", "*.aa")], - actions={"type":1, "shuffle":0, "bookmark":1, "reuse":1})) - rules.append(Rule(conditions=[("filename", "~", "*.wav")], - actions={"type":4, "shuffle":0, "bookmark":0})) - rules.append(Rule(conditions=[("filename", "~", "*.book.???")], - actions={"shuffle":0, "bookmark":1})) - rules.append(Rule(conditions=[("filename", "~", "*.announce.???")], - actions={"shuffle":0, "bookmark":0})) - rules.append(Rule(conditions=[("filename", "~", "/backup/*")], - actions={"ignore":1})) - # Load additional rules - try: - for line in open(args.rulesfile, "r").readlines(): - rules.append(Rule.parse(line)) - logobj.log("Loaded additional rules from file: %s" % args.rulesfile) - except IOError: - pass - - # cd to the directory of this script - os.chdir(os.path.dirname(sys.argv[0])) - - if not os.path.isdir("iPod_Control/iTunes"): - logobj.log("ERROR: No iPod control directory found!") - logobj.log("Please make sure that:") - logobj.log("(*) this script is placed under the iPod's root directory") - logobj.log("(*) the iPod was correctly initialized with iTunes") - sys.exit(1) - - # playable entries - logobj.log("Search for playable entries ...") - entries = Entries() - if args.dirs: - for dir in args.dirs: - entries.add_dir(dir=dir, recursive=True, rename=args.norename) - else: - entries.add_dir(".", recursive=True, rename=args.norename) - entries.fix_and_sort() - logobj.log("Apply rules to entries ...") - entries.apply_rules(rules=rules) - - # read main database file - logobj.log("Update main database ...") - db = iTunesSD(dbfile="iPod_Control/iTunes/iTunesSD") - db.build_header(force=args.force) - db.add_entries(entries=entries, reuse=flag_reuse) - assert len(db.entries) == len(entries.get_entries()) - db.write() - logobj.log("Added %d entries ..." % len(db.entries)) - - # other misc databases - logobj.log("Update playback state database ...") - db_pstate = iTunesPState(dbfile="iPod_Control/iTunes/iTunesPState") - db_pstate.update(volume=args.volume) - db_pstate.write() - logobj.log("Update statistics database ...") - db_stats = iTunesStats(dbfile="iPod_Control/iTunes/iTunesStats") - db_stats.write(count=len(db.entries)) - logobj.log("Update shuffle database ...") - db_shuffle = iTunesShuffle(dbfile="iPod_Control/iTunes/iTunesShuffle") - db_shuffle.shuffle(entries=entries) - db_shuffle.write() - - logobj.log("The iPod Shuffle database was rebuilt successfully!") - - logobj.close() - - -if __name__ == "__main__": - main() - -# vim: set ts=4 sw=4 tw=0 fenc=utf-8 ft=python: # |