Here are the answers to my own questions, in case they help others reading this post:
- If the resources are available, they run in parallel.
- This one is more relevant to my use case. The following test design helped me confirm that they run in parallel, and that one is not queued behind the other unless resources are unavailable.
Design: check whether single-account aggregations and delta aggregations run in parallel.
While the reload-accounts script is running (for me it takes about 15 seconds to kick off 100 reload-account calls), start the delta aggregation.
Within the time window in which the reload-accounts jobs complete, the logs should also contain delta-aggregation entries. That is how I confirm that both run in parallel.
Steps:
1. Kick off the script (code provided in a reply above on this post) to run reload-accounts for 100 accounts.
Redirect the CLI logs to a log file. Make sure to collect logs for longer than the usual execution time of 100Reload-accountCalls.py. The longer duration ensures that all the account aggregation responses come back and get logged.
2. While the above script is executing (i.e., printing the async-job-ids on the console), kick off the delta aggregation in the UI.
3. Calculate the start and end times spanning the completion of all 100 reload-account calls.
4. In the logs, if delta-aggregation logs exist between the start and finish times of reload-accounts, it confirms parallel execution.
Steps 3 and 4 are taken care of by
areDelta+singleAccAggRunningInParallel.py
Here are some questions that will help understand the testing framework:
1. What are the log lines that indicate delta-aggregation is kicked off?
For Okta connector: [2025-04-11T14:18:16.795-04:00] INFO | connectorMessage ▶︎ {"@timestamp":"2025-04-11T18:18:16.778Z","logger":"openconnector.connector.okta.OktaConnector","message":"CON_OKTA_ENHANCE_ACCOUNT_AGGR_PERFORMANCE_DELTA feature flag set to true","thread_name":"pool-2-thread-1"}
2. Though the above log line indicates delta-aggregation, what lines confirm that delta-aggregated accounts are coming through?
The two lines below, taken together, indicate that delta-aggregated accounts are coming through.
Note that the thread_name in both lines must match,
and the first line must contain "/v1/users?limit=200".
"message":"Executing request GET https://<baseurl>/api/v1/users?limit=200\u0026search=%28profile.**********,"thread_name":"pool-2-thread-1"}
"message":"Response is received from Okta. Status: 200","thread_name":"pool-2-thread-1"}
3. What are the log lines that confirm a single-account-aggregation job is complete?
[2025-04-11T01:00:01.721-04:00] DEBUG | connectorMessage ▶︎ {"@timestamp":"2025-04-11T05:00:01.712Z","logger":"openconnector.connector.okta.OktaConnector","message":"Account Read {\"id\":\"<native identity value>\",\"status\":\"ACTIVE\",\"created\":\"<created date>\",\"activated\":\"2020-01-27T16:18:40.000Z\",\"statusChanged\":\"2020-01-27T16:18:40.000Z\",\"lastLogin\":null,\"lastUpdated\":\"2025-01-25T22:18:05.000Z\",\"passwordChanged\":\"2023-01-05T02:40:04.000Z\",\"realmId\":\"<realmid>\",\"type\":{\"id\":\"<native identity value>\"},\"profile\":{***********}
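The markers from the three questions above can be spot-checked with a short helper before running the full script. This is only a sketch and not part of the original script: the function name `summarize_markers`, the ids, and the sample lines are hypothetical, while the message texts are copied from the Okta log lines quoted above. It records the bracketed timestamp of each delta kickoff line and collects the distinct ids seen in Account Read lines, which gives a rough check that all 100 reload-account calls came back.

```python
import re
from datetime import datetime

# Leading "[...]" timestamp, as in the log excerpts above
TIMESTAMP_RE = re.compile(r'^\[(.*?)\]')
# Message text taken from the Okta delta-aggregation line quoted above
KICKOFF_TEXT = "CON_OKTA_ENHANCE_ACCOUNT_AGGR_PERFORMANCE_DELTA feature flag set to true"
# Same Account Read pattern as the script below; group 1 is the account id
ACCOUNT_READ_RE = re.compile(r'"message":"Account Read \{\\"id\\":\\"([^\\"]+)\\"')


def summarize_markers(lines):
    """Return (delta kickoff timestamps, set of ids seen in Account Read lines)."""
    kickoffs = []
    account_ids = set()
    for line in lines:
        if KICKOFF_TEXT in line:
            ts_match = TIMESTAMP_RE.match(line)
            if ts_match:
                try:
                    kickoffs.append(datetime.fromisoformat(ts_match.group(1)))
                except ValueError:
                    pass  # skip lines whose timestamp cannot be parsed
        id_match = ACCOUNT_READ_RE.search(line)
        if id_match:
            account_ids.add(id_match.group(1))
    return kickoffs, account_ids


# Hypothetical sample lines shaped like the excerpts above (not real log data)
sample = [
    '[2025-04-11T14:18:16.795-04:00] INFO | connectorMessage {"message":"CON_OKTA_ENHANCE_ACCOUNT_AGGR_PERFORMANCE_DELTA feature flag set to true","thread_name":"pool-2-thread-1"}',
    r'[2025-04-11T14:18:17.100-04:00] DEBUG | connectorMessage {"message":"Account Read {\"id\":\"user-a\",\"status\":\"ACTIVE\"}"}',
    r'[2025-04-11T14:18:17.400-04:00] DEBUG | connectorMessage {"message":"Account Read {\"id\":\"user-b\",\"status\":\"ACTIVE\"}"}',
    r'[2025-04-11T14:18:17.900-04:00] DEBUG | connectorMessage {"message":"Account Read {\"id\":\"user-a\",\"status\":\"ACTIVE\"}"}',
]

kickoffs, account_ids = summarize_markers(sample)
print("delta kickoff lines:", len(kickoffs))
print("distinct accounts read:", len(account_ids))
```

On a real run you would pass the lines of log.txt instead of `sample` and compare the number of distinct ids with the number of accounts submitted.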
Here is the code for areDelta+singleAccAggRunningInParallel.py
# It first scans "log.txt" to compute the overall start and end timestamps using lines that contain an Account Read message (using the same pattern as before).
# These boundaries define the window in which subsequent log entries are considered.
# It then re-reads the log file and searches for pairs of consecutive lines that meet all of these requirements:
#   • The first line (a GET request) must contain both the text "Executing request GET" and "/users?limit=200" (using a regex pattern).
#   • The following line must contain the text "Response is received from Okta. Status: 200" (using another regex).
#   • The thread names in both lines (extracted from the "thread_name" field) must match.
#   • Both lines’ timestamps (extracted from the square brackets at the beginning of each line) must lie within the overall start and end timestamps computed earlier.
# Any matching pairs are written out (both lines) to an output file named "delta-aggregation-logs.txt".
# Finally, it prints a console message indicating whether the delta aggregation (GET/Response) and single-account aggregation ran in parallel, which is the case if at least one matching pair is found.
import re
from datetime import datetime
# --- Step 1: Determine overall time boundaries from "Account Read" lines ---
account_read_pattern = r'"message":"Account Read \{\\"id\\":\\"([^\\"]+)\\"'
timestamp_pattern = r'^\[(.*?)\]'
account_timestamps = []
with open("log.txt", "r", encoding="utf-8", errors="replace") as f:
    for line in f:
        if re.search(account_read_pattern, line):
            ts_match = re.match(timestamp_pattern, line)
            if ts_match:
                ts_str = ts_match.group(1)
                try:
                    ts = datetime.fromisoformat(ts_str)
                    account_timestamps.append(ts)
                except ValueError as e:
                    print("Error parsing timestamp:", ts_str, e)
if account_timestamps:
    overall_start = min(account_timestamps)
    overall_end = max(account_timestamps)
    print("Overall Start Timestamp:", overall_start.isoformat())
    print("Overall End Timestamp:  ", overall_end.isoformat())
else:
    print("No Account Read entries found, cannot compute overall time boundaries.")
    # SystemExit works without relying on the interactive-only exit() helper
    raise SystemExit(1)
# --- Step 2: Define helper functions for extracting timestamp and thread ---
def extract_timestamp(line):
    """Extracts the timestamp from the beginning of a log line (inside square brackets)."""
    ts_match = re.match(timestamp_pattern, line)
    if ts_match:
        ts_str = ts_match.group(1)
        try:
            return datetime.fromisoformat(ts_str)
        except Exception:
            return None
    return None

def extract_thread(line):
    """Extracts the thread name from a log line (appears as 'thread_name':'...')."""
    m = re.search(r'"thread_name":"([^"]+)"', line)
    if m:
        return m.group(1)
    return None
# --- Step 3: Define patterns for the GET request and the response line ---
# GET request line must contain both "Executing request GET" and "/users?limit=200"
get_pattern = r'"message":"Executing request GET .*?/users\?limit=200'
# Response line must contain "Response is received from Okta. Status: 200"
response_pattern = r'"message":"Response is received from Okta\. Status: 200"'
# Read all lines from log file
with open("log.txt", "r", encoding="utf-8", errors="replace") as f:
    lines = f.readlines()
matched_pairs = [] # Will store tuples (GET_line, response_line)
# --- Step 4: Search for consecutive GET/Response pairs that satisfy:
# 1. GET line contains the desired URL segment,
# 2. The following line is the corresponding response,
# 3. Both lines have the same thread_name,
# 4. Both lines' timestamps fall within overall_start and overall_end.
for i in range(len(lines) - 1):
    line1 = lines[i]
    line2 = lines[i + 1]
    if re.search(get_pattern, line1) and re.search(response_pattern, line2):
        thread1 = extract_thread(line1)
        thread2 = extract_thread(line2)
        ts1 = extract_timestamp(line1)
        ts2 = extract_timestamp(line2)
        if thread1 and thread2 and thread1 == thread2:
            if ts1 and ts2 and (overall_start <= ts1 <= overall_end) and (overall_start <= ts2 <= overall_end):
                matched_pairs.append((line1, line2))
print(f"Found {len(matched_pairs)} matching GET/Response pairs.")
# --- Step 5: Write matching pairs to an output file ---
output_file = "delta-aggregation-logs.txt"
with open(output_file, "w", encoding="utf-8") as outf:
    for pair in matched_pairs:
        outf.write(pair[0])
        outf.write(pair[1])
        outf.write("\n")  # extra newline between pairs
print("Matching GET/Response pairs have been written to", output_file)
# --- Step 6: Print the parallel execution result to the console ---
if matched_pairs:
    print("yes, delta and single account aggregation ran in parallel")
else:
    print("no, they did not run in parallel")
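One caveat with the consecutive-line check: when both aggregations write to the same log, an entry from another thread can land between a GET and its response, and that pair is then missed. A possible variant, sketched below under that assumption, remembers the last pending GET per thread_name and pairs it with the next 200 response on the same thread. The function name `pair_by_thread`, the host, and the sample lines are hypothetical; the patterns are the ones used in the script above.

```python
import re

# Same patterns as the script above
GET_RE = re.compile(r'"message":"Executing request GET .*?/users\?limit=200')
RESPONSE_RE = re.compile(r'"message":"Response is received from Okta\. Status: 200"')
THREAD_RE = re.compile(r'"thread_name":"([^"]+)"')


def pair_by_thread(lines):
    """Pair each GET with the next 200 response on the same thread,
    even when lines from other threads appear in between."""
    pending = {}  # thread_name -> most recent unanswered GET line
    pairs = []
    for line in lines:
        thread_match = THREAD_RE.search(line)
        if not thread_match:
            continue
        thread = thread_match.group(1)
        if GET_RE.search(line):
            pending[thread] = line
        elif RESPONSE_RE.search(line) and thread in pending:
            pairs.append((pending.pop(thread), line))
    return pairs


# Hypothetical interleaved sample (not real log data)
sample = [
    '[2025-04-11T14:18:17.000-04:00] DEBUG | connectorMessage {"message":"Executing request GET https://example.invalid/api/v1/users?limit=200&search=x","thread_name":"pool-2-thread-1"}',
    '[2025-04-11T14:18:17.050-04:00] DEBUG | connectorMessage {"message":"Some other entry","thread_name":"pool-3-thread-1"}',
    '[2025-04-11T14:18:17.200-04:00] DEBUG | connectorMessage {"message":"Response is received from Okta. Status: 200","thread_name":"pool-2-thread-1"}',
    '[2025-04-11T14:18:17.300-04:00] DEBUG | connectorMessage {"message":"Response is received from Okta. Status: 200","thread_name":"pool-3-thread-1"}',
]

pairs = pair_by_thread(sample)
print(f"Found {len(pairs)} GET/Response pairs.")
```

The time-window filter from Step 4 would still need to be applied to each pair; this sketch changes only how the two lines are matched.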