diff --git a/NEXT_CHANGELOG.md b/NEXT_CHANGELOG.md index b406a874e8..81782f27c7 100644 --- a/NEXT_CHANGELOG.md +++ b/NEXT_CHANGELOG.md @@ -7,6 +7,7 @@ ### CLI ### Bundles +* Fix `bundle generate` job to preserve nested notebook directory structure ([#4596](https://github.com/databricks/cli/pull/4596)) * engine/direct: Fix drift in grants resource due to privilege reordering ([#4794](https://github.com/databricks/cli/pull/4794)) * engine/direct: Fix 400 error when deploying grants with ALL_PRIVILEGES ([#4801](https://github.com/databricks/cli/pull/4801)) * Deduplicate grant entries with duplicate principals or privileges during initialization ([#4801](https://github.com/databricks/cli/pull/4801)) diff --git a/acceptance/bundle/generate/job_nested_notebooks/databricks.yml b/acceptance/bundle/generate/job_nested_notebooks/databricks.yml new file mode 100644 index 0000000000..3331ecc849 --- /dev/null +++ b/acceptance/bundle/generate/job_nested_notebooks/databricks.yml @@ -0,0 +1,2 @@ +bundle: + name: nested_notebooks diff --git a/acceptance/bundle/generate/job_nested_notebooks/out.job.yml b/acceptance/bundle/generate/job_nested_notebooks/out.job.yml new file mode 100644 index 0000000000..83a90b6298 --- /dev/null +++ b/acceptance/bundle/generate/job_nested_notebooks/out.job.yml @@ -0,0 +1,11 @@ +resources: + jobs: + out: + name: dev.my_repo.my_job + tasks: + - task_key: my_notebook_task + notebook_task: + notebook_path: src/my_folder/my_notebook.py + - task_key: other_notebook_task + notebook_task: + notebook_path: src/other_folder/other_notebook.py diff --git a/acceptance/bundle/generate/job_nested_notebooks/out.test.toml b/acceptance/bundle/generate/job_nested_notebooks/out.test.toml new file mode 100644 index 0000000000..d560f1de04 --- /dev/null +++ b/acceptance/bundle/generate/job_nested_notebooks/out.test.toml @@ -0,0 +1,5 @@ +Local = true +Cloud = false + +[EnvMatrix] + DATABRICKS_BUNDLE_ENGINE = ["terraform", "direct"] diff --git a/acceptance/bundle/generate/job_nested_notebooks/output.txt b/acceptance/bundle/generate/job_nested_notebooks/output.txt new file mode 100644 index 0000000000..f0b8326de0 --- /dev/null +++ b/acceptance/bundle/generate/job_nested_notebooks/output.txt @@ -0,0 +1,9 @@ +File successfully saved to src/my_folder/my_notebook.py +File successfully saved to src/other_folder/other_notebook.py +Job configuration successfully saved to out.job.yml +=== old flattened files should be gone === +src/my_notebook.py removed +src/other_notebook.py removed +=== new nested files === +src/my_folder/my_notebook.py +src/other_folder/other_notebook.py diff --git a/acceptance/bundle/generate/job_nested_notebooks/script b/acceptance/bundle/generate/job_nested_notebooks/script new file mode 100644 index 0000000000..04805db638 --- /dev/null +++ b/acceptance/bundle/generate/job_nested_notebooks/script @@ -0,0 +1,12 @@ +mkdir -p src +echo "old" > src/my_notebook.py +echo "old" > src/other_notebook.py + +$CLI bundle generate job --existing-job-id 1234 --config-dir . --key out --force --source-dir src 2>&1 | sort + +echo "=== old flattened files should be gone ===" +test ! -f src/my_notebook.py && echo "src/my_notebook.py removed" || echo "src/my_notebook.py still exists" +test ! -f src/other_notebook.py && echo "src/other_notebook.py removed" || echo "src/other_notebook.py still exists" + +echo "=== new nested files ===" +find src -type f | sort diff --git a/acceptance/bundle/generate/job_nested_notebooks/test.toml b/acceptance/bundle/generate/job_nested_notebooks/test.toml new file mode 100644 index 0000000000..bdb350e53f --- /dev/null +++ b/acceptance/bundle/generate/job_nested_notebooks/test.toml @@ -0,0 +1,42 @@ +Ignore = ["src"] + +[[Server]] +Pattern = "GET /api/2.2/jobs/get" +Response.Body = ''' +{ + "job_id": 11223344, + "settings": { + "name": "dev.my_repo.my_job", + "tasks": [ + { + "task_key": "my_notebook_task", + "notebook_task": { + "notebook_path": "/my_data_product/dev/my_folder/my_notebook" + } + }, + { + "task_key": "other_notebook_task", + "notebook_task": { + "notebook_path": "/my_data_product/dev/other_folder/other_notebook" + } + } + ] + } +} +''' + +[[Server]] +Pattern = "GET /api/2.0/workspace/get-status" +Response.Body = ''' +{ + "object_type": "NOTEBOOK", + "language": "PYTHON", + "repos_export_format": "SOURCE" +} +''' + +[[Server]] +Pattern = "GET /api/2.0/workspace/export" +Response.Body = ''' +print("Hello, World!") +''' diff --git a/bundle/generate/downloader.go b/bundle/generate/downloader.go index 30b91c0539..0bddf4f1ce 100644 --- a/bundle/generate/downloader.go +++ b/bundle/generate/downloader.go @@ -11,6 +11,7 @@ import ( "strings" "github.com/databricks/cli/libs/cmdio" + "github.com/databricks/cli/libs/log" "github.com/databricks/cli/libs/notebook" "github.com/databricks/databricks-sdk-go" "github.com/databricks/databricks-sdk-go/service/jobs" @@ -73,7 +74,7 @@ func (n *Downloader) markFileForDownload(ctx context.Context, filePath *string) return err } - *filePath = rel + *filePath = filepath.ToSlash(rel) return nil } @@ -109,7 +110,7 @@ func (n *Downloader) MarkDirectoryForDownload(ctx context.Context, dirPath *stri return err } - *dirPath = rel + *dirPath = filepath.ToSlash(rel) return nil } @@ -203,10 +204,75 @@ func (n *Downloader) markNotebookForDownload(ctx context.Context, notebookPath * return err } - *notebookPath = rel + *notebookPath = filepath.ToSlash(rel) return nil } +func (n *Downloader) MarkTasksForDownload(ctx context.Context, tasks []jobs.Task) error { + var paths []string + for _, task := range tasks { + if task.NotebookTask != nil { + paths = append(paths, task.NotebookTask.NotebookPath) + } + } + if len(paths) > 0 { + n.basePath = commonDirPrefix(paths) + } + for i := range tasks { + if err := n.MarkTaskForDownload(ctx, &tasks[i]); err != nil { + return err + } + } + return nil +} + +func (n *Downloader) CleanupOldFiles(ctx context.Context) { + for targetPath := range n.files { + rel, err := filepath.Rel(n.sourceDir, targetPath) + if err != nil { + continue + } + if filepath.Base(rel) == rel { + continue + } + oldPath := filepath.Join(n.sourceDir, filepath.Base(rel)) + if _, isNewFile := n.files[oldPath]; isNewFile { + continue + } + if err := os.Remove(oldPath); err == nil { + log.Infof(ctx, "Removed previously generated file %s", filepath.ToSlash(oldPath)) + } + } +} + +// commonDirPrefix returns the longest common directory-aligned prefix of the given paths. +func commonDirPrefix(paths []string) string { + if len(paths) == 0 { + return "" + } + if len(paths) == 1 { + return path.Dir(paths[0]) + } + + prefix := paths[0] + for _, p := range paths[1:] { + for !strings.HasPrefix(p, prefix) { + prefix = prefix[:len(prefix)-1] + if prefix == "" { + return "" + } + } + } + + // Truncate to last '/' to ensure directory alignment. + if i := strings.LastIndex(prefix, "/"); i >= 0 { + prefix = prefix[:i] + } else { + prefix = "" + } + return prefix +} + func (n *Downloader) relativePath(fullPath string) string { basePath := path.Dir(fullPath) if n.basePath != "" { diff --git a/bundle/generate/downloader_test.go b/bundle/generate/downloader_test.go index d0877ac3d2..9363ce98d3 100644 --- a/bundle/generate/downloader_test.go +++ b/bundle/generate/downloader_test.go @@ -1,10 +1,16 @@ package generate import ( + "encoding/json" + "net/http" + "net/http/httptest" + "os" "path/filepath" "testing" + "github.com/databricks/databricks-sdk-go" "github.com/databricks/databricks-sdk-go/experimental/mocks" + "github.com/databricks/databricks-sdk-go/service/jobs" "github.com/databricks/databricks-sdk-go/service/workspace" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -28,7 +34,7 @@ func TestDownloader_MarkFileReturnsRelativePath(t *testing.T) { }, nil) err = downloader.markFileForDownload(ctx, &f1) require.NoError(t, err) - assert.Equal(t, filepath.FromSlash("../source/c"), f1) + assert.Equal(t, "../source/c", f1) // Test that the previous path doesn't influence the next path. f2 := "/a/b/c/d" @@ -37,7 +43,7 @@ func TestDownloader_MarkFileReturnsRelativePath(t *testing.T) { }, nil) err = downloader.markFileForDownload(ctx, &f2) require.NoError(t, err) - assert.Equal(t, filepath.FromSlash("../source/d"), f2) + assert.Equal(t, "../source/d", f2) } func TestDownloader_DoesNotRecurseIntoNodeModules(t *testing.T) { @@ -93,3 +99,186 @@ func TestDownloader_DoesNotRecurseIntoNodeModules(t *testing.T) { assert.Contains(t, downloader.files, filepath.Join(sourceDir, "app.py")) assert.Contains(t, downloader.files, filepath.Join(sourceDir, "src/index.js")) } + +func TestCommonDirPrefix(t *testing.T) { + tests := []struct { + name string + paths []string + want string + }{ + { + name: "empty", + paths: nil, + want: "", + }, + { + name: "single path", + paths: []string{"/a/b/c"}, + want: "/a/b", + }, + { + name: "shared parent", + paths: []string{"/a/b/c", "/a/b/d"}, + want: "/a/b", + }, + { + name: "root divergence", + paths: []string{"/x/y", "/z/w"}, + want: "", + }, + { + name: "partial dir name safety", + paths: []string{"/a/bc/d", "/a/bd/e"}, + want: "/a", + }, + { + name: "nested shared prefix", + paths: []string{"/Users/user/project/etl/extract", "/Users/user/project/reporting/dashboard"}, + want: "/Users/user/project", + }, + { + name: "identical paths", + paths: []string{"/a/b/c", "/a/b/c"}, + want: "/a/b", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, tt.want, commonDirPrefix(tt.paths)) + }) + } +} + +func newTestWorkspaceClient(t *testing.T, handler http.HandlerFunc) *databricks.WorkspaceClient { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path == "/.well-known/databricks-config" { + http.NotFound(w, r) + return + } + + handler(w, r) + })) + t.Cleanup(server.Close) + + w, err := databricks.NewWorkspaceClient(&databricks.Config{ + Host: server.URL, + Token: "test-token", + }) + require.NoError(t, err) + return w +} + +func notebookStatusHandler(t *testing.T) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/api/2.0/workspace/get-status" { + t.Fatalf("unexpected request path: %s", r.URL.Path) + } + resp := workspaceStatus{ + Language: workspace.LanguagePython, + ObjectType: workspace.ObjectTypeNotebook, + ExportFormat: workspace.ExportFormatSource, + } + w.Header().Set("Content-Type", "application/json") + err := json.NewEncoder(w).Encode(resp) + if err != nil { + t.Fatal(err) + } + } +} + +func TestDownloader_MarkTasksForDownload_PreservesStructure(t *testing.T) { + w := newTestWorkspaceClient(t, notebookStatusHandler(t)) + + dir := "base/dir" + sourceDir := filepath.Join(dir, "source") + configDir := filepath.Join(dir, "config") + downloader := NewDownloader(w, sourceDir, configDir) + + tasks := []jobs.Task{ + { + TaskKey: "extract_task", + NotebookTask: &jobs.NotebookTask{ + NotebookPath: "/Users/user/project/etl/extract", + }, + }, + { + TaskKey: "dashboard_task", + NotebookTask: &jobs.NotebookTask{ + NotebookPath: "/Users/user/project/reporting/dashboard", + }, + }, + } + + err := downloader.MarkTasksForDownload(t.Context(), tasks) + require.NoError(t, err) + + assert.Equal(t, "../source/etl/extract.py", tasks[0].NotebookTask.NotebookPath) + assert.Equal(t, "../source/reporting/dashboard.py", tasks[1].NotebookTask.NotebookPath) + assert.Len(t, downloader.files, 2) +} + +func TestDownloader_MarkTasksForDownload_SingleNotebook(t *testing.T) { + ctx := t.Context() + w := newTestWorkspaceClient(t, notebookStatusHandler(t)) + + dir := "base/dir" + sourceDir := filepath.Join(dir, "source") + configDir := filepath.Join(dir, "config") + downloader := NewDownloader(w, sourceDir, configDir) + + tasks := []jobs.Task{ + { + TaskKey: "task1", + NotebookTask: &jobs.NotebookTask{ + NotebookPath: "/Users/user/project/notebook", + }, + }, + } + + err := downloader.MarkTasksForDownload(ctx, tasks) + require.NoError(t, err) + + // Single notebook: basePath = path.Dir => same as old behavior. + assert.Equal(t, "../source/notebook.py", tasks[0].NotebookTask.NotebookPath) + assert.Len(t, downloader.files, 1) +} + +func TestDownloader_MarkTasksForDownload_NoNotebooks(t *testing.T) { + ctx := t.Context() + w := newTestWorkspaceClient(t, func(w http.ResponseWriter, r *http.Request) { + t.Fatalf("unexpected request: %s %s", r.Method, r.URL.Path) + }) + + downloader := NewDownloader(w, "source", "config") + + tasks := []jobs.Task{ + {TaskKey: "spark_task"}, + {TaskKey: "python_wheel_task"}, + } + + err := downloader.MarkTasksForDownload(ctx, tasks) + require.NoError(t, err) + assert.Empty(t, downloader.files) +} + +func TestDownloader_CleanupOldFiles(t *testing.T) { + ctx := t.Context() + sourceDir := t.TempDir() + + oldExtract := filepath.Join(sourceDir, "extract.py") + oldDashboard := filepath.Join(sourceDir, "dashboard.py") + unrelated := filepath.Join(sourceDir, "utils.py") + require.NoError(t, os.WriteFile(oldExtract, []byte("old"), 0o644)) + require.NoError(t, os.WriteFile(oldDashboard, []byte("old"), 0o644)) + require.NoError(t, os.WriteFile(unrelated, []byte("keep"), 0o644)) + + downloader := NewDownloader(nil, sourceDir, "config") + downloader.files[filepath.Join(sourceDir, "etl", "extract.py")] = exportFile{} + downloader.files[filepath.Join(sourceDir, "reporting", "dashboard.py")] = exportFile{} + + downloader.CleanupOldFiles(ctx) + + assert.NoFileExists(t, oldExtract) + assert.NoFileExists(t, oldDashboard) + assert.FileExists(t, unrelated) +} diff --git a/bundle/run/output/job_test.go b/bundle/run/output/job_test.go index 80c52c3e1f..9ecb7fc43e 100644 --- a/bundle/run/output/job_test.go +++ b/bundle/run/output/job_test.go @@ -110,6 +110,23 @@ func TestNotebookOutputToRunOutput(t *testing.T) { assert.Equal(t, expected, actual) } +func TestNotebookOutputWithEmptyResultFallsBackToLogs(t *testing.T) { + jobOutput := &jobs.RunOutput{ + NotebookOutput: &jobs.NotebookOutput{ + Result: "", + }, + Logs: "hello :)", + LogsTruncated: true, + } + actual := toRunOutput(jobOutput) + + expected := &LogsOutput{ + Logs: "hello :)", + LogsTruncated: true, + } + assert.Equal(t, expected, actual) +} + func TestDbtOutputToRunOutput(t *testing.T) { jobOutput := &jobs.RunOutput{ DbtOutput: &jobs.DbtOutput{ diff --git a/bundle/run/output/task.go b/bundle/run/output/task.go index 53b989e885..d30370bb01 100644 --- a/bundle/run/output/task.go +++ b/bundle/run/output/task.go @@ -67,6 +67,13 @@ func (out *LogsOutput) String() (string, error) { func toRunOutput(output *jobs.RunOutput) RunOutput { switch { case output.NotebookOutput != nil: + if output.NotebookOutput.Result == "" && !output.NotebookOutput.Truncated && output.Logs != "" { + result := LogsOutput{ + Logs: output.Logs, + LogsTruncated: output.LogsTruncated, + } + return &result + } result := NotebookOutput(*output.NotebookOutput) return &result case output.DbtOutput != nil: diff --git a/cmd/bundle/generate/job.go b/cmd/bundle/generate/job.go index 1dbb8521bf..5e4abda84e 100644 --- a/cmd/bundle/generate/job.go +++ b/cmd/bundle/generate/job.go @@ -92,11 +92,9 @@ After generation, you can deploy this job to other targets using: if job.Settings.GitSource != nil { cmdio.LogString(ctx, "Job is using Git source, skipping downloading files") } else { - for _, task := range job.Settings.Tasks { - err := downloader.MarkTaskForDownload(ctx, &task) - if err != nil { - return err - } + err = downloader.MarkTasksForDownload(ctx, job.Settings.Tasks) + if err != nil { + return err } } @@ -123,6 +121,8 @@ After generation, you can deploy this job to other targets using: return err } + downloader.CleanupOldFiles(ctx) + oldFilename := filepath.Join(configDir, jobKey+".yml") filename := filepath.Join(configDir, jobKey+".job.yml") diff --git a/integration/bundle/generate_job_test.go b/integration/bundle/generate_job_test.go index 8c51a55d40..3008e74606 100644 --- a/integration/bundle/generate_job_test.go +++ b/integration/bundle/generate_job_test.go @@ -55,7 +55,7 @@ func TestGenerateFromExistingJobAndDeploy(t *testing.T) { require.NoError(t, err) generatedYaml := string(data) require.Contains(t, generatedYaml, "notebook_task:") - require.Contains(t, generatedYaml, "notebook_path: "+filepath.Join("..", "src", "test.py")) + require.Contains(t, generatedYaml, "notebook_path: "+filepath.ToSlash(filepath.Join("..", "src", "test.py"))) require.Contains(t, generatedYaml, "task_key: test") require.Contains(t, generatedYaml, "new_cluster:") require.Contains(t, generatedYaml, "spark_version: 13.3.x-scala2.12") diff --git a/integration/bundle/generate_pipeline_test.go b/integration/bundle/generate_pipeline_test.go index b1f68f79df..984d555a86 100644 --- a/integration/bundle/generate_pipeline_test.go +++ b/integration/bundle/generate_pipeline_test.go @@ -64,9 +64,9 @@ func TestGenerateFromExistingPipelineAndDeploy(t *testing.T) { require.Contains(t, generatedYaml, "libraries:") require.Contains(t, generatedYaml, "- notebook:") - require.Contains(t, generatedYaml, "path: "+filepath.Join("..", "src", "notebook.py")) + require.Contains(t, generatedYaml, "path: "+filepath.ToSlash(filepath.Join("..", "src", "notebook.py"))) require.Contains(t, generatedYaml, "- file:") - require.Contains(t, generatedYaml, "path: "+filepath.Join("..", "src", "test.py")) + require.Contains(t, generatedYaml, "path: "+filepath.ToSlash(filepath.Join("..", "src", "test.py"))) deployBundle(t, ctx, bundleRoot)