]> Git Repo - linux.git/blob - tools/perf/scripts/python/parallel-perf.py
Merge tag 'input-for-v6.10-rc5' of git://git.kernel.org/pub/scm/linux/kernel/git...
[linux.git] / tools / perf / scripts / python / parallel-perf.py
1 #!/usr/bin/env python3
2 # SPDX-License-Identifier: GPL-2.0
3 #
4 # Run a perf script command multiple times in parallel, using perf script
5 # options --cpu and --time so that each job processes a different chunk
6 # of the data.
7 #
8 # Copyright (c) 2024, Intel Corporation.
9
10 import subprocess
11 import argparse
12 import pathlib
13 import shlex
14 import time
15 import copy
16 import sys
17 import os
18 import re
19
# Module-wide settings.  Their consumers are not visible in this part of the
# file, so units/meaning are not documented here.
glb_prog_name: str = "parallel-perf.py"
glb_min_interval: float = 10.0
glb_min_samples: int = 64
23
class Verbosity():
        """Output verbosity flags.

        debug implies verbose, and verbose overrides quiet.  'normal' is True
        unless quiet output was requested without verbose/debug.
        """

        def __init__(self, quiet=False, verbose=False, debug=False):
                self.debug = debug
                # Debug output implies verbose output
                self.verbose = True if debug else verbose
                # quiet only takes effect when not verbose
                self.normal = bool(self.verbose or not quiet)
                self.self_test = True
37
38 # Manage work (Start/Wait/Kill), as represented by a subprocess.Popen command
class Work():
        """One unit of work: a command run as a subprocess, optionally piped
        into a consumer command (pipe_to, split with shlex).

        Files in output_dir: cmd.txt (the command line), out.txt (standard
        output of the command, or of the consumer when pipe_to is set) and
        err.txt (standard error of both processes; removed when the work is
        seen to have finished and the file is empty).
        """

        def __init__(self, cmd, pipe_to, output_dir="."):
                self.popen = None               # Popen of cmd, once started
                self.consumer = None            # Popen of pipe_to, if any
                self.cmd = cmd                  # argument list
                self.pipe_to = pipe_to          # command string, or None/""
                self.output_dir = output_dir
                self.cmdout_name = f"{output_dir}/cmd.txt"
                self.stdout_name = f"{output_dir}/out.txt"
                self.stderr_name = f"{output_dir}/err.txt"

        def Command(self):
                """Return cmd as one string with each argument shell-quoted."""
                # Quoting makes the recorded/printed command safe to paste back
                # into a shell (e.g. arguments containing spaces).
                return " ".join(shlex.quote(x) for x in self.cmd)

        def Stdout(self):
                """Open (truncating) the file that receives standard output."""
                return open(self.stdout_name, "w")

        def Stderr(self):
                """Open (truncating) the file that receives standard error."""
                return open(self.stderr_name, "w")

        def CreateOutputDir(self):
                pathlib.Path(self.output_dir).mkdir(parents=True, exist_ok=True)

        def Start(self):
                """Start the work; does nothing if it was already started."""
                if self.popen:
                        return
                self.CreateOutputDir()
                with open(self.cmdout_name, "w") as f:
                        f.write(self.Command())
                        f.write("\n")
                # The children get their own copies of these descriptors, so
                # the parent's handles are closed once the processes exist.
                with self.Stdout() as stdout, self.Stderr() as stderr:
                        if self.pipe_to:
                                self.popen = subprocess.Popen(self.cmd, stdout=subprocess.PIPE, stderr=stderr)
                                args = shlex.split(self.pipe_to)
                                self.consumer = subprocess.Popen(args, stdin=self.popen.stdout, stdout=stdout, stderr=stderr)
                                # Leave the consumer as the only reader of the pipe so
                                # that cmd gets SIGPIPE if the consumer exits early.
                                self.popen.stdout.close()
                        else:
                                self.popen = subprocess.Popen(self.cmd, stdout=stdout, stderr=stderr)

        def RemoveEmptyErrFile(self):
                if os.path.exists(self.stderr_name):
                        if os.path.getsize(self.stderr_name) == 0:
                                os.unlink(self.stderr_name)

        def Errors(self):
                """Return a list of error descriptions (a non-empty err.txt)."""
                if os.path.exists(self.stderr_name):
                        if os.path.getsize(self.stderr_name) != 0:
                                return [ f"Non-empty error file {self.stderr_name}" ]
                return []

        def TidyUp(self):
                self.RemoveEmptyErrFile()

        def RawPollWait(self, p, wait):
                """Return p's exit status, blocking if wait; else None while running."""
                if wait:
                        return p.wait()
                return p.poll()

        def Poll(self, wait=False):
                """Return the work's exit status, or None if not started/still running.

                With a consumer: if the consumer has exited but cmd has not,
                cmd is killed and None is returned (a later poll reports the
                result); if the consumer succeeded but cmd failed, cmd's
                status is returned.
                """
                if not self.popen:
                        return None
                result = self.RawPollWait(self.popen, wait)
                if self.consumer:
                        res = result
                        result = self.RawPollWait(self.consumer, wait)
                        if result is not None and res is None:
                                self.popen.kill()
                                result = None
                        elif result == 0 and res is not None and res != 0:
                                result = res
                if result is not None:
                        self.TidyUp()
                return result

        def Wait(self):
                """Block until the work finishes and return its exit status."""
                return self.Poll(wait=True)

        def Kill(self):
                """Kill the process(es), if started; does not wait for them."""
                if not self.popen:
                        return
                self.popen.kill()
                if self.consumer:
                        self.consumer.kill()
124
def KillWork(worklist, verbosity):
        """Kill every Work item in worklist, then wait for each of them.

        verbosity is accepted but not used.
        """
        # All kills are sent before any wait, so the jobs die together
        # rather than one at a time.
        for method in ("Kill", "Wait"):
                for work in worklist:
                        getattr(work, method)()
130
def NumberOfCPUs():
        """Return the number of online CPUs, as reported by sysconf()."""
        nr_online = os.sysconf("SC_NPROCESSORS_ONLN")
        return nr_online
133
def NanoSecsToSecsStr(x):
        """Format a nanosecond count as "secs.nanosecs" with 9 fractional digits.

        None (an open-ended time) gives "".  Example: 5 -> "0.000000005".
        """
        if x is None:
                return ""
        digits = str(x).rjust(10, "0")
        secs, nsecs = digits[:-9], digits[-9:]
        return f"{secs}.{nsecs}"
141
def InsertOptionAfter(cmd, option, after):
        """Insert option into list cmd right after the first element equal to after.

        If after is not in cmd, option is appended instead.  cmd is modified
        in place.
        """
        try:
                pos = cmd.index(after)
        except ValueError:
                # 'after' not present: fall back to appending
                cmd.append(option)
        else:
                cmd.insert(pos + 1, option)
148
def CreateWorkList(cmd, pipe_to, output_dir, cpus, time_ranges_by_cpu):
        """Create one Work item per (CPU, time range) combination.

        cpus: CPU numbers, or [-1] meaning "no --cpu option"; the width of
              str(cpus[-1]) sets the zero padding of the cpu-NN directories.
        time_ranges_by_cpu: either a single list of [start, end] ranges (ns)
              shared by all CPUs, or one such list per entry of cpus.

        Each item runs a copy of cmd with --cpu=N and/or --time=start,end
        inserted after "script" (a None bound is left empty), writing to
        output_dir[/cpu-NN][/time-range[-K]-MM].  A [None, None] range means
        the whole time span: no --time option and no time-range directory.
        """
        cpu_width = len(str(cpus[-1]))
        cpu_dir_fmt = f"cpu-%.{cpu_width}u"
        per_cpu_ranges = len(time_ranges_by_cpu) > 1
        worklist = []
        for idx, cpu in enumerate(cpus):
                if cpu < 0:
                        cpu_dir, cpu_option = output_dir, None
                else:
                        cpu_dir = os.path.join(output_dir, cpu_dir_fmt % cpu)
                        cpu_option = f"--cpu={cpu}"

                if per_cpu_ranges:
                        time_ranges = time_ranges_by_cpu[idx]
                        tr_prefix = f"time-range-{idx}"
                else:
                        time_ranges = time_ranges_by_cpu[0]
                        tr_prefix = "time-range"
                tr_dir_fmt = tr_prefix + f"-%.{len(str(len(time_ranges)))}u"

                tr_nr = 0
                for r in time_ranges:
                        work_cmd = list(cmd)
                        if r == [None, None]:
                                work_output_dir = cpu_dir
                        else:
                                time_option = f"--time={NanoSecsToSecsStr(r[0])},{NanoSecsToSecsStr(r[1])}"
                                InsertOptionAfter(work_cmd, time_option, "script")
                                work_output_dir = os.path.join(cpu_dir, tr_dir_fmt % tr_nr)
                                tr_nr += 1
                        # Inserted after --time, so it ends up first after "script"
                        if cpu_option is not None:
                                InsertOptionAfter(work_cmd, cpu_option, "script")
                        worklist.append(Work(work_cmd, pipe_to, work_output_dir))
        return worklist
191
def DoRunWork(worklist, nr_jobs, verbosity):
        """Run the Work items in worklist, with at most nr_jobs running at once.

        Items are started in list order and polled about every 0.1s.  A
        progress line is printed when verbosity.normal is set.

        Returns False as soon as an item exits with a non-zero status, after
        reporting it and killing (and waiting for) the items still running;
        items not yet started are never started.  Otherwise, once all items
        have finished, anything reported by Work.Errors() is printed and True
        is returned either way.  An nr_jobs below 1 is treated as 1.
        """
        # With nr_jobs < 1 no job would ever start and the loop would spin forever
        if nr_jobs < 1:
                nr_jobs = 1
        nr_to_do = len(worklist)
        not_started = list(worklist)
        running = []
        done = []
        chg = False             # progress changed since the last status line
        while True:
                nr_done = len(done)
                if chg and verbosity.normal:
                        nr_run = len(running)
                        # '\r' rewrites the status line in place (unless verbose)
                        print(f"\rThere are {nr_to_do} jobs: {nr_done} completed, {nr_run} running", flush=True, end=" ")
                        if verbosity.verbose:
                                print()
                        chg = False
                if nr_done == nr_to_do:
                        break
                # Top up the running set to nr_jobs
                while len(running) < nr_jobs and len(not_started):
                        w = not_started.pop(0)
                        running.append(w)
                        if verbosity.verbose:
                                print("Starting:", w.Command())
                        w.Start()
                        chg = True
                if len(running):
                        time.sleep(0.1)
                finished = []
                not_finished = []
                while len(running):
                        w = running.pop(0)
                        r = w.Poll()
                        if r is None:
                                not_finished.append(w)
                                continue
                        if r == 0:
                                if verbosity.verbose:
                                        print("Finished:", w.Command())
                                finished.append(w)
                                chg = True
                                continue
                        # Non-zero exit status: report it and abort
                        if verbosity.normal and not verbosity.verbose:
                                print()         # end the '\r' status line first
                        print("Job failed!\n    return code:", r, "\n    command:    ", w.Command())
                        if w.pipe_to:
                                print("    piped to:   ", w.pipe_to)
                        print("Killing outstanding jobs")
                        # Both the jobs already polled this round and those not yet polled
                        KillWork(not_finished, verbosity)
                        KillWork(running, verbosity)
                        return False
                running = not_finished
                done += finished
        errorlist = []
        for w in worklist:
                errorlist += w.Errors()
        if len(errorlist):
                print("Errors:")
                for e in errorlist:
                        print(e)
        elif verbosity.normal:
                # Blank out the status line before the final message
                print("\r"," "*50, "\rAll jobs finished successfully", flush=True)
        return True
252
def RunWork(worklist, nr_jobs=None, verbosity=None):
        """Run worklist via DoRunWork() and return its result.

        nr_jobs defaults to NumberOfCPUs() and verbosity to a fresh
        Verbosity().  Both defaults are resolved on each call, not once at
        import time.  On any exception, including KeyboardInterrupt, all
        work is killed before the exception propagates.
        """
        if nr_jobs is None:
                nr_jobs = NumberOfCPUs()
        if verbosity is None:
                verbosity = Verbosity()
        try:
                return DoRunWork(worklist, nr_jobs, verbosity)
        except BaseException:
                # Deliberately broad: make sure no child process outlives us
                for w in worklist:
                        w.Kill()
                raise
261
def ReadHeader(perf, file_name):
        """Return the output of '<perf> script --header-only --input <file_name>' as text."""
        # subprocess.run() waits for the child and closes the pipe; a bare
        # Popen(...).stdout.read() leaves the process unreaped.
        result = subprocess.run([perf, "script", "--header-only", "--input", file_name], stdout=subprocess.PIPE)
        return result.stdout.decode("utf-8")
264
def ParseHeader(hdr):
        """Parse header text (as returned by ReadHeader()) into a dict.

        Each line that starts with '#' and contains ':' gives one entry: the
        name is the text between the '#' and the first ':', the value is the
        text after that ':', both stripped of surrounding whitespace.  A name
        seen again is stored as "name 2", "name 3", and so on.
        """
        result = {}
        for line in hdr.split("\n"):
                if not line.startswith("#") or ":" not in line:
                        continue
                pos = line.index(":")
                # Slice up to the ':' itself and let strip() drop any padding.
                # Stopping one character early would cut the last letter of
                # names written without a space before the ':'
                # (e.g. "# pmu mappings: ...").
                name = line[1:pos].strip()
                value = line[pos+1:].strip()
                if name in result:
                        orig_name = name
                        nr = 2
                        while True:
                                name = f"{orig_name} {nr}"
                                if name not in result:
                                        break
                                nr += 1
                result[name] = value
        return result
283
def HeaderField(hdr_dict, hdr_fld):
        """Return hdr_dict[hdr_fld]; raise Exception if the field is absent."""
        if hdr_fld in hdr_dict:
                return hdr_dict[hdr_fld]
        raise Exception(f"'{hdr_fld}' missing from header information")
288
289 # Represent the position of an option within a command string
290 # and provide the option value and/or remove the option
class OptPos():
        """Locate one option (-short_name / --long_name) in an argument list.

        After construction, error holds a message if the option was found
        but malformed (e.g. missing value); opt_element is -1 when the option
        was not found or was malformed.  Value() returns the option's value
        (or default if not found) and Remove(args) deletes the option.
        """

        def Init(self, opt_element=-1, value_element=-1, opt_pos=-1, value_pos=-1, error=None):
                """Record where the option is; the defaults mean 'not found'."""
                self.opt_element = opt_element          # list element that contains option
                self.value_element = value_element      # list element that contains option value
                self.opt_pos = opt_pos                  # string position of option
                self.value_pos = value_pos              # string position of value
                self.error = error                      # error message string

        def __init__(self, args, short_name, long_name, default=None):
                """Find the first element of args that holds the option.

                Recognised forms (e.g. short_name "C", long_name "cpu"):
                "-C 1", "-C1", "--cpu 1", "--cpu=1", and the letter inside a
                group of single-dash flags such as "-vC1" or "-vC 1".  Pass
                short_name="" to match only the long form.
                """
                self.args = list(args)  # private copy read by Value()
                self.default = default
                n = 2 + len(long_name)  # length of "--" + long_name
                m = len(short_name)
                pos = -1
                for opt in args:
                        pos += 1
                        # "-X" (value in the next element) or "-Xvalue"
                        if m and opt[:2] == f"-{short_name}":
                                if len(opt) == 2:
                                        if pos + 1 < len(args):
                                                self.Init(pos, pos + 1, 0, 0)
                                        else:
                                                self.Init(error = f"-{short_name} option missing value")
                                else:
                                        self.Init(pos, pos, 0, 2)
                                return
                        # "--name" (value in the next element) or "--name=value"
                        if opt[:n] == f"--{long_name}":
                                if len(opt) == n:
                                        if pos + 1 < len(args):
                                                self.Init(pos, pos + 1, 0, 0)
                                        else:
                                                self.Init(error = f"--{long_name} option missing value")
                                elif opt[n] == "=":
                                        self.Init(pos, pos, 0, n + 1)
                                else:
                                        self.Init(error = f"--{long_name} option expected '='")
                                return
                        # Letter inside a group of single-dash flags, e.g. "-vC1".
                        # NOTE(review): this also matches the letter inside another
                        # option's attached value (e.g. "-Fip" when looking for "i").
                        if m and opt[:1] == "-" and opt[:2] != "--" and short_name in opt:
                                ipos = opt.index(short_name)
                                if "-" in opt[1:]:
                                        hpos = opt[1:].index("-")
                                        # Another '-' appears before the letter: skip element
                                        if hpos < ipos:
                                                continue
                                if ipos + 1 == len(opt):
                                        # Letter is last: value is the next element
                                        if pos + 1 < len(args):
                                                self.Init(pos, pos + 1, ipos, 0)
                                        else:
                                                self.Init(error = f"-{short_name} option missing value")
                                else:
                                        # Value follows the letter in the same element
                                        self.Init(pos, pos, ipos, ipos + 1)
                                return
                self.Init()

        def Value(self):
                """Return the option's value string, or default if not found."""
                if self.opt_element >= 0:
                        if self.opt_element != self.value_element:
                                return self.args[self.value_element]
                        else:
                                return self.args[self.value_element][self.value_pos:]
                return self.default

        def Remove(self, args):
                """Delete the option (and a separate value element) from args in place.

                The stored indices refer to the list that was scanned, so args
                is presumably that same list.  For a flag group, only the part
                from the letter onward is cut off (e.g. "-vC1" -> "-v").
                """
                if self.opt_element == -1:
                        return
                # Value element comes after the option element, so delete it first
                if self.opt_element != self.value_element:
                        del args[self.value_element]
                if self.opt_pos:
                        args[self.opt_element] = args[self.opt_element][:self.opt_pos]
                else:
                        del args[self.opt_element]
361
def DetermineInputFileName(cmd):
        """Return the input file named by -i/--input in cmd ("perf.data" if absent).

        Raises Exception if the option is malformed or the file does not exist.
        """
        opt = OptPos(cmd, "i", "input", "perf.data")
        if opt.error:
                raise Exception(f"perf command {opt.error}")
        file_name = opt.Value()
        if os.path.exists(file_name):
                return file_name
        raise Exception(f"perf command input file '{file_name}' not found")
370
def ReadOption(args, short_name, long_name, err_prefix, remove=False):
        """Return the value of option -short_name/--long_name in args, or None.

        A malformed option raises Exception with err_prefix prepended to the
        message.  If remove is true, the option is also deleted from args.
        """
        opt_pos = OptPos(args, short_name, long_name)
        if opt_pos.error:
                raise Exception(f"{err_prefix}{opt_pos.error}")
        if not remove:
                return opt_pos.Value()
        value = opt_pos.Value()
        opt_pos.Remove(args)
        return value
379
def ExtractOption(args, short_name, long_name, err_prefix):
        """Return the option's value, as ReadOption() does, and remove it from args."""
        return ReadOption(args, short_name, long_name, err_prefix, remove=True)
382
def ReadPerfOption(args, short_name, long_name):
        """ReadOption() for the perf command; error messages start "perf command "."""
        return ReadOption(args, short_name, long_name, err_prefix="perf command ")
385
def ExtractPerfOption(args, short_name, long_name):
        """ExtractOption() for the perf command: read the value and remove the option."""
        return ExtractOption(args, short_name, long_name, err_prefix="perf command ")
388
def PerfDoubleQuickCommands(cmd, file_name):
        """Build the perf script commands used to measure trace data density.

        Both commands decode file_name with --ns and --itrace=qqi, limited to
        any --cpu/-C and --time selection present in cmd (cmd itself is not
        modified).  Returns (cnts_cmd, times_cmd): cnts_cmd prints only the
        CPU of each sample (-Fcpu), times_cmd its CPU and time (-Fcpu,time).
        """
        cpu_str = ReadPerfOption(cmd, "C", "cpu")
        time_str = ReadPerfOption(cmd, "", "time")
        # Run the same perf executable as the user's command (cmd[0], which
        # ParallelPerf also treats as the perf binary) rather than whatever
        # "perf" comes first in PATH.
        perf = cmd[0] if cmd else "perf"
        # Use double-quick sampling to determine trace data density
        times_cmd = [perf, "script", "--ns", "--input", file_name, "--itrace=qqi"]
        if cpu_str:
                times_cmd.append(f"--cpu={cpu_str}")
        if time_str:
                times_cmd.append(f"--time={time_str}")
        cnts_cmd = list(times_cmd)
        cnts_cmd.append("-Fcpu")
        times_cmd.append("-Fcpu,time")
        return cnts_cmd, times_cmd
402
class CPUTimeRange():
        """Per-CPU bookkeeping used while splitting time by trace data density."""

        def __init__(self, cpu):
                self.cpu = cpu                  # CPU number (-1 when not per-CPU)
                self.time_ranges = None         # list of [start, end], set later
                # Sample counters and the position in time_ranges all start at 0
                self.sample_cnt = self.interval = self.interval_remaining = 0
                self.remaining = self.tr_pos = 0
412
def CalcTimeRangesByCPU(line, cpu, cpu_time_ranges, max_time):
        """Account for one sample of cpu and close a time range when it is full.

        line is a whitespace-split output line whose second field is the
        sample time plus a one-character suffix (presumably the ':' perf
        prints after the time), which is stripped before parsing.
        """
        ctr = cpu_time_ranges[cpu]
        ctr.remaining -= 1
        ctr.interval_remaining -= 1
        if ctr.remaining == 0:
                # Last sample for this CPU: the current range runs to max_time
                ctr.time_ranges[ctr.tr_pos][1] = max_time
        elif ctr.interval_remaining == 0:
                # Interval full: end the current range just before this sample
                # and open a new one starting at it
                sample_time = TimeVal(line[1][:-1], 0)
                ranges = ctr.time_ranges
                ranges[ctr.tr_pos][1] = sample_time - 1
                ranges.append([sample_time, max_time])
                ctr.tr_pos += 1
                ctr.interval_remaining = ctr.interval
427
def CountSamplesByCPU(line, cpu, cpu_time_ranges):
        """Count one sample for cpu (line is unused; callback signature).

        If cpu is out of range for cpu_time_ranges, some context is printed
        and the IndexError is re-raised.
        """
        try:
                cpu_time_ranges[cpu].sample_cnt += 1
        except IndexError:
                # Only an unexpected CPU number is diagnosed; other exceptions
                # (e.g. KeyboardInterrupt) propagate without the noise.
                print("exception")
                print("cpu", cpu)
                print("len(cpu_time_ranges)", len(cpu_time_ranges))
                raise
436
def ProcessCommandOutputLines(cmd, per_cpu, fn, *x):
        """Run cmd and call fn(fields, cpu, *x) for each sample line it prints.

        A sample line starts (after optional whitespace) with a bracketed CPU
        number, e.g. "[003] ...".  fields is the line split on whitespace and
        cpu is that number when per_cpu is true, otherwise 0.  Other lines
        are ignored.  cmd's exit status is not checked.
        """
        # Assume CPU number is at beginning of line and enclosed by []
        pat = re.compile(r"\s*\[[0-9]+\]")
        # The context manager closes the pipe and waits for the child even if
        # fn raises, so no descriptor or unreaped process is left behind.
        with subprocess.Popen(cmd, stdout=subprocess.PIPE) as p:
                for raw in p.stdout:
                        line = raw.decode("utf-8")
                        if not pat.match(line):
                                continue
                        fields = line.split()
                        # Assumes CPU number is enclosed by []
                        cpu = int(fields[0][1:-1]) if per_cpu else 0
                        fn(fields, cpu, *x)
455
def IntersectTimeRanges(new_time_ranges, time_ranges):
        """Clip new_time_ranges, in place, to their intersection with time_ranges.

        Both lists hold inclusive [start, end] ranges and are assumed to be
        sorted and non-overlapping.  Parts of new ranges lying outside every
        range in time_ranges are removed, and a new range that extends past
        the end of an old range is split at that point.  Returns nothing.
        """
        pos = 0                 # index into time_ranges (old)
        new_pos = 0             # index into new_time_ranges
        # Can assume len(time_ranges) != 0 and len(new_time_ranges) != 0
        # Note also, there *must* be at least one intersection.
        while pos < len(time_ranges) and new_pos < len(new_time_ranges):
                # new end < old start => no intersection, remove new
                if new_time_ranges[new_pos][1] < time_ranges[pos][0]:
                        del new_time_ranges[new_pos]
                        continue
                # new start > old end => no intersection, check next
                if new_time_ranges[new_pos][0] > time_ranges[pos][1]:
                        pos += 1
                        if pos < len(time_ranges):
                                continue
                        # no next, so remove remaining
                        while new_pos < len(new_time_ranges):
                                del new_time_ranges[new_pos]
                        return
                # Found an intersection
                # new start < old start => adjust new start = old start
                if new_time_ranges[new_pos][0] < time_ranges[pos][0]:
                        new_time_ranges[new_pos][0] = time_ranges[pos][0]
                # new end > old end => keep the overlap, insert the remainder
                # (the remainder is then checked against the following old ranges)
                if new_time_ranges[new_pos][1] > time_ranges[pos][1]:
                        r = [ time_ranges[pos][1] + 1, new_time_ranges[new_pos][1] ]
                        new_time_ranges[new_pos][1] = time_ranges[pos][1]
                        new_pos += 1
                        new_time_ranges.insert(new_pos, r)
                        continue
                # new [start, end] is within old [start, end]
                new_pos += 1
488
def SplitTimeRangesByTraceDataDensity(time_ranges, cpus, nr, cmd, file_name, per_cpu, min_size, min_interval, verbosity):
        """Split time_ranges so that each piece holds a similar amount of trace data.

        Density is estimated from two "double quick" decodes of file_name
        (PerfDoubleQuickCommands()): the first counts samples per CPU, the
        second gives each sample's time, and a range boundary is placed every
        'interval' samples.

        time_ranges: [start, end] ranges in ns, assumed sorted and non-overlapping
        cpus: CPUs to return ranges for; with per_cpu, cpus[-1] is taken as
              the highest CPU number (without per_cpu, [-1] is expected)
        nr: target number of pieces, or 0 to derive it from NumberOfCPUs()
        min_size: minimum samples per piece (forced to 1 when nr is given);
              if no CPU has min_size samples, no density split is done
        min_interval: passed to SplitTimeRangesIntoN() in that fallback case

        Returns one list of [start, end] ranges per entry in cpus, or, in the
        too-little-data case, [SplitTimeRangesIntoN(time_ranges, nr or 1,
        min_interval)].
        """
        if verbosity.normal:
                print("\rAnalyzing...", flush=True, end=" ")
                if verbosity.verbose:
                        print()
        cnts_cmd, times_cmd = PerfDoubleQuickCommands(cmd, file_name)

        if per_cpu:
                # One entry per CPU number up to the highest requested, so that
                # entries can be indexed directly by CPU number
                nr_cpus = cpus[-1] + 1
                cpu_time_ranges = [ CPUTimeRange(cpu) for cpu in range(nr_cpus) ]
        else:
                nr_cpus = 1
                cpu_time_ranges = [ CPUTimeRange(-1) ]

        if verbosity.debug:
                print("nr_cpus", nr_cpus)
                print("cnts_cmd", cnts_cmd)
                print("times_cmd", times_cmd)

        # Count the number of "double quick" samples per CPU
        ProcessCommandOutputLines(cnts_cmd, per_cpu, CountSamplesByCPU, cpu_time_ranges)

        tot = 0         # samples over all CPUs
        mx = 0          # samples on the busiest CPU
        for cpu_time_range in cpu_time_ranges:
                cnt = cpu_time_range.sample_cnt
                tot += cnt
                if cnt > mx:
                        mx = cnt
                if verbosity.debug:
                        print("cpu:", cpu_time_range.cpu, "sample_cnt", cnt)

        if min_size < 1:
                min_size = 1

        if mx < min_size:
                # Too little data to be worth splitting
                if verbosity.debug:
                        print("Too little data to split by time")
                if nr == 0:
                        nr = 1
                return [ SplitTimeRangesIntoN(time_ranges, nr, min_interval) ]

        if nr:
                divisor = nr
                min_size = 1
        else:
                divisor = NumberOfCPUs()

        # Target number of samples per time range
        interval = int(round(tot / divisor, 0))
        if interval < min_size:
                interval = min_size

        if verbosity.debug:
                print("divisor", divisor)
                print("min_size", min_size)
                print("interval", interval)

        min_time = time_ranges[0][0]
        max_time = time_ranges[-1][1]

        for cpu_time_range in cpu_time_ranges:
                cnt = cpu_time_range.sample_cnt
                if cnt == 0:
                        cpu_time_range.time_ranges = copy.deepcopy(time_ranges)
                        continue
                # Adjust target interval for CPU to give approximately equal interval sizes
                # Determine number of intervals, rounding to nearest integer
                n = int(round(cnt / interval, 0))
                if n < 1:
                        n = 1
                # Determine interval size, rounding up
                d, m = divmod(cnt, n)
                if m:
                        d += 1
                cpu_time_range.interval = d
                cpu_time_range.interval_remaining = d
                cpu_time_range.remaining = cnt
                # Init. time ranges for each CPU with the start time
                cpu_time_range.time_ranges = [ [min_time, max_time] ]

        # Set time ranges so that the same number of "double quick" samples
        # will fall into each time range.
        ProcessCommandOutputLines(times_cmd, per_cpu, CalcTimeRangesByCPU, cpu_time_ranges, max_time)

        # The boundaries above span [min_time, max_time]; restrict them to
        # the requested time_ranges.
        for cpu_time_range in cpu_time_ranges:
                if cpu_time_range.sample_cnt:
                        IntersectTimeRanges(cpu_time_range.time_ranges, time_ranges)

        # Without per_cpu, cpus is [-1], which indexes the single entry
        return [cpu_time_ranges[cpu].time_ranges for cpu in cpus]
580
def SplitSingleTimeRangeIntoN(time_range, n):
        """Split inclusive range [start, end] into n contiguous, equal-length ranges.

        The last range absorbs any remainder.  [time_range] itself is
        returned when n <= 1 or when the range is shorter than n.
        """
        if n <= 1:
                return [time_range]
        start = time_range[0]
        end   = time_range[1]
        # Floor division keeps this exact; int(x / n) goes through a float
        # and loses precision once the duration exceeds 2**53.
        duration = (end - start + 1) // n
        if duration < 1:
                return [time_range]
        time_ranges = []
        for i in range(n):
                time_ranges.append([start, start + duration - 1])
                start += duration
        time_ranges[-1][1] = end
        return time_ranges
595
def TimeRangeDuration(r):
        """Return the length of inclusive range r = [start, end], i.e. end - start + 1."""
        start, end = r[0], r[1]
        return end - start + 1
598
def TotalDuration(time_ranges):
        """Return the summed duration of all ranges (0 for an empty list)."""
        return sum(TimeRangeDuration(r) for r in time_ranges)
604
def SplitTimeRangesByInterval(time_ranges, interval):
        """Split each range into round(duration / interval) roughly equal parts.

        Returns the concatenation of the pieces; SplitSingleTimeRangeIntoN()
        keeps a range whole when that count is <= 1.
        """
        new_ranges = []
        for r in time_ranges:
                parts = int(round(TimeRangeDuration(r) / interval, 0))
                new_ranges.extend(SplitSingleTimeRangeIntoN(r, parts))
        return new_ranges
613
def SplitTimeRangesIntoN(time_ranges, n, min_interval):
        """Split time_ranges into about n ranges of equal duration.

        time_ranges itself is returned when it already has n or more ranges.
        The target piece duration is at least min_interval.
        """
        if n <= len(time_ranges):
                return time_ranges
        interval = max(TotalDuration(time_ranges) / n, min_interval)
        return SplitTimeRangesByInterval(time_ranges, interval)
622
def RecombineTimeRanges(tr):
        """Return a copy of tr with adjacent ranges merged.

        Consecutive ranges [a, b] and [b + 1, c] become [a, c].  tr and its
        inner lists are left unmodified.
        """
        merged = []
        # Single pass, avoiding repeated deletions from the middle of a list
        for r in copy.deepcopy(tr):
                if merged and merged[-1][1] + 1 == r[0]:
                        merged[-1][1] = r[1]
                else:
                        merged.append(r)
        return merged
635
def OpenTimeRangeEnds(time_ranges, min_time, max_time):
        """Make the outer bounds open-ended (None), in place, where they reach
        min_time / max_time."""
        first, last = time_ranges[0], time_ranges[-1]
        if first[0] <= min_time:
                first[0] = None
        if last[1] >= max_time:
                last[1] = None
641
def BadTimeStr(time_str):
        """Raise an Exception describing an invalid perf --time option value."""
        msg = (f"perf command bad time option: '{time_str}'\n"
               "Check also 'time of first sample' and 'time of last sample' in perf script --header-only")
        raise Exception(msg)
644
def ValidateTimeRanges(time_ranges, time_str):
        """Check that ranges are ascending, non-overlapping and have start <= end.

        Any violation is reported through BadTimeStr(time_str), which raises.
        """
        for idx, rng in enumerate(time_ranges):
                start, end = rng[0], rng[1]
                overlaps_prev = idx > 0 and start <= time_ranges[idx - 1][1]
                if overlaps_prev or start > end:
                        BadTimeStr(time_str)
654
def TimeVal(s, dflt):
        """Convert a "secs[.fraction]" string to integer nanoseconds.

        Surrounding whitespace is ignored and an empty string returns dflt.
        Fraction digits beyond the 9th are truncated.  Raises Exception for
        more than one '.', a negative value or a non-numeric fraction; a
        malformed seconds part makes int() raise ValueError.
        """
        s = s.strip()
        if s == "":
                return dflt
        a = s.split(".")
        if len(a) > 2:
                raise Exception(f"Bad time value '{s}'")
        # Check the sign textually: int("-0") == 0, so "-0.5" would
        # otherwise be accepted as +0.5.
        if a[0].startswith("-"):
                raise Exception("Negative time not allowed")
        x = int(a[0]) * 1000000000
        if len(a) > 1:
                frac = a[1]
                # int() would also accept a sign or '_' here; only digits are valid
                if frac and not frac.isdecimal():
                        raise Exception(f"Bad time value '{s}'")
                x += int((frac + "000000000")[:9])
        return x
669
def BadCPUStr(cpu_str):
        """Raise an Exception describing an invalid perf --cpu option value."""
        msg = (f"perf command bad cpu option: '{cpu_str}'\n"
               "Check also 'nrcpus avail' in perf script --header-only")
        raise Exception(msg)
672
def ParseTimeStr(time_str, min_time, max_time):
        """Parse a perf --time value into a list of [start, end] ranges in ns.

        time_str holds whitespace-separated "start,end" pairs; an empty start
        or end defaults to min_time or max_time respectively.  A None or
        empty time_str gives [[min_time, max_time]].  Malformed or unordered
        ranges are reported through BadTimeStr(), which raises.
        """
        if not time_str:
                return [[min_time, max_time]]
        time_ranges = []
        for r in time_str.split():
                a = r.split(",")
                if len(a) != 2:
                        BadTimeStr(time_str)
                try:
                        start = TimeVal(a[0], min_time)
                        end   = TimeVal(a[1], max_time)
                except Exception:
                        # TimeVal raises Exception/ValueError for bad values;
                        # don't swallow KeyboardInterrupt and friends
                        BadTimeStr(time_str)
                time_ranges.append([start, end])
        ValidateTimeRanges(time_ranges, time_str)
        return time_ranges
689
def ParseCPUStr(cpu_str, nr_cpus):
        """Parse a perf --cpu list such as "0-3,7" into a sorted list of CPUs.

        A None or empty cpu_str gives [-1] (no CPU selection).  Entries must
        be N or N-M with 0 <= N <= M < nr_cpus; anything else is reported
        through BadCPUStr(), which raises.  Duplicates are removed.
        """
        if not cpu_str:
                return [-1]
        cpus = set()
        for r in cpu_str.split(","):
                a = r.split("-")
                if len(a) > 2:
                        BadCPUStr(cpu_str)
                try:
                        start = int(a[0].strip())
                        end = int(a[1].strip()) if len(a) > 1 else start
                except ValueError:
                        BadCPUStr(cpu_str)
                if start < 0 or end < 0 or end < start or end >= nr_cpus:
                        BadCPUStr(cpu_str)
                cpus.update(range(start, end + 1))
        return sorted(cpus)
712
class ParallelPerf():
        """Run one perf script command as many parallel jobs.

        The options parsed by Main() are copied onto the instance as
        attributes. Config() reads the perf.data header, works out which CPUs
        and time subdivisions the jobs cover and builds self.worklist. Run()
        then executes those jobs, or with --dry-run only prints their commands.
        """

        def __init__(self, a):
                """Copy the parsed options in *a* onto self, then validate and default them.

                Raises Exception if no perf command was given, the output
                directory already exists, or the options are negative or
                contradictory.
                """
                for arg_name in vars(a):
                        setattr(self, arg_name, getattr(a, arg_name))
                # Without a perf command there is nothing to run: report it
                # clearly instead of failing with an IndexError on self.cmd[0].
                if not self.cmd:
                        raise Exception("No perf command given after '--'")
                # Keep the user's values before the defaults below replace nr.
                # NOTE(review): orig_cmd is presumably kept because
                # ExtractPerfOption() may modify self.cmd - TODO confirm.
                self.orig_nr = self.nr
                self.orig_cmd = list(self.cmd)
                self.perf = self.cmd[0]
                if os.path.exists(self.output_dir):
                        raise Exception(f"Output '{self.output_dir}' already exists")
                if self.jobs < 0 or self.nr < 0 or self.interval < 0:
                        raise Exception("Bad options (negative values): try -h option for help")
                if self.nr != 0 and self.interval != 0:
                        raise Exception("Cannot specify number of time subdivisions and time interval")
                if self.jobs == 0:
                        self.jobs = NumberOfCPUs()
                # Neither --nr nor --interval given: with --per-cpu the time
                # range is not subdivided (the CPUs provide the parallelism),
                # otherwise it is split into one subdivision per job.
                if self.nr == 0 and self.interval == 0:
                        self.nr = 1 if self.per_cpu else self.jobs

        def Init(self):
                """Locate the input file and read its header fields."""
                if self.verbosity.debug:
                        print("cmd", self.cmd)
                self.file_name = DetermineInputFileName(self.cmd)
                self.hdr = ReadHeader(self.perf, self.file_name)
                self.hdr_dict = ParseHeader(self.hdr)
                # Header 'cmdline': presumably the recorded perf record command
                # line; it is searched for "intel_pt" and "--per-thread" below.
                self.cmd_line = HeaderField(self.hdr_dict, "cmdline")

        def ExtractTimeInfo(self):
                """Determine the overall time range(s) to be subdivided.

                The header's first / last sample times bound any --time option
                given in the perf script command.
                """
                self.min_time = TimeVal(HeaderField(self.hdr_dict, "time of first sample"), 0)
                self.max_time = TimeVal(HeaderField(self.hdr_dict, "time of last sample"), 0)
                self.time_str = ExtractPerfOption(self.cmd, "", "time")
                self.time_ranges = ParseTimeStr(self.time_str, self.min_time, self.max_time)
                if self.verbosity.debug:
                        print("time_ranges", self.time_ranges)

        def ExtractCPUInfo(self):
                """Set self.cpus: the CPUs to process separately, or [-1] when not per-CPU."""
                if self.per_cpu:
                        nr_cpus = int(HeaderField(self.hdr_dict, "nrcpus avail"))
                        self.cpu_str = ExtractPerfOption(self.cmd, "C", "cpu")
                        # No -C/--cpu option: process every available CPU
                        if not self.cpu_str:
                                self.cpus = list(range(nr_cpus))
                        else:
                                self.cpus = ParseCPUStr(self.cpu_str, nr_cpus)
                else:
                        self.cpu_str = None
                        self.cpus = [-1]
                if self.verbosity.debug:
                        print("cpus", self.cpus)

        def IsIntelPT(self):
                """Return True if the recorded command line mentions the intel_pt event."""
                return "intel_pt" in self.cmd_line

        def SplitTimeRanges(self):
                """Subdivide self.time_ranges into self.split_time_ranges_for_each_cpu.

                For Intel PT without --interval, the split is by trace data
                density and may differ per CPU. Otherwise a single list of
                subdivisions (by count or by interval) is shared by all CPUs.
                """
                if self.IsIntelPT() and self.interval == 0:
                        self.split_time_ranges_for_each_cpu = \
                                SplitTimeRangesByTraceDataDensity(self.time_ranges, self.cpus, self.orig_nr,
                                                                  self.orig_cmd, self.file_name, self.per_cpu,
                                                                  self.min_size, self.min_interval, self.verbosity)
                elif self.nr:
                        self.split_time_ranges_for_each_cpu = [ SplitTimeRangesIntoN(self.time_ranges, self.nr, self.min_interval) ]
                else:
                        self.split_time_ranges_for_each_cpu = [ SplitTimeRangesByInterval(self.time_ranges, self.interval) ]

        def CheckTimeRanges(self):
                """Self test: each subdivision must recombine into the original time ranges."""
                for tr in self.split_time_ranges_for_each_cpu:
                        new_tr = RecombineTimeRanges(tr)
                        if new_tr != self.time_ranges:
                                if self.verbosity.debug:
                                        print("tr", tr)
                                        print("new_tr", new_tr)
                                raise Exception("Self test failed!")

        def OpenTimeRangeEnds(self):
                """Apply OpenTimeRangeEnds() to every per-CPU list of time ranges."""
                for time_ranges in self.split_time_ranges_for_each_cpu:
                        OpenTimeRangeEnds(time_ranges, self.min_time, self.max_time)

        def CreateWorkList(self):
                """Build self.worklist, the jobs to run, from the CPUs and time subdivisions."""
                self.worklist = CreateWorkList(self.cmd, self.pipe_to, self.output_dir, self.cpus, self.split_time_ranges_for_each_cpu)

        def PerfDataRecordedPerCPU(self):
                """Return False if the recorded command line used --per-thread, else True."""
                return "--per-thread" not in self.cmd_line.split()

        def DefaultToPerCPU(self):
                """Decide whether to process per CPU when --per-cpu was not given.

                --no-per-cpu always wins. Otherwise per-CPU is the default only for
                Intel PT data that was recorded per-cpu, because decoding can then
                be done for each CPU separately.
                """
                if self.no_per_cpu:
                        return False
                return self.PerfDataRecordedPerCPU() and self.IsIntelPT()

        def Config(self):
                """Work out CPUs and time subdivisions and build the work list."""
                self.Init()
                self.ExtractTimeInfo()
                if not self.per_cpu:
                        self.per_cpu = self.DefaultToPerCPU()
                if self.verbosity.debug:
                        print("per_cpu", self.per_cpu)
                self.ExtractCPUInfo()
                self.SplitTimeRanges()
                if self.verbosity.self_test:
                        self.CheckTimeRanges()
                # Prefer open-ended time range to starting / ending with min_time / max_time resp.
                self.OpenTimeRangeEnds()
                self.CreateWorkList()

        def Run(self):
                """Run the jobs; with --dry-run only print their commands.

                Returns True for a dry run, otherwise the result of RunWork().
                """
                if self.dry_run:
                        print(len(self.worklist),"jobs:")
                        for w in self.worklist:
                                print(w.Command())
                        return True
                result = RunWork(self.worklist, self.jobs, verbosity=self.verbosity)
                if self.verbosity.verbose:
                        print(glb_prog_name, "done")
                return result
def RunParallelPerf(a):
        """Configure and run the parallel perf script jobs described by options *a*.

        Returns the value of ParallelPerf.Run().
        """
        job_runner = ParallelPerf(a)
        job_runner.Config()
        outcome = job_runner.Run()
        return outcome
843
def Main(args):
        """Parse the command line and run the parallel perf script jobs.

        args: the full argument vector including the program name at args[0]
              (e.g. sys.argv). Options for this script come before '--'; the
              perf script command to parallelize comes after it.

        Returns True on success, or after printing help when no arguments are
        given. Returns False after a fatal error, which is reported on stderr;
        with --debug the error is re-raised instead.
        """
        ap = argparse.ArgumentParser(
                prog=glb_prog_name, formatter_class = argparse.RawDescriptionHelpFormatter,
                description =
"""
Run a perf script command multiple times in parallel, using perf script options
--cpu and --time so that each job processes a different chunk of the data.
""",
                epilog =
"""
Follow the options by '--' and then the perf script command e.g.

        $ perf record -a -- sleep 10
        $ parallel-perf.py --nr=4 -- perf script --ns
        All jobs finished successfully
        $ tree parallel-perf-output/
        parallel-perf-output/
        ├── time-range-0
        │   ├── cmd.txt
        │   └── out.txt
        ├── time-range-1
        │   ├── cmd.txt
        │   └── out.txt
        ├── time-range-2
        │   ├── cmd.txt
        │   └── out.txt
        └── time-range-3
            ├── cmd.txt
            └── out.txt
        $ find parallel-perf-output -name cmd.txt | sort | xargs grep -H .
        parallel-perf-output/time-range-0/cmd.txt:perf script --time=,9466.504461499 --ns
        parallel-perf-output/time-range-1/cmd.txt:perf script --time=9466.504461500,9469.005396999 --ns
        parallel-perf-output/time-range-2/cmd.txt:perf script --time=9469.005397000,9471.506332499 --ns
        parallel-perf-output/time-range-3/cmd.txt:perf script --time=9471.506332500, --ns

Any perf script command can be used, including the use of perf script options
--dlfilter and --script, so that the benefit of running parallel jobs
naturally extends to them also.

If option --pipe-to is used, standard output is first piped through that
command. Beware, if the command fails (e.g. grep with no matches), it will be
considered a fatal error.

Final standard output is redirected to files named out.txt in separate
subdirectories under the output directory. Similarly, standard error is
written to files named err.txt. In addition, files named cmd.txt contain the
corresponding perf script command. After processing, err.txt files are removed
if they are empty.

If any job exits with a non-zero exit code, then all jobs are killed and no
more are started. A message is printed if any job results in a non-empty
err.txt file.

There is a separate output subdirectory for each time range. If the --per-cpu
option is used, these are further grouped under cpu-n subdirectories, e.g.

        $ parallel-perf.py --per-cpu --nr=2 -- perf script --ns --cpu=0,1
        All jobs finished successfully
        $ tree parallel-perf-output
        parallel-perf-output/
        ├── cpu-0
        │   ├── time-range-0
        │   │   ├── cmd.txt
        │   │   └── out.txt
        │   └── time-range-1
        │       ├── cmd.txt
        │       └── out.txt
        └── cpu-1
            ├── time-range-0
            │   ├── cmd.txt
            │   └── out.txt
            └── time-range-1
                ├── cmd.txt
                └── out.txt
        $ find parallel-perf-output -name cmd.txt | sort | xargs grep -H .
        parallel-perf-output/cpu-0/time-range-0/cmd.txt:perf script --cpu=0 --time=,9469.005396999 --ns
        parallel-perf-output/cpu-0/time-range-1/cmd.txt:perf script --cpu=0 --time=9469.005397000, --ns
        parallel-perf-output/cpu-1/time-range-0/cmd.txt:perf script --cpu=1 --time=,9469.005396999 --ns
        parallel-perf-output/cpu-1/time-range-1/cmd.txt:perf script --cpu=1 --time=9469.005397000, --ns

Subdivisions of time range, and cpus if the --per-cpu option is used, are
expressed by the --time and --cpu perf script options respectively. If the
supplied perf script command has a --time option, then that time range is
subdivided, otherwise the time range given by 'time of first sample' to
'time of last sample' is used (refer perf script --header-only). Similarly, the
supplied perf script command may provide a --cpu option, and only those CPUs
will be processed.

To prevent time intervals becoming too small, the --min-interval option can
be used.

Note there is special handling for processing Intel PT traces. If an interval is
not specified and the perf record command contained the intel_pt event, then the
time range will be subdivided in order to produce subdivisions that contain
approximately the same amount of trace data. That is accomplished by counting
double-quick (--itrace=qqi) samples, and choosing time ranges that encompass
approximately the same number of samples. In that case, time ranges may not be
the same for each CPU processed. For Intel PT, --per-cpu is the default, but
that can be overridden by --no-per-cpu. Note, for Intel PT, double-quick
decoding produces 1 sample for each PSB synchronization packet, which in turn
come after a certain number of bytes output, determined by psb_period (refer
perf Intel PT documentation). The minimum number of double-quick samples that
will define a time range can be set by the --min_size option, which defaults to
64.
""")
        ap.add_argument("-o", "--output-dir", default="parallel-perf-output", help="output directory (default 'parallel-perf-output')")
        ap.add_argument("-j", "--jobs", type=int, default=0, help="maximum number of jobs to run in parallel at one time (default is the number of CPUs)")
        ap.add_argument("-n", "--nr", type=int, default=0, help="number of time subdivisions (default is the number of jobs)")
        ap.add_argument("-i", "--interval", type=float, default=0, help="subdivide the time range using this time interval (in seconds e.g. 0.1 for a tenth of a second)")
        ap.add_argument("-c", "--per-cpu", action="store_true", help="process data for each CPU in parallel")
        ap.add_argument("-m", "--min-interval", type=float, default=glb_min_interval, help=f"minimum interval (default {glb_min_interval} seconds)")
        ap.add_argument("-p", "--pipe-to", help="command to pipe output to (optional)")
        ap.add_argument("-N", "--no-per-cpu", action="store_true", help="do not process data for each CPU in parallel")
        ap.add_argument("-b", "--min_size", type=int, default=glb_min_samples, help="minimum data size (for Intel PT in PSBs)")
        ap.add_argument("-D", "--dry-run", action="store_true", help="do not run any jobs, just show the perf script commands")
        ap.add_argument("-q", "--quiet", action="store_true", help="do not print any messages except errors")
        ap.add_argument("-v", "--verbose", action="store_true", help="print more messages")
        ap.add_argument("-d", "--debug", action="store_true", help="print debugging messages")
        cmd_line = list(args)
        # Everything after the first '--' is the perf script command; only the
        # part before it is parsed as options for this script.
        try:
                split_pos = cmd_line.index("--")
        except ValueError:
                cmd = None
                args = cmd_line
        else:
                cmd = cmd_line[split_pos + 1:]
                args = cmd_line[:split_pos]
        a = ap.parse_args(args=args[1:])
        a.cmd = cmd
        a.verbosity = Verbosity(a.quiet, a.verbose, a.debug)
        try:
                if a.cmd is None:
                        # No arguments at all: just show usage
                        if len(args) <= 1:
                                ap.print_help()
                                return True
                        raise Exception("Command line must contain '--' before perf command")
                if not a.cmd:
                        raise Exception("No perf command given after '--'")
                return RunParallelPerf(a)
        except Exception as e:
                print("Fatal error:", str(e), file=sys.stderr)
                if a.debug:
                        raise
                return False
985
if __name__ == "__main__":
        # Exit status 1 tells the caller that Main() reported a failure
        sys.exit(0 if Main(sys.argv) else 1)
This page took 0.095025 seconds and 4 git commands to generate.