Harvest and Forecast Silent Deletes

#1

Hello!

I have an integration with Harvest and Forecast (new!) and everything is great, except for one specific issue.

Harvest and Forecast both do “hard deletes” within their systems, so deletes are never communicated via the API and are therefore unknown to the synced database.

I have had many conversations with Harvest on this topic. They have no plans to add “soft deletes” to their system/API. When asked specifically how to deal with this situation, they agreed that the only option is to do a full sync of the data from Harvest, regardless of the burden on their API.

So, I was wondering if anyone has any other great ideas?

My only thoughts are:
a) write a custom comparison that looks for rows in the synced database that Harvest no longer returns and deletes them (rough sketch below)
b) force a full resync by resetting the integration start date on a regular basis (weekly? idk)
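
To make (a) concrete, here is a rough sketch of what I have in mind (not tested; the fetch_db_ids / delete_db_row helpers and the credentials are just placeholders for whatever database you sync into): pull the IDs Harvest still has for a date window via the v2 API, pull the IDs the synced table has for the same window, and delete the local rows Harvest no longer returns.

import datetime
import requests

HARVEST_URL = 'https://api.harvestapp.com/v2/time_entries'
HEADERS = {
    'Harvest-Account-Id': 'YOUR_ACCOUNT_ID',      # placeholder
    'Authorization': 'Bearer YOUR_ACCESS_TOKEN',  # placeholder
    'User-Agent': 'silent-delete-check',
}

def harvest_ids(since, until):
    # page through /v2/time_entries for the window and collect every id Harvest still knows about
    ids, page = set(), 1
    while page:
        resp = requests.get(HARVEST_URL, headers=HEADERS,
                            params={'from': since, 'to': until, 'page': page, 'per_page': 100})
        resp.raise_for_status()
        body = resp.json()
        ids.update(str(e['id']) for e in body['time_entries'])
        page = body['next_page']  # None once the last page is reached
    return ids

def reconcile(fetch_db_ids, delete_db_row, days=30):
    # fetch_db_ids / delete_db_row are placeholders for your own database layer
    since = (datetime.date.today() - datetime.timedelta(days=days)).isoformat()
    until = datetime.date.today().isoformat()
    live = harvest_ids(since, until)
    for row_id in fetch_db_ids(since, until):
        if str(row_id) not in live:
            delete_db_row(row_id)  # row was hard-deleted upstream, remove the local copy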

Thank you for your ideas!

#2

Hi Suzy. We recently implemented the new v2 Harvest integration as well and ran into this. We brainstormed on it for a bit and talked it over with Harvest support, and unfortunately we came to the same conclusion and ended up implementing your option (a).

Here is our code for doing this in Python in an AWS Lambda function. One part won’t be directly useful to you, since we’re connecting to a Snowflake DB to run the queries, but hopefully there’s something in here that helps you get this done a bit faster.

import sys, os
import json
import logging
import boto3
import datetime
import time
from harvest import get_harvest_entries_v2
from secrets_manager import get_secrets
from dateutil import tz, parser
import snowUtils
import concurrent.futures

logger = logging.getLogger()
logging.basicConfig()
LOGGING_LEVEL = os.environ['LOGGING_LEVEL']
logger.setLevel(LOGGING_LEVEL)
#logger.setLevel(logging.WARN)

# override boto3 logging configuration
logging.getLogger('boto3').setLevel(logging.ERROR)
logging.getLogger('boto3').propagate = False
logging.getLogger('botocore').setLevel(logging.ERROR)
logging.getLogger('botocore').propagate = False

max_workers = 2  # limit concurrent Harvest API calls

def get_ids(secrets, days):
    logger.info('[get_ids]\t getting harvest ids')
    results = []
    result_futures = []
    os.environ['HARVEST_TOKEN'] = secrets['harvest']['token']
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        for day in range(days):
            iter_date = datetime.date.today() - datetime.timedelta(days=day)
            #logger.info('[get_ids]\t iter_date:{}'.format(iter_date))
            future = executor.submit(get_harvest_entries_v2, 'time_entries', from_d=iter_date, to_d=iter_date)
            time.sleep(2)  # space out submissions to stay under Harvest's API rate limit
            result_futures.append(future)

    for future in result_futures:
        results += future.result()
    #logger.info('[get_ids]\t results:{}'.format(results))
    return results

def handler(event=None, context=None):
    logger.info('[handler]\t event: {}'.format(json.dumps(event)))
    os.environ['no_error'] = '1'
    secrets = get_secrets()
    os.environ['DATADOG_API_KEY'] = secrets['datadog']['api_key']
    days = int(event['days'])  # how many days back to compare
    SINCE = datetime.date.today()-datetime.timedelta(days=days)
    logger.info('[handler]\t SINCE: {}'.format(SINCE))
    logger.info("Establishing Snowflake connection")
    (cnx,DictCursor) = snowUtils.initializeSnowflake(secrets['snowflake']['User'],secrets['snowflake']['Password'],secrets['snowflake']['Acct'])
    logger.info("Snowflake initialize complete")
    cur = cnx.cursor()
    cur.execute("USE database "+os.environ['SNOWFLAKE_DB']+";")
    #cur.execute("USE schema "+event['SCHEMA']+";")
    ids_query = "SELECT ID FROM \"{}\".\"{}\" WHERE {} BETWEEN '{}' AND '{}'".format(event['SCHEMA'], event['TABLE'], event['TIME_FIELD'], SINCE.strftime("%Y-%m-%d"), datetime.date.today().strftime("%Y-%m-%d"))
    db_ids = list(set(map(lambda x: str(x[0]), cur.execute(ids_query).fetchall())))
    logger.info('[handler]\t ({}) db_ids: {}'.format(len(db_ids), db_ids))
    service_ids = list(set(map(lambda x: str(x['id']), get_ids(secrets, days+1))))
    logger.info('[handler]\t ({}) service_ids: {}'.format(len(service_ids), service_ids))
    c = 0
    # safety guard: if the gap is suspiciously large, bail out instead of mass-deleting
    if len(db_ids) - len(service_ids) > 100:
        logger.error('too large a difference between snowflake and {} data, please check it manually'.format(event['service']))
        return
    #logger.info('[handler]\t diff: {}'.format(list(set(service_ids) - set(db_ids))))
    for id in db_ids:
        if id not in service_ids:
            c += 1
            if event.get('HISTORY_SCHEMA') and event.get('HISTORY_TABLE'):
                # if a history schema/table is defined, copy the row there before deleting
                history_sql = "INSERT INTO \"{}\".\"{}\" SELECT * FROM \"{}\".\"{}\" WHERE ID = {};".format(event['HISTORY_SCHEMA'], event['HISTORY_TABLE'], event['SCHEMA'], event['TABLE'], id)
                logger.info(history_sql)
                cur.execute(history_sql)
            logger.warning('[handler]\t id:{} was deleted in {}, removing it from the DB, c={}'.format(id, event['service'], c))
            cur.execute("DELETE FROM \"{}\".\"{}\" WHERE ID = {};".format(event['SCHEMA'], event['TABLE'], id))

    cnx.close()
    return
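
For reference, we invoke the function with an event payload along these lines (the schema/table names and values here are just illustrative, HISTORY_SCHEMA/HISTORY_TABLE are optional, and the function also expects LOGGING_LEVEL and SNOWFLAKE_DB as environment variables):

{
  "service": "harvest",
  "days": 30,
  "SCHEMA": "HARVEST",
  "TABLE": "TIME_ENTRIES",
  "TIME_FIELD": "SPENT_DATE",
  "HISTORY_SCHEMA": "HARVEST_HISTORY",
  "HISTORY_TABLE": "TIME_ENTRIES_DELETED"
}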

#3

Awarzon beat me to it and provided a real answer.

My only thought is that I wonder if Harvest could enable some sort of ‘logical replication / decoding’ in their db, listen for change data, and provide that through their API for Stitch to access.

Keeping soft-deleted rows around is a pain, and I can see why Harvest may not want to do that. But logical replication / decoding could mean less upkeep and be an easier sell.
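
To make that concrete, here’s roughly what the change stream could look like on their side, assuming their backend is Postgres (just a guess on my part) with the wal2json output plugin; the slot name is made up. Harvest could consume this stream themselves and expose a “deleted since X” endpoint without keeping soft-deleted rows in their main tables.

-- create a logical replication slot that emits changes as JSON
SELECT pg_create_logical_replication_slot('harvest_changes', 'wal2json');

-- a hard delete then shows up in the stream with the deleted row's keys
SELECT data FROM pg_logical_slot_get_changes('harvest_changes', NULL, NULL);
-- {"change":[{"kind":"delete","schema":"public","table":"time_entries",
--             "oldkeys":{"keynames":["id"],"keytypes":["bigint"],"keyvalues":[12345]}}]}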