Complete overhaul of label-logic.

1) Labels are again used as primary markers for cascading the information.
2) API queue only gets filled with items that require an update. This drastically lowers the size of the queue.
3) Removal of legacy code.
Hoffelhas 2020-05-24 15:55:47 +02:00
parent 9e9567efaf
commit 0c47cfa81c
1 changed files with 181 additions and 154 deletions

View File

@ -1,6 +1,9 @@
# Autodoist v1.0.3
global overview_item_ids
global overview_item_labels
import logging
import argparse
import requests
@ -10,7 +13,6 @@ from datetime import datetime
from todoist.api import TodoistAPI
def main():
# Version
@ -153,6 +155,7 @@ def main():
item_type = item['parent_type']
item_type_changed = 1
item['item_type'] = item_type
item_type, item_type_changed = get_type(item, 'item_type')
@ -165,14 +168,30 @@ def main():
labels = item['labels']
logging.debug('Updating \'%s\' with label', item['content'])
api.items.update(item['id'], labels=labels)
overview_item_ids[str(item['id'])] += 1
overview_item_ids[str(item['id'])] = 1
overview_item_labels[str(item['id'])] = labels
def remove_label(item, label):
if label in item['labels']:
labels = item['labels']
logging.debug('Removing \'%s\' of its label', item['content'])
api.items.update(item['id'], labels=labels)
overview_item_ids[str(item['id'])] -= 1
overview_item_ids[str(item['id'])] = -1
overview_item_labels[str(item['id'])] = labels
def update_labels(label_id):
filtered_overview_ids = [k for k, v in overview_item_ids.items() if v != 0]
for item_id in filtered_overview_ids:
labels = overview_item_labels[item_id]
api.items.update(item_id, labels = labels)
# Check for updates
@ -182,85 +201,98 @@ def main():
# Main loop
while True:
overview_item_ids = {}
overview_item_labels = {}
except Exception as e:
'Error trying to sync with Todoist API: %s' % str(e))
for project in api.projects.all():
# Get project type
project_type, project_type_changed = get_project_type(project)
logging.debug('Project \'%s\' being processed as %s',
project['name'], project_type)
for project in api.projects.all():
# Get all items for the project
items = api.items.all(
lambda x: x['project_id'] == project['id'])
# Get project type
project_type, project_type_changed = get_project_type(project)
logging.debug('Project \'%s\' being processed as %s',
project['name'], project_type)
# Change top parents_id in order to sort later on
# Get all items for the project
items = api.items.all(
lambda x: x['project_id'] == project['id'])
# Change top parents_id in order to sort later on
for item in items:
if not item['parent_id']:
item['parent_id'] = 0
# Sort by parent_id and filter for completable items
items = sorted(items, key=lambda x: (
x['parent_id'], x['child_order']))
items = list(
filter(lambda x: not x['content'].startswith('*'), items))
# If project type has been changed, clean everything for good measure
if project_type_changed == 1:
# Remove labels
[remove_label(item, label_id) for item in items]
# Remove parent types
for item in items:
if not item['parent_id']:
item['parent_id'] = 0
item['parent_type'] = None
# Sort by parent_id and filter for completable items
items = sorted(items, key=lambda x: (
x['parent_id'], x['child_order']))
items = list(
filter(lambda x: not x['content'].startswith('*'), items))
# To determine if a sequential task was found
first_found_project = False
first_found_item = True
# If project type has been changed, clean everything for good measure
if project_type_changed == 1:
[remove_label(item, label_id) for item in items]
# For all items in this project
for item in items:
# To determine if a task was found on sequential level
first_found_project = False
first_found_item = True
# Determine which child_items exist, both all and the ones that have not been checked yet
non_checked_items = list(
filter(lambda x: x['checked'] == 0, items))
child_items_all = list(
filter(lambda x: x['parent_id'] == item['id'], items))
child_items = list(
filter(lambda x: x['parent_id'] == item['id'], non_checked_items))
for item in items:
# Determine which child_items exist, both all and the ones that have not been checked yet
non_checked_items = list(
filter(lambda x: x['checked'] == 0, items))
child_items_all = list(
filter(lambda x: x['parent_id'] == item['id'], items))
child_items = list(
filter(lambda x: x['parent_id'] == item['id'], non_checked_items))
# Logic for recurring lists
if not args.recurring:
# Logic for recurring lists
if not args.recurring:
# If old label is present, reset it
if item['r_tag'] == 1:
item['r_tag'] = 0
except Exception as e:
# If option turned on, start recurring logic
if item['parent_id'] == 0:
# If old label is present, reset it
if item['r_tag'] == 1:
item['r_tag'] = 0
except Exception as e:
if item['parent_id'] == 0:
if item['due']['is_recurring']:
# Check if the T0 task date has changed
if item['due']['date'] != item['old_date']:
# Save the new date
item['due']['date'] = item['old_date']
# Mark children for action
for child_item in child_items_all:
child_item['r_tag'] = 1
except Exception as e:
# If date has never been saved before, create a new entry
'New recurring task detected: %s' % str(e))
if item['due']['is_recurring']:
# Check if the T0 task date has changed
if item['due']['date'] != item['old_date']:
# Save the new date
item['old_date'] = item['due']['date']
except Exception as e:
'Parent not recurring: %s' % str(e))
# Mark children for action
for child_item in child_items_all:
child_item['r_tag'] = 1
except Exception as e:
# If date has never been saved before, create a new entry
'New recurring task detected: %s' % str(e))
item['old_date'] = item['due']['date']
except Exception as e:
'Parent not recurring: %s' % str(e))
if item['parent_id'] != 0:
if item['r_tag'] == 1:
@ -274,102 +306,97 @@ def main():
logging.debug('Child not recurring: %s' % str(e))
# Skip processing an item if it has already been checked
if item['checked'] == 1:
# Skip processing an item if it has already been checked
if item['checked'] == 1:
# Check item type
item_type, item_type_changed = get_item_type(
item, project_type)
logging.debug('Identified \'%s\' as %s type',
item['content'], item_type)
# Check item type
item_type, item_type_changed = get_item_type(
item, project_type)
logging.debug('Identified \'%s\' as %s type',
item['content'], item_type)
if project_type is None and item_type is None and project_type_changed == 1:
# Clean the item and its children
# Check the item_type of the project or parent
if item_type is None:
if item['parent_id'] == 0:
item_type = project_type
if item['parent_type'] is None:
item_type = project_type
item_type = item['parent_type']
item_type = project_type
# Reset in case that parentless task is tagged, overrules project
first_found_item = False
# If it is a parentless task
if item['parent_id'] == 0:
if project_type == 'sequential':
if not first_found_project:
add_label(item, label_id)
first_found_project = True
elif not first_found_item:
add_label(item, label_id)
first_found_item = True
# else:
# remove_label(item, label_id)
elif project_type == 'parallel':
add_label(item, label_id)
# If no project-type has been defined
if item_type:
add_label(item, label_id)
# If there are children
if len(child_items) > 0:
# Check if item state has changed, if so clean children for good measure
if item_type_changed == 1:
[remove_label(child_item, label_id)
for child_item in child_items]
# Process sequential tagged items (item_type can overrule project_type)
if item_type == 'sequential':
for child_item in child_items:
# Pass item_type down to the children
child_item['parent_type'] = item_type
# Pass label down to the first child
if child_item['checked'] == 0 and label_id in item['labels']:
add_label(child_item, label_id)
remove_label(item, label_id)
# Clean for good measure
remove_label(child_item, label_id)
# Process parallel tagged items or untagged parents
elif item_type == 'parallel':
remove_label(item, label_id)
for child_item in child_items:
child_item['parent_type'] = None
child_item['parent_type'] = item_type
if child_item['checked'] == 0:
# child_first_found = True
add_label(child_item, label_id)
# We can immediately continue
# Define the item_type
if item_type is None:
item_type = project_type
# Reset in case that task is tagged as sequential
first_found_item = False
# If item is too far in the future, remove the next_action tag and skip
if args.hide_future > 0 and 'due_date_utc' in and item['due_date_utc'] is not None:
due_date = datetime.strptime(
item['due_date_utc'], '%a %d %b %Y %H:%M:%S +0000')
future_diff = (
due_date - datetime.utcnow()).total_seconds()
if future_diff >= (args.hide_future * 86400):
remove_label(item, label_id)
# If it is a parentless task
if len(child_items) == 0:
if item['parent_id'] == 0:
if project_type == 'sequential':
if not first_found_project:
add_label(item, label_id)
first_found_project = True
elif not first_found_item:
add_label(item, label_id)
first_found_item = True
remove_label(item, label_id)
elif project_type == 'parallel':
add_label(item, label_id)
# If only the item type has been defined
if item_type:
add_label(item, label_id)
# Commit the queue with changes
# If there are children, label them instead
if len(child_items) > 0:
child_first_found = False
# Check if state has changed, if so clean for good measure
if item_type_changed == 1:
[remove_label(child_item, label_id)
for child_item in child_items]
# Process sequential tagged items (item_type can overrule project_type)
if item_type == 'sequential':
for child_item in child_items:
if child_item['checked'] == 0 and not child_first_found and not first_found_project:
first_found_project = True
child_first_found = True
add_label(child_item, label_id)
child_item['parent_type'] = item_type
elif child_item['checked'] == 0 and not child_first_found and not first_found_item:
first_found_item = True
child_first_found = True
add_label(child_item, label_id)
child_item['parent_type'] = item_type
remove_label(child_item, label_id)
# Process parallel tagged items or untagged parents
elif item_type == 'parallel':
for child_item in child_items:
if child_item['checked'] == 0:
child_first_found = True
add_label(child_item, label_id)
child_item['parent_type'] = item_type
# Remove the label from the parent (needed for if recurring list is reset)
if item_type and child_first_found:
remove_label(item, label_id)
# If item is too far in the future, remove the next_action tag and skip
if args.hide_future > 0 and 'due_date_utc' in and item['due_date_utc'] is not None:
due_date = datetime.strptime(
item['due_date_utc'], '%a %d %b %Y %H:%M:%S +0000')
future_diff = (
due_date - datetime.utcnow()).total_seconds()
if future_diff >= (args.hide_future * 86400):
remove_label(item, label_id)
if len(api.queue):
'%d changes queued for sync... commiting to Todoist.', len(api.queue))
logging.debug('No changes queued, skipping sync.')
if len(api.queue):
'%d changes queued for sync... commiting to Todoist.', len(api.queue))
logging.debug('No changes queued, skipping sync.')
# If onetime is set, exit after first execution.
if args.onetime: