From f95d79d8fe8e4d697a5e6ddfe7c926b5f3700355 Mon Sep 17 00:00:00 2001 From: Aaron LI Date: Fri, 15 Apr 2016 10:52:44 +0800 Subject: Add rebuild_ipod_db.py: build iPod Shuffle (1Gen, 2Gen) database --- python/rebuild_ipod_db.py | 594 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 594 insertions(+) create mode 100755 python/rebuild_ipod_db.py (limited to 'python/rebuild_ipod_db.py') diff --git a/python/rebuild_ipod_db.py b/python/rebuild_ipod_db.py new file mode 100755 index 0000000..8599c65 --- /dev/null +++ b/python/rebuild_ipod_db.py @@ -0,0 +1,594 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# LICENSE: +# --------------------------------------------------------------------------- +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# --------------------------------------------------------------------------- +# +# Based on Matrin Fiedler's "rebuild_db.py" v1.0-rc1 (2006-04-26): +# http://shuffle-db.sourceforge.net/ +# + +from __future__ import print_function + + +__title__ = "iPod Shuffle Database Builder" +__author__ = "Aaron LI" +__version__ = "2.0.1" +__date__ = "2016-04-14" + + +import sys +import os +import operator +import array +import random +import fnmatch +import operator +import string +import argparse +import functools +import shutil +from collections import OrderedDict + + +domains = [] +total_count = 0 + + +class LogObj: + """ + Print and log the process information. + """ + def __init__(self, filename=None): + self.filename = filename + + def open(self): + if self.filename: + try: + self.logfile = open(self.filename, "w") + except IOError: + self.logfile = None + else: + self.logfile = None + + def log(self, line="", end="\n"): + value = line + end + if self.logfile: + self.logfile.write(value) + print(value, end="") + + def close(self): + if self.logfile: + self.logfile.close() + + +class Rule: + """ + A RuleSet for the way to handle the found playable files. + """ + SUPPORT_PROPS = ("filename", "size", "ignore", "type", + "shuffle", "reuse", "bookmark") + + def __init__(self, conditions=None, actions=None): + self.conditions = conditions + self.actions = actions + + @classmethod + def parse(cls, rule): + """ + Parse the whole line of a rule. + + Syntax: + condition1, condition2, ...: action1, action2, ... + + condition examples: + * filename ~ "*.mp3" + * size > 100000 + action examples: + * ignore = 1 + * shuffle = 1 + + Return: a object of this class with the parsed rule. + """ + conditions, actions = rule.split(":") + conditions = list(map(cls.parse_condition, conditions.split(","))) + actions = dict(map(cls.parse_action, actions.split(","))) + return cls(conditions, actions) + + @classmethod + def parse_condition(cls, cond): + sep_pos = min([ cond.find(sep) for sep in "~=<>" \ + if cond.find(sep)>0 ]) + prop = cond[:sep_pos].strip() + if prop not in cls.SUPPORT_PROPS: + raise ValueError("WARNING: unknown property '%s'" % prop) + return (prop, cond[sep_pos], + cls.parse_value(cond[sep_pos+1:].strip())) + + @classmethod + def parse_action(cls, action): + prop, value = map(str.strip, action.split("=", 1)) + if prop not in cls.SUPPORT_PROPS: + raise ValueError("WARNING: unknown property '%s'" % prop) + return (prop, cls.parse_value(value)) + + @staticmethod + def parse_value(value): + value = value.strip().strip('"').strip("'") + try: + return int(value) + except ValueError: + return value + + def match(self, props): + """ + Check whether the given props match all the conditions. + """ + def match_condition(props, cond): + """ + Check whether the given props match the given condition. + """ + try: + prop, op, ref = props[cond[0]], cond[1], cond[2] + except KeyError: + return False + if op == "~": + return fnmatch.fnmatchcase(prop.lower(), ref.lower()) + elif op == "=": + return prop == ref + elif op == ">": + return prop > ref + elif op == "<": + return prop < ref + else: + return False + # + return functools.reduce(operator.and_, + [ match_condition(props, cond) \ + for cond in self.conditions ], + True) + + +class Entries: + """ + Walk through the directory to find all files, and filter by the + extensions to get all the playable files. + """ + PLAYABLE_EXTS = (".mp3", ".m4a", ".m4b", ".m4p", ".aa", ".wav") + + def __init__(self, dirs=[], rename=True, recursive=True, ignore_dup=True): + self.entries = [] + self.add_dirs(dirs=dirs, rename=rename, recursive=recursive, + ignore_dup=ignore_dup) + + def add_dirs(self, dirs=[], rename=True, recursive=True, ignore_dup=True): + for dir in dirs: + self.add_dir(dir=dir, rename=rename, recursive=recursive, + ignore_dup=ignore_dup) + + def add_dir(self, dir, rename=True, recursive=True, ignore_dup=True): + global logobj + if recursive: + # Get all directories, and rename them if needed + dirs = [] + for dirName, subdirList, fileList in os.walk(dir): + dirs.append(dirName) + for dirName in dirs: + newDirName = self.get_newname(dirName) + if rename and newDirName != dirName: + logobj.log("Rename: '%s' -> '%s'" % (dirName, newDirName)) + shutil.move(dirName, newDirName) + # Get all files + files = [] + for dirName, subdirList, fileList in os.walk(dir): + files.extend([ os.path.join(dirName, f) for f in fileList ]) + else: + # rename the directory if needed + newDir = self.get_newname(dir) + if rename and newDir != dir: + logobj.log("Rename: '%s' -> '%s'" % (dir, newDir)) + shutil.move(dir, newDir) + files = [ os.path.join(newDir, f) for f in self.listfiles(newDir) ] + # + for fn in files: + # rename filename if needed + newfn = self.get_newname(fn) + if rename and newfn != fn: + logobj.log("Rename: '%s' -> '%s'" % (fn, newfn)) + shutil.move(fn, newfn) + fn = newfn + # filter by playable extensions + if os.path.splitext(fn)[1].lower() not in self.PLAYABLE_EXTS: + continue + if ignore_dup and (fn in self.entries): + continue + self.entries.append(fn) + print("Entry: %s" % fn) + + @staticmethod + def listfiles(path, ignore_hidden=True): + """ + List only files of a directory + """ + for f in os.listdir(path): + if os.path.isfile(os.path.join(path, f)): + if ignore_hidden and f[0] != ".": + yield f + else: + yield f + + @staticmethod + def get_newname(path): + def conv_char(ch): + safe_char = string.ascii_letters + string.digits + "-_" + if ch in safe_char: + return ch + return "_" + # + if path == ".": + return path + dirname, basename = os.path.split(path) + base, ext = os.path.splitext(basename) + newbase = "".join(map(conv_char, base)) + if basename == newbase+ext: + return os.path.join(dirname, basename) + if os.path.exists("%s/%s%s" % (dirname, newbase, ext)): + i = 0 + while os.path.exists("%s/%s_%d%s" % (dirname, newbase, i, ext)): + i += 1 + newbase += "_%d" % i + newname = "%s/%s%s" % (dirname, newbase, ext) + return newname + + def fix_and_sort(self): + """ + Fix the entries' pathes (should starts with "/"), and sort. + """ + self.entries = [ "/"+f.lstrip("./") for f in self.entries ] + self.entries.sort() + + def apply_rules(self, rules): + """ + Apply rules to the found entries. + The filtered/updated entries and properties are saved in: + 'self.entries_dict' + """ + self.entries_dict = OrderedDict() + + for fn in self.entries: + # set default properties + props = { + "filename": fn, + "size": os.stat(fn[1:]).st_size, + "ignore": 0, + "type": 1, + "shuffle": 1, + "bookmark": 0 + } + # check and apply rules + for rule in rules: + if rule.match(props): + props.update(rule.actions) + # + if props["ignore"]: + continue + # + self.entries_dict[fn] = props + + def get_entries(self): + return self.entries_dict.items() + + +class iTunesSD: + """ + Class to handle the iPod Shuffle main database + "iPod_Control/iTunes/iTunesSD" + """ + def __init__(self, dbfile="./iPod_Control/iTunes/iTunesSD"): + self.dbfile = dbfile + self.load() + + def load(self): + """ + Load original header and entries. + """ + self.old_entries = {} + self.header_main = array.array("B") # unsigned integer array + self.header_entry = array.array("B") # unsigned integer array + db = open(self.dbfile, "rb") + try: + self.header_main.fromfile(db, 18) + self.header_entry.fromfile(db, 33) + db.seek(18) + entry = db.read(558) + while len(entry) == 558: + filename = entry[33::2].split(b"\0", 1)[0] + self.old_entries[filename] = entry + entry = db.read(558) + except EOFError: + pass + db.close() + print("Loaded %d entries from existing database" % \ + len(self.old_entries)) + + def build_header(self, force=False): + global logobj + # rebuild database header + if force or len(self.header_main) != 18: + logobj.log("Rebuild iTunesSD main header ...") + del self.header_main[:] + self.header_main.fromlist([0,0,0,1,6,0,0,0,18] + [0]*9) + if force or len(self.header_entry) != 33: + logobj.log("Rebuild iTunesSD entry header ...") + del self.header_entry[:] + self.header_entry.fromlist([0,2,46,90,165,1] + [0]*20 + \ + [100,0,0,1,0,2,0]) + + def add_entries(self, entries, reuse=True): + """ + Prepare the entries for database + """ + self.entries = OrderedDict() + + for fn, props in entries.get_entries(): + if reuse and props.get("reuse") and (fn in self.old_entries): + # retrieve entry from old entries + entry = self.old_entries[fn] + else: + # build new entry + self.header_entry[29] = props["type"] + entry = self.header_entry.tostring() + \ + "".join([ c+"\0" for c in fn[:261] ]).encode("utf-8") + \ + ("\0"*(525 - 2*len(fn))).encode("utf-8") + # modify the shuffle and bookmark flags + entry = entry[:555] + chr(props["shuffle"]).encode("utf-8") + \ + chr(props["bookmark"]).encode("utf-8") + entry[557:] + # + self.entries[fn] = entry + + def write(self, dbfile=None): + if dbfile is None: + dbfile = self.dbfile + # Make a backup + if os.path.exists(dbfile): + shutil.copy2(dbfile, dbfile+"_bak") + + # write main database file + with open(dbfile, "wb") as db: + self.header_main.tofile(db) + for entry in self.entries.values(): + db.write(entry) + # Update database header + num_entries = len(self.entries) + db.seek(0) + db.write(b"\0%c%c" % (num_entries>>8, num_entries&0xFF)) + + +class iTunesPState: + """ + iPod Shuffle playback state database: "iPod_Control/iTunes/iTunesPState" + """ + def __init__(self, dbfile="iPod_Control/iTunes/iTunesPState"): + self.dbfile = dbfile + self.load() + + def load(self): + with open(self.dbfile, "rb") as db: + a = array.array("B") + a.fromstring(db.read()) + self.PState = a.tolist() + + def update(self, volume=None): + if len(self.PState) != 21: + # volume 29, FW ver 1.0 + self.PState = self.listval(29) + [0]*15 + self.listval(1) + # track 0, shuffle mode, start of track + self.PState[3:15] = [0]*6 + [1] + [0]*5 + if volume is not None: + self.PState[:3] = self.listval(volume) + + def write(self, dbfile=None): + if dbfile is None: + dbfile = self.dbfile + # Make a backup + if os.path.exists(dbfile): + shutil.copy2(dbfile, dbfile+"_bak") + + with open(dbfile, "wb") as db: + array.array("B", self.PState).tofile(db) + + @staticmethod + def listval(i): + if i < 0: + i += 0x1000000 + return [i&0xFF, (i>>8)&0xFF, (i>>16)&0xFF] + + +class iTunesStats: + """ + iPod Shuffle statistics database: "iPod_Control/iTunes/iTunesStats" + """ + def __init__(self, dbfile="iPod_Control/iTunes/iTunesStats"): + self.dbfile = dbfile + + def write(self, count, dbfile=None): + if dbfile is None: + dbfile = self.dbfile + # Make a backup + if os.path.exists(dbfile): + shutil.copy2(dbfile, dbfile+"_bak") + + with open(dbfile, "wb") as db: + data = self.stringval(count) + "\0"*3 + \ + (self.stringval(18) + "\xff"*3 + "\0"*12) * count + db.write(data.encode("utf-8")) + + @staticmethod + def stringval(i): + if i < 0: + i += 0x1000000 + return "%c%c%c" % (i&0xFF, (i>>8)&0xFF, (i>>16)&0xFF) + + +class iTunesShuffle: + """ + iPod shuffle database: "iPod_Control/iTunes/iTunesShuffle" + """ + def __init__(self, dbfile="iPod_Control/iTunes/iTunesShuffle"): + self.dbfile = dbfile + + def shuffle(self, entries): + """ + Generate the shuffle sequences for the entries, and take care + of the "shuffle" property. + """ + shuffle_prop = [ props["shuffle"] + for fn, props in entries.get_entries() ] + shuffle_idx = [ idx for idx, s in enumerate(shuffle_prop) if s == 1 ] + shuffled = shuffle_idx.copy() + random.seed() + random.shuffle(shuffled) + shuffle_seq = list(range(len(shuffle_prop))) + for i, idx in enumerate(shuffle_idx): + shuffle_seq[idx] = shuffled[i] + self.shuffle_seq = shuffle_seq + + def write(self, dbfile=None): + if dbfile is None: + dbfile = self.dbfile + # Make a backup + if os.path.exists(dbfile): + shutil.copy2(dbfile, dbfile+"_bak") + + with open(dbfile, "wb") as db: + data = "".join(map(iTunesStats.stringval, self.shuffle_seq)) + db.write(data.encode("utf-8")) + + +def main(): + prog_basename = os.path.splitext(os.path.basename(sys.argv[0]))[0] + + # command line arguments + parser = argparse.ArgumentParser( + description="Rebuild iPod Shuffle Database", + epilog="Version: %s (%s)\n\n" % (__version__, __date__) + \ + "Only 1st and 2nd iPod Shuffle supported!\n\n" + \ + "The script must be placed under the iPod's root directory") + parser.add_argument("-f", "--force", dest="force", action="store_true", + help="always rebuild database entries, do NOT reuse old ones") + parser.add_argument("-M", "--no-rename", dest="norename", + action="store_false", default=True, + help="do NOT rename files") + parser.add_argument("-V", "--volume", dest="volume", type=int, + help="set playback volume (0 - 38)") + parser.add_argument("-r", "--rulesfile", dest="rulesfile", + default="%s.rules" % prog_basename, + help="additional rules filename") + parser.add_argument("-l", "--logfile", dest="logfile", + default="%s.log" % prog_basename, + help="log output filename") + parser.add_argument("dirs", nargs="*", + help="directories to be searched for playable files") + args = parser.parse_args() + + flag_reuse = not args.force + + # Start logging + global logobj + logobj = LogObj(args.logfile) + logobj.open() + + # Rules for how to handle the found playable files + rules = [] + # Add default rules + rules.append(Rule(conditions=[("filename", "~", "*.mp3")], + actions={"type":1, "shuffle":1, "bookmark":0})) + rules.append(Rule(conditions=[("filename", "~", "*.m4?")], + actions={"type":2, "shuffle":1, "bookmark":0})) + rules.append(Rule(conditions=[("filename", "~", "*.m4b")], + actions={"shuffle":0, "bookmark":1})) + rules.append(Rule(conditions=[("filename", "~", "*.aa")], + actions={"type":1, "shuffle":0, "bookmark":1, "reuse":1})) + rules.append(Rule(conditions=[("filename", "~", "*.wav")], + actions={"type":4, "shuffle":0, "bookmark":0})) + rules.append(Rule(conditions=[("filename", "~", "*.book.???")], + actions={"shuffle":0, "bookmark":1})) + rules.append(Rule(conditions=[("filename", "~", "*.announce.???")], + actions={"shuffle":0, "bookmark":0})) + rules.append(Rule(conditions=[("filename", "~", "/backup/*")], + actions={"ignore":1})) + # Load additional rules + try: + for line in open(args.rulesfile, "r").readlines(): + rules.append(Rule.parse(line)) + logobj.log("Loaded additional rules from file: %s" % args.rulesfile) + except IOError: + pass + + # cd to the directory of this script + os.chdir(os.path.dirname(sys.argv[0])) + + if not os.path.isdir("iPod_Control/iTunes"): + logobj.log("ERROR: No iPod control directory found!") + logobj.log("Please make sure that:") + logobj.log("(*) this script is placed under the iPod's root directory") + logobj.log("(*) the iPod was correctly initialized with iTunes") + sys.exit(1) + + # playable entries + logobj.log("Search for playable entries ...") + entries = Entries() + if args.dirs: + for dir in args.dirs: + entries.add_dir(dir=dir, recursive=True, rename=args.norename) + else: + entries.add_dir(".", recursive=True, rename=args.norename) + entries.fix_and_sort() + logobj.log("Apply rules to entries ...") + entries.apply_rules(rules=rules) + + # read main database file + logobj.log("Update main database ...") + db = iTunesSD(dbfile="iPod_Control/iTunes/iTunesSD") + db.build_header(force=args.force) + db.add_entries(entries=entries, reuse=flag_reuse) + assert len(db.entries) == len(entries.get_entries()) + db.write() + logobj.log("Added %d entries ..." % len(db.entries)) + + # other misc databases + logobj.log("Update playback state database ...") + db_pstate = iTunesPState(dbfile="iPod_Control/iTunes/iTunesPState") + db_pstate.update(volume=args.volume) + db_pstate.write() + logobj.log("Update statistics database ...") + db_stats = iTunesStats(dbfile="iPod_Control/iTunes/iTunesStats") + db_stats.write(count=len(db.entries)) + logobj.log("Update shuffle database ...") + db_shuffle = iTunesShuffle(dbfile="iPod_Control/iTunes/iTunesShuffle") + db_shuffle.shuffle(entries=entries) + db_shuffle.write() + + logobj.log("The iPod Shuffle database was rebuilt successfully!") + + logobj.close() + + +if __name__ == "__main__": + main() + +# vim: set ts=4 sw=4 tw=0 fenc=utf-8 ft=python: # -- cgit v1.2.2