aboutsummaryrefslogtreecommitdiffstats
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rwxr-xr-xpython/rebuild_ipod_db.py594
1 files changed, 594 insertions, 0 deletions
diff --git a/python/rebuild_ipod_db.py b/python/rebuild_ipod_db.py
new file mode 100755
index 0000000..8599c65
--- /dev/null
+++ b/python/rebuild_ipod_db.py
@@ -0,0 +1,594 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+#
+# LICENSE:
+# ---------------------------------------------------------------------------
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+# ---------------------------------------------------------------------------
+#
+# Based on Matrin Fiedler's "rebuild_db.py" v1.0-rc1 (2006-04-26):
+# http://shuffle-db.sourceforge.net/
+#
+
+from __future__ import print_function
+
+
+__title__ = "iPod Shuffle Database Builder"
+__author__ = "Aaron LI"
+__version__ = "2.0.1"
+__date__ = "2016-04-14"
+
+
+import sys
+import os
+import operator
+import array
+import random
+import fnmatch
+import operator
+import string
+import argparse
+import functools
+import shutil
+from collections import OrderedDict
+
+
+domains = []
+total_count = 0
+
+
+class LogObj:
+ """
+ Print and log the process information.
+ """
+ def __init__(self, filename=None):
+ self.filename = filename
+
+ def open(self):
+ if self.filename:
+ try:
+ self.logfile = open(self.filename, "w")
+ except IOError:
+ self.logfile = None
+ else:
+ self.logfile = None
+
+ def log(self, line="", end="\n"):
+ value = line + end
+ if self.logfile:
+ self.logfile.write(value)
+ print(value, end="")
+
+ def close(self):
+ if self.logfile:
+ self.logfile.close()
+
+
+class Rule:
+ """
+ A RuleSet for the way to handle the found playable files.
+ """
+ SUPPORT_PROPS = ("filename", "size", "ignore", "type",
+ "shuffle", "reuse", "bookmark")
+
+ def __init__(self, conditions=None, actions=None):
+ self.conditions = conditions
+ self.actions = actions
+
+ @classmethod
+ def parse(cls, rule):
+ """
+ Parse the whole line of a rule.
+
+ Syntax:
+ condition1, condition2, ...: action1, action2, ...
+
+ condition examples:
+ * filename ~ "*.mp3"
+ * size > 100000
+ action examples:
+ * ignore = 1
+ * shuffle = 1
+
+ Return: a object of this class with the parsed rule.
+ """
+ conditions, actions = rule.split(":")
+ conditions = list(map(cls.parse_condition, conditions.split(",")))
+ actions = dict(map(cls.parse_action, actions.split(",")))
+ return cls(conditions, actions)
+
+ @classmethod
+ def parse_condition(cls, cond):
+ sep_pos = min([ cond.find(sep) for sep in "~=<>" \
+ if cond.find(sep)>0 ])
+ prop = cond[:sep_pos].strip()
+ if prop not in cls.SUPPORT_PROPS:
+ raise ValueError("WARNING: unknown property '%s'" % prop)
+ return (prop, cond[sep_pos],
+ cls.parse_value(cond[sep_pos+1:].strip()))
+
+ @classmethod
+ def parse_action(cls, action):
+ prop, value = map(str.strip, action.split("=", 1))
+ if prop not in cls.SUPPORT_PROPS:
+ raise ValueError("WARNING: unknown property '%s'" % prop)
+ return (prop, cls.parse_value(value))
+
+ @staticmethod
+ def parse_value(value):
+ value = value.strip().strip('"').strip("'")
+ try:
+ return int(value)
+ except ValueError:
+ return value
+
+ def match(self, props):
+ """
+ Check whether the given props match all the conditions.
+ """
+ def match_condition(props, cond):
+ """
+ Check whether the given props match the given condition.
+ """
+ try:
+ prop, op, ref = props[cond[0]], cond[1], cond[2]
+ except KeyError:
+ return False
+ if op == "~":
+ return fnmatch.fnmatchcase(prop.lower(), ref.lower())
+ elif op == "=":
+ return prop == ref
+ elif op == ">":
+ return prop > ref
+ elif op == "<":
+ return prop < ref
+ else:
+ return False
+ #
+ return functools.reduce(operator.and_,
+ [ match_condition(props, cond) \
+ for cond in self.conditions ],
+ True)
+
+
+class Entries:
+ """
+ Walk through the directory to find all files, and filter by the
+ extensions to get all the playable files.
+ """
+ PLAYABLE_EXTS = (".mp3", ".m4a", ".m4b", ".m4p", ".aa", ".wav")
+
+ def __init__(self, dirs=[], rename=True, recursive=True, ignore_dup=True):
+ self.entries = []
+ self.add_dirs(dirs=dirs, rename=rename, recursive=recursive,
+ ignore_dup=ignore_dup)
+
+ def add_dirs(self, dirs=[], rename=True, recursive=True, ignore_dup=True):
+ for dir in dirs:
+ self.add_dir(dir=dir, rename=rename, recursive=recursive,
+ ignore_dup=ignore_dup)
+
+ def add_dir(self, dir, rename=True, recursive=True, ignore_dup=True):
+ global logobj
+ if recursive:
+ # Get all directories, and rename them if needed
+ dirs = []
+ for dirName, subdirList, fileList in os.walk(dir):
+ dirs.append(dirName)
+ for dirName in dirs:
+ newDirName = self.get_newname(dirName)
+ if rename and newDirName != dirName:
+ logobj.log("Rename: '%s' -> '%s'" % (dirName, newDirName))
+ shutil.move(dirName, newDirName)
+ # Get all files
+ files = []
+ for dirName, subdirList, fileList in os.walk(dir):
+ files.extend([ os.path.join(dirName, f) for f in fileList ])
+ else:
+ # rename the directory if needed
+ newDir = self.get_newname(dir)
+ if rename and newDir != dir:
+ logobj.log("Rename: '%s' -> '%s'" % (dir, newDir))
+ shutil.move(dir, newDir)
+ files = [ os.path.join(newDir, f) for f in self.listfiles(newDir) ]
+ #
+ for fn in files:
+ # rename filename if needed
+ newfn = self.get_newname(fn)
+ if rename and newfn != fn:
+ logobj.log("Rename: '%s' -> '%s'" % (fn, newfn))
+ shutil.move(fn, newfn)
+ fn = newfn
+ # filter by playable extensions
+ if os.path.splitext(fn)[1].lower() not in self.PLAYABLE_EXTS:
+ continue
+ if ignore_dup and (fn in self.entries):
+ continue
+ self.entries.append(fn)
+ print("Entry: %s" % fn)
+
+ @staticmethod
+ def listfiles(path, ignore_hidden=True):
+ """
+ List only files of a directory
+ """
+ for f in os.listdir(path):
+ if os.path.isfile(os.path.join(path, f)):
+ if ignore_hidden and f[0] != ".":
+ yield f
+ else:
+ yield f
+
+ @staticmethod
+ def get_newname(path):
+ def conv_char(ch):
+ safe_char = string.ascii_letters + string.digits + "-_"
+ if ch in safe_char:
+ return ch
+ return "_"
+ #
+ if path == ".":
+ return path
+ dirname, basename = os.path.split(path)
+ base, ext = os.path.splitext(basename)
+ newbase = "".join(map(conv_char, base))
+ if basename == newbase+ext:
+ return os.path.join(dirname, basename)
+ if os.path.exists("%s/%s%s" % (dirname, newbase, ext)):
+ i = 0
+ while os.path.exists("%s/%s_%d%s" % (dirname, newbase, i, ext)):
+ i += 1
+ newbase += "_%d" % i
+ newname = "%s/%s%s" % (dirname, newbase, ext)
+ return newname
+
+ def fix_and_sort(self):
+ """
+ Fix the entries' pathes (should starts with "/"), and sort.
+ """
+ self.entries = [ "/"+f.lstrip("./") for f in self.entries ]
+ self.entries.sort()
+
+ def apply_rules(self, rules):
+ """
+ Apply rules to the found entries.
+ The filtered/updated entries and properties are saved in:
+ 'self.entries_dict'
+ """
+ self.entries_dict = OrderedDict()
+
+ for fn in self.entries:
+ # set default properties
+ props = {
+ "filename": fn,
+ "size": os.stat(fn[1:]).st_size,
+ "ignore": 0,
+ "type": 1,
+ "shuffle": 1,
+ "bookmark": 0
+ }
+ # check and apply rules
+ for rule in rules:
+ if rule.match(props):
+ props.update(rule.actions)
+ #
+ if props["ignore"]:
+ continue
+ #
+ self.entries_dict[fn] = props
+
+ def get_entries(self):
+ return self.entries_dict.items()
+
+
+class iTunesSD:
+ """
+ Class to handle the iPod Shuffle main database
+ "iPod_Control/iTunes/iTunesSD"
+ """
+ def __init__(self, dbfile="./iPod_Control/iTunes/iTunesSD"):
+ self.dbfile = dbfile
+ self.load()
+
+ def load(self):
+ """
+ Load original header and entries.
+ """
+ self.old_entries = {}
+ self.header_main = array.array("B") # unsigned integer array
+ self.header_entry = array.array("B") # unsigned integer array
+ db = open(self.dbfile, "rb")
+ try:
+ self.header_main.fromfile(db, 18)
+ self.header_entry.fromfile(db, 33)
+ db.seek(18)
+ entry = db.read(558)
+ while len(entry) == 558:
+ filename = entry[33::2].split(b"\0", 1)[0]
+ self.old_entries[filename] = entry
+ entry = db.read(558)
+ except EOFError:
+ pass
+ db.close()
+ print("Loaded %d entries from existing database" % \
+ len(self.old_entries))
+
+ def build_header(self, force=False):
+ global logobj
+ # rebuild database header
+ if force or len(self.header_main) != 18:
+ logobj.log("Rebuild iTunesSD main header ...")
+ del self.header_main[:]
+ self.header_main.fromlist([0,0,0,1,6,0,0,0,18] + [0]*9)
+ if force or len(self.header_entry) != 33:
+ logobj.log("Rebuild iTunesSD entry header ...")
+ del self.header_entry[:]
+ self.header_entry.fromlist([0,2,46,90,165,1] + [0]*20 + \
+ [100,0,0,1,0,2,0])
+
+ def add_entries(self, entries, reuse=True):
+ """
+ Prepare the entries for database
+ """
+ self.entries = OrderedDict()
+
+ for fn, props in entries.get_entries():
+ if reuse and props.get("reuse") and (fn in self.old_entries):
+ # retrieve entry from old entries
+ entry = self.old_entries[fn]
+ else:
+ # build new entry
+ self.header_entry[29] = props["type"]
+ entry = self.header_entry.tostring() + \
+ "".join([ c+"\0" for c in fn[:261] ]).encode("utf-8") + \
+ ("\0"*(525 - 2*len(fn))).encode("utf-8")
+ # modify the shuffle and bookmark flags
+ entry = entry[:555] + chr(props["shuffle"]).encode("utf-8") + \
+ chr(props["bookmark"]).encode("utf-8") + entry[557:]
+ #
+ self.entries[fn] = entry
+
+ def write(self, dbfile=None):
+ if dbfile is None:
+ dbfile = self.dbfile
+ # Make a backup
+ if os.path.exists(dbfile):
+ shutil.copy2(dbfile, dbfile+"_bak")
+
+ # write main database file
+ with open(dbfile, "wb") as db:
+ self.header_main.tofile(db)
+ for entry in self.entries.values():
+ db.write(entry)
+ # Update database header
+ num_entries = len(self.entries)
+ db.seek(0)
+ db.write(b"\0%c%c" % (num_entries>>8, num_entries&0xFF))
+
+
+class iTunesPState:
+ """
+ iPod Shuffle playback state database: "iPod_Control/iTunes/iTunesPState"
+ """
+ def __init__(self, dbfile="iPod_Control/iTunes/iTunesPState"):
+ self.dbfile = dbfile
+ self.load()
+
+ def load(self):
+ with open(self.dbfile, "rb") as db:
+ a = array.array("B")
+ a.fromstring(db.read())
+ self.PState = a.tolist()
+
+ def update(self, volume=None):
+ if len(self.PState) != 21:
+ # volume 29, FW ver 1.0
+ self.PState = self.listval(29) + [0]*15 + self.listval(1)
+ # track 0, shuffle mode, start of track
+ self.PState[3:15] = [0]*6 + [1] + [0]*5
+ if volume is not None:
+ self.PState[:3] = self.listval(volume)
+
+ def write(self, dbfile=None):
+ if dbfile is None:
+ dbfile = self.dbfile
+ # Make a backup
+ if os.path.exists(dbfile):
+ shutil.copy2(dbfile, dbfile+"_bak")
+
+ with open(dbfile, "wb") as db:
+ array.array("B", self.PState).tofile(db)
+
+ @staticmethod
+ def listval(i):
+ if i < 0:
+ i += 0x1000000
+ return [i&0xFF, (i>>8)&0xFF, (i>>16)&0xFF]
+
+
+class iTunesStats:
+ """
+ iPod Shuffle statistics database: "iPod_Control/iTunes/iTunesStats"
+ """
+ def __init__(self, dbfile="iPod_Control/iTunes/iTunesStats"):
+ self.dbfile = dbfile
+
+ def write(self, count, dbfile=None):
+ if dbfile is None:
+ dbfile = self.dbfile
+ # Make a backup
+ if os.path.exists(dbfile):
+ shutil.copy2(dbfile, dbfile+"_bak")
+
+ with open(dbfile, "wb") as db:
+ data = self.stringval(count) + "\0"*3 + \
+ (self.stringval(18) + "\xff"*3 + "\0"*12) * count
+ db.write(data.encode("utf-8"))
+
+ @staticmethod
+ def stringval(i):
+ if i < 0:
+ i += 0x1000000
+ return "%c%c%c" % (i&0xFF, (i>>8)&0xFF, (i>>16)&0xFF)
+
+
+class iTunesShuffle:
+ """
+ iPod shuffle database: "iPod_Control/iTunes/iTunesShuffle"
+ """
+ def __init__(self, dbfile="iPod_Control/iTunes/iTunesShuffle"):
+ self.dbfile = dbfile
+
+ def shuffle(self, entries):
+ """
+ Generate the shuffle sequences for the entries, and take care
+ of the "shuffle" property.
+ """
+ shuffle_prop = [ props["shuffle"]
+ for fn, props in entries.get_entries() ]
+ shuffle_idx = [ idx for idx, s in enumerate(shuffle_prop) if s == 1 ]
+ shuffled = shuffle_idx.copy()
+ random.seed()
+ random.shuffle(shuffled)
+ shuffle_seq = list(range(len(shuffle_prop)))
+ for i, idx in enumerate(shuffle_idx):
+ shuffle_seq[idx] = shuffled[i]
+ self.shuffle_seq = shuffle_seq
+
+ def write(self, dbfile=None):
+ if dbfile is None:
+ dbfile = self.dbfile
+ # Make a backup
+ if os.path.exists(dbfile):
+ shutil.copy2(dbfile, dbfile+"_bak")
+
+ with open(dbfile, "wb") as db:
+ data = "".join(map(iTunesStats.stringval, self.shuffle_seq))
+ db.write(data.encode("utf-8"))
+
+
+def main():
+ prog_basename = os.path.splitext(os.path.basename(sys.argv[0]))[0]
+
+ # command line arguments
+ parser = argparse.ArgumentParser(
+ description="Rebuild iPod Shuffle Database",
+ epilog="Version: %s (%s)\n\n" % (__version__, __date__) + \
+ "Only 1st and 2nd iPod Shuffle supported!\n\n" + \
+ "The script must be placed under the iPod's root directory")
+ parser.add_argument("-f", "--force", dest="force", action="store_true",
+ help="always rebuild database entries, do NOT reuse old ones")
+ parser.add_argument("-M", "--no-rename", dest="norename",
+ action="store_false", default=True,
+ help="do NOT rename files")
+ parser.add_argument("-V", "--volume", dest="volume", type=int,
+ help="set playback volume (0 - 38)")
+ parser.add_argument("-r", "--rulesfile", dest="rulesfile",
+ default="%s.rules" % prog_basename,
+ help="additional rules filename")
+ parser.add_argument("-l", "--logfile", dest="logfile",
+ default="%s.log" % prog_basename,
+ help="log output filename")
+ parser.add_argument("dirs", nargs="*",
+ help="directories to be searched for playable files")
+ args = parser.parse_args()
+
+ flag_reuse = not args.force
+
+ # Start logging
+ global logobj
+ logobj = LogObj(args.logfile)
+ logobj.open()
+
+ # Rules for how to handle the found playable files
+ rules = []
+ # Add default rules
+ rules.append(Rule(conditions=[("filename", "~", "*.mp3")],
+ actions={"type":1, "shuffle":1, "bookmark":0}))
+ rules.append(Rule(conditions=[("filename", "~", "*.m4?")],
+ actions={"type":2, "shuffle":1, "bookmark":0}))
+ rules.append(Rule(conditions=[("filename", "~", "*.m4b")],
+ actions={"shuffle":0, "bookmark":1}))
+ rules.append(Rule(conditions=[("filename", "~", "*.aa")],
+ actions={"type":1, "shuffle":0, "bookmark":1, "reuse":1}))
+ rules.append(Rule(conditions=[("filename", "~", "*.wav")],
+ actions={"type":4, "shuffle":0, "bookmark":0}))
+ rules.append(Rule(conditions=[("filename", "~", "*.book.???")],
+ actions={"shuffle":0, "bookmark":1}))
+ rules.append(Rule(conditions=[("filename", "~", "*.announce.???")],
+ actions={"shuffle":0, "bookmark":0}))
+ rules.append(Rule(conditions=[("filename", "~", "/backup/*")],
+ actions={"ignore":1}))
+ # Load additional rules
+ try:
+ for line in open(args.rulesfile, "r").readlines():
+ rules.append(Rule.parse(line))
+ logobj.log("Loaded additional rules from file: %s" % args.rulesfile)
+ except IOError:
+ pass
+
+ # cd to the directory of this script
+ os.chdir(os.path.dirname(sys.argv[0]))
+
+ if not os.path.isdir("iPod_Control/iTunes"):
+ logobj.log("ERROR: No iPod control directory found!")
+ logobj.log("Please make sure that:")
+ logobj.log("(*) this script is placed under the iPod's root directory")
+ logobj.log("(*) the iPod was correctly initialized with iTunes")
+ sys.exit(1)
+
+ # playable entries
+ logobj.log("Search for playable entries ...")
+ entries = Entries()
+ if args.dirs:
+ for dir in args.dirs:
+ entries.add_dir(dir=dir, recursive=True, rename=args.norename)
+ else:
+ entries.add_dir(".", recursive=True, rename=args.norename)
+ entries.fix_and_sort()
+ logobj.log("Apply rules to entries ...")
+ entries.apply_rules(rules=rules)
+
+ # read main database file
+ logobj.log("Update main database ...")
+ db = iTunesSD(dbfile="iPod_Control/iTunes/iTunesSD")
+ db.build_header(force=args.force)
+ db.add_entries(entries=entries, reuse=flag_reuse)
+ assert len(db.entries) == len(entries.get_entries())
+ db.write()
+ logobj.log("Added %d entries ..." % len(db.entries))
+
+ # other misc databases
+ logobj.log("Update playback state database ...")
+ db_pstate = iTunesPState(dbfile="iPod_Control/iTunes/iTunesPState")
+ db_pstate.update(volume=args.volume)
+ db_pstate.write()
+ logobj.log("Update statistics database ...")
+ db_stats = iTunesStats(dbfile="iPod_Control/iTunes/iTunesStats")
+ db_stats.write(count=len(db.entries))
+ logobj.log("Update shuffle database ...")
+ db_shuffle = iTunesShuffle(dbfile="iPod_Control/iTunes/iTunesShuffle")
+ db_shuffle.shuffle(entries=entries)
+ db_shuffle.write()
+
+ logobj.log("The iPod Shuffle database was rebuilt successfully!")
+
+ logobj.close()
+
+
+if __name__ == "__main__":
+ main()
+
+# vim: set ts=4 sw=4 tw=0 fenc=utf-8 ft=python: #