#!/usr/bin/python3 from todoist_api_python.api import TodoistAPI from todoist_api_python.models import Task from todoist_api_python.models import Section from todoist_api_python.models import Project from todoist_api_python.http_requests import get from todoist_api_python.http_requests import post from urllib.parse import urljoin import sys import time import requests import argparse import logging from datetime import datetime, timedelta import time import sqlite3 import os import re # Connect to SQLite database def create_connection(path): connection = None try: connection = sqlite3.connect(path) logging.info("Connection to SQLite DB successful") except Exception as e: logging.error( f"Could not connect to the SQLite database: the error '{e}' occurred") sys.exit(1) return connection # Close conenction to SQLite database def close_connection(connection): try: connection.close() except Exception as e: logging.error( f"Could not close the SQLite database: the error '{e}' occurred") sys.exit(1) # Execute any SQLite query passed to it in the form of string def execute_query(connection, query, *args): cursor = connection.cursor() try: value = args[0] # Useful to pass None/NULL value correctly cursor.execute(query, (value,)) except: cursor.execute(query) try: connection.commit() logging.debug("Query executed: {}".format(query)) except Exception as e: logging.debug(f"The error '{e}' occurred") # Pass query to select and read record. Outputs a tuple. def execute_read_query(connection, query): cursor = connection.cursor() result = None try: cursor.execute(query) result = cursor.fetchall() logging.debug("Query fetched: {}".format(query)) return result except Exception as e: logging.debug(f"The error '{e}' occurred") # Construct query and read a value def db_read_value(connection, model, column): try: if isinstance(model, Task): db_name = 'tasks' goal = 'task_id' elif isinstance(model, Section): db_name = 'sections' goal = 'section_id' elif isinstance(model, Project): db_name = 'projects' goal = 'project_id' query = "SELECT %s FROM %s where %s=%r" % ( column, db_name, goal, model.id) result = execute_read_query(connection, query) except Exception as e: logging.debug(f"The error '{e}' occurred") return result # Construct query and update a value def db_update_value(connection, model, column, value): try: if isinstance(model, Task): db_name = 'tasks' goal = 'task_id' elif isinstance(model, Section): db_name = 'sections' goal = 'section_id' elif isinstance(model, Project): db_name = 'projects' goal = 'project_id' query = """UPDATE %s SET %s = ? WHERE %s = %r""" % ( db_name, column, goal, model.id) result = execute_query(connection, query, value) return result except Exception as e: logging.debug(f"The error '{e}' occurred") # Check if the id of a model exists, if not, add to database def db_check_existance(connection, model): try: if isinstance(model, Task): db_name = 'tasks' goal = 'task_id' elif isinstance(model, Section): db_name = 'sections' goal = 'section_id' elif isinstance(model, Project): db_name = 'projects' goal = 'project_id' q_check_existence = "SELECT EXISTS(SELECT 1 FROM %s WHERE %s=%r)" % ( db_name, goal, model.id) existence_result = execute_read_query(connection, q_check_existence) if existence_result[0][0] == 0: if isinstance(model, Task): q_create = """ INSERT INTO tasks (task_id, task_type, parent_type, due_date, r_tag) VALUES (%r, %s, %s, %s, %i); """ % (model.id, 'NULL', 'NULL', 'NULL', 0) if isinstance(model, Section): q_create = """ INSERT INTO sections (section_id, section_type) VALUES (%r, %s); """ % (model.id, 'NULL') if isinstance(model, Project): q_create = """ INSERT INTO projects (project_id, project_type) VALUES (%r, %s); """ % (model.id, 'NULL') execute_query(connection, q_create) except Exception as e: logging.debug(f"The error '{e}' occurred") # Initialise new database tables def initialise_sqlite(): cwd = os.getcwdb() db_path = os.path.join(cwd, b'metadata.sqlite') connection = create_connection(db_path) q_create_projects_table = """ CREATE TABLE IF NOT EXISTS projects ( id INTEGER PRIMARY KEY AUTOINCREMENT, project_id INTEGER, project_type TEXT ); """ q_create_sections_table = """ CREATE TABLE IF NOT EXISTS sections ( id INTEGER PRIMARY KEY AUTOINCREMENT, section_id INTEGER, section_type ); """ q_create_tasks_table = """ CREATE TABLE IF NOT EXISTS tasks ( id INTEGER PRIMARY KEY AUTOINCREMENT, task_id INTEGER, task_type TEXT, parent_type TEXT, due_date TEXT, r_tag INTEGER ); """ execute_query(connection, q_create_projects_table) execute_query(connection, q_create_sections_table) execute_query(connection, q_create_tasks_table) return connection # Makes --help text wider def make_wide(formatter, w=120, h=36): """Return a wider HelpFormatter, if possible.""" try: # https://stackoverflow.com/a/5464440 # beware: "Only the name of this class is considered a public API." kwargs = {'width': w, 'max_help_position': h} formatter(None, **kwargs) return lambda prog: formatter(prog, **kwargs) except TypeError: logging.error("Argparse help formatter failed, falling back.") return formatter # Sync with Todoist API def sync(api): try: logging.debug('Syncing the current state from the API') api.sync() except Exception as e: logging.exception( 'Error trying to sync with Todoist API: %s' % str(e)) quit() # Simple query for yes/no answer def query_yes_no(question, default="yes"): # """Ask a yes/no question via raw_input() and return their answer. # "question" is a string that is presented to the user. # "default" is the presumed answer if the user just hits . # It must be "yes" (the default), "no" or None (meaning # an answer is required of the user). # The "answer" return value is True for "yes" or False for "no". # """ valid = {"yes": True, "y": True, "ye": True, "no": False, "n": False} if default is None: prompt = " [y/n] " elif default == "yes": prompt = " [Y/n] " elif default == "no": prompt = " [y/N] " else: raise ValueError("invalid default answer: '%s'" % default) while True: sys.stdout.write(question + prompt) choice = input().lower() if default is not None and choice == '': return valid[default] elif choice in valid: return valid[choice] else: sys.stdout.write("Please respond with 'yes' or 'no' " "(or 'y' or 'n').\n") # Check if label exists, if not, create it def verify_label_existance(api, label_name, prompt_mode): # Check the regeneration label exists labels = api.get_labels() label = [x for x in labels if x.name == label_name] if len(label) > 0: next_action_label = label[0].id logging.debug('Label \'%s\' found as label id %s', label_name, next_action_label) else: # Create a new label in Todoist logging.info( "\n\nLabel '{}' doesn't exist in your Todoist\n".format(label_name)) # sys.exit(1) if prompt_mode == 1: response = query_yes_no( 'Do you want to automatically create this label?') else: response = True if response: try: api.add_label(name=label_name) except Exception as error: print(error) labels = api.get_labels() label = [x for x in labels if x.name == label_name] next_action_label = label[0].id logging.info("Label '{}' has been created!".format(label_name)) else: logging.info('Exiting Autodoist.') exit(1) return 0 # Initialisation of Autodoist def initialise_api(args): # Check we have a API key if not args.api_key: logging.error( "\n\nNo API key set. Run Autodoist with '-a '\n") sys.exit(1) # Check if alternative end of day is used if args.end is not None: if args.end < 1 or args.end > 24: logging.error( "\n\nPlease choose a number from 1 to 24 to indicate which hour is used as alternative end-of-day time.\n") sys.exit(1) else: pass # Check if proper regeneration mode has been selected if args.regeneration is not None: if not set([0, 1, 2]) & set([args.regeneration]): logging.error( 'Wrong regeneration mode. Please choose a number from 0 to 2. Check --help for more information on the available modes.') exit(1) # Show which modes are enabled: modes = [] m_num = 0 for x in [args.label, args.regeneration, args.end]: if x: modes.append('Enabled') m_num += 1 else: modes.append('Disabled') logging.info("You are running with the following functionalities:\n\n Next action labelling mode: {}\n Regenerate sub-tasks mode: {}\n Shifted end-of-day mode: {}\n".format(*modes)) if m_num == 0: logging.info( "\n No functionality has been enabled. Please see --help for the available options.\n") exit(0) # Run the initial sync logging.debug('Connecting to the Todoist API') api_arguments = {'token': args.api_key} if args.nocache: logging.debug('Disabling local caching') api_arguments['cache'] = None api = TodoistAPI(**api_arguments) logging.info("Autodoist has successfully connected to Todoist!\n") # Check if labels exist # If labeling argument is used if args.label is not None: # Verify that the next action label exists; ask user if it needs to be created verify_label_existance(api, args.label, 1) # If regeneration mode is used, verify labels if args.regeneration is not None: # Verify the existance of the regeneraton labels; force creation of label regen_labels_id = [verify_label_existance( api, regen_label, 2) for regen_label in args.regen_label_names] else: # Label functionality not needed regen_labels_id = [None, None, None] return api # Check for Autodoist update def check_for_update(current_version): updateurl = 'https://api.github.com/repos/Hoffelhas/autodoist/releases' try: r = requests.get(updateurl) r.raise_for_status() release_info_json = r.json() if not current_version == release_info_json[0]['tag_name']: logging.warning("\n\nYour version is not up-to-date! \nYour version: {}. Latest version: {}\nFind the latest version at: {}\n".format( current_version, release_info_json[0]['tag_name'], release_info_json[0]['html_url'])) return 1 else: return 0 except requests.exceptions.ConnectionError as e: logging.error( "Error while checking for updates (Connection error): {}".format(e)) return 1 except requests.exceptions.HTTPError as e: logging.error( "Error while checking for updates (HTTP error): {}".format(e)) return 1 except requests.exceptions.RequestException as e: logging.error("Error while checking for updates: {}".format(e)) return 1 # Get all data through the SYNC API. Needed to see e.g. any completed tasks. def get_all_data(api): BASE_URL = "https://api.todoist.com" SYNC_VERSION = "v9" SYNC_API = urljoin(BASE_URL, f"/sync/{SYNC_VERSION}/") COMPLETED_GET_ALL = "completed/get_all" endpoint = urljoin(SYNC_API, COMPLETED_GET_ALL) data = get(api._session, endpoint, api._token) return data # Find the type based on name suffix. def check_name(args, string, num): try: # Find inbox or none section as exceptions if string == None: current_type = None pass elif string == 'Inbox': current_type = args.inbox pass else: # Find any = or - symbol at the end of the string. Look at last 3 for projects, 2 for sections, and 1 for tasks regex = '[%s%s]{1,%s}$' % (args.s_suffix, args.p_suffix, str(num)) re_ind = re.search(regex, string) suffix = re_ind[0] # Somebody put fewer characters than intended. Take last character and apply for every missing one. if len(suffix) < num: suffix += suffix[-1] * (num - len(suffix)) current_type = '' for s in suffix: if s == args.s_suffix: current_type += 's' elif s == args.p_suffix: current_type += 'p' # Always return a three letter string if len(current_type) == 2: current_type = 'x' + current_type elif len(current_type) == 1: current_type = 'xx' + current_type except: logging.debug("String {} not recognised.".format(string)) current_type = None return current_type # Scan the end of a name to find what type it is def get_type(args, connection, model, key): # model_name = '' try: old_type = '' old_type = db_read_value(connection, model, key)[0][0] except: # logging.debug('No defined project_type: %s' % str(e)) old_type = None if isinstance(model, Task): current_type = check_name(args, model.content, 1) # Tasks elif isinstance(model, Section): current_type = check_name(args, model.name, 2) # Sections elif isinstance(model, Project): current_type = check_name(args, model.name, 3) # Projects # Check if type changed with respect to previous run if old_type == current_type: type_changed = 0 else: type_changed = 1 db_update_value(connection, model, key, current_type) return current_type, type_changed # Determine a project type def get_project_type(args, connection, project): """Identifies how a project should be handled.""" project_type, project_type_changed = get_type( args, connection, project, 'project_type') if project_type is not None: logging.debug('Identified \'%s\' as %s type', project.name, project_type) return project_type, project_type_changed # Determine a section type def get_section_type(args, connection, section, project): """Identifies how a section should be handled.""" if section is not None: section_type, section_type_changed = get_type( args, connection, section, 'section_type') else: section_type = None section_type_changed = 0 if section_type is not None: logging.debug("Identified '%s > %s' as %s type", project.name, section.name, section_type) return section_type, section_type_changed # Determine an task type def get_task_type(args, connection, task, section, project): """Identifies how a task with sub tasks should be handled.""" task_type, task_type_changed = get_type( args, connection, task, 'task_type') if task_type is not None: logging.debug("Identified '%s > %s > %s' as %s type",project.name, section.name, task.content, task_type) return task_type, task_type_changed # Logic to track addition of a label to a task def add_label(connection, task, dominant_type, label, overview_task_ids, overview_task_labels): if label not in task.labels: labels = task.labels # To also copy other existing labels logging.debug('Updating \'%s\' with label', task.content) labels.append(label) try: overview_task_ids[task.id] += 1 except: overview_task_ids[task.id] = 1 overview_task_labels[task.id] = labels # Logic to track removal of a label from a task def remove_label(task, label, overview_task_ids, overview_task_labels): if label in task.labels: labels = task.labels logging.debug('Removing \'%s\' of its label', task.content) labels.remove(label) try: overview_task_ids[task.id] -= 1 except: overview_task_ids[task.id] = -1 overview_task_labels[task.id] = labels # Ensure label updates are only issued once per task def update_labels(api, overview_task_ids, overview_task_labels): filtered_overview_ids = [ k for k, v in overview_task_ids.items() if v != 0] for task_id in filtered_overview_ids: labels = overview_task_labels[task_id] api.update_task(task_id=task_id, labels=labels) return filtered_overview_ids # Check if header logic needs to be applied def check_header(api, model, overview_updated_ids): header_all_in_level = False unheader_all_in_level = False regex_a = '(^[*]{2}\s*)(.*)' regex_b = '(^\-\*\s*)(.*)' try: if isinstance(model, Task): ra = re.search(regex_a, model.content) rb = re.search(regex_b, model.content) if ra: header_all_in_level = True model.content = ra[2] # Local record api.update_task(task_id=model.id, content=ra[2]) # overview_updated_ids.append(model.id) # Ignore this one, since else it's count double if rb: unheader_all_in_level = True model.content = rb[2] # Local record api.update_task(task_id=model.id, content=rb[2]) overview_updated_ids.append(model.id) else: ra = re.search(regex_a, model.name) rb = re.search(regex_b, model.name) if isinstance(model, Section): if ra: header_all_in_level = True api.update_section(section_id=model.id, name=ra[2]) overview_updated_ids.append(model.id) if rb: unheader_all_in_level = True api.update_section(section_id=model.id, name=rb[2]) overview_updated_ids.append(model.id) elif isinstance(model, Project): if ra: header_all_in_level = True api.update_project(project_id=model.id, name=ra[2]) overview_updated_ids.append(model.id) if rb: unheader_all_in_level = True api.update_project(project_id=model.id, name=rb[2]) overview_updated_ids.append(model.id) except: logging.debug('check_header: no right model found') return header_all_in_level, unheader_all_in_level # Logic for applying and removing headers def modify_task_headers(api, task, section_tasks, overview_updated_ids, header_all_in_p, unheader_all_in_p, header_all_in_s, unheader_all_in_s, header_all_in_t, unheader_all_in_t): if any([header_all_in_p, header_all_in_s]): if task.content[:2] != '* ': api.update_task(task_id=task.id, content='* ' + task.content) overview_updated_ids.append(task.id) if any([unheader_all_in_p, unheader_all_in_s]): if task.content[:2] == '* ': api.update_task(task_id=task.id, content=task.content[2:]) overview_updated_ids.append(task.id) if header_all_in_t: if task.content[:2] != '* ': api.update_task(task_id=task.id, content='* ' + task.content) overview_updated_ids.append(task.id) find_and_headerify_all_children( api, task, section_tasks, overview_updated_ids, 1) if unheader_all_in_t: if task.content[:2] == '* ': api.update_task(task_id=task.id, content=task.content[2:]) overview_updated_ids.append(task.id) find_and_headerify_all_children( api, task, section_tasks, overview_updated_ids, 2) # Check regen mode based on label name def check_regen_mode(api, item, regen_labels_id): labels = item.labels overlap = set(labels) & set(regen_labels_id) overlap = [val for val in overlap] if len(overlap) > 1: logging.warning( 'Multiple regeneration labels used! Please pick only one for item: "{}".'.format(item.content)) return None try: regen_next_action_label = overlap[0] except: logging.debug( 'No regeneration label for item: %s' % item.content) regen_next_action_label = [0] if regen_next_action_label == regen_labels_id[0]: return 0 elif regen_next_action_label == regen_labels_id[1]: return 1 elif regen_next_action_label == regen_labels_id[2]: return 2 else: # label_name = api.labels.get_by_id(regen_next_action_label)['name'] # logging.debug( # 'No regeneration label for item: %s' % item.content) return None # Recurring lists logic def run_recurring_lists_logic(args, api, connection, overview_updated_ids, task, task_items, task_items_all, regen_labels_id): if task.parent_id == 0: try: if task.due.is_recurring: try: db_task_due_date = db_read_value( connection, task, 'due_date')[0][0] if db_task_due_date is None: # If date has never been saved before, create a new entry logging.debug( 'New recurring task detected: %s' % task.content) db_update_value(connection, task, 'due_date', task.due.date) # Check if the T0 task date has changed, because a user has checked the task if task.due.date != db_task_due_date: # TODO: reevaluate regeneration mode. Disabled for now. # # Mark children for action based on mode # if args.regeneration is not None: # # Check if task has a regen label # regen_mode = check_regen_mode( # api, item, regen_labels_id) # # If no label, use general mode instead # if regen_mode is None: # regen_mode = args.regeneration # logging.debug('Using general recurring mode \'%s\' for item: %s', # regen_mode, item.content) # else: # logging.debug('Using recurring label \'%s\' for item: %s', # regen_mode, item.content) # # Apply tags based on mode # give_regen_tag = 0 # if regen_mode == 1: # Regen all # give_regen_tag = 1 # elif regen_mode == 2: # Regen if all sub-tasks completed # if not child_items: # give_regen_tag = 1 # if give_regen_tag == 1: # for child_item in child_items_all: # child_item['r_tag'] = 1 # If alternative end of day, fix due date if needed if args.end is not None: # Determine current hour t = datetime.today() current_hour = t.hour # Check if current time is before our end-of-day if (args.end - current_hour) > 0: # Determine the difference in days set by todoist nd = [int(x) for x in task.due.date.split('-')] od = [int(x) for x in db_task_due_date.split('-')] new_date = datetime( nd[0], nd[1], nd[2]) old_date = datetime( od[0], od[1], od[2]) today = datetime( t.year, t.month, t.day) days_difference = ( new_date-today).days days_overdue = ( today - old_date).days # Only apply if overdue and if it's a daily recurring tasks if days_overdue >= 1 and days_difference == 1: # Find current date in string format today_str = t.strftime("%Y-%m-%d") # Update due-date to today api.update_task( task_id=task.id, due_date=today_str, due_string=task.due.string) logging.info( "Update date on task: '%s'" % (task.content)) # Save the new date for reference us db_update_value(connection, task, 'due_date', task.due.date) except: # If date has never been saved before, create a new entry logging.debug( 'New recurring task detected: %s' % task.content) db_update_value(connection, task, 'due_date', task.due.date) except: pass # TODO: reevaluate regeneration mode. Disabled for now. # if args.regeneration is not None and item.parent_id != 0: # try: # if item['r_tag'] == 1: # item.update(checked=0) # item.update(in_history=0) # item['r_tag'] = 0 # api.items.update(item['id']) # for child_item in child_items_all: # child_item['r_tag'] = 1 # except: # # logging.debug('Child not recurring: %s' % # # item.content) # pass # Find and clean all children under a task def find_and_clean_all_children(task_ids, task, section_tasks): child_tasks = list(filter(lambda x: x.parent_id == task.id, section_tasks)) if child_tasks != []: for child_task in child_tasks: # Children found, go deeper task_ids.append(child_task.id) task_ids = find_and_clean_all_children( task_ids, child_task, section_tasks) return task_ids def find_and_headerify_all_children(api, task, section_tasks, overview_updated_ids, mode): child_tasks = list(filter(lambda x: x.parent_id == task.id, section_tasks)) if child_tasks != []: for child_task in child_tasks: # Children found, go deeper if mode == 1: if child_task.content[:2] != '* ': api.update_task(task_id=child_task.id, content='* ' + child_task.content) overview_updated_ids.append(child_task.id) elif mode == 2: if child_task.content[:2] == '* ': api.update_task(task_id=child_task.id, content=child_task.content[2:]) overview_updated_ids.append(child_task.id) find_and_headerify_all_children( api, child_task, section_tasks, overview_updated_ids, mode) return 0 # Contains all main autodoist functionalities def autodoist_magic(args, api, connection): # Preallocate dictionaries and other values overview_task_ids = {} overview_task_labels = {} overview_updated_ids = [] next_action_label = args.label regen_labels_id = args.regen_label_names first_found = [False, False, False] # Get all projects info try: projects = api.get_projects() except Exception as error: print(error) for project in projects: # Skip processing inbox as intended feature if project.is_inbox_project: continue # Check db existance db_check_existance(connection, project) # Check if we need to (un)header entire project header_all_in_p, unheader_all_in_p = check_header( api, project, overview_updated_ids) # Get project type if next_action_label is not None: project_type, project_type_changed = get_project_type( args, connection, project) else: project_type = None project_type_changed = 0 # Get all tasks for the project try: project_tasks = api.get_tasks(project_id=project.id) except Exception as error: print(error) # If a project type has changed, clean all tasks in this project for good measure if next_action_label is not None: if project_type_changed == 1: for task in project_tasks: remove_label(task, next_action_label, overview_task_ids, overview_task_labels) db_update_value(connection, task, 'task_type', None) db_update_value(connection, task, 'parent_type', None) # Run for both non-sectioned and sectioned tasks # Get completed tasks: # endpoint = 'https://api.todoist.com/sync/v9/completed/get_all' # get(api._session, endpoint, api._token, '0')['items'] # $ curl https://api.todoist.com/sync/v9/sync-H "Authorization: Bearer e2f750b64e8fc06ae14383d5e15ea0792a2c1bf3" -d commands='[ {"type": "item_add", "temp_id": "63f7ed23-a038-46b5-b2c9-4abda9097ffa", "uuid": "997d4b43-55f1-48a9-9e66-de5785dfd69b", "args": {"content": "Buy Milk", "project_id": "2203306141","labels": ["Food", "Shopping"]}}]' # for s in [0,1]: # if s == 0: # sections = Section(None, None, 0, project.id) # elif s == 1: # try: # sections = api.get_sections(project_id=project.id) # except Exception as error: # print(error) # Get all sections and add the 'None' section too. try: sections = api.get_sections(project_id=project.id) sections.insert(0, Section(None, None, 0, project.id)) except Exception as error: print(error) # Reset first_found[0] = False disable_section_labelling = 0 for section in sections: # Check if section labelling is disabled (useful for e.g. Kanban) if next_action_label is not None: try: if section.name.startswith('*') or section.name.endswith('*'): disable_section_labelling = 1 except: pass # Check db existance db_check_existance(connection, section) # Check if we need to (un)header entire secion header_all_in_s, unheader_all_in_s = check_header( api, section, overview_updated_ids) # Get section type if next_action_label: section_type, section_type_changed = get_section_type( args, connection, section, project) else: section_type = None section_type_changed = 0 # Get all tasks for the section section_tasks = [x for x in project_tasks if x.section_id == section.id] # Change top tasks parents_id from 'None' to '0' in order to numerically sort later on for task in section_tasks: if not task.parent_id: task.parent_id = 0 # Sort by parent_id and child order # In the past, Todoist used to screw up the tasks orders, so originally I processed parentless tasks first such that children could properly inherit porperties. # With the new API this seems to be in order, but I'm keeping this just in case for now. TODO: Could be used for optimization in the future. section_tasks = sorted(section_tasks, key=lambda x: ( int(x.parent_id), x.order)) # If a type has changed, clean all tasks in this section for good measure if next_action_label is not None: if section_type_changed == 1: for task in section_tasks: remove_label(task, next_action_label, overview_task_ids, overview_task_labels) db_update_value(connection, task, 'task_type', None) db_update_value(connection, task, 'parent_type', None) # Reset first_found[1] = False # For all tasks in this section for task in section_tasks: # Reset dominant_type = None # Check db existance db_check_existance(connection, task) # Determine which child_tasks exist, both all and the ones that have not been checked yet non_completed_tasks = list( filter(lambda x: not x.is_completed, section_tasks)) child_tasks_all = list( filter(lambda x: x.parent_id == task.id, section_tasks)) child_tasks = list( filter(lambda x: x.parent_id == task.id, non_completed_tasks)) # Check if we need to (un)header entire task tree header_all_in_t, unheader_all_in_t = check_header( api, task, overview_updated_ids) # Modify headers where needed modify_task_headers(api, task, section_tasks, overview_updated_ids, header_all_in_p, unheader_all_in_p, header_all_in_s, unheader_all_in_s, header_all_in_t, unheader_all_in_t) # TODO: Check is regeneration is still needed, now that it's part of core Todoist. Disabled for now. # Logic for recurring lists # if not args.regeneration: # try: # # If old label is present, reset it # if item.r_tag == 1: #TODO: METADATA # item.r_tag = 0 #TODO: METADATA # api.items.update(item.id) # except: # pass # If options turned on, start recurring lists logic #TODO: regeneration currently doesn't work, becaue TASK_ENDPOINT doesn't show completed tasks. Use workaround. if args.regeneration is not None or args.end: run_recurring_lists_logic( args, api, connection, overview_updated_ids, task, child_tasks, child_tasks_all, regen_labels_id) # If options turned on, start labelling logic if next_action_label is not None: # Skip processing a task if it has already been checked or is a header if task.is_completed: continue # Remove clean all task and subtask data if task.content.startswith('*') or disable_section_labelling: remove_label(task, next_action_label, overview_task_ids, overview_task_labels) db_update_value(connection, task, 'task_type', None) db_update_value(connection, task, 'parent_type', None) task_ids = find_and_clean_all_children( [], task, section_tasks) child_tasks_all = list( filter(lambda x: x.id in task_ids, section_tasks)) for child_task in child_tasks_all: remove_label(child_task, next_action_label, overview_task_ids, overview_task_labels) db_update_value( connection, child_task, 'task_type', None) db_update_value( connection, child_task, 'parent_type', None) continue # Check task type task_type, task_type_changed = get_task_type( args, connection, task, section, project) # If task type has changed, clean all of its children for good measure if next_action_label is not None: if task_type_changed == 1: # Find all children under this task task_ids = find_and_clean_all_children( [], task, section_tasks) child_tasks_all = list( filter(lambda x: x.id in task_ids, section_tasks)) for child_task in child_tasks_all: remove_label( child_task, next_action_label, overview_task_ids, overview_task_labels) db_update_value( connection, child_task, 'task_type', None) db_update_value( connection, child_task, 'parent_type', None) # Determine hierarchy types for logic hierarchy_types = [task_type, section_type, project_type] hierarchy_boolean = [type(x) != type(None) for x in hierarchy_types] # If task has no type, but has a label, most likely the order has been changed by user. Remove data. if not True in hierarchy_boolean and next_action_label in task.labels: remove_label(task, next_action_label, overview_task_ids, overview_task_labels) db_update_value(connection, task, 'task_type', None) db_update_value(connection, task, 'parent_type', None) # If it is a parentless task, set task type based on hierarchy if task.parent_id == 0: if not True in hierarchy_boolean: # Parentless task has no type, so skip any children. continue else: if hierarchy_boolean[0]: # Inherit task type dominant_type = task_type elif hierarchy_boolean[1]: # Inherit section type dominant_type = section_type elif hierarchy_boolean[2]: # Inherit project type dominant_type = project_type # If indicated on project level if dominant_type[0] == 's': if not first_found[0]: if dominant_type[1] == 's': if not first_found[1]: add_label( connection, task, dominant_type, next_action_label, overview_task_ids, overview_task_labels) elif next_action_label in task.labels: # Probably the task has been manually moved, so if it has a label, let's remove it. remove_label( task, next_action_label, overview_task_ids, overview_task_labels) elif dominant_type[1] == 'p': add_label( connection, task, dominant_type, next_action_label, overview_task_ids, overview_task_labels) elif dominant_type[0] == 'p': if dominant_type[1] == 's': if not first_found[1]: add_label( connection, task, dominant_type, next_action_label, overview_task_ids, overview_task_labels) elif next_action_label in task.labels: # Probably the task has been manually moved, so if it has a label, let's remove it. remove_label( task, next_action_label, overview_task_ids, overview_task_labels) elif dominant_type[1] == 'p': add_label( connection, task, dominant_type, next_action_label, overview_task_ids, overview_task_labels) # If indicated on section level if dominant_type[0] == 'x' and dominant_type[1] == 's': if not first_found[1]: add_label( connection, task, dominant_type, next_action_label, overview_task_ids, overview_task_labels) elif next_action_label in task.labels: # Probably the task has been manually moved, so if it has a label, let's remove it. remove_label( task, next_action_label, overview_task_ids, overview_task_labels) elif dominant_type[0] == 'x' and dominant_type[1] == 'p': add_label(connection, task, dominant_type, next_action_label, overview_task_ids, overview_task_labels) # If indicated on parentless task level if dominant_type[1] == 'x' and dominant_type[2] == 's': if not first_found[1]: add_label( connection, task, dominant_type, next_action_label, overview_task_ids, overview_task_labels) if next_action_label in task.labels: # Probably the task has been manually moved, so if it has a label, let's remove it. remove_label( task, next_action_label, overview_task_ids, overview_task_labels) elif dominant_type[1] == 'x' and dominant_type[2] == 'p': add_label(connection, task, dominant_type, next_action_label, overview_task_ids, overview_task_labels) # If a parentless or sub-task which has children if len(child_tasks) > 0: # If it is a sub-task with no own type, inherit the parent task type instead if task.parent_id != 0 and task_type == None: dominant_type = db_read_value( connection, task, 'parent_type')[0][0] # If it is a sub-task with no dominant type (e.g. lower level child with new task_type), use the task type if task.parent_id != 0 and dominant_type == None: dominant_type = task_type if dominant_type is None: # Task with parent that has been headered, skip. continue else: # Only last character is relevant for subtasks dominant_type = dominant_type[-1] # Process sequential tagged tasks if dominant_type == 's': for child_task in child_tasks: # Ignore headered children if child_task.content.startswith('*'): continue # Clean up for good measure. remove_label( child_task, next_action_label, overview_task_ids, overview_task_labels) # Pass task_type down to the children db_update_value( connection, child_task, 'parent_type', dominant_type) # Pass label down to the first child if not child_task.is_completed and next_action_label in task.labels: add_label( connection, child_task, dominant_type, next_action_label, overview_task_ids, overview_task_labels) remove_label( task, next_action_label, overview_task_ids, overview_task_labels) # Process parallel tagged tasks or untagged parents elif dominant_type == 'p' and next_action_label in task.labels: remove_label( task, next_action_label, overview_task_ids, overview_task_labels) for child_task in child_tasks: # Ignore headered children if child_task.content.startswith('*'): continue db_update_value( connection, child_task, 'parent_type', dominant_type) if not child_task.is_completed: add_label( connection, child_task, dominant_type, next_action_label, overview_task_ids, overview_task_labels) # Remove labels based on start / due dates # If task is too far in the future, remove the next_action tag and skip try: if args.hide_future > 0 and task.due.date is not None: due_date = datetime.strptime( task.due.date, "%Y-%m-%d") future_diff = ( due_date - datetime.today()).days if future_diff >= args.hide_future: remove_label( task, next_action_label, overview_task_ids, overview_task_labels) except: # Hide-future not set, skip pass # If start-date has not passed yet, remove label try: f1 = re.search( 'start=(\d{2}[-]\d{2}[-]\d{4})', task.content) if f1: start_date = f1.groups()[0] start_date = datetime.strptime( start_date, args.dateformat) future_diff = ( datetime.today()-start_date).days # If start-date hasen't passed, remove all labels if future_diff < 0: remove_label( task, next_action_label, overview_task_ids, overview_task_labels) [remove_label(child_task, next_action_label, overview_task_ids, overview_task_labels) for child_task in child_tasks] except: logging.warning( 'Wrong start-date format for task: "%s". Please use "start="', task.content) continue # Recurring task friendly - remove label with relative change from due date if task.due is not None: try: f2 = re.search( 'start=due-(\d+)([dw])', task.content) if f2: offset = f2.groups()[0] if f2.groups()[1] == 'd': td = timedelta(days=int(offset)) elif f2.groups()[1] == 'w': td = timedelta(weeks=int(offset)) # Determine start-date try: due_date = datetime.strptime( task.due.datetime, "%Y-%m-%dT%H:%M:%S") except: due_date = datetime.strptime( task.due.date, "%Y-%m-%d") start_date = due_date - td # If we're not in the offset from the due date yet, remove all labels future_diff = ( datetime.today()-start_date).days if future_diff < 0: remove_label( task, next_action_label, overview_task_ids, overview_task_labels) [remove_label(child_task, next_action_label, overview_task_ids, overview_task_labels) for child_task in child_tasks] continue except: logging.warning( 'Wrong start-date format for task: %s. Please use "start=due-"', task.content) continue # Mark first found task in section # TODO: is this always true? What about starred tasks? if next_action_label is not None and first_found[1] == False: first_found[1] = True # Mark first found section with tasks in project (to account for None section) # TODO: is this always true? What about starred tasks? if next_action_label is not None and first_found[0] == False and section_tasks: first_found[0] = True # Return all ids and corresponding labels that need to be modified return overview_task_ids, overview_task_labels, overview_updated_ids # Main def main(): # Version current_version = 'v1.5' # Main process functions. parser = argparse.ArgumentParser( formatter_class=make_wide(argparse.HelpFormatter, w=120, h=60)) parser.add_argument('-a', '--api_key', help='takes your Todoist API Key.', type=str) parser.add_argument( '-l', '--label', help='enable next action labelling. Define which label to use.', type=str) parser.add_argument( '-r', '--regeneration', help='enable regeneration of sub-tasks in recurring lists. Chose overall mode: 0 - regen off, 1 - regen all (default), 2 - regen only if all sub-tasks are completed. Task labels can be used to overwrite this mode.', nargs='?', const='1', default=None, type=int) parser.add_argument( '-e', '--end', help='enable alternative end-of-day time instead of default midnight. Enter a number from 1 to 24 to define which hour is used.', type=int) parser.add_argument( '-d', '--delay', help='specify the delay in seconds between syncs (default 5).', default=5, type=int) parser.add_argument( '-p', '--p_suffix', help='change suffix for parallel labeling (default "=").', default='=') parser.add_argument( '-s', '--s_suffix', help='change suffix for sequential labeling (default "-").', default='-') parser.add_argument( '-df', '--dateformat', help='strptime() format of starting date (default "%%d-%%m-%%Y").', default='%d-%m-%Y') parser.add_argument( '-hf', '--hide_future', help='prevent labelling of future tasks beyond a specified number of days.', default=0, type=int) parser.add_argument( '--onetime', help='update Todoist once and exit.', action='store_true') parser.add_argument( '--nocache', help='disables caching data to disk for quicker syncing.', action='store_true') parser.add_argument('--debug', help='enable debugging and store detailed to a log file.', action='store_true') parser.add_argument('--inbox', help='the method the Inbox should be processed with.', default=None, choices=['parallel', 'sequential']) args = parser.parse_args() # #TODO: Temporary disable this feature for now. Find a way to see completed tasks first, since REST API v2 lost this funcionality. args.regeneration = 0 # Addition of regeneration labels args.regen_label_names = ('Regen_off', 'Regen_all', 'Regen_all_if_completed') # Set logging if args.debug: log_level = logging.DEBUG else: log_level = logging.INFO logging.basicConfig(level=log_level, format='%(asctime)s %(levelname)-8s %(message)s', datefmt='%Y-%m-%d %H:%M:%S', handlers=[logging.FileHandler( 'debug.log', 'w+', 'utf-8'), logging.StreamHandler()] ) # Check for updates check_for_update(current_version) # Initialise api api = initialise_api(args) # Initialise SQLite database connection = initialise_sqlite() # Start main loop while True: start_time = time.time() # sync(api) # Evaluate projects, sections, and tasks overview_task_ids, overview_task_labels, overview_updated_ids = autodoist_magic( args, api, connection) # Commit next action label changes if args.label is not None: updated_ids = update_labels(api, overview_task_ids, overview_task_labels) num_changes = len(updated_ids)+len(overview_updated_ids) if num_changes: if num_changes == 1: logging.info( '%d change committed to Todoist.', num_changes) else: logging.info( '%d changes committed to Todoist.', num_changes) else: logging.info('No changes in queue, skipping sync.') # If onetime is set, exit after first execution. if args.onetime: break # Set a delay before next sync end_time = time.time() delta_time = end_time - start_time if args.delay - delta_time < 0: logging.debug( 'Computation time %d is larger than the specified delay %d. Sleeping skipped.', delta_time, args.delay) elif args.delay >= 0: sleep_time = args.delay - delta_time logging.debug('Sleeping for %d seconds', sleep_time) time.sleep(sleep_time) if __name__ == '__main__': main()